2016-12-30 2 views
2

je suis en train de traiter une certaine quantité de données en utilisant simultanément CompletableFuture et Stream Jusqu'à présent, j'ai:Java 8 CompletableFuture, Stream et Timeouts

public static void main(String[] args) throws InterruptedException, ExecutionException { 
    System.out.println("start"); 

    List<String> collect = Stream.of("1", "2", "3", "4", "5", 
      "6", "7") 
      .map(x -> CompletableFuture.supplyAsync(getStringSupplier(x))) 
      .collect(Collectors.toList()) 
      .stream() 
      .map(CompletableFuture::join) 
      .collect(Collectors.toList()); 
    System.out.println("stop out!"); 
} 


public static Supplier<String> getStringSupplier(String text) { 
    return() -> { 

     System.out.println("start " + text); 
     try { 
      TimeUnit.SECONDS.sleep(2); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
     System.out.println("stop " + text); 
     return "asd" + text; 
    }; 
} 

Et la sortie est bien:

start start 1 start 4 start 3 start 2 start 5 start 6 start 7 stop 4 stop 1 stop 5 stop 2 stop 6 stop 3 stop 7 stop out!

Cependant, en ce moment, je veux ajouter un délai d'attente à ce travail. Disons qu'il devrait être annulé après 1 seconde. Et renvoie null ou une autre valeur à la liste collect. (Je préférerais une valeur indiquant la cause).

Comment puis-je y parvenir?

Merci de votre aide à l'avance.

+2

En Java 9 vous pouvez le faire facilement avec 'CompletableFuture # completeOnTimeout' (http : //download.java.net/java/jdk9/docs/api/java/util/concurrent/CompletableFuture.html#completeOnTimeout-T-long-java.util.concurrent.TimeUnit-) – eee

+0

Jusqu'à présent (et sera probablement pendant un certain temps) ma société fonctionne avec java 8 donc je ne peux pas l'utiliser :) – user2377971

Répondre

2

J'ai trouvé le moyen de le faire:

private static final ScheduledExecutorService scheduler = 
     Executors.newScheduledThreadPool(
       1, 
       new ThreadFactoryBuilder() 
         .setDaemon(true) 
         .setNameFormat("failAfter-%d") 
         .build()); 

public static void main(String[] args) throws InterruptedException, ExecutionException { 
    System.out.println("start"); 
    final CompletableFuture<Object> oneSecondTimeout = failAfter(Duration.ofSeconds(1)) 
      .exceptionally(xxx -> "timeout exception"); 
    List<Object> collect = Stream.of("1", "2", "3", "4", "5", "6", "7") 
      .map(x -> CompletableFuture.anyOf(createTaskSupplier(x) 
        , oneSecondTimeout)) 
      .collect(Collectors.toList()) 
      .stream() 
      .map(CompletableFuture::join) 
      .collect(Collectors.toList()); 
    System.out.println("stop out!"); 
    System.out.println(collect); 
} 

public static CompletableFuture<String> createTaskSupplier(String x) { 
    return CompletableFuture.supplyAsync(getStringSupplier(x)) 
      .exceptionally(xx -> "PROCESSING ERROR : " + xx.getMessage()); 
} 


public static Supplier<String> getStringSupplier(String text) { 
    return() -> { 

     System.out.println("start " + text); 
     try { 
      TimeUnit.MILLISECONDS.sleep(100); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
     if (text.equals("1")) { 
      throw new RuntimeException("LOGIC ERROR"); 
     } 
     try { 
      if (text.equals("7")) 
       TimeUnit.SECONDS.sleep(2); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
     System.out.println("stop " + text); 
     return "result " + text; 
    }; 
} 

public static <T> CompletableFuture<T> failAfter(Duration duration) { 
    final CompletableFuture<T> promise = new CompletableFuture<>(); 
    scheduler.schedule(() -> { 
     final TimeoutException ex = new TimeoutException("Timeout after " + duration); 
     return promise.completeExceptionally(ex); 
    }, duration.toMillis(), MILLISECONDS); 
    return promise; 
} 

Il retourne:

start 
start 1 
start 3 
start 4 
start 2 
start 5 
start 6 
start 7 
stop 6 
stop 4 
stop 3 
stop 5 
stop 2 
stop out! 
[PROCESSING ERROR : java.lang.RuntimeException: LOGIC ERROR, result 2, result 3, result 4, result 5, result 6, timeout exception]` 

Que pensez-vous à ce sujet, pouvez-vous repérer les défauts de cette solution?

0

Vous pouvez placer le travail dans un autre objet CompletableFuture et émettre une exception TimeoutException si le temps donné est dépassé. Vous pouvez séparer le bloc catch TimeoutException si vous voulez le gérer spécialement.

List<String> collect = null; 
    try { 
     collect = CompletableFuture.supplyAsync(() -> 
       Stream.of("1", "2", "3", "4", "5", 
         "6", "7") 
         .map(x -> CompletableFuture.supplyAsync(getStringSupplier(x))) 
         .collect(Collectors.toList()) 
         .stream() 
         .map(CompletableFuture::join) 
         .collect(Collectors.toList()) 
     ).get(5, TimeUnit.SECONDS); 
    } catch (InterruptedException | ExecutionException | TimeoutException e) { 
     e.printStackTrace(); 
     //separate out the TimeoutException if you want to handle it differently 

    } 

    System.out.println(collect); //would be null in case of any exception 
+0

Problème avec cette solution est que si l'exception se produit 'collecter' La liste est' null' donc je perds les données d'exécutions réussies et je ne peux pas le laisser se produire – user2377971

0

vous pouvez essayer la méthode supplyAsync surchargée de CompletableFuture avec le paramètre exécuteur (CompletableFuture.supplyAsync (getStringSupplier (x), timeoutExecutorService)) et peut se référer link pour timeoutExecutorService.

+0

Vous avez raison tha Je pourrais travailler. Mais cette solution utilise 'Feature 'à la place sur' CompletableFuture'. Je préférerais utiliser l'objet 'CompletableFuture' parce que je veux apprendre de nouvelles choses. – user2377971

1

Pour d'autres, qui ne sont pas limitées avec Java 8, vous pouvez utiliser la méthode completeOnTimeout, qui a été introduit en Java 9.

List<String> collect = Stream.of("1", "2", "3", "4", "5", "6", "7") 
     .map(x -> CompletableFuture.supplyAsync(getStringSupplier(x)) 
       .completeOnTimeout(null , 1, SECONDS)) 
     .filter(Objects::nonNull) 
     .collect(toList()) 
     .stream() 
     .map(CompletableFuture::join) 
     .collect(toList());