
Akka stream with Spark Streaming: messages are not delivered to the actor; getting dead letters

I am new to Akka streaming. I am running the example below from GitHub, but the messages sent to the actor "helloer" are never received and displayed in the output console.

StreamingApp.scala

import _root_.akka.actor.{ Actor, Props } 
import org.apache.spark._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.akka.{ ActorReceiver, AkkaUtils } 

class Helloer extends ActorReceiver { 
    override def preStart() = { 
    println("") 
    println("=== Helloer is starting up ===") 
    println(s"=== path=${context.self.path} ===") 
    println("") 
    } 
    def receive = { 
    // store() method allows us to store the message so Spark Streaming knows about it 
    // This is the integration point (from Akka's side) between Spark Streaming and Akka 
    case s => store(s) 
    } 
} 

object StreamingApp { 
    def main(args: Array[String]) { 
    // Configuration for a Spark application. 
    // Used to set various Spark parameters as key-value pairs. 
    val driverPort = 7777 
    val driverHost = "localhost" 
    val conf = new SparkConf() 
     .setMaster("local[*]") // run locally with as many threads as CPUs 
     .setAppName("Spark Streaming with Scala and Akka") // name in web UI 
     .set("spark.logConf", "true") 
     .set("spark.driver.port", driverPort.toString) 
     .set("spark.driver.host", driverHost) 
    val ssc = new StreamingContext(conf, Seconds(10)) 

    val actorName = "helloer" 

    // This is the integration point (from Spark's side) between Spark Streaming and Akka system 
    // It's expected that the actor we're now instantiating will `store` messages (to close the integration loop) 
    val actorStream = AkkaUtils.createStream[String](ssc, Props[Helloer](), actorName) 

    // describe the computation on the input stream as a series of higher-level transformations 
    actorStream.reduce(_ + " " + _).print() 

    // Custom receiver 
    import pl.japila.spark.streaming.CustomReceiverInputDStream 
    import org.apache.spark.storage.StorageLevel 
    import org.apache.spark.streaming.dstream.ReceiverInputDStream 
    val input: ReceiverInputDStream[String] = ssc.receiverStream[String](CustomReceiverInputDStream(StorageLevel.NONE)) 
    input.print() 

    // Data Ingestion from Kafka 
    import org.apache.spark.streaming.kafka._ 

    // start the streaming context so the data can be processed 
    // and the actor gets started 
    ssc.start() 

    // FIXME wish I knew a better way to handle the asynchrony 
    java.util.concurrent.TimeUnit.SECONDS.sleep(3) 

    import _root_.akka.actor.ActorSystem 
    val actorSystem = ActorSystem("SparkStreamingAkka") 

    val url = s"akka.tcp://[email protected]$driverHost:$driverPort/user/Supervisor0/$actorName" 
    val helloer = actorSystem.actorSelection(url) 
    helloer ! "Hello" 
    helloer ! "from" 
    helloer ! "Spark Streaming" 
    helloer ! "with" 
    helloer ! "Scala" 
    helloer ! "and" 
    helloer ! "Akka" 

    import java.util.concurrent.TimeUnit.MINUTES 
    ssc.awaitTerminationOrTimeout(timeout = MINUTES.toMillis(1)) 
    ssc.stop(stopSparkContext = true, stopGracefully = true) 
    } 
} 

The program uses a CustomReceiverInputDStream implementation. Below is the custom receiver.

customeReceiverInputDstream.scala

package pl.japila.spark.streaming 

import org.apache.spark.streaming.receiver.Receiver 
import org.apache.spark.storage.StorageLevel 

    case class CustomReceiverInputDStream[T](override val storageLevel: StorageLevel) extends Receiver[T](storageLevel) { 
     def onStart() { 
     println("\nHello from CustomReceiver.START\n") 
     } 

     def onStop() { 
     println("\nHello from CustomReceiver.STOP\n") 
     } 
    } 
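
As written, this receiver never calls store, so it can produce no records; it only shows that onStart and onStop run. For comparison, a receiver that actually feeds Spark would typically start a thread in onStart and hand records over via store. The sketch below is hypothetical (not part of the original post) and follows the standard Receiver contract:

    import org.apache.spark.storage.StorageLevel 
    import org.apache.spark.streaming.receiver.Receiver 

    // Hypothetical receiver that emits a counter value once per second. 
    class CountingReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) { 
     def onStart() { 
      // onStart must not block, so do the work on a separate thread. 
      new Thread("counting-receiver") { 
       override def run() { 
        var n = 0 
        while (!isStopped()) { 
         store(s"tick $n") // hand the record to Spark Streaming 
         n += 1 
         Thread.sleep(1000) 
        } 
       } 
      }.start() 
     } 

     def onStop() = () // the polling thread exits once isStopped() is true 
    } 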

Below is the dead-letter output I am getting.

...

Hello from CustomReceiver.START 

...

17/10/10 08:00:05 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1507636805400 
[INFO] [10/10/2017 08:00:05.475] [SparkStreamingAkka-akka.actor.default-dispatcher-6] [akka://SparkStreamingAkka/deadLetters] Message [java.lang.String] from Actor[akka://SparkStreamingAkka/deadLetters] to Actor[akka://SparkStreamingAkka/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[INFO] [10/10/2017 08:00:05.475] [SparkStreamingAkka-akka.actor.default-dispatcher-6] [akka://SparkStreamingAkka/deadLetters] Message [java.lang.String] from Actor[akka://SparkStreamingAkka/deadLetters] to Actor[akka://SparkStreamingAkka/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[INFO] [10/10/2017 08:00:05.475] [SparkStreamingAkka-akka.actor.default-dispatcher-6] [akka://SparkStreamingAkka/deadLetters] Message [java.lang.String] from Actor[akka://SparkStreamingAkka/deadLetters] to Actor[akka://SparkStreamingAkka/deadLetters] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[INFO] [10/10/2017 08:00:05.475] [SparkStreamingAkka-akka.actor.default-dispatcher-6] [akka://SparkStreamingAkka/deadLetters] Message [java.lang.String] from Actor[akka://SparkStreamingAkka/deadLetters] to Actor[akka://SparkStreamingAkka/deadLetters] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[INFO] [10/10/2017 08:00:05.475] [SparkStreamingAkka-akka.actor.default-dispatcher-6] [akka://SparkStreamingAkka/deadLetters] Message [java.lang.String] from Actor[akka://SparkStreamingAkka/deadLetters] to Actor[akka://SparkStreamingAkka/deadLetters] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[INFO] [10/10/2017 08:00:05.475] [SparkStreamingAkka-akka.actor.default-dispatcher-6] [akka://SparkStreamingAkka/deadLetters] Message [java.lang.String] from Actor[akka://SparkStreamingAkka/deadLetters] to Actor[akka://SparkStreamingAkka/deadLetters] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[INFO] [10/10/2017 08:00:05.475] [SparkStreamingAkka-akka.actor.default-dispatcher-6] [akka://SparkStreamingAkka/deadLetters] Message [java.lang.String] from Actor[akka://SparkStreamingAkka/deadLetters] to Actor[akka://SparkStreamingAkka/deadLetters] was not delivered. [7] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
17/10/10 08:00:05 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1507636805600 
17/10/10 08:00:05 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1507636805600 
[INFO] [10/10/2017 08:00:05.693] [Executor task launch worker-0] [Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:2552] 
[INFO] [10/10/2017 08:00:05.696] [Executor task launch worker-0] [Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:2552] 
17/10/10 08:00:05 INFO ActorReceiverSupervisor: Supervision tree for receivers initialized at:akka://streaming-actor-system-0/user/Supervisor0 
17/10/10 08:00:05 INFO ReceiverSupervisorImpl: Called receiver 0 onStart 
17/10/10 08:00:05 INFO ReceiverSupervisorImpl: Waiting for receiver to be stopped 
17/10/10 08:00:05 INFO ActorReceiverSupervisor: Started receiver worker at:akka://streaming-actor-system-0/user/Supervisor0/helloer 

=== Helloer is starting up === 
=== path=akka://streaming-actor-system-0/user/Supervisor0/helloer === 

17/10/10 08:00:05 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1507636805800 
17/10/10 08:00:05 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1507636805800 
17/10/10 08:00:06 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1507636806000 
...

Why not use actorOf instead of actorSelection? According to the Akka docs: "actorSelection only ever looks up existing actors when messages are delivered, i.e. it does not create actors, nor does it verify the existence of actors when the selection is created." – EmiCareOfCell44
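
To illustrate the distinction the comment quotes (a hypothetical, self-contained example, not from the thread): actorOf creates and starts a new actor, whereas actorSelection only builds a path that is resolved at delivery time, so messages sent to a non-existent path end up as dead letters.

    import _root_.akka.actor.{Actor, ActorSystem, Props} 

    class Echo extends Actor { 
     def receive = { case m => println(s"Echo got: $m") } 
    } 

    object SelectionVsOf extends App { 
     val system = ActorSystem("demo") 

     // actorOf creates (and starts) the actor 
     val echo = system.actorOf(Props[Echo], "echo") 
     echo ! "delivered" 

     // actorSelection only names a path; existence is checked at delivery 
     system.actorSelection("/user/echo") ! "also delivered" // path exists 
     system.actorSelection("/user/missing") ! "dropped"     // logged as a dead letter 

     Thread.sleep(500) // let the messages be processed 
     system.terminate() 
    } 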


@EmiCareOfCell44 I tried using actorOf. No dead-letter messages this time, but the sent messages were still not displayed by the code above. – Mahesh


There is no println in the case clause, so what do you expect to be printed? – EmiCareOfCell44
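
That is, the only output comes from actorStream.reduce(_ + " " + _).print() at each 10-second batch; to see individual messages as they arrive, Helloer itself would need a println. A minimal variant (a sketch, not from the thread):

    import org.apache.spark.streaming.akka.ActorReceiver 

    class VerboseHelloer extends ActorReceiver { 
     def receive = { 
      case s => 
       println(s"Helloer received: $s") // print each message on arrival 
       store(s)                         // still hand it to Spark Streaming 
     } 
    } 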

Answer

OK, I see. The problem here is that the actor created to act as the source, the "helloer", is started in a different ActorSystem, and this code tries to reach it from another ActorSystem, the one named "SparkStreamingAkka", over akka.remote, which is why a full akka.tcp URL is used. As written that does not work; making the remote lookup succeed would take further digging. However, a second ActorSystem is not mandatory in this example. A workaround could be:

import _root_.akka.actor.{Actor, Props} 
import org.apache.spark._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils} 

class Helloer extends ActorReceiver { 
    override def preStart() = { 
    println("") 
    println("=== Helloer is starting up ===") 
    println(s"=== path=${context.self.path} ===") 
    println("") 
    } 
    def receive = { 
    // store() method allows us to store the message so Spark Streaming knows about it 
    // This is the integration point (from Akka's side) between Spark Streaming and Akka 
    case s => store(s) 
    } 
} 


// Create a common actor system 
object CreateActorSystem { 
    lazy val as = _root_.akka.actor.ActorSystem("ActorSystemSpark") 
} 

object StreamingApp { 

    def main(args: Array[String]) { 
    // Configuration for a Spark application. 
    // Used to set various Spark parameters as key-value pairs. 
    val driverPort = 7777 
    val driverHost = "localhost" 
    val conf = new SparkConf() 
     .setMaster("local[*]") // run locally with as many threads as CPUs 
     .setAppName("Spark Streaming with Scala and Akka") // name in web UI 
     .set("spark.logConf", "true") 
     .set("spark.driver.port", driverPort.toString) 
     .set("spark.driver.host", driverHost) 
    val ssc = new StreamingContext(conf, Seconds(10)) 

    val actorName = "helloer" 

    // This is the integration point (from Spark's side) between Spark Streaming and Akka system 
    // It's expected that the actor we're now instantiating will `store` messages (to close the integration loop) 

    // Pass actorsystem as parameter 
    val actorStream = AkkaUtils.createStream[String](ssc, Props[Helloer](), actorName, actorSystemCreator = () => CreateActorSystem.as) 

    // describe the computation on the input stream as a series of higher-level transformations 
    actorStream.reduce(_ + " " + _).print() 

    // Custom receiver 
    import pl.japila.spark.streaming.CustomReceiverInputDStream 
    import org.apache.spark.storage.StorageLevel 
    import org.apache.spark.streaming.dstream.ReceiverInputDStream 
    val input: ReceiverInputDStream[String] = ssc.receiverStream[String](CustomReceiverInputDStream(StorageLevel.NONE)) 
    input.print() 

    // Data Ingestion from Kafka 
    //import org.apache.spark.streaming.kafka._ 

    // start the streaming context so the data can be processed 
    // and the actor gets started 
    ssc.start() 

    // FIXME wish I knew a better way to handle the asynchrony 
    java.util.concurrent.TimeUnit.SECONDS.sleep(3) 

    import _root_.akka.actor.ActorSystem 

    val actorSystem = CreateActorSystem.as 

    // Get the actor from its path. There is no need for akka.remote here. 
    val helloer = actorSystem.actorSelection("/user/Supervisor0/helloer") 

    helloer ! "Hello" 
    helloer ! "from" 
    helloer ! "Spark Streaming" 
    helloer ! "with" 
    helloer ! "Scala" 
    helloer ! "and" 
    helloer ! "Akka" 

    import java.util.concurrent.TimeUnit.MINUTES 
    ssc.awaitTerminationOrTimeout(timeout = MINUTES.toMillis(1)) 
    ssc.stop(stopSparkContext = true, stopGracefully = true) 
    } 
} 

This works.
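
For the original remote approach, the receiver logs above suggest the lookup would have to target the receiver's own actor system ("streaming-actor-system-0", with remoting on port 2552) rather than spark.driver.port. An untested sketch, where the host and the client system's remoting configuration are assumptions:

    // Hypothetical remote lookup, inferred from the log lines above; 
    // requires akka-remote and a client system configured for akka.tcp. 
    val receiverPort = 2552 // from "Remoting now listens on ..." 
    val url = s"akka.tcp://streaming-actor-system-0@$driverHost:$receiverPort/user/Supervisor0/$actorName" 
    val remoteHelloer = actorSystem.actorSelection(url) 
    remoteHelloer ! "Hello over akka.tcp" 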


Thanks a lot! Your fix works. I will learn more about this to get the akka.tcp approach working. – Mahesh


I was able to get remote messaging working using actorOf, but I still face the problem when using actorSelection: https://stackoverflow.com/questions/46724732/actorsystem-actorselection-is-not-working-for-remote-actors-where-actorof-is-wor – Mahesh