2014-04-30 5 views
0

Je suis en train de mettre en œuvre un programme simple en Celluloid qui, idéalement, exécutera quelques acteurs en parallèle, chacun d'entre eux calculera quelque chose, puis renverra son résultat à un acteur principal, dont le travail consiste simplement à agréger les résultats.Comment mettre fin à un SupervisionGroup?

Après this FAQ, j'introduit un SupervisionGroup, comme ceci:

module Shuffling   
    class AggregatorActor 
    include Celluloid  

    def initialize(shufflers) 
     @shufflerset = shufflers 
     @results = {}   
    end 

    def add_result(result) 
     @results.merge! result 

     @shufflerset = @shufflerset - result.keys 

     if @shufflerset.empty? 
     self.output 
     self.terminate 
     end 
    end 

    def output 
     puts @results 
    end 
    end 

    class EvalActor 
    include Celluloid 

    def initialize(shufflerClass) 
     @shuffler = shufflerClass.new 
     self.async.runEvaluation 
    end 

    def runEvaluation 
     # computation here, which yields result 
     Celluloid::Actor[:aggregator].async.add_result(result) 
     self.terminate 
    end 
    end 

    class ShufflerSupervisionGroup < Celluloid::SupervisionGroup 
    shufflers = [RubyShuffler, PileShuffle, VariablePileShuffle, VariablePileShuffleHuman].to_set 

    supervise AggregatorActor, as: :aggregator, args: [shufflers.map { |sh| sh.new.name }] 

    shufflers.each do |shuffler| 
     supervise EvalActor, as: shuffler.name.to_sym, args: [shuffler] 
    end 
    end 

    ShufflerSupervisionGroup.run 
end 

Je les EvalActor de résilier après leur fait, et je termine même la AggregatorActor lorsque tous les travailleurs se font. Cependant, le thread de supervision reste actif et maintient le thread principal en vie. Le programme ne se termine jamais.

Si j'envoie .run! au groupe, le thread principal se termine juste après, et rien ne fonctionne. Que puis-je faire pour terminer le groupe (ou, dans la terminologie de groupe, finalize, je suppose) après la fin de AggregatorActor?

Répondre

0

Ce que je l'ai fait après tout, est de changer le AggregatorActor d'avoir un wait_for_results:

class AggregatorActor 
    include Celluloid 

    def initialize(shufflers) 
    @shufflerset = shufflers 
    @results = {} 
    end 

    def wait_for_results 
    sleep 5 while not @shufflerset.empty? 

    self.output 
    self.terminate 
    end 

    def add_result(result) 
    @results.merge! result 

    @shufflerset = @shufflerset - result.keys 

    puts "Results for #{result.keys.inspect} recorded, remaining: #{@shufflerset.inspect}" 
    end 

    def output 
    puts @results 
    end 
end 

Et puis je me suis débarrassé de la SupervisionGroup (puisque je ne l'ai pas besoin d'une supervision, à savoir réexécution des acteurs qui ont échoué) et je l'ai utilisé comme ceci:

shufflers = [RubyShuffler, PileShuffle, VariablePileShuffle, VariablePileShuffleHuman, RiffleShuffle].to_set 

Celluloid::Actor[:aggregator] = AggregatorActor.new(shufflers.map { |sh| sh.new.name }) 

shufflers.each do |shuffler| 
    Celluloid::Actor[shuffler.name.to_sym] = EvalActor.new shuffler 
end 

Celluloid::Actor[:aggregator].wait_for_results 

qui ne se sent pas très propre, ce serait bien s'il y avait un moyen plus propre, mais au moins cela fonctionne.