2009-06-17 8 views
8

En tant que in my own answer to my own question, j'ai le cas où je suis en train de traiter un grand nombre d'événements qui arrivent dans une file d'attente. Chaque événement est géré exactement de la même manière et chacun peut même être géré indépendamment de tous les autres événements. Mon programme tire parti de la structure de concurrence de Scala et de nombreux processus impliqués sont modélisés comme suit: Actor s. Comme Actor s traitent leurs messages de manière séquentielle, ils ne sont pas bien adaptés à ce problème particulier (même si mes autres acteurs exécutent des actions qui sont sont). Comme je veux Scala à toute la création de fil « contrôle » (que je suppose est le point de celui-ci ayant un système d'accès concurrentiel en premier lieu), il semble que j'ai 2 choix:Traitement simultané dans Scala

  1. Envoyer les événements à un pool d'événement processeurs, que je contrôle
  2. obtenir mon Actor pour les traiter simultanément par un autre mécanisme

j'aurais pensé que # 1 nie le point d'utiliser le sous-système d'acteurs: combien les acteurs du processeur dois-je créer? étant une question évidente. Ces choses sont soi-disant cachées et résolues par le sous-système.

Ma réponse était de faire ce qui suit:

val eventProcessor = actor { 
    loop { 
    react { 
     case MyEvent(x) => 
     //I want to be able to handle multiple events at the same time 
     //create a new actor to handle it 
     actor { 
      //processing code here 
      process(x) 
     } 
    } 
    } 
} 

Y at-il une meilleure approche? Est-ce incorrect?

modifier: Une approche est peut-être mieux:

val eventProcessor = actor { 
    loop { 
    react { 
     case MyEvent(x) => 
     //Pass processing to the underlying ForkJoin framework 
     Scheduler.execute(process(e)) 
    } 
    } 
} 
+0

Bien que les acteurs ne semblent pas soutenir un bassin de travailleurs directement, ce Q a été utile pour exposer cette lacune. Tous les documents disponibles pour moi, ne mentionnez pas cela explicitement. – ePharaoh

Répondre

8

Cela semble être une copie d'une autre question. Donc, je vais dupliquer ma réponse

Les acteurs traitent un message à la fois. Le modèle classique pour traiter plusieurs messages est d'avoir un acteur coordinateur devant un pool d'acteurs consommateurs. Si vous utilisez réagissez, le pool de consommateurs peut être volumineux mais n'utilisera toujours qu'un petit nombre de threads JVM. Voici un exemple où je crée un groupe de 10 consommateurs et un coordinateur à leur disposition.

import scala.actors.Actor 
import scala.actors.Actor._ 

case class Request(sender : Actor, payload : String) 
case class Ready(sender : Actor) 
case class Result(result : String) 
case object Stop 

def consumer(n : Int) = actor { 
    loop { 
    react { 
     case Ready(sender) => 
     sender ! Ready(self) 
     case Request(sender, payload) => 
     println("request to consumer " + n + " with " + payload) 
     // some silly computation so the process takes awhile 
     val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString 
     sender ! Result(result) 
     println("consumer " + n + " is done processing " + result) 
     case Stop => exit 
    } 
    } 
} 

// a pool of 10 consumers 
val consumers = for (n <- 0 to 10) yield consumer(n) 

val coordinator = actor { 
    loop { 
    react { 
     case msg @ Request(sender, payload) => 
      consumers foreach {_ ! Ready(self)} 
      react { 
       // send the request to the first available consumer 
       case Ready(consumer) => consumer ! msg 
      } 
     case Stop => 
      consumers foreach {_ ! Stop} 
      exit 
    } 
    } 
} 

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop 
for (i <- 0 to 1000) coordinator ! Request(self, i.toString) 

Ce code teste pour voir quel consommateur est disponible et envoie une requête à ce consommateur. Les alternatives consistent à assigner au hasard des consommateurs ou à utiliser un planificateur round robin. En fonction de ce que vous faites, vous pourriez être mieux servi avec Scala Futures. Par exemple, si vous ne avez pas vraiment besoin d'acteurs alors toutes les machines ci-dessus pourrait être écrit comme

import scala.actors.Futures._ 

def transform(payload : String) = {  
    val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString 
    println("transformed " + payload + " to " + result) 
    result 
} 

val results = for (i <- 0 to 1000) yield future(transform(i.toString)) 
+0

Merci - Je ne savais pas que vous pouviez appeler des tâches sur le même planificateur que celui utilisé par l'infrastructure d'acteur. Je pense que la meilleure approche est donc d'utiliser Scheduler.execute (process (e)) –

+0

Aussi - oui; C'est une question très similaire (que je lie à) mais pas tout à fait la même chose. La première question était "les acteurs sont-ils séquentiels?" alors que la deuxième question était «Comme les acteurs sont séquentiels, comment puis-je faire X» –

+0

Incidemment: «0 à 10» contient 11 éléments, pas 10. –

3

