2017-01-30 4 views
1

L'acteur suivant effectue une division une fois le numérateur et le dénominateur sont reçus,messages acteur Akka agrègent

package funnelTest 

import akka.actor.{Actor, ActorSystem, Props} 

object Main extends App { 

    val system1 = ActorSystem("funnelTest") 
    val input = system1.actorOf(Props[Funnel], "input") 

    input ! 3 
    input ! 2.718 

} 

case object Run 

class Funnel extends Actor { 

    var i: Option[Int] = None 
    var d: Option[Double] = None 

    def isReady = i.nonEmpty && d.nonEmpty 

    def receive = { 
    case v: Int => i = Some(v) ; if (isReady) self ! Run 
    case v: Double => d = Some(v) ; if (isReady) self ! Run 
    case Run  => println(s"aggregated, $d/$i = " + d.get/i.get) 
    case _   => 
    } 
} 

est-il un moyen plus évolutive pour regrouper tous les messages?

+0

pourquoi pensez-vous que votre implémentation n'est pas extensible? (il peut être légèrement amélioré, remplacez "! Run" par un appel à une nouvelle méthode run() qui fait ce que fait maintenant "case Run"). –

Répondre

1

Un identifiant unique qui identifie la requête est un moyen de résoudre le problème. Une carte() à l'intérieur de l'acteur contient le FractionComponent précédemment arrivé (Numerator ou Denominator). Une fois que la deuxième partie de la paire est arrivée, nous pouvons commencer à exécuter le calcul comme vous l'avez déjà fait.

La mise en œuvre ne résout toujours pas le problème d'une fuite de mémoire où la deuxième paire ne serait pas reçue et la carte continuerait de croître.

import akka.actor.{Actor, ActorSystem, Props} 

object Main extends App { 

    import Funnel._ 

    val system1 = ActorSystem("funnelTest") 
    val input = system1.actorOf(Props[Funnel], "input") 

    (1 to 10) foreach { number => 

    val id = java.util.UUID.randomUUID().toString 
    input ! Numerator(id, value = number + 2) 
    input ! Denominator(id, value = number + 1) 
    } 

    system1.awaitTermination() 

} 

class Funnel extends Actor { 

    import Funnel._ 
    import scala.collection._ 

    val calcRegistry = mutable.Map[String, FractionComponent]() 

    def saveToRegistry(comp: FractionComponent) = calcRegistry(comp.id) = comp 

    def printValue(num: Numerator, den: Denominator) = println(s"aggregated, ${num.value}/${den.value} = ${num.value/den.value}") 

    def receive = { 
    case [email protected](id, _) => 
     if (calcRegistry contains id) 
     self ! Run(num, calcRegistry(id).asInstanceOf[Denominator]) 
     else saveToRegistry(num) 
    case [email protected](id, _) => 
     if (calcRegistry contains id) 
     self ! Run(calcRegistry(id).asInstanceOf[Numerator], den) 
     else saveToRegistry(den) 
    case Run(num: Numerator, den: Denominator) => 
     calcRegistry.remove(num.id) 
     printValue(num, den) 
    case _ => 
    } 
} 

object Funnel { 

    sealed trait FractionComponent { 
    def id: String 
    } 

    case class Numerator(override val id: String, value: Double) extends FractionComponent 

    case class Denominator(override val id: String, value: Integer) extends FractionComponent 

    case class Run(num: Numerator, denominator: Denominator) 

} 

Exemple de sortie:

aggregated, 3.0/2 = 1.5 aggregated, 4.0/3 = 1.3333333333333333 aggregated, 5.0/4 = 1.25 aggregated, 6.0/5 = 1.2 aggregated, 7.0/6 = 1.1666666666666667 aggregated, 8.0/7 = 1.1428571428571428 aggregated, 9.0/8 = 1.125 aggregated, 10.0/9 = 1.1111111111111112 aggregated, 11.0/10 = 1.1 aggregated, 12.0/11 = 1.0909090909090908

Référence: Reactive Messaging Patterns with the Actor Model