2017-09-26 1 views
4

TLDR: est-il préférable de matérialiser un flux par requête (c.-à-d. Utiliser des flux de courte durée) ou d'utiliser une matérialisation de flux unique entre requêtes, lorsque j'ai une requête http sortante du courant?akka-stream + akka-http lifecycle

Détails: J'ai un service typique qui prend une requête HTTP, la disperse vers plusieurs services en aval de tiers (non contrôlés par moi) et agrège les résultats avant de les renvoyer. J'utilise akka-http pour l'implémentation du client et pulvérise pour le serveur (héritage, passera à akka-http au fil du temps). Schématiquement:

request -> map -1-*-> map -> 3rd party http -> map -*-1> aggregation -> response

Ceci peut être réalisé soit en matérialisant un flux par demande ou matérialisant (parties de) courant une fois et de le partager entre les requêtes.

La matérialisation par requête implique une surcharge de la matérialisation et il n'est pas clair comment exploiter les pools de connexion avec elle. Le problème est décrit here (de nombreuses matérialisations peuvent épuiser la piscine). Je peux envelopper un pool dans un flux http de longue durée comme here et envelopper dans un mapAsync "en amont", mais la stratégie de gestion des erreurs n'est pas claire pour moi. Lorsqu'une seule requête échoue et que le flux est terminé, est-ce que cela entraînera également l'arrêt du pool? De plus, il me semble que je vais devoir rapprocher les demandes et les réponses puisqu'elles ne sont pas retournées dans l'ordre.

// example of stream per request 

val connectionFlow = Http().cachedHostConnectionPool[UUID](host, port) 
val httpFlow: Flow[HttpRequest, Try[HttpResponse], NotUsed] = 
    Flow[HttpRequest] 
     .map(req => req -> UUID.randomUUID()) // I don't care about id because it's a single request per stream. 
     .via(connectionFlow) 
     .map { case (response, _) => response } 

val result = Range(1 to 5).foreach{ i => { 
    Source.single(i) 
    .map(HttpRequest(...)) 
    .via(httpFlow) 
    .mapAsync(1) { 
     // response handling logic 
    } 
    .runWith(Sink.last) 
}) 


// example of stream per request with long running http stream 

// as defined in http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#using-the-host-level-api-with-a-queue 
def queueRequest(request: HttpRequest): Future[HttpResponse] 

val result = Range(1 to 5).foreach{ i => { 
    Source.single(i) 
    .map(HttpRequest(...)) 
    .mapAsync(1)(queueRequest) 
    .mapAsync(1) { 
     // somehow reconcile request with response? 
     // response handling logic 
    } 
    .runWith(Sink.last) 
}) 

flux de partage entre les requêtes a un problème similaire de la gestion des erreurs - il semble qu'il existe des modes de défaillance qui peuvent faire baisser ce flux avec toutes les demandes en vol. Le code sera similaire à host level API, mais avec la file d'attente devant tout le flux.

Quelle est la meilleure solution dans ce cas? J'ai essayé de mettre en œuvre les deux solutions, mais il y a de nombreux choix de conception à chaque étape de la mise en œuvre, il semble donc facile de bousiller même sur un «bon» chemin. Bien que je crois que c'est négligeable, et c'est de la même manière que fonctionne le serveur akka-http.

Répondre

1

En général, il est préférable d'utiliser une seule connexion Flow et d'envoyer toutes vos demandes via ce flux unique. La principale raison est due au fait qu'une nouvelle matérialisation peut entraîner la formation d'un nouveau Connection à chaque fois (en fonction des paramètres de votre pool de connexions).

Vous avez raison que cela entraîne quelques complications:

Commande: En fournissant un UUID aléatoire que la 2ème valeur du tuple que vous passez à vous le flux connexion éliminer votre capacité à corréler une demande à une réponse. Cette valeur supplémentaire de T dans le tuple peut être utilisée comme "identifiant de corrélation" pour savoir quel HttpResponse vous obtenez du flux. Dans votre exemple particulier, vous pouvez utiliser la Int initiale du Range vous avez créé:

val responseSource : Source[(Try[HttpResponse], Int), _] = 
    Source 
    .fromIterator(() => Iterator range (0,5)) 
    .map(i => HttpRequest(...) -> i) 
    .via(connectionFlow) 

Maintenant, chaque réponse est livré avec la valeur Int originale que vous pouvez utiliser pour traiter la réponse.

Traitement des erreurs: Vous avez tort d'indiquer "une seule requête échoue et le flux est terminé". Un échec de requête unique n'entraîne PAS nécessairement l'échec du flux. Au lieu de cela, vous obtiendrez simplement une valeur (Failure(exception), Int) à partir du flux de connexion. Vous savez maintenant quel Int a provoqué l'échec et vous avez l'exception du flux.

+0

Merci Ramon! Mon problème est que je n'ai pas vraiment 'Source', chaque entrée dans le flux est une requête pour http, donc tout au plus' Source.single'. Comment aurais-je un flux unique dans ce cas? – Tim

+0

De plus, comment gérer le délai d'attente de la requête dans un seul flux? Le seul délai que j'ai pu trouver est l'étape 'completionTimeout', mais il échoue un flux, ne propage pas une erreur en aval. – Tim