J'ai un héritage, code non-RX qui fait du travail en réseau en engendrant un nouveau thread. Lorsque le travail se termine, il appelle une méthode sur un rappel.Code adapté à RX Completable ne bloquant pas sur thread threadsubscribe
Je n'ai aucun contrôle sur le thread sur lequel ce code est exécuté. C'est l'héritage, et il engendre un nouveau Thread
par lui-même.
Cela peut être simplifié en quelque chose comme:
interface Callback {
void onSuccess();
}
static void executeRequest(String name, Callback callback) {
new Thread(() -> {
try {
System.out.println(" Starting... " + name);
Thread.sleep(2000);
System.out.println(" Finishing... " + name);
callback.onSuccess();
} catch (InterruptedException ignored) {}
}).start();
}
Je veux convertir en un RX Completable
. Pour ce faire, j'utilise Completable#create()
. La mise en œuvre du CompletableEmitter
appelle le executeRequest
passage de l'implémentation du Callback
qui signale lorsque la demande est terminée.
J'affiche également une trace de journal lors de l'abonnement pour m'aider à déboguer.
static Completable createRequestCompletable(String name) {
return Completable.create(e -> executeRequest(name, e::onComplete))
.doOnSubscribe(d -> System.out.println("Subscribed to " + name));
}
Cela fonctionne comme prévu. Le Completable
se termine uniquement après la fin de la "demande" et le rappel est appelé.
Le problème est que lors de l'abonnement à ceci ces terminables dans un planificateur trampoline
, il n'attend pas que la première requête se termine avant de s'abonner à la deuxième demande.
Ce code:
final Completable c1 = createRequestCompletable("1");
c1.subscribeOn(Schedulers.trampoline()).subscribe();
final Completable c2 = createRequestCompletable("2");
c2.subscribeOn(Schedulers.trampoline()).subscribe();
Sorties:
Subscribed to 1
Starting... 1
Subscribed to 2
Starting... 2
Finishing... 1
Finishing... 2
Comme vous le voyez, il souscrit à la deuxième Completable
avant la première Completable
a terminé, même si je suis en vous inscrivant trampoline
.
Je voudrais faire la queue le completables
, de sorte que le second attend le premier à terminer, sortie ceci:
Subscribed to 1
Starting... 1
Finishing... 1
Subscribed to 2
Starting... 2
Finishing... 2
Je suis sûr que le problème est lié au travail effectué dans un travailleur fil. Si l'implémentation du Completable
ne génère pas un nouveau thread, cela fonctionne comme prévu. Mais c'est un code hérité et ce que j'essaie de faire, c'est de l'adapter à RX sans le modifier.
REMARQUE: les demandes sont exécutées dans différents points du programme - je ne peux pas utiliser andThen
ou concat
pour implémenter l'exécution sérialisée.