2017-06-15 1 views
0

J'ai besoin d'appliquer une fonction d'agrégation sur un flux de données avec diffusion d'étincelles apache (NO APACHE SPARK STREAMING SQL).Application de la fonction d'agrégation avec spark scala

Dans mon cas, j'ai un producteur de kafka qui envoie des messages au format JSON. Le format est {'a': String, 'b': String, 'c': Integer, 'd': Double}

je dois agréger sur les attributs 'a' et 'b' toutes les 5 secondes et je dois appliquer une fonction d'agrégation sur les 2 autres attributs (par exemple moyenne, ou la somme, ou Min ou Max).

Comment puis-je faire cela?

Merci

+0

Avez-vous déjà essayé la fonction 'reduce'? https://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams – maasg

+0

le problème est que la fonction de réduction prend 2 paramètres et renvoie 1. J'ai besoin d'avoir le même schéma. I autres mots si mon schéma initial est '{'a': String, 'b': String, 'c': Entier, 'd': Double}' le schéma résultant (avec une fonction d'agrégation AVG) devrait être '{' GROUPBYa ': String,' GROUPBYb ': Chaîne,' AVGc ': Entier,' AVGd ': Double} ' –

+0

vous pouvez également utiliser' transform' ou 'foreachRDD' et appliquer n'importe quelle fonction RDD arbitraire, ou convertir en Dataframes et utiliser le API d'agrégation de données – maasg

Répondre

1

Pour commencer, vous pouvez aborder l'agrégation comme ceci:

import sparkSession.implicits._ 

jsonDstream.foreachRDD{jsonRDD => 
    val df = sparkSession.read.json(jsonRDD) 
    val aggr = df.groupBy($"a", $"b").agg(avg($"c")) 
    ... do something with aggr ... 
}