J'utilise Confluent-3.2.1 comme streamer Kafka. J'essaie d'agréger mon KGroupedStream<String, MyClass1>
en KTable<Windowed<String>,MsgAggr>
. En utilisant l'agrégation, j'utilise également TimeWindows.of(TimeUnit.SECONDS.toMillis(5))
. J'utilise "Serdes" défini par l'utilisateur comme argument d'agrégation. Le code pour l'utilisateur de définir "Serdes" est,Kafka Streamer: Problème avec les 'Serdes' définis par l'utilisateur
Map<String, Object> serdeProps = new HashMap<>();
final Serializer<MsgAggr> pageViewSerializer = new JsonPOJOSerializer<>();
serdeProps.put("JsonPOJOClass", MsgAggr.class);
pageViewSerializer.configure(serdeProps, false);
final Deserializer<MsgAggr> pageViewDeserializer = new JsonPOJODeserializer<>();
serdeProps.put("JsonPOJOClass", MsgAggr.class);
pageViewDeserializer.configure(serdeProps, false);
final Serde<MsgAggr> pageViewSerde = Serdes.serdeFrom(pageViewSerializer, pageViewDeserializer);`
code pour la diffusion est
KGroupedStream<String, MyClass1> msg_grp = message
.groupByKey();
KTable<Windowed<String>,MsgAggr> msg_win = msg_grp
//.reduce(new Reduced(), arg1, arg2);
.aggregate(new Init(),
new Aggr(),
TimeWindows.of(TimeUnit.SECONDS.toMillis(5)),
pageViewSerde,
"MySample_out");
Quand je lance le code que je suis les erreurs:
[2017-05-23 18:16:45,648] ERROR stream-thread [StreamThread-1] Streams application error during processing: (org.apache.kafka.streams.processor.internals.StreamThread:249)
java.lang.ClassCastException: my.kafka.strm.MyClass1 cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Exception in thread "StreamThread-1" java.lang.ClassCastException: my.kafka.strm.MyClass1 cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Une autre question concerne le traitement de la fenêtre événementielle. À l'intérieur de l'agrégat j'utilise TimeWindow. J'extrais également l'horodatage du flux en utilisant 'streamsConfiguration.put (StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class);' Je veux utiliser l'heure qui fait partie du message pour contrôler l'agrégation de la fenêtre. Comment puis-je atteindre cet objectif? – kadsank
L'implémentation 'MyEventTimeExtractor' peut utiliser la méthode' ConsumerRecord # timestamp' pour obtenir l'horodatage – Abhishek
Si vous souhaitez utiliser l'horodatage d'enregistrement intégré de Kafka, vous pouvez déjà utiliser 3 extracteurs prédéfinis: http: // docs .confluent.io/current/streams/developer-guide.html # streams-developer-guide-timestamp-extractor Donc je ne suis pas sûr s'il est nécessaire d'implémenter votre propre extracteur pour ce cas. –