2017-06-30 1 views
2

Le code ci-dessous est lu à partir d'un socket, mais aucune entrée ne s'affiche dans le travail. J'ai nc -l 1111 en cours d'exécution, et le dumping des données si, je ne sais pas pourquoi mon travail d'étincelle ne peut pas lire les données de 10.176.110.112:1111.Comment lire des jeux de données en continu à partir d'une socket?

Dataset<Row> d = sparkSession.readStream().format("socket") 
            .option("host", "10.176.110.112") 
            .option("port", 1111).load(); 

Répondre

2

Ci-dessous le code lit à partir d'une prise, mais je ne vois aucune entrée d'entrer dans le travail.

Eh bien, honnêtement, vous faites pas lire quelque chose de n'importe où. Vous avez seulement décrit ce que vous êtes en allant au lorsque vous démarrez le pipeline en continu. Comme vous utilisez Structured Streaming pour lire des jeux de données à partir d'un socket, vous devez utiliser l'opérateur start pour déclencher la récupération de données (et seulement après avoir défini le récepteur).

start(): StreamingQuery Lance l'exécution de la requête en streaming, ce qui résultats sans cesse sortie au chemin donné que de nouvelles données arrivent. L'objet StreamingQuery renvoyé peut être utilisé pour interagir avec le flux.

Avant de définir start, vous devez définir où diffuser vos données. Il peut s'agir de Kafka, de fichiers, d'un récepteur de streaming personnalisé (utilisant éventuellement l'opérateur foreach) ou d'une console.

J'utilise console sink (format alias) dans l'exemple suivant. J'utilise aussi Scala et je le laisse réécrire à Java comme votre exercice à la maison.

d.writeStream. // <-- this is the most important part 
    trigger(Trigger.ProcessingTime("10 seconds")). 
    format("console"). 
    option("truncate", false). 
    start   // <-- and this