2017-08-21 5 views
0

J'ai un code qui exécute un pipeline utilisant des flux Akka.Akka flux - peut-il être échelle comme des acteurs réguliers ou d'une autre manière?

Ma question est quelle est la meilleure façon de l'agrandir? Peut-il être fait en utilisant des ruisseaux Akka aussi?

Ou il doit être converti en acteurs/autre façon?

L'extrait de code est:

val future = SqsSource(sqsEndpoint)(awsSqsClient) 
.takeWhile(_=>true) 
.map { m: Message => 
(m, Ack()) 
}.runWith(SqsAckSink(sqsEndpoint)(awsSqsClient)) 

Répondre

0

Si vous modifiez votre code un peu alors votre flux sera matérialisée dans plusieurs valeurs Actor. Ces acteurs matérialisées vous obtiendrez la concurrence que vous recherchez:

val future = 
    SqsSource(sqsEnpoint)(awsSqsClient)   //Actor 1 
    .via(Flow[Message] map (m => (m, Ack()))) //Actor 2 
    .to(SqsAckSink(sqsEndpoint)(awsSqsClient)) //Actor 3 
    .run() 

Notez l'utilisation de via et to. Ceux-ci sont importants car ils indiquent que ces étapes du flux devraient être matérialisées en Acteurs séparés. Dans votre exemple de code, vous utilisez map et runWith sur le Source ce qui entraînerait la création d'un seul acteur à cause de operator fusion.

Flows qui demandent des acteurs externes

Si vous cherchez à étendre à d'autres acteurs encore, vous pouvez utiliser Flow#mapAsync pour interroger un acteur externe pour faire plus de travail, semblable à this example.

+0

cela signifie-t-il que le degré maximal de parallizème est de 3? peut-il être plus? – john

+0

@john Réponse mise à jour pour refléter davantage de capacités asynchrones. –