Disons que j'ai deux verticles qui découvrent des noms de fichiers spéciaux (par exemple, pourrait être quelque chose si) et les publier à bus d'événement par exemple. on est en train de lire les noms d'un api REST et un autre d'un système de fichiers:Terminer et obtenir des résultats à partir d'un bus d'événement
ScanRestVerticle.java
/**
* Reads file names through an REST API
*/
public class ScanRestVerticle extends AbstractVerticle {
@Override
public void start() throws Exception {
HttpClientRequest req = vertx.createHttpClient().request(HttpMethod.GET, "BASE_URL", "URL");
req.toObservable()
.flatMap(HttpClientResponse::toObservable)
.lift(unmarshaller(Model.class))
.subscribe(c -> vertx.eventBus().publish("address", c.specialName()));
req.exceptionHandler(Throwable::printStackTrace);
req.end();
}
}
ScanFsVerticle.java
/**
* Reads file names from a file
*/
public class ScanFsVerticle.java extends AbstractVerticle {
@Override
public void start() throws Exception {
StringObservable.byLine(vertx.fileSystem()
.rxReadFile("myFileNames.txt")
.map(Buffer::toString)
.toObservable())
.subscribe(c -> vertx.eventBus().publish("address", c), e -> System.err.println(e.getMessage()));
}
}
Tout fonctionne très bien, mais, maintenant , J'ai un verticle qui combine ces noms d'un bus d'événement et les imprime sur STD.OUT:
PrintVerticle.java
public class PrintVerticle extends AbstractVerticle {
@Override
public void start() throws Exception {
vertx.eventBus()
.<JsonObject>consumer("address")
.bodyStream()
.toObservable()
.reduce(new JsonArray(), JsonArray::add)
.subscribe(j -> System.out.println(j.toString()));
}
Le problème est que réduire ici est jamais réellement terminé, car le bus d'événements fait un flux infini que je pense.
Alors, comment dois-je terminer cette opération et imprimer les noms publiés par les deux verticles?
Note: Je suis vraiment nouveau dans vert.x et rx et je pourrais manquer quelques morceaux ou obtenir quelque chose de mal, alors s'il vous plaît NE PAS FAIRE juge :)
Merci à l'avance.
EDIT: Je pourrais appeler scan()
au lieu de reduce()
ici, pour obtenir des résultats intermédiaires, mais comment pourrais-je obtenir scan().last()
?
Quelle est votre définition pour l'achèvement? Si vous voulez recueillir tous les 2 noms publiés par 2 sources, alors vous devez faire la distinction entre les 2 événements – yosriz
Merci pour la question! Comme vous pouvez le voir dans l'exemple, je veux collecter tous les noms des deux vertices "scan" et les joindre dans la verticule "print" ... bien sûr, c'est un exemple simplifié. –