Je reçois des millions de messages du flux Kafka en étincelle. Il y a 15 types de message différents. Les messages proviennent d'un seul sujet. Je ne peux que différencier le message par son contenu. donc j'utilise la méthode rdd.contains pour obtenir le type différent de rdd.est rdd.contains fonction dans spark-scala cher
exemple de message
{ "a": "foo", "b": "bar", "type": "first" .......} {
"a": » foo1 "," b ":" bar1 "," type ":" second ".......}
{" a ":" foo2 "," b ":" bar2 "," type ":" troisième ".......}
{" a ":" foo "," b ":" bar "," type ":" premier ".......}
.... ..........
...............
.........
ainsi de suite
Code
DStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val rdd_first = rdd.filter {
ele => ele.contains("First")
}
if (!rdd_first.isEmpty()) {
insertIntoTableFirst(hivecontext.read.json(rdd_first))
}
val rdd_second = rdd.filter {
ele => ele.contains("Second")
}
if (!rdd_second.isEmpty()) {
insertIntoTableSecond(hivecontext.read.json(rdd_second))
}
.............
......
same way for 15 different rdd
est-il possible d'obtenir différents RDD du message sujet kafka?
Je dois stocker les données dans la ruche. Il y a 15 tables différentes créées dans la ruche. Question mise à jour En fait, il y a plus de 50 colonnes dans un seul type de JSON. Je dois donc créer 15 classes de cas. Y at-il d'autres au lieu de créer des classes de cas? –
@KishoreKumarSuthar après que les données sont 'structurées' avec la 'classe de cas' initiale (selon Spark jargon), vous pouvez faire des projections sur les données pour correspondre à la table particulière' (val tableProjection1 = records select ($ "column", $ "colonne", ...) où ($ "type" === ...) ' – maasg