2016-09-04 3 views
0

Je tente d'enregistrer un RDD Spark en tant que fichier texte gzippé (ou plusieurs fichiers texte) dans un compartiment S3. Le compartiment S3 est monté sur dbfs. Je suis en train d'enregistrer le fichier en utilisant les éléments suivants:Écriture de Spark RDD en tant que fichier texte dans le compartiment S3

rddDataset.saveAsTextFile("/mnt/mymount/myfolder/") 

Mais en essayant, je l'erreur persiste:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 32 in stage 18.0 failed 4 times, most recent failure: Lost task 32.3 in stage 18.0 (TID 279, ip-10-81-194-225.ec2.internal): java.lang.NullPointerException 

Cependant, je ne vois quelques fichiers écrits dans le seau S3 . J'ai également essayé d'utiliser rddDataset.repartition(1).saveAsTextFile("/mnt/mymount/myfolder/"), comme conseillé here, mais cela s'est terminé dans la même erreur.

Cela semble être similaire à this question, alors peut-être que les erreurs sont dues à des valeurs nulles dans mon RDD? Mais quand j'essaye val newRDD = rddDataset.map(line => line).filter(x => x!= null).filter(x => x!=" ").filter(x => x!="") et essaye de sauver ce RDD, j'obtiens la même erreur. En outre, rddDataset.count() renvoie une erreur similaire. Je crée rddDataset à partir d'une base de données, qui affiche toutes ses lignes très bien. Cependant, je peux reproduire le java.lang.NullPointerException si je convertir mon dataframe d'origine à un RDD:

val testRDD = df.rdd 
testRDD.count() 

> org.apache.spark.SparkException: Job aborted due to stage failure: Task 32 in stage 85.0 failed 4 times, most recent failure: Lost task 32.3 in stage 85.0 (TID 1668, ip-10-81-194-241.ec2.internal): java.lang.NullPointerException 

J'ai fourni un des traces de pile ci-dessous:

Driver stacktrace: 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
at scala.Option.foreach(Option.scala:236) 
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1927) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1209) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1154) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1154) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1154) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1060) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:952) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:952) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:952) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:951) 
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1457) 
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1436) 
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1436) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1436) 
Caused by: java.lang.NullPointerException 

Aussi, lorsque j'ouvre les informations onglet pour la scène après l'exécution rddDataset.repartition(200).saveAsTextFile(/mnt/mymount/myfolder) je peux trouver les détails d'erreur:

java.lang.NullPointerException 
at linef9b86491b9da46b9858e22af0cc8257227.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:48) 
at linef9b86491b9da46b9858e22af0cc8257227.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:48) 
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr35$(Unknown Source) 
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) 
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51) 
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:46) 
at org.apache.spark.scheduler.Task.run(Task.scala:96) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:235) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
+0

Pouvez-vous publier l'intégralité de la trace de pile pour le NPE? –

Répondre

0

je ne gère pas correctement les valeurs nULL dans un de mes UDFs en pré-traitement de la d à. Concrètement, je devais changer un de mes UDFs de

val converter = (arg: String) => { 
    arg.split("").mkString("_").replace(":","_") 
} 

à

val converter = (arg: String) => { 
    if (arg == null || arg== "") "" else arg.split("").mkString("_").replace(":","_") 
} 

Tout semblait fonctionner après cela.