J'essaie d'utiliser spring-integration-kafka-2.1.0.RELEASE dans le projet de mon entreprise. Cependant, cela ne fonctionne pas à cause de l'exception ci-dessous : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'org.springframework.web.context.WebApplicationContext:/order.inputToKafka'; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers.
La configuration xml comme ci-dessous: CTX-kafka-producer.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
       http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">

    <!-- DirectChannel feeding the Kafka outbound adapter; CPosOrderKafkaProducer
         injects this channel by name and calls send() on it. -->
    <int:channel id="inputToKafka"/>

    <!-- FIX: auto-startup must be "true" (the default). With auto-startup="false"
         the adapter endpoint is never started and therefore never subscribes to
         "inputToKafka", so every send() fails with
         "MessageDeliveryException: Dispatcher has no subscribers". -->
    <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                        kafka-template="kafkaTemplate"
                                        auto-startup="true"
                                        channel="inputToKafka"
                                        topic="customerpos-order"/>

    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg index="0" ref="producerFactory"/>
    </bean>

    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="127.0.0.1:9092"/>
                <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
                <!--<entry key="acks" value="${acks}"/>-->
                <!--<entry key="buffer.memory" value="${buffer.memory}"/>-->
                <!--<entry key="compression.type" value="${compression.type}"/>-->
                <!--<entry key="retries" value="${retries}"/>-->
                <!--<entry key="batch.size" value="${batch.size}"/>-->
                <!--<entry key="max.block.ms" value="${max.block.ms}"/>-->
                <!--<entry key="max.request.size" value="${max.request.size}"/>-->
                <!--<entry key="partitioner.class" value="${partitioner.class}"/>-->
                <!--<entry key="receive.buffer.bytes" value="${receive.buffer.bytes}"/>-->
                <!--<entry key="request.timeout.ms" value="${request.timeout.ms}"/>-->
                <!--<entry key="security.protocol" value="${security.protocol}"/>-->
                <!-- FIX: commented out like its siblings — the ${send.buffer.bytes}
                     placeholder has no visible property source here and would fail
                     (or be passed through literally) at context startup.
                     Re-enable once the property is actually defined. -->
                <!--<entry key="send.buffer.bytes" value="${send.buffer.bytes}"/>-->
                <!--<entry key="ssl.protocol" value="${ssl.protocol}"/>-->
                <!--<entry key="ssl.truststore.type" value="${ssl.truststore.type}"/>-->
                <!--<entry key="timeout.ms" value="${timeout.ms}"/>-->
                <!--<entry key="block.on.buffer.full" value="${block.on.buffer.full}"/>-->
                <!--<entry key="max.in.flight.requests.per.connection" value="${max.in.flight.requests.per.connection}"/>-->
                <!--<entry key="metadata.fetch.timeout.ms" value="${metadata.fetch.timeout.ms}"/>-->
                <!--<entry key="metadata.max.age.ms" value="${metadata.max.age.ms}"/>-->
                <!--<entry key="metrics.num.samples" value="${metrics.num.samples}"/>-->
                <!--<entry key="metrics.sample.window.ms" value="${metrics.sample.window.ms}"/>-->
                <!--<entry key="reconnect.backoff.ms" value="${reconnect.backoff.ms}"/>-->
                <!--<entry key="retry.backoff.ms" value="${retry.backoff.ms}"/>-->
                <!--<entry key="sasl.kerberos.min.time.before.relogin" value="${sasl.kerberos.min.time.before.relogin}"/>-->
                <!--<entry key="sasl.kerberos.ticket.renew.jitter" value="${sasl.kerberos.ticket.renew.jitter}"/>-->
                <!--<entry key="sasl.kerberos.ticket.renew.window.factor" value="${sasl.kerberos.ticket.renew.window.factor}"/>-->
                <!--<entry key="ssl.keymanager.algorithm" value="${ssl.keymanager.algorithm}"/>-->
                <!--<entry key="linger.ms" value="1"/>-->
            </map>
        </constructor-arg>
    </bean>

    <!--<bean id="kafkaProducer" class="com.angho.cloud.manager.kafka.impl.KafkaProducerImpl"/>-->
