2017-09-03 1 views
2

J'expérimente la tâche suivante pour obtenir ma tête autour RxJava:rxjava2 - simple exemple d'exécuter des tâches sur un pool de threads, inscrivez-vous sur un seul thread

  • Étant donné une liste d'URL
  • Do une requête HTTP pour chaque URL sur un pool de threads
  • pour chaque résultat insérer des données dans une base de données SQLite (pas multithreading ici)
  • Bloquer la méthode jusqu'à ce qu'il termine

Alors je l'ai essayé dans Kotlin:

val ex = Executors.newFixedThreadPool(10) 
Observable.fromIterable((1..100).toList()) 
    .observeOn(Schedulers.from(ex)) 
    .map { Thread.currentThread().name } 
    .subscribe { println(it + " " + Thread.currentThread().name } 

Je m'y attendais à imprimer

pool-1-thread-1 main 
pool-1-thread-2 main 
pool-1-thread-3 main 
pool-1-thread-4 main 
.... 

Cependant, il imprime:

pool-1-thread-1 pool-1-thread-1 
pool-1-thread-1 pool-1-thread-1 
pool-1-thread-1 pool-1-thread-1 

Quelqu'un peut-il corriger mes malentendus sur la façon dont cela fonctionne? Pourquoi n'utilise-t-il pas tous les threads du pool de threads? Comment puis-je faire en sorte que mon abonné s'exécute sur le thread principal ou le bloc jusqu'à la fin?

Répondre

3

Rx n'est pas conçu comme un service d'exécution parallèle, utilisez les API de Java pour cela. Les événements Rx sont synchrones et circuleront ensuite dans le flux. Lors de la construction du flux, observerOn demandera un fil une fois et traitera les émissions une par une sur ce fil.

Vous vous attendiez également à ce que subscribe soit exécuté sur le thread principal. observeOn bascule les threads et tous les événements en aval se produisent sur ce thread. Si vous voulez passer au thread principal, vous devrez insérer un autre observeOn juste avant subscribe.

1

Pour rendre le code dans votre map travail de bloc en parallèle, vous devez envelopper à observer avec le propre programmateur:

val ex = Executors.newFixedThreadPool(10) 
    val scheduler = Schedulers.from(ex) 
    Observable.fromIterable((1..100).toList()) 
      .flatMap { 
       Observable 
         .fromCallable { Thread.currentThread().name } 
         .subscribeOn(scheduler) 
      } 
      .subscribe { println(it + " " + Thread.currentThread().name) } 

Dans ce cas, vous verrez le résultat:

pool-1-thread-1 pool-1-thread-1 
pool-1-thread-2 pool-1-thread-1 
pool-1-thread-3 pool-1-thread-1 
pool-1-thread-4 pool-1-thread-1 
... 

Vous pouvez vérifiez l'article RxJava - Achieving Parallelization qui donne des explications sur ce comportement.

En outre, RxJava 2.0.5 introduit ParallelFlowable API