2016-06-15 2 views
1

J'ai le code suivant:seule tâche CompletableFuture qui se poursuit avec de nombreuses tâches parallèles

return CompletableFuture.supplyAsync(() -> { 
    return foo; // some custom object 
}) 
.thenAccept(foo -> { 
    // ??? need to spawn N async parallel jobs that works on 'foo' 
}); 

En anglais: la première tâche crée l'objet foo de manière asynchrone; et puis j'ai besoin d'exécuter N processus parallèles sur elle.

Y at-il une meilleure façon de le faire alors:

... 
CompletableFuture[] parallel = new CompletableFuture[N]; 
for (int i = 0; i < N; i++) { 
    parallel[i] = CompletableFuture.runAsync(() -> { 
     work(foo); 
    }); 
} 
CompletableFuture.allOf(parallel).join(); 
... 

Je n'aime pas cela comme un thread est verrouillé en travaux en attente N à la fin.

+0

Pourquoi avez-vous cette ligne 'CompletableFuture.allOf (parallel) .join();' lorsque vous ne voulez pas attendre la fin? Personne ne vous demande d'attendre ... – Holger

+0

J'étais aveugle en ce moment. – igr

Répondre

1

Vous pouvez enchaîner autant d'emplois indépendants comme vous le souhaitez sur un travail prérequis particulier, par exemple

CompletableFuture<Foo> base=CompletableFuture.supplyAsync(() -> new Foo()); 
Collections.nCopies(N, base).forEach(f -> f.thenAcceptAsync(foo -> work(foo))); 

va engendrer N emplois parallèles, invoquant work(foo) en même temps, après l'achèvement de la tâche initiale qui fournit l'instance Foo. Mais gardez à l'esprit que le framework sous-jacent prendra en compte le nombre de cœurs CPU disponibles pour dimensionner le pool de threads exécutant réellement les jobs parallèles, donc si N > #cores, certains de ces jobs peuvent s'exécuter les uns après les autres.

Si le travail est lié aux E/S, vous voulez donc avoir un plus grand nombre de threads parallèles, vous devez spécifier votre propre exécuteur.


Le nCopies/forEach n'est pas nécessaire, une boucle for ferait aussi bien, mais il donne une idée de la façon de gérer les travaux ultérieurs, qui dépendent de la réalisation de tous ces travaux parallèles:

CompletableFuture<Foo> base=CompletableFuture.supplyAsync(() -> new Foo()); 
CompletableFuture<Void> all = CompletableFuture.allOf(
    Collections.nCopies(N, base).stream() 
     .map(f -> f.thenAcceptAsync(foo -> work(foo))) 
     .toArray(CompletableFuture<?>[]::new)); 

Vous pouvez maintenant utiliser all pour vérifier la fin de tous les travaux ou enchaîner des actions supplémentaires.

+0

* "Si le travail est lié aux E/S, vous voulez donc avoir un plus grand nombre de threads parallèles" * - pourquoi est-ce? Peut-être vous manque ** synchrone ** E/S lié, ou mieux encore, * "Si le travail est ** bloquant **" *. – acelent

+0

@acelent: [I/O bound] (https://en.wikipedia.org/wiki/I/O_bound) est un terme général utilisé pour toutes sortes de tâches potentiellement bloquantes, par opposition aux tâches * liées au processeur *. L'exécuteur par défaut utilisé par 'CompletableFuture' est configuré pour les tâches liées à l'UC, tout ce qu'il faut savoir ... – Holger

+0

*" [I/O bound] (https://en.wikipedia.org/wiki/I/O_bound) est un terme général utilisé pour toutes sortes de tâches potentiellement bloquantes, par opposition aux tâches liées au CPU. "* - Alors, permettez-moi d'être en désaccord. Je ai ponté 'CompletableFuture' sur les méthodes de' AsynchronousSocketChannel' qui prennent un 'CompletionHandler', et il n'a certainement pas besoin de plus de threads de pool de threads que les cœurs, car il n'a pas bloqué. Cependant, utiliser n'importe quel type de code de blocage, que ce soit des E/S, en attente ou en veille, nécessitera probablement plus de threads de pool de threads pour avoir autant de threads exécutables que de cœurs.Il n'y a pas d'E/S en attente ou en train de dormir. – acelent

0

Depuis CompletableFuture.allOf retourne déjà un autre CompletableFuture<Void> un vous pouvez juste faire une autre .thenAccept sur et en extraire les valeurs renvoyées des SEF parallel dans le rappel, de cette façon vous éviter d'appeler join

+0

Ou je peux créer un tableau à l'extérieur puis créer des tâches parallèles avec 'thenAcceptAsync' dans une boucle. – igr