3

Je suis en train de me couper les dents sur les flux Akka et j'ai fait un exemple d'abonné fibonacci comme suit. Cependant, je ne comprends pas encore comment la demande est initialement générée et quelle relation elle a avec la stratégie de demande de l'abonné. Quelqu'un peut-il expliquer s'il vous plaît?Akka/Scala: Pouvez-vous expliquer ce qui se passe dans ce flux Akka Streams?

FibonacciPublisher:

class FibonacciPublisher extends ActorPublisher[Long] with ActorLogging { 
    private val queue = Queue[Long](0, 1) 

    def receive = { 
    case Request(_) => // _ is the demand 
     log.debug("Received request; demand = {}.", totalDemand) 
     publish 
    case Cancel => 
     log.info("Stopping.") 
     context.stop(self) 
    case unknown => log.warning("Received unknown event: {}.", unknown) 
    } 

    final def publish = { 
    while (isActive && totalDemand > 0) { 
     val next = queue.head 
     queue += (queue.dequeue + queue.head) 

     log.debug("Producing fibonacci number: {}.", next) 

     onNext(next) 

     if (next > 5000) self ! Cancel 
    } 
    } 
} 

FibonacciSubscriber:

class FibonacciSubscriber extends ActorSubscriber with ActorLogging { 
    val requestStrategy = WatermarkRequestStrategy(20) 

    def receive = { 
    case OnNext(fib: Long) => 
     log.debug("Received Fibonacci number: {}", fib) 

     if (fib > 5000) self ! OnComplete 
    case OnError(ex: Exception) => 
     log.error(ex, ex.getMessage) 
     self ! OnComplete 
    case OnComplete => 
     log.info("Fibonacci stream completed.") 
     context.stop(self) 
    case unknown => log.warning("Received unknown event: {}.", unknown) 
    } 
} 

App Fibonacci:

val src = Source.actorPublisher(Props[FibonacciPublisher]) 
val flow = Flow[Long].map { _ * 2 } 
val sink = Sink.actorSubscriber(Props[FibonacciSubscriber]) 

src.via(flow).runWith(sink) 

run Exemple: Question: D'où vient la demande initiale de 4 à partir?

2015-10-03 23:10:49.120 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Received request; demand = 4. 
2015-10-03 23:10:49.120 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 0. 
2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 1. 
2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 1. 
2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 2. 
2015-10-03 23:10:49.122 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 0 
2015-10-03 23:10:49.122 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 2 
2015-10-03 23:10:49.123 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Received request; demand = 2. 
2015-10-03 23:10:49.123 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 2 
2015-10-03 23:10:49.124 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 4 
2015-10-03 23:10:49.124 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 3. 
2015-10-03 23:10:49.125 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 5. 

Répondre

1

La demande initiale à la source est fournie par le tampon d'entrée de vos étapes ultérieures. Ceci, à son tour, est configuré via l'instance ActorMaterializerSettings que vous transmettez lorsque vous initialisez votre ActorMaterializer.

Si vous ne passez aucun paramètre spécifique, akka utilisera la configuration fournie pour en initialiser une; in the default configuration vous pouvez trouver que akka.stream.materializer.initial-input-buffer-size est défini sur 4. Modifier que devrait changer votre demande initiale.

+0

Merci pour votre réponse. En jouant avec mon exemple de fibonacci ci-dessus, au lieu d'un abonné, j'ai attaché 2, et la demande initiale est devenue 16. Je vois que 16 est la valeur par défaut de 'akka.stream.materializer.max-input-buffer-size'. Je lis aussi [Buffers and working with rate] (http://doc.akka.io/docs/akka-stream-and-http-experimental/current/scala/stream-rate.html). Ce que cela n'explique pas, et j'espère que vous le pouvez, c'est que 1) lorsque la demande est supérieure à 4 mais inférieure à 16, que se passe-t-il? Et 2) que se passe-t-il lorsque la demande est supérieure à 16? –

+0

Je pense que dans la plupart des étapes, le tampon ne sera utilisé que lorsque la source est plus rapide que le consommateur; dans ce cas, le remplissage du tampon (moins de demande) entraînera une contre-pression appliquée au consommateur. Si le consommateur est plus rapide (plus de demande), le tampon ne sera pas utilisé et les données du producteur seront directement transmises au consommateur. –

+0

Ce que vous avez dit dans votre commentaire est incorrect. Selon le doc [Tampons et travailler avec taux] (http://doc.akka.io/docs/akka-stream-and-http-experimental/current/scala/stream-rate.html), "pour des raisons de performance Akka Streams introduit un tampon pour chaque étape de traitement ". –