2016-02-28 1 views
2

J'ai configuré un RunnableGraph en utilisant GraphDSL.create(). J'ai également spécifié un ClosedShape et connecté tous les points de vente/entrées. Lorsque je tente d'exécuter le programme que je reçois l'exception d'exécution suivante:Comment exécuter RunnableGraph dans Akka Streams 2.4.2 et s'assurer que les entrées/sorties sont correctement configurées?

requirement failed: The inlets [] and outlets [] must correspond to the inlets [filter.in] and outlets [out]

Toute idée où je ne l'ai pas connecté correctement les entrées et sorties?

Voici le code graphique:

val g = RunnableGraph.fromGraph(GraphDSL.create() { 
    implicit builder => 
    import GraphDSL.Implicits._ 

    // Source 
    val A: Outlet[String] = builder.add(Source.fromIterator(() => flightDelayLines)).out 

    // Flows 
    val B: FlowShape[String, FlightEvent] = builder.add(csvToFlightEvent) 
    val C: FlowShape[FlightEvent, DelayRecord] = builder.add(flightEventToDelayRecord) 
    val D: UniformFanOutShape[DelayRecord, DelayRecord] = builder.add(Broadcast[DelayRecord](2)) 
    val F: FlowShape[DelayRecord, (Int, Int)] = builder.add(countByCarrier) 

    // Sinks 
    val E: Inlet[Any] = builder.add(Sink.ignore).in 
    val G: Inlet[Any] = builder.add(Sink.ignore).in 


    // Graph 
    A ~> B ~> flightEventToDelayRecord ~> D ~> E 
              D ~> F ~> G 

    ClosedShape 
}).run() 

Répondre

1

Je résolu mon problème. C'était un oubli très simple. Plutôt que d'utiliser C que j'ai ajouté au constructeur, j'utilisais la fonction flightEventToDelayRecord dans le graphique. La solution consistait à utiliser C dans le graphique à la place. Cela m'a fait réaliser à quel point il est important de casser les grands graphiques en petits. " L'exception d'exécution ne localise pas la cause première (par exemple, "C is unused"), il sera donc probablement plus facile de déboguer ces exceptions d'exécution si vous travaillez avec des graphes plus petits. J'espère que cela aide quelqu'un d'autre qui reste coincé.