Dans l'un de nos cas d'utilisation en streaming; un de nos logiciels de capteurs envoie 5 messages ~ 20MB JSON/Sec au sujet de Kafka avec 50 partitions, quand le streaming étincelle essaye de lire les messages de Kafka il se bloque avec une exception. Afin de mieux comprendre la situation, nous arrivons à ce que le logiciel du capteur n'envoie qu'un message de 20 Mo/s mais déclenche l'application avec la même erreur. Veuillez me faire savoir s'il me manque quelque chose pour gérer un tel scénario.Spark Streaming se bloque à Kafka Ran hors des messages avant d'atteindre l'exception de fin de fin
Nous avons la configuration suivante
-Kafka 0.9.0 server.properties
message.max.bytes=60000000
replica.fetch.max.bytes=120000000
-Spark 1.6.1 Config DirectAPI sur le fil
val kafkaParams = Map[String, String](
"security.protocol" -> "SASL_PLAINTEXT",
"group.id" -> groupid,
"metadata.broker.list" -> kafkaBrokerList,
"max.partition.fetch.bytes" -> "60000000")
-Spark Envoyer
spark-submit \
--verbose \
--master yarn-cluster \
--num-executors 3 \
--executor-memory 7g \
--executor-cores 3 \
--conf spark.driver.memory=1024m \
--conf spark.streaming.backpressure.enabled=false \
--conf spark.streaming.kafka.maxRatePerPartition=3 \
--conf spark.streaming.concurrentJobs=3 \
--conf spark.speculation=true \
--conf spark.hadoop.fs.hdfs.impl.disable.cache=true \
--files kafka_jaas.conf#kafka_jaas.conf,user.headless.keytab#user.headless.keytab \
--driver-java-options "-Djava.security.auth.login.config=./kafka_jaas.conf -Dhttp.proxyHost=PROXY_IP -Dhttp.proxyPort=8080 -Dhttps.proxyHost=PROXY_IP -Dhttps.proxyPort=8080 -Dlog4j.configuration=file:/home/user/spark-log4j/log4j-topic_name-driver.properties" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_jaas.conf -Dlog4j.configuration=file:/home/user/spark-log4j/log4j-topic_name-executor.properties" \
--class com.spark.demo.StreamProcessor /home/user/demo.jar /tmp/data/out 60 KAFKA_BROKER:6667 "groupid" topic_name
- Exception:
User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, IP_HOST): java.lang.AssertionError: assertion failed: Ran out of messages before reaching ending offset 197 for topic x_topic_3 partition 24 start 196. This should not happen, and indicates that messages may have been lost
at scala.Predef$.assert(Predef.scala:179)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:211)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1335)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1335)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1881)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1881)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace
:
J'utilise DirectAPI qui utilise simple consommateur, pour simple consommateur 'max .partition.fetch.bytes' est le paramètre pour aller chercher le maximum d'octets de message si je suis correct. – nilesh1212
Dans KafkaRDD.scala # fetecBatch (ligne 190), vous pouvez trouver '.addFetch (part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)', allez le 'kc.config.fetchMessageMaxBytes' vous trouverez la * clé * est ** fetch.message.max.bytes **. – klion26
Alors Quelle est la différence entre 'max.partition.fetch.bytes' et 'fetch.message.max.byte' par rapport à DirectAPI – nilesh1212