1

Je construis un kafka de printemps. J'ai mis en place un nouveau mécanisme. Après que les tentatives sont épuisées, je voudrais pousser le message échoué à un sujet de lettre morte.Kafka Consumer - Récupère les paramètres reçus par l'écouteur dans la méthode de récupération

La méthode a écouter les paramètres ci-dessous

public void listen(@Payload Map<String, Object> conciseMap, 
     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, 
     Acknowledgment ack) throws JsonProcessingException { 

Dans le cadre de la méthode de récupérer, je veux aller chercher le conciseMap passé comme entrée à la carte d'auditeur ou le message d'origine qui a été reçu par mon sujet. Y a-t-il un moyen de le faire?

@Bean 
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConcurrency(conncurrency); 
    factory.setConsumerFactory(consumerFactory()); 
    factory.setRetryTemplate(retryTemplate()); 
    factory.setRecoveryCallback(new RecoveryCallback<Object>() { 
     @Override 
     public Object recover(RetryContext context) throws Exception { 
      // TODO Auto-generated method stub 
      logger.debug(" In recovery callback method !!"); 
      ((Acknowledgment)context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT)).acknowledge(); 

      return null; 
     } 
    }); 
    factory.getContainerProperties().setAckMode(AckMode.MANUAL); 
    return factory; 
} 

     factory.getContainerProperties().setAckMode(AckMode.MANUAL); 
     return factory; 
    } 

    /* 
    * Retry template. 
    */ 

    protected RetryPolicy retryPolicy() { 
     SimpleRetryPolicy policy = new SimpleRetryPolicy(maxRetryAttempts, retryableExceptions); 
     return policy; 
    } 

    protected BackOffPolicy backOffPolicy() { 
     ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy(); 
     policy.setInitialInterval(initialRetryInterval); 
     policy.setMultiplier(retryMultiplier); 
     return policy; 
    } 

    protected RetryTemplate retryTemplate() { 
     RetryTemplate template = new RetryTemplate(); 
     template.setRetryPolicy(retryPolicy()); 
     template.setBackOffPolicy(backOffPolicy()); 
     return template; 
    } 
} 

Répondre

1

Vous ne pouvez pas obtenir ce converti conciseMap dans le RetryContext de RecoveryCallback, mais vous pouvez récupérer un ConsumerRecord qui est un original du sujet avant la conversion:

(ConsumerRecord) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD) 
+0

Merci Artem. Est-ce que ConsumerRecord.value fournit les octets que nous obtenons dans la méthode de l'écouteur? –

+0

M-m-m. Pourrait être. C'est le résultat du 'Désérialiseur 'fourni –