2017-01-25 3 views
0

J'essaie d'exécuter un exemple de compte de mots intégrant AWS Kinesis stream et Apache Spark. Des lignes aléatoires sont mises en Kinesis à intervalles réguliers.Comment imprimer PythonTransformedDStream

lines = KinesisUtils.createStream(...) 

Quand je soumets ma demande, lines.pprint() je ne vois pas de valeurs imprimées.

essayé d'imprimer l'objet lines et je vois <pyspark.streaming.dstream.TransformedDStream object at 0x7fa235724950>

Comment imprimer l'objet PythonTransformedDStream? et vérifiez si les données sont reçues.

Je suis sûr qu'il n'y a pas de problème d'identification, si j'utilise de fausses informations d'identification, je reçois une exception d'accès.

Ajouté le code de référence

import sys 

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream 

if __name__ == "__main__": 
    sc = SparkContext(appName="SparkKinesisApp") 
    ssc = StreamingContext(sc, 1) 

    lines = KinesisUtils.createStream(ssc, "SparkKinesisApp", "myStream", "https://kinesis.us-east-1.amazonaws.com","us-east-1", InitialPositionInStream.LATEST, 2) 

    # lines.saveAsTextFiles('/home/ubuntu/logs/out.txt') 
    lines.pprint() 

    counts = lines.flatMap(lambda line: line.split(" ")) 
          .map(lambda word: (word, 1)) 
          .reduceByKey(lambda a, b: a + b) 

    counts.pprint() 

    ssc.start() 
    ssc.awaitTermination() 

Répondre

0

Enfin je l'ai eu fonctionnant.

L'exemple de code que j'ai référencé sur https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py a une mauvaise commande pour soumettre une demande.

La commande correcte avec laquelle je l'ai travail est

$ bin/spark-submit --jars external/spark-streaming-kinesis-asl_2.11-2.1.0.jar --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.1.0 /home/ubuntu/my_pyspark/spark_kinesis.py 
2

Depuis lines.pprint() n'imprime rien, pouvez-vous s'il vous plaît confirmer que vous exécutez:

ssc.start() 
ssc.awaitTermination() 

comme mentionné dans l'exemple ici: https://github.com/apache/spark/blob/v2.1.0/examples/src/main/python/streaming/network_wordcount.py

pprint() devrait fonctionner lorsque l'environnement est configuré correctement:

http://spark.apache.org/docs/2.1.0/streaming-programming-guide.html#output-operations-on-dstreams

opérations de sortie sur DStreams

print() - Affiche les dix premiers éléments de chaque lot de données dans un DSTREAM sur le noeud de pilote exécutant l'application de transmission en continu. Ce est utile pour le développement et le débogage. API Python Ceci est appelé pprint() dans l'API Python.

+0

Je l'ai déjà essayé le mot réseau programme de comptage et 'pprint' travaille pour cela, donc je suppose que, l'environnement est correctement configuré. De plus, les deux lignes mentionnées sont disponibles à la fin de mon code. Le programme s'exécute jusqu'à ce que j'appuie sur ctrl + c. – ArunDhaJ

+0

@ArunDhaJ - avez-vous installé le serveur netcat (http://landoflinux.com/linux_netcat_command.html) et l'exécuter en utilisant $ nc -lk 9999'? Avez-vous entré des mots dans la console netcat qui seront entrés dans votre programme de diffusion d'étincelles? – Yaron

+0

J'ai essayé le programme de comptage de mots réseau avec 'nc' et l'ai exécuté avec succès. Je ne suis confronté à des problèmes avec l'intégration d'Amazon Kinesis. Je publie des phrases aléatoires sur un flux de kinésis, mais mon client spark ne le sélectionne pas et ne le traite pas. – ArunDhaJ