2017-09-24 3 views
0

J'essaie de joindre deux flux de données provenant de deux sujets kafka.Obtenir uniquement les valeurs Null dans le flux de droite pour la jointure à gauche dans les flux kafka

Chaque sujet a une valeur paire de clés, où la clé est un entier et valeur contient le type de données JSON sous forme de chaîne. Les données des deux sources ressemble les exemples suivants ci-dessous (clé, valeur):

2232, {"uniqueID":"2164103","ConsumerID":"63357","CategoryID":"8","BrandID":"5","ProductID":"2232","ProductDetails":"[]","Date":"2013-03-28","Flag":"0"} 

1795, {"ProductName":"Frost Free","ProductID":"1795","BrandID":"16","BrandName":"ABC","CategoryID":"3"} 

Maintenant, je suis en train de rejoindre gauche ces deux cours d'eau en fonction de ProductID, de sorte que la clé est définie sur ProductID pour tous ces enregistrements. Mais malheureusement, je reçois en permanence des valeurs nulles dans la bonne valeur de l'adhésion. Pas même un seul enregistrement est correctement joint. Ce qui suit est mon code pour joindre les deux dossiers:

import org.apache.kafka.streams.StreamsConfig; 
import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.streams.kstream.*; 
import org.apache.kafka.streams.KafkaStreams; 
import org.apache.kafka.common.serialization.Serde; 
import org.apache.kafka.common.serialization.Serdes; 
import com.google.gson.Gson; 
import com.google.gson.reflect.TypeToken; 

import java.util.concurrent.TimeUnit; 
import java.util.*; 

public class Tester { 
    public static void main(String[] args){ 
     final Properties streamsConfiguration = new Properties(); 

     final Serde<String> stringSerde = Serdes.String(); 
     final Serde<Integer> intSerde = Serdes.Integer(); 

     streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-streams"); 
     streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "joining-Client"); 
     streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 

     streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, intSerde.getClass().getName()); 
     streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerde.getClass().getName()); 

     streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); 
     streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); 
     streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 9000); 

     final KStreamBuilder builder = new KStreamBuilder(); 
     KStream<Integer,String> pData = builder.stream(intSerde,stringSerde,"Ptopic"); 
     KStream<Integer,String> streamData = builder.stream(intSerde,stringSerde,"Dtopic"); 
