2017-10-20 25 views
0

Supposons que j'ai 3 services. D'abord j'appelle serviceA, il renvoie un CompletableFuture. Après cela avec le résultat j'appelle serviceB et serviceC paralelly (thenCompose()). Après avoir obtenu tous les résultats, je voudrais combiner les 3 résultats et les renvoyer à un appelant. Dans l'appelant, je voudrais attendre millseconds ensemble X au processus de sorte que:Interrupt CompletableFuture avec la valeur par défaut

  • Si j'interrompre le processus pendant serviceA appel est en cours: jeter une exception (il est obligatoire)
  • Si J'interromps le processus pendant que les appels serviceB et serviceC sont en cours: retourne une valeur par défaut (ils sont optionnels). Ce est la raison pour laquelle je tente d'utiliser la méthode getNow(fallback) du CompletableFuture

S'il vous plaît vérifier ci-dessous mes extraits de code, si j'utilise de longs retards dans serviceB et serviceC appels, je termine toujours avec un TimeoutException. Comment puis-je faire cela?

public CompletableFuture<Result> getFuture() { 
    CompletableFuture<A> resultA = serviceA.call(); 
    CompletableFuture<B> resultB = resultA.thenCompose(a -> serviceB.call(a)); 
    CompletableFuture<C> resultC = resultA.thenCompose(a -> serviceC.call(a)); 
    return CompletableFuture.allOf(resultB, resultC) 
      .thenApply(ignoredVoid -> combine(
        resultA.join(), 
        resultB.getNow(fallbackB), 
        resultC.getNow(fallbackC)); 
} 

public Result extractFuture(CompletableFuture<Result> future) { 
    Result result; 
    try { 
     result = future.get(timeOut, MILLISECONDS); 
    } catch (ExecutionException ex) { 
     ... 
    } catch (InterruptedException | TimeoutException ex) { 
     // I always ends up here... 
    } 
    return result; 
} 

Répondre

2

L'avenir retourné par .allOf(resultB, resultC) est terminé que lorsque les deux, resultB et resultC sont terminés, par conséquent, la fonction dépendante ignoredVoid -> combine(resultA.join(), resultB.getNow(fallbackB), resultC.getNow(fallbackC) ne s'évalué si resultB et resultC sont terminés et en fournissant une solution de repli n'a pas d'effet du tout.

Il est généralement impossible de réagir sur un appel get() au sein de ces fonctions. Ce qui devrait être évident étant donné qu'il peut y avoir un nombre arbitraire d'appels get() sur le futur à différents moments avec des délais d'attente différents, mais la fonction passée à thenApply n'est évaluée qu'une seule fois.

La seule façon de gérer une durée d'expiration des consommateurs dans getFuture() est de changer pour retourner une fonction qui reçoit le délai d'attente:

interface FutureFunc<R> { 
    R get(long time, TimeUnit u) throws ExecutionException; 
} 
public FutureFunc<Result> getFuture() { 
    CompletableFuture<A> resultA = serviceA.call(); 
    CompletableFuture<B> resultB = resultA.thenCompose(a -> serviceB.call(a)); 
    CompletableFuture<C> resultC = resultA.thenCompose(a -> serviceC.call(a)); 
    CompletableFuture<Result> optimistic = CompletableFuture.allOf(resultB, resultC) 
     .thenApply(ignoredVoid -> combine(resultA.join(), resultB.join(), resultC.join())); 
    return (t,u) -> { 
     try { 
      return optimistic.get(t, u); 
     } catch (InterruptedException | TimeoutException ex) { 
      return combine(resultA.join(), resultB.getNow(fallbackB), 
              resultC.getNow(fallbackC)); 
     } 
    }; 
} 

public Result extractFuture(FutureFunc<Result> future) { 
    Result result; 
    try { 
     result = future.get(timeOut, MILLISECONDS); 
    } catch (ExecutionException ex) { 
     ... 
    } 
    return result; 
} 

Maintenant, différents appels avec différents délais d'attente peuvent être faites, avec des résultats éventuellement différents Tant que B ou C ne sont pas encore terminés. Pas qu'il y ait une certaine ambiguïté concernant la méthode combine qui peut aussi prendre du temps.

Vous pouvez changer la fonction à un

return (t,u) -> { 
    try { 
     if(resultB.isDone() && resultC.isDone()) return optimistic.get(); 
     return optimistic.get(t, u); 
    } catch (InterruptedException | TimeoutException ex) { 
     return combine(resultA.join(), resultB.getNow(fallbackB), 
             resultC.getNow(fallbackC)); 
    } 
}; 

attendre l'achèvement d'une course peut-être déjà combine. Dans les deux cas, il n'y a aucune garantie que le résultat est livré dans le délai spécifié, car même si les valeurs de repli pour B et C sont utilisées, il y aura une exécution de combine qui peut prendre un certain temps.

Si vous voulez l'annulation comme comportement, à savoir que toutes les requêtes de résultats renvoient le même résultat, même si elle a été calculée à partir des valeurs de repli en raison de la première requête, vous pouvez utiliser à la place

public FutureFunc<Result> getFuture() { 
    CompletableFuture<A> resultA = serviceA.call(); 
    CompletableFuture<B> resultB = resultA.thenCompose(a -> serviceB.call(a)); 
    CompletableFuture<C> resultC = resultA.thenCompose(a -> serviceC.call(a)); 
    CompletableFuture<Void> bAndC = CompletableFuture.allOf(resultB, resultC); 
    CompletableFuture<Result> result = bAndC 
     .thenApply(ignoredVoid -> combine(resultA.join(), resultB.join(), 
                  resultC.join())); 
    return (t,u) -> { 
     try { 
      bAndC.get(t, u); 
     } catch (InterruptedException|TimeoutException ex) { 
      resultB.complete(fallbackB); 
      resultC.complete(fallbackC); 
     } 
     try { 
      return result.get(); 
     } catch (InterruptedException ex) { 
      throw new ExecutionException(ex); 
     } 
    }; 
} 

Avec cette , toutes les requêtes sur un seul FutureFunc retourneront systématiquement le même résultat, même s'il est basé sur des valeurs de repli dues au premier délai.Cette variante exclut également systématiquement l'exécution de combine à partir du délai d'expiration.

Bien sûr, si des délais d'attente différents ne sont pas du tout prévus, vous pouvez refactoriser getFuture() pour obtenir le délai d'attente souhaité à l'avance, par ex. en tant que paramètre. Cela simplifierait considérablement la mise en œuvre et pourrait à nouveau donner un avenir:

public CompletableFuture<Result> getFuture(long timeOut, TimeUnit u) { 
    CompletableFuture<A> resultA = serviceA.call(); 
    CompletableFuture<B> resultB = resultA.thenCompose(a -> serviceB.call(a)); 
    CompletableFuture<C> resultC = resultA.thenCompose(a -> serviceC.call(a)); 
    ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor(); 
    e.schedule(() -> resultB.complete(fallbackB), timeOut, u); 
    e.schedule(() -> resultC.complete(fallbackC), timeOut, u); 
    CompletableFuture<Void> bAndC = CompletableFuture.allOf(resultB, resultC); 
    bAndC.thenRun(e::shutdown); 
    return bAndC.thenApply(ignoredVoid -> 
          combine(resultA.join(), resultB.join(), resultC.join())); 
} 
+0

C'est exactement ce que je cherchais. Cela fonctionne comme un charme, merci pour la réponse détaillée! – kornisb