0

J'ai développé un flux d'intégration où je reçois les utilisateurs d'un MongoDbMessageSource et pour chaque milieu social associé à l'utilisateur-je obtenir les commentaires qui lui sont adressées.Spring Integration MongoDbStoringMessageHandler ClassCastException: BasicDBObject ne peut pas être jeté à BasicDBList

Ces commentaires que je veux les persister dans MongoDB avec l'aide de MongoDbStoringMessageHandler liée au canal storeChannel.

Le flux est la suivante:

@Configuration 
@IntegrationComponentScan 
public class InfrastructureConfiguration { 

    private static Logger logger = LoggerFactory.getLogger(InfrastructureConfiguration.class); 

    /** 
    * The Pollers builder factory can be used to configure common bean definitions or 
    * those created from IntegrationFlowBuilder EIP-methods 
    */ 
    @Bean(name = PollerMetadata.DEFAULT_POLLER) 
    public PollerMetadata poller() { 
     return Pollers.fixedDelay(10, TimeUnit.SECONDS).get(); 
    } 

    @Bean 
    public TaskExecutor taskExecutor() { 
     ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 
     executor.setCorePoolSize(5); 
     executor.setMaxPoolSize(10); 
     executor.setQueueCapacity(25); 
     return executor; 
    } 

    /** 
    * 
    * MongoDbMessageSource is an instance of MessageSource which returns a Message with a payload 
    * which is the result of execution of a Query 
    */ 
    @Bean 
    @Autowired 
    public MessageSource<Object> mongoMessageSource(MongoDbFactory mongo) { 
     MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{}")); 
     messageSource.setExpectSingleResult(false); 
     messageSource.setEntityClass(UserEntity.class); 
     messageSource.setCollectionNameExpression(new LiteralExpression("users")); 
     return messageSource; 
    } 

    @Bean 
    @ServiceActivator(inputChannel = "storeChannel") 
    public MessageHandler mongodbAdapter(MongoDbFactory mongo) throws Exception { 
     MongoDbStoringMessageHandler adapter = new MongoDbStoringMessageHandler(mongo); 
     adapter.setCollectionNameExpression(new LiteralExpression("comments")); 
     return adapter; 
    } 

    @Bean 
    @Autowired 
    public IntegrationFlow processUsers(MongoDbFactory mongo, PollerMetadata poller) { 
     return IntegrationFlows.from(mongoMessageSource(mongo), c -> c.poller(poller)) 
       .<List<UserEntity>, Map<ObjectId, List<SocialMediaEntity>>>transform(userEntitiesList 
         -> userEntitiesList.stream().collect(Collectors.toMap(UserEntity::getId, UserEntity::getSocialMedia)) 
       ) 
       .split(new AbstractMessageSplitter() { 
        @Override 
        protected Object splitMessage(Message<?> msg) { 
         return ((Map<ObjectId, List<SocialMediaEntity>>) msg.getPayload()).entrySet(); 
        } 
       }) 
       .channel("directChannel_1") 
       .enrichHeaders(s -> s.headerExpressions(h -> h.put("user-id", "payload.key"))) 
       .split(new AbstractMessageSplitter() { 
        @Override 
        protected Object splitMessage(Message<?> msg) { 
         return ((Entry<ObjectId, List<SocialMediaEntity>>) msg.getPayload()).getValue(); 
        } 
       }) 
       .channel(MessageChannels.executor("executorChannel", this.taskExecutor())) 
       .<SocialMediaEntity, SocialMediaTypeEnum>route(p -> p.getType(), 
         m 
         -> m.subFlowMapping(SocialMediaTypeEnum.FACEBOOK, sf -> sf.handle(new GenericHandler<SocialMediaEntity>() { 
           @Override 
           public Object handle(SocialMediaEntity payload, Map<String, Object> headers) { 
            ObjectId userId = (ObjectId)headers.get("user-id"); 
            logger.info("TEST FACEBOOK Channel for user id: " + userId); 
            return Arrays.asList(new CommentEntity[] { 
             new CommentEntity("Comentario 1 from facebook dirigido a " + userId, userId), 
             new CommentEntity("Comentario 2 from facebook dirigido a " + userId, userId) 
            }); 
           } 
          })) 
          .subFlowMapping(SocialMediaTypeEnum.YOUTUBE, sf -> sf.handle(new GenericHandler<SocialMediaEntity>() { 
           @Override 
           public Object handle(SocialMediaEntity payload, Map<String, Object> headers) { 
            ObjectId userId = (ObjectId)headers.get("user-id"); 
            logger.info("TEST YOUTUBE Channel for user id: " + userId); 
            return Arrays.asList(new CommentEntity[] { 
             new CommentEntity("Comentario 1 from youtube dirigido a " + userId, userId), 
             new CommentEntity("Comentario 2 from youtube dirigido a " + userId, userId) 
            }); 
           } 
          })) 
          .subFlowMapping(SocialMediaTypeEnum.INSTAGRAM, sf -> sf.handle(new GenericHandler<SocialMediaEntity>() { 
           @Override 
           public Object handle(SocialMediaEntity payload, Map<String, Object> headers) { 
            ObjectId userId = (ObjectId)headers.get("user-id"); 
            logger.info("TEST INSTAGRAM Channel for user id: " + userId); 
            return Arrays.asList(new CommentEntity[] { 
             new CommentEntity("Comentario 1 from instagram dirigido a " + userId, userId), 
             new CommentEntity("Comentario 2 from instagram dirigido a " + userId, userId) 
            }); 
           } 
          })) 
       ) 
       .channel("directChannel_2") 
       .aggregate() 
       .channel("directChannel_3") 
       .<List<List<CommentEntity>>, List<CommentEntity>>transform(comments -> 
         comments.stream().flatMap(List::stream).collect(Collectors.toList())) 
       .aggregate() 
       .channel("directChannel_4") 
       .<List<List<CommentEntity>>, List<CommentEntity>>transform(comments -> 
         comments.stream().flatMap(List::stream).collect(Collectors.toList())) 
       .channel("storeChannel") 
       .get(); 
    } 

} 

