2017-10-13 9 views
0

J'essaie d'obtenir chaque enregistrement du flux de socket. Je veux que l'enregistrement soit un type de données chaîne à partir de lignes. Comment écrire le code en python? Merci!Comment obtenir un enregistrement au format chaîne à partir de socketTextStream

modèle

= pipeline.PipelineModel.read() charge. (Model_path)

sc = spark.sparkContext ssc = StreamingContext (sc, 1)

lines = ssc.socketTextStream (sys.argv [ 1], int (sys.argv [2]))

si (lignes ne sont pas None): lines.foreachRDD (lambda rdd: rdd.foreach (processRecord))

def processRecord (enregistrement):

print("test") 
... 

Répondre

0
from __future__ import print_function 
import sys 
from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 


if __name__ == "__main__": 
    sc = SparkContext(appName="Demo") 
    ssc = StreamingContext(sc, 1) 

    #record = ssc.socketTextStream("localhost", 9999) 
    record = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) 
    # print out each single word 
    record.flatMap(lambda line: line.split(" ")).pprint() 

    # start streaming 
    ssc.start() 
    # stop when the socket we are listening is dead 
    ssc.awaitTermination() 

Merci.

+0

enregistrement n'est pas le type de chaîne – icecream

+0

J'ai ajouté plus de codes là. SVP vérifier ce qui ne va pas avec mes codes. Merci! – icecream