2017-10-18 10 views
1

Je travaille sur l'application de démarrage de printemps en utilisant spring-cloud-stream: 1.3.0.RELEASE, printemps-nuage-stream-liant-kafka: 1.3.0.RELEASE. J'utilise spring integration dsl pour diviser les lignes dans un fichier et beanio pour convertir les lignes en json, l'exigence est d'écrire des messages json réussis sur un sujet kafka et écrire des messages d'erreur sur différents sujets kafka. Voici la configuration dans application.yml.Envoyer un message d'erreur à la chaîne d'erreur en utilisant le flux de printemps nuage

spring: 
    cloud: 
    stream: 
     kafka: 
     binder: 
      autoAddPartitions: true 
     bindings.webmarketbasket: 
     destination: webmarketbasket 
     group: usproductrecommendationsgroup 
     producer: 
      partitionCount: 5 
      errorChannelEnabled: true 
     bindings.webmarketbasket.errors: 
     destination: webmarketbasketerrors 
     group: usproductrecommendationsgroup 
     producer: 
      partitionCount: 5 
     bindings.error: 
     destination: errorchannel 
     group: usproductrecommendationsgroup 
     producer: 
      partitionCount: 5 

je remarquai une demande de traction https://github.com/spring-cloud/spring-cloud-stream/pull/1039 au printemps nuage-stream-liant kafka: 1.3.0.RELEASE, ce qui crée PublishSubscribeChannel lorsque errorChannelEnabled est défini sur true, il y a aussi un testcase qui vérifie si le haricot est créé pour le canal d'erreur du producteur. Lorsque je vérifie l'URL de l'actionneur de ressort dans mon application http://localhost:8195/beans, le bean "errorChannel" pour le canal d'erreur globale est créé, mais le bean "webmarketbasket.errors" n'est pas créé. quand il y a "org.springframework.messaging.MessageHandlingException", un message d'erreur est envoyé à "errorchannel" rubrique kafka et arrête le traitement des lignes restantes du fichier. Le sujet de Kafka "webmarketbasketerrors" n'est jamais créé. Pouvez-vous aider, s'il vous plaît laissez-moi savoir si j'ai raté quelque chose.

Répondre

0

Vous semblez confondre deux choses.

Spring Integration Error Channel Support était lorsque vous souhaitez publier quelque chose au errorChannel global qui est lié à une destination via spring.cloud.stream.bindings.error.destination=myErrors. Le new support in 1.3 crée un canal d'erreur pour chaque programme d'écoute; c'est pub/sub et il est relié au global errorChannel. Par conséquent, le message sera également publié à ...bindings.error.destination (s'il est configuré, dans votre cas, la destination est appelée errorChannel). Le nom du bean pour le canal d'erreur dédié est webmarketbasket. usproductrecommendationsgroup.errors. Il n'y a pas de sujet kafka lié à ce canal par le framework.

Vous pouvez utiliser le canal d'erreur dédié ou global si vous souhaitez gérer vous-même l'erreur.

+0

Merci Gary. J'ai essayé avec "webmarketbasket.usproductrecommendationsgroup.errors". Mais le nom du bean pour le canal d'erreur dédié n'est pas créé. Il semble que le groupe est ajouté au consommateur seulement et non au producteur dans AbstractMessageChannelBinder.java. – mariappan

+0

J'ai ajouté un canal secondaire pour le canal d'erreur dédié par '@Output (" webmarketbasket.errors ") \t SubscribableChannel webMarketBasketErrorChannel();', le nom du bean "webmarketbasket.errors" est créé et le sujet kafka est également créé. Mais le message d'erreur n'est pas envoyé à la rubrique "webmarketbasket.errors", le message d'erreur est uniquement envoyé au canal d'erreur global. – mariappan

+0

Cela n'aidera pas; rien dans le cadre ne connaît ce canal; veuillez relire ma réponse. –