2017-10-20 3 views
0

Il existe un moyen de PIVOT/UNPIVOT (exploser, trasposer) un flux avec des ruisseaux Kafka?Comment faire pivoter/dé-pivoter un flux

Si je le flux d'entrée avec

machineId ts VarName VarValue 
m1 2017-10-01 00:00:00 var1 1.0 
m1 2017-10-01 00:00:00 var2 2.0 
m2 2017-10-01 00:00:00 var1 3.0 
m2 2017-10-01 00:00:00 var3 4.0 
m3 2017-10-01 00:00:00 var4 5.0 
... 

Je veux un moyen d'obtenir le flux de sortie

machineId ts Vars 
m1 2017-10-01 00:00:00 [[var1, 1.0], [var2, 2.0]] 
m2 2017-10-01 00:00:00 [[var1, 3.0], [var3, 4.0]] 
m3 2017-10-01 00:00:00 [[var4, 5.0]] 
... 

Répondre

0

Vous pouvez utiliser une agrégation avec le type de sortie List. Quelque chose comme

KStream<MachineId, V> inputStream = ... 
KTable<MachineId, List<V>> result = inputStream.groupByKey() 
               .aggregate(...); 

Les Initializer renvoie un vide List<V> et Aggregator ajoutera les valeurs à la liste.

Vérifiez les docs et des exemples pour plus de détails: