J'ai configuré un RunnableGraph en utilisant GraphDSL.create(). J'ai également spécifié un ClosedShape et connecté tous les points de vente/entrées. Lorsque je tente d'exécuter le programme que je reçois l'exception d'exécution suivante:Comment exécuter RunnableGraph dans Akka Streams 2.4.2 et s'assurer que les entrées/sorties sont correctement configurées?
requirement failed: The inlets [] and outlets [] must correspond to the inlets [filter.in] and outlets [out]
Toute idée où je ne l'ai pas connecté correctement les entrées et sorties?
Voici le code graphique:
val g = RunnableGraph.fromGraph(GraphDSL.create() {
implicit builder =>
import GraphDSL.Implicits._
// Source
val A: Outlet[String] = builder.add(Source.fromIterator(() => flightDelayLines)).out
// Flows
val B: FlowShape[String, FlightEvent] = builder.add(csvToFlightEvent)
val C: FlowShape[FlightEvent, DelayRecord] = builder.add(flightEventToDelayRecord)
val D: UniformFanOutShape[DelayRecord, DelayRecord] = builder.add(Broadcast[DelayRecord](2))
val F: FlowShape[DelayRecord, (Int, Int)] = builder.add(countByCarrier)
// Sinks
val E: Inlet[Any] = builder.add(Sink.ignore).in
val G: Inlet[Any] = builder.add(Sink.ignore).in
// Graph
A ~> B ~> flightEventToDelayRecord ~> D ~> E
D ~> F ~> G
ClosedShape
}).run()