2017-09-21 1 views
0

J'ai lu cet article sur akka erreur de flux de manutentionTraitement des erreurs Akka Streams. Comment savoir quelle ligne a échoué?

http://doc.akka.io/docs/akka/2.5.4/scala/stream/stream-error.html

et a écrit ce code.

val decider: Supervision.Decider = { 
    case _: Exception => Supervision.Restart 
    case _ => Supervision.Stop 
} 

implicit val actorSystem = ActorSystem() 
implicit val actorMaterializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)) 

val source = Source(1 to 10) 
val flow = Flow[Int].map{x => if (x != 9) 2 * x else throw new Exception("9!")} 
val sink : Sink[Int, Future[Done]] = Sink.foreach[Int](x => println(x)) 
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){implicit builder => s => 
    import GraphDSL.Implicits._ 
    source ~> flow ~> s.in 
    ClosedShape 
}) 
val future = graph.run() 
future.onComplete{ _ => 
    actorSystem.terminate() 
} 
Await.result(actorSystem.whenTerminated, Duration.Inf) 

Cela fonctionne très bien .... sauf que je dois analyser la sortie pour voir quelle ligne ne se traité. Y a-t-il un moyen pour moi d'imprimer/connecter la ligne qui a échoué? [Sans mettre explicitement try/catch blocs dans chaque flux que j'écris?]

Ainsi par exemple Si j'utilisais des acteurs (par opposition aux flux) j'aurais pu écrire un événement de cycle de vie d'un acteur et je pourrais ont enregistré quand un acteur a redémarré avec le message qui était en cours de traitement au moment du redémarrage.

mais ici je n'utilise pas d'acteurs explicitement (bien qu'ils soient utilisés en interne). Existe-t-il des événements du cycle de vie d'un flux/d'une source/d'un évier?

+0

Pourriez-vous bifurquer le courant et pousser les mauvaises à une autre file d'attente « mauvais » où ils ont juste être déconnecté (ou ce que vous voulez faire avec eux)? – Tyler

+0

Donc .. Pour bifurquer le flux, vous devrez toujours attraper toutes les exceptions, et dans la route du bloc catch vers un autre flux ... ce ne sera pas très fastidieux? Y at-il un simple était de fourchette le flux avec des messages échoués? –

+0

Oh, je n'ai pas compris votre exemple, je pensais que votre déclaration conditionnelle indiquait que vous connaissiez le cas à l'avance. – Tyler

Répondre

1

Juste une petite modification à votre code:

val decider: Supervision.Decider = { 
    case e: Exception => 
    println("Exception handled, recovering stream:" + e.getMessage) 
    Supervision.Restart 
    case _ => Supervision.Stop 
} 

Si vous passez des messages significatifs à vos exceptions dans le flux, la ligne par exemple, vous pouvez les imprimer dans la supervision decider.

J'utilisé println pour donner une réponse rapide et courte, mais recommande fortement d'utiliser certaines bibliothèques forestières telles que scala-logging