2017-04-24 1 views
1

Disons que j'ai deux verticles qui découvrent des noms de fichiers spéciaux (par exemple, pourrait être quelque chose si) et les publier à bus d'événement par exemple. on est en train de lire les noms d'un api REST et un autre d'un système de fichiers:Terminer et obtenir des résultats à partir d'un bus d'événement

ScanRestVerticle.java

/** 
* Reads file names through an REST API 
*/ 
public class ScanRestVerticle extends AbstractVerticle { 

    @Override 
    public void start() throws Exception { 
     HttpClientRequest req = vertx.createHttpClient().request(HttpMethod.GET, "BASE_URL", "URL"); 

     req.toObservable() 
      .flatMap(HttpClientResponse::toObservable) 
      .lift(unmarshaller(Model.class)) 
      .subscribe(c -> vertx.eventBus().publish("address", c.specialName())); 
     req.exceptionHandler(Throwable::printStackTrace); 
     req.end(); 
    } 
} 

ScanFsVerticle.java

/** 
* Reads file names from a file 
*/ 
public class ScanFsVerticle.java extends AbstractVerticle { 

    @Override 
    public void start() throws Exception { 
     StringObservable.byLine(vertx.fileSystem() 
      .rxReadFile("myFileNames.txt") 
      .map(Buffer::toString) 
      .toObservable()) 
      .subscribe(c -> vertx.eventBus().publish("address", c), e -> System.err.println(e.getMessage())); 
    } 
} 

Tout fonctionne très bien, mais, maintenant , J'ai un verticle qui combine ces noms d'un bus d'événement et les imprime sur STD.OUT:

PrintVerticle.java

public class PrintVerticle extends AbstractVerticle { 

    @Override 
    public void start() throws Exception { 
     vertx.eventBus() 
      .<JsonObject>consumer("address") 
      .bodyStream() 
      .toObservable() 
      .reduce(new JsonArray(), JsonArray::add) 
      .subscribe(j -> System.out.println(j.toString())); 
} 

Le problème est que réduire ici est jamais réellement terminé, car le bus d'événements fait un flux infini que je pense.

Alors, comment dois-je terminer cette opération et imprimer les noms publiés par les deux verticles?

Note: Je suis vraiment nouveau dans vert.x et rx et je pourrais manquer quelques morceaux ou obtenir quelque chose de mal, alors s'il vous plaît NE PAS FAIRE juge :)

Merci à l'avance.

EDIT: Je pourrais appeler scan() au lieu de reduce() ici, pour obtenir des résultats intermédiaires, mais comment pourrais-je obtenir scan().last()?

+0

Quelle est votre définition pour l'achèvement? Si vous voulez recueillir tous les 2 noms publiés par 2 sources, alors vous devez faire la distinction entre les 2 événements – yosriz

+0

Merci pour la question! Comme vous pouvez le voir dans l'exemple, je veux collecter tous les noms des deux vertices "scan" et les joindre dans la verticule "print" ... bien sûr, c'est un exemple simplifié. –

Répondre

1

Vous supposez que l'opérateur reduce() utilise l'événement onComplete() pour savoir où s'arrêter pour collecter.
tandis que scan() est un accumulateur qui émet une valeur cumulée pour chaque nouvelle émission de flux source, de sorte que l'analyse vous donnera des résultats intermédiaires.
Le problème ici est, que vous utilisez un EventBus, conceptuellement, EventBus est un flux infini, donc onComplete() ne devrait jamais être appelé. techniquement, il pourrait être appelé quand vous close() le `EventBus, mais je suppose que vous ne devriez pas et ne veulent pas compter sur elle.

  • Si vous souhaitez utiliser un EventBus, vous devriez finir en quelque sorte votre flux PrintVerticle, de sorte que vous aurez l'événement onComplete(). Par exemple, vous pouvez utiliser le EventBus pour publier certains événements qui publient chacun des 2 sommets observés. alors vous pouvez le compresser et terminer votre flux (en utilisant takeUntil()) au PrintVerticle après que 2 événements aient été émis.
    Quelque chose comme ceci:

    Observable vertice1EndStream = vertx.eventBus() 
        .<JsonObject>consumer("vertice1_end") 
        .bodyStream() 
        .toObservable(); 
    
    Observable vertice2EndStream = vertx.eventBus() 
        .<JsonObject>consumer("vertice2_end") 
        .bodyStream() 
        .toObservable() 
    
    Observable bothVerticesEndStream = Observable.zip(vertice1EndStream, vertice2EndStream, (vertice1, vertice1) -> new Object()); 
    
    vertx.eventBus() 
        .<JsonObject>consumer("address") 
        .bodyStream() 
        .toObservable() 
        .takeUntil(bothVerticesEndStream) 
        .reduce(new JsonArray(), JsonArray::add) 
        .subscribe(j -> System.out.println(j.toString())); 
    
  • Une autre option est d'utiliser directement les 2 flux au lieu du EventBus. les fusionner ensemble puis utiliser reduce(), comme ces 2 flux sont finis, vous n'aurez pas de problème.

+0

Merci pour vos suggestions ... Je pensais que j'irais avec 'EventBus', mais en utilisant uniquement' Observables' est en fait plus simple et je pense que c'est plus "pur". –