2017-09-14 1 views
0

J'ai expérimenté l'écriture d'une source personnalisée en Java. Plus précisément, j'ai écrit une source qui prend des éléments d'un BlockingQueue. Je suis conscient de Source.queue, mais je ne sais pas comment obtenir la valeur matérialisée si j'en connecte plusieurs à une étape de fusion. Quoi qu'il en soit, voici la mise en œuvre:Akka Streams- une étape de fusion ne pousse parfois vers l'aval qu'une fois que toutes les sources amont y ont été poussées

public class TestingSource extends GraphStage<SourceShape<String>> { 
    private static final ExecutorService executor = Executors.newCachedThreadPool(); 

    public final Outlet<String> out = Outlet.create("TestingSource.out"); 
    private final SourceShape<String> shape = SourceShape.of(out); 

    private final BlockingQueue<String> queue; 
    private final String identifier; 

    public TestingSource(BlockingQueue<String> queue, String identifier) { 
     this.queue = queue; 
     this.identifier = identifier; 
    } 

    @Override 
    public SourceShape<String> shape() { 
     return shape; 
    } 

    @Override 
    public GraphStageLogic createLogic(Attributes inheritedAttributes) { 
     return new GraphStageLogic(shape()) { 
      private AsyncCallback<BlockingQueue<String>> callBack; 

      { 
       setHandler(out, new AbstractOutHandler() { 
        @Override 
        public void onPull() throws Exception { 
         String string = queue.poll(); 
         if (string == null) { 
          System.out.println("TestingSource " + identifier + " no records in queue, invoking callback"); 
          executor.submit(() -> callBack.invoke(queue)); // necessary, otherwise blocks upstream 
         } else { 
          System.out.println("TestingSource " + identifier + " found record during pull, pushing"); 
          push(out, string); 
         } 
        } 
       }); 
      } 

      @Override 
      public void preStart() { 
       callBack = createAsyncCallback(queue -> { 
        String string = null; 
        while (string == null) { 
         Thread.sleep(100); 
         string = queue.poll(); 
        } 
        push(out, string); 
        System.out.println("TestingSource " + identifier + " found record during callback, pushed"); 
       }); 
      } 
     }; 
    } 
} 

Cet exemple fonctionne, il semble donc que ma source fonctionne correctement:

private static void simpleStream() throws InterruptedException { 
    BlockingQueue<String> queue = new LinkedBlockingQueue<>(); 
    Source.fromGraph(new TestingSource(queue, "source")) 
      .to(Sink.foreach(record -> System.out.println(record))) 
      .run(materializer); 

    Thread.sleep(2500); 
    queue.add("first"); 

    Thread.sleep(2500); 
    queue.add("second"); 
} 

j'ai écrit aussi un exemple qui relie deux des sources à une étape de fusion:

private static void simpleMerge() throws InterruptedException { 
    BlockingQueue<String> queue1 = new LinkedBlockingQueue<>(); 
    BlockingQueue<String> queue2 = new LinkedBlockingQueue<>(); 

    final RunnableGraph<?> result = RunnableGraph.fromGraph(GraphDSL.create(
      Sink.foreach(record -> System.out.println(record)), 
      (builder, out) -> { 
       final UniformFanInShape<String, String> merge = 
         builder.add(Merge.create(2)); 
       builder.from(builder.add(new TestingSource(queue1, "queue1"))) 
         .toInlet(merge.in(0)); 
       builder.from(builder.add(new TestingSource(queue2, "queue2"))) 
         .toInlet(merge.in(1)); 

       builder.from(merge.out()) 
         .to(out); 
       return ClosedShape.getInstance(); 
      })); 
    result.run(materializer); 

    Thread.sleep(2500); 
    System.out.println("seeding first queue"); 
    queue1.add("first"); 

    Thread.sleep(2500); 
    System.out.println("seeding second queue"); 
    queue2.add("second"); 
} 

Parfois, cet exemple fonctionne comme j'expect- imprime « première » au bout de 5 secondes, puis imprime « deuxième » après 5 secondes. Cependant, parfois (environ 1 fois sur 5), il imprime "seconde" après 10 secondes, puis imprime immédiatement "premier". En d'autres termes, l'étape Fusionner les chaînes en aval seulement lorsque les deux sources ont poussé quelque chose. La sortie complète ressemble à ceci:

TestingSource queue1 no records in queue, invoking callback 
TestingSource queue2 no records in queue, invoking callback 
seeding first queue 
seeding second queue 
TestingSource queue2 found record during callback, pushed 
second 
TestingSource queue2 no records in queue, invoking callback 
TestingSource queue1 found record during callback, pushed 
first 
TestingSource queue1 no records in queue, invoking callback 

Ce phénomène se produit plus fréquemment avec MergePreferred et MergePrioritized.

Ma question est - est-ce le bon comportement de Fusionner? Sinon, qu'est-ce que je fais de mal?

Répondre

0

À première vue, bloquer le fil avec un Thread.sleep au milieu de la scène semble être au moins un des problèmes.

Quoi qu'il en soit, je pense qu'il serait beaucoup plus facile d'utiliser Source.queue, comme vous l'avez mentionné au début de votre question. Si le problème est d'extraire la valeur matérialisée lors de l'utilisation du GraphDSL, voici comment vous le faites:

final Source<String, SourceQueueWithComplete<String>> source = Source.queue(100, OverflowStrategy.backpressure()); 
    final Sink<Object, CompletionStage<akka.Done>> sink = Sink.ignore(); 

    final RunnableGraph<Pair<SourceQueueWithComplete<String>, CompletionStage<akka.Done>>> g = 
      RunnableGraph.fromGraph(
        GraphDSL.create(
          source, 
          sink, 
          Keep.both(), 
          (b, src, snk) -> { 
           b.from(src).to(snk); 
           return ClosedShape.getInstance(); 
          } 
        ) 
      ); 

    g.run(materializer); // this gives you back the queue 

Plus d'informations sur ce sujet dans le docs.

+0

Merci pour votre réponse. Qu'est-ce que je veux si vous voulez utiliser une quantité arbitraire de sources (par exemple, j'ai un 'List '), et les connecter à une étape de fusion? Comment puis-je obtenir toutes leurs files d'attente? En outre, le 'Thread.sleep' est dans le fil principal, pourquoi cela affecterait-il le flux? – akir94

+0

Pour fusionner une quantité arbitraire de source, regardez dans MergeHub. Docs ici http://doc.akka.io/docs/akka/2.5/scala/stream/stream-dynamic.html#using-the-mergehub. En ce qui concerne les bits bloquants dans votre code, vous avez un 'Thread.sleep' dans votre fonction' prestart', plus vous avez 'queue.poll' dans votre callback' onPull'. Ceux-ci sont tous bloquants et ne devraient pas être appelés à l'intérieur d'une scène graphique, sauf si vous les exécutez sur un répartiteur dédié. Lisez ceci pour plus d'informations http://doc.akka.io/docs/akka/2.5/scala/dispatchers.html#blocking-needs-careful-management –

+0

Merci, j'ai complètement raté 'MergeHub'. Une dernière question - Je veux une fonctionnalité similaire à 'MergePrioritized', où chaque' Source' a une priorité différente. Quelle est la bonne façon d'accomplir cela avec 'MergeHub'? Les docs ne semblent pas le couvrir. – akir94