// Test the data type and value of the key 
     pbData.selectKey((k,v)->{System.out.println("Table : P, Type : "+k.getClass()+" Value : "+k);return k;}); 
     streamData.selectKey((k,v)->{System.out.println("Table : StreamRecord, Type : "+k.getClass()+" Value : "+k);return k;}); 

     KStream<Integer,String> joined = streamData.leftJoin(pbData,(table1Value,table2Value)->returnJoin(table1Value,table2Value),JoinWindows.of(TimeUnit.SECONDS.toMillis(30))); 

     final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); 
     streams.cleanUp(); 
     streams.start(); 

     // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams 
     Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); 
    } 
    private static HashMap convertToHashMap(String jsonString, String tablename){ 
     try{ 
      HashMap<String,String> map = new Gson().fromJson(jsonString, new TypeToken<HashMap<String, String>>(){}.getType()); 
      return map; 
     } 
     catch(Exception x){ 
      //couldn't properly parse json 
      HashMap<String,String> record = new HashMap<>(); 
      if (tablename.equals("PB")){ 
       List<String> keys = new ArrayList<>(Arrays.asList("ProductName", ", "CategoryID", "ProductID", "BrandID", "BrandName", "ProductCategoryID")); 
       for(String key : keys){ 
        record.put(key,null); 
       } 
      } 
      else{ 
       List<String> keys = new ArrayList<>(Arrays.asList("UniqueID", "ConsumerID", "CategoryID", "BrandID", "ProductID", "Date","Flag","ProductDetails")); 
       for(String key : keys){ 
        record.put(key,null); 
       } 
      } 
      return record; 
     } 
    } 
    private static String returnJoin(String map1, String map2){ 
     HashMap h1 = convertToHashMap(map1,"consumer_product"); 
     HashMap h2 = convertToHashMap(map2,"PB"); 
     HashMap map3 = new HashMap<>(); 

     System.out.println("First : " + map1); 
     System.out.println("Second : " + map2); 
     //else{System.out.println("Null only");} 
     for (Object key : h1.keySet()) { 
      key = key.toString(); 
      if (map3.containsKey(key)) { 
       continue; 
      } 
      map3.put(key, h1.get(key)); 
     } 
     try { 
      for (Object key : h2.keySet()) { 
       key = key.toString(); 
       if (map3.containsKey(key)) { 
        continue; 
       } 
       map3.put(key, h2.get(key)); 
      } 
      System.out.println("Worked Okay PB!!!\n--------------------------------------------------------------------------------------"); 
     } 
     catch (NullPointerException ex){ 
      /*System.out.println("Exception\n----------------------------------------------------------------------------"); 
      HashMap fakeC = getHashMap("{","consumer"); 
      for (Object key : fakeC.keySet()) { 
       key = key.toString(); 
       if (map3.containsKey(key)) { 
        continue; 
       } 
       map3.put(key, fakeC.get(key)); 
      }*/ 
      return "INVALID"; 
     } 
     //return map3; 
     return serializeObjectJSON(map3); 
    } 
    private static String serializeObjectJSON(Map row){ 
     StringBuilder jsonString = new StringBuilder(); 
     jsonString.append("{"); 
     for (Object key : row.keySet()){ 
      jsonString.append("\""+key.toString()+"\":"); 
      try { 
       jsonString.append("\"" + row.get(key).toString() + "\","); 
      } 
      catch (NullPointerException Nexp){ 
       jsonString.append("\"" + "null" + "\","); 
      } 

     } 
     jsonString.deleteCharAt(jsonString.length()-1); 
     jsonString.append("}"); 
     String jsString = jsonString.toString(); 
     ////System.out.println("JString :"+jsString); 
     return jsString; 
    } 
} 

Je ne peux pas comprendre pourquoi je suis seulement obtenir nulle dans le flux droite de la jointure gauche, lorsque je tente de joindre les deux cours d'eau soit, mais quand j'essaie de joindre le même flux avec lui-même, la jointure fonctionne. Je me suis assuré que le type de clé est Integer pour tous les enregistrements dans les deux flux et aucune null sont présents comme je vérifie les types et les valeurs clés pour les deux flux (peut être vérifié dans le code ci-dessus) . Et aussi que les deux flux ont des clés qui se chevauchent pour que la jointure arrive, Puisque je pensais que les clés ne se chevaucheraient pas ou peut-être que le type de données peut différer, parce que nous obtenons des valeurs nulles dans les jointures. Est-ce que quelqu'un peut m'aider à comprendre ce que je fais mal?

Mise à jour:

Les données de ces deux sujets (sur lequel je rejoins) vient de deux cours d'eau. Où l'un des flux est un flux de paire (clé, valeur) de type personnalisé (Integer, recordHashmap) et autre est juste un flux de (Entier, chaîne). Ici recordHashmap est un objet personnalisé que j'ai défini pour analyser la chaîne json imbriquée dans un objet. Sa définition est définie comme suit:

public class recordHashmap { 
    private String database; 
    private String table; 
    private String type; 
    private Integer ts; 
    private Integer xid; 
    private Map<String,String> data; 

    public Map getdata(){ 
     return data; 
    } 
    public String getdatabase(){return database;} 
    public String gettable(){return table;} 
    public String gettype(){return type;} 
    public Integer getts(){return ts;} 
    public Integer getxid(){return xid;} 

    public void setdata(Map<String, String> dta){ 
     data=dta; 
    } 
    public void setdatabase(String db){ database=db; } 
    public void settable(String tble){ table=tble; } 
    public void settype(String optype){type=optype;} 
    public void setts(Integer unixTime){ts = unixTime;} 
    public void setxid(Integer Xid){xid = Xid;} 

    public String toString() { 
     return "Database=" + this.database + ", Table=" + this.table+", OperationType="+this.type+", UnixOpTime"+this.ts + ", Data=" 
       + this.data; 
    } 

} 

Et le code pour définir la clé d'identification du produit peut être vu ci-dessous:

KStream<Integer,recordHashmap> rekeyedProductID = inserts.selectKey((k,v)->setTheKey(v.getdata(),"ProductID")); 
KStream<Integer,String> consumer_product_Stream = rekeyedProductID.mapValues((v)->serializeObjectJSON(v.getdata())); 

Et la fonction setTheKey est définie comme

private static Integer setTheKey(Map map, String Key){ 
     try { 
      //System.out.println("New Key : " + map.get(Key)); 
      return Integer.parseInt(map.get(Key).toString()); 
     } 
     catch (NumberFormatException nmb){ 
      //fake return a custom value 
      return -1; 
     } 
    } 

La console enregistre l'exemple pour les deux instructions suivantes est indiqué ci-dessous (Remarque: L'ensemble les journaux sont trop volumineux pour être ajouté, mais la chose principale est que les clés de flux sont des nombres entiers et les clés se chevauchent):

