2017-06-13 1 views
1

Supposons que j'ai deux flux de A et de B.'Zipper' deux flux réactifs où les éléments sont réutilisables

Volet 1: A1, A2, A3, A4, A5, A6 Volet B: B1, B2, B3, B4

Calendrier: B1> A1> A2> B2> A3> A4> B3> A5 sortie désiré: [B1 & A1], [A2 & B2], [B2 & A3], [A4 & B3], [B3 & A5], [A6 & B4]

Cela reflète un processus de la vie réelle, où Les B sont à double face et empilés ensemble. Par exemple,

---- B1 ----] 
      |-1 
---- A1 ----] 

---- A2 ----] 
      ]-2 
---- B2 ----|] 
      ]|-3 
---- A3 ---- ] 

---- A4 ----] 
      |-4 
---- B3 ----]] 
      |-5 
---- A5 ---- ] 

---- A6 ----] 
      |-6 
---- B4 ----] 

Le groupe 1 est caractéristique de la paire initiale. Le groupement 6 est caractéristique de l'appariement des bornes. Les groupes 2/3 et 4/5 peuvent être répétés.

Je cherche des idées pour réaliser cela avec RxJava2.

Merci, Dan.

EDIT

J'ai fait un test:

import java.util.Objects; 

import org.junit.Test; 

import io.reactivex.Flowable; 
import io.reactivex.functions.BiFunction; 
import io.reactivex.processors.FlowableProcessor; 
import io.reactivex.processors.ReplayProcessor; 

public class BuildControllerTest { 
    @Test 
    public void testProductBlanketSequencing() throws Exception {   
     final String a1="A1", a2="A2", a3="A3", a4="A4", a5="A5", a6="A6"; 
     final String b1="B1", b2="B2", b3="B3", b4="B4"; 

     class AB { 
      String a; 
      String b; 

      public AB(String a, String b) { 
       this.a = a; 
       this.b = b; 
      } 

      @Override public boolean equals(Object obj) { 
       if(getClass() != obj.getClass()) return false; 
       if(! Objects.equals(a, ((AB)obj).a)) return false; 
       if(! Objects.equals(b, ((AB)obj).b)) return false; 
       return true; 
      } 
     } 

     final FlowableProcessor<String> a = ReplayProcessor.create(); 
     final FlowableProcessor<String> b = ReplayProcessor.create(); 

     b.onNext(b1); 
     a.onNext(a1); 

     a.onNext(a2); 
     b.onNext(b2); 
     a.onNext(a3); 

     a.onNext(a4); 
     b.onNext(b3); 
     a.onNext(a5); 

     a.onNext(a6); 
     b.onNext(b4); 

     a.onComplete(); b.onComplete(); 

     Flowable.zip(a, b, new BiFunction<String, String, AB>() { 
      @Override public AB apply(String t1, String t2) throws Exception { 
       return new AB(t1, t2); 
      } 
     }) 
     .test() 
     .assertResult(
       new AB(a1, b1), 
       new AB(a2, b2), 
       new AB(a3, b2), 
       new AB(a4, b3), 
       new AB(a5, b3), 
       new AB(a6, b4)); 
    } 
} 

Répondre

1

De votre exemple, il ne sait pas quelle condition entraîne l'appariement.

Lorsque nous utilisons zip la paire est formé dès que les deux sources a élément suivant:

(A1, B1), (A2, B2), (A3, B3), (A4, B4) 

Si nous utilisons combineLatest la paire est formé quand il y a l'élément suivant dans une source:

(A1, B1), (A2, B1), (A2, B2), (A3, B2), (A4, B2), (A4, B3), (A5, B3), (A6, B3), (A6, B4) 

Pour moi, il semble que vous voulez combiner des sources, mais ne prenez pas toutes les paires. Vous pouvez supprimer des paires supplémentaires en fonction des règles métier.

La règle supplémentaire peut être le temps, puis regardez sampling.

Flowable.combineLatest(a, b, (t1, t2) -> new AB(t1, t2)) 
     .throttleLast(1, TimeUnit.MICROSECONDS) 

il prendra le dernier résultat de paires combinées chaque intervalle de temps

(A1, B1), (A4, B3), (A5, B3), (A6, B3), (A6, B4) 

Si des règles supplémentaires est d'avoir des valeurs uniques de la source A et répète posible de B vous pouvez alors utiliser distinct

Flowable.combineLatest(a, b, (t1, t2) -> new AB(t1, t2)) 
     .distinct(ab -> ab.a) 

(A1, B1), (A2, B1), (A3, B2), (A4, B2), (A5, B3), (A6, B3) 
+0

combineLatest semble être pratique pour mes circonstances, bien que jusqu'ici j'ai lutté pour l'obtenir pour produire ce dont j'ai besoin. Un problème est le contrôle du séquençage dans un cas de test. Je l'ai cependant réussi à travailler avec zip et à doubler les Bs après le premier: final AtomicBoolean reset = new AtomicBoolean (true); zip (a, b.flatMap (t -> reset.getAndSet (faux)? Flowable.just (t): Flowable.just (t, t)), (t1, t2) -> nouveau AB (t1, t2)) ; Mais c'est probablement mauvais? – Dan

+0

Ah, je n'ai pas compris que vous voulez répéter tous les éléments de B sauf le premier. Cela devrait fonctionner ensuite. –

+0

La duplication des Bs n'était pas le but, mais elle s'avère produire la séquence correcte (en ignorant la première et la dernière) lorsqu'elle est zippée. Mon problème est plus complexe que cet exemple, et combineLatest s'est avéré très utile dans ce scénario. Merci! – Dan