2017-10-05 5 views
0

J'ai écrit un sérialiseur et un désérialiseur personnalisés pour lire des chaînes JSON sur Kafka. Problème : jeton non reconnu (« Unrecognized token ») avec l'API Jackson lors de la désérialisation.

Le sérialiseur et le désérialiseur JSON se présentent comme ci-dessous :

package com.kafka.api.serdes; 

import java.util.Map; 

import org.apache.kafka.common.errors.SerializationException; 
import org.apache.kafka.common.serialization.Serializer; 

import com.fasterxml.jackson.core.JsonProcessingException; 
import com.fasterxml.jackson.databind.ObjectMapper; 

/**
 * Kafka {@link Serializer} that encodes values as JSON bytes using a Jackson
 * {@link ObjectMapper}.
 *
 * @param <T> type of the values handed to {@link #serialize(String, Object)}
 */
public class JsonSerializer<T> implements Serializer<T> {

    private final ObjectMapper om = new ObjectMapper();

    /** Nothing to release: this serializer holds no closeable resources. */
    @Override
    public void close() {
        // intentionally empty
    }

    /**
     * No-op: this serializer reads no configuration.
     *
     * @param config serializer configuration (ignored)
     * @param isKey  whether the serializer is used for keys (ignored)
     */
    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // intentionally empty
    }

    /**
     * Converts {@code data} to its JSON byte representation.
     *
     * @param topic topic the record is destined for; used only in error messages
     * @param data  value to serialize, may be {@code null}
     * @return the JSON bytes, or {@code null} when {@code data} is {@code null}
     * @throws SerializationException if Jackson cannot serialize {@code data};
     *         the Jackson exception is attached as the cause
     */
    @Override
    public byte[] serialize(String topic, T data) {
        // A null value must stay a null payload rather than the 4-byte JSON
        // literal "null", mirroring the null handling of the deserializer.
        if (data == null) {
            return null;
        }
        try {
            return om.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            // Keep the original failure as cause so the root problem is visible.
            throw new SerializationException(
                    "Error serializing JSON message for topic " + topic, e);
        }
    }

}

package com.kafka.api.serdes; 

import java.util.Map; 

import org.apache.kafka.common.errors.SerializationException; 
import org.apache.kafka.common.serialization.Deserializer; 

import com.fasterxml.jackson.databind.ObjectMapper; 

/**
 * Kafka {@link Deserializer} that decodes JSON bytes into instances of a
 * target class using a Jackson {@link ObjectMapper}.
 *
 * <p>The target class is supplied either through
 * {@link #JsonDeserializer(Class)} or through the {@code "type"} entry of the
 * map passed to {@link #configure(Map, boolean)}.
 *
 * @param <T> type of the deserialized values
 */
public class JsonDeserializer<T> implements Deserializer<T> {

    private final ObjectMapper om = new ObjectMapper();
    private Class<T> type;

    /**
     * Default constructor needed by kafka. The target type must then be
     * provided via {@link #configure(Map, boolean)}.
     */
    public JsonDeserializer() {

    }

    /**
     * Creates a deserializer bound to a fixed target type.
     *
     * @param type class the JSON payloads are mapped to
     */
    public JsonDeserializer(Class<T> type) {
        this.type = type;
    }

    /** Nothing to release: this deserializer holds no closeable resources. */
    @Override
    public void close() {
        // intentionally empty
    }

    /**
     * Picks up the target type from the {@code "type"} entry of {@code map}
     * unless one was already set through the constructor.
     *
     * @param map  deserializer configuration; the {@code "type"} value is cast
     *             to {@code Class<T>}
     * @param arg1 whether the deserializer is used for keys (ignored)
     */
    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> map, boolean arg1) {
        if (type == null) {
            // Unchecked: the configuration map is untyped, so the caller is
            // trusted to have stored a Class<T> under "type".
            type = (Class<T>) map.get("type");
        }
    }

    /**
     * Parses {@code bytes} as JSON into an instance of the configured type.
     *
     * @param undefined name of the topic the record came from; used only in
     *                  error messages
     * @param bytes     raw record payload, may be {@code null} or empty
     * @return the decoded value, or {@code null} when {@code bytes} is
     *         {@code null} or empty
     * @throws SerializationException if no target type has been configured or
     *         the payload cannot be mapped; any underlying exception is
     *         attached as the cause
     */
    @Override
    public T deserialize(String undefined, byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return null;
        }

        // Fail with an explicit message instead of an opaque Jackson error
        // when neither the constructor nor configure() supplied a type.
        if (type == null) {
            throw new SerializationException(
                    "No target type configured for JsonDeserializer (topic "
                            + undefined + ")");
        }

        try {
            return om.readValue(bytes, type);
        } catch (Exception e) {
            // Boundary catch: every mapping failure is reported to Kafka as a
            // SerializationException, with the original exception as cause.
            throw new SerializationException(
                    "Error deserializing JSON message from topic " + undefined, e);
        }
    }

    /**
     * @return the class payloads are mapped to, or {@code null} if none has
     *         been set yet
     */
    protected Class<T> getType() {
        return type;
    }

}

