2016-04-14 1 views
1

Je suis dans le besoin de fusionner deux flux différents RDD. Uno du type de flux est org.apache.spark.streaming.dstream.DStream [String], et l'autre est de type org.apache.spark.streaming.dstream.DStream [twitter4j.Status].Comment fusionner deux flux de types différents RDD

J'ai essayé:

val streamRDD = stream.union(sentiments) 

Mais il ne réussirai pas:

[error] found : org.apache.spark.streaming.dstream.DStream[String] 
[error] required: org.apache.spark.streaming.dstream.DStream[twitter4j.Status] 
[error]  val streamRDD = stream.union(sentiments) 
[error]         ^
+1

À quoi voulez-vous que le résultat ressemble? Le message d'erreur décrit correctement le problème: vos 'Dstream's contiennent des types différents et ne peuvent donc pas être fusionnés. Qu'attendez-vous de la fusion de 'DStream' résultante? Si 'String' vous devrez convertir l'autre en un' DStream [String] 'premier –

Répondre

2

Le problème est que union ne fonctionne que sur deux DStream du même type d'élément, alors que vous avez DStream[String] et DStream[twitter4j.Status] et String n'est pas twitter4j.Status.

Je suppose que vous avez les types suivants:

val stream: DStream[twitter4j.Status] 
val sentiments: DStream[String] 

Vous avez des choix différents pour résoudre ce:

    1. Vous êtes sûr que String et twitter4j.Status doit être mélangé dans un DStream car ils représentent la même information dans votre contexte: convertir soit DStream pour correspondre à l'autre

      • a) convertir stream pour correspondre sentiments, vous avez besoin d'une conversion twitter4j.Status => String, peut-être vous pouvez utiliser _.toString comme ceci:

        val stream2 = stream.map(_.toString) 
        val result = stream2.union(sentiments) 
        
      • b) convertir sentiments en fonction stream, nécessitant String => twitter4j.Status.
    1. String et twitter4j.Status sont deux choses différentes dans votre contexte, vous voulez garder la distinction entre les deux, mais encore de les combiner en un seul DStream

    En général, vous pouvez utiliser un Sum type pour représenter chaque cas, ici, nous avons seulement deux afin que nous puissions utiliser le Either prédéfini:

    type R = DStream[Either[String,twitter4j.Status] // shorter 
    val streamL: R = stream.map(Left(_)) 
    val sentimentR: R = sentiments.map(Right(_)) 
    val result: R = streamL.union(sentimentsR) 
    

    A la fin vous avez un flux, où chaque élément est soit un String enveloppé dans un Left ou un twitter4j.Status enveloppé dans un Right, ce qui permet de distinguer entre les deux lors du traitement du flux.

+0

Cela a fonctionné comme un charme, merci beaucoup –