3

TL; DR:Spark Application Streaming échoue avec KafkaException: String dépasse la taille maximale ou IllegalArgumentException

Mon Spark très simple streaming d'application échoue dans le pilote avec le "KafkaException: String dépasse la taille maximale". Je vois la même exception à l'exécuteur testamentaire mais je trouve aussi quelque part les journaux de l'exécuteur testamentaire d'un IllegalArgumentException sans autre information qu'il

problème complet:

J'utilise Spark streaming pour lire quelques messages d'un sujet Kafka . Ce est ce que je fais:

val conf = new SparkConf().setAppName("testName") 
val streamingContext = new StreamingContext(new SparkContext(conf), Milliseconds(millis)) 
val kafkaParams = Map(
     "metadata.broker.list" -> "somevalidaddresshere:9092", 
     "auto.offset.reset" -> "largest" 
    ) 
val topics = Set("data") 
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
     streamingContext, 
     kafkaParams, 
     topics 
    ).map(_._2) // only need the values not the keys 

Ce que je fais avec les données Kafka est l'impression que l'aide:

stream.print() 

Mon application a évidemment plus de code que cela, mais pour localiser mon problème J'ai dépouillé tout ce que je pouvais éventuellement du code

J'essaie d'exécuter ce code sur YARN. Ceci est mon étincelle soumettre ligne:

./spark-submit --class com.somecompany.stream.MainStream --master yarn --deploy-mode cluster myjar.jar hdfs://some.hdfs.address.here/user/spark/streamconfig.properties 

Le fichier streamconfig.properties est juste un fichier de propriétés régulière qui est probablement sans rapport avec le problème ici

Après avoir essayé d'exécuter l'application échoue assez rapidement avec le exception suivante sur le conducteur:

16/05/10 06:15:38 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, some.hdfs.address.here): kafka.common.KafkaException: String exceeds the maximum size of 32767. 
    at kafka.api.ApiUtils$.shortStringLength(ApiUtils.scala:73) 
    at kafka.api.TopicData$.headerSize(FetchResponse.scala:107) 
    at kafka.api.TopicData.<init>(FetchResponse.scala:113) 
    at kafka.api.TopicData$.readFrom(FetchResponse.scala:103) 
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170) 
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.immutable.Range.foreach(Range.scala:141) 
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) 
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) 
    at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169) 
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135) 
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192) 
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208) 
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Je ne vois même pas mon code dans la trace de la pile

examen l'exécuteur testamentaire, je trouve la même exception que dans le pilote mais aussi enfoui au plus profond est bas l'exception suivante:

16/05/10 06:40:47 ERROR executor.Executor: Exception in task 0.0 in stage 2.0 (TID 8) 
java.lang.IllegalArgumentException 
    at java.nio.Buffer.limit(Buffer.java:275) 
    at kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:38) 
    at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:100) 
    at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:98) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.immutable.Range.foreach(Range.scala:141) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at kafka.api.TopicData$.readFrom(FetchResponse.scala:98) 
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170) 
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.immutable.Range.foreach(Range.scala:141) 
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) 
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) 
    at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169) 
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135) 
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192) 
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208) 
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Je ne sais pas quelle est la IllegalArgument puisqu'aucune information est incluse

La version Spark mon YARN utilise est 1.6.0. J'ai également vérifié mon pom contient Spark 1.6.0 et pas une version antérieure. Mon champ d'application est "fourni"

Je lis manuellement les données à partir du même sujet et les données sont simplement des JSON simples. Les données ne sont pas énormes du tout. Certainement plus petit que 32767. Aussi je suis capable de lire ces données en utilisant le consommateur régulier de ligne de commande pour c'est bizarre

recherche sur Google cette exception malheureusement n'a pas fourni d'informations utiles

Est-ce que quelqu'un a une idée sur la façon de comprendre quel est exactement le problème ici?

Merci à l'avance

+2

Le sujet de la question ("données") est-il le nom de sujet réel que vous utilisez?Après la trace de la pile dans le code source, c'est la validation de la longueur du sujet qui échoue dans ce cas. – maasg

+0

Non, je l'ai modifié pour la question, le vrai nom de la rubrique est juste une chaîne régulière, la même que celle que j'utilise pour y accéder depuis la ligne de commande – Gideon

+0

Combien de temps cela dure-t-il? – maasg

Répondre

1

Après beaucoup de creuser, je pense avoir trouvé quel était le problème. Je cours Spark sur YARN (1.6.0-cdh5.7.0). Cloudera a le nouveau client Kafka (version 0.9) qui a eu un changement d'inter protocole par rapport aux versions précédentes. Cependant, notre Kafka est de la version 0.8.2.