Dans un flux d'intégration, une division avec sa stratégie par défaut génère un élément de la liste. Le traitement de cet article peut échouer. Je veux traiter cette erreur et diriger un nouveau message avec les informations de mappage de la précédente (en plus d'un en-tête d'erreur personnalisé) vers le canal de messagerie normal.Comment personnaliser la logique d'agrégation de messages dans Spring Integration Java DSL
Dans l'agrégateur, je souhaite personnaliser la logique d'agrégation pour générer d'autres types de messages avec le nombre de processus ayant échoué et le résultat des messages qui n'ont pas échoué.
Ici, je vous expliquer comment j'envoie le message d'erreur avec l'en-tête:
@Bean
public IntegrationFlow socialMediaErrorFlow() {
return IntegrationFlows.from("socialMediaErrorChannel")
.wireTap(sf -> sf.handle("errorService", "handleException"))
.<MessagingException>handle((p, h)
-> MessageBuilder.withPayload(Collections.<CommentEntity>emptyList())
.copyHeaders(p.getFailedMessage().getHeaders())
.setHeader("ERROR", true)
.build()
)
.channel("directChannel_1")
.get();
}
Je veux l'agrégateur pour générer un objet de ce type:
public class Result {
private Integer totalTask;
private Integer taskFailed;
private List<CommentEntity> comments;
}
Comment dois-je aborder ce sujet?
merci d'avance.
Merci à l'aide de Artem J'ai fait cette mise en œuvre:
.aggregate(a -> a.outputProcessor(new MessageGroupProcessor() {
@Override
public Object processMessageGroup(MessageGroup mg) {
Integer failedTaskCount = 0;
Integer totalTaskCount = mg.getMessages().size();
List<CommentEntity> comments = new ArrayList<>();
for(Message<?> message: mg.getMessages()){
if(message.getHeaders().containsKey("ERROR"))
failedTaskCount++;
else
comments.addAll((List<CommentEntity>)message.getPayload());
}
return new IterationResult(totalTaskCount, failedTaskCount, comments);
}
}))