Spring Integration MongoDbStoringMessageHandler ClassCastException: BasicDBObject cannot be cast to BasicDBList
I have developed an integration flow in which I receive users from a MongoDbMessageSource and, for each social medium associated with a user, fetch the comments addressed to that user.
I want to persist these comments in MongoDB with a MongoDbStoringMessageHandler bound to the storeChannel channel.
The flow is as follows:
@Configuration
@IntegrationComponentScan
public class InfrastructureConfiguration {

    private static Logger logger = LoggerFactory.getLogger(InfrastructureConfiguration.class);

    /**
     * The Pollers builder factory can be used to configure common bean definitions or
     * those created from IntegrationFlowBuilder EIP-methods
     */
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(10, TimeUnit.SECONDS).get();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        return executor;
    }

    /**
     * MongoDbMessageSource is an instance of MessageSource which returns a Message with a payload
     * which is the result of execution of a Query
     */
    @Bean
    @Autowired
    public MessageSource<Object> mongoMessageSource(MongoDbFactory mongo) {
        MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{}"));
        messageSource.setExpectSingleResult(false);
        messageSource.setEntityClass(UserEntity.class);
        messageSource.setCollectionNameExpression(new LiteralExpression("users"));
        return messageSource;
    }

    @Bean
    @ServiceActivator(inputChannel = "storeChannel")
    public MessageHandler mongodbAdapter(MongoDbFactory mongo) throws Exception {
        MongoDbStoringMessageHandler adapter = new MongoDbStoringMessageHandler(mongo);
        adapter.setCollectionNameExpression(new LiteralExpression("comments"));
        return adapter;
    }

    @Bean
    @Autowired
    public IntegrationFlow processUsers(MongoDbFactory mongo, PollerMetadata poller) {
        return IntegrationFlows.from(mongoMessageSource(mongo), c -> c.poller(poller))
                .<List<UserEntity>, Map<ObjectId, List<SocialMediaEntity>>>transform(userEntitiesList
                        -> userEntitiesList.stream().collect(Collectors.toMap(UserEntity::getId, UserEntity::getSocialMedia))
                )
                .split(new AbstractMessageSplitter() {
                    @Override
                    protected Object splitMessage(Message<?> msg) {
                        return ((Map<ObjectId, List<SocialMediaEntity>>) msg.getPayload()).entrySet();
                    }
                })
                .channel("directChannel_1")
                .enrichHeaders(s -> s.headerExpressions(h -> h.put("user-id", "payload.key")))
                .split(new AbstractMessageSplitter() {
                    @Override
                    protected Object splitMessage(Message<?> msg) {
                        return ((Entry<ObjectId, List<SocialMediaEntity>>) msg.getPayload()).getValue();
                    }
                })
                .channel(MessageChannels.executor("executorChannel", this.taskExecutor()))
                .<SocialMediaEntity, SocialMediaTypeEnum>route(p -> p.getType(),
                        m -> m.subFlowMapping(SocialMediaTypeEnum.FACEBOOK, sf -> sf.handle(new GenericHandler<SocialMediaEntity>() {
                            @Override
                            public Object handle(SocialMediaEntity payload, Map<String, Object> headers) {
                                ObjectId userId = (ObjectId) headers.get("user-id");
                                logger.info("TEST FACEBOOK Channel for user id: " + userId);
                                return Arrays.asList(new CommentEntity[] {
                                    new CommentEntity("Comentario 1 from facebook dirigido a " + userId, userId),
                                    new CommentEntity("Comentario 2 from facebook dirigido a " + userId, userId)
                                });
                            }
                        }))
                        .subFlowMapping(SocialMediaTypeEnum.YOUTUBE, sf -> sf.handle(new GenericHandler<SocialMediaEntity>() {
                            @Override
                            public Object handle(SocialMediaEntity payload, Map<String, Object> headers) {
                                ObjectId userId = (ObjectId) headers.get("user-id");
                                logger.info("TEST YOUTUBE Channel for user id: " + userId);
                                return Arrays.asList(new CommentEntity[] {
                                    new CommentEntity("Comentario 1 from youtube dirigido a " + userId, userId),
                                    new CommentEntity("Comentario 2 from youtube dirigido a " + userId, userId)
                                });
                            }
                        }))
                        .subFlowMapping(SocialMediaTypeEnum.INSTAGRAM, sf -> sf.handle(new GenericHandler<SocialMediaEntity>() {
                            @Override
                            public Object handle(SocialMediaEntity payload, Map<String, Object> headers) {
                                ObjectId userId = (ObjectId) headers.get("user-id");
                                logger.info("TEST INSTAGRAM Channel for user id: " + userId);
                                return Arrays.asList(new CommentEntity[] {
                                    new CommentEntity("Comentario 1 from instagram dirigido a " + userId, userId),
                                    new CommentEntity("Comentario 2 from instagram dirigido a " + userId, userId)
                                });
                            }
                        }))
                )
                .channel("directChannel_2")
                .aggregate()
                .channel("directChannel_3")
                .<List<List<CommentEntity>>, List<CommentEntity>>transform(comments ->
                        comments.stream().flatMap(List::stream).collect(Collectors.toList()))
                .aggregate()
                .channel("directChannel_4")
                .<List<List<CommentEntity>>, List<CommentEntity>>transform(comments ->
                        comments.stream().flatMap(List::stream).collect(Collectors.toList()))
                .channel("storeChannel")
                .get();
    }
}
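To make the payload shape explicit: each transform that follows an aggregate() in the flow above turns a list of lists into one flat list, so what finally reaches storeChannel is a single java.util.List payload. Below is a minimal stand-alone sketch of that flattening step. The class name FlattenSketch and the sample strings are made up for illustration and are not part of the flow.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper class, only for illustrating the flattening transform.
public class FlattenSketch {

    // Same stream pipeline as the transform lambdas in the flow:
    // a List<List<T>> becomes one List<T>, preserving element order.
    public static <T> List<T> flatten(List<List<T>> nested) {
        return nested.stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Sample data standing in for the per-medium comment lists.
        List<List<String>> perMedium = Arrays.asList(
                Arrays.asList("facebook-1", "facebook-2"),
                Arrays.asList("youtube-1", "youtube-2"));
        System.out.println(flatten(perMedium));
    }
}
```

The point of the sketch is only that the result is still a collection, not an individual CommentEntity.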
The debug messages just before the error are:
2017-07-24 15:43:03.265 DEBUG 15152 --- [ taskExecutor-3] o.s.integration.channel.DirectChannel : preSend on channel 'storeChannel', message: GenericMessage [payload=[[email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}]
2017-07-24 15:43:03.267 DEBUG 15152 --- [ taskExecutor-3] ssor$ReplyProducingMessageHandlerWrapper : infrastructureConfiguration.mongodbAdapter.serviceActivator.handler received message: GenericMessage [payload=[[email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], sanchez.[email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}]
2017-07-24 15:43:03.267 DEBUG 15152 --- [ taskExecutor-3] o.s.i.m.o.MongoDbStoringMessageHandler : mongodbAdapter received message: GenericMessage [payload=[[email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}]
These messages make it clear that a list of CommentEntity objects arrives on the storeChannel channel:
@Document(collection="comments")
public class CommentEntity {

    @Id
    private ObjectId id;

    @Field("message")
    private String message;

    private ObjectId user;

    @PersistenceConstructor
    public CommentEntity(String message, ObjectId user) {
        this.message = message;
        this.user = user;
    }

    public ObjectId getId() {
        return id;
    }

    public void setId(ObjectId id) {
        this.id = id;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public ObjectId getUser() {
        return user;
    }

    public void setUser(ObjectId user) {
        this.user = user;
    }
}
The following exception then occurs:
2017-07-24 15:43:03.271 ERROR 15152 --- [ taskExecutor-3] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [mongodbAdapter]; nested exception is java.lang.ClassCastException: com.mongodb.BasicDBObject cannot be cast to com.mongodb.BasicDBList, failedMessage=GenericMessage [payload=[[email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}]
I am currently using embedded MongoDB:
<dependency>
    <groupId>de.flapdoodle.embed</groupId>
    <artifactId>de.flapdoodle.embed.mongo</artifactId>
</dependency>
Does anyone know what I am doing wrong? Thanks in advance.
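One likely explanation, which I have not verified against the exact library versions used here: the storing handler passes the whole payload to a single save call, and a save maps one object to one document. A java.util.List payload is not a single document, which would match the BasicDBObject to BasicDBList cast failure in the log. If that is the cause, splitting the list so that each CommentEntity is stored on its own should avoid it; in the flow that would presumably mean adding a .split() step just before .channel("storeChannel"). The sketch below models the idea with plain Java only. SingleDocumentStore and storeEach are hypothetical stand-ins, not Spring or MongoDB APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Simplified model, NOT the real Spring Integration or Spring Data code.
public class SplitBeforeStoreSketch {

    // Hypothetical store that can only persist one entity per call.
    public static final class SingleDocumentStore {
        public final List<Object> saved = new ArrayList<>();

        public void save(Object entity) {
            // Assumption: a collection cannot be mapped to a single document.
            if (entity instanceof Collection) {
                throw new ClassCastException("a collection is not a single document");
            }
            saved.add(entity);
        }
    }

    // The "split" step: hand the items to the store one at a time.
    public static void storeEach(Collection<?> payload, SingleDocumentStore store) {
        for (Object item : payload) {
            store.save(item);
        }
    }

    public static void main(String[] args) {
        SingleDocumentStore store = new SingleDocumentStore();
        storeEach(Arrays.asList("comment 1", "comment 2"), store);
        System.out.println("saved items: " + store.saved.size());
    }
}
```

With this shape the store only ever sees individual items, which is what a per-document save expects.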
Thanks for the answer! Could this also be done through a Spring Data repository? Would that be elegant?
A repository is outside the scope of Spring Integration, which is about protocol-agnostic messaging and POJO interaction within the domain.