2017-05-28 1 views
2

J'essaie les flux akka, mais je ne peux pas obtenir de contre-pression pour travailler dans mon exemple simple. Je ne suis pas admis avec les akka (streams), donc il me manque probablement quelque chose de grand.Comprendre la contre-pression dans les flux akka Source.queue

Je produis (en file d'attente) des nombres entiers plus rapidement que les consommer, donc je pensais que la contre-pression entrerait en jeu. Mon but est de toujours consommer l'objet le plus récent qui a été mis dans la file d'attente (c'est pourquoi j'ai bufferSize = 1 et OverflowStrategy.dropHead() dans la file d'attente source).

public class SimpleStream { 
    public static void main(String[] argv) throws InterruptedException { 
     final ActorSystem system = ActorSystem.create("akka-streams"); 
     final Materializer materializer = ActorMaterializer.create(system); 

     final Procedure<Integer> slowConsumer = (i) -> { 
      System.out.println("consuming [" + i + "]"); 
      ThreadUtils.sleepQuietly(1000, TimeUnit.MILLISECONDS); 
     }; 

     final SourceQueue<Integer> q = Sink 
       .<Integer>foreach(slowConsumer) 
       .runWith(Source.<Integer>queue(1, OverflowStrategy.dropHead()), materializer); 

     final AtomicInteger i = new AtomicInteger(0); 
     final Thread t = new Thread(() -> { 
      while (!Thread.currentThread().isInterrupted()) { 
       int n = i.incrementAndGet(); 
       q.offer(n); 
       System.out.println("produced: [" + n + "]"); 
       ThreadUtils.sleepQuietly(500, TimeUnit.MILLISECONDS); 
      } 
     }); 
     t.setName("ticking"); 
     t.start(); 

     // run some time... to observe the effects. 
     ThreadUtils.sleepQuietly(1, TimeUnit.HOURS); 
     t.interrupt(); 
     t.join(); 

     // eventually shutdown akka here... 
    } 
} 

Cependant ceci est le résultat:

produced: [1] 
consuming [1] 
produced: [2] 
produced: [3] 
consuming [2] <-- Expected to be consuming 3 here. 
produced: [4] 
produced: [5] 
consuming [3] <-- Expected to be consuming 5 here. 
produced: [6] 
produced: [7] 

S'il vous plaît ignorer les trucs de filetage ici et là juste des données fausses obtenir à partir d'une source externe (comme il se passerait-il si je devais l'utiliser dans un vrai projet).

Une idée de ce qui me manque?

+0

Backpressure ne fonctionnera pas avec 'Source.queue'. Vous pouvez appeler son «offre» autant de fois que possible. Vous devez vérifier ce que «offre» renvoie. Vous voulez probablement que le producteur soit indépendant de la file d'attente des consommateurs. Jetez un oeil à 'MergeHub'. Peut-être que cela fonctionnera mieux pour vous. – expert

Répondre

0

Source.queue termine la signalisation de contre-pression. C'est pourquoi la méthode Source.queue prend un OverflowStrategy. Si la contre-pression pouvait être signalée en amont de la file d'attente, il n'y aurait pas besoin de faire face à une situation où la file d'attente pourrait potentiellement déborder. Mais comme la contre-pression ne se propage pas au-delà de la file d'attente, une stratégie doit être définie pour faire face à un producteur qui est plus rapide qu'un consommateur.

Avec des flux typiques, l'ultime Source reçoit la demande du Sink pour produire plus de résultats. Cependant, avec un flux créé à partir de Source.queue, la «source ultime» est une file d'attente. Cette file d'attente ne peut vider le contenu, s'il y en a. Il ne peut pas signaler en amont pour générer plus de résultats parce que l'amont est de l'autre côté de la méthode offer.

+0

Ok, il est logique que la "contre-pression" ne puisse pas être propagée vers le haut après la file d'attente. Dans ma question j'appelle "backpressure" l'application de la stratégie de débordement par la file d'attente. Ce serait bien si cette stratégie de débordement fonctionnait malgré le nom (c'est-à-dire que c'est la contre-pression ou non). –