Dans une application plus large RxJava, je possède un certain nombre de chaudes, infinite Observables source. Les émissions de ces derniers sont fusionnées puis traitées par les observateurs en aval. Le résultat du traitement doit ensuite être utilisé pour suspendre temporairement les émissions de certains des observables sources, tandis que les observateurs non suspendus devraient continuer à émettre et les observateurs à consommer les émissions non suspendues. Etre chaud Observable tous les événements qui se produisent pendant la suspension, peut être ignoré/laissé sans risque.RxJava: Motif pour suspendre temporairement une sélection de source observables sans variables statefull
La seule solution que j'ai pu trouver jusqu'ici est d'appliquer un filtre avec des variables globales et statefull. Le code ci-dessous montre le principe. Pour simplifier, j'ai déplacé la logique dont la source Observable est suspendue, dans une boucle while et j'attribue simplement de manière aléatoire la décision suspend/run. En outre, les Observables source sont remplacés par des intervalles simples (dans l'application réelle, les événements sont aléatoires et proviennent de sources externes, qui sont enveloppées dans Observables)
boolean is1running = true;
boolean is2running = true;
boolean is3running = true;
public void multiStream() {
Observable<String> ob1 = Observable
.interval(100, TimeUnit.MILLISECONDS)
.map(s -> "OB1::" + s)
.filter(s -> keepRunning(1));
Observable<String> ob2 = Observable
.interval(100, TimeUnit.MILLISECONDS)
.map(s -> "OB2::::" + s)
.filter(s -> keepRunning(2));
Observable<String> ob3 = Observable
.interval(100, TimeUnit.MILLISECONDS)
.map(s -> "OB3:::::" + s)
.filter(s -> keepRunning(3));
Observable<String> finalObs = Observable.merge(ob1, ob2, ob3);
finalObs.subscribe(s -> System.out.println(s));
Random randomGenerator = new Random();
while(true)
{
sleep(1000);
is1running = randomGenerator.nextBoolean();
is2running = randomGenerator.nextBoolean();
is3running = randomGenerator.nextBoolean();
}
}
private boolean keepRunning(int i) {
switch(i)
{
case 1: return is1running;
case 2: return is2running;
case 3: return is3running;
}
return true;
}
Le code semble fonctionner, mais je ne suis pas heureux à propos d'avoir à utiliser des variables globales, stateful. Y a-t-il un meilleur modèle pour une telle situation qui adhère également aux paradigmes fonctionnels et réactifs?
Oui, je vois la logique générale, mais comment définissez-vous ob1Switch pour commencer? –