2017-03-25 3 views
0

J'essaye d'écrire d'un sujet (parent) à un autre sujet (enfant) dans kafka basé sur le contenu des enregistrements dans le parent. Un exemple d'enregistrement si je consomme du sujet parent est {"date":{"string":"2017-03-20"},"time":{"string":"20:04:13:563"},"event_nr":1572470,"interface":"Transaction Manager","event_id":5001,"date_time":1490040253563,"entity":"Transaction Manager","state":0,"msg_param_1":{"string":"ISWSnk"},"msg_param_2":{"string":"Application startup"},"msg_param_3":null,"msg_param_4":null,"msg_param_5":null,"msg_param_6":null,"msg_param_7":null,"msg_param_8":null,"msg_param_9":null,"long_msg_param_1":null,"long_msg_param_2":null,"long_msg_param_3":null,"long_msg_param_4":null,"long_msg_param_5":null,"long_msg_param_6":null,"long_msg_param_7":null,"long_msg_param_8":null,"long_msg_param_9":null,"last_sent":{"long":1490040253563},"transmit_count":{"int":1},"team_id":null,"app_id":{"int":4},"logged_by_app_id":{"int":4},"entity_type":{"int":3},"binary_data":null}.Ecrire à kafka Sujet basé sur le contenu du contenu de l'enregistrement en utilisant kafkastreams

Je voudrais utiliser la valeur de entité d'écrire sur un sujet du même nom que la valeur de l'entité (Il y a un montant fixe de valeurs de l'entité pour que je puisse créer statiquement que s'il est programme difficile à créer dynamiquement des sujets). Je suis en train d'utiliser cette

import org.apache.kafka.common.serialization.Serde; 
import org.apache.kafka.common.serialization.Serdes; 
import org.apache.kafka.streams.KafkaStreams; 
import org.apache.kafka.streams.KeyValue; 
import org.apache.kafka.streams.StreamsConfig; 
import org.apache.kafka.streams.kstream.KStream; 
import org.apache.kafka.streams.kstream.KStreamBuilder; 
import java.util.Properties; 

public class entityDataLoader { 
    public static void main(final String[] args) throws Exception { 
    final Properties streamsConfiguration = new Properties(); 
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "map-function-lambda-example"); 
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
    streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); 
    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 

    // Set up serializers and deserializers, which we will use for overriding the default serdes 
    // specified above. 
    final Serde<String> stringSerde = Serdes.String(); 
    final Serde<byte[]> byteArraySerde = Serdes.ByteArray(); 

    // In the subsequent lines we define the processing topology of the Streams application. 
    final KStreamBuilder builder = new KStreamBuilder(); 

    // Read the input Kafka topic into a KStream instance. 
    final KStream<byte[], String> textLines = builder.stream(byteArraySerde, stringSerde, "postilion-events"); 

    String content = textLines.toString(); 
    String entity = JSONExtractor.returnJSONValue(content, "entity"); 
    System.out.println(entity); 

    textLines.to(entity); 

    final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); 
    streams.cleanUp(); 
    streams.start(); 

    // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams 
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); 
    } 
} 

Le contenu de contenu est [email protected] rendant évident que @ KStream.toString() n'est pas la bonne méthode à utiliser pour tenter d'obtenir la valeur de l'entité .

P.S. La classe JSONExtractor est définie comme

import org.json.simple.JSONObject; 
import org.json.simple.parser.ParseException; 
import org.json.simple.parser.JSONParser; 
class JSONExtractor { 

public static String returnJSONValue(String args, String value){ 
    JSONParser parser = new JSONParser(); 
    String app= null; 
    System.out.println(args); 
    try{ 
     Object obj = parser.parse(args); 
     JSONObject JObj = (JSONObject)obj; 
     app= (String) JObj.get(value); 
     return app; 
    } 
    catch(ParseException pe){ 
     System.out.println("No Object found"); 
     System.out.println(pe); 
    } 
    return app; 
} 
} 

Répondre

1

Vous pouvez utiliser branch() pour diviser votre flux parent en « flux de sous » et d'écrire chaque « flux de sous » à un sujet de sortie (voir http://docs.confluent.io/current/streams/developer-guide.html#stateless-transformations)

Votre branch() must créer un seul "sous-flux" pour tous vos sujets de sortie, mais comme vous connaissez tous vos sujets, cela ne devrait pas poser de problème. En outre, pour les flux Kafka, il est recommandé de créer tous les sujets de sortie avant de démarrer l'application (voir http://docs.confluent.io/current/streams/developer-guide.html#user-topics)