2017-10-06 7 views
0

J'ai un héritage, code non-RX qui fait du travail en réseau en engendrant un nouveau thread. Lorsque le travail se termine, il appelle une méthode sur un rappel.Code adapté à RX Completable ne bloquant pas sur thread threadsubscribe

Je n'ai aucun contrôle sur le thread sur lequel ce code est exécuté. C'est l'héritage, et il engendre un nouveau Thread par lui-même.

Cela peut être simplifié en quelque chose comme:

interface Callback { 
    void onSuccess(); 
} 

static void executeRequest(String name, Callback callback) { 
    new Thread(() -> { 
     try { 
      System.out.println(" Starting... " + name); 
      Thread.sleep(2000); 
      System.out.println(" Finishing... " + name); 
      callback.onSuccess(); 
     } catch (InterruptedException ignored) {} 
    }).start(); 
} 

Je veux convertir en un RX Completable. Pour ce faire, j'utilise Completable#create(). La mise en œuvre du CompletableEmitter appelle le executeRequest passage de l'implémentation du Callback qui signale lorsque la demande est terminée.

J'affiche également une trace de journal lors de l'abonnement pour m'aider à déboguer.

static Completable createRequestCompletable(String name) { 
     return Completable.create(e -> executeRequest(name, e::onComplete)) 
       .doOnSubscribe(d -> System.out.println("Subscribed to " + name)); 
} 

Cela fonctionne comme prévu. Le Completable se termine uniquement après la fin de la "demande" et le rappel est appelé.

Le problème est que lors de l'abonnement à ceci ces terminables dans un planificateur trampoline, il n'attend pas que la première requête se termine avant de s'abonner à la deuxième demande.

Ce code:

final Completable c1 = createRequestCompletable("1"); 
c1.subscribeOn(Schedulers.trampoline()).subscribe(); 

final Completable c2 = createRequestCompletable("2"); 
c2.subscribeOn(Schedulers.trampoline()).subscribe(); 

Sorties:

Subscribed to 1 
    Starting... 1 
Subscribed to 2 
    Starting... 2 
    Finishing... 1 
    Finishing... 2 

Comme vous le voyez, il souscrit à la deuxième Completable avant la première Completable a terminé, même si je suis en vous inscrivant trampoline.

Je voudrais faire la queue le completables, de sorte que le second attend le premier à terminer, sortie ceci:

Subscribed to 1 
    Starting... 1 
    Finishing... 1 
Subscribed to 2 
    Starting... 2 
    Finishing... 2 

Je suis sûr que le problème est lié au travail effectué dans un travailleur fil. Si l'implémentation du Completable ne génère pas un nouveau thread, cela fonctionne comme prévu. Mais c'est un code hérité et ce que j'essaie de faire, c'est de l'adapter à RX sans le modifier.

REMARQUE: les demandes sont exécutées dans différents points du programme - je ne peux pas utiliser andThen ou concat pour implémenter l'exécution sérialisée.

Répondre

0

J'ai réussi à exécuter le Completables séquentiellement en bloquant explicitement l'abonnement Thread avec un Latch. Mais je ne pense pas que ce soit la façon idiomatique de le faire en RX, et je ne comprends toujours pas pourquoi j'ai besoin de faire cela et le Thread n'est pas bloqué jusqu'à ce que le Completable soit complet.

static Completable createRequestCompletable(String name) { 
    final CountDownLatch latch = new CountDownLatch(1); 
    return Completable.create(e -> { 
     executeRequest(name,() -> { 
      e.onComplete(); 
      latch.countDown(); 
     }); 
     latch.await(); 
    }) 
    .doOnSubscribe(disposable -> System.out.println("Subscribed to " + name)); 
}