Les messages de débogage avant l'erreur sont les suivants:

2017-07-24 15:43:03.265 DEBUG 15152 --- [ taskExecutor-3] o.s.integration.channel.DirectChannel : preSend on channel 'storeChannel', message: GenericMessage [payload=[[email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}] 
2017-07-24 15:43:03.267 DEBUG 15152 --- [ taskExecutor-3] ssor$ReplyProducingMessageHandlerWrapper : infrastructureConfiguration.mongodbAdapter.serviceActivator.handler received message: GenericMessage [payload=[[email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], sanchez.[email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}] 
2017-07-24 15:43:03.267 DEBUG 15152 --- [ taskExecutor-3] o.s.i.m.o.MongoDbStoringMessageHandler : mongodbAdapter received message: GenericMessage [payload=[[email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}] 

où il est clair que le canal « storeChannel » vient une liste de « CommentEntity »

@Document(collection="comments") 
public class CommentEntity { 

    @Id 
    private ObjectId id; 

    @Field("message") 
    private String message; 

    private ObjectId user; 

    @PersistenceConstructor 
    public CommentEntity(String message, ObjectId user) { 
     this.message = message; 
     this.user = user; 
    } 

    public ObjectId getId() { 
     return id; 
    } 

    public void setId(ObjectId id) { 
     this.id = id; 
    } 

    public String getMessage() { 
     return message; 
    } 

    public void setMessage(String message) { 
     this.message = message; 
    } 

    public ObjectId getUser() { 
     return user; 
    } 

    public void setUser(ObjectId user) { 
     this.user = user; 
    } 

} 

Cette exception se produit alors:

2017-07-24 15:43:03.271 ERROR 15152 --- [ taskExecutor-3] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [mongodbAdapter]; nested exception is java.lang.ClassCastException: com.mongodb.BasicDBObject cannot be cast to com.mongodb.BasicDBList, failedMessage=GenericMessage [payload=[[email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}] 

J'utilise actuellement intégré MongoDB:

<dependency> 
    <groupId>de.flapdoodle.embed</groupId> 
    <artifactId>de.flapdoodle.embed.mongo</artifactId> 
</dependency> 

Est-ce que quelqu'un sait que je fais mal? Merci d'avance.

Répondre

1

Eh bien, cette exception dit clairement que MongoDbStoringMessageHandler ne prend pas en charge la collecte pour la sauvegarde:

protected void handleMessageInternal(Message<?> message) throws Exception { 
    Assert.isTrue(this.initialized, "This class is not yet initialized. Invoke its afterPropertiesSet() method"); 
    String collectionName = this.collectionNameExpression.getValue(this.evaluationContext, message, String.class); 
    Assert.notNull(collectionName, "'collectionNameExpression' must not evaluate to null"); 

    Object payload = message.getPayload(); 

    this.mongoTemplate.save(payload, collectionName); 
} 

Vous n'êtes pas obligé de .aggregate() construire des collections pour sauver. Vous pouvez seulement les enregistrer avec ce composant seulement un par un.

Je pense que cela devrait être un bon ajout à laisser ce composant à effectuer:

/** 
* Insert a mixed Collection of objects into a database collection determining the collection name to use based on the 
* class. 
* 
* @param collectionToSave the list of objects to save. 
*/ 
void insertAll(Collection<? extends Object> objectsToSave); 

S'il vous plaît, soulever une JIRA sur la question et ne pas hésiter à contribution!

+0

Merci pour la réponse !! Et pourriez-vous les faire via un référentiel de données Spring? Serait-il élégant? –

+1

Le référentiel n'est pas lié aux objectifs d'intégration de Spring. Ceci est un protocole agnostique de messagerie et un domaine pour l'interaction POJO –