3

Dans l'un de nos cas d'utilisation en streaming; un de nos logiciels de capteurs envoie 5 messages ~ 20MB JSON/Sec au sujet de Kafka avec 50 partitions, quand le streaming étincelle essaye de lire les messages de Kafka il se bloque avec une exception. Afin de mieux comprendre la situation, nous arrivons à ce que le logiciel du capteur n'envoie qu'un message de 20 Mo/s mais déclenche l'application avec la même erreur. Veuillez me faire savoir s'il me manque quelque chose pour gérer un tel scénario.Spark Streaming se bloque à Kafka Ran hors des messages avant d'atteindre l'exception de fin de fin

Nous avons la configuration suivante

-Kafka 0.9.0 server.properties

message.max.bytes=60000000 
replica.fetch.max.bytes=120000000 

-Spark 1.6.1 Config DirectAPI sur le fil

val kafkaParams = Map[String, String](
     "security.protocol" -> "SASL_PLAINTEXT", 
     "group.id" -> groupid, 
     "metadata.broker.list" -> kafkaBrokerList, 
     "max.partition.fetch.bytes" -> "60000000") 

-Spark Envoyer

spark-submit \ 
--verbose \ 
--master yarn-cluster \ 
--num-executors 3 \ 
--executor-memory 7g \ 
--executor-cores 3 \ 
--conf spark.driver.memory=1024m \ 
--conf spark.streaming.backpressure.enabled=false \ 
--conf spark.streaming.kafka.maxRatePerPartition=3 \ 
--conf spark.streaming.concurrentJobs=3 \ 
--conf spark.speculation=true \ 
--conf spark.hadoop.fs.hdfs.impl.disable.cache=true \ 
--files kafka_jaas.conf#kafka_jaas.conf,user.headless.keytab#user.headless.keytab \ 
--driver-java-options "-Djava.security.auth.login.config=./kafka_jaas.conf -Dhttp.proxyHost=PROXY_IP -Dhttp.proxyPort=8080 -Dhttps.proxyHost=PROXY_IP -Dhttps.proxyPort=8080 -Dlog4j.configuration=file:/home/user/spark-log4j/log4j-topic_name-driver.properties" \ 
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_jaas.conf -Dlog4j.configuration=file:/home/user/spark-log4j/log4j-topic_name-executor.properties" \ 
--class com.spark.demo.StreamProcessor /home/user/demo.jar /tmp/data/out 60 KAFKA_BROKER:6667 "groupid" topic_name 

- Exception:

User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, IP_HOST): java.lang.AssertionError: assertion failed: Ran out of messages before reaching ending offset 197 for topic x_topic_3 partition 24 start 196. This should not happen, and indicates that messages may have been lost 
at scala.Predef$.assert(Predef.scala:179) 
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:211) 
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) 
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1335) 
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1335) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1881) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1881) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
at org.apache.spark.scheduler.Task.run(Task.scala:89) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Driver stacktrace 

:

Répondre

0

ajouter à ("fetch.message.max.bytes" -> "20971520") kafkaParams, vous pouvez trouver le code source dans ConsumerConfig.scala#114 (Spark 1.6.2)

+0

J'utilise DirectAPI qui utilise simple consommateur, pour simple consommateur 'max .partition.fetch.bytes' est le paramètre pour aller chercher le maximum d'octets de message si je suis correct. – nilesh1212

+0

Dans KafkaRDD.scala # fetecBatch (ligne 190), vous pouvez trouver '.addFetch (part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)', allez le 'kc.config.fetchMessageMaxBytes' vous trouverez la * clé * est ** fetch.message.max.bytes **. – klion26

+0

Alors Quelle est la différence entre 'max.partition.fetch.bytes' et 'fetch.message.max.byte' par rapport à DirectAPI – nilesh1212