J'ai besoin d'appliquer une fonction d'agrégation sur un flux de données avec diffusion d'étincelles apache (NO APACHE SPARK STREAMING SQL).Application de la fonction d'agrégation avec spark scala
Dans mon cas, j'ai un producteur de kafka qui envoie des messages au format JSON. Le format est {'a': String, 'b': String, 'c': Integer, 'd': Double}
je dois agréger sur les attributs 'a'
et 'b'
toutes les 5 secondes et je dois appliquer une fonction d'agrégation sur les 2 autres attributs (par exemple moyenne, ou la somme, ou Min ou Max).
Comment puis-je faire cela?
Merci
Avez-vous déjà essayé la fonction 'reduce'? https://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams – maasg
le problème est que la fonction de réduction prend 2 paramètres et renvoie 1. J'ai besoin d'avoir le même schéma. I autres mots si mon schéma initial est '{'a': String, 'b': String, 'c': Entier, 'd': Double}' le schéma résultant (avec une fonction d'agrégation AVG) devrait être '{' GROUPBYa ': String,' GROUPBYb ': Chaîne,' AVGc ': Entier,' AVGd ': Double} ' –
vous pouvez également utiliser' transform' ou 'foreachRDD' et appliquer n'importe quelle fonction RDD arbitraire, ou convertir en Dataframes et utiliser le API d'agrégation de données – maasg