3

Je suis conscient que depuis Akka 2.4.16, il n'y a pas d'implémentation "à distance" de Reactive Streams. La spécification porte sur un flux exécuté sur une seule machine virtuelle Java. Cependant, en considérant le cas d'utilisation pour impliquer une autre JVM pour un traitement tout en maintenant la contre-pression. L'idée est d'avoir une application principale qui fournit une interface utilisateur exécutant un flux. Par exemple, ce flux a une étape effectuant des calculs lourds qui devraient fonctionner sur une machine différente. Je suis intéressé par les moyens d'exécuter les flux de manière distribuée - je suis tombé sur quelques articles montrant quelques idées:Façons de maintenir la contre-pression dans les flux Akka impliquant plusieurs JVMs

Quelles autres solutions sont là? Y at-il des inconvénients significatifs à ce qui précède? Des caractéristiques spéciales à prendre en compte?

Mise à jour: Cette question n'est pas limitée à un seul cas d'utilisation. Je suis généralement intéressé par tous les moyens possibles de travailler avec des flux dans un environnement distribué. Cela signifie, par exemple. il peut impliquer un seul flux qui intègre les acteurs avec .mapAsync ou par ex. il pourrait y avoir deux flux distincts sur deux machines communiquant via Akka HTTP. La seule exigence est que la contre-pression doit être appliquée entre tous les composants.

+1

Je pense que vous ne comprenez pas quelque chose. Alors ... comment pouvez-vous avoir un flux inter-jvm? Eh bien ... en ayant des composants qui résident réellement dans différents jvm. Maintenant, vous devez comprendre que les composants dans ce cas particulier seront les acteurs. Alors ... vous avez juste besoin de créer un FlowShape/Sink/Source avec un 'acteur à distance 'et Artery s'occupera de la messagerie. –

+0

Je suis totalement d'accord avec votre commentaire - selon le blog, Artery maintient la pression inverse lorsque ces deux acteurs communiquent entre eux. Ma question vise plutôt à comprendre si, par ex. l'utilisation de '.mapAsync' pour intégrer des acteurs distants dans un flux a le même résultat: avoir un flux qui traite quelque chose sur une machine différente. Plus généralement demandé: Quels sont les moyens de mettre en œuvre des cours d'eau traversant les frontières de la JVM? – Toaditoad

Répondre

1

Eh bien ... Il me semble que je vais devoir ajouter un exemple pour ça. Une chose que vous devez comprendre est que BackPressure est géré par AsyncBoundries dans GraphStages. Cela n'a vraiment rien à voir avec un composant existant ailleurs. Aussi ... Il ne dépend pas de l'artère qui n'est rien d'autre que le nouveau transport à distance.

Voici un exemple de probablement le flux cross-jvm simple,

Première application,

import akka.actor.{Actor, ActorLogging, ActorSystem, Props} 
import akka.actor.Actor.Receive 
import com.typesafe.config.{Config, ConfigFactory} 

class MyActor extends Actor with ActorLogging { 
    override def receive: Receive = { 
    case msg @ _ => { 
     log.info(msg.toString) 
     sender() ! msg 
    } 
    } 
} 

object MyApplication extends App { 

    val config = ConfigFactory.parseString(
    """ 
     |akka{ 
     | actor { 
     | provider = remote 
     | } 
     | remote { 
     | enabled-transports = ["akka.remote.netty.tcp"] 
     | untrusted-mode = off 
     | netty.tcp { 
     |  hostname="127.0.0.1" 
     |  port=18000 
     | } 
     | } 
     |} 
    """.stripMargin 
) 

    val actorSystem = ActorSystem("my-actor-system", config) 

    var myActor = actorSystem.actorOf(Props(classOf[MyActor]), "my-actor") 

} 

et deuxième application ... en fait "runs" le flux qui utilise l'acteur dans la première demande .

import akka.actor.{ActorPath, ActorSystem} 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Flow, Keep, Sink, Source} 
import akka.pattern.ask 
import com.typesafe.config.ConfigFactory 

import scala.language.postfixOps 
import scala.concurrent.duration._ 

object YourApplication extends App { 

    val config = ConfigFactory.parseString(
    """ 
     |akka{ 
     | actor { 
     | provider = remote 
     | } 
     | remote { 
     | enabled-transports = ["akka.remote.netty.tcp"] 
     | untrusted-mode = off 
     | netty.tcp { 
     |  hostname="127.0.0.1" 
     |  port=19000 
     | } 
     | } 
     |} 
    """.stripMargin 
) 

    val actorSystem = ActorSystem("your-actor-system", config) 

    import actorSystem.dispatcher 

    val logger = actorSystem.log 

    implicit val implicitActorSystem = actorSystem 
    implicit val actorMaterializer = ActorMaterializer() 

    val myActorPath = ActorPath.fromString("akka.tcp://[email protected]:18000/user/my-actor") 

    val myActorSelection = actorSystem.actorSelection(myActorPath) 

    val source = Source(1 to 10) 

    // here this "mapAsync" wraps the given T => Future[T] function in a GraphStage 
    val myRemoteComponent = Flow[Int].mapAsync(2)(i => { 
    myActorSelection.resolveOne(1 seconds).flatMap(myActorRef => 
     (myActorRef.ask(i)(1 seconds)).map(x => x.asInstanceOf[Int]) 
    ) 
    }) 

    val sink = Sink.foreach[Int](i => logger.info(i.toString)) 

    val stream = source.via(myRemoteComponent).toMat(sink)(Keep.right) 

    val streamRun = stream.run() 

} 
+0

Merci pour cet exemple. Comme je l'ai dit dans ma question, je connais l'approche utilisant une étape avec '.mapAsync'. Mais je m'interroge sur d'autres alternatives à cela. Par exemple, il serait possible de convertir votre exemple en quelque chose ayant des flux connectés via TCP par Akka HTTP. Je suis en train de mettre à jour ma question pour la rendre plus claire ... – Toaditoad

+0

@Toaditoad S'il vous plaît ne prenez pas cette question autrement - Êtes-vous nouveau à Akka et Scala?Je n'ai franchement pas compris le sens de 'avoir des flux connectés via TCP par Akka HTTP' parce que cela n'a aucun sens dans ma compréhension combinée de TCP + Akka + Http + Akka Streams + Akka HTTP. –

+0

Oui, je suis assez nouveau dans ce monde. J'étudie des conceptions possibles pour le traitement des images vidéo avec des acteurs et des flux. Par conséquent, je ne cherche pas une seule solution de travail mais je veux essayer quelques alternatives. Je n'ai pas encore commencé avec l'idée TCP/Akka HTTP mais avez-vous vu la réponse de Konrad Malawski (http://stackoverflow.com/a/30693174/4169741)? Il est l'un des gourous Akka avec Lightbend et sa réponse me semble assez claire. – Toaditoad