2017-05-23 2 views
0

J'utilise Confluent-3.2.1 comme streamer Kafka. J'essaie d'agréger mon KGroupedStream<String, MyClass1> en KTable<Windowed<String>,MsgAggr>. En utilisant l'agrégation, j'utilise également TimeWindows.of(TimeUnit.SECONDS.toMillis(5)). J'utilise "Serdes" défini par l'utilisateur comme argument d'agrégation. Le code pour l'utilisateur de définir "Serdes" est,Kafka Streamer: Problème avec les 'Serdes' définis par l'utilisateur

Map<String, Object> serdeProps = new HashMap<>(); 

final Serializer<MsgAggr> pageViewSerializer = new JsonPOJOSerializer<>(); 
serdeProps.put("JsonPOJOClass", MsgAggr.class); 
pageViewSerializer.configure(serdeProps, false); 

final Deserializer<MsgAggr> pageViewDeserializer = new JsonPOJODeserializer<>(); 
serdeProps.put("JsonPOJOClass", MsgAggr.class); 
pageViewDeserializer.configure(serdeProps, false); 

final Serde<MsgAggr> pageViewSerde = Serdes.serdeFrom(pageViewSerializer, pageViewDeserializer);` 

code pour la diffusion est

KGroupedStream<String, MyClass1> msg_grp = message 
      .groupByKey(); 
KTable<Windowed<String>,MsgAggr> msg_win = msg_grp 
      //.reduce(new Reduced(), arg1, arg2); 
      .aggregate(new Init(), 
        new Aggr(), 
        TimeWindows.of(TimeUnit.SECONDS.toMillis(5)), 
        pageViewSerde, 
        "MySample_out"); 

Quand je lance le code que je suis les erreurs:

[2017-05-23 18:16:45,648] ERROR stream-thread [StreamThread-1] Streams application error during processing: (org.apache.kafka.streams.processor.internals.StreamThread:249) 
java.lang.ClassCastException: my.kafka.strm.MyClass1 cannot be cast to java.lang.String 
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24) 
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64) 
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:82) 
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202) 
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) 
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) 
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202) 
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) 
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) 
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202) 
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) 
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180) 
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) 
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) 
Exception in thread "StreamThread-1" java.lang.ClassCastException: my.kafka.strm.MyClass1 cannot be cast to java.lang.String 
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24) 
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64) 
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:82) 
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202) 
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) 
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) 
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202) 
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) 
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) 
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202) 
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) 
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180) 
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) 
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) 

Répondre

3

Le problème est avec message.groupByKey(); . Son utilisation de la chaîne Serde pour votre classe personnalisée MyClass1. Veuillez mettre en œuvre un sérialiseur et un désérialiseur personnalisés pour MyClass1 et utiliser la même chose dans la version surchargée de groupByKey - https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KStream.html#groupByKey(org.apache.kafka.common.serialization.Serde,%20org.apache.kafka.common.serialization.Serde)

+0

Une autre question concerne le traitement de la fenêtre événementielle. À l'intérieur de l'agrégat j'utilise TimeWindow. J'extrais également l'horodatage du flux en utilisant 'streamsConfiguration.put (StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class);' Je veux utiliser l'heure qui fait partie du message pour contrôler l'agrégation de la fenêtre. Comment puis-je atteindre cet objectif? – kadsank

+0

L'implémentation 'MyEventTimeExtractor' peut utiliser la méthode' ConsumerRecord # timestamp' pour obtenir l'horodatage – Abhishek

+0

Si vous souhaitez utiliser l'horodatage d'enregistrement intégré de Kafka, vous pouvez déjà utiliser 3 extracteurs prédéfinis: http: // docs .confluent.io/current/streams/developer-guide.html # streams-developer-guide-timestamp-extractor Donc je ne suis pas sûr s'il est nécessaire d'implémenter votre propre extracteur pour ce cas. –