1

J'essaie de comprendre comment extraire toutes les données d'une requête initialement, puis modifie de manière incrémentielle uniquement en utilisant le connecteur kafka. La raison en est que je veux charger toutes les données dans la recherche élastique, puis garder es en synchronisation avec mes flux kafka. Actuellement, je fais cela en utilisant d'abord le connecteur avec mode = bulk, puis je le change en timestamp. Cela fonctionne bien. Cependant, si nous voulons recharger toutes les données vers les Streams et les ES, cela signifie que nous devons écrire des scripts qui nettoient ou suppriment les flux kafka et les données des index, modifient les ini de connexion en mode set, redémarrez tout, donnez-lui le temps de charger toutes ces données, puis modifiez à nouveau les scripts en mode horodatage, puis redémarrez le tout une fois de plus (la raison en est que les mises à jour groupées arrivent à corriger les données historiques via un processus etl encore avoir le contrôle, et ce processus ne met pas à jour les horodateurs)Kafka JDBC connecteur charger toutes les données, puis incrémentiel

Est-ce que quelqu'un fait quelque chose de similaire et a trouvé une solution plus élégante?

Répondre

0

revenant à cela après une longue période. La façon dont a pu résoudre cela et ne jamais avoir à utiliser le mode en vrac

  1. connecteurs d'arrêt
  2. lingette fichiers décalage pour chaque connecteur jvm
  3. (en option) si vous voulez faire une lingette et la charge complète, vous voulez probablement aussi supprimer vos sujets utiliser le kafka/connecter utils/rest api (et ne pas oublier les sujets d'état)
  4. redémarrer se connecte.
0

comment récupérer toutes les données d'une requête initialement, puis change de façon incrémentielle uniquement en utilisant le connecteur kafka.

Peut-être que cela pourrait vous aider. Par exemple, j'ai une table:

╔════╦═════════════╦═══════════╗ 
║ Id ║ Name  ║ Surname ║ 
╠════╬═════════════╬═══════════╣ 
║ 1 ║ Martin  ║ Scorsese ║ 
║ 2 ║ Steven  ║ Spielberg ║ 
║ 3 ║ Christopher ║ Nolan  ║ 
╚════╩═════════════╩═══════════╝ 

Dans ce cas, je vais créer une vue:

CREATE OR REPLACE VIEW EDGE_DIRECTORS AS 
SELECT 0 AS EXID, ID, NAME, SURNAME 
FROM DIRECTORS WHERE ID =< 2 
UNION ALL 
SELECT ID AS EXID, ID, NAME, SURNAME 
FROM DIRECTORS WHERE ID > 2; 

Dans le fichier de propriétés pour le connecteur kafka jdbc vous pouvez utiliser:

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector 
mode=incrementing 
incrementing.column.name=EXID 
topic.prefix= 
tasks.max=1 
name=gv-jdbc-source-connector 
connection.url= 
table.types=VIEW 
table.whitelist=EDGE_DIRECTORS 

Donc, le connecteur kafka jdbc prendra des mesures:

  1. toutes les données où EXID = 0;
  2. Il stockera dans le fichier connector.offsets la valeur de décalage = 0;
  3. La nouvelle ligne sera insérée dans la table DIRECTORS.
  4. Le connecteur JDBC Kafka exécutera: Select EXID, ID, NAME, SURNAME FROM EDGE_DIRECTORS et notera que EXID a été incrémenté.
  5. Les données seront mises à jour dans Kafka Streams.
+0

pas exactement ce que je demandais. actuellement im en utilisant des colonnes d'horodatage. Je dois changer le mode en vrac pour tout recharger, puis revenir à l'horodatage pour que kafka charge ensuite de manière incrémentielle ou de nouvelles données (il ajoute la requête avec un horodatage de et à partir de là).J'espérais éviter d'avoir à faire ce basculement de mode chaque fois que je veux partir d'une ardoise «propre». – mike01010