2017-07-11 2 views
0

Mon code actuel a la signature:Création d'un flux de AKKA puits qui envoie toutes les données à un point final http

def putObject(key: String, contentType: ContentType, data: Source[ByteString,_]): Future[HttpReponse] 

(qui simplement les délégués à Akka-http client)

Je voudrais la signature être

def putObject(key: String, contentType: ContentType): Sink[ByteString, Future[HttpReponse]] 

Comment puis-je créer un évier qui consume tout le corps comme des chaînes ordinaires pour un seul HttpRequest sans mémoire tampon tous les octets dans la note ry?

Répondre

0

Inverser le contrôle de ce qui court le flux peut être impossible. Dans la première API, un utilisateur donne une source à l'implémentation de la bibliothèque et donne à la bibliothèque le contrôle sur le moment où cette source sera exécutée.

La deuxième API renvoie le récepteur à la place. Cela signifie également que le contrôle de l'exécution du flux est rendu à l'utilisateur. Cela n'est pas pris en charge, car une partie du flux doit maintenant être exécutée par l'utilisateur et une autre partie du même flux par la bibliothèque Http.

Ce que vous pouvez faire est de changer l'API pour être:

def putObject(key: String, contentType: ContentType): Sink[Source[ByteString, NotUsed], Future[HttpResponse]] = 

Et la mise en œuvre serait alors comme:

val conn = Http().outgoingConnection(host = ???) 
val promise = Promise[HttpResponse] 
Flow[Source[ByteString, NotUsed]] 
    .map(data => HttpRequest(entity = HttpEntity(contentType, data))) 
    .via(conn) 
    .toMat(Sink.head)(Keep.right)