Si les événements peuvent tous être traités indépendamment, pourquoi sont-ils sur une file d'attente? Ne sachant rien d'autre sur votre conception, cela semble être une étape inutile. Si vous pouviez composer la fonction process avec tout ce qui déclenche ces événements, vous pourriez potentiellement éviter la file d'attente.

Un acteur est essentiellement un effet concurrent équipé d'une file d'attente. Si vous voulez traiter plusieurs messages simultanément, vous ne voulez pas vraiment d'acteur. Vous voulez juste qu'une fonction (Any =>()) soit programmée pour être exécutée à un moment opportun. Cela dit, votre approche est raisonnable si vous voulez rester dans la bibliothèque des acteurs et si la file d'attente des événements n'est pas sous votre contrôle.

Scalaz fait une distinction entre les acteurs et les effets concurrents. Alors que son Actor est très léger, scalaz.concurrent.Effect est encore plus léger. Voici votre code traduit grossièrement à la bibliothèque Scalaz:

val eventProcessor = effect (x => process x) 

Ceci est avec la dernière tête de réseau, pas encore publié.

+0

Merci! Ils sont dans une "file d'attente" uniquement parce que je les envoie à un acteur et qu'un acteur a une file d'attente, qu'elle traite séquentiellement. Comme la bibliothèque des acteurs est comment je suis _supposed_ pour gérer la concurrence (*) dans Scala, j'essaie de l'utiliser. Sinon, j'utiliserais simplement ExecutorService.invokeAll. –

+0

Voir aussi mon commentaire à jschen ci-dessus. J'écris du code concurrent en Java depuis longtemps et j'essaie de trouver la bonne limite entre utiliser des acteurs et, euh, ne pas utiliser d'acteurs dans un programme de scala qui devrait être concurrent. –

+1

Les acteurs ne sont pas une panacée, et rien ne dit que vous devez utiliser des acteurs si vous voulez une simultanéité dans Scala. C'est juste une bibliothèque et, à mon avis, trop compliquée. – Apocalisp

1

Cela ressemble à un simple problème de consommateur/producteur. J'utiliserais une file d'attente avec un groupe de consommateurs. Vous pourriez probablement écrire ceci avec quelques lignes de code en utilisant java.util.concurrent.

+0

L'intérêt de la bibliothèque d'acteurs scala est de pouvoir mieux mapper votre code (écrit en utilisant des acteurs) sur la concurrence disponible dans l'environnement d'exploitation actuel. Donc, si Scala pense qu'il a 4 processeurs, peut-être qu'il va créer un pool de threads de soutien pour ses acteurs avec 4 travailleurs. Je ne gagne rien en créant mon propre pool de threads séparé pour exécuter ce travail - tout ce que je vais finir par faire est une charge de changement de contexte inutile. Je suis parfaitement conscient de la façon de résoudre cela en Java - je demande comment le résoudre en utilisant la bibliothèque d'acteurs Scala, d'où les balises. –

+0

Au moins, je suppose que c'est tout l'intérêt de l'utiliser: -/ –

+0

Désolé, je ne savais pas que c'était un exercice académique avec des acteurs. Je pensais que vous vouliez une bonne solution au problème. "Donc, si Scala pense qu'il a 4 processeurs, peut-être qu'il créera un pool de threads de backing pour ses acteurs avec 4 travailleurs." Il s'agit peut-être de deux lignes de code utilisant java.util.concurrent que vous pouvez facilement utiliser à partir de scala. Je l'utilise de jruby tout le temps. – jshen

1

Le but d'un acteur (enfin, l'un d'entre eux) est de s'assurer que l'état dans l'acteur ne peut être accessible que par un seul thread à la fois. Si le traitement d'un message ne dépend pas d'un état mutable au sein de l'acteur, il serait probablement plus approprié de simplement soumettre une tâche à un planificateur ou à un pool de threads à traiter. L'abstraction supplémentaire que fournit l'acteur est en train de vous gêner.

Il existe des méthodes pratiques dans scala.actors.Scheduler pour cela, ou vous pouvez utiliser un Executor de java.util.concurrent.

1

Les acteurs sont beaucoup plus légers que les fils, et en tant que telle une autre option est d'utiliser des objets d'acteur comme des objets Runnable vous avez l'habitude de soumettre à un pool de threads. La principale différence est que vous n'avez pas besoin de vous préoccuper du ThreadPool - le pool de threads est géré pour vous par le framework d'acteur et est principalement un problème de configuration.

def submit(e: MyEvent) = actor { 
    // no loop - the actor exits immediately after processing the first message 
    react { 
    case MyEvent(x) => 
     process(x) 
    } 
} ! e // immediately send the new actor a message 

ensuite de soumettre un message, dire ceci:

submit(new MyEvent(x)) 

, ce qui correspond à

eventProcessor ! new MyEvent(x) 

de votre question.

Testé avec succès ce modèle avec 1 million de messages envoyés et reçus en environ 10 secondes sur un ordinateur portable quad-core i7.

Espérons que cela aide.