
akka-http: How to use MergeHub to throttle client-side requests

I am using Source.queue to queue HttpRequests and throttle them on the client side while downloading files from a remote server. My understanding is that Source.queue is not thread-safe and that we need to use MergeHub to make it thread-safe. Here is the piece of code that uses Source.queue together with cachedHostConnectionPool.

import java.io.File 

import akka.actor.Actor 
import akka.event.Logging 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.client.RequestBuilding 
import akka.http.scaladsl.model.{HttpResponse, HttpRequest, Uri} 
import akka.stream._ 
import akka.stream.scaladsl._ 
import akka.util.ByteString 
import com.typesafe.config.ConfigFactory 

import scala.concurrent.{Promise, Future} 
import scala.concurrent.duration._ 
import scala.util.{Failure, Success} 

class HttpClient extends Actor with RequestBuilding { 

    implicit val system = context.system 
    val logger = Logging(system, this) 
    implicit lazy val materializer = ActorMaterializer() 

    val config = ConfigFactory.load() 
    val remoteHost = config.getString("pool.connection.host") 
    val remoteHostPort = config.getInt("pool.connection.port") 
    val queueSize = config.getInt("pool.queueSize") 
    val throttleSize = config.getInt("pool.throttle.numberOfRequests") 
    val throttleDuration = config.getInt("pool.throttle.duration") 

    import scala.concurrent.ExecutionContext.Implicits.global 

    val connectionPool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host = remoteHost, port = remoteHostPort) 

    // Construct a Queue 
    val requestQueue = 
     Source.queue[(HttpRequest, Promise[HttpResponse])](queueSize, OverflowStrategy.backpressure) 
     .throttle(throttleSize, throttleDuration.seconds, 1, ThrottleMode.shaping) 
     .via(connectionPool) 
     .toMat(Sink.foreach({ 
      case ((Success(resp), p)) => p.success(resp) 
      case ((Failure(error), p)) => p.failure(error) 
     }))(Keep.left) 
     .run() 

    // Convert Promise[HttpResponse] to Future[HttpResponse] 
    def queueRequest(request: HttpRequest): Future[HttpResponse] = { 
     val responsePromise = Promise[HttpResponse]() 
     requestQueue.offer(request -> responsePromise).flatMap { 
      case QueueOfferResult.Enqueued => responsePromise.future 
      case QueueOfferResult.Dropped  => Future.failed(new RuntimeException("Queue overflowed. Try again later.")) 
      case QueueOfferResult.Failure(ex) => Future.failed(ex) 
      case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later.")) 
     } 
    } 

    def receive = { 
     case "download" => 
     val uri = Uri(s"http://localhost:8080/file_csv.csv") 
     downloadFile(uri, new File("/tmp/compass_audience.csv")) 
    } 

    def downloadFile(uri: Uri, destinationFilePath: File) = { 

     def fileSink: Sink[ByteString, Future[IOResult]] = 
      Flow[ByteString].buffer(512, OverflowStrategy.backpressure) 
      .toMat(FileIO.toPath(destinationFilePath.toPath)) (Keep.right) 

     // Submit to queue and execute HttpRequest and write HttpResponse to file 
     Source.fromFuture(queueRequest(Get(uri))) 
      .flatMapConcat(_.entity.dataBytes) 
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 10000, allowTruncation = true)) 
      .map(_.utf8String) 
      .map(d => s"$d\n") 
      .map(ByteString(_)) 
      .runWith(fileSink) 

    } 
} 
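
For completeness, this is roughly how I drive the actor (a minimal sketch; the Main object and system name below are just for illustration):

object Main extends App {
  import akka.actor.{ActorSystem, Props}

  // Start an actor system, create the client actor, and trigger one download.
  val system = ActorSystem("http-client-example")
  val client = system.actorOf(Props[HttpClient], name = "httpClient")
  client ! "download"
}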

However, when I use MergeHub, it returns a Sink[(HttpRequest, Promise[HttpResponse]), NotUsed]. I need to extract response.entity.dataBytes and write the response to a file, and I cannot figure out how to achieve that with MergeHub. Any help would be appreciated.

val hub: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed] = 
    MergeHub.source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = queueSize) 
    .throttle(throttleSize, throttleDuration.seconds, 1, ThrottleMode.shaping) 
    .via(connectionPool) 
    .toMat(Sink.foreach({ 
     case ((Success(resp), p)) => p.success(resp) 
     case ((Failure(error), p)) => p.failure(error) 
    }))(Keep.left) 
    .run() 
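
If I understand the hub correctly, each caller now has to materialize its own small Source into it instead of calling offer, something like the sketch below (my guess, using the hub value above), but I still do not see where the file writing fits in:

// My guess at a queueRequest equivalent for the MergeHub variant: run a
// one-element Source into the materialized hub sink and wait on the
// per-request Promise, just as queueRequest waited after queue.offer.
def queueRequestViaHub(request: HttpRequest): Future[HttpResponse] = {
  val responsePromise = Promise[HttpResponse]()
  Source.single(request -> responsePromise).runWith(hub)
  responsePromise.future
}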

Answer:

Source.queue is actually thread-safe now. If you still want to use MergeHub:

private lazy val poolFlow: Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Http.HostConnectionPool] =
    Http().cachedHostConnectionPool[Promise[HttpResponse]](host, port, connectionPoolSettings)


    val ServerSink = 
    poolFlow.toMat(Sink.foreach({ 
     case ((Success(resp), p)) => p.success(resp) 
     case ((Failure(e), p)) => p.failure(e) 
    }))(Keep.left) 

    // Attach a MergeHub Source to the consumer. This will materialize to a 
    // corresponding Sink. 
    val runnableGraph: RunnableGraph[Sink[(HttpRequest, Promise[HttpResponse]), NotUsed]] = 
    MergeHub.source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = 16).to(ServerSink) 


    val toConsumer: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed] = runnableGraph.run() 



    protected[akkahttp] def executeRequest[T](httpRequest: HttpRequest, unmarshal: HttpResponse => Future[T]): Future[T] = {
      val responsePromise = Promise[HttpResponse]()
      Source.single(httpRequest -> responsePromise).runWith(toConsumer)
      responsePromise.future.flatMap(handleHttpResponse(_, unmarshal))
    }

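A possible follow-up for the original file-download case, sketched under the same assumptions (toConsumer as materialized above, the question's imports, implicit materializer and global execution context in scope, and my own downloadViaHub/destination naming): once the per-request Promise completes, stream the response entity straight to disk.

// Sketch: push one request through the MergeHub-backed sink, then write the
// response body to a file once the Promise has been completed by the pool sink.
def downloadViaHub(uri: Uri, destination: java.io.File): Future[IOResult] = {
  val responsePromise = Promise[HttpResponse]()
  Source.single(HttpRequest(uri = uri) -> responsePromise).runWith(toConsumer)
  responsePromise.future.flatMap { response =>
    response.entity.dataBytes.runWith(FileIO.toPath(destination.toPath))
  }
}

Each caller only contributes a one-element Source, so any throttling can stay in a single place between the MergeHub source and poolFlow, as in the question's own hub definition.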