2017-09-27 5 views
2

Je cours une application Akaf Streams Reactive Kafka qui devrait être fonctionnelle sous forte charge. Après avoir exécuté l'application pendant environ 10 minutes, l'application descend avec un . J'ai essayé de déboguer le vidage de tas et ai trouvé que akka.dispatch.Dispatcher prend ~ 5GB de mémoire. Voici mes fichiers de configuration.Akka Streams Kafka réactif - OutOfMemoryError sous forte charge

Version Akka: 2.4.18

réactive la version Kafka: 2.4.18

1. application.conf:

consumer { 
num-consumers = "2" 
c1 { 
    bootstrap-servers = "localhost:9092" 
    bootstrap-servers=${?KAFKA_CONSUMER_ENDPOINT1} 
    groupId = "testakkagroup1" 
    subscription-topic = "test" 
    subscription-topic=${?SUBSCRIPTION_TOPIC1} 
    message-type = "UserEventMessage" 
    poll-interval = 100ms 
    poll-timeout = 50ms 
    stop-timeout = 30s 
    close-timeout = 20s 
    commit-timeout = 15s 
    wakeup-timeout = 10s 
    use-dispatcher = "akka.kafka.default-dispatcher" 
    kafka-clients { 
    enable.auto.commit = true 
    } 
} 

2. build.sbt:

java -Xmx6g \ 
-Dcom.sun.management.jmxremote.port=27019 \ 
-Dcom.sun.management.jmxremote.authenticate=false \ 
-Dcom.sun.management.jmxremote.ssl=false \ 
-Djava.rmi.server.hostname=localhost \ 
-Dzookeeper.host=$ZK_HOST \ 
-Dzookeeper.port=$ZK_PORT \ 
-jar ./target/scala-2.11/test-assembly-1.0.jar 

3. Source et Sink acteurs:

class EventStream extends Actor with ActorLogging { 

    implicit val actorSystem = context.system 
    implicit val timeout: Timeout = Timeout(10 seconds) 
    implicit val materializer = ActorMaterializer() 
    val settings = Settings(actorSystem).KafkaConsumers 

    override def receive: Receive = { 
    case StartUserEvent(id) => 
     startStreamConsumer(consumerConfig("EventMessage"+".c"+id)) 
    } 

    def startStreamConsumer(config: Map[String, String]) = { 
    val consumerSource = createConsumerSource(config) 

    val consumerSink = createConsumerSink() 

    val messageProcessor = startMessageProcessor(actorA, actorB, actorC) 

    log.info("Starting The UserEventStream processing") 

    val future = consumerSource.map { message => 
     val m = s"${message.record.value()}" 
     messageProcessor ? m 
    }.runWith(consumerSink) 

    future.onComplete { 
     case _ => actorSystem.stop(messageProcessor) 
    } 
    } 

    def startMessageProcessor(actorA: ActorRef, actorB: ActorRef, actorC: ActorRef) = { 
    actorSystem.actorOf(Props(classOf[MessageProcessor], actorA, actorB, actorC)) 
    } 

    def createConsumerSource(config: Map[String, String]) = { 
    val kafkaMBAddress = config("bootstrap-servers") 
    val groupID = config("groupId") 
    val topicSubscription = config("subscription-topic").split(',').toList 
    println(s"Subscriptiontopics $topicSubscription") 

    val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer) 
     .withBootstrapServers(kafkaMBAddress) 
     .withGroupId(groupID) 
     .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") 
     .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true") 

    Consumer.committableSource(consumerSettings, Subscriptions.topics(topicSubscription:_*)) 
    } 

    def createConsumerSink() = { 
    Sink.foreach(println) 
    } 
}  

Dans ce cas actorA, actorB et actorC font une interaction de traitement de la logique métier et la base de données. Y at-il quelque chose qui me manque dans la gestion des consommateurs Akka Reactive Kafka tels que la validation, erreur, ou la configuration d'étranglement? Parce que regarder dans la décharge de tas, je pourrais deviner que les messages s'accumulent.

Répondre

5

Une chose que je changerais est la suivante:

val future = consumerSource.map { message => 
    val m = s"${message.record.value()}" 
    messageProcessor ? m 
}.runWith(consumerSink) 

Dans le code ci-dessus, vous utilisez ask pour envoyer des messages à l'acteur messageProcessor et attendre des réponses, mais pour ask de fonctionner en tant que mécanisme de contre-pression, vous devez l'utiliser avec mapAsync (plus d'informations sont dans le documentation). Quelque chose comme le suivant:

val future = 
    consumerSource 
    .mapAsync(parallelism = 5) { message => 
     val m = s"${message.record.value()}" 
     messageProcessor ? m 
    } 
    .runWith(consumerSink) 

Ajustez le niveau de parallélisme au besoin.

+0

Merci pour la solution. Sauvé ma journée. – Deepakkumar