2017-09-29 4 views
1

J'ai une source qui regroupe des éléments et un récepteur qui effectue une requête par lots, J'utilise KillSwitch pour pouvoir arrêter le graphique à un moment quelconque. Le problème que les dossiers du dernier lot incomplet que les sorties source se perdent quand switch.shutdown() est appeléFlux Akka - flux d'arrêt avec regroupement sans perte de données

val source = Source.tick(10.millis, 10.millis, "tick").grouped(500) 

val (switch, _) = source.viaMat(KillSwitches.single)(Keep.right) 
.toMat(sink)(Keep.both).run() 

Thread.sleep(3000) // wait some arbitrary time 

switch.shutdown() 

est-il un moyen de « débusquer » le lot incomplet lorsque l'arrêt se produit?

Répondre

3

Le comportement de l'arrêt de coupe-circuit est de position, selon ses docs

Après avoir appelé [[UniqueKillSwitch # shutdown()]] l'instance en cours d'exécution de le [[Graphique]] de [[FlowShape ]] qui s'est matérialisé en [[UniqueKillSwitch]] complètera son flux aval et annulera son en amont (sauf si terminé ou échoué, auquel cas la commande est ignorée).

Voir aussi plus docs here.

Maintenant, l'étape grouped émettra un groupe partiellement rempli uniquement à l'achèvement, mais pas lorsqu'il est annulé.

Cela signifie que le graphique ci-dessous (regroupées avant Killswitch) se comportera comme vous avez observé

val switch = 
    Source.tick(10.millis, 175.millis, "tick") 
      .grouped(10) 
      .viaMat(KillSwitches.single)(Keep.right) 
      .toMat(Sink.foreach(println))(Keep.left) 
      .run() 

tandis que le graphique ci-dessous (regroupés après Killswitch) émettra en aval des groupes partiels à la fin

val switch = 
    Source.tick(10.millis, 175.millis, "tick") 
      .viaMat(KillSwitches.single)(Keep.right) 
      .grouped(10) 
      .toMat(Sink.foreach(println))(Keep.left) 
      .run() 
+0

Merci beaucoup, ça marche! –

+0

@ stanislav.chetvertkov vous êtes les bienvenus :) pourriez-vous accepter la réponse? –

+0

Merci pour l'explication. Bonne réponse. –