(Spark 2.0.2)Spark: Parquet opérations dataframe échouent lorsque forçant schéma sur
Le problème se lève ici quand vous avez des fichiers de parquet avec différents schémas et la force du schéma lors de la lecture. Même si vous pouvez imprimer le schéma et exécuter show()
correctement, vous ne pouvez pas appliquer de logique de filtrage sur les colonnes manquantes.
Voici les deux exemples schèmes:
// assuming you are running this code in a spark REPL
import spark.implicits._
case class Foo(i: Int)
case class Bar(i: Int, j: Int)
Alors Bar
comprend tous les champs de Foo
et ajoute une (j
). Dans la vie réelle, cela se produit lorsque vous commencez avec le schéma Foo
et plus tard, vous avez décidé que vous aviez besoin de plus de champs et vous retrouviez le schéma Bar
.
Simulons les deux fichiers parquet différents.
// assuming you are on a Mac or Linux OS
spark.createDataFrame(Foo(1)::Nil).write.parquet("/tmp/foo")
spark.createDataFrame(Bar(1,2)::Nil).write.parquet("/tmp/bar")
Ce que nous voulons ici est de toujours lire des données en utilisant le schéma plus générique Bar
. En d'autres termes, les lignes écrites sur le schéma Foo
doivent avoir la valeur j
pour être nulles.
Cas 1: Nous avons lu un mélange des deux schémas
spark.read.option("mergeSchema", "true").parquet("/tmp/foo", "/tmp/bar").show()
+---+----+
| i| j|
+---+----+
| 1| 2|
| 1|null|
+---+----+
spark.read.option("mergeSchema", "true").parquet("/tmp/foo", "/tmp/bar").filter($"j".isNotNull).show()
+---+---+
| i| j|
+---+---+
| 1| 2|
+---+---+
Cas n ° 2: Nous avons seulement des données Bar
spark.read.parquet("/tmp/bar").show()
+---+---+
| i| j|
+---+---+
| 1| 2|
+---+---+
de cas 3: Nous disposons de données Foo
scala> spark.read.parquet("/tmp/foo").show()
+---+
| i|
+---+
| 1|
+---+
Le cas problématique est 3, où notre schéma résultant est de type Foo
et non de Bar
. Puisque nous migrons vers le schéma Bar
, nous voulons toujours obtenir le schéma Bar
à partir de nos données (anciennes et nouvelles).
La solution suggérée serait de définir le schéma par programme pour toujours être Bar
. Voyons voir comment faire:
val barSchema = org.apache.spark.sql.Encoders.product[Bar].schema
//barSchema: org.apache.spark.sql.types.StructType = StructType(StructField(i,IntegerType,false), StructField(j,IntegerType,false))
show running() fonctionne très bien:
scala> spark.read.schema(barSchema).parquet("/tmp/foo").show()
+---+----+
| i| j|
+---+----+
| 1|null|
+---+----+
Cependant, si vous essayez de filtrer sur la colonne manquante j, les choses ne.
scala> spark.read.schema(barSchema).parquet("/tmp/foo").filter($"j".isNotNull).show()
17/09/07 18:13:50 ERROR Executor: Exception in task 0.0 in stage 230.0 (TID 481)
java.lang.IllegalArgumentException: Column [j] was not found in schema!
at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:181)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:169)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:151)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:91)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:58)
at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:194)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:63)
at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59)
at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40)
at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126)
at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46)
at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:110)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:381)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:355)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:168)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Merci pour l'aide! Peut-être que c'est une régression pour 2.0.2. En tout cas j'ai trouvé l'astuce listée dans ma réponse pour faire l'affaire. – marios