2015-12-04 2 views
1

J'utilise boto3 pour lire des fichiers de S3, cela s'est avéré être beaucoup plus rapide que sc.textFile(...). Ces fichiers sont entre 300 Mo et 1 Go environ. Le processus se poursuit comme:PySpark lève java.io.EOFException lors de la lecture de gros fichiers avec boto3

data = sc.parallelize(list_of_files, numSlices=n_partitions) \ 
    .flatMap(read_from_s3_and_split_lines) 

events = data.aggregateByKey(...) 

Lors de l'exécution de ce processus, je reçois l'exception:

15/12/04 10:58:00 WARN TaskSetManager: Lost task 41.3 in stage 0.0 (TID 68, 10.83.25.233): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:203) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.EOFException 
    at java.io.DataInputStream.readInt(DataInputStream.java:392) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:139) 
    ... 15 more 

Plusieurs fois, quelques-unes des tâches accident et le travail est en mesure de récupérer. Cependant, parfois l'ensemble du travail se bloque après un certain nombre de ces erreurs. Je n'ai pas pu trouver l'origine de ce problème et semble apparaître et disparaître en fonction du nombre de fichiers lus, des transformations exactes que j'applique ... Il n'échoue jamais lors de la lecture d'un seul fichier.

Répondre

2

J'ai rencontré un problème similaire, mon enquête a montré que le problème était le manque de mémoire libre pour le processus Python. Spark a pris toute la mémoire et le processus Python (où fonctionne PySpark) s'est brisé.

Quelques conseils:

  1. ajouter un peu de mémoire à la machine,
  2. RDD inutiles unpersist,
  3. gérer la mémoire plus sage (ajouter des contraintes sur l'utilisation de la mémoire Spark).