2017-07-13 2 views
9

J'utilise Spring-Kafka version 1.2.1 et, lorsque le serveur Kafka est en panne/inaccessible, le bloc asynchrone envoie des appels pendant un certain temps. Il semble que ce soit le timeout TCP. Le code est quelque chose comme ceci:Printemps Kafka envoyer asynchrone appels bloc

ListenableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message); 
future.addCallback(new ListenableFutureCallback<SendResult<K, V>>() { 
    @Override 
    public void onSuccess(SendResult<K, V> result) { 
     ... 
    } 

    @Override 
    public void onFailure(Throwable ex) { 
     ... 
    } 
}); 

J'ai pris un coup d'oeil très rapide au Code printemps-Kafka et il semble juste passer la tâche le long de la bibliothèque client kafka, traduisant une interaction de rappel à un avenir interaction d'objet. En regardant la bibliothèque cliente de kafka, le code devient plus complexe et je n'ai pas pris le temps de tout comprendre, mais je suppose que cela peut être faire des appels à distance (métadonnées, au moins?) Dans le même thread.

En tant qu'utilisateur, je m'attendais à ce que les méthodes de Spring-Kafka qui retournent un futur reviennent immédiatement, même si le serveur kafka distant est inaccessible.

Toute confirmation si ma compréhension est erronée ou s'il s'agit d'un bug serait la bienvenue. J'ai fini par le rendre asynchrone de mon côté pour le moment.

Un autre problème est que la documentation de Spring-Kafka indique, au début, qu'elle fournit des méthodes d'envoi synchrones et asynchrones. Je n'ai pas trouvé de méthodes qui ne renvoient pas d'avenir, peut-être que la documentation doit être mise à jour.

Je suis heureux de fournir plus de détails si nécessaire. Merci.

Répondre

1

Juste pour être sûr. Avez-vous l'annotation @EnableAsync appliquée? Je veux dire que cela pourrait être la clé de la spécification du comportement de Future <>

+0

Merci pour votre réponse. Non, je n'utilise pas cette annotation, il n'y avait rien dans la documentation. Je vais l'essayer et vous laisser savoir si cela résout le problème. –

+0

L'utilisation de @EnableAsync n'a malheureusement rien changé =/ –

4

En plus de l'annotation @EnableAsync sur une classe de configuration, l'annotation @Async doit être utilisée sur la méthode lorsque vous appelez ce code.

http://www.baeldung.com/spring-async

Voici quelques Fragements de code. Kafka config producteur:

@EnableAsync 
@Configuration 
public class KafkaProducerConfig { 

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class); 

    @Value("${kafka.brokers}") 
    private String servers; 

    @Bean 
    public Map<String, Object> producerConfigs() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class); 
     return props; 
    } 

    @Bean 
    public ProducerFactory<String, GenericMessage> producerFactory(ObjectMapper objectMapper) { 
     return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer(objectMapper)); 
    } 

    @Bean 
    public KafkaTemplate<String, GenericMessage> kafkaTemplate(ObjectMapper objectMapper) { 
     return new KafkaTemplate<String, GenericMessage>(producerFactory(objectMapper)); 
    } 

    @Bean 
    public Producer producer() { 
     return new Producer(); 
    } 
} 

Et le producteur lui-même:

public class Producer { 

    public static final Logger LOGGER = LoggerFactory.getLogger(Producer.class); 

    @Autowired 
    private KafkaTemplate<String, GenericMessage> kafkaTemplate; 

    @Async 
    public void send(String topic, GenericMessage message) { 
     ListenableFuture<SendResult<String, GenericMessage>> future = kafkaTemplate.send(topic, message); 
     future.addCallback(new ListenableFutureCallback<SendResult<String, GenericMessage>>() { 

      @Override 
      public void onSuccess(final SendResult<String, GenericMessage> message) { 
       LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset()); 
      } 

      @Override 
      public void onFailure(final Throwable throwable) { 
       LOGGER.error("unable to send message= " + message, throwable); 
      } 
     }); 
    } 
} 
+0

Merci pour votre réponse. Non, je n'utilise pas ces annotations, il n'y avait rien à leur sujet dans la documentation. Je vais essayer les deux et vous faire savoir si cela résout le problème. –

+0

L'utilisation de EnableAsync n'a malheureusement rien changé. Aussi, à partir du lien je comprends que c'est la bibliothèque spring-kafka qui devrait utiliser l'annotation Async, car elle me fournit le futur objet. –

+0

Je suis d'accord avec vous, pour moi, cela n'a pas de sens que vous fournissiez des contrats à terme mais je dois quand même placer les annotations. Dans notre cas, en plaçant ces deux annotations, cela a fonctionné comme un charme. Je vais éditer la réponse en ajoutant quelques fragments de code. –