2016-04-05 4 views
1

J'ai quelques problèmes avec un itinéraire suivant:itinéraire Camel EIP Apache - Comment arrêter split()

// from("cxf:....")... 
from("direct:start").process(startRequestProcessor) // STEP 1 
      .choice() 
       .when(body().isNull()) 
         .to("direct:finish") 
       .otherwise() 
        .split(body()) // STEP 2 
        .bean(TypeMapper.class) // STEP 3 
        .log("Goes to DynamicRouter:: routeByTypeHeader with header: ${headers.type}") 
        .recipientList().method(Endpoint1DynamicRouter.class, "routeByTypeHeader") // STEP 4 
        .ignoreInvalidEndpoints(); 

    from("direct:endpoint2") // STEP 6 
      .log("Goes to DynamicRouter::routeByCollectionHeader with header: ${headers.collection}") 
      .recipientList().method(Endpoint2DynamicRouter.class, "routeByCollectionHeader") 
      .ignoreInvalidEndpoints(); 

    from("direct:endpoint1.1") // STEP 5 
      .process(new DateRangeProcessor()) 
      .to("direct:collections"); 

    from("direct:endpoint1.2") // STEP 5 
      .process(new SingleProcessor()) 
      .to("direct:collections"); 


    from("direct:endpoint2.2") // STEP 7 
      .aggregate(header("collection" /** endpoint2.2 */), CollectionAggregationStrategy) 
      .completionSize(exchangeProperty("endpoint22")) 

      .process(new QueryBuilderProcessor()) 
      .bean(MyService, "getDbCriteria") 

      .setHeader("collection", constant("endpoint2.1")) 
      .to("direct:endpoint2.1").end(); 


    from("direct:endpoint2.1") // STEP 8 
      .aggregate(header("collection" /** endpoint2.1 */), CollectionAggregationStrategy) 
      .completionSize(exchangeProperty("CamelSplitSize")) 
      .to("direct:finish").end(); 

    from("direct:finish") 
      .process(new QueryBuilderProcessor()) 
      .bean(MyRepository, "findAll") 
      .log("ResponseData: ${body}"). 
      marshal().json(JsonLibrary.Gson).end(); 

La route

  1. reçoit une chaîne JSON convertit à la liste (HashSet) de JSONObjects.
  2. diviser la liste reçue en objets json.
  3. Set-têtes correspondant en fonction du contenu de l'objet
  4. Routes les messages en fonction des en-têtes à endpoint1.1 ou endpoint1.2
  5. Convertir des messages à MongoDB critères et envoyer à Endpoint2
  6. itinéraires Endpoint2 messages selon une autre tête à endpoint2.1 ou endpoint2.2. Endpoint2.2 agrège tous les messages reçus, les traite pour obtenir des critères mongodb et les envoie à endpoint2.1 (completionSize est calculé à l'étape 2 et enregistré dans la propriété "endpoint22").
  7. Enpoint2.1 agrège TOUS les messages (CamelSplitSize) convertit les messages agrégés en objet Requête et l'envoie au référentiel pour récupérer les données.

Je peux voir l'objet de réponse valide dans débogueur mais de toute façon j'obtenir une erreur:

No message body writer has been found for class java.util.HashSet, ContentType: application/json

Le problème n'est pas dans l'objet de réponse car il fonctionne avec d'autres routes et il ne contient pas HashSet.

Je pense que la route envoie à la sortie du HashSet créé tat STEP 1 ...

Mes questions sont les suivantes:

  • ce qui est erroné dans la sortie de route?
  • deux recipientList() tentent de transmettre messages au point final invalide (je dois utiliser .ignoreInvalidEndpoints() pour éviter exception):

    org.apache.camel.NoSuchEndpointException: No endpoint could be found for: [email protected], please check your classpath contains the needed Camel component jar.

Toute aide serait très apprécié! Merci.

Répondre

2

Je trouve cela très étrange, mais la fonction .aggregate() ne répond pas à l'échange. Il utilise votre stratégie d'agrégation mais répond toujours aux échanges entrants. Ce n'est pas clair lors de la lecture de la documentation, mais vous devez utiliser la stratégie d'agrégation avec split() pour pouvoir retourner l'échange.