1

Je construis un consommateur de kafka. J'ai placé le rappel de rétablissement semblable à ci-dessous. J'ai activé la validation manuelle. Comment puis-je reconnaître le message dans la méthode de rappel de récupération afin qu'il n'y ait pas de décalage.Printemps Kafka consommateur - validation manuelle avec mécanisme de rappel de récupération

@Bean 
    public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConcurrency(conncurrency); 
     factory.setConsumerFactory(consumerFactory()); 
     factory.setRetryTemplate(retryTemplate()); 
       factory.setRecoveryCallback(new RecoveryCallback<Object>() { 
     @Override 
     public Object recover(RetryContext context) throws Exception { 
      // TODO Auto-generated method stub 
      logger.debug(" In recovery callback method !!"); 
      return null; 
     } 
    }); 
     factory.getContainerProperties().setAckMode(AckMode.MANUAL); 
     return factory; 
    } 

    /* 
    * Retry template. 
    */ 

    protected RetryPolicy retryPolicy() { 
     SimpleRetryPolicy policy = new SimpleRetryPolicy(maxRetryAttempts, retryableExceptions); 
     return policy; 
    } 

    protected BackOffPolicy backOffPolicy() { 
     ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy(); 
     policy.setInitialInterval(initialRetryInterval); 
     policy.setMultiplier(retryMultiplier); 
     return policy; 
    } 

    protected RetryTemplate retryTemplate() { 
     RetryTemplate template = new RetryTemplate(); 
     template.setRetryPolicy(retryPolicy()); 
     template.setBackOffPolicy(backOffPolicy()); 
     return template; 
    } 
} 

Répondre

2

Votre question est trop large. Tu dois être plus précis.

Il n'y a aucune hypothèse dans le cadre que vous pourriez faire en cas de nouvelle tentative d'épuisement pendant les erreurs de consommation.

Je pense que vous devriez commencer par le projet Spring Retry pour comprendre ce que RecoveryCallback du tout et comment cela fonctionne:

Si la logique métier ne parvient pas avant le modèle décide d'abandonner, le client est donné la possibilité de faire un traitement alternatif à travers le rappel de récupération.

A RetryContext a:

/** 
* Accessor for the exception object that caused the current retry. 
* 
* @return the last exception that caused a retry, or possibly null. It will be null 
* if this is the first attempt, but also if the enclosing policy decides not to 
* provide it (e.g. because of concerns about memory usage). 
*/ 
Throwable getLastThrowable(); 

Aussi printemps Kafka remplit d'autres attributs que RetryContext à traiter dans les RecoveryCallback: https://docs.spring.io/spring-kafka/docs/2.0.0.RELEASE/reference/html/_reference.html#_retrying_deliveries

Le contenu du RetryContext passé dans le RecoveryCallback sera dépend du type d'écouteur. Le contexte aura toujours un attribut record qui est l'enregistrement pour lequel la défaillance s'est produite. Si votre interlocuteur est conscient et/ou conscient du consommateur, des attributs supplémentaires acknowledgment et/ou consumer seront disponibles. Pour plus de commodité, le RetryingAcknowledgingMessageListenerAdapter fournit des constantes statiques pour ces clés. Voir ses javadocs pour plus d'informations.

+0

Merci Artem. Basé sur la dernière déclaration, je suppose que je serais en mesure d'obtenir l'attribut de reconnaissance de retrycontext. Comment cela pourrait-il être récupéré? –

+0

'(Accusé de réception) retryContext.getAttribute (RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT)' –

+0

Merci beaucoup! –