Je reçois exception de transtypage de classe tandis que les messages passent par l'API Streams Kafka,exception de transtypage de classe tandis que les messages passent par l'API Streams Kafka
Exception est:
java.lank/ClassCastException: [B ne peut pas être jeter à com.fasterxml.jackson.databind.JsonNode
Mon code Stream est:
public static void main(String[] args) {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "BranchingTopics-API");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"samsmembershipkafka.dev.cloud.wal-mart.com:9092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
final Serde<String> stringSerde = Serdes.String();
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer,
jsonDeserializer);
/*
* config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, jsonSerde);
* config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
* jsonSerde);
*/
// Building Stream
KStreamBuilder builder = new KStreamBuilder();
KStream<String, JsonNode> textlines = builder.stream("MainTopic");
Predicate<String, JsonNode> isAddComment = (key, value) -> value
.get("header").toString().contains("/addComment");
Predicate<String, JsonNode> is1M1C = (key, value) -> value
.get("header").toString().contains("/1Member1Account");
Predicate<String, JsonNode> isLostOrStolen = (key, value) -> (value
.get("header").toString()
.contains("/changeCardStatus?action=STOLEN") || value
.get("header").toString()
.contains("/changeCardStatus?action=LOST"));
KStream<String, JsonNode>[] topicTypes = textlines.branch(isAddComment,
is1M1C, isLostOrStolen);
topicTypes[0].to(stringSerde, jsonSerde, "CommentsTopic");
topicTypes[1].to(stringSerde, jsonSerde, "OneMemberOneAccountTopic");
topicTypes[2].to(stringSerde, jsonSerde, "LostOrStolenTopic");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
}
Quelle est la trace complète de la pile? On dirait que vous utilisez le mauvais Serde quelque part. À partir de votre extrait de code, je suppose que vous devez spécifier JSON-Serde lorsque vous lisez le sujet 'builder.stream (stringSerde, jsonSerde," MainTopic ");' - Je suppose que vous devez spécifier des serdes corrects pour d'autres opérations, . Regardez attentivement la trace de la pile pour déterminer quel opérateur lance l'exception. –
oui c'était le problème, travaille maintenant, merci – user8677554
Mettez mon commentaire comme réponse afin que vous puissiez marquer la question comme étant une réponse. –