2017-01-23 7 views
0

J'utilise qubole's S3 sink pour charger des données Avro dans S3 au format Parquet.L'évier Kafka Connect S3 lance IllegalArgumentException lors du chargement Avro

Dans mon application Java je crée un producteur

Properties props = new Properties(); 
props.put("bootstrap.servers", KafkaHelper.getServers()); 
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); 
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); 
return new KafkaProducer<byte[], byte[]>(props); 

convertir ensuite un GenericRecord en byte[] Format:

GenericRecord avroRecord = new GenericData.Record(avroSchema); 
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(avroSchema); 

for (Map.Entry<String, ?> entry : map.entrySet()) { 
    String key = entry.getKey(); 
    Object value = entry.getValue(); 
    avroRecord.put(key, value); 
} 

ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, recordInjection.apply(avroRecord)); 
producer.send(record); 

J'utilise les valeurs suivantes dans mes Kafka Connect propriétés:

key.converter=com.qubole.streamx.ByteArrayConverter 
value.converter=com.qubole.streamx.ByteArrayConverter 
internal.key.converter=org.apache.kafka.connect.json.JsonConverter 
internal.value.converter=org.apache.kafka.connect.json.JsonConverter 
internal.key.converter.schemas.enable=false 
internal.value.converter.schemas.enable=false 

Et les options de configuration suivantes en m y propriétés de puits de fichier:

connector.class=com.qubole.streamx.s3.S3SinkConnector 
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat 

Quand je lance le connecteur je reçois le message d'erreur suivant: «java.lang.IllegalArgumentException: schéma Avro doit être un record. Je suis un peu nouveau sur Kafka Connect et je sais qu'un serveur de registre de schéma peut être configuré - mais je ne comprends pas si le récepteur a besoin du registre pour convertir les données Avro en Parquet ou si c'est le cas est une sorte de problème de formatage ou de configuration de mon côté. Quel type de format de données un "enregistrement" fait-il référence dans le contexte de cette erreur? Toute direction ou aide serait très appréciée.

Répondre

4

Le ByteArrayConverter n'effectuera aucune traduction de données: au lieu d'effectuer une sérialisation/désérialisation, il suppose que le connecteur sait gérer les données byte[] brutes. Cependant, le ParquetFormat (et en fait la plupart des formats) ne peut pas gérer uniquement les données brutes. Au lieu de cela, ils s'attendent à ce que les données soient désérialisées et structurées comme un enregistrement (que vous pouvez considérer comme une structure C, un POJO, etc.).

Notez que le fichier README de qubole streamx indique que ByteArrayConverter est utile dans les cas où vous pouvez copier les données en toute sécurité. Des exemples seraient si vous avez les données comme JSON ou CSV. Ceux-ci n'ont pas besoin de désérialisation car les octets de la valeur de chaque enregistrement Kafka peuvent simplement être copiés dans le fichier de sortie. C'est une bonne optimisation dans ces cas, mais pas généralement applicable à tous les formats de fichiers de sortie.

+0

Merci pour l'explication - J'ai été en mesure d'élaborer une solution en utilisant 'io.confluent.kafka.serializers.KafkaAvroSerializer'. Dans l'intérêt de la question pourriez-vous élaborer sur votre réponse et offrir une solution en plus de l'explication du problème? – kellanburket