Ce que j'essaie de réaliser est de mettre en œuvre quelque chose comme une boucle de rétroaction synchronisée avec des flux akka.Feedback synchronisé avec Akka Streams
Disons que vous avez un Flow[Int].filter(_ % 5 == 0)
. Lorsque vous diffusez un flux de Int
« s à ce flux et zipper les tuples directement derrière elle, vous obtenez quelque chose comme
(0,0)
(5,1)
(10,2)
est-il un moyen d'émettre un Option[Int]
, qui indique, si le flux émis un élément après J'ai poussé le suivant à travers ou pas?
(Some(0),0)
(None, 1)
(None, 2)
(None, 3)
(None, 4)
(Some(5), 5)
(None, 6)
...
Je pensais à mettre en œuvre mon droit de DetachedStage
devant et derrière la Flow
de tenir un état, chaque fois que le flux tiré sur la scène avant, je savais qu'il a besoin de l'élément suivant. Lorsque l'étape derrière n'a pas reçu d'élément, c'était None.
Malheureusement, les résultats ne sont pas bons et désactivés par de nombreuses positions.
côté notes
Le filtre de flux est juste un exemple, il peut être un flux à long realy, où je ne peux pas donner la possibilité d'émettre un Option
dans toutes les étapes, donc je vraiment à savoir, si le flux poussé l'autre ou ne pas demandé le lendemain de l'aval au lieu
J'ai aussi joué avec conflate
et expand
, mais ceux-ci nous sommes encore pire avec des décalages de position des résultats
Un J'ai changé dans la configuration un tampon initial
et max
pour le flux, de sorte que je peux être sûr que la demande indiquée est vraiment après l'élément que je l'ai traversé.
Il serait bon d'obtenir des suggestions sur la façon de résoudre ce problème!
Ceci est en fait vraiment une bonne idée! De cette façon, nous pouvons être sûrs de ne pas manquer un élément et ne pas avoir à lutter avec la logique pour gérer les demandes de demande à la main. Merci beaucoup! –
@Chasmo Vous êtes les bienvenus. Une réponse récente de Viktor Klang (voir ci-dessous) m'a vraiment ouvert les yeux sur la fonctionnalité des objets Source, Flow et Sink standard. J'essaie d'éviter les étapes personnalisées autant que possible. Piratage heureux. http://stackoverflow.com/questions/33817241/conditionally-skip-flow-using-akka-streams/33826267#33826267 –