2014-04-16 10 views
0

J'ai un objet PublishSubject auquel je suis abonné deux fois. Le premier abonné ne compte que le nombre d'éléments traités, et cette valeur correspond toujours à ce que j'envoie à travers l'observateur. Cependant, l'autre abonné utilise un tampon et souvent (75%) je ne reçois pas tous les éléments qui ont traversé l'observateur. Suis-je en utilisant le tampon faux? J'attends plus longtemps que le délai après que j'arrête d'envoyer à l'observateur pour m'assurer que tous les articles sont traités.Le tampon rx-java perd des éléments

Integer downloads1 = 0; 
Integer downloads2 = 0; 
PublishSubject<Object> subject = PublishSubject.create(); 
// this subscriber count matches the expected 
subject.subscribe(s -> { 
    synchronized (downloads1) { 
    downloads1 += 1; 
    } 
}); 
// this subscriber seems to miss items about 75% of the time 
subject.buffer(100, TimeUnit.MILLISECONDS, 10).subscribe(list -> { 
    synchronized (downloads2) { 
    downloads2 += list.size(); 
    } 
}); 
+0

Où la source est-elle observable? Puisque vous utilisez 'synchronized', je suppose que votre source Observable peut avoir un problème. Vous devez vous assurer que votre Observable envoie le message dans le même thread ou soit synchronisé. – zsxwing

Répondre

0

Peut-être que vous rencontrez ce bug: https://github.com/Netflix/RxJava/issues/534

Et d'ailleurs, au lieu de vous abonner, vous devez utiliser reduce(R initialValue, Func2<R,? super T,R> accumulator) avec une valeur initiale de 0, vous n'avez pas besoin de faire une synchronisation vous.