2017-02-21 2 views
4

Exemple de code suivant J'injecte un biconsumer qui dort pour 100 millis comme une action d'achèvement d'un ensemble d'avenir à compléter. J'ai utilisé la méthode whenCompleteAsync en donnant un executorService à utiliser. executorService est un ThreadPoolExecutor avec la taille de la piscine de base 5, la taille max 5 et la longueur de la file d'attente de 1.Comment puis-je capturer la exception RejectedExecutionException levée dans l'invocation de WhenCompleteAsync d'un CompletableFuture?

public class CompleteTest { 
    public static void main(String[] args) { 
     ExecutorService executorService = new ThreadPoolExecutor(5, 5, 10, 
       TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)); 

     ArrayList<CompletableFuture<String>> list = new ArrayList<>(); 

     for (int i = 0; i <100; i++) { 
      CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>(); 
      stringCompletableFuture.whenCompleteAsync((e, a) -> { 
       System.out.println("Complete " + e); 
       try { 
        Thread.sleep(100); 
       } catch (InterruptedException e1) {e1.printStackTrace();} 
      }, executorService); 

      list.add(stringCompletableFuture); 
     } 

     for (int i = 0; i < list.size(); i++) { 
      list.get(i).complete(i + ""); 
     } 
    } 
} 

Quand je courais le code, même si je terminé 100 à terme seulement 6 sorties seront imprimés. C'est 5 threads de base et 1 file d'attente. Qu'arrive-t-il au reste? Si d'autres exécutables ne peuvent pas être soumis au service d'exécution en raison de la file d'attente déjà pleine, ne devrait pas y avoir une exception.

OutPut

Complete 0 
Complete 1 
Complete 2 
Complete 3 
Complete 4 
Complete 5 

Répondre

5

Une exception est levée, et un CompletableFuture est terminée à titre exceptionnel, tout simplement pas de ceux que vous suivez.

Vous instanciez et initialisez un ThreadPoolExecutor avec un constructeur qui utilise un RejectedExecutionHandler par défaut qui déclenche simplement une exception. Nous savons qu'un RejectedExecutionException est levé si un ExecutorService ne peut pas accepter une tâche. Alors, où est la tâche ajoutée et où l'exception est-elle lancée? En l'état actuel des choses, tout le chaînage se produit à whenCompleteAsync. Lorsque vous appelez cela, vous ajoutez une personne à charge au récepteur CompletableFuture, stringCompletableFuture. Lorsque stringCompletableFuture est terminé (avec succès, dans ce cas), il créera un nouveau CompletableFuture (qu'il renvoie) et tentera de planifier le BiConsumer donné sur le ExecutorService donné.

Puisque la file d'attente de ExecutorService n'a pas d'espace, elle appellera RejectedExecutionHandler qui lancera RejectedExecutionException. Cette exception est capturée à ce moment-là et utilisée pour completeExceptionally le CompletableFuture qui sera retourné. En d'autres termes, dans votre boucle for, capturez le CompletableFuture renvoyé par whenCompleteAsync, stockez-le et imprimez son état.

ArrayList<CompletableFuture<String>> list = new ArrayList<>(); 
ArrayList<CompletableFuture<?>> dependents = new ArrayList<>(); 
for (int i = 0; i <100; i++) { 
    CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>(); 
    CompletableFuture<?> thisWillHaveException = stringCompletableFuture.whenCompleteAsync((e, a) -> { 
     System.out.println("Complete " + e); 
     try { 
      Thread.sleep(100); 
     } catch (InterruptedException e1) {e1.printStackTrace();} 
    }, executorService); 
    dependents.add(thisWillHaveException); 
    list.add(stringCompletableFuture); 
} 

for (int i = 0; i < list.size(); i++) { 
    list.get(i).complete(i + ""); 
} 
Thread.sleep(2000); 
dependents.forEach(cf -> { 
    cf.whenComplete((r, e) -> { 
     if (e != null) 
      System.out.println(cf + " " + e.getMessage()); 
    }); 
}); 

Vous remarquerez qu'ils sont tous (sauf les 6 qui ont été imprimées avec succès plus tôt) complété exceptionnellement avec un RejectedExecutionException.

... 
[email protected][Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0] 
[email protected][Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0] 
[email protected][Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]