2017-07-10 3 views
0

J'ai remarqué que les documents indexés dans elasticsearch à l'aide du connecteur kafka elasticsearch ont leurs identifiants au format topic+partition+offset.Utilisation des ID générés par elasticsearch dans le connecteur kafka elasticsearch

Je préférerais utiliser des IDs générés par elasticsearch. Il semble que topic+partition+offset n'est généralement pas unique, donc je suis loosing data.

Comment puis-je changer cela?

+0

Pouvez-vous expliquer votre cas d'utilisation plus? Utilisez-vous le mode analytique ou valeur-clé du connecteur? Le 'topic-partition-offset' devrait être unique pour chaque enregistrement et le connecteur publie à Elastic chaque enregistrement qui vient. Quelles données perdez-vous? Le forum que vous liez ne dit pas ce que vous perdez. – Phil

Répondre

0

Comme dit Phil dans les commentaires - topic-partition-offset devrait être unique, donc je ne vois pas comment cela provoque une perte de données pour vous. Peu importe - vous pouvez laisser le connecteur générer la clé (comme vous le faites), ou vous pouvez définir la clé vous-même (key.ignore=false). Il n'y a pas d'autre option.

Vous pouvez utiliser Single Message Transformations avec Kafka Connect pour dériver une clé à partir des champs de vos données. Sur la base de votre message dans le forum Elasticsearch, il semble qu'il y ait un id dans vos données - si cela est unique, vous pouvez le définir comme votre clé, et donc comme votre ID de document Elasticsearch. Voici un exemple de définir une clé avec SMT:

# Add the `id` field as the key using Simple Message Transformations 
transforms=InsertKey, ExtractId 

# `ValueToKey`: push an object of one of the column fields (`id`) into the key 
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey 
transforms.InsertKey.fields=id 

# `ExtractField`: convert key from an object to a plain field 
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key 
transforms.ExtractId.field=id 

(via https://www.confluent.io/blog/building-real-time-streaming-etl-pipeline-20-minutes/)

+0

Merci @ robin-moffat, je n'utilise pas de clé dans mes messages. J'ai essayé quelques [transformations] (https://kafka.apache.org/documentation/#connect_transforms) sans succès. J'ai essayé les transformations TimestampRouter et RegexpRouter. J'utilise maintenant [datamountaineer elastic connector] (https://github.com/datamountaineer/stream-reactor/) car il stocke avec les ID générés par elasticsearch et je ne perds plus de données. Cela pourrait poser un problème avec ma configuration kafka. –

0

@Robin Moffatt, autant que je le vois, topic-partition-offset peut générer des doublons en cas de mettre à niveau votre cluster de kafka, mais pas dans la mise à niveau de roulement, mais juste remplacer le cluster avec le cluster (qui est parfois plus facile à remplacer). Dans ce cas, vous risquez de perdre des données en raison de l'écrasement des données.

En ce qui concerne votre excellent exemple, cela peut être la solution pour beaucoup de cas, mais j'ajouterais une autre option. Peut-être que vous pouvez ajouter l'élément epoc timestamp au topic-partition-offset donc ce sera comme ceci topic-partition-offset-current_timestamp.

Qu'en pensez-vous?

+0

Comment ajouteriez-vous l'horodatage? en utilisant des transformations? –

+0

Je l'ajouterais en dur à la valeur par défaut 'topic-partition-offset'. Mais, si nous pouvons l'ajouter manuellement, cela peut résoudre cela aussi. Mais il faut se souvenir que l'identifiant doc ne sera plus unique, donc on risque de perdre la sémantique une seule fois. – davidM

+0

Nous sommes probablement en train de diverger de la question originale ici, et il est possible que votre article soit mieux ajouté en tant que commentaire que réponse réelle. Je suis d'accord que vous pourriez obtenir des dups, mais pas en fonctionnement normal - et si on va produire quelque chose comme ça, alors on est responsable de la gestion des clés directement, si elles sont importantes pour l'application à portée de main. –