2017-10-21 58 views
0

J'essaie de créer un connecteur d'évier Kafka en utilisant le s predfast s3 connector. Cependant, pour une raison quelconque, la sortie du journal signale une SourceConnectorConfig:Pourquoi SourceConnectorConfig est-il signalé pour le connecteur sink?

INFO ConnectorConfig values: 
     connector.class = com.spredfast.kafka.connect.s3.sink.S3SinkConnector 
     key.converter = null 
     name = transactions-s3-sink 
     tasks.max = 1 
     transforms = null 
     value.converter = class org.apache.kafka.connect.storage.StringConverter 
(org.apache.kafka.connect.runtime.ConnectorConfig:180) 
INFO Creating connector transactions-s3-sink of type com.spredfast.kafka.connect.s3.sink.S3SinkConnector (org.apache.kafka.connect.runtime.Worker:178) 
INFO Instantiated connector transactions-s3-sink with version 0.0.1 of type class com.spredfast.kafka.connect.s3.sink.S3SinkConnector (org.apache.kafka.connect.runtime.Worker:181) 
INFO Finished creating connector transactions-s3-sink (org.apache.kafka.connect.runtime.Worker:194) 
INFO SourceConnectorConfig values: 
     connector.class = com.spredfast.kafka.connect.s3.sink.S3SinkConnector 
     key.converter = null 
     name = transactions-s3-sink 
     tasks.max = 1 
     transforms = null 
     value.converter = class org.apache.kafka.connect.storage.StringConverter 
(org.apache.kafka.connect.runtime.SourceConnectorConfig:180) 
INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:824) 
... 
INFO Sink task WorkerSinkTask{id=transactions-s3-sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:232) 

Pourquoi un SinkConnectorConfig rapporté encore plus loin dans la sortie du journal, je peux voir un WorkerSinkTask a été créé?

Répondre

1

La raison est que ce connecteur étend Connector classe abstraite au lieu de SinkConnector classe abstraite de l'API de Connect (voir le code source here). Ainsi, Connect framework ne peut pas dire si ce connecteur est une source ou un puits, et la logique dans le code est que si ce n'est pas un puits, supposons qu'il s'agit d'une source. C'est pourquoi vous rencontrez cette incohérence. La solution consiste à étendre la classe abstraite appropriée (ici org.apache.kafka.connect.sink.SinkConnector)