2017-05-02 1 views
4

J'essaie de tester comment écrire des données dans HDFS 2.7 en utilisant Spark 2.1. Mes données sont une simple séquence de valeurs factices et la sortie doit être partitionnée par les attributs: id et clé.Comment enregistrer un fichier parqueté partitionné dans Spark 2.1?

// Simple case class to cast the data 
case class SimpleTest(id:String, value1:Int, value2:Float, key:Int) 

// Actual data to be stored 
val testData = Seq(
    SimpleTest("test", 12, 13.5.toFloat, 1), 
    SimpleTest("test", 12, 13.5.toFloat, 2), 
    SimpleTest("test", 12, 13.5.toFloat, 3), 
    SimpleTest("simple", 12, 13.5.toFloat, 1), 
    SimpleTest("simple", 12, 13.5.toFloat, 2), 
    SimpleTest("simple", 12, 13.5.toFloat, 3) 
) 

// Spark's workflow to distribute, partition and store 
// sc and sql are the SparkContext and SparkSession, respectively 
val testDataP = sc.parallelize(testData, 6) 
val testDf = sql.createDataFrame(testDataP).toDF("id", "value1", "value2", "key") 
testDf.write.partitionBy("id", "key").parquet("/path/to/file") 

J'attends pour obtenir la structure arborescente suivante dans HDFS:

- /path/to/file 
    |- /id=test/key=1/part-01.parquet 
    |- /id=test/key=2/part-02.parquet 
    |- /id=test/key=3/part-03.parquet 
    |- /id=simple/key=1/part-04.parquet 
    |- /id=simple/key=2/part-05.parquet 
    |- /id=simple/key=3/part-06.parquet 

Mais quand je lance le code précédent je reçois la sortie suivante:

/path/to/file/id=/key=24/ 
|-/part-01.parquet 
|-/part-02.parquet 
|-/part-03.parquet 
|-/part-04.parquet 
|-/part-05.parquet 
|-/part-06.parquet 

Je ne sais pas s'il y a quelque chose qui ne va pas dans le code, ou est-ce qu'il y a autre chose que fait Spark?

J'exécution spark-submit comme suit:

étincelle soumettre --name APP --master 30G --executor-mémoire --driver-mémoire locale 30G noyaux --executor-8 --num -executors 8 --conf spark.io.compression.codec = lzf --conf spark.akka.frameSize = 1024 --conf spark.driver.maxResultSize = 1g --conf spark.sql.orc.compression.codec = non compressé - -conf spark.sql.parquet.filterPushdown = true --class myClass myFatJar.jar

Répondre

1

J'ai trouvé une solution! Selon Cloudera, est un problème de configuration mapred-site.xml (vérifier le lien ci-dessous). Aussi, au lieu d'écrire le dataframe comme: testDf.write.partitionBy("id", "key").parquet("/path/to/file")

Je l'ai fait comme suit: testDf.write.partitionBy("id", "key").parquet("hdfs://<namenode>:<port>/path/to/file"). Vous pouvez remplacer <namenode> et <port> par le nom et le port du nom de module HDFS, respectivement.

Un merci spécial à @ jacek-laskowski pour sa précieuse contribution.

Références:

https://community.cloudera.com/t5/Batch-SQL-Apache-Hive/MKDirs-failed-to-create-file/m-p/36363#M1090

Writing to HDFS in Spark/Scala

5

Intéressant depuis ... eh bien ... "ça marche pour moi".

Comme vous décrivez votre ensemble de données à l'aide SimpleTest classe cas Spark 2.1 vous êtes import spark.implicits._ loin d'avoir un dactylographiées Dataset. Dans mon cas, spark est sql.

En d'autres termes, vous ne devez pas créer testDataP et testDf (en utilisant sql.createDataFrame).

import spark.implicits._ 
... 
val testDf = testData.toDS 
testDf.write.partitionBy("id", "key").parquet("/path/to/file") 

Dans un autre terminal (après avoir sauvegardé dans le répertoire /tmp/testDf):

$ tree /tmp/testDf/ 
/tmp/testDf/ 
├── _SUCCESS 
├── id=simple 
│   ├── key=1 
│   │   └── part-00003-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet 
│   ├── key=2 
│   │   └── part-00004-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet 
│   └── key=3 
│    └── part-00005-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet 
└── id=test 
    ├── key=1 
    │   └── part-00000-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet 
    ├── key=2 
    │   └── part-00001-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet 
    └── key=3 
     └── part-00002-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet 

8 directories, 7 files