Les données JSON ressemblent à ceci :

dgerssam0, f1d0d29a-f067-45a1-b753-e3d1e8e3d32f, Guinée, développeur III

madamou1, cf8c06c7-bff1-47ce-944f-0f1975aa5e73, Portugal, assistant physique

Le POJO correspondant :

package com.kafka.api.models; 

/**
 * Mutable bean holding a person's name, personal ID, country and occupation.
 * All four properties are strings exposed through getter/setter pairs.
 */
public class Person {

    private String name;
    private String personalID;
    private String country;
    private String occupation;

    /** Creates a person with every field unset ({@code null}). */
    public Person() {
    }

    /** @return the person's name */
    public String getName() {
        return this.name;
    }

    /** @param name the person's name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the person's personal ID */
    public String getPersonalID() {
        return this.personalID;
    }

    /** @param personalID the person's personal ID */
    public void setPersonalID(String personalID) {
        this.personalID = personalID;
    }

    /** @return the person's country */
    public String getCountry() {
        return this.country;
    }

    /** @param country the person's country */
    public void setCountry(String country) {
        this.country = country;
    }

    /** @return the person's occupation */
    public String getOccupation() {
        return this.occupation;
    }

    /** @param occupation the person's occupation */
    public void setOccupation(String occupation) {
        this.occupation = occupation;
    }

    /**
     * Renders the four fields in the form
     * {@code { Name : n ID : i Country : c Occupation : o}}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("{");
        text.append(" Name : ").append(name);
        text.append(" ID : ").append(personalID);
        text.append(" Country : ").append(country);
        text.append(" Occupation : ").append(occupation);
        return text.append("}").toString();
    }
}

Mais pendant la désérialisation JSON, je suis confronté à un problème étrange :

Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null') 
at [Source: [[email protected]; line: 1, column: 11] 
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null') 
at [Source: [[email protected]; line: 1, column: 11] 
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702) 
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558) 
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524) 
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686) 
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878) 
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772) 
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834) 
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783) 
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2929) 
    at com.kafka.api.serdes.JsonDeserializer.deserialize(JsonDeserializer.java:52) 
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65) 
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55) 
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:56) 
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44) 
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84) 
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117) 
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:483) 
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:604) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:512) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) 

Ce que je trouve étrange, c'est que la stack trace indique que le parseur attend une valeur booléenne, alors que ni mes données ni mon POJO ne contiennent de booléens. J'ai cherché sur Internet mais je n'ai pas trouvé de réponse, et je n'arrive pas à comprendre à quel endroit le code échoue.

Répondre

0

Le problème a été résolu après avoir rendu la classe POJO sérialisable et lui avoir ajouté des annotations Jackson.

package com.kafka.api.models; 

import java.io.Serializable; 

import com.fasterxml.jackson.annotation.JsonCreator; 
import com.fasterxml.jackson.annotation.JsonProperty; 
import com.fasterxml.jackson.annotation.JsonRootName; 

/**
 * Serializable bean holding a person's name, personal ID, country and
 * occupation, annotated for Jackson: the root name is {@code "person"} and
 * the four-argument constructor is the declared JSON creator.
 */
@JsonRootName("person")
public class Person implements Serializable {

    private static final long serialVersionUID = 1L;

    private String name;
    private String personalID;
    private String country;
    private String occupation;

    /** Creates a person with every field unset ({@code null}). */
    public Person() {
    }

    /**
     * Creates a fully populated person; each argument is bound to the JSON
     * property of the same name.
     *
     * @param name       the person's name
     * @param personalID the person's personal ID
     * @param country    the person's country
     * @param occupation the person's occupation
     */
    @JsonCreator
    public Person(
            @JsonProperty("name") String name,
            @JsonProperty("personalID") String personalID,
            @JsonProperty("country") String country,
            @JsonProperty("occupation") String occupation) {
        this.name = name;
        this.personalID = personalID;
        this.country = country;
        this.occupation = occupation;
    }

    /** @return the person's name */
    public String getName() {
        return this.name;
    }

    /** @param name the person's name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the person's personal ID */
    public String getPersonalID() {
        return this.personalID;
    }

    /** @param personalID the person's personal ID */
    public void setPersonalID(String personalID) {
        this.personalID = personalID;
    }

    /** @return the person's country */
    public String getCountry() {
        return this.country;
    }

    /** @param country the person's country */
    public void setCountry(String country) {
        this.country = country;
    }

    /** @return the person's occupation */
    public String getOccupation() {
        return this.occupation;
    }

    /** @param occupation the person's occupation */
    public void setOccupation(String occupation) {
        this.occupation = occupation;
    }

    /**
     * Renders the four fields in the form
     * {@code { Name : n ID : i Country : c Occupation : o}}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("{");
        text.append(" Name : ").append(name);
        text.append(" ID : ").append(personalID);
        text.append(" Country : ").append(country);
        text.append(" Occupation : ").append(occupation);
        return text.append("}").toString();
    }
}