TLDR: est-il préférable de matérialiser un flux par requête (c.-à-d. Utiliser des flux de courte durée) ou d'utiliser une matérialisation de flux unique entre requêtes, lorsque j'ai une requête http sortante du courant?akka-stream + akka-http lifecycle
Détails: J'ai un service typique qui prend une requête HTTP, la disperse vers plusieurs services en aval de tiers (non contrôlés par moi) et agrège les résultats avant de les renvoyer. J'utilise akka-http pour l'implémentation du client et pulvérise pour le serveur (héritage, passera à akka-http au fil du temps). Schématiquement:
request -> map -1-*-> map -> 3rd party http -> map -*-1> aggregation -> response
Ceci peut être réalisé soit en matérialisant un flux par demande ou matérialisant (parties de) courant une fois et de le partager entre les requêtes.
La matérialisation par requête implique une surcharge de la matérialisation et il n'est pas clair comment exploiter les pools de connexion avec elle. Le problème est décrit here (de nombreuses matérialisations peuvent épuiser la piscine). Je peux envelopper un pool dans un flux http de longue durée comme here et envelopper dans un mapAsync
"en amont", mais la stratégie de gestion des erreurs n'est pas claire pour moi. Lorsqu'une seule requête échoue et que le flux est terminé, est-ce que cela entraînera également l'arrêt du pool? De plus, il me semble que je vais devoir rapprocher les demandes et les réponses puisqu'elles ne sont pas retournées dans l'ordre.
// example of stream per request
val connectionFlow = Http().cachedHostConnectionPool[UUID](host, port)
val httpFlow: Flow[HttpRequest, Try[HttpResponse], NotUsed] =
Flow[HttpRequest]
.map(req => req -> UUID.randomUUID()) // I don't care about id because it's a single request per stream.
.via(connectionFlow)
.map { case (response, _) => response }
val result = Range(1 to 5).foreach{ i => {
Source.single(i)
.map(HttpRequest(...))
.via(httpFlow)
.mapAsync(1) {
// response handling logic
}
.runWith(Sink.last)
})
// example of stream per request with long running http stream
// as defined in http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#using-the-host-level-api-with-a-queue
def queueRequest(request: HttpRequest): Future[HttpResponse]
val result = Range(1 to 5).foreach{ i => {
Source.single(i)
.map(HttpRequest(...))
.mapAsync(1)(queueRequest)
.mapAsync(1) {
// somehow reconcile request with response?
// response handling logic
}
.runWith(Sink.last)
})
flux de partage entre les requêtes a un problème similaire de la gestion des erreurs - il semble qu'il existe des modes de défaillance qui peuvent faire baisser ce flux avec toutes les demandes en vol. Le code sera similaire à host level API, mais avec la file d'attente devant tout le flux.
Quelle est la meilleure solution dans ce cas? J'ai essayé de mettre en œuvre les deux solutions, mais il y a de nombreux choix de conception à chaque étape de la mise en œuvre, il semble donc facile de bousiller même sur un «bon» chemin. Bien que je crois que c'est négligeable, et c'est de la même manière que fonctionne le serveur akka-http.
Merci Ramon! Mon problème est que je n'ai pas vraiment 'Source', chaque entrée dans le flux est une requête pour http, donc tout au plus' Source.single'. Comment aurais-je un flux unique dans ce cas? – Tim
De plus, comment gérer le délai d'attente de la requête dans un seul flux? Le seul délai que j'ai pu trouver est l'étape 'completionTimeout', mais il échoue un flux, ne propage pas une erreur en aval. – Tim