2017-09-26 5 views
-1

Je reçois exception de transtypage de classe tandis que les messages passent par l'API Streams Kafka,exception de transtypage de classe tandis que les messages passent par l'API Streams Kafka

Exception est:

java.lank/ClassCastException: [B ne peut pas être jeter à com.fasterxml.jackson.databind.JsonNode

Mon code Stream est:

public static void main(String[] args) { 
    Properties config = new Properties(); 
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "BranchingTopics-API"); 
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
      "samsmembershipkafka.dev.cloud.wal-mart.com:9092"); 
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); 
    final Serde<String> stringSerde = Serdes.String(); 
    final Serializer<JsonNode> jsonSerializer = new JsonSerializer(); 
    final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer(); 
    final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, 
      jsonDeserializer); 

    /* 
    * config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, jsonSerde); 
    * config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
    * jsonSerde); 
    */ 

    // Building Stream 
    KStreamBuilder builder = new KStreamBuilder(); 

    KStream<String, JsonNode> textlines = builder.stream("MainTopic"); 

    Predicate<String, JsonNode> isAddComment = (key, value) -> value 
      .get("header").toString().contains("/addComment"); 
    Predicate<String, JsonNode> is1M1C = (key, value) -> value 
      .get("header").toString().contains("/1Member1Account"); 
    Predicate<String, JsonNode> isLostOrStolen = (key, value) -> (value 
      .get("header").toString() 
      .contains("/changeCardStatus?action=STOLEN") || value 
      .get("header").toString() 
      .contains("/changeCardStatus?action=LOST")); 

    KStream<String, JsonNode>[] topicTypes = textlines.branch(isAddComment, 
      is1M1C, isLostOrStolen); 

    topicTypes[0].to(stringSerde, jsonSerde, "CommentsTopic"); 
    topicTypes[1].to(stringSerde, jsonSerde, "OneMemberOneAccountTopic"); 
    topicTypes[2].to(stringSerde, jsonSerde, "LostOrStolenTopic"); 

    KafkaStreams streams = new KafkaStreams(builder, config); 

    streams.start(); 
} 
+0

Quelle est la trace complète de la pile? On dirait que vous utilisez le mauvais Serde quelque part. À partir de votre extrait de code, je suppose que vous devez spécifier JSON-Serde lorsque vous lisez le sujet 'builder.stream (stringSerde, jsonSerde," MainTopic ");' - Je suppose que vous devez spécifier des serdes corrects pour d'autres opérations, . Regardez attentivement la trace de la pile pour déterminer quel opérateur lance l'exception. –

+1

oui c'était le problème, travaille maintenant, merci – user8677554

+0

Mettez mon commentaire comme réponse afin que vous puissiez marquer la question comme étant une réponse. –

Répondre

0

Semble que vous utilisez le mauvais Serde quelque part. A partir de vous extrait de code, je présume que vous devez spécifier JSON-Serde lors de la lecture du sujet:

builder.stream(stringSerde, jsonSerde, "MainTopic"); 

Je suppose que vous devez spécifier Serde correct s pour d'autres opérations, aussi. Regardez attentivement la trace de la pile pour savoir quel opérateur lance l'exception