pbData.selectKey((k,v)->{System.out.println("Table : P, Type : "+k.getClass()+" Value : "+k);return k;}); 
streamData.selectKey((k,v)->{System.out.println("Table : StreamRecord, Type : "+k.getClass()+" Value : "+k);return k;}); 

Journaux de la console:

Table : streamRecord, Type:class java.lang.Integer Value:1342 
Table : streamRecord, Type:class java.lang.Integer Value:595 
Table : streamRecord, Type:class java.lang.Integer Value:1934 
Table : streamRecord, Type:class java.lang.Integer Value:2384 
Table : streamRecord, Type:class java.lang.Integer Value:1666 
Table : streamRecord, Type:class java.lang.Integer Value:665 
Table : streamRecord, Type:class java.lang.Integer Value:2671 
Table : streamRecord, Type:class java.lang.Integer Value:949 
Table : streamRecord, Type:class java.lang.Integer Value:2455 
Table : streamRecord, Type:class java.lang.Integer Value:928 
Table : streamRecord, Type:class java.lang.Integer Value:1602 
Table : streamRecord, Type:class java.lang.Integer Value:74 
Table : P, Type:class java.lang.Integer Value:2 
Table : streamRecord, Type:class java.lang.Integer Value:1795 
Table : P, Type:class java.lang.Integer Value:21 
Table : streamRecord, Type:class java.lang.Integer Value:1265 
Table : P, Type:class java.lang.Integer Value:22 
Table : streamRecord, Type:class java.lang.Integer Value:2420 
Table : P, Type:class java.lang.Integer Value:23 
Table : streamRecord, Type:class java.lang.Integer Value:1419 
Table : P, Type:class java.lang.Integer Value:24 
Table : streamRecord, Type:class java.lang.Integer Value:1395 
Table : P, Type:class java.lang.Integer Value:26 
Table : streamRecord, Type:class java.lang.Integer Value:1783 
Table : P, Type:class java.lang.Integer Value:29 
Table : streamRecord, Type:class java.lang.Integer Value:1177 
Table : P, Type:class java.lang.Integer Value:34 
Table : streamRecord, Type:class java.lang.Integer Value:1395 
Table : P, Type:class java.lang.Integer Value:35 
Table : streamRecord, Type:class java.lang.Integer Value:2551 
Table : P, Type:class java.lang.Integer Value:36 
Table : P, Type:class java.lang.Integer Value:2551 
Table : streamRecord, Type:class java.lang.Integer Value:2530 
Table : P, Type:class java.lang.Integer Value:37 
Table : streamRecord, Type:class java.lang.Integer Value:541 
Table : P, Type:class java.lang.Integer Value:39 
Table : streamRecord, Type:class java.lang.Integer Value:787 
Table : P, Type:class java.lang.Integer Value:40 
Table : streamRecord, Type:class java.lang.Integer Value:2498 
Table : P, Type:class java.lang.Integer Value:41 
Table : streamRecord, Type:class java.lang.Integer Value:1439 
Table : P, Type:class java.lang.Integer Value:44 
Table : streamRecord, Type:class java.lang.Integer Value:784 
Table : P, Type:class java.lang.Integer Value:284 
Table : P, Type:class java.lang.Integer Value:285 
Table : P, Type:class java.lang.Integer Value:929 
Table : P, Type:class java.lang.Integer Value:286 
Table : P, Type:class java.lang.Integer Value:287 
Table : P, Type:class java.lang.Integer Value:2225 
Table : P, Type:class java.lang.Integer Value:288 
Table : P, Type:class java.lang.Integer Value:289 
Table : P, Type:class java.lang.Integer Value:290 
Table : P, Type:class java.lang.Integer Value:295 
Table : P, Type:class java.lang.Integer Value:297 
Table : P, Type:class java.lang.Integer Value:300 
Table : P, Type:class java.lang.Integer Value:302 
Table : P, Type:class java.lang.Integer Value:305 
Table : P, Type:class java.lang.Integer Value:306 
Table : P, Type:class java.lang.Integer Value:307 
Table : P, Type:class java.lang.Integer Value:308 
Table : P, Type:class java.lang.Integer Value:309 
Table : P, Type:class java.lang.Integer Value:310 
Table : streamRecord, Type:class java.lang.Integer Value:929 
Table : streamRecord, Type:class java.lang.Integer Value:1509 
Table : streamRecord, Type:class java.lang.Integer Value:136 
Table : streamRecord, Type:class java.lang.Integer Value:2225 
Table : streamRecord, Type:class java.lang.Integer Value:906 
Table : streamRecord, Type:class java.lang.Integer Value:1013 
Table : streamRecord, Type:class java.lang.Integer Value:1759 
Table : streamRecord, Type:class java.lang.Integer Value:1759 
Table : streamRecord, Type:class java.lang.Integer Value:885 
Table : streamRecord, Type:class java.lang.Integer Value:1165 
Table : streamRecord, Type:class java.lang.Integer Value:453 

