2017-02-18 1 views
12

J'essaie de sauver un DataFrame à HDFS au format Parquet en utilisant DataFrameWriter, divisé par trois valeurs de la colonne, comme ceci:Comment partitionner et écrire DataFrame dans Spark sans supprimer les partitions sans nouvelles données?

dataFrame.write.mode(SaveMode.Overwrite).partitionBy("eventdate", "hour", "processtime").parquet(path) 

Comme mentionné dans this question, partitionBy supprimera toute la hiérarchie existante des partitions à path et les a remplacés par les partitions au dataFrame. Étant donné que de nouvelles données incrémentielles pour un jour particulier viendront périodiquement, ce que je veux est de remplacer seulement les partitions dans la hiérarchie pour lesquelles dataFrame a des données, laissant les autres intacts.

Pour ce faire, il semble que je dois enregistrer chaque partition individuellement à l'aide de son chemin complet, quelque chose comme ceci:

singlePartition.write.mode(SaveMode.Overwrite).parquet(path + "/eventdate=2017-01-01/hour=0/processtime=1234567890") 

Cependant, je vais avoir du mal à comprendre la meilleure façon d'organiser les données en une seule partition DataFrame s afin que je puisse les écrire en utilisant leur chemin complet. Une idée était quelque chose comme:

dataFrame.repartition("eventdate", "hour", "processtime").foreachPartition ... 

Mais foreachPartition fonctionne sur un Iterator[Row] qui n'est pas idéal pour l'écriture en format parquet.

J'ai également envisagé d'utiliser un select...distinct eventdate, hour, processtime pour obtenir la liste des partitions, puis de filtrer le bloc de données d'origine par chacune de ces partitions et d'enregistrer les résultats dans leur chemin d'accès partitionné complet. Mais la requête distincte plus un filtre pour chaque partition ne semble pas très efficace car il y aurait beaucoup d'opérations de filtrage/écriture.

J'espère qu'il y a une manière plus propre de préserver les partitions existantes pour lesquelles dataFrame n'a aucune donnée?

Merci d'avoir lu.

Version Spark: 2.1

Répondre

0

Vous pouvez essayer le mode en annexe.

dataFrame.write.format("parquet") 
.mode("append") 
.partitionBy("year","month") 
.option("path",s"$path/table_name") 
.saveAsTable(s"stg_table_name") 
1

L'option de mode Append a un attrape! J'ai testé et j'ai vu que cela permettrait de conserver les fichiers de partition existants. Cependant, le problème cette fois-ci est le suivant: Si vous exécutez le même code deux fois (avec les mêmes données), il créera de nouveaux fichiers parquet au lieu de remplacer ceux existants pour les mêmes données (Spark 1.6). Donc, au lieu d'utiliser Append, nous pouvons encore résoudre ce problème avec Overwrite. Au lieu d'écraser au niveau de la table, nous devrions remplacer au niveau de la partition.

df.write.mode(SaveMode.Overwrite) 
.parquet("/data/hive/warehouse/mydbname.db/" + tableName + "/y=" + year + "/m=" + month + "/d=" + day) 

Voir le lien suivant pour plus d'informations:

Overwrite specific partitions in spark dataframe write method

(je l'ai mis à jour ma réponse après le commentaire de suriyanto Thnx..)

+0

Avez-vous testé si lorsque vous écrivez la même données deux fois qu'il remplace l'ancienne partition? De mon test, il crée un nouveau fichier parquet dans le répertoire de la partition, ce qui provoque le double des données. Je suis sur Spark 2.2. – suriyanto