I wrote a custom serializer and deserializer to read JSON strings from Kafka, and I am getting an "Unrecognized token" error from the Jackson API during deserialization.
The JSON serializer and deserializer look like this:
package com.kafka.api.serdes;

import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonSerializer<T> implements Serializer<T> {

    private final ObjectMapper om = new ObjectMapper();

    @Override
    public void close() {
        // Nothing to release.
    }

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // No configuration needed.
    }

    @Override
    public byte[] serialize(String topic, T data) {
        try {
            return om.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            // Keep the cause so the original Jackson error is not lost.
            throw new SerializationException("Error serializing JSON message", e);
        }
    }
}

package com.kafka.api.serdes;

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonDeserializer<T> implements Deserializer<T> {

    private final ObjectMapper om = new ObjectMapper();
    private Class<T> type;

    /*
     * Default constructor needed by Kafka
     */
    public JsonDeserializer() {
    }

    public JsonDeserializer(Class<T> type) {
        this.type = type;
    }

    @Override
    public void close() {
        // Nothing to release.
    }

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> map, boolean isKey) {
        // Fall back to the "type" config entry when no class was passed to the constructor.
        if (type == null) {
            type = (Class<T>) map.get("type");
        }
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        try {
            // Debug output: print the raw payload before parsing it.
            String s = new String(bytes, StandardCharsets.UTF_8);
            System.out.println("The erroneous string is " + s + " "
                    + "The length is " + s.length());
            System.out.println("The type is " + type);
            return om.readValue(bytes, type);
        } catch (Exception e) {
            throw new SerializationException(e);
        }
    }

    protected Class<T> getType() {
        return type;
    }
}

The JSON data looks like this:
dgerssam0, f1d0d29a-f067-45a1-b753-e3d1e8e3d32f, Guinée, développeur III
madamou1, cf8c06c7-bff1-47ce-944f-0f1975aa5e73, Portugal, assistant physique
The corresponding POJO:
package com.kafka.api.models;

public class Person {

    private String name;
    private String personalID;
    private String country;
    private String occupation;

    public Person() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPersonalID() {
        return personalID;
    }

    public void setPersonalID(String personalID) {
        this.personalID = personalID;
    }

    public String getCountry() {
        return country;
    }

    public void setCountry(String country) {
        this.country = country;
    }

    public String getOccupation() {
        return occupation;
    }

    public void setOccupation(String occupation) {
        this.occupation = occupation;
    }

    @Override
    public String toString() {
        return "{ Name : " + name
                + " ID : " + personalID
                + " Country : " + country
                + " Occupation : " + occupation
                + "}";
    }
}

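For reference, the two sample records shown earlier are comma-separated values rather than JSON objects. The following is a minimal sketch, assuming every record has exactly four comma-separated fields with no embedded commas, of what one such line would look like as JSON text with the field names of the Person class. `CsvToJson`, `quote` and `toJson` are hypothetical names used only for illustration, and the hand-written escaping is a simplification that a JSON library would normally handle.

```java
import java.util.Arrays;
import java.util.List;

public class CsvToJson {

    // Field names taken from the Person POJO.
    private static final List<String> FIELDS =
            Arrays.asList("name", "personalID", "country", "occupation");

    // Escapes only backslashes and double quotes (simplified on purpose).
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Assumption: exactly four comma-separated values, no embedded commas.
    static String toJson(String line) {
        String[] parts = line.split(",", -1);
        if (parts.length != FIELDS.size()) {
            throw new IllegalArgumentException(
                    "Expected " + FIELDS.size() + " fields but got " + parts.length);
        }
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                sb.append(",");
            }
            sb.append(quote(FIELDS.get(i))).append(":").append(quote(parts[i].trim()));
        }
        return sb.append("}").toString();
    }

    public static void main(String[] args) {
        // Prints {"name":"madamou1","personalID":"cf8c06c7-bff1-47ce-944f-0f1975aa5e73","country":"Portugal","occupation":"assistant physique"}
        System.out.println(toJson(
                "madamou1, cf8c06c7-bff1-47ce-944f-0f1975aa5e73, Portugal, assistant physique"));
    }
}
```
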
But during JSON deserialization I run into a strange problem:
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
at [Source: [[email protected]; line: 1, column: 11]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
at [Source: [[email protected]; line: 1, column: 11]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2929)
at com.kafka.api.serdes.JsonDeserializer.deserialize(JsonDeserializer.java:52)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:56)
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:483)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:604)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:512)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
The reason I find this strange is that the stack trace says the parser was expecting a boolean value, but neither my data nor my POJO contains any boolean fields. I searched the Internet but could not find an answer, and I cannot figure out where the code is going wrong.
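One detail that may help narrow this down: in JSON, a bare unquoted word at the start of a value can only be one of the literals true, false or null, so that wording in the message does not by itself imply a boolean field. It can also appear when the payload is not JSON at all. Below is a minimal sketch, assuming the goal is only to spot records that cannot be JSON objects or arrays before they reach `ObjectMapper`. `PayloadCheck` and `looksLikeJsonContainer` are hypothetical names, and the check looks only at the outer shape of the text; it does not validate JSON.

```java
import java.nio.charset.StandardCharsets;

public class PayloadCheck {

    // Rough shape check: true only if the trimmed text is wrapped in {} or [].
    // This does not validate the content between the delimiters.
    static boolean looksLikeJsonContainer(byte[] bytes) {
        if (bytes == null) {
            return false;
        }
        String s = new String(bytes, StandardCharsets.UTF_8).trim();
        return (s.startsWith("{") && s.endsWith("}"))
                || (s.startsWith("[") && s.endsWith("]"));
    }

    public static void main(String[] args) {
        String[] samples = {
            "hello",
            "madamou1, cf8c06c7-bff1-47ce-944f-0f1975aa5e73, Portugal, assistant physique",
            "{\"name\":\"madamou1\"}"
        };
        for (String sample : samples) {
            boolean ok = looksLikeJsonContainer(sample.getBytes(StandardCharsets.UTF_8));
            System.out.println(ok + " <- " + sample);
        }
    }
}
```

A check like this could be called at the top of `deserialize` to log suspicious records, for example leftover test messages on the topic, instead of failing inside the parser.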