Mise à jour-2: Le Intéressant chose ici est que leftJoin fonctionne bien pour KTables pour le même ensemble de paires clé-valeur. Mais pas pour KStreams pour une raison quelconque.Mais j'ai besoin d'utiliser KStreams puisque j'ai beaucoup d'enregistrements relatifs à une clé. Généralement la plupart du temps cette jointure sur les ruisseaux fonctionne comme un charme mais son action bizarre dans ce cas particulier. Je suppose que cela pourrait être avec RocksDB ou la mise en cache interne.

Répondre

0

Il semble que vous ne définissez pas la ProductID comme la clé:

pbData.selectKey((k,v)->{System.out.println("Table : P, Type : "+k.getClass()+" Value : "+k);return k;}); 
streamData.selectKey((k,v)->{System.out.println("Table : StreamRecord, Type : "+k.getClass()+" Value : "+k);return k;}); 

Dans les deux déclaration, vous retournez la clé d'origine ->return k; au lieu d'analyser le productId du JSON et le renvoyer.

Mise à jour

Je ne suis toujours pas sûr si je peux mettre tous les morceaux ensemble correctement, comme dans votre mise à jour, vous utilisez

KStream<Integer,recordHashmap> rekeyedProductID = inserts.selectKey((k,v)->setTheKey(v.getdata(),"ProductID")); 
KStream<Integer,String> consumer_product_Stream = 

rekeyedProductID.mapValues ​​((v) -> serializeObjectJSON (v.getdata()));

et il n'est pas clair ce que inserts et rekeyedProductID sont (quels sont les types?). Quoi qu'il en soit, je suppose que cette partie est juste correcte. Comme vous mentionnez que cela fonctionne si le côté droit est un KTable (en utilisant les mêmes données), je suppose simplement que la fenêtre de jointure n'est pas assez grande, de sorte que deux enregistrements avec la même clé sont plus éloignés (à temps) chaque autre que votre spécifié 30 secondes. Pouvez-vous vérifier les horodatages d'enregistrement des deux flux d'entrée? (cf https://docs.confluent.io/current/streams/faq.html#accessing-record-metadata-such-as-topic-partition-and-offset-information)

+0

Pas de problème, ce ne sont que quelques exemples de tests que j'ai utilisés pour tester le même type et imprimer des valeurs de clé pour la console. La clé est définie sur des entiers lorsque j'écris les données sur le sujet. – Anmol

+0

J'ai mis à jour la question ci-dessus avec le code pour définir les valeurs clés. – Anmol

+0

La chose intéressante ici est que leftJoin fonctionne bien pour ** KTables ** pour le même ensemble de paires clé-valeur. Mais pas pour KStreams pour une raison quelconque. Mais j'ai besoin d'utiliser KStreams puisque j'ai beaucoup d'enregistrements relatifs à une clé. La chose étrange est que j'ai utilisé cette approche pour joindre des enregistrements plus anciens de très grandes tailles dans un cluster distribué et cela fonctionne comme un charme, mais pas pour ce cas particulier. Cela peut-il avoir quelque chose à voir avec RocksDB ou la mise en cache ?? – Anmol