2016-11-20 2 views
0

Par exemple, j'ai le code java runnable suivant.Est-ce correct de transformer le code suivant dans rxjava?

Il s'agit d'un producteur et de plusieurs consommateurs parallèles. Ces consommateurs exécutent des tâches chronophages et fonctionnent en parallèle.

Je me demande si ce cas d'utilisation correspond à rx-java, et comment le réécrire en rx-java.

public class DemoInJava { 
    public static void main(String[] args) { 

     final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); 

     AtomicBoolean done = new AtomicBoolean(false); 
     Thread producer = new Thread(() -> { 
      int offset = 0; 
      int limit = 10; 
      while (true) { 
       if (queue.isEmpty()) { 
        if (offset < 100) {// there is 100 records in db 
         fetchDataFromDb(offset, limit).forEach(e -> queue.add(e)); 
         offset = offset + limit; 
        } else { 
         done.set(true); 
         break; // no more data 
        } 
       } else { 
        try { 
         Thread.sleep(100l);// I don't like the idea of sleep, but it seems to be the simplest way. 
        } catch (InterruptedException e) { 
        } 
       } 
      } 
     }); 

     List<Thread> consumers = IntStream.range(0, 5).boxed().map(c -> new Thread(() -> 
     { 
      while (true) { 
       Integer i = queue.poll(); 
       if (i != null) { 
        longRunJob(i); 
       } else { 
        if (done.get()) { 
         break; 
        } else { 
         try { 
          Thread.sleep(100l);// I don't like the idea of sleep, but it seems to be the simplest way. 
         } catch (InterruptedException e) { 
         } 
        } 
       } 
      } 
     })).collect(Collectors.toList()); 

     producer.start(); 
     consumers.forEach(c -> c.start()); 
    } 

    private static List<Integer> fetchDataFromDb(int offset, int limit) { 
     return IntStream.range(offset, offset + limit).boxed().collect(Collectors.toList()); 
    } 

    private static void longRunJob(Integer i) { 
     System.out.println(Thread.currentThread().getName() + " long run job of " + i); 
    } 
} 

la sortie est:

.... 
Thread-1 long run job of 7 
Thread-1 long run job of 8 
Thread-1 long run job of 9 
Thread-4 long run job of 10 
Thread-4 long run job of 16 
Thread-10 long run job of 14 
Thread-5 long run job of 15 
Thread-8 long run job of 13 
Thread-7 long run job of 12 
Thread-9 long run job of 11 
Thread-10 long run job of 19 
Thread-4 long run job of 18 
Thread-3 long run job of 17 
.... 

Répondre

0

Voyons voir ... Tout d'abord, le code:

package rxtest; 

import static io.reactivex.Flowable.generate; 
import static io.reactivex.Flowable.just; 

import java.util.List; 
import java.util.concurrent.Executors; 
import java.util.stream.Collectors; 
import java.util.stream.IntStream; 

import io.reactivex.Emitter; 
import io.reactivex.Scheduler; 
import io.reactivex.schedulers.Schedulers; 

public class Main { 

    private static final Scheduler SCHEDULER = Schedulers.from(Executors.newFixedThreadPool(10)); 

    private static class DatabaseProducer { 
     private int offset = 0; 
     private int limit = 100; 

     void fetchDataFromDb(Emitter<List<Integer>> queue) { 
      System.out.println(Thread.currentThread().getName() + " fetching "+offset); 
      queue.onNext(IntStream.range(offset, offset + limit).boxed().collect(Collectors.toList())); 
      offset += limit; 
     } 
    } 

    public static void main(String[] args) { 
     generate(new DatabaseProducer()::fetchDataFromDb) 
     .subscribeOn(Schedulers.io()) 
     .concatMapIterable(list -> list, 1) // 1 call, no prefetch 
     .flatMap(item -> 
       just(item) 
       .doOnNext(i -> longRunJob(i)) 
       .subscribeOn(SCHEDULER) 
       , 10) // don't subscribe to more than 10 at a time 
     .take(1000) 
     .blockingSubscribe(); 
    } 

    private static void longRunJob(Integer i) { 
     System.out.println(Thread.currentThread().getName() + " long run job of " + i); 
     try { 
      Thread.sleep(1000); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 
} 

classe DatabaseProducer est simplement le producteur stateful des valeurs, car il a besoin du courant décalage. Ce n'est pas strictement nécessaire, que l'appel generate aurait pu être remplacé par

 generate(() -> 0, (offset,e) -> { 
      e.onNext(IntStream.range(offset, offset + 100).boxed() 
         .collect(Collectors.toList())); 
      return offset + 100; 
     }, e -> {}); 

Mais ce n'est pas presque aussi lisible.

Gardez à l'esprit que cocatMap et flatMap peut et va précharger et pré-souscrire à des observables/coulants jusqu'à une limite en fonction de la mise en oeuvre, même s'il n'y a pas de fils libres pour les traiter - ils vont tout simplement se file d'attente dans les planificateurs. Les nombres sur chaque appel représentent les limites que nous voulons avoir - 1 sur le concatMap parce que nous voulons extraire de la base de données seulement si c'est nécessaire (si vous mettez ici 2, vous pouvez sur-lire, mais il y aura moins de latence dans le pipeline). Si vous souhaitez effectuer un calcul lié au processeur, il est préférable d'utiliser Schedulers.computation(), car il est configuré automatiquement en fonction du nombre de processeurs du système sur lequel la machine virtuelle Java s'exécute, et vous pouvez l'utiliser depuis d'autres parties de votre code de sorte que vous ne surchargez pas le processeur.

+0

Eh bien, cela fonctionne très bien. Merci –

+0

N'oubliez pas d'accepter la réponse; C'est comme ça que ça fonctionne. –