J'utilise Source.queue pour mettre en file d'attente des HttpRequest et en limiter le débit côté client, afin de télécharger des fichiers depuis un serveur distant. Je comprends que Source.queue n'est pas thread-safe et qu'il faut utiliser MergeHub pour le rendre thread-safe. Voici le morceau de code qui utilise Source.queue et cachedHostConnectionPool (akka-http). Comment utiliser MergeHub pour limiter le débit des requêtes côté client ?
import java.io.File
import akka.actor.Actor
import akka.event.Logging
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding
import akka.http.scaladsl.model.{HttpResponse, HttpRequest, Uri}
import akka.stream._
import akka.stream.scaladsl._
import akka.util.ByteString
import com.typesafe.config.ConfigFactory
import scala.concurrent.{Promise, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}
/**
 * Actor that downloads files over HTTP through a bounded, throttled request queue
 * feeding a cached host connection pool.
 *
 * Configuration keys read at construction time:
 *  - `pool.connection.host` / `pool.connection.port`: target of the connection pool
 *  - `pool.queueSize`: buffer size of the request queue
 *  - `pool.throttle.numberOfRequests` / `pool.throttle.duration`: at most that many
 *    requests per `duration` seconds are forwarded to the pool
 *
 * Handles the message `"download"`, which fetches a fixed URI into a fixed file.
 */
class HttpClient extends Actor with RequestBuilding {
  implicit val system = context.system
  val logger = Logging(system, this)
  implicit lazy val materializer = ActorMaterializer()

  val config = ConfigFactory.load()
  val remoteHost = config.getString("pool.connection.host")
  val remoteHostPort = config.getInt("pool.connection.port")
  val queueSize = config.getInt("pool.queueSize")
  val throttleSize = config.getInt("pool.throttle.numberOfRequests")
  val throttleDuration = config.getInt("pool.throttle.duration")

  // Run Future callbacks on this actor's dispatcher instead of the JVM-wide global
  // execution context, so they follow the dispatcher configured for the actor.
  private implicit val executionContext: scala.concurrent.ExecutionContext = context.dispatcher

  val connectionPool =
    Http().cachedHostConnectionPool[Promise[HttpResponse]](host = remoteHost, port = remoteHostPort)

  // Materialized request queue: queue -> throttle -> connection pool -> complete promise.
  // Each element carries the Promise that will receive the response for its request.
  val requestQueue =
    Source.queue[(HttpRequest, Promise[HttpResponse])](queueSize, OverflowStrategy.backpressure)
      .throttle(throttleSize, throttleDuration.seconds, 1, ThrottleMode.shaping)
      .via(connectionPool)
      .toMat(Sink.foreach({
        // The pool emits a Try per request; hand it to the matching promise as-is.
        case (result, promise) => promise.complete(result)
      }))(Keep.left)
      .run()

  /**
   * Offers `request` to the queue and returns the eventual response.
   *
   * @param request the request to execute through the throttled pool
   * @return a Future completed with the response, or failed when the request could
   *         not be enqueued (dropped, queue failed or queue closed) or the pool
   *         reported a failure for it
   */
  def queueRequest(request: HttpRequest): Future[HttpResponse] = {
    val responsePromise = Promise[HttpResponse]()
    requestQueue.offer(request -> responsePromise).flatMap {
      case QueueOfferResult.Enqueued => responsePromise.future
      case QueueOfferResult.Dropped => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
      case QueueOfferResult.Failure(ex) => Future.failed(ex)
      case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
    }
  }

  def receive = {
    case "download" =>
      val uri = Uri("http://localhost:8080/file_csv.csv")
      val destination = new File("/tmp/compass_audience.csv")
      // Report the outcome instead of dropping the Future: otherwise a failed
      // download is never visible. The callback only touches the logger and local
      // values, never actor state, so it is safe to run outside the actor.
      downloadFile(uri, destination).onComplete {
        case Success(IOResult(count, Success(_))) =>
          logger.info("Downloaded {} to {} ({} bytes)", uri, destination, count)
        case Success(IOResult(_, Failure(error))) =>
          logger.error(error, "Writing {} to {} failed", uri, destination)
        case Failure(error) =>
          logger.error(error, "Download of {} failed", uri)
      }
  }

  /**
   * Downloads `uri` through the request queue and writes the body to
   * `destinationFilePath`, one newline-terminated line at a time.
   *
   * @param uri                 resource to fetch with a GET request
   * @param destinationFilePath file the response body is written to
   * @return the result of the file write; the Future fails when the request cannot
   *         be queued, the pool fails it, or the server answers with a non-success
   *         status
   */
  def downloadFile(uri: Uri, destinationFilePath: File): Future[IOResult] = {
    def fileSink: Sink[ByteString, Future[IOResult]] =
      Flow[ByteString].buffer(512, OverflowStrategy.backpressure)
        .toMat(FileIO.toPath(destinationFilePath.toPath))(Keep.right)

    // Submit to queue and execute HttpRequest and write HttpResponse to file
    Source.fromFuture(queueRequest(Get(uri)))
      .flatMapConcat { response =>
        if (response.status.isSuccess) response.entity.dataBytes
        else {
          // Do not write an error page into the destination file. The entity must
          // still be consumed, otherwise the pooled connection stays blocked.
          response.discardEntityBytes()
          Source.failed[ByteString](
            new RuntimeException(s"Download of $uri failed with status ${response.status}"))
        }
      }
      // Re-frame on line boundaries so every line written ends with exactly one "\n".
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 10000, allowTruncation = true))
      .map(_.utf8String)
      .map(d => s"$d\n")
      .map(ByteString(_))
      .runWith(fileSink)
  }
}
Cependant, quand j'utilise MergeHub, il retourne un Sink[(HttpRequest, Promise[HttpResponse]), NotUsed]. J'ai besoin d'extraire response.entity.dataBytes et d'écrire la réponse dans un fichier à l'aide d'un sink de fichier. Je n'arrive pas à comprendre comment utiliser MergeHub pour y parvenir. Toute aide serait appréciée.
// Request entry point backed by a MergeHub: the materialized Sink is the value
// producers attach to. A caller submits a request by running a small source into it,
// e.g. Source.single(request -> promise).runWith(hub), and then reads the response
// from promise.future exactly as with the queue-based variant.
// NotUsed is written fully qualified because akka.NotUsed is not among the imports.
val hub: Sink[(HttpRequest, Promise[HttpResponse]), akka.NotUsed] =
  MergeHub.source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = queueSize)
    .throttle(throttleSize, throttleDuration.seconds, 1, ThrottleMode.shaping)
    .via(connectionPool)
    .toMat(Sink.foreach({
      // Complete the promise travelling with each request with the pool's result.
      case ((Success(resp), p)) => p.success(resp)
      case ((Failure(error), p)) => p.failure(error)
    }))(Keep.left) // keep the MergeHub's Sink, not the foreach completion Future
    .run()