0

J'ai un problème de capture de données dans mysql avec debezium change data capture et le consomme dans un autre mysql en utilisant kafka connect jdbc sink. Parce que le schéma et la charge utile que debezium produit pour kafka topic n'est pas compatible avec le schéma attendu par le collecteur kafka connect jdbc.Comment capturer des données dans mysql avec debezium change data capture et consommer avec jdbc sink dans kafka connect?

Je reçois une exception lorsque jdbc sink veut consommer des données et créer des enregistrements dans un autre mysql.

Comment est-ce que je devrais résoudre ce problème?

Répondre

5

La structure de message produite par Debezium est en effet différente de celle attendue par le collecteur JDBC. Le récepteur JDBC s'attend à ce que chaque champ du message corresponde à un champ de la ligne et, par conséquent, le message correspond à l'état "après" de la ligne. OTOH, le Debezium MySQL connector effectue la capture de données de changement, ce qui signifie qu'il fait plus que simplement inclure le dernier état de la ligne. Plus précisément, le connecteur émet des messages avec une clé contenant des colonnes de clé primaire ou unique de la rangée et une valeur de message contenant une structure d'enveloppe avec:

  • l'opération, par exemple si elle est un insert, mise à jour ou supprimer
  • l'état de la ligne avant le changement
  • produite (null sur des inserts) l'état de la ligne après le changement est survenu (null sur les suppressions)
  • informations spécifiques à la source, y compris les métadonnées des serveurs, la transaction ID, da Les noms de tabase et de table, horodatage du serveur lors de l'événement, et des détails sur l'endroit où l'événement a été trouvé, etc.
  • horodatage auquel le connecteur a généré l'événement

La façon la plus simple de résoudre cet écart est d'utiliser Kafka 0.10.2.x (actuellement la dernière version est 0.10.2.1) et le nouveau Single Message Transforms (SMTs) de Kafka Connect. Chaque connecteur Kafka Connect peut être configuré avec des chaînes de zéro ou plusieurs SMT qui peuvent transformer la sortie des connecteurs source avant que les messages soient écrits dans Kafka, ou transformer les messages lus depuis Kafka avant qu'ils ne soient passés en entrée pour les connecteurs de puits. Les SMT sont intentionnellement très simples, traitent un seul message et ne doivent absolument pas accéder à des ressources externes ni maintenir aucun état, et ne remplacent donc pas Kafka Streams ou d'autres systèmes de traitement de flux beaucoup plus puissants, peuvent rejoindre plusieurs flux d'entrée, et peuvent effectuer des opérations très complexes et maintenir l'état sur plusieurs messages.

Si vous utilisez Kafka Streams pour tout type de traitement, vous devriez envisager de manipuler la structure des messages dans votre application Kafka Streams. Si non, alors les SMT sont un excellent moyen de résoudre votre problème. En fait, il existe deux façons d'utiliser les SMT pour ajuster la structure du message.

La première option consiste à utiliser un SMT avec le connecteur Debezium pour extraire/conserver l'état "après" de la ligne et rejeter toutes les autres informations avant qu'elles ne soient écrites dans Kafka. Bien sûr, vous stockez moins d'informations sur les sujets Kafka et vous jetez quelques informations du CDC qui pourraient être utiles à l'avenir.La seconde option préférée de l'OMI est de laisser le connecteur source tel quel et de conserver tous les messages CDC dans les rubriques Kafka, mais d'utiliser ensuite un SMT avec le connecteur du récepteur pour extraire/conserver le "après" état de la ligne et ignorer toutes les autres informations avant que le message ne soit transmis au connecteur de récepteur JDBC. Vous pouvez peut-être utiliser l'un des SMT existants inclus dans Kafka Connect, mais vous pouvez envisager d'écrire votre propre SMT pour faire exactement ce que vous voulez.

+0

Merci cher Randall pour une excellente réponse. –