2017-03-11 1 views
0

Tenir compte ce flux simple:pourquoi la surveillance de redémarrage akka-stream ne redémarre pas, mais simplement reprendre

Source(1 to 5) 
.mapAsync(1) { i => 
    if (i % 3 == 0) Future.failed(new Exception("I don't like 3")) 
    else Future.successful(i) 
} 
.withAttributes(
    ActorAttributes.supervisionStrategy(Supervision.restartingDecider) 
) 
.runForeach(i => println(s"#$i")) 

Cette imprime en fait

#1 
#2 
#4 

Quelle est la même que la stratégie de reprise. j'attendre le courant de redémarrer après l'avenir a échoué avec la sortie suivante

#1 
#2 
#1 
#2 
... 
  1. Pourquoi la reprise et de la stratégie de redémarrage se comporte de la même façon dans ce cas?
  2. Comment puis-je redémarrer le flux depuis le début?
+0

J'ai eu question connexe :) http://stackoverflow.com/q/39822628/226895 S'il vous plaît poster ici si vous trouvez la réponse. – expert

Répondre

1

Question 1: la différence entre resume et restart est que - ce dernier - la scène ne redémarre, perdant tout état interne accumulée. (Voir docs pour référence).

Dans votre cas, vous avez un étage mapAsync avec le parallélisme 1, donc vous n'aurez jamais d'état cumulé. Ceci a pour résultat resume et restart d'avoir un comportement équivalent.

Question 2: La sémantique des stratégies de supervision dans les flux Akka est liée à l'étape spécifique qui échoue. Une étape ratée n'a tout simplement aucun moyen de rejouer les éléments qui ont coulé dans le passé, car ils sont déjà partis - c'est-à-dire qu'ils ne sont pas tenus n'importe où. Aucune stratégie de supervision ne peut vous donner cela.

Ce que vous cherchez est un redémarrage du flux tout qui devrait être réalisable avec le recoverWithRetries combinateur (docs). Vous pouvez réinjecter la même source (Source(1 to 5)) dans la combinatoire pour qu'elle rejoue ces éléments.