2017-10-04

Consuming JSON values using the Kafka Connect JSON API in Kafka Streams: Java

I am trying to consume a JSON message using the Kafka Connect API in Kafka Streams. I searched Google but could not find any substantial information on how to read a JSON message with the Streams API.

So, with my limited knowledge, I tried the approach below.

package com.kafka.api.serializers.json; 

import java.util.Properties; 

import org.apache.kafka.streams.KafkaStreams; 
import org.apache.kafka.streams.StreamsConfig; 
import org.apache.kafka.streams.kstream.ForeachAction; 
import org.apache.kafka.streams.kstream.KStream; 
import org.apache.kafka.streams.kstream.KStreamBuilder; 

import com.fasterxml.jackson.core.JsonProcessingException; 
import com.fasterxml.jackson.databind.JsonNode; 
import com.fasterxml.jackson.databind.ObjectMapper; 

public class ConsumerUtilities { 

    private static ObjectMapper om = new ObjectMapper(); 

    public static Properties getProperties() { 

     Properties configs = new Properties(); 
     configs.put(StreamsConfig.APPLICATION_ID_CONFIG, 
       "Kafka test application"); 
     configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
     configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
       "org.apache.kafka.common.serialization.ByteArraySerializer"); 
     configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
       "org.apache.kafka.connect.json.JsonDeserializer"); 
     return configs; 
    } 

    public static KStreamBuilder getStreamingConsumer() { 
     KStreamBuilder builder = new KStreamBuilder(); 
     return builder; 
    } 

    public static void printStreamData() { 
     KStreamBuilder builder = getStreamingConsumer(); 
     KStream<String, JsonNode> kStream = builder.stream("test"); 
     kStream.foreach(new ForeachAction<String, JsonNode>() { 
      @Override 
      public void apply(String key, JsonNode value) { 
       try { 
        System.out.println(key + " : " + om.treeToValue(value, Person.class)); 
       } catch (JsonProcessingException e) { 
        // TODO Auto-generated catch block 
        e.printStackTrace(); 
       } 
      } 

     }); 

     KafkaStreams kafkaStreams = new KafkaStreams(builder, getProperties()); 
     kafkaStreams.start(); 
    } 

} 

package com.kafka.api.serializers.json; 

import com.fasterxml.jackson.databind.JsonNode; 
import com.fasterxml.jackson.databind.ObjectMapper; 

import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerConfig; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import java.util.Properties; 

public class ProducerUtilities { 

    private static ObjectMapper om = new ObjectMapper(); 


    public static org.apache.kafka.clients.producer.Producer<String, JsonNode> getProducer() { 
     Properties configProperties = new Properties(); 
     configProperties.put(ProducerConfig.CLIENT_ID_CONFIG, 
       "kafka json producer"); 
     configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
       "localhost:9092"); 
     configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
       "org.apache.kafka.common.serialization.ByteArraySerializer"); 
     configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
       "org.apache.kafka.connect.json.JsonSerializer"); 

     org.apache.kafka.clients.producer.Producer<String, JsonNode> producer = new KafkaProducer<String, JsonNode>(
       configProperties); 
     return producer; 
    } 

    public ProducerRecord<String,JsonNode> createRecord(Person person){ 
     JsonNode jsonNode = om.valueToTree(person); 
     ProducerRecord<String,JsonNode> record = new ProducerRecord<String,JsonNode>("test",jsonNode); 
     return record; 
    } 

} 

When I run the code, I get the exception below:

[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group Kafka test application failed on partition assignment 
org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class org.apache.kafka.connect.json.JsonDeserializer 
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:770) 
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:59) 
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:40) 
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:138) 
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1078) 
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:255) 
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:245) 
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1147) 
    at org.apache.kafka.streams.processor.internals.StreamThread.access$800(StreamThread.java:68) 
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:184) 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265) 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367) 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) 
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.connect.json.JsonDeserializer is not an instance of org.apache.kafka.common.serialization.Serde 
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248) 
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:764) 
    ... 19 more 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Shutting down 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN. 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Stream thread shutdown complete 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD. 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] WARN org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29] All stream threads have died. The Kafka Streams instance will be in an error state and should be closed. 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29] State transition from REBALANCING to ERROR. 
Exception in thread "Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Failed to rebalance. 
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:543) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) 
Caused by: org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class org.apache.kafka.connect.json.JsonDeserializer 
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:770) 
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:59) 
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:40) 
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:138) 
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1078) 
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:255) 
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:245) 
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1147) 
    at org.apache.kafka.streams.processor.internals.StreamThread.access$800(StreamThread.java:68) 
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:184) 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265) 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367) 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536) 
    ... 3 more 
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.connect.json.JsonDeserializer is not an instance of org.apache.kafka.common.serialization.Serde 
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248) 
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:764) 
    ... 19 more 

I am looking for some pointers to resolve this issue.

EDIT: Created a custom serializer and deserializer as per Matthias's suggestion.

package com.kafka.api.utilities; 

import java.util.Properties; 

import org.apache.kafka.streams.KafkaStreams; 
import org.apache.kafka.streams.StreamsConfig; 
import org.apache.kafka.streams.kstream.ForeachAction; 
import org.apache.kafka.streams.kstream.KStream; 
import org.apache.kafka.streams.kstream.KStreamBuilder; 

import com.kafka.api.models.Person; 
import com.kafka.api.serdes.JsonDeserializer; 
import com.kafka.api.serdes.JsonSerializer; 
import org.apache.kafka.common.serialization.Serdes; 
import org.apache.kafka.common.serialization.Serde; 

public class ConsumerUtilities { 

    //private static ObjectMapper om = new ObjectMapper(); 

    public static Properties getProperties() { 

     Properties configs = new Properties(); 
     configs.put(StreamsConfig.APPLICATION_ID_CONFIG, 
       "Kafka test application"); 
     configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
//  configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
//    "org.apache.kafka.common.serialization.ByteArraySerializer"); 
//  configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
//    "org.apache.kafka.connect.json.JsonDeserializer"); 
     return configs; 
    } 

    public static KStreamBuilder getStreamingConsumer() { 
     KStreamBuilder builder = new KStreamBuilder(); 
     return builder; 
    } 

    public static void printStreamData() { 
     JsonSerializer<Person> personJsonSerializer = new JsonSerializer<>(); 
     JsonDeserializer<Person> personJsonDeserializer = new JsonDeserializer<>(Person.class); 
     Serde<Person> personSerde = Serdes.serdeFrom(personJsonSerializer, personJsonDeserializer); 

     KStreamBuilder builder = getStreamingConsumer(); 
     KStream<String, Person> kStream = builder.stream(Serdes.String(),personSerde , "test"); 
     kStream.foreach(new ForeachAction<String, Person>() { 
      @Override 
      public void apply(String key, Person value) { 
       System.out.println(key + " : " + value.toString()); 
      } 

     }); 

     KafkaStreams kafkaStreams = new KafkaStreams(builder, getProperties()); 
     kafkaStreams.start(); 
    } 

} 

package com.kafka.api.serdes; 

import java.util.Map; 

import org.apache.kafka.common.errors.SerializationException; 
import org.apache.kafka.common.serialization.Deserializer; 

import com.fasterxml.jackson.databind.ObjectMapper; 

public class JsonDeserializer<T> implements Deserializer<T>{ 

    private ObjectMapper om = new ObjectMapper(); 
    private Class<T> type; 

    /* 
    * Default constructor needed by kafka 
    */ 
    public JsonDeserializer(Class<T> type) { 
     this.type = type; 
    } 
    @Override 
    public void close() { 
     // TODO Auto-generated method stub 

    } 

    @SuppressWarnings("unchecked") 
    @Override 
    public void configure(Map<String, ?> map, boolean arg1) { 
     if(type == null){ 
      type = (Class<T>) map.get("type"); 
     } 

    } 

    @Override 
    public T deserialize(String undefined, byte[] bytes) { 
     if(bytes == null || bytes.length == 0){ 
      return null; 
     } 

     try{ 
      return om.readValue(bytes, type); 
     }catch(Exception e){ 
      throw new SerializationException(e); 
     } 
    } 

    protected Class<T> getType(){ 
     return type; 
    } 

} 

package com.kafka.api.serdes; 

import java.util.Map; 

import org.apache.kafka.common.errors.SerializationException; 
import org.apache.kafka.common.serialization.Serializer; 

import com.fasterxml.jackson.core.JsonProcessingException; 
import com.fasterxml.jackson.databind.ObjectMapper; 

public class JsonSerializer<T> implements Serializer<T> { 

    private ObjectMapper om = new ObjectMapper(); 

    @Override 
    public void close() { 
     // TODO Auto-generated method stub 

    } 

    @Override 
    public void configure(Map<String, ?> config, boolean isKey) { 
     // TODO Auto-generated method stub 

    } 

    @Override 
    public byte[] serialize(String topic, T data) { 
     // TODO Auto-generated method stub 
     try { 
      return om.writeValueAsBytes(data); 
     } catch (JsonProcessingException e) { 
      throw new SerializationException(); 
     } 
    } 

} 

Exception: after running the streaming application I now get the exception below. I am confused.

[Kafka test application-cee84455-78ca-4a2f-881a-89b3c3a00e4b-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test application-cee84455-78ca-4a2f-881a-89b3c3a00e4b] State transition from RUNNING to ERROR. 
Exception in thread "Kafka test application-cee84455-78ca-4a2f-881a-89b3c3a00e4b-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=test, partition=0, offset=0 
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:46) 
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84) 
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117) 
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:483) 
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:604) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:512) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) 
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null') 
at [Source: [B@...; line: 1, column: 11] 
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null') 
at [Source: [B@...; line: 1, column: 11] 
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702) 
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558) 
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524) 
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686) 
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878) 
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772) 
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834) 
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783) 
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2929) 
    at com.kafka.api.serdes.JsonDeserializer.deserialize(JsonDeserializer.java:43) 
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65) 
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55) 
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:56) 
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44) 
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84) 
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117) 
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:483) 
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:604) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:512) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) 

Answer

The Streams API needs to read and write data, and therefore it uses the abstraction of a Serde, which is a wrapper around a serializer and a deserializer at the same time. That is essentially what the exception is telling you:

Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.connect.json.JsonDeserializer is not an instance of org.apache.kafka.common.serialization.Serde

Thus, you need to wrap JsonSerializer and JsonDeserializer in a JsonSerde and use this JsonSerde in your StreamsConfig.

The simplest way is to use the Serdes.serdeFrom(...) method (note: Serdes, plural). As an alternative, you can also implement the Serde interface (note: Serde, singular) directly. You can find examples in the Serdes class of how to implement the Serde interface.
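The pairing that Serdes.serdeFrom(...) performs can be sketched in plain Java. The class and method names below are illustrative stand-ins only, not Kafka's real API (the real Serde, Serializer, and Deserializer interfaces live in org.apache.kafka.common.serialization); the point is simply that a serde is nothing more than a serializer and a deserializer bundled under one object:

```java
// Minimal, stdlib-only sketch of the Serde idea: a serde pairs a
// serializer and a deserializer under one object, which is what
// Kafka's Serdes.serdeFrom(serializer, deserializer) produces.
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class SerdeSketch {

    // Illustrative stand-in for org.apache.kafka.common.serialization.Serde<T>.
    static final class Serde<T> {
        final Function<T, byte[]> serializer;
        final Function<byte[], T> deserializer;

        Serde(Function<T, byte[]> serializer, Function<byte[], T> deserializer) {
            this.serializer = serializer;
            this.deserializer = deserializer;
        }
    }

    // Illustrative stand-in for Serdes.serdeFrom(...): pair the two halves.
    static <T> Serde<T> serdeFrom(Function<T, byte[]> ser, Function<byte[], T> de) {
        return new Serde<>(ser, de);
    }

    // Round-trip a value through a serde, as Streams does per record.
    static String roundTrip(Serde<String> serde, String value) {
        byte[] wire = serde.serializer.apply(value);
        return serde.deserializer.apply(wire);
    }

    public static void main(String[] args) {
        Serde<String> stringSerde = serdeFrom(
                v -> v.getBytes(StandardCharsets.UTF_8),
                b -> new String(b, StandardCharsets.UTF_8));
        System.out.println(roundTrip(stringSerde, "hello")); // prints "hello"
    }
}
```

With the real API the equivalent is Serde<Person> personSerde = Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(Person.class)), which is exactly what the edited consumer code in the question already does.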


Per your suggestion, I created a serializer and deserializer and wrapped them. But I am getting the exception above. Please help. – dataEnthusiast


The exception comes from your deserializer. I guess you need to debug this part yourself; it's not a Streams problem... You can see from the stack trace that your deserializer is called... Seems like a schema problem? –