2017-02-05 1 views
0

Objectif: J'essaie d'exécuter une requête sur un fichier parquet [lire à partir de S3], puis de l'écrire en tant que fichier texte délimité par des tabulations dans un autre compartiment S3. Tout cela est fait dans une application d'étincelle exécutée sur un cluster EMR sur AmazonLe travail Spark donne NullPointerException sur Amazon EMR

J'ai lu les autres questions similaires sur StackOverflow mais en vain.

//Read parquet file 
final Dataset<Row> df = spark.read().parquet(fileName) 
//Register temp table for convinience 
df.registerTempTable(tableName) 

//Valid SQL query string - Verified using data bricks on the same parquet file 
final String queryString = //SQL String 

//Result of running sql query on parquet file 
final Dataset<Row> result = spark.sql(queryString) 

//Check if result is null - result is NOT NULL 

final JavaRDD<Row> rowJavaRDD = result.toJavaRDD() 

//Check if rowJavaRDD is null - rowJavaRDD is NOT NULL 

//Coalesce and write to a text file 
rowJavaRDD.map(r -> r.mkString("\t")).coalesce(1).saveAsTextFile(savePath) 

mais je reçois un NPE sur la ligne rowJavaRDD.map(r -> r.mkString("\t")).coalesce(1).saveAsTextFile(savePath)

J'ai essayé d'utiliser broadcast aussi bien mais cela n'a rien donné. Essayé sans la coalesce mais toujours obtenir la même erreur.

J'ai vérifié la savePath est valide et il n'y a pas d'autorisations S3 problème

J'ai essayé de faire la même place dans un ./spark-shell en utilisant scala sur le même fichier parquet et il a bien fonctionné:/

Courir étincelle 2.0.2 sur le cluster EMR. [La version 2.1 donne une classCastException sur autre chose - donc la mise à jour est un problème encore plus grand]

Toute aide serait appréciée.

StackTrace est la suivante:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 38.0 failed 4 times, most recent failure: Lost task 
7.3 in stage 38.0 (TID 13586, ip-10-30-1-150.ec2.internal): java.lang.NullPointerException 
     at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getInt(OnHeapColumnVector.java:231) 
     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) 
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply$mcV$sp(PairRDDFunctions.scala:1203) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203) 
     at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1348) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1211) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1190) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
     at org.apache.spark.scheduler.Task.run(Task.scala:86) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
     at scala.Option.foreach(Option.scala:257) 
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611) 
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) 
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873) 
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886) 
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1906) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1219) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1161) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1161) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
     at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1161) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1064) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1030) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1030) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
     at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1030) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:956) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:956) 
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:956) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
     at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:955) 
     at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1459) 
     at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1438) 
     at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1438) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
     at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1438) 
     at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:549) 
     at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45) 

Liens connexes

1) NPE saveAsTextFile

2) RDD as textfile

Répondre

0

Comme mentionné dans le premier lien connexe vous vous fourni (NPE saveAsTextFile), le fait que cela arrive quand vous appelez saveAsTex tFile() ne signifie pas que le problème a quelque chose à voir avec cette ligne, puisque toutes les transformations que vous avez faites jusqu'à ce point sont exécutées paresseusement.

Une recherche rapide sur Google de "OnHeapColumnVector NullPointerException" s'est avérée SPARK-16518 pour moi. On dirait que c'est un problème qui a été un problème depuis 2.0.0 et qui n'est toujours pas résolu.

+0

J'utilise également cette même commande sur d'autres fichiers de parquet et cela fonctionne très bien. C'est juste sur ce fichier que je reçois le NPE. J'ai téléchargé le fichier parquet ennuyeux localement et essayé la même commande - et ça a bien fonctionné: // –

0

Il semble que org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getInt a frappé une colonne null (en raison d'un bogue). Vérifiez si vous trouvez un int32 dans le schéma de votre fichier parquet. Voir une autre version de ceci ici: https://issues.apache.org/jira/browse/HIVE-14294

De même, partagez le schéma du fichier parquet.