2017-10-04 8 views
1

le consommateur de kafka susmentionné qui devrait consommer à partir d'un seul sujet. Je ne peux pas utiliser botte de printemps comme je suis l'intégration api des consommateurs kafka avec une application web de base de printemps ..La méthode Kafka Listener n'est pas appelée. Le consommateur ne consomme pas.

configuration xml ressort est comme suit

<bean id="kafkaConsumerProperties" class="com.azuga.kafka.listeners.KafkaConsumerProperties"> 
    <constructor-arg type="java.lang.String" value="127.0.0.1:9092" /> 
    <constructor-arg type="java.lang.String" value="tdm-group" /> 
    <constructor-arg type="java.lang.String" value="dbStreamer.azuga.tripDriverMapping" /> 
</bean> 
<bean id="kafkaListenerConfig" class="com.azuga.kafka.listeners.KafkaListenerConfig"> 
    <property name="kafkaConsumerProperties" ref="kafkaConsumerProperties" /> 
</bean> 
<bean id="kafkaContainerFactory" class="com.azuga.kafka.listeners.KafkaListenerContainerFactory" 
    factory-method="kafkaContainerFactory"> 
</bean> 

Ceci est la classe qui crée le ListenerContainerFactory

@EnableKafka 
public class KafkaListenerContainerFactory { 

public static ConcurrentKafkaListenerContainerFactory<String, String> kafkaContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConcurrency(1); 
    factory.setConsumerFactory(consumerFactory()); 
    factory.getContainerProperties().setPollTimeout(3000); 
    return factory; 
} 

@SuppressWarnings("unchecked") 
public static ConsumerFactory<String, String> consumerFactory() { 
    return new DefaultKafkaConsumerFactory<>(KafkaListenerConfig.consumerProps(), 
      KafkaListenerConfig.stringKeyDeserializer(), KafkaListenerConfig.stringKeyDeserializer()); 
} 

} 

Ceci est mon classe Listener annotés avec @KafkaListener

package com.azuga.kafka.listeners; 

import org.springframework.kafka.annotation.KafkaListener; 
public class Listener { 

@KafkaListener(topics = "dbStreamer.azuga.tripDriverMapping") 
public void onMessage(String message) { 
    System.out.println(message.toString()); 
} 
} 

C'est la classe KafkaListenerConfig qui prend dans les serveurs bootstrap, les noms de sujet, etc.

@EnableKafka 
public class KafkaListenerConfig { 

private static KafkaConsumerProperties kafkaConsumerProperties; 

public void setKafkaConsumerProperties(KafkaConsumerProperties kafkaConsumerProperties) { 
    this.kafkaConsumerProperties = kafkaConsumerProperties; 
} 

public static Map<String, Object> consumerProps() { 
    Map<String, Object> props = new HashMap<>(); 
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProperties.getBootstrap()); 
    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProperties.getGroup()); 
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); 
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); 
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); 
    return props; 
} 

public static Deserializer stringKeyDeserializer() { 
    return new StringDeserializer(); 
} 

} 

Répondre

1

Vous avez un peu de configuration inhabituelle pour votre application.

Cependant, je suppose que vous manquez le fait que @EnableKafka est environ @Configuration classe. Par conséquent, selon la documentation Spring Framework, vous devez utiliser AnnotationConfigWebApplicationContext classe:

* {@link org.springframework.web.context.WebApplicationContext WebApplicationContext} 
* implementation which accepts annotated classes as input - in particular 
* {@link org.springframework.context.annotation.Configuration @Configuration}-annotated 
* classes, but also plain {@link org.springframework.stereotype.Component @Component} 
* classes and JSR-330 compliant classes using {@code javax.inject} annotations. Allows 
* for registering classes one by one (specifying class names as config location) as well 
* as for classpath scanning (specifying base packages as config location). 

Malheureusement, cela ne va pas travailler avec une configuration XML tout simplement.

Spring Kafka ne fournit aucun point d'ancrage pour les définitions XML.

+0

Merci pour la réponse rapide. Je n'étais pas au courant que ça ne marcherait pas avec xml config. Cependant, j'ai mis en œuvre en utilisant des annotations et cela a fonctionné comme un charme – Sabya