2016-02-17 2 views
14

J'utilise les flux akka graphDSL pour créer un graphe exécutable. Il n'y a pas d'erreur de compilation au niveau de l'entrée/sortie des composants du flux. Runtime renvoie erreur suivante:Erreur lors de la création du graphe: échec de l'exigence: Les entrées [] et les prises [] doivent correspondre aux entrées [entrée] et sorties [sortie]

Toutes les idées que dois-je vérifier pour le faire fonctionner?

requirement failed: The inlets [] and outlets [] must correspond to the inlets [in] and outlets [out] 
at scala.Predef$.require(Predef.scala:219) 
at akka.stream.Shape.requireSamePortsAs(Shape.scala:168) 
at akka.stream.impl.StreamLayout$CompositeModule.replaceShape(StreamLayout.scala:390) 
at akka.stream.scaladsl.GraphApply$class.create(GraphApply.scala:18) 
at akka.stream.scaladsl.GraphDSL$.create(Graph.scala:813) 
at com.flipkart.connekt.busybees.streams.Topology$.bootstrap(Topology.scala:109) 
at com.flipkart.connekt.busybees.BusyBeesBoot$.start(BusyBeesBoot.scala:65) 
at com.flipkart.connekt.boot.Boot$.delayedEndpoint$com$flipkart$connekt$boot$Boot$1(Boot.scala:39) 
at com.flipkart.connekt.boot.Boot$delayedInit$body.apply(Boot.scala:13) 

La structure graphique:

source ~> flowRate ~> render ~> platformPartition.in 
platformPartition.out(0) ~> formatIOS ~> apnsDispatcher ~> apnsEventCreator ~> merger.in(0) 
platformPartition.out(1) ~> formatAndroid ~> httpDispatcher ~> gcmPoolFlow ~> rHandlerGCM ~> merger.in(1) 
merger.out ~> evtCreator ~> Sink.ignore 
+0

Pouvez-vous réellement publier votre plan graphique? – manub

+1

J'ai mis à jour la question avec la structure du graphique. Les paramètres de source/flux/type de puits correspondent tous à l'entrée/à la sortie. – phantomastray

+0

Les types de 'render',' platformPartition', 'merger' et' evtCreator' pourraient également être utiles. – manub

Répondre

14

Vous avez une entrée ou à la sortie inutilisée (un de vos flux ou quelque chose n'est pas connecté sur tous les côtés). Voici quelques exemples:

Cela fonctionne:

val workingFlow = 
    Flow.fromGraph(GraphDSL.create() { implicit b => 
    import GraphDSL.Implicits._ 
    val intFlow = b.add(Flow[Int]) 
    FlowShape(intFlow.in, intFlow.out) 
    }) 

Le code suivant produit une erreur semblable à la vôtre, parce qu'il a tout un flux utilisé:

val buggyFlow = 
    Flow.fromGraph(GraphDSL.create() { implicit b => 
    import GraphDSL.Implicits._ 
    val intFlow = b.add(Flow[Int]) 
    val unusedFlow = b.add(Flow[Int]) // ERROR: This flow is unused 
    FlowShape(intFlow.in, intFlow.out) 
    }) 

Un exemple un peu plus complexe: Ici, il n'y a pas tout un flux inutilisé, il y a juste une sortie inutilisée. Il produit la même erreur:

val buggyFlow = 
    Flow.fromGraph(GraphDSL.create() { implicit b => 
    import GraphDSL.Implicits._ 

    val broadcast = b.add(Broadcast[Int](2)) 
    val intFlow = b.add(Flow[Int]) 
    val unusedFlow = b.add(Flow[Int]) // ERROR: This flow's outlet isn't used 

    broadcast ~> intFlow 
    broadcast ~> unusedFlow 

    FlowShape(broadcast.in, intFlow.out) 
    }) 
+0

oui, nous l'avons compris depuis longtemps. Le message d'erreur est non intuitif. J'aurais dû mettre à jour la question. :) Merci, mais pour l'avoir regardé. – phantomastray