2017-09-06 3 views
1

J'ai un grand nombre de fichiers parquet dans un répertoire qui représente différentes tables du même schéma de données et je veux les fusionner en un seul gros RDD. Idéalement, je voudrais faire une carte réduire où le mappeur émet de petits RDD et le réducteur les fusionne. Cependant, je ne pouvais pas comprendre comment émettre des RDD dans un mappeur. Des idées?Comment assembler plusieurs fichiers parquet à l'aide de Spark Map Reduce?

La première ligne ci-dessous génère la liste des fichiers dans le répertoire et la deuxième ligne doit générer le RDD complet. Cependant, il donne une erreur impossible à sérialiser car je ne pense pas que vous pouvez créer un RDD dans une instance de carte.

arr = map(lambda x: ["/mnt/s3/rds/27jul2017-parquet/%s-%s-%s.parquet" % (x[0], x[1], x[2]), x[1].zfill(10), x[2].zfill(10)], map(lambda x: x.name.split('.')[0].split('-'), dbutils.fs.ls('/mnt/s3/rds/27jul2017-parquet/'))) 
result = sorted(arr, key=lambda x: x[1])  
sc.parallelize(arr).map(lambda x: (1, spark.read.parquet(x[0]))).reduceByKey(lambda x,y: x.unionAll(y)) 
+0

https://stackoverflow.com/a/37257709/647053 –

+1

Copie possible de [Lecture de fichiers parquet de multi répertoires pleins dans Pyspark] (https://stackoverflow.com/questions/37257111/reading-parquet-files-from-multiple-directories-in-pyspark) –

+0

regardez aussi https://stackoverflow.com/questions/37257111/ read-parquet-files-from-multiple-directories-in-pyspark –

Répondre

1

Au lieu de spécifier un fichier dans spark.read.parquet spécifiez le répertoire, vous obtiendrez une trame de données (pas un RDD) contenant toutes les données:

df = spark.read.parquet("/mnt/s3/rds/27jul2017-parquet/") 

map parcourt les lignes de votre RDD pour opérer des changements, il ne peut pas charger de fichiers si vous pouviez vous retrouver avec un RDD dont les lignes sont des dataframes ...