2015-08-06 1 views
3

Ce que j'essaie de réaliser est de mettre en œuvre quelque chose comme une boucle de rétroaction synchronisée avec des flux akka.Feedback synchronisé avec Akka Streams

Disons que vous avez un Flow[Int].filter(_ % 5 == 0). Lorsque vous diffusez un flux de Int « s à ce flux et zipper les tuples directement derrière elle, vous obtenez quelque chose comme

(0,0) 
(5,1) 
(10,2) 

est-il un moyen d'émettre un Option[Int], qui indique, si le flux émis un élément après J'ai poussé le suivant à travers ou pas?

(Some(0),0) 
(None, 1) 
(None, 2) 
(None, 3) 
(None, 4) 
(Some(5), 5) 
(None, 6) 
... 

Je pensais à mettre en œuvre mon droit de DetachedStage devant et derrière la Flow de tenir un état, chaque fois que le flux tiré sur la scène avant, je savais qu'il a besoin de l'élément suivant. Lorsque l'étape derrière n'a pas reçu d'élément, c'était None.

Malheureusement, les résultats ne sont pas bons et désactivés par de nombreuses positions.

côté notes

Le filtre de flux est juste un exemple, il peut être un flux à long realy, où je ne peux pas donner la possibilité d'émettre un Option dans toutes les étapes, donc je vraiment à savoir, si le flux poussé l'autre ou ne pas demandé le lendemain de l'aval au lieu

J'ai aussi joué avec conflate et expand, mais ceux-ci nous sommes encore pire avec des décalages de position des résultats

Un J'ai changé dans la configuration un tampon initial et max pour le flux, de sorte que je peux être sûr que la demande indiquée est vraiment après l'élément que je l'ai traversé.

Il serait bon d'obtenir des suggestions sur la façon de résoudre ce problème!

Répondre

2

Je ne peux pas produire exactement ce que vous cherchez. Mais je peux finagle un avenir de ce que vous cherchez, par exemple:

(Future(Some(0)), 0) 
(Future(None) , 1) 
(Future(None) , 2) 
... 

L'expansion de votre exemple, si on leur donne un flux qui ne peut pas être changé:

val flow = Flow[Int].filter(_ % 5 == 0) 

Ensuite, ce flux peut être évaluée sur une entrée unique et le résultat converti en un Option:

import scala.concurrent.{Future, Promise} 
import akka.stream.{Materializer, ActorMaterializer} 
import akka.stream.scaladsl.{Source,Sink} 

def evalFlow(in : Int, flow : Flow[Int, Int, _])(implicit mat : Materializer, ec : ExecutionContext) = { 
    val fut : Future[Int] = 
    Source.single(in) 
      .via(flow) 
      .runWith(Sink.head) //Throws an Exception if filter fails 

    fut.map(Some(_))    //  val => Some(val) 
    .fallbackTo(Promise(None)) // Exception => None 
} 

Cette fonction retourne un Future[Option[Int]]. On peut alors utiliser l'évaluation de combiner simplement le résultat à l'entrée:

def evalAndCombine(flow : Flow[Int, Int, _])(in : Int)(implicit mat : Materializer, ec : ExecutionContext) = 
    (evalFlow(in, flow), in)//(Future[Option[Int]], Int) 

Et, enfin, la fonction evalAndCombine peut être placé après votre source de Ints:

import akka.actor.ActorSystem 

implicit val actorSystem = ActorSystem() 
implicit val mat = ActorMaterializer() 
import actorSystem.dispatcher 

val exampleSource = Source(() => (1 to 6).toIterator) 

val tupleSource = exampleSource map evalAndCombine(flow) 

De même, si vous voulez un Future[(Option[Int], Int)] au lieu de (Future[Option[Int]], Int), par exemple:

Future[(Some(0), 0)] 
Future[(None , 1)] 
... 

ensuite modifier légèrement la fonction moissonneuse-batteuse:

def evalAndCombine(flow : Flow[Int, Int, _])(in : Int)(implicit mat : Materializer, ec : ExecutionContext) = 
    evalFlow(in, flow) map (option => (option, in))//Future[(Option[Int], Int)] 
+0

Ceci est en fait vraiment une bonne idée! De cette façon, nous pouvons être sûrs de ne pas manquer un élément et ne pas avoir à lutter avec la logique pour gérer les demandes de demande à la main. Merci beaucoup! –

+1

@Chasmo Vous êtes les bienvenus. Une réponse récente de Viktor Klang (voir ci-dessous) m'a vraiment ouvert les yeux sur la fonctionnalité des objets Source, Flow et Sink standard. J'essaie d'éviter les étapes personnalisées autant que possible. Piratage heureux. http://stackoverflow.com/questions/33817241/conditionally-skip-flow-using-akka-streams/33826267#33826267 –