2016-01-20 1 views
6

Au cours des derniers jours, j'ai essayé de trouver la meilleure façon de télécharger une ressource HTTP dans un fichier en utilisant Akka Streams et HTTP.Comment télécharger une ressource HTTP dans un fichier avec Akka Streams et HTTP?

Dans un premier temps j'ai commencé avec le Future-Based Variant et qui avait l'air quelque chose comme ceci:

def downloadViaFutures(uri: Uri, file: File): Future[Long] = { 
    val request = Get(uri) 
    val responseFuture = Http().singleRequest(request) 
    responseFuture.flatMap { response => 
    val source = response.entity.dataBytes 
    source.runWith(FileIO.toFile(file)) 
    } 
} 

C'était assez bien, mais une fois que j'appris plus sur Streams pur Akka Je voulais essayer d'utiliser le Flow-Based Variant pour créer un flux à partir d'un Source[HttpRequest]. Au début, cela m'a complètement bloqué jusqu'à ce que je suis tombé sur la transformation de flux flatMapConcat. Cela a fini par un peu plus bavard:

def responseOrFail[T](in: (Try[HttpResponse], T)): (HttpResponse, T) = in match { 
    case (responseTry, context) => (responseTry.get, context) 
} 

def responseToByteSource[T](in: (HttpResponse, T)): Source[ByteString, Any] = in match { 
    case (response, _) => response.entity.dataBytes 
} 

def downloadViaFlow(uri: Uri, file: File): Future[Long] = { 
    val request = Get(uri) 
    val source = Source.single((request,())) 
    val requestResponseFlow = Http().superPool[Unit]() 
    source. 
    via(requestResponseFlow). 
    map(responseOrFail). 
    flatMapConcat(responseToByteSource). 
    runWith(FileIO.toFile(file)) 
} 

Je voulais obtenir un peu délicat et utiliser l'en-tête Content-Disposition.

Pour en revenir à la variante Future-Based:

def destinationFile(downloadDir: File, response: HttpResponse): File = { 
    val fileName = response.header[ContentDisposition].get.value 
    val file = new File(downloadDir, fileName) 
    file.createNewFile() 
    file 
} 

def downloadViaFutures2(uri: Uri, downloadDir: File): Future[Long] = { 
    val request = Get(uri) 
    val responseFuture = Http().singleRequest(request) 
    responseFuture.flatMap { response => 
    val file = destinationFile(downloadDir, response) 
    val source = response.entity.dataBytes 
    source.runWith(FileIO.toFile(file)) 
    } 
} 

Mais maintenant, je ne sais pas comment faire avec la variante future basée sur. Ceci est aussi loin que je suis:

def responseToByteSourceWithDest[T](in: (HttpResponse, T), downloadDir: File): Source[(ByteString, File), Any] = in match { 
    case (response, _) => 
    val source = responseToByteSource(in) 
    val file = destinationFile(downloadDir, response) 
    source.map((_, file)) 
} 

def downloadViaFlow2(uri: Uri, downloadDir: File): Future[Long] = { 
    val request = Get(uri) 
    val source = Source.single((request,())) 
    val requestResponseFlow = Http().superPool[Unit]() 
    val sourceWithDest: Source[(ByteString, File), Unit] = source. 
    via(requestResponseFlow). 
    map(responseOrFail). 
    flatMapConcat(responseToByteSourceWithDest(_, downloadDir)) 
    sourceWithDest.runWith(???) 
} 

Alors maintenant, j'ai un Source qui émet un ou plusieurs (ByteString, File) éléments pour chaque File (je dis chaque File car il n'y a aucune raison pour l'Source d'origine doit être un seul HttpRequest). Y at-il de toute façon de prendre ces et de les acheminer vers un Sink?

Je pense quelque chose comme flatMapConcat, tels que:

def runWithMap[T, Mat2](f: T => Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = ??? 

Alors que je pouvais terminer downloadViaFlow2 avec:

def destToSink(destination: File): Sink[(ByteString, File), Future[Long]] = { 
    val sink = FileIO.toFile(destination, true) 
    Flow[(ByteString, File)].map(_._1).toMat(sink)(Keep.right) 
} 
sourceWithDest.runWithMap { 
    case (_, file) => destToSink(file) 
} 

Répondre

5

La solution ne nécessite pas flatMapConcat. Si vous ne avez pas besoin de valeurs de retour de l'écriture de fichiers, vous pouvez utiliser Sink.foreach:

def writeFile(downloadDir : File)(httpResponse : HttpResponse) : Future[Long] = { 
    val file = destinationFile(downloadDir, httpResponse) 
    httpResponse.entity.dataBytes.runWith(FileIO.toFile(file)) 
} 

def downloadViaFlow2(uri: Uri, downloadDir: File) : Future[Unit] = { 
    val request = HttpRequest(uri=uri) 
    val source = Source.single((request,())) 
    val requestResponseFlow = Http().superPool[Unit]() 

    source.via(requestResponseFlow) 
     .map(responseOrFail) 
     .map(_._1) 
     .runWith(Sink.foreach(writeFile(downloadDir))) 
} 

Notez que le Sink.foreach crée Futures de la fonction writeFile. Par conséquent, il n'y a pas beaucoup de contre-pression en cause. Le writeFile pourrait être ralenti par le disque dur, mais le flux continuerait à générer des contrats à terme. Pour contrôler cela, vous pouvez utiliser Flow.mapAsyncUnordered (ou Flow.mapAsync):

val parallelism = 10 

source.via(requestResponseFlow) 
     .map(responseOrFail) 
     .map(_._1) 
     .mapAsyncUnordered(parallelism)(writeFile(downloadDir)) 
     .runWith(Sink.ignore) 

Si vous voulez accumuler les valeurs longues pour un nombre total vous devez combiner avec un Sink.fold:

source.via(requestResponseFlow) 
     .map(responseOrFail) 
     .map(_._1) 
     .mapAsyncUnordered(parallelism)(writeFile(downloadDir)) 
     .runWith(Sink.fold(0L)(_ + _)) 

Le pli gardera une somme cumulée et émettent la valeur finale lorsque la source des demandes s'est tarie.

+0

Hmm J'espérais qu'il y avait un meilleur moyen que cela. Je ne suis pas sûr que cela fonctionnera correctement non plus. 'writeFile' retournera dès que le flux FileIO aura été matérialisé. Si la réponse est tronquée, elle doit être écrite dans le fichier dans l'ordre.Problème similaire avec l'utilisation de 'mapAsync'. Le paramètre 'append' devrait également être défini. De même, il semble que toute erreur d'écriture dans le fichier n'entraîne pas un signal d'erreur pour le flux externe. – Steiny

+1

@Steiny Rupture de ma réponse à vos commentaires multiples: (a) corriger, écrire le fichier retourne avec un avenir immédiatement, mais la mapAsync gère cela (b) il n'y a pas de solution qui peut corriger la chunkedsource ni cette partie de la question initiale (c) append n'est nécessaire que si écrire dans le même fichier (d) forcer le flux externe à échouer sur un fichier echec ne faisait pas partie de la question initiale. Vous avez demandé "Y at-il de toute façon à prendre ces et les acheminer vers un évier dynamique?", Ma réponse répond ** à cette question. J'ai écrit ma réponse dans le contexte de votre exemple de code ... –