2010-07-01 4 views
4

Je travaille actuellement avec deux acteurs de la scène. Un, le producteur , produit des données et l'envoie à un parenter. Le producteur envoie un HashMap[String,HashMap[Object,List[Int]]] par un message (avec cette pour marquer l'expéditeur):Acteurs Boîte aux lettres Dépassement. Scala

parcer ! (this,data) 

L'analyseur attend en permanence des messages comme ceci:

def act(){ 
    loop{ 
     react{ 
     case (producer, data)=> parse(data); 
     } 
    } 
} 

Le programme fonctionne parfaitement dans des conditions normales circonstances. Le problème vient avec de gros volumes de données et de nombreux messages envoyés (le hachage a environ 10^4 éléments, le hachage interne environ 100 éléments et la liste est longue de 100), le programme se bloque. Il ne montre aucune erreur ni exception. Ça s'arrête juste.

Le problème semble être que mon producteur travaille beaucoup plus vite que l'analyseur (et pour le moment je ne veux pas plus d'un parseur). Après la lecture scala mailbox size limit Je me demande si la boîte aux lettres de mon analyseur atteint sa limite. Le poste offre également quelques solutions, mais je dois d'abord m'assurer que c'est le problème. Comment puis-je tester cela?

Existe-t-il un moyen de connaître la limite de mémoire de l'acteur? Qu'en est-il de lire la mémoire utilisée/libre dans la boîte aux lettres?

Toutes les suggestions pour le flux de travail qui n'ont pas été publiées dans that link sont également les bienvenues.

Merci,

+1

Si vous vous souciez de regarder les différentes implémentations, vous devriez jeter un oeil à Akka [1], il a à la fois l'équilibrage de charge [2] et le travail à voler [3] [1]: www.akkasource. org [2]: http://klangism.tumblr.com/post/582112173/akka-message-routing-part-2 [3]: http://doc.akkasource.org/dispatchers –

Répondre

4

abord, vous devez passer pas l'expéditeur explicitement, que l'expéditeur est suivi par les acteurs Scala cadre de toute façon. Vous pouvez toujours accéder à l'expéditeur d'un message en utilisant la méthode sender.

Comme on peut le voir ici: scala.actors.MQueue, la boîte aux lettres d'un acteur est implémentée comme une liste chaînée et n'est donc limitée que par la taille du tas.

Néanmoins, si vous craignez que le producteur soit très rapide et que le consommateur soit très lent, je vous suggère d'explorer un mécanisme d'étranglement. Mais je ne recommanderais pas l'approche de la réponse acceptée à la question scala mailbox size limit.

Essayer d'envoyer des messages de surcharge lorsque le système est fortement sollicité ne semble généralement pas être une bonne idée. Que faire si votre système est trop occupé pour vérifier la surcharge? Que faire si le destinataire du message de surcharge est trop occupé pour agir? De plus, laisser tomber des messages ne me semble pas une très bonne idée. Je pense que vous voulez que tous vos éléments de travail traités de manière fiable.

En outre, je ne compterais pas sur le mailboxSize pour déterminer la charge. Vous ne pouvez pas distinguer différents types de messages et vous ne pouvez vérifier que depuis le consommateur lui-même et non depuis le producteur.

Je suggère d'utiliser une approche où le consommateur demande plus de travail, quand il sait qu'il peut le gérer.

Voici un exemple simple de la façon dont il pourrait être implémenté.

import scala.actors._ 
import Actor._ 

object ConsumerProducer { 
    def main(args: Array[String]) { 
    val producer = new Producer(Iterator.range(0, 10000)) 
    val consumer = new Consumer(producer) 
    } 
} 

case class Produce(count: Int) 
case object Finished 

class Producer[T](source: Iterator[T]) extends Actor { 

    start 

    def act() { 
    loopWhile(source.hasNext) { 
     react { 
     case Produce(n: Int) => produce(n) 
     } 
    } 
    } 

    def produce(n: Int) { 
    println("producing " + n) 
    var remaining = n 
    source takeWhile(_ => remaining > 0) foreach { x => sender ! x; remaining -= 1 } 
    if(!source.hasNext) sender ! Finished 
    } 
} 

class Consumer(producer: Actor) extends Actor { 

    start 

    private var remaining = 0 

    def act() { 
    requestWork() 
    consume() 
    } 

    def consume(): Nothing = react { 
    case Finished => println("Finished") 
    case n: Int => work(n); requestWork(); consume() 
    } 

    def requestWork() = if(remaining < 5) { remaining += 10; producer ! Produce(10) } 

    def work(n: Int) = { 
    println(n + ": " + (0 until 10000).foldLeft(0) { (acc, x) => acc + x * n }) 
    remaining -= 1 
    } 
} 
+0

Hi Ruediger, Merci pour votre réponse. Donc le mailBox n'est pas implémenté comme je le pensais! BTW, comment accéder à l'expéditeur d'un message? – Skuge

+0

A l'intérieur de votre acteur, vous avez la méthode de l'expéditeur. Il renvoie toujours l'expéditeur du dernier message reçu. Je l'ai utilisé dans la méthode de production de l'acteur producteur dans l'exemple que j'ai donné. –

Questions connexes