2017-07-28 1 views
0

J'ai une application basée sur micro service qui lit les messages d'un sujet Kafka. Lorsque le service est arrêté, si des messages sont écrits sur le sujet, je souhaite que le consommateur lise ces messages lorsqu'il sera opérationnel la prochaine fois. Mais il me manque tous les messages quand le service était en panne. Comment puis-je obtenir que le consommateur lise les messages qui n'ont pas été lus lorsque le service était arrêté? Je reçois tous les messages lorsque mon micro-service était en service et que tous les messages revenaient sur le sujet.Spring Cloud Stream avec Kafka - message ne pas être lu après le redémarrage du consommateur

Mes application.properties:

spring.cloud.stream.bindings.input.destination=test 
spring.cloud.stream.bindings.input.consumer.headerMode=raw 
spring.cloud.stream.bindings.input.consumer.startOffset=latest 
spring.cloud.stream.bindings.input.consumer.resetOffsets=true 
spring.cloud.stream.bindings.input.consumer.instanceCount=3 
spring.cloud.stream.bindings.input.consumer.autoCommitOffset=false 

// ceci est mon code de la consommation sous mon service micro racine dir

@EnableBinding(Sink.class) 
public class Consumer { 
    @ServiceActivator(inputChannel = Sink.INPUT) 
    public void consoleSink(Object payload){ 
     logger.info("Type: "+ payload.getClass() + " which is byte array"); 
     logger.info("Payload: " + new String((byte[])payload)); 
    } } 

J'apprécie toute idée pour résoudre ce problème.

Répondre

0

La définition des propriétés ci-dessous m'a aidé à résoudre mon problème.

spring.cloud.stream.bindings.input.destination=test 
spring.cloud.stream.bindings.input.consumer.headerMode=raw 
spring.cloud.stream.bindings.input.consumer.startOffset=latest 
spring.cloud.stream.bindings.input.consumer.resetOffsets=true 
spring.cloud.stream.bindings.input.consumer.instanceCount=3 
spring.cloud.stream.bindings.input.consumer.autoCommitOffset=false 
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false 
spring.cloud.stream.kafka.binder.autoCreateTopics=false 
spring.cloud.stream.bindings.input.group=testGroup50 
spring.cloud.stream.bindings.input.partitioned=false 

Merci,

BR