0

J'ai un connecteur Kafka avec le code suivant pour la méthode poll() dans l'implémentation SourceTask.Pourquoi les métadonnées sont-elles ajoutées à la sortie de ce connecteur Kafka?

@Override 
public List<SourceRecord> poll() throws InterruptedException 
{ 
    SomeType item = mQueue.take(); 
    List<SourceRecord> records = new ArrayList<>(); 
    SourceRecord[] sourceRecords = new SourceRecord[]{ 
     new SourceRecord(null, null, "data", null, 
         Schema.STRING_SCHEMA, "foo", 
         Schema.STRING_SCHEMA, "bar") 
    }; 
    Collections.addAll(records, sourceRecords); 

    return records; 
} 

Si j'attache un consommateur au sujet de données, je reçois le message suivant envoyé par du connecteur:

{"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"} 

Si je publie un message directement au sujet en utilisant les commandes suivantes:

echo -e 'foo,bar' > /tmp/test_kafka.txt 
cat /tmp/test_kafka.txt | kafka-console-producer.sh --broker-list kafka-host:9092 --topic data --property parse.key=true --property key.separator=, 

attacher au même consommateur, je reçois ce message:

foo bar 

C'est ce que je m'attendais à voir comme la sortie de l'implémentation du connecteur, plutôt que le message {"schema":... que j'ai reçu. Comment puis-je modifier l'implémentation de poll() afin que le message soit envoyé sans que les métadonnées du schéma n'apparaissent dans la clé et la valeur du message?

Répondre

1

Ok, se révèle c'était juste parce que j'avais les lignes suivantes dans connect-standalone.properties

key.converter=org.apache.kafka.connect.json.JsonConverter 
value.converter=org.apache.kafka.connect.json.JsonConverter 

j'aurais dû

key.converter=org.apache.kafka.connect.storage.StringConverter 
value.converter=org.apache.kafka.connect.storage.StringConverter 

En tant que solution de rechange, je suis aussi capable de changer les éléments suivants réglage de vrai à faux

value.converter.schemas.enable=false 

Puis dans mon processeur classe I cha a mis le code à:

SourceRecord[] sourceRecords = new SourceRecord[]{ 
    new SourceRecord(null, null, "data", null, 
        Schema.STRING_SCHEMA, "foo", 
        null, "bar") 
}; 

Cela diffère car je ne spécifie plus de schéma pour la valeur.