2017-09-11 1 views
0

J'essaie d'utiliser spring-integration-kafka-2.1.0.RELEASE dans le projet de mon entreprise.Cependant, cela ne fonctionne pas en raison de l'exception qui est énumérée ci-dessous: org .springframework.messaging.MessageDeliveryException: Dispatcher n'a aucun abonné pour le canal 'org.springframework.web.context.WebApplicationContext: /order.inputToKafka' .; L'exception imbriquée est org.springframework.integration.MessageDispatchingException: Dispatcher n'a aucun abonné.MessageDeliveryException: Dispatcher n'a pas d'abonnés

La configuration xml comme ci-dessous: CTX-kafka-producer.xml:

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xmlns:int="http://www.springframework.org/schema/integration" 
     xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka" 
     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 
     http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd 
     http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd"> 

    <int:channel id="inputToKafka"/> 

    <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter" 
             kafka-template="kafkaTemplate" 
             auto-startup="false" 
             channel="inputToKafka" 
             topic="customerpos-order"/> 


    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate"> 
     <constructor-arg index="0" ref="producerFactory"/> 
    </bean> 

    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> 
     <constructor-arg> 
      <map> 
       <entry key="bootstrap.servers" value="127.0.0.1:9092"/> 
       <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/> 
       <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/> 
       <!--<entry key="acks" value="${acks}"/>--> 
       <!--<entry key="buffer.memory" value="${buffer.memory}"/>--> 
       <!--<entry key="compression.type" value="${compression.type}"/>--> 
       <!--<entry key="retries" value="${retries}"/>--> 
       <!--<entry key="batch.size" value="${batch.size}"/>--> 
       <!--<entry key="max.block.ms" value="${max.block.ms}"/>--> 
       <!--<entry key="max.request.size" value="${max.request.size}"/>--> 
       <!--<entry key="partitioner.class" value="${partitioner.class}"/>--> 
       <!--<entry key="receive.buffer.bytes" value="${receive.buffer.bytes}"/>--> 
       <!--<entry key="request.timeout.ms" value="${request.timeout.ms}"/>--> 
       <!--<entry key="security.protocol" value="${security.protocol}"/>--> 
       <entry key="send.buffer.bytes" value="${send.buffer.bytes}"/> 
       <!--<entry key="ssl.protocol" value="${ssl.protocol}"/>--> 
       <!--<entry key="ssl.truststore.type" value="${ssl.truststore.type}"/>--> 
       <!--<entry key="timeout.ms" value="${timeout.ms}"/>--> 
       <!--<entry key="block.on.buffer.full" value="${block.on.buffer.full}"/>--> 
       <!--<entry key="max.in.flight.requests.per.connection" value="${max.in.flight.requests.per.connection}"/>--> 
       <!--<entry key="metadata.fetch.timeout.ms" value="${metadata.fetch.timeout.ms}"/>--> 
       <!--<entry key="metadata.max.age.ms" value="${metadata.max.age.ms}"/>--> 
       <!--<entry key="metrics.num.samples" value="${metrics.num.samples}"/>--> 
       <!--<entry key="metrics.sample.window.ms" value="${metrics.sample.window.ms}"/>--> 
       <!--<entry key="reconnect.backoff.ms" value="${reconnect.backoff.ms}"/>--> 
       <!--<entry key="retry.backoff.ms" value="${retry.backoff.ms}"/>--> 
       <!--<entry key="sasl.kerberos.min.time.before.relogin" value="${sasl.kerberos.min.time.before.relogin}"/>--> 
       <!--<entry key="sasl.kerberos.ticket.renew.jitter" value="${sasl.kerberos.ticket.renew.jitter}"/>--> 
       <!--<entry key="sasl.kerberos.ticket.renew.window.factor" value="${sasl.kerberos.ticket.renew.window.factor}"/>--> 
       <!--<entry key="ssl.keymanager.algorithm" value="${ssl.keymanager.algorithm}"/>--> 
       <!--<entry key="linger.ms" value="1"/>--> 
      </map> 
     </constructor-arg> 
    </bean> 

    <!--<bean id="kafkaProducer" class="com.angho.cloud.manager.kafka.impl.KafkaProducerImpl"/>--> 
</beans> 

CTX-kafka-consumer.xml:

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka" 
     xmlns:int="http://www.springframework.org/schema/integration" 
     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd"> 

    <int-kafka:message-driven-channel-adapter id="kafkaListener" 
               listener-container="container1" 
               auto-startup="true" 
               phase="100" 
               send-timeout="5000" 
               channel="inputFromChannel" 
               error-channel="errorChannel"/> 

    <int:service-activator input-channel="inputFromChannel" ref="kafkaOrderConsumer"/> 

    <!--<bean id="kafkaOrderConsumer" class="com.angho.cloud.manager.kafka.KafkaOrderConsumer"/>--> 

    <bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"> 
     <constructor-arg> 
      <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> 
       <constructor-arg> 
        <map> 
         <entry key="bootstrap.servers" value="127.0.0.1:9092"/> 
         <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> 
         <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> 
         <!--<entry key="fetch.min.bytes" value="${fetch.min.bytes}"/>--> 
         <entry key="group.id" value="customerpos-order"/> 
         <!--<entry key="heartbeat.interval.ms" value="${heartbeat.interval.ms}"/>--> 
         <!--<entry key="max.partition.fetch.bytes" value="${max.partition.fetch.bytes}"/>--> 
         <entry key="session.timeout.ms" value="15000"/> 
         <!--<entry key="auto.offset.reset" value="${auto.offset.reset}"/>--> 
         <!--<entry key="connections.max.idle.ms" value="${connections.max.idle.ms}"/>--> 
         <entry key="enable.auto.commit" value="false"/> 
         <!--<entry key="exclude.internal.topics" value="${exclude.internal.topics}"/>--> 
         <!--<entry key="fetch.max.bytes" value="${fetch.max.bytes}"/>--> 
         <!--<entry key="max.poll.interval.ms" value="${max.poll.interval.ms}"/>--> 
         <!--<entry key="max.poll.records" value="${max.poll.records}"/>--> 
         <!--<entry key="partition.assignment.strategy" value="${partition.assignment.strategy}"/>--> 
         <!--<entry key="receive.buffer.bytes" value="${receive.buffer.bytes}"/>--> 
         <!--<entry key="request.timeout.ms" value="${request.timeout.ms}"/>--> 
         <entry key="auto.commit.interval.ms" value="100"/> 
         <!--<entry key="fetch.max.wait.ms" value="${fetch.max.wait.ms}"/>--> 
         <!--<entry key="metadata.max.age.ms" value="${metadata.max.age.ms}"/>--> 
         <!--<entry key="metrics.num.samples" value="${metrics.num.samples}"/>--> 
         <!--<entry key="metrics.recording.level" value="${metrics.recording.level}"/>--> 
         <!--<entry key="metrics.sample.window.ms" value="${metrics.sample.window.ms}"/>--> 
         <!--<entry key="reconnect.backoff.ms" value="${reconnect.backoff.ms}"/>--> 
         <!--<entry key="retry.backoff.ms" value="${retry.backoff.ms}"/>--> 
        </map> 
       </constructor-arg> 
      </bean> 
     </constructor-arg> 
     <constructor-arg> 
      <bean class="org.springframework.kafka.listener.config.ContainerProperties"> 
       <constructor-arg name="topics" value="customerpos-order"/> 
      </bean> 
     </constructor-arg> 
    </bean> 

</beans> 

consommateur est initialisé avant producteur en utilisant dépendent -on attribut.

code Java comme ci-dessous:

Producteur:

package com.angho.cloud.manager.kafka.impl; 

import com.angho.cloud.bo.data.order.SyncOrderBO; 
import com.angho.cloud.bo.result.order.CPosOrderSyncResult; 
import com.angho.cloud.manager.kafka.CPosOrderSyncManager; 
import com.angho.data.common.ResultConstant; 
import org.apache.log4j.Logger; 
import org.springframework.messaging.Message; 
import org.springframework.messaging.MessageChannel; 
import org.springframework.messaging.support.GenericMessage; 


import javax.annotation.PostConstruct; 
import javax.annotation.Resource; 

public class CPosOrderKafkaProducer implements CPosOrderSyncManager { 

    private static final Logger LOG = Logger.getLogger(CPosOrderKafkaProducer.class); 

    @PostConstruct 
    public void init(){ 
     System.out.println("KafkaProducer start..."); 
    } 

    @Resource(name = "inputToKafka") 
    private MessageChannel messageChannel; 

    @Override 
    public CPosOrderSyncResult sendToKafka(SyncOrderBO order) { 
     CPosOrderSyncResult result = new CPosOrderSyncResult(); 
     result.setStatus(ResultConstant.CODE.ERROR); 

     Message<SyncOrderBO> message = new GenericMessage<>(order); 

     try { 
      boolean flag = this.messageChannel.send(message); 
      if (flag) { 
       result.setStatus(ResultConstant.CODE.SUCCESS); 
       result.setMessage(ResultConstant.MESSAGE.DEFAULT_SUCCESS_MESSAGE); 
      } else { 
       result.setMessage("Failed to send message to Kafka."); 
      } 
     } catch (Exception ex) { 
      LOG.error(ex); 
      result.setException(ex); 
     } 

     return result; 
    } 
} 

Le producteur est défini comme un haricot par:

<bean id="kafkaOrderProducer" class="com.angho.cloud.manager.kafka.impl.CPosOrderKafkaProducer" depends-on="kafkaOrderConsumer"/> 

code Java consommateur:

package com.angho.cloud.manager.kafka; 

import org.springframework.integration.annotation.ServiceActivator; 
import org.springframework.messaging.Message; 
import org.springframework.stereotype.Component; 

import javax.annotation.PostConstruct; 

@Component("kafkaOrderConsumer") 
public class KafkaOrderConsumer { 

    @PostConstruct 
    public void init(){ 
     System.out.println("what?KafkaConsumer start..."); 
    } 

    @SuppressWarnings("unchecked") 
    @ServiceActivator 
    public void process(Message<?> message){ 
     System.out.println("Message =======>" + message); 
     System.out.println("Content =======>" + message.getPayload()); 
    } 
} 

I je n'ai aucune idée ab la raison pour laquelle l'exception se produit.

Que dois-je faire pour que cela fonctionne? PS: J'essaie de tout configurer par XML plutôt que JavaCode comme @Bean.

Pardonnez mon pauvre anglais ....
TAT .....

Un grand merci ..

Répondre

0

auto-startup="false"

Pourquoi avez-vous cela?

L'adaptateur ne s'abonnera pas au canal tant qu'il ne sera pas start().

+0

Merci pour le support.J'ai défini le démarrage automatique vrai, mais l'exception existe toujours .... Avant d'avoir soulevé cette question, je l'ai googlé pendant longtemps et le démarrage automatique dans certaines configurations xml était faux . – Merorin

+0

Oh, ça marche enfin ... Pardonnez mon insouciance ... L'exception qui a été mentionnée ci-dessus était une autre sorte ... Désolé pour mon commentaire stupide ... Merci beaucoup. – Merorin