2017-05-18 1 views
1

J'essaye de créer un exemple akka flux qui prend un fichier CSV, le change en XML (en utilisant un objet déjà existant qui a une fonction toXml), puis publie ceci à un point final. Le code que j'ai créé ressemble à ceci: -Akka flux pour la publication des données attend un graphique pas un flux en via

val poolClientFlow = 
    Http().cachedHostConnectionPool[Thing]("localhost",5000) 

val file = new File("./example.csv") 

val uploadPipeline = 
    FileIO.fromFile(file) 
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024)) 
    .map(_.utf8String) 
    .map(_.split(",")) 
    .map(t => Thing(t(0),t(1).toInt,t(2).toInt)) 
    .map(_.toXml) 
    .map(_.toString) 
    .map(ByteString(_)) 
    .map(d =>HttpRequest(method=HttpMethods.POST,uri=s"/import",entity = d)) 
    .via(poolClientFlow) 
    .runForeach(x => System.out.println(x.toString())) 

Cependant, il ne compile pas l'appel à .via(poolClientFlow) car il a trouvé un akka.stream.scaladsl.Flow[(akka.http.scaladsl.model.HttpRequest, com.cogpp.exp.Thing),(scala.util.Try[akka.http.scaladsl.model.HttpResponse], mais cette version via attend un akka.stream.Graph[akka.stream.FlowShape[akka.http.scaladsl.model.HttpRequest,?],?].

Je pense que je n'ai pas construit mon poolClientFlow correctement, mais je ne vois pas la différence entre ce que j'ai fait, et ce que j'ai vu dans un autre exemple de code. Quelqu'un peut-il aider?

Répondre

1

Le flux de cachedHostConnectionPool[T] prend un tuple de (HttpRequest,T) Cela vous permet de garder le contexte de votre demande lorsque vous obtenez finalement le résultat (Try[HttpResponse], T). Si vous n'en avez pas besoin, passez simplement dans l'unité ().

Je ne sais pas s'il existe une API qui accepte la demande.

Pour obtenir votre exemple, vous pouvez compiler ..

.map(d => (HttpRequest(method=HttpMethods.POST,uri=s"/import",entity = d), d)) 
.via(Http().cachedHostConnectionPool[ByteString]("localhost",5000)) 

Aussi, si je vous où je n'aurais pas tout à fait tant de l » .map dans ce code. Votre sérialisation ne bloque pas et n'a pas vraiment besoin de la pression inverse entre toutes ces étapes. J'aurais une fonction pure def write(t: Thing): HttpRequest. Mais ce n'est pas une grosse affaire ...

+0

Merci! C'est réglé! Je pense que vous avez raison d'avoir autant de cartes, c'est probablement juste en ajoutant des frais généraux. – cogpp