2017-09-08 2 views
2
lu

(Spark 2.0.2)Spark: Parquet opérations dataframe échouent lorsque forçant schéma sur

Le problème se lève ici quand vous avez des fichiers de parquet avec différents schémas et la force du schéma lors de la lecture. Même si vous pouvez imprimer le schéma et exécuter show() correctement, vous ne pouvez pas appliquer de logique de filtrage sur les colonnes manquantes.

Voici les deux exemples schèmes:

// assuming you are running this code in a spark REPL 
import spark.implicits._ 

case class Foo(i: Int) 
case class Bar(i: Int, j: Int) 

Alors Bar comprend tous les champs de Foo et ajoute une (j). Dans la vie réelle, cela se produit lorsque vous commencez avec le schéma Foo et plus tard, vous avez décidé que vous aviez besoin de plus de champs et vous retrouviez le schéma Bar.

Simulons les deux fichiers parquet différents.

// assuming you are on a Mac or Linux OS 
spark.createDataFrame(Foo(1)::Nil).write.parquet("/tmp/foo") 
spark.createDataFrame(Bar(1,2)::Nil).write.parquet("/tmp/bar") 

Ce que nous voulons ici est de toujours lire des données en utilisant le schéma plus générique Bar. En d'autres termes, les lignes écrites sur le schéma Foo doivent avoir la valeur j pour être nulles.

Cas 1: Nous avons lu un mélange des deux schémas

spark.read.option("mergeSchema", "true").parquet("/tmp/foo", "/tmp/bar").show() 
+---+----+ 
| i| j| 
+---+----+ 
| 1| 2| 
| 1|null| 
+---+----+ 


spark.read.option("mergeSchema", "true").parquet("/tmp/foo", "/tmp/bar").filter($"j".isNotNull).show() 
+---+---+ 
| i| j| 
+---+---+ 
| 1| 2| 
+---+---+ 

Cas n ° 2: Nous avons seulement des données Bar

spark.read.parquet("/tmp/bar").show() 
+---+---+ 
| i| j| 
+---+---+ 
| 1| 2| 
+---+---+ 

de cas 3: Nous disposons de données Foo

scala> spark.read.parquet("/tmp/foo").show() 
+---+ 
| i| 
+---+ 
| 1| 
+---+ 

Le cas problématique est 3, où notre schéma résultant est de type Foo et non de Bar. Puisque nous migrons vers le schéma Bar, nous voulons toujours obtenir le schéma Bar à partir de nos données (anciennes et nouvelles).

La solution suggérée serait de définir le schéma par programme pour toujours être Bar. Voyons voir comment faire:

val barSchema = org.apache.spark.sql.Encoders.product[Bar].schema 
//barSchema: org.apache.spark.sql.types.StructType = StructType(StructField(i,IntegerType,false), StructField(j,IntegerType,false)) 

show running() fonctionne très bien:

scala> spark.read.schema(barSchema).parquet("/tmp/foo").show() 
+---+----+ 
| i| j| 
+---+----+ 
| 1|null| 
+---+----+ 

Cependant, si vous essayez de filtrer sur la colonne manquante j, les choses ne.

scala> spark.read.schema(barSchema).parquet("/tmp/foo").filter($"j".isNotNull).show() 
17/09/07 18:13:50 ERROR Executor: Exception in task 0.0 in stage 230.0 (TID 481) 
java.lang.IllegalArgumentException: Column [j] was not found in schema! 
    at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55) 
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:181) 
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:169) 
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:151) 
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:91) 
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:58) 
    at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:194) 
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:63) 
    at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59) 
    at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40) 
    at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126) 
    at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46) 
    at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:110) 
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109) 
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:381) 
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:355) 
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:168) 
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Répondre

0

Sur Spark 1.6 a bien fonctionné, la récupération de schéma a été modifié, HiveContext a été utilisé:

val barSchema = ScalaReflection.schemaFor[Bar].dataType.asInstanceOf[StructType] 
println(s"barSchema: $barSchema") 
hiveContext.read.schema(barSchema).parquet("tmp/foo").filter($"j".isNotNull).show() 

Le résultat est:

barSchema: StructType(StructField(i,IntegerType,false), StructField(j,IntegerType,false)) 
+---+----+ 
| i| j| 
+---+----+ 
| 1|null| 
+---+----+ 
+0

Merci pour l'aide! Peut-être que c'est une régression pour 2.0.2. En tout cas j'ai trouvé l'astuce listée dans ma réponse pour faire l'affaire. – marios

0

Ce qui a fonctionné pour moi est d'utiliser l'API createDataFrame avec RDD[Row] et le nouveau schéma (dont au moins les nouvelles colonnes sont nullables).

// Make the columns nullable (probably you don't need to make them all nullable) 
val barSchemaNullable = org.apache.spark.sql.types.StructType(
    barSchema.map(_.copy(nullable = true)).toArray) 

// We create the df (but this is not what you want to use, since it still has the same issue) 
val df = spark.read.schema(barSchemaNullable).parquet("/tmp/foo") 

// Here is the final API that give a working DataFrame 
val fixedDf = spark.createDataFrame(df.rdd, barSchemaNullable) 

fixedDf.filter($"j".isNotNull).show() 

+---+---+ 
| i| j| 
+---+---+ 
+---+---+