J'essaie les flux akka, mais je ne peux pas obtenir de contre-pression pour travailler dans mon exemple simple. Je ne suis pas admis avec les akka (streams), donc il me manque probablement quelque chose de grand.Comprendre la contre-pression dans les flux akka Source.queue
Je produis (en file d'attente) des nombres entiers plus rapidement que les consommer, donc je pensais que la contre-pression entrerait en jeu. Mon but est de toujours consommer l'objet le plus récent qui a été mis dans la file d'attente (c'est pourquoi j'ai bufferSize = 1 et OverflowStrategy.dropHead() dans la file d'attente source).
public class SimpleStream {
public static void main(String[] argv) throws InterruptedException {
final ActorSystem system = ActorSystem.create("akka-streams");
final Materializer materializer = ActorMaterializer.create(system);
final Procedure<Integer> slowConsumer = (i) -> {
System.out.println("consuming [" + i + "]");
ThreadUtils.sleepQuietly(1000, TimeUnit.MILLISECONDS);
};
final SourceQueue<Integer> q = Sink
.<Integer>foreach(slowConsumer)
.runWith(Source.<Integer>queue(1, OverflowStrategy.dropHead()), materializer);
final AtomicInteger i = new AtomicInteger(0);
final Thread t = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
int n = i.incrementAndGet();
q.offer(n);
System.out.println("produced: [" + n + "]");
ThreadUtils.sleepQuietly(500, TimeUnit.MILLISECONDS);
}
});
t.setName("ticking");
t.start();
// run some time... to observe the effects.
ThreadUtils.sleepQuietly(1, TimeUnit.HOURS);
t.interrupt();
t.join();
// eventually shutdown akka here...
}
}
Cependant ceci est le résultat:
produced: [1]
consuming [1]
produced: [2]
produced: [3]
consuming [2] <-- Expected to be consuming 3 here.
produced: [4]
produced: [5]
consuming [3] <-- Expected to be consuming 5 here.
produced: [6]
produced: [7]
S'il vous plaît ignorer les trucs de filetage ici et là juste des données fausses obtenir à partir d'une source externe (comme il se passerait-il si je devais l'utiliser dans un vrai projet).
Une idée de ce qui me manque?
Backpressure ne fonctionnera pas avec 'Source.queue'. Vous pouvez appeler son «offre» autant de fois que possible. Vous devez vérifier ce que «offre» renvoie. Vous voulez probablement que le producteur soit indépendant de la file d'attente des consommateurs. Jetez un oeil à 'MergeHub'. Peut-être que cela fonctionnera mieux pour vous. – expert