2017-09-27 3 views
0

Je reçois des millions de messages du flux Kafka en étincelle. Il y a 15 types de message différents. Les messages proviennent d'un seul sujet. Je ne peux que différencier le message par son contenu. donc j'utilise la méthode rdd.contains pour obtenir le type différent de rdd.est rdd.contains fonction dans spark-scala cher

exemple de message

{ "a": "foo", "b": "bar", "type": "first" .......} {
"a": » foo1 "," b ":" bar1 "," type ":" second ".......}
{" a ":" foo2 "," b ":" bar2 "," type ":" troisième ".......}
{" a ":" foo "," b ":" bar "," type ":" premier ".......}
.... ..........
...............
.........
ainsi de suite

Code

DStream.foreachRDD { rdd => 
    if (!rdd.isEmpty()) { 
    val rdd_first = rdd.filter { 
     ele => ele.contains("First") 
    } 
    if (!rdd_first.isEmpty()) { 
     insertIntoTableFirst(hivecontext.read.json(rdd_first)) 
    } 
    val rdd_second = rdd.filter { 
     ele => ele.contains("Second") 
    } 
    if (!rdd_second.isEmpty()) { 
    insertIntoTableSecond(hivecontext.read.json(rdd_second)) 
    } 
     ............. 
     ...... 
    same way for 15 different rdd 

est-il possible d'obtenir différents RDD du message sujet kafka?

Répondre

1

Il n'y a pas de rdd.contains. La fonction contains utilisée ici est appliquée aux String s du RDD.

Comme ici:

val rdd_first = rdd.filter { 
    element => element.contains("First") // each `element` is a String 
} 

Cette méthode est robuste, car tout autre contenu dans la chaîne pourrait répondre à la comparaison, ce qui entraîne des erreurs.

par exemple.

{"a":"foo", "b":"bar","type":"second", "c": "first", .......} 

Une façon de traiter ce serait de transformer d'abord les données JSON dans les dossiers appropriés, puis appliquer une logique de regroupement ou de filtrage sur ces enregistrements. Pour cela, nous avons d'abord besoin d'une définition de schéma des données. Avec le schéma, nous pouvons analyser les enregistrements dans JSON et appliquer un traitement au-dessus de ce qui suit:

case class Record(a:String, b:String, `type`:String) 

import org.apache.spark.sql.types._ 
val schema = StructType(
       Array(
       StructField("a", StringType, true), 
       StructField("b", StringType, true), 
       StructField("type", String, true) 
       ) 
      ) 

val processPerType: Map[String, Dataset[Record] => Unit ] = Map(...) 

stream.foreachRDD { rdd => 
    val records = rdd.toDF("value").select(from_json($"value", schema)).as[Record] 
    processPerType.foreach{case (tpe, process) => 
     val target = records.filter(entry => entry.`type` == tpe) 
     process(target) 
    } 
} 

La question ne précise pas quel type de logique doit être appliquée à chaque type d'enregistrement. Ce qui est présenté ici est une manière générique d'aborder le problème où toute logique personnalisée peut être exprimée comme une fonction Dataset[Record] => Unit.

Si la logique peut être exprimée comme une agrégation, les fonctions d'agrégation Dataset seront probablement plus appropriées.

+0

Je dois stocker les données dans la ruche. Il y a 15 tables différentes créées dans la ruche. Question mise à jour En fait, il y a plus de 50 colonnes dans un seul type de JSON. Je dois donc créer 15 classes de cas. Y at-il d'autres au lieu de créer des classes de cas? –

+0

@KishoreKumarSuthar après que les données sont 'structurées' avec la 'classe de cas' initiale (selon Spark jargon), vous pouvez faire des projections sur les données pour correspondre à la table particulière' (val tableProjection1 = records select ($ "column", $ "colonne", ...) où ($ "type" === ...) ' – maasg