2017-08-22 11 views
0

J'essaie de mettre en œuvre un flux de filtrage de flux akka qui recueille des statistiques sur les données traitées et matérialise les statistiques qui en résultent.État dynamique matérialisant l'état final

class SFilter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] { 
    val in = Inlet[A]("SFilter.in") 
    val out = Outlet[A]("SFilter.out") 
    val shape = FlowShape.of(in, out) 
    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) { 
     var positive: Long = 0 
     var negative: Long = 0 
     setHandler(in, new InHandler { 
     override def onPush(): Unit = { 
      val elem = grab(in) 
      if (p(elem)) { 
      push(out, elem) 
      positive += 1 
      } else { 
      pull(in) 
      negative += 1 
      } 
     } 
     }) 
     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      pull(in) 
     } 
     }) 
    } 
} 

Jusqu'à présent, si bon, mais je me SFilter[A] être de type Flow[A,A,(Long,Long)]. Comment puis-je matérialiser (positive,negative) à la fin de la comnputation?

+0

Peut-être que vous n'êtes pas à la recherche de cette réponse, mais ce que vous semblez faire est essentiellement un 'fold'. Donc, sans écrire votre propre scène, vous pourriez simplement réutiliser le combinateur 'fold' existant. – jrudolph

+0

@jrudolph oui j'y pense, mais je préfère utiliser fold avec un accumulateur immuable. Je compare cette approche avec une version mutable (mais sûre). – paradigmatic

Répondre

0

Vous ne pouvez pas matérialiser un Tuple2[Long, Long] car ces Longs dépendront du flux en cours lui-même. Vous pouvez cependant matérialiser un Future[Tuple2[Long, Long]] que vous complétez ensuite lorsque le flux est terminé. Editer: Et vous voulez nommer votre scène personnalisée différemment afin que vous puissiez faire la différence entre un filtre normal et votre SFilter.

+0

Un Futur [Tuple2 [Long, Long]] 'sera également bon. Mais comment ? Je change également le nom en fonction de votre suggestion. – paradigmatic

+0

@paradigmatic lire cette section dans la documentation: http://doc.akka.io/docs/akka/current/scala/stream/stream-customize.html#custom-materialized-values ​​ –

+0

@Viktor_Klang merci, j'ai pu pour écrire une solution grâce au lien – paradigmatic

0

Merci aux suggestions de Viktor Klang, j'ai pu mettre en œuvre la solution suivante:

class SFilter[A](p: A => Boolean) extends GraphStageWithMaterializedValue[FlowShape[A,A],Future[(Long,Long)]] { 
    val in = Inlet[A]("SFilter.in") 
    val out = Outlet[A]("SFilter.out") 
    val shape = FlowShape.of(in, out) 
    override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { 
    val result = Promise[(Long,Long)]() 
    val logic = new GraphStageLogic(shape) { 
     var positive: Long = 0 
     var negative: Long = 0 
     setHandler(in, new InHandler { 
     override def onPush(): Unit = { 
      val elem = grab(in) 
      if (p(elem)) { 
      push(out, elem) 
      positive += 1 
      } else { 
      pull(in) 
      negative += 1 
      } 
     } 
     override def onUpstreamFinish(): Unit = { 
      result.success((positive,negative)) 
      completeStage() 
     } 
     }) 
     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      pull(in) 
     } 
     }) 
     (logic, result.future) 
    } 
}