2017-09-14 5 views
0

J'ai une méthode de rx pour faire un appel d'api, et l'appelant de cette méthode pourrait se produire plusieurs fois dans un temps très court. Ainsi, la méthode rx seraitrxjava laisse tomber les résultats en aval

public void apiCallWithRx() { 
    apiService.makeApiCall() 
     .subscribeOn(Schecdulars.io()) 
     .observeOn(AndroidSchedulers.mainTread()) 
     .subscribe(
      // onNext 
      new onConsume(), 
      // onError 
      new onConsume(), 

     ); 
} 

La méthode de l'appelant pourrait appeler cette apiCallWithRx plusieurs fois en peu de temps .. Mais le problème est que je ne pourrais parfois obtenir une réponse de downtream lors de l'appel de la deuxième fois, ou tout particulier temps. Ni onNext, onError ou onComplete est appelé. Je me demandais, est-ce à cause du tampon ou de la contre-pression? Essayés avec rxjava1 et rxjava2, ils sont identiques.

Je serais très apprécié pour tout conseil.

MISE À JOUR 1

Je ne ai vu aucune exception de contre-pression, donc il ne pouvait pas être un problème de contre-pression.

MISE À JOUR 2

S'il vous plaît ignorer le détail, le code Rx fonctionne la plupart du temps. Je viens omis un code d'illustration but

MISE À JOUR 3

J'ai un BlockingQueue en arrière-plan, cette méthode rx est en fait appelé quand il y a une donnée disponible dans la file d'attente. Les données peuvent être ajoutées dans la file d'attente à tout moment. Et cette méthode rx est pas appelé de manière asynchrone, puisque cette méthode est appelée seulement après la première réponse, puis vérifier la file d'attente, s'il y a une donnée, alors nous envoyons une deuxième demande api.

+0

L'utilisation d'un BlockingQueue avec RxJava est sujette à des blocages. Vous avez probablement besoin d'un objet UnicastSubject pour mettre les données en mémoire tampon jusqu'à ce que l'aval puisse les consommer. – akarnokd

+0

@akamokd La méthode apiCallWithRx() est appelée à partir du thread d'interface utilisateur sur android. ExecutorService récupère les données de BlockingQueue dans le thread d'arrière-plan et les données sont transmises au thread UI et déclenche la méthode apiCallWithRx() sur le thread UI. Et il vérifiera la blockingqueue pour obtenir les données de demande suivantes chaque fois que nous recevons la réponse api du serveur pour les données de requête précédentes. Donc, l'appel api et BlockingQueue sont assez séparés, je ne pense pas qu'il y ait une impasse ici – Cheng

Répondre

0

Observable est paresseux par défaut et vous devez subscribe pour que le Observable s'exécute.

+0

ouais, nous avons ce code, à des fins d'illustration, son omis. – Cheng

+0

Je viens d'ajouter que, merci – Cheng

+0

Votre code utilise la version de 'subscribe' qui déclenche l'observable mais ignore tous les événements. –

0

Je ne sais pas pourquoi RxJava2 subscribeOn et observerOn opérateur peut être utilisé sans spécifier le fil où il doit être exécuté.

Mais au moins dans RxJava1 si vous utilisez ces opérateurs, vous l'exécutez de manière asynchrone, puis vous devez attendre dans l'exécution pour obtenir le résultat.

Vérifiez ce test

@Test 
public void testObservableAsync() throws InterruptedException { 
    Subscription subscription = Observable.from(numbers) 
      .doOnNext(increaseTotalItemsEmitted()) 
      .subscribeOn(Schedulers.newThread()) 
      .subscribe(number -> System.out.println("Items emitted:" + total)); 
    System.out.println("I finish before the observable finish. Items emitted:" + total); 
    new TestSubscriber((Observer) subscription) 
      .awaitTerminalEvent(100, TimeUnit.MILLISECONDS); 
} 

Vous pouvez voir ici un autre exemple https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java

+0

Question mise à jour, j'utilise un BlockingQueue pour m'assurer que la méthode rx est appelée séquentiellement – Cheng