2017-08-17 2 views
1

Dans une application plus large RxJava, je possède un certain nombre de chaudes, infinite Observables source. Les émissions de ces derniers sont fusionnées puis traitées par les observateurs en aval. Le résultat du traitement doit ensuite être utilisé pour suspendre temporairement les émissions de certains des observables sources, tandis que les observateurs non suspendus devraient continuer à émettre et les observateurs à consommer les émissions non suspendues. Etre chaud Observable tous les événements qui se produisent pendant la suspension, peut être ignoré/laissé sans risque.RxJava: Motif pour suspendre temporairement une sélection de source observables sans variables statefull

La seule solution que j'ai pu trouver jusqu'ici est d'appliquer un filtre avec des variables globales et statefull. Le code ci-dessous montre le principe. Pour simplifier, j'ai déplacé la logique dont la source Observable est suspendue, dans une boucle while et j'attribue simplement de manière aléatoire la décision suspend/run. En outre, les Observables source sont remplacés par des intervalles simples (dans l'application réelle, les événements sont aléatoires et proviennent de sources externes, qui sont enveloppées dans Observables)

boolean is1running = true; 
boolean is2running = true; 
boolean is3running = true; 

public void multiStream() { 

    Observable<String> ob1 = Observable 
      .interval(100, TimeUnit.MILLISECONDS) 
      .map(s -> "OB1::" + s) 
      .filter(s -> keepRunning(1)); 

    Observable<String> ob2 = Observable 
      .interval(100, TimeUnit.MILLISECONDS) 
      .map(s -> "OB2::::" + s) 
      .filter(s -> keepRunning(2)); 

    Observable<String> ob3 = Observable 
      .interval(100, TimeUnit.MILLISECONDS) 
      .map(s -> "OB3:::::" + s) 
      .filter(s -> keepRunning(3)); 

    Observable<String> finalObs = Observable.merge(ob1, ob2, ob3); 

    finalObs.subscribe(s -> System.out.println(s)); 

    Random randomGenerator = new Random(); 

    while(true) 
    { 
     sleep(1000); 
     is1running = randomGenerator.nextBoolean(); 
     is2running = randomGenerator.nextBoolean(); 
     is3running = randomGenerator.nextBoolean(); 
    } 
} 

private boolean keepRunning(int i) { 
    switch(i) 
    { 
     case 1: return is1running; 
     case 2: return is2running; 
     case 3: return is3running; 
    } 

    return true; 
} 

Le code semble fonctionner, mais je ne suis pas heureux à propos d'avoir à utiliser des variables globales, stateful. Y a-t-il un meilleur modèle pour une telle situation qui adhère également aux paradigmes fonctionnels et réactifs?

Répondre

0

Pour ce type de situation, j'utilise fréquemment l'opérateur switchMap(). Pour votre exemple, supposons que nous ayons un boolean observable pour chaque source, donc ob1Switch, ob2Switch et ob3Switch. Ensuite,

Observable<String> ob1 = Observable 
     .interval(100, TimeUnit.MILLISECONDS) 
     .map(s -> "OB1::" + s); 

serait contrôlé par ob1Switch:

Observable<String> ob1Switchable = ob1Switch 
    .switchMap(switchOn -> switchOn ? ob1 : Observable.never()); 

Maintenant, vous pouvez utiliser ob1Switchable où que vous l'habitude d'utiliser ob1, sans la nécessité d'un état global. Vous devez certainement garder ob1Switch en cours par ob1Switch.onNext(Boolean.TRUE) lorsque vous voulez ob1 activé.

Edit: comme un exemple d'une chaîne avec une boucle de rétroaction:

Observable<Integer> eventsPerSecond = 
    Observable.merge(ob1Switchable, ob2Switchable, ob3Switchable) 
    .buffer(1, TimeUnit.SECONDS) 
    .map(buf -> buf.size()); 
Observable<Boolean> obs1Switch = eventsPerSecond 
    .map(eps -> Boolean.valueOf(eps > LOW_THRESHOLD); 
Observable<Boolean> obs2Switch = eventsPerSecond 
    .map(eps -> Boolean.valueOf(eps > MEDIUM_THRESHOLD); 
Observable<Boolean> obs3Switch = eventsPerSecond 
    .map(eps -> Boolean.valueOf(eps > HIGH_THRESHOLD); 

Avec ces définitions en place, vous pourriez avoir une boucle de rétroaction qui successivement fermer les sources jusqu'à ce que vous aviez un taux durable d'événements.

+0

Oui, je vois la logique générale, mais comment définissez-vous ob1Switch pour commencer? –