J'essaie de joindre deux flux de données provenant de deux sujets kafka.Obtenir uniquement les valeurs Null dans le flux de droite pour la jointure à gauche dans les flux kafka
Chaque sujet a une valeur paire de clés, où la clé est un entier et valeur contient le type de données JSON sous forme de chaîne. Les données des deux sources ressemble les exemples suivants ci-dessous (clé, valeur):
2232, {"uniqueID":"2164103","ConsumerID":"63357","CategoryID":"8","BrandID":"5","ProductID":"2232","ProductDetails":"[]","Date":"2013-03-28","Flag":"0"}
1795, {"ProductName":"Frost Free","ProductID":"1795","BrandID":"16","BrandName":"ABC","CategoryID":"3"}
Maintenant, je suis en train de rejoindre gauche ces deux cours d'eau en fonction de ProductID, de sorte que la clé est définie sur ProductID pour tous ces enregistrements. Mais malheureusement, je reçois en permanence des valeurs nulles dans la bonne valeur de l'adhésion. Pas même un seul enregistrement est correctement joint. Ce qui suit est mon code pour joindre les deux dossiers:
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.util.concurrent.TimeUnit;
import java.util.*;
public class Tester {
public static void main(String[] args){
final Properties streamsConfiguration = new Properties();
final Serde<String> stringSerde = Serdes.String();
final Serde<Integer> intSerde = Serdes.Integer();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-streams");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "joining-Client");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, intSerde.getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 9000);
final KStreamBuilder builder = new KStreamBuilder();
KStream<Integer,String> pData = builder.stream(intSerde,stringSerde,"Ptopic");
KStream<Integer,String> streamData = builder.stream(intSerde,stringSerde,"Dtopic");
// Test the data type and value of the key
pbData.selectKey((k,v)->{System.out.println("Table : P, Type : "+k.getClass()+" Value : "+k);return k;});
streamData.selectKey((k,v)->{System.out.println("Table : StreamRecord, Type : "+k.getClass()+" Value : "+k);return k;});
KStream<Integer,String> joined = streamData.leftJoin(pbData,(table1Value,table2Value)->returnJoin(table1Value,table2Value),JoinWindows.of(TimeUnit.SECONDS.toMillis(30)));
final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.cleanUp();
streams.start();
// Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
private static HashMap convertToHashMap(String jsonString, String tablename){
try{
HashMap<String,String> map = new Gson().fromJson(jsonString, new TypeToken<HashMap<String, String>>(){}.getType());
return map;
}
catch(Exception x){
//couldn't properly parse json
HashMap<String,String> record = new HashMap<>();
if (tablename.equals("PB")){
List<String> keys = new ArrayList<>(Arrays.asList("ProductName", ", "CategoryID", "ProductID", "BrandID", "BrandName", "ProductCategoryID"));
for(String key : keys){
record.put(key,null);
}
}
else{
List<String> keys = new ArrayList<>(Arrays.asList("UniqueID", "ConsumerID", "CategoryID", "BrandID", "ProductID", "Date","Flag","ProductDetails"));
for(String key : keys){
record.put(key,null);
}
}
return record;
}
}
private static String returnJoin(String map1, String map2){
HashMap h1 = convertToHashMap(map1,"consumer_product");
HashMap h2 = convertToHashMap(map2,"PB");
HashMap map3 = new HashMap<>();
System.out.println("First : " + map1);
System.out.println("Second : " + map2);
//else{System.out.println("Null only");}
for (Object key : h1.keySet()) {
key = key.toString();
if (map3.containsKey(key)) {
continue;
}
map3.put(key, h1.get(key));
}
try {
for (Object key : h2.keySet()) {
key = key.toString();
if (map3.containsKey(key)) {
continue;
}
map3.put(key, h2.get(key));
}
System.out.println("Worked Okay PB!!!\n--------------------------------------------------------------------------------------");
}
catch (NullPointerException ex){
/*System.out.println("Exception\n----------------------------------------------------------------------------");
HashMap fakeC = getHashMap("{","consumer");
for (Object key : fakeC.keySet()) {
key = key.toString();
if (map3.containsKey(key)) {
continue;
}
map3.put(key, fakeC.get(key));
}*/
return "INVALID";
}
//return map3;
return serializeObjectJSON(map3);
}
private static String serializeObjectJSON(Map row){
StringBuilder jsonString = new StringBuilder();
jsonString.append("{");
for (Object key : row.keySet()){
jsonString.append("\""+key.toString()+"\":");
try {
jsonString.append("\"" + row.get(key).toString() + "\",");
}
catch (NullPointerException Nexp){
jsonString.append("\"" + "null" + "\",");
}
}
jsonString.deleteCharAt(jsonString.length()-1);
jsonString.append("}");
String jsString = jsonString.toString();
////System.out.println("JString :"+jsString);
return jsString;
}
}
Je ne peux pas comprendre pourquoi je suis seulement obtenir nulle dans le flux droite de la jointure gauche, lorsque je tente de joindre les deux cours d'eau soit, mais quand j'essaie de joindre le même flux avec lui-même, la jointure fonctionne. Je me suis assuré que le type de clé est Integer pour tous les enregistrements dans les deux flux et aucune null sont présents comme je vérifie les types et les valeurs clés pour les deux flux (peut être vérifié dans le code ci-dessus) . Et aussi que les deux flux ont des clés qui se chevauchent pour que la jointure arrive, Puisque je pensais que les clés ne se chevaucheraient pas ou peut-être que le type de données peut différer, parce que nous obtenons des valeurs nulles dans les jointures. Est-ce que quelqu'un peut m'aider à comprendre ce que je fais mal?
Mise à jour:
Les données de ces deux sujets (sur lequel je rejoins) vient de deux cours d'eau. Où l'un des flux est un flux de paire (clé, valeur) de type personnalisé (Integer, recordHashmap) et autre est juste un flux de (Entier, chaîne). Ici recordHashmap est un objet personnalisé que j'ai défini pour analyser la chaîne json imbriquée dans un objet. Sa définition est définie comme suit:
public class recordHashmap {
private String database;
private String table;
private String type;
private Integer ts;
private Integer xid;
private Map<String,String> data;
public Map getdata(){
return data;
}
public String getdatabase(){return database;}
public String gettable(){return table;}
public String gettype(){return type;}
public Integer getts(){return ts;}
public Integer getxid(){return xid;}
public void setdata(Map<String, String> dta){
data=dta;
}
public void setdatabase(String db){ database=db; }
public void settable(String tble){ table=tble; }
public void settype(String optype){type=optype;}
public void setts(Integer unixTime){ts = unixTime;}
public void setxid(Integer Xid){xid = Xid;}
public String toString() {
return "Database=" + this.database + ", Table=" + this.table+", OperationType="+this.type+", UnixOpTime"+this.ts + ", Data="
+ this.data;
}
}
Et le code pour définir la clé d'identification du produit peut être vu ci-dessous:
KStream<Integer,recordHashmap> rekeyedProductID = inserts.selectKey((k,v)->setTheKey(v.getdata(),"ProductID"));
KStream<Integer,String> consumer_product_Stream = rekeyedProductID.mapValues((v)->serializeObjectJSON(v.getdata()));
Et la fonction setTheKey est définie comme
private static Integer setTheKey(Map map, String Key){
try {
//System.out.println("New Key : " + map.get(Key));
return Integer.parseInt(map.get(Key).toString());
}
catch (NumberFormatException nmb){
//fake return a custom value
return -1;
}
}
La console enregistre l'exemple pour les deux instructions suivantes est indiqué ci-dessous (Remarque: L'ensemble les journaux sont trop volumineux pour être ajouté, mais la chose principale est que les clés de flux sont des nombres entiers et les clés se chevauchent):
pbData.selectKey((k,v)->{System.out.println("Table : P, Type : "+k.getClass()+" Value : "+k);return k;});
streamData.selectKey((k,v)->{System.out.println("Table : StreamRecord, Type : "+k.getClass()+" Value : "+k);return k;});
Journaux de la console:
Table : streamRecord, Type:class java.lang.Integer Value:1342
Table : streamRecord, Type:class java.lang.Integer Value:595
Table : streamRecord, Type:class java.lang.Integer Value:1934
Table : streamRecord, Type:class java.lang.Integer Value:2384
Table : streamRecord, Type:class java.lang.Integer Value:1666
Table : streamRecord, Type:class java.lang.Integer Value:665
Table : streamRecord, Type:class java.lang.Integer Value:2671
Table : streamRecord, Type:class java.lang.Integer Value:949
Table : streamRecord, Type:class java.lang.Integer Value:2455
Table : streamRecord, Type:class java.lang.Integer Value:928
Table : streamRecord, Type:class java.lang.Integer Value:1602
Table : streamRecord, Type:class java.lang.Integer Value:74
Table : P, Type:class java.lang.Integer Value:2
Table : streamRecord, Type:class java.lang.Integer Value:1795
Table : P, Type:class java.lang.Integer Value:21
Table : streamRecord, Type:class java.lang.Integer Value:1265
Table : P, Type:class java.lang.Integer Value:22
Table : streamRecord, Type:class java.lang.Integer Value:2420
Table : P, Type:class java.lang.Integer Value:23
Table : streamRecord, Type:class java.lang.Integer Value:1419
Table : P, Type:class java.lang.Integer Value:24
Table : streamRecord, Type:class java.lang.Integer Value:1395
Table : P, Type:class java.lang.Integer Value:26
Table : streamRecord, Type:class java.lang.Integer Value:1783
Table : P, Type:class java.lang.Integer Value:29
Table : streamRecord, Type:class java.lang.Integer Value:1177
Table : P, Type:class java.lang.Integer Value:34
Table : streamRecord, Type:class java.lang.Integer Value:1395
Table : P, Type:class java.lang.Integer Value:35
Table : streamRecord, Type:class java.lang.Integer Value:2551
Table : P, Type:class java.lang.Integer Value:36
Table : P, Type:class java.lang.Integer Value:2551
Table : streamRecord, Type:class java.lang.Integer Value:2530
Table : P, Type:class java.lang.Integer Value:37
Table : streamRecord, Type:class java.lang.Integer Value:541
Table : P, Type:class java.lang.Integer Value:39
Table : streamRecord, Type:class java.lang.Integer Value:787
Table : P, Type:class java.lang.Integer Value:40
Table : streamRecord, Type:class java.lang.Integer Value:2498
Table : P, Type:class java.lang.Integer Value:41
Table : streamRecord, Type:class java.lang.Integer Value:1439
Table : P, Type:class java.lang.Integer Value:44
Table : streamRecord, Type:class java.lang.Integer Value:784
Table : P, Type:class java.lang.Integer Value:284
Table : P, Type:class java.lang.Integer Value:285
Table : P, Type:class java.lang.Integer Value:929
Table : P, Type:class java.lang.Integer Value:286
Table : P, Type:class java.lang.Integer Value:287
Table : P, Type:class java.lang.Integer Value:2225
Table : P, Type:class java.lang.Integer Value:288
Table : P, Type:class java.lang.Integer Value:289
Table : P, Type:class java.lang.Integer Value:290
Table : P, Type:class java.lang.Integer Value:295
Table : P, Type:class java.lang.Integer Value:297
Table : P, Type:class java.lang.Integer Value:300
Table : P, Type:class java.lang.Integer Value:302
Table : P, Type:class java.lang.Integer Value:305
Table : P, Type:class java.lang.Integer Value:306
Table : P, Type:class java.lang.Integer Value:307
Table : P, Type:class java.lang.Integer Value:308
Table : P, Type:class java.lang.Integer Value:309
Table : P, Type:class java.lang.Integer Value:310
Table : streamRecord, Type:class java.lang.Integer Value:929
Table : streamRecord, Type:class java.lang.Integer Value:1509
Table : streamRecord, Type:class java.lang.Integer Value:136
Table : streamRecord, Type:class java.lang.Integer Value:2225
Table : streamRecord, Type:class java.lang.Integer Value:906
Table : streamRecord, Type:class java.lang.Integer Value:1013
Table : streamRecord, Type:class java.lang.Integer Value:1759
Table : streamRecord, Type:class java.lang.Integer Value:1759
Table : streamRecord, Type:class java.lang.Integer Value:885
Table : streamRecord, Type:class java.lang.Integer Value:1165
Table : streamRecord, Type:class java.lang.Integer Value:453
Mise à jour-2: Le Intéressant chose ici est que leftJoin fonctionne bien pour KTables pour le même ensemble de paires clé-valeur. Mais pas pour KStreams pour une raison quelconque.Mais j'ai besoin d'utiliser KStreams puisque j'ai beaucoup d'enregistrements relatifs à une clé. Généralement la plupart du temps cette jointure sur les ruisseaux fonctionne comme un charme mais son action bizarre dans ce cas particulier. Je suppose que cela pourrait être avec RocksDB ou la mise en cache interne.
Pas de problème, ce ne sont que quelques exemples de tests que j'ai utilisés pour tester le même type et imprimer des valeurs de clé pour la console. La clé est définie sur des entiers lorsque j'écris les données sur le sujet. – Anmol
J'ai mis à jour la question ci-dessus avec le code pour définir les valeurs clés. – Anmol
La chose intéressante ici est que leftJoin fonctionne bien pour ** KTables ** pour le même ensemble de paires clé-valeur. Mais pas pour KStreams pour une raison quelconque. Mais j'ai besoin d'utiliser KStreams puisque j'ai beaucoup d'enregistrements relatifs à une clé. La chose étrange est que j'ai utilisé cette approche pour joindre des enregistrements plus anciens de très grandes tailles dans un cluster distribué et cela fonctionne comme un charme, mais pas pour ce cas particulier. Cela peut-il avoir quelque chose à voir avec RocksDB ou la mise en cache ?? – Anmol