2017-08-30 1 views
1

Actuellement, je travaille sur la construction d'un pipeline de données. Je lis à partir d'une base de données sql 2 tables et je dois les stocker au format dénormalisé dans un entrepôt de données OLAP après les avoir rejoint dans le flux en utilisant des flux kafka. Au lieu d'avoir un sujet individuel pour chaque table, j'ai deux tables qui insèrent des données sur un seul sujet.Rejoindre des flux Kafka contenant des objets Java Hash Map

Je convertis la ligne en hashmap puis, en utilisant le sérialiseur bytes, convertis cette information en tableau bytes et la propage aux rubriques, ainsi toutes les informations d'une rangée sont stockées dans un seul objet. Dont le code est le suivant:

ByteArrayOutputStream bos = new ByteArrayOutputStream(); 
ObjectOutput out = null; 
byte[] yourBytes = null; 
try { 
     out = new ObjectOutputStream(bos); 
     out.writeObject(record); 
     //here record is the row hashmap 
     out.flush(); 
     yourBytes = bos.toByteArray(); 
    } 
catch (IOException ex) { 
    // ignore close exception 
    } 

Dans l'application de traitement de flux i désérialiser le tableau arrière octets aux documents HashMap et de filtrage en deux courants séparés chacun pour une table.

donc mes dossiers à la phase de traitement après désérialisation le tableau d'octets retour à l'objet hashmap, enregistrements ressemble à ce qui suit, où un enregistrement pour chaque flux se rapportant à chaque table est présentée ci-dessous:

(key,{meta = "PRODUCTS",PRODUCTNAME=ONE, ISACTIVE=1, METATABLENAME=PRODUCT, PRODUCTSUBCATEGORYID=16, PRODUCTID=57}) 

(key,{meta = "BRAND", BRANDNAME="ABC", BRANDID=16, PRODUCTID=57, BRANDCATEGORY = "Electronics"}) 

Maintenant, j'ai pour joindre les données dans deux flux où chaque valeur est une carte de hachage, et joindre sur la clé PRODUCTID qui est le champ commun pour les deux tables et finalement générer un hashmap unique pour chaque ligne et pousser ce flux à un sujet.

Ainsi, les dossiers joints il ressembleront les éléments suivants:

(key,{meta = "JOINEDTABLE",PRODUCTNAME=ONE, ISACTIVE=1, METATABLENAME=PRODUCT, PRODUCTSUBCATEGORYID=16, BRANDNAME="ABC", BRANDID=16, PRODUCTID=57,BRANDCATEGORY = "Electronics"}) 

est-il possible de le faire en utilisant les flux de kafka et si oui, comment?

Répondre

2

Si vous souhaitez vous joindre à Kafka Streams, vous devez extraire l'attribut de jointure et le définir comme la clé du message:

KStream streamOfTable1 = ... 
streamOfTable1.selectKey(/*extract productId and set as key*/).to("newTopic1"); 

KStream streamOfTable2 = ... 
streamOfTable2.selectKey(/*extract productId and set as key*/).to("newTopic2"); 

KTable table1 = builder.table("newTopic1"); 
KTable table2 = builder.table("newTopic2"); 

table1.join(table2, ...).to("resultTopic"); 

Pour plus de détails, voir les docs: http://docs.confluent.io/current/streams/developer-guide.html#joining

I a supposé qu'une jointure KTable-KTable est ce dont vous avez besoin. Notez que vous devez créer "newTopic1" et "newTopic2" manuellement et que les deux doivent avoir le même nombre de partitions. (Voir http://docs.confluent.io/current/streams/developer-guide.html#user-topics)

Vérifiez également les autres types de jointures disponibles, dans le cas où les jointures KTable-KTable ne correspondent pas à ce que vous voulez.

+0

Merci à @Matthias cela a fonctionné. J'ai utilisé KTables pour ça. – Anmol