1

Dans un flux d'intégration, une division avec sa stratégie par défaut génère un élément de la liste. Le traitement de cet article peut échouer. Je veux traiter cette erreur et diriger un nouveau message avec les informations de mappage de la précédente (en plus d'un en-tête d'erreur personnalisé) vers le canal de messagerie normal.Comment personnaliser la logique d'agrégation de messages dans Spring Integration Java DSL

Dans l'agrégateur, je souhaite personnaliser la logique d'agrégation pour générer d'autres types de messages avec le nombre de processus ayant échoué et le résultat des messages qui n'ont pas échoué.

Ici, je vous expliquer comment j'envoie le message d'erreur avec l'en-tête:

@Bean 
public IntegrationFlow socialMediaErrorFlow() { 
    return IntegrationFlows.from("socialMediaErrorChannel") 
      .wireTap(sf -> sf.handle("errorService", "handleException")) 
      .<MessagingException>handle((p, h) 
       -> MessageBuilder.withPayload(Collections.<CommentEntity>emptyList()) 
        .copyHeaders(p.getFailedMessage().getHeaders()) 
        .setHeader("ERROR", true) 
        .build() 
      ) 
      .channel("directChannel_1") 
      .get(); 
} 

Je veux l'agrégateur pour générer un objet de ce type:

public class Result { 

    private Integer totalTask; 
    private Integer taskFailed; 
    private List<CommentEntity> comments; 

} 

Comment dois-je aborder ce sujet?

merci d'avance.

Merci à l'aide de Artem J'ai fait cette mise en œuvre:

.aggregate(a -> a.outputProcessor(new MessageGroupProcessor() { 
     @Override 
     public Object processMessageGroup(MessageGroup mg) { 
      Integer failedTaskCount = 0; 
      Integer totalTaskCount = mg.getMessages().size(); 
      List<CommentEntity> comments = new ArrayList<>(); 
      for(Message<?> message: mg.getMessages()){ 
       if(message.getHeaders().containsKey("ERROR")) 
        failedTaskCount++; 
       else 
          comments.addAll((List<CommentEntity>)message.getPayload()); 
     } 

    return new IterationResult(totalTaskCount, failedTaskCount, comments); 

    } 
})) 

Répondre

1

Le AggregatorSpec a outputProcessor propriété:

/** 
* A processor to determine the output message from the released group. Defaults to a message 
* with a payload that is a collection of payloads from the input messages. 
* @param outputProcessor the processor. 
* @return the aggregator spec. 
*/ 
public AggregatorSpec outputProcessor(MessageGroupProcessor outputProcessor) { 

vous pouvez fournir de votre propre logique personnalisée pour analyser tous les messages dans la groupe et construisez votre Result pour eux.

L'échantillon de l'essai de cas:

.aggregate(a -> a.outputProcessor(g -> g.getMessages() 
         .stream() 
         .map(m -> (String) m.getPayload()) 
         .collect(Collectors.joining(" ")))) 

L'échantillon Cafe Demo:

.aggregate(aggregator -> aggregator 
     .outputProcessor(g -> 
        new Delivery(g.getMessages() 
           .stream() 
           .map(message -> (Drink) message.getPayload()) 
           .collect(Collectors.toList()))) 
     .correlationStrategy(m -> ((Drink) m.getPayload()).getOrderNumber()))