2

J'ai un module de processeur http spring-xd avec http-outbound-gateway qui a un errorChannel et un outputChannel. Tout message avec HTTP 200 arrive à outputChannel, et le reste d'entre eux atterrit dans failureChannel. À l'heure actuelle, le module de traitement http se connecte à un Kafka-Sink avec un adaptateur Kafka-Outbound avec TopicX. TopicX reçoit uniquement les messages HTTP 200 pour un traitement ultérieur. Maintenant, nous avons besoin que les messages de failureChannel soient routés vers TopicY.Comment acheminer des messages de kafka-sink à plusieurs sujets

Comment puis-je envoyer des messages à plusieurs sujets kafka dans le kafka-sink. J'ai le httpStatusCode dans l'en-tête du message. La version de Kafka dans mon projet est 0.8.2 et la version java est 1,7

<!-- http-processor-config --> 
<int-http:outbound-gateway 
     request-channel="input" 
     url-expression="'myUrlLink'" 
     http-method="POST" 
     expected-response-type="java.lang.String" 
     charset="UTF-8" 
     reply-timeout="10" 
     reply-channel="output"> 

     <int-http:request-handler-advice-chain> 
        <bean class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice"> 
         <property name="recoveryCallback"> 
          <bean class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer"> 
           <constructor-arg ref="errorChannel" /> 
          </bean> 
         </property> 
         <property name="retryTemplate" ref="retryTemplate" /> 
        </bean> 
     </int-http:request-handler-advice-chain> 

</int-http:outbound-gateway> 


<!-- Handle failed messages and route to failureChannel for specific http codes--> 
<int:service-activator input-channel="errorChannel" ref="customErrorHandler" method="handleFailedRequest" output-channel="failureChannel"/> 

sur Kafka Sink, je suis le contexte producteur suivant:

<int-kafka:producer-context id="kafkaProducerContext"> 
    <int-kafka:producer-configurations> 
     <int-kafka:producer-configuration broker-list="localhost:9092" 
              topic="${topicX}" 
              key-class-type="java.lang.String" 
              key-serializer="serializer" 
              value-class-type="[B" 
              value-serializer="valueSerializer"/> 
    </int-kafka:producer-configurations> 
</int-kafka:producer-context> 

Répondre

1

Je l'ai fonctionné enfin. En ce moment, j'ai trouvé une solution de contournement avec la version 0.8.x, en ajoutant un séparateur dans le module http-processeur, et ajouté une variable kafka_topic à l'en-tête du message. Basé sur le code d'état Http, je viens de définir les différents sujets.

Sur Kafka-sink, j'ai ajouté une autre configuration de producteur avec la nouvelle variable de nom de sujet, définie via les paramètres XD. Je ne suis pas capable de penser à une autre solution parce que je réutilise le module kafka-source et kafka-sink dans plusieurs flux.

Cette fonction kafka-sink spécifique envoie l'entrée à un autre flux XD. Donc, ajouté un en-tête-filtre pour supprimer le kafka_topic dans le module kafka-source lorsque le flux suivant commence.

Pour en savoir plus: http://docs.spring.io/autorepo/docs/spring-kafka-dist/1.0.2.RELEASE/reference/html/_spring_integration.html

Recherchez les lignes pour définir le sujet cible de kafka. C'est la clé.

1

C'est vrai qu'il est pas pris en charge et ne sera pas. Spring XD est EOL cette année déjà. Tout le monde est encouragé à migrer vers le Spring Cloud Data Flow.

Pour votre cas d'utilisation, vous pouvez éditer la configuration du module Kafka Sink. Ajouter un de plus <int-kafka:outbound-channel-adapter> pour un autre sujet. Pour décider à quel sujet envoyer un message entrant, vous pouvez ajouter <router> à cette config.

Ou pensez simplement à utiliser Router Sink. Et avoir deux flux distincts pour chaque type de message et, par conséquent, chaque sujet.

+0

Je l'ai fonctionné finalement. – Vidhya