2017-08-04 7 views
0

J'apprends Akka Acteur récemment. J'ai lu le document des répartiteurs dans Acteur. Je suis curieux au sujet de l'opération de blocage chez un acteur. Le dernier topic dans le document décrit comment résoudre le problème. Et j'essaie de reproduire l'exemple d'expérience dans le document.Opération de blocage dans l'acteur n'occupant pas tous les expéditeurs par défaut

Voici mon code:

package dispatcher 

import akka.actor.{ActorSystem, Props} 
import com.typesafe.config.ConfigFactory 

object Main extends App{ 

    var config = ConfigFactory.parseString(
    """ 
     |my-dispatcher{ 
     |type = Dispatcher 
     | 
     |executor = "fork-join-executor" 
     | 
     |fork-join-executor{ 
     |fixed-pool-size = 32 
     |} 
     |throughput = 1 
     |} 
    """.stripMargin) 

// val system = ActorSystem("block", ConfigFactory.load("/Users/jiexray/IdeaProjects/ActorDemo/application.conf")) 


    val system = ActorSystem("block") 


    val actor1 = system.actorOf(Props(new BlockingFutureActor())) 
    val actor2 = system.actorOf(Props(new PrintActor())) 

    for(i <- 1 to 1000){ 
    actor1 ! i 
    actor2 ! i 
    } 

} 

package dispatcher 

import akka.actor.Actor 

import scala.concurrent.{ExecutionContext, Future} 

class BlockingFutureActor extends Actor{ 
    override def receive: Receive = { 
    case i: Int => 
     Thread.sleep(5000) 
     implicit val excutionContext: ExecutionContext = context.dispatcher 
     Future { 
     Thread.sleep(5000) 
     println(s"Blocking future finished ${i}") 
     } 
    } 
} 
package dispatcher 

import akka.actor.Actor 

class PrintActor extends Actor{ 
    override def receive: Receive = { 
    case i: Int => 
     println(s"PrintActor: ${i}") 
    } 
} 

Je crée simplement un ActorSystem avec les répartiteurs par défaut et tous les acteurs dépends de ceux-ci. Le BlockingFutureActor a une opération de blocage qui est encapsulée dans un Future. Le PrintActor imprime simplement un nombre instantanément.

Dans l'explication du document, les répartiteurs par défaut seront occupés par Future s dans le BlockingFutureActor, ce qui entraîne le blocage des messages du PrintActor. L'application se coince quelque part comme:

> PrintActor: 44 
> PrintActor: 45 

Malheureusement, mon code n'est pas bloqué. Toutes les sorties de PrintActor apparaissent doucement. Mais les sorties de BlockingFutureActor apparaissent comme un dentifrice pressant. J'essaie de surveiller mes informations de fil en Debug Intellij, je suis arrivé: thread monitoring

Vous trouverez peut-être que deux répartiteurs dorment (BlockingFutureActor fait cela arrive). D'autres attendent, ce qui signifie qu'ils sont disponibles pour la livraison de nouveaux messages.

J'ai lu une réponse à propos de l'opération de blocage dans Actor (page). Il est cité que «les répartiteurs sont, en fait, des pools de threads: en séparant les deux garanties, les opérations de blocage lent ne cèdent pas la place à l'autre: cette approche est généralement appelée« bulk-heading »car l'idée est que Si une partie de l'application échoue, le reste reste réactif. "

Est-ce que les répartiteurs par défaut épargnent certains répartiteurs pour les opérations de blocage? De telle sorte que le système peut gérer les messages même s'il y a tellement d'opérations de blocage demandant des répartiteurs.

L'expérience du document Akka peut-elle être reproduite? Y at-il un problème avec ma configuration?

Merci pour vos suggestions. Meilleurs vœux.

+0

« Toutes les sorties de' PrintActor' apparaissent en douceur. » Êtes-vous en train de dire que vous voyez toutes les 1000 instructions 'println' de' PrintActor'? – chunjef

+0

Oui, exactement. 1000 'println' apparaît au moment où l'application démarre. – jiexray

Répondre

2

La raison pour laquelle vous voyez tous les 1000 déclarations d'impression de la PrintActor avant toute déclaration d'impression de la BlockingFutureActor est à cause du premier appel Thread.sleep dans le bloc de receiveBlockingFutureActor. Cette Thread.sleep est la principale différence entre votre code et l'exemple dans la documentation officielle:

override def receive: Receive = { 
    case i: Int => 
    Thread.sleep(5000) // <----- this call is not in the example in the official docs 
    implicit val excutionContext: ExecutionContext = context.dispatcher 
    Future { 
     ... 
    } 
} 

Rappelez-vous que les acteurs traitent un message à la fois. Le Thread.sleep(5000) simule essentiellement un message qui prend au moins cinq secondes à traiter. Le BlockingFutureActor ne traitera pas un autre message tant qu'il n'aura pas traité le message en cours, même s'il contient des centaines de messages dans sa boîte aux lettres. Pendant que le BlockingFutureActor traite le premier message Int de valeur 1, le PrintActor a déjà terminé le traitement des 1000 messages qui lui ont été envoyés.Pour le rendre plus clair, nous allons ajouter une déclaration println:

override def receive: Receive = { 
    case i: Int => 
    println(s"Entering BlockingFutureActor's receive: $i") // <----- 
    Thread.sleep(5000) 
    implicit val excutionContext: ExecutionContext = context.dispatcher 
    Future { 
     ... 
    } 
} 

Un exemple de sortie lorsque nous courons le programme:

Entering BlockingFutureActor's receive: 1 
PrintActor: 1 
PrintActor: 2 
PrintActor: 3 
... 
PrintActor: 1000 
Entering BlockingFutureActor's receive: 2 
Entering BlockingFutureActor's receive: 3 
Blocking future finished 1 
... 

Comme vous pouvez le voir, au moment où le BlockingFutureActor commence réellement à traiter la message 2, le PrintActor a déjà affiché tous les 1000 messages.

Si vous supprimez ce premier Thread.sleep, les messages supprimés de la boîte aux lettres de BlockingFutureActor s'afficheront plus rapidement, car le travail est "délégué" à Future. Une fois le Future créé, l'acteur récupère le message suivant depuis sa boîte aux lettres sans attendre que le Future soit terminé. Ci-dessous un exemple de sortie sans que le premier Thread.sleep (il ne sera pas exactement la même chose à chaque fois que vous l'exécutez):

Entering BlockingFutureActor's receive: 1 
PrintActor: 1 
PrintActor: 2 
... 
PrintActor: 84 
PrintActor: 85 
Entering BlockingFutureActor's receive: 2 
Entering BlockingFutureActor's receive: 3 
Entering BlockingFutureActor's receive: 4 
Entering BlockingFutureActor's receive: 5 
PrintActor: 86 
PrintActor: 87 
... 
+0

Je suis vraiment désolé d'avoir fait un bogue stupide à cause de mon insouciance. Il est vrai que 'Thread.sleep (5000)' devrait être encapsulé dans un 'Future'. Et mon premier 'Thread.sleep (5000)' est un bug stupide. Je vous suis extrêmement reconnaissant pour votre patience et votre aide. Meilleurs vœux. – jiexray