2017-09-03 4 views
-3

J'utilise kafka_2.10-0.10.0.1 et scala_2.10.3. Je veux écrire Serializer et Deserializer personnalisé en utilisant scala. J'ai essayé avec ces sérialiseur (de CustomType) et désérialiseur (obtenir un CustomType):Créer un sérialiseur et un désérialiseur personnalisés en kafka en utilisant scala

class CustomTypeSerializer extends Serializer[CustomType] { 
    private val gson: Gson = new Gson() 

    override def configure(configs: util.Map[String, _], isKey: Boolean): 
    Unit = { 
     // nothing to do 
    } 

    override def serialize(topic: String, data: CustomType): Array[Byte] = { 
     if (data == null) 
     null 
     else 
     gson.toJson(data).getBytes 
    } 

    override def close(): Unit = { 
     //nothing to do 
    } 
    } 

    class CustomTypeDeserializer extends Deserializer[CustomType] { 
    private val gson: Gson = new Gson() 

    override def deserialize(topic: String, bytes: Array[Byte]): CustomType = { 
     val offerJson = gson.toJson(bytes.toString) 
     val psType: Type = new TypeToken[CustomType]() {}.getType() 
     val ps: CustomType = gson.fromJson(offerJson, psType) 
     ps 
    } 

    override def configure(configs: util.Map[String, _], isKey: Boolean): 
    Unit = { 
     // nothing to do 
    } 

    override def close(): Unit = { 
     //nothing to do 
    } 
    } 

Mais, je suis arrivé cette erreur:

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic_0_1-1 at offset 26 
Caused by: com.google.gson.JsonSyntaxException: java.lang.IllegalStateException: Expected BEGIN_OBJECT but was BEGIN_ARRAY at line 1 column 2 path $ 
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:224) 
at com.google.gson.Gson.fromJson(Gson.java:887) 
at com.google.gson.Gson.fromJson(Gson.java:852) 
at com.google.gson.Gson.fromJson(Gson.java:801) 
at kafka.PSDeserializer.deserialize(PSDeserializer.scala:24) 
at kafka.PSDeserializer.deserialize(PSDeserializer.scala:18) 
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:627) 
at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:548) 
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354) 
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000) 
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938) 

Pouvez-vous me aider s'il vous plaît

+0

Qu'avez-vous essayé? Quelles sont vos exigences? Où êtes-vous coincé? – pedromss

+0

J'ai trouvé des exemples java mais pas dans scala. Si vous êtes conscient d'écrire un exemple en utilisant scala, s'il vous plaît ne partager – DaliMidou

+0

Votre désérialisateur attend un objet mais le message est un tableau – pedromss

Répondre

0

Trouvez ci-dessous le sérialiseur et le désérialiseur personnalisés pour la classe de cas Utilisateur, Utilisateur (nom: Chaîne, id: Int). Remplacer l'utilisateur dans le code avec votre classe de cas. Ça va marcher.

import java.io.{ObjectInputStream, ByteArrayInputStream} 
import java.util 

import org.apache.kafka.common.serialization.{Deserializer, Serializer} 

class CustomDeserializer extends Deserializer[User]{ 

    override def configure(configs: util.Map[String,_],isKey: Boolean):Unit = { 

    } 
    override def deserialize(topic:String,bytes: Array[Byte]) = { 
    val byteIn = new ByteArrayInputStream(bytes) 
    val objIn = new ObjectInputStream(byteIn) 
    val obj = objIn.readObject().asInstanceOf[User] 
    byteIn.close() 
    objIn.close() 
    obj 
    } 
    override def close():Unit = { 

    } 

} 

import java.io.{ObjectOutputStream, ByteArrayOutputStream} 
import java.util 
import org.apache.kafka.common.serialization.Serializer 


class CustomSerializer extends Serializer[User]{ 

    override def configure(configs: util.Map[String,_],isKey: Boolean):Unit = { 

    } 


    override def serialize(topic:String, data:User):Array[Byte] = { 
    try { 
     val byteOut = new ByteArrayOutputStream() 
     val objOut = new ObjectOutputStream(byteOut) 
     objOut.writeObject(data) 
     objOut.close() 
     byteOut.close() 
     byteOut.toByteArray 
    } 
    catch { 
     case ex:Exception => throw new Exception(ex.getMessage) 
    } 
    } 

    override def close():Unit = { 

    } 


} 
+0

ça marche bien :) merci – DaliMidou