</beans>
CTX-kafka-consumer.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
       http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">
    <!-- FIX: the "int" namespace is used below (int:service-activator /
         int:channel) but its schemaLocation mapping to spring-integration.xsd
         was missing, which makes XML schema validation fail at startup. -->

    <!-- Declared explicitly (was previously auto-created by name) so the wiring
         between the adapter and the service activator is visible. -->
    <int:channel id="inputFromChannel"/>

    <int-kafka:message-driven-channel-adapter id="kafkaListener"
                                              listener-container="container1"
                                              auto-startup="true"
                                              phase="100"
                                              send-timeout="5000"
                                              channel="inputFromChannel"
                                              error-channel="errorChannel"/>

    <!-- Routes each inbound Kafka message to KafkaOrderConsumer.process(...). -->
    <int:service-activator input-channel="inputFromChannel" ref="kafkaOrderConsumer"/>
    <!--<bean id="kafkaOrderConsumer" class="com.angho.cloud.manager.kafka.KafkaOrderConsumer"/>-->

    <bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
        <constructor-arg>
            <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
                <constructor-arg>
                    <map>
                        <entry key="bootstrap.servers" value="127.0.0.1:9092"/>
                        <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                        <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                        <!--<entry key="fetch.min.bytes" value="${fetch.min.bytes}"/>-->
                        <entry key="group.id" value="customerpos-order"/>
                        <!--<entry key="heartbeat.interval.ms" value="${heartbeat.interval.ms}"/>-->
                        <!--<entry key="max.partition.fetch.bytes" value="${max.partition.fetch.bytes}"/>-->
                        <entry key="session.timeout.ms" value="15000"/>
                        <!--<entry key="auto.offset.reset" value="${auto.offset.reset}"/>-->
                        <!--<entry key="connections.max.idle.ms" value="${connections.max.idle.ms}"/>-->
                        <entry key="enable.auto.commit" value="false"/>
                        <!--<entry key="exclude.internal.topics" value="${exclude.internal.topics}"/>-->
                        <!--<entry key="fetch.max.bytes" value="${fetch.max.bytes}"/>-->
                        <!--<entry key="max.poll.interval.ms" value="${max.poll.interval.ms}"/>-->
                        <!--<entry key="max.poll.records" value="${max.poll.records}"/>-->
                        <!--<entry key="partition.assignment.strategy" value="${partition.assignment.strategy}"/>-->
                        <!--<entry key="receive.buffer.bytes" value="${receive.buffer.bytes}"/>-->
                        <!--<entry key="request.timeout.ms" value="${request.timeout.ms}"/>-->
                        <!-- NOTE(review): auto.commit.interval.ms has no effect while
                             enable.auto.commit is false — harmless, but confirm intent. -->
                        <entry key="auto.commit.interval.ms" value="100"/>
                        <!--<entry key="fetch.max.wait.ms" value="${fetch.max.wait.ms}"/>-->
                        <!--<entry key="metadata.max.age.ms" value="${metadata.max.age.ms}"/>-->
                        <!--<entry key="metrics.num.samples" value="${metrics.num.samples}"/>-->
                        <!--<entry key="metrics.recording.level" value="${metrics.recording.level}"/>-->
                        <!--<entry key="metrics.sample.window.ms" value="${metrics.sample.window.ms}"/>-->
                        <!--<entry key="reconnect.backoff.ms" value="${reconnect.backoff.ms}"/>-->
                        <!--<entry key="retry.backoff.ms" value="${retry.backoff.ms}"/>-->
                    </map>
                </constructor-arg>
            </bean>
        </constructor-arg>
        <constructor-arg>
            <bean class="org.springframework.kafka.listener.config.ContainerProperties">
                <constructor-arg name="topics" value="customerpos-order"/>
            </bean>
        </constructor-arg>
    </bean>
</beans>
Le consommateur est initialisé avant le producteur grâce à l'attribut depends-on.
code Java comme ci-dessous:
Producteur:
package com.angho.cloud.manager.kafka.impl;
import com.angho.cloud.bo.data.order.SyncOrderBO;
import com.angho.cloud.bo.result.order.CPosOrderSyncResult;
import com.angho.cloud.manager.kafka.CPosOrderSyncManager;
import com.angho.data.common.ResultConstant;
import org.apache.log4j.Logger;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
public class CPosOrderKafkaProducer implements CPosOrderSyncManager {

    private static final Logger LOG = Logger.getLogger(CPosOrderKafkaProducer.class);

    /** Spring Integration channel ("inputToKafka") feeding the Kafka outbound adapter. */
    @Resource(name = "inputToKafka")
    private MessageChannel messageChannel;

    @PostConstruct
    public void init() {
        System.out.println("KafkaProducer start...");
    }

    /**
     * Wraps the given order in a {@link GenericMessage} and sends it to Kafka
     * through the "inputToKafka" integration channel.
     *
     * @param order the order payload to publish
     * @return a result whose status is SUCCESS when {@code send} returned
     *         {@code true}, ERROR otherwise; on exception the throwable is
     *         attached to the result instead of being rethrown
     */
    @Override
    public CPosOrderSyncResult sendToKafka(SyncOrderBO order) {
        CPosOrderSyncResult result = new CPosOrderSyncResult();
        // Pessimistic default: only flipped to SUCCESS after a confirmed send.
        result.setStatus(ResultConstant.CODE.ERROR);
        Message<SyncOrderBO> message = new GenericMessage<>(order);
        try {
            boolean sent = this.messageChannel.send(message);
            if (sent) {
                result.setStatus(ResultConstant.CODE.SUCCESS);
                result.setMessage(ResultConstant.MESSAGE.DEFAULT_SUCCESS_MESSAGE);
            } else {
                result.setMessage("Failed to send message to Kafka.");
            }
        } catch (Exception ex) {
            // FIX: use error(String, Throwable) so log4j records the full stack
            // trace; LOG.error(ex) treats the exception as the message object
            // and logs only its toString().
            LOG.error("Failed to send order message to Kafka.", ex);
            result.setException(ex);
        }
        return result;
    }
}
Le producteur est défini comme un bean par :
<bean id="kafkaOrderProducer" class="com.angho.cloud.manager.kafka.impl.CPosOrderKafkaProducer" depends-on="kafkaOrderConsumer"/>
code Java consommateur:
package com.angho.cloud.manager.kafka;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component("kafkaOrderConsumer")
public class KafkaOrderConsumer {

    @PostConstruct
    public void init() {
        System.out.println("what?KafkaConsumer start...");
    }

    /**
     * Service-activator endpoint invoked for every message arriving on the
     * "inputFromChannel" integration channel; prints the message envelope and
     * its payload to stdout.
     *
     * @param message the inbound Spring Integration message (payload type
     *                depends on the configured Kafka value deserializer)
     */
    // FIX: removed @SuppressWarnings("unchecked") — the method performs no
    // unchecked operation, so the suppression only hid future real warnings.
    @ServiceActivator
    public void process(Message<?> message) {
        System.out.println("Message =======>" + message);
        System.out.println("Content =======>" + message.getPayload());
    }
}
Je n'ai aucune idée de la raison pour laquelle cette exception se produit.
Que dois-je faire pour que cela fonctionne? PS: J'essaie de tout configurer par XML plutôt que JavaCode comme @Bean.
Pardonnez mon pauvre anglais ....
TAT .....
Un grand merci ..
Merci pour le support. J'ai mis le démarrage automatique (auto-startup) à true, mais l'exception persiste... Avant de poser cette question, j'ai cherché longtemps sur Google : dans certaines configurations XML, auto-startup était à false. – Merorin
Oh, ça marche enfin ... Pardonnez mon insouciance ... L'exception qui a été mentionnée ci-dessus était une autre sorte ... Désolé pour mon commentaire stupide ... Merci beaucoup. – Merorin