J'utilise qubole's S3 sink pour charger des données Avro dans S3 au format Parquet.L'évier Kafka Connect S3 lance IllegalArgumentException lors du chargement Avro
Dans mon application Java je crée un producteur
Properties props = new Properties();
props.put("bootstrap.servers", KafkaHelper.getServers());
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
return new KafkaProducer<byte[], byte[]>(props);
convertir ensuite un GenericRecord
en byte[]
Format:
GenericRecord avroRecord = new GenericData.Record(avroSchema);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(avroSchema);
for (Map.Entry<String, ?> entry : map.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
avroRecord.put(key, value);
}
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, recordInjection.apply(avroRecord));
producer.send(record);
J'utilise les valeurs suivantes dans mes Kafka Connect propriétés:
key.converter=com.qubole.streamx.ByteArrayConverter
value.converter=com.qubole.streamx.ByteArrayConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
Et les options de configuration suivantes en m y propriétés de puits de fichier:
connector.class=com.qubole.streamx.s3.S3SinkConnector
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
Quand je lance le connecteur je reçois le message d'erreur suivant: «java.lang.IllegalArgumentException: schéma Avro doit être un record. Je suis un peu nouveau sur Kafka Connect et je sais qu'un serveur de registre de schéma peut être configuré - mais je ne comprends pas si le récepteur a besoin du registre pour convertir les données Avro en Parquet ou si c'est le cas est une sorte de problème de formatage ou de configuration de mon côté. Quel type de format de données un "enregistrement" fait-il référence dans le contexte de cette erreur? Toute direction ou aide serait très appréciée.
Merci pour l'explication - J'ai été en mesure d'élaborer une solution en utilisant 'io.confluent.kafka.serializers.KafkaAvroSerializer'. Dans l'intérêt de la question pourriez-vous élaborer sur votre réponse et offrir une solution en plus de l'explication du problème? – kellanburket