2017-07-09 3 views
0

J'utilise les flux Kafka et je veux réinitialiser certains décalages de consommation de Java au début. KafkaConsumer.seekToBeginning(...) sonne comme la bonne chose à faire, mais je travaille avec Kafka Streams:Réinitialiser le décalage du consommateur au début à partir de Kafka Streams

KafkaStreams streams = new KafkaStreams(builder, props); 
... 
streams.start(); 

je suppose que, selon le béton flux pipeline je définis cela créerait plusieurs consommateurs sous le capot. Puis-je avoir accès à ceux-ci? Ou existe-t-il un autre moyen de réinitialiser les offsets par programme?

Répondre

1

Étant donné que vous utilisez Kafka Streams, vous devez réinitialiser non seulement les décalages de consommation, mais également le magasin d'état interne de Streams.

Heureusement, il existe un outil de réinitialisation d'application Streams fourni avec Kafka.

Voir https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool

+1

Voilà ce que je sais déjà, voir ci-dessus ... Le mot clé ici est par programme. – micgn

+0

Bien l'outil de réinitialisation est un programme et il est open source donc la réponse est oui, vous pouvez réinitialiser par programme les offsets de consommateurs et les magasins d'état de flux. –

0

bâtiment sur Hans Jespersens réponse, j'ai utilisé avec succès ce code pour faire ce que le script fait dans le code Java:

import kafka.tools.StreamsResetter; 

StreamsResetter resetter = new StreamsResetter(); 
String[] args = {"--application-id", APP_ID, "--bootstrap-servers", KAFKA_SERVERS, "--input-topics", TEST_TOPIC_NAME, "--zookeeper", ZOOKEEPER}; 
resetter.run(args); 

La classe fait partie de la bibliothèque de base de kafka que j'importé dans maven en utilisant:

<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.12</artifactId> 
     <version>${kafka.version}</version> 
    </dependency>