
Confluent Kafka Connect Elasticsearch document ID creation

I am using Confluent to connect my DB to Elasticsearch, and I get an exception like this:

org.apache.kafka.connect.errors.DataException: STRUCT is not supported as the document id. 
    at io.confluent.connect.elasticsearch.DataConverter.convertKey(DataConverter.java:75) 
    at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:84) 
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:210) 
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:119) 
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429) 
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250) 
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179) 
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148) 
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139) 
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

My kafka-connect-jdbc configuration is:

name=task-view-list-stage 
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector 
tasks.max=10 
connection.url=jdbc:postgresql://localhost:5432/postgres?user=postgres&password=test 
table.types=TABLE 
query=select * from employee_master 
mode=timestamp+incrementing 
incrementing.column.name=employee_master_id 
timestamp.column.name=modified_date 
validate.non.null=false 
topic.prefix=my-id-app 

And my kafka-connect-elasticsearch configuration is:

name=es-id-view 
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector 
tasks.max=1 
topics=my-id-app 
topics.key.ignore=false 
transforms=InsertKey 
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey 
transforms.InsertKey.fields=employee_master_id 
connection.url=http://localhost:9200 
type.name=type_id 

My table structure is:

employee_master_id | emp_name | modified_date
-------------------+----------+------------------------------------
                 1 | Bala     | "2017-05-18 11:51:46.721182+05:30"
                 2 | murugan  | "2017-05-21 15:59:11.443901+05:30"

Please help me resolve this issue.


For others who come across this problem, see the related discussion: https://groups.google.com/d/topic/confluent-platform/2jaRg-oT-p4/discussion



In addition to ValueToKey, you need ExtractField to convert the key from a struct to a plain field:

transforms=InsertKey,ExtractId 
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey 
transforms.InsertKey.fields=employee_master_id  
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key 
transforms.ExtractId.field=employee_master_id 
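
For completeness, here is a minimal sketch of what the full sink configuration could look like with both transforms in place, assuming the OP's original settings are otherwise kept unchanged (note that the sink connector's key-handling property is key.ignore, left at false here so the record key is used as the document id):

name=es-id-view
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=my-id-app
key.ignore=false
connection.url=http://localhost:9200
type.name=type_id
# InsertKey (ValueToKey) copies employee_master_id from the record value into the key
# as a one-field struct; ExtractId (ExtractField$Key) then replaces that struct key
# with the field's plain value.
transforms=InsertKey,ExtractId
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=employee_master_id
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=employee_master_id

With this chain, a record whose value contains employee_master_id=1 ends up with the primitive key 1 instead of a struct, which the connector can then use directly as the Elasticsearch document _id, avoiding the "STRUCT is not supported as the document id" error.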

Too bad the OP never accepted this. It helped me!