2017-02-08 1 views
0

J'essaye de joindre un abonné reactivestream à une source akka.Impossible d'utiliser reactivestream Subscriber avec des sources de flux akka

Ma source semble fonctionner correctement avec un évier simple (comme un foreach) - mais si je mets un vrai évier, fait à partir d'un abonné, je ne reçois rien.

Mon contexte est:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Sink, Source} 
import org.reactivestreams.{Subscriber, Subscription} 

implicit val system = ActorSystem.create("test") 
implicit val materializer = ActorMaterializer.create(system) 

class PrintSubscriber extends Subscriber[String] { 
    override def onError(t: Throwable): Unit = {} 
    override def onSubscribe(s: Subscription): Unit = {} 
    override def onComplete(): Unit = {} 
    override def onNext(t: String): Unit = { 
    println(t) 
    } 
} 

et mon cas de test est:

val subscriber = new PrintSubscriber() 
val sink = Sink.fromSubscriber(subscriber) 

val source2 = Source.fromIterator(() => Iterator("aaa", "bbb", "ccc")) 
val source1 = Source.fromIterator(() => Iterator("xxx", "yyy", "zzz")) 
source1.to(sink).run()(materializer) 
source2.runForeach(println) 

Je reçois la sortie:

aaa 
bbb 
ccc 

Pourquoi ne pas obtenir xxx, yyy, et zzz?

Répondre

2

Citant les spécifications réactives cours d'eau pour l'Subscriber ci-dessous:

Will receive appel à onSubscribe (Abonnement) une fois après avoir passé une instance d'abonné à Publisher.subscribe (abonné). Aucune autre notification ne sera reçue jusqu'à ce que Subscription.request (long) soit appelé.

Le plus petit changement que vous pouvez faire pour voir certains éléments qui traversent à votre abonné est

override def onSubscribe(s: Subscription): Unit = { 
    s.request(3) 
} 

Cependant, gardez à l'esprit que ce ne sera pas le rendre pleinement conforme aux flux réactifs specs. Ce n'est pas si facile à mettre en œuvre que la raison principale derrière les boîtes à outils de plus haut niveau comme Akka-Streams lui-même.