J'essaie d'exécuter un exemple de compte de mots intégrant AWS Kinesis stream et Apache Spark. Des lignes aléatoires sont mises en Kinesis à intervalles réguliers.Comment imprimer PythonTransformedDStream
lines = KinesisUtils.createStream(...)
Quand je soumets ma demande, lines.pprint()
je ne vois pas de valeurs imprimées.
essayé d'imprimer l'objet lines
et je vois <pyspark.streaming.dstream.TransformedDStream object at 0x7fa235724950>
Comment imprimer l'objet PythonTransformedDStream
? et vérifiez si les données sont reçues.
Je suis sûr qu'il n'y a pas de problème d'identification, si j'utilise de fausses informations d'identification, je reçois une exception d'accès.
Ajouté le code de référence
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
if __name__ == "__main__":
sc = SparkContext(appName="SparkKinesisApp")
ssc = StreamingContext(sc, 1)
lines = KinesisUtils.createStream(ssc, "SparkKinesisApp", "myStream", "https://kinesis.us-east-1.amazonaws.com","us-east-1", InitialPositionInStream.LATEST, 2)
# lines.saveAsTextFiles('/home/ubuntu/logs/out.txt')
lines.pprint()
counts = lines.flatMap(lambda line: line.split(" "))
.map(lambda word: (word, 1))
.reduceByKey(lambda a, b: a + b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
Je l'ai déjà essayé le mot réseau programme de comptage et 'pprint' travaille pour cela, donc je suppose que, l'environnement est correctement configuré. De plus, les deux lignes mentionnées sont disponibles à la fin de mon code. Le programme s'exécute jusqu'à ce que j'appuie sur ctrl + c. – ArunDhaJ
@ArunDhaJ - avez-vous installé le serveur netcat (http://landoflinux.com/linux_netcat_command.html) et l'exécuter en utilisant $ nc -lk 9999'? Avez-vous entré des mots dans la console netcat qui seront entrés dans votre programme de diffusion d'étincelles? – Yaron
J'ai essayé le programme de comptage de mots réseau avec 'nc' et l'ai exécuté avec succès. Je ne suis confronté à des problèmes avec l'intégration d'Amazon Kinesis. Je publie des phrases aléatoires sur un flux de kinésis, mais mon client spark ne le sélectionne pas et ne le traite pas. – ArunDhaJ