0

J'essaie de résoudre un problème de mémoire que je vois dans ma configuration d'étincelle et à ce stade, je suis incapable de conclure sur une analyse concrète de la raison pour laquelle je vois cela. Je vois toujours ce problème en écrivant un dataframe au parquet ou au kafka. Mon dataframe a 5000 lignes. Son schéma estEcriture dans Parquet/Kafka: Exception dans le fil "dag-scheduler-event-loop" java.lang.OutOfMemoryError

root 

    |-- A: string (nullable = true) 
    |-- B: string (nullable = true) 
    |-- C: string (nullable = true) 
    |-- D: array (nullable = true) 
    | |-- element: string (containsNull = true) 
    |-- E: array (nullable = true) 
    | |-- element: string (containsNull = true) 
    |-- F: double (nullable = true) 
    |-- G: array (nullable = true) 
    | |-- element: double (containsNull = true) 
    |-- H: integer (nullable = true) 
    |-- I: double (nullable = true) 
    |-- J: double (nullable = true) 
    |-- K: array (nullable = true) 
    | |-- element: double (containsNull = false) 

De cette colonne G peut avoir une taille de cellule jusqu'à 16MB. La taille totale de mes données est d'environ 10 Go partitionné en 12 partitions. Avant d'écrire, j'essaie de créer 48 partitions en utilisant repartition(), mais le problème est vu même si j'écris sans repartitionner. Au moment de cette exception, j'ai seulement un Dataframe en cache avec une taille d'environ 10Go. Mon pilote a 19 Go de mémoire libre et les 2 exécuteurs ont 8 Go de mémoire libre chacun. La version spark est 2.1.0.cloudera1 et la version scala est 2.11.8.

J'ai les paramètres ci-dessous:

spark.driver.memory  35G 
spark.executor.memory 25G 
spark.executor.instances 2 
spark.executor.cores 3 
spark.driver.maxResultSize  30g 
spark.serializer  org.apache.spark.serializer.KryoSerializer 
spark.kryoserializer.buffer.max 1g 
spark.rdd.compress  true 
spark.rpc.message.maxSize  2046 
spark.yarn.executor.memoryOverhead  4096 

Le retraçage d'exception est

Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError 
    at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123) 
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) 
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) 
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) 
    at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41) 
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877) 
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189) 
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:991) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:918) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$submitWaitingChildStages$6.apply(DAGScheduler.scala:765) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$submitWaitingChildStages$6.apply(DAGScheduler.scala:764) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) 
    at org.apache.spark.scheduler.DAGScheduler.submitWaitingChildStages(DAGScheduler.scala:764) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1228) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 

Toute autre idée?

+0

Nous avons eu un problème similaire. L'augmentation de la taille du tas Java l'a résolu. Voir https://stackoverflow.com/q/1565388/5039312 – Marco

Répondre

-1

Nous avons finalement trouvé le problème. Nous avons exécuté la régression logistique de kfold dans scala sur 5000 lignes de données avec k de taille 4. Après la classification, nous avons obtenu 4 données de sortie de test de taille 1250, chacune étant partitionnée par au moins 200 partitions. Donc, nous avions plus de 800 partitions sur 5000 lignes de données. Le code procéderait alors à repartitionner ces données en 48 partitions. Notre système n'a pas pu gérer cette répartition probablement en raison du brassage. Pour résoudre ce problème, nous avons repartitionné chaque dataframe de sortie en un nombre plus petit (au lieu de le faire sur la base de données combinée) et cela a résolu le problème.