J'ai expérimenté l'écriture d'une source personnalisée en Java. Plus précisément, j'ai écrit une source qui prend des éléments d'un BlockingQueue. Je suis conscient de Source.queue, mais je ne sais pas comment obtenir la valeur matérialisée si j'en connecte plusieurs à une étape de fusion. Quoi qu'il en soit, voici la mise en œuvre:Akka Streams- une étape de fusion ne pousse parfois vers l'aval qu'une fois que toutes les sources amont y ont été poussées
public class TestingSource extends GraphStage<SourceShape<String>> {
private static final ExecutorService executor = Executors.newCachedThreadPool();
public final Outlet<String> out = Outlet.create("TestingSource.out");
private final SourceShape<String> shape = SourceShape.of(out);
private final BlockingQueue<String> queue;
private final String identifier;
public TestingSource(BlockingQueue<String> queue, String identifier) {
this.queue = queue;
this.identifier = identifier;
}
@Override
public SourceShape<String> shape() {
return shape;
}
@Override
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
return new GraphStageLogic(shape()) {
private AsyncCallback<BlockingQueue<String>> callBack;
{
setHandler(out, new AbstractOutHandler() {
@Override
public void onPull() throws Exception {
String string = queue.poll();
if (string == null) {
System.out.println("TestingSource " + identifier + " no records in queue, invoking callback");
executor.submit(() -> callBack.invoke(queue)); // necessary, otherwise blocks upstream
} else {
System.out.println("TestingSource " + identifier + " found record during pull, pushing");
push(out, string);
}
}
});
}
@Override
public void preStart() {
callBack = createAsyncCallback(queue -> {
String string = null;
while (string == null) {
Thread.sleep(100);
string = queue.poll();
}
push(out, string);
System.out.println("TestingSource " + identifier + " found record during callback, pushed");
});
}
};
}
}
Cet exemple fonctionne, il semble donc que ma source fonctionne correctement:
private static void simpleStream() throws InterruptedException {
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
Source.fromGraph(new TestingSource(queue, "source"))
.to(Sink.foreach(record -> System.out.println(record)))
.run(materializer);
Thread.sleep(2500);
queue.add("first");
Thread.sleep(2500);
queue.add("second");
}
j'ai écrit aussi un exemple qui relie deux des sources à une étape de fusion:
private static void simpleMerge() throws InterruptedException {
BlockingQueue<String> queue1 = new LinkedBlockingQueue<>();
BlockingQueue<String> queue2 = new LinkedBlockingQueue<>();
final RunnableGraph<?> result = RunnableGraph.fromGraph(GraphDSL.create(
Sink.foreach(record -> System.out.println(record)),
(builder, out) -> {
final UniformFanInShape<String, String> merge =
builder.add(Merge.create(2));
builder.from(builder.add(new TestingSource(queue1, "queue1")))
.toInlet(merge.in(0));
builder.from(builder.add(new TestingSource(queue2, "queue2")))
.toInlet(merge.in(1));
builder.from(merge.out())
.to(out);
return ClosedShape.getInstance();
}));
result.run(materializer);
Thread.sleep(2500);
System.out.println("seeding first queue");
queue1.add("first");
Thread.sleep(2500);
System.out.println("seeding second queue");
queue2.add("second");
}
Parfois, cet exemple fonctionne comme j'expect- imprime « première » au bout de 5 secondes, puis imprime « deuxième » après 5 secondes. Cependant, parfois (environ 1 fois sur 5), il imprime "seconde" après 10 secondes, puis imprime immédiatement "premier". En d'autres termes, l'étape Fusionner les chaînes en aval seulement lorsque les deux sources ont poussé quelque chose. La sortie complète ressemble à ceci:
TestingSource queue1 no records in queue, invoking callback
TestingSource queue2 no records in queue, invoking callback
seeding first queue
seeding second queue
TestingSource queue2 found record during callback, pushed
second
TestingSource queue2 no records in queue, invoking callback
TestingSource queue1 found record during callback, pushed
first
TestingSource queue1 no records in queue, invoking callback
Ce phénomène se produit plus fréquemment avec MergePreferred et MergePrioritized.
Ma question est - est-ce le bon comportement de Fusionner? Sinon, qu'est-ce que je fais de mal?
Merci pour votre réponse. Qu'est-ce que je veux si vous voulez utiliser une quantité arbitraire de sources (par exemple, j'ai un 'List
Pour fusionner une quantité arbitraire de source, regardez dans MergeHub. Docs ici http://doc.akka.io/docs/akka/2.5/scala/stream/stream-dynamic.html#using-the-mergehub. En ce qui concerne les bits bloquants dans votre code, vous avez un 'Thread.sleep' dans votre fonction' prestart', plus vous avez 'queue.poll' dans votre callback' onPull'. Ceux-ci sont tous bloquants et ne devraient pas être appelés à l'intérieur d'une scène graphique, sauf si vous les exécutez sur un répartiteur dédié. Lisez ceci pour plus d'informations http://doc.akka.io/docs/akka/2.5/scala/dispatchers.html#blocking-needs-careful-management –
Merci, j'ai complètement raté 'MergeHub'. Une dernière question - Je veux une fonctionnalité similaire à 'MergePrioritized', où chaque' Source' a une priorité différente. Quelle est la bonne façon d'accomplir cela avec 'MergeHub'? Les docs ne semblent pas le couvrir. – akir94