J'ai un connecteur Kafka avec le code suivant pour la méthode poll()
dans l'implémentation SourceTask.Pourquoi les métadonnées sont-elles ajoutées à la sortie de ce connecteur Kafka?
@Override
public List<SourceRecord> poll() throws InterruptedException
{
SomeType item = mQueue.take();
List<SourceRecord> records = new ArrayList<>();
SourceRecord[] sourceRecords = new SourceRecord[]{
new SourceRecord(null, null, "data", null,
Schema.STRING_SCHEMA, "foo",
Schema.STRING_SCHEMA, "bar")
};
Collections.addAll(records, sourceRecords);
return records;
}
Si j'attache un consommateur au sujet de données, je reçois le message suivant envoyé par du connecteur:
{"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"}
Si je publie un message directement au sujet en utilisant les commandes suivantes:
echo -e 'foo,bar' > /tmp/test_kafka.txt
cat /tmp/test_kafka.txt | kafka-console-producer.sh --broker-list kafka-host:9092 --topic data --property parse.key=true --property key.separator=,
attacher au même consommateur, je reçois ce message:
foo bar
C'est ce que je m'attendais à voir comme la sortie de l'implémentation du connecteur, plutôt que le message {"schema":...
que j'ai reçu. Comment puis-je modifier l'implémentation de poll()
afin que le message soit envoyé sans que les métadonnées du schéma n'apparaissent dans la clé et la valeur du message?