Actuellement, je travaille sur la construction d'un pipeline de données. Je lis à partir d'une base de données sql 2 tables et je dois les stocker au format dénormalisé dans un entrepôt de données OLAP après les avoir rejoint dans le flux en utilisant des flux kafka. Au lieu d'avoir un sujet individuel pour chaque table, j'ai deux tables qui insèrent des données sur un seul sujet.Rejoindre des flux Kafka contenant des objets Java Hash Map
Je convertis la ligne en hashmap puis, en utilisant le sérialiseur bytes, convertis cette information en tableau bytes et la propage aux rubriques, ainsi toutes les informations d'une rangée sont stockées dans un seul objet. Dont le code est le suivant:
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutput out = null;
byte[] yourBytes = null;
try {
out = new ObjectOutputStream(bos);
out.writeObject(record);
//here record is the row hashmap
out.flush();
yourBytes = bos.toByteArray();
}
catch (IOException ex) {
// ignore close exception
}
Dans l'application de traitement de flux i désérialiser le tableau arrière octets aux documents HashMap et de filtrage en deux courants séparés chacun pour une table.
donc mes dossiers à la phase de traitement après désérialisation le tableau d'octets retour à l'objet hashmap, enregistrements ressemble à ce qui suit, où un enregistrement pour chaque flux se rapportant à chaque table est présentée ci-dessous:
(key,{meta = "PRODUCTS",PRODUCTNAME=ONE, ISACTIVE=1, METATABLENAME=PRODUCT, PRODUCTSUBCATEGORYID=16, PRODUCTID=57})
(key,{meta = "BRAND", BRANDNAME="ABC", BRANDID=16, PRODUCTID=57, BRANDCATEGORY = "Electronics"})
Maintenant, j'ai pour joindre les données dans deux flux où chaque valeur est une carte de hachage, et joindre sur la clé PRODUCTID qui est le champ commun pour les deux tables et finalement générer un hashmap unique pour chaque ligne et pousser ce flux à un sujet.
Ainsi, les dossiers joints il ressembleront les éléments suivants:
(key,{meta = "JOINEDTABLE",PRODUCTNAME=ONE, ISACTIVE=1, METATABLENAME=PRODUCT, PRODUCTSUBCATEGORYID=16, BRANDNAME="ABC", BRANDID=16, PRODUCTID=57,BRANDCATEGORY = "Electronics"})
est-il possible de le faire en utilisant les flux de kafka et si oui, comment?
Merci à @Matthias cela a fonctionné. J'ai utilisé KTables pour ça. – Anmol