2016-04-01 3 views
3

Lors de l'écriture d'un fichier sur HDFS à l'aide de Spark, cette opération est assez rapide lorsque vous n'utilisez pas le partitionnement. Au lieu de cela, lorsque j'utilise le partitionnement pour écrire le fichier, le délai d'écriture augmente de facteur ~ 24.Le partitionnement Spark pour l'écriture de fichier est très lent

Pour le même fichier, écrire sans partition prend environ 600ms. Ecrire avec partition par Id (va générer exactement 1.000 partitions, comme il y a 1.000 ids dans le fichier) ça prend environ 14 secondes.

Certains d'entre vous ont-ils la même expérience que l'écriture d'un fichier partitionné prend beaucoup de temps? Quelle est la cause de cette situation, peut-être que Spark doit créer 1.000 dossiers et fichiers pour chaque partition? Savez-vous comment cela peut être accéléré?

val myRdd = streamedRdd.map { case ((id, metric, time), value) => Record(id, metric, getEpoch(time), time, value) } 

val df = myRdd.toDF 

df.write.mode(SaveMode.Append) 
.partitionBy("id") 
.parquet(path) 
+0

Pourriez-vous inclure le code que vous utilisez? – zero323

Répondre

0

exécuteurs Spark communiquer avec HDFS pour écrire les données qu'ils ont, cela dépend de la façon dont vos données sont réparties sur le cluster après le partitionnement.

Apparemment, pour de plus petits morceaux de données, le temps nécessaire pour établir les connexions de plusieurs nœuds exécuteurs à HDFS et écrire serait plus important si on le compare à l'écriture séquentielle du fichier entier.

Comment éviter ceci:

par étincelle par défaut partitionne les données à l'aide partitionneur Hash (tables de hachage de la clé et la clé avec la même hachage va au même noeud) essayez de spécifier le partitionneur Range, s'il vous plaît trouver les extraits d'échantillons ci-dessous:

L'extrait suivant utilise Hash partitionneur yourRdd.groupByKey() saveAsTextFile ("HDFS PATH");.

L'extrait suivant utilise notre gamme personnalisée partitionneur Il crée 8 partitions comme mentionné dans RangePartitioner(8, yourRdd) et de l'écriture à travers 8 connexions serait un meilleur choix alors écrit, par 1000 connexions.

val tunedPartitioner = new RangePartitioner(8, yourRdd) 
val partitioned = yourRdd.partitionBy(tunedPartitioner).saveAsTextFile("HDFS PATH"); 

Encore une fois c'est un compromis entre les données à écrire et le nombre de partitions que vous créez.

+1

L'idée est bonne, mais elle ne fonctionnera pas avec les trames de données. Pouvez-vous montrer un exemple de repartitionnement des données et de l'enregistrer dans le parquet? – alexeipab

+0

@alexeipab Vous ne pouvez pas actuellement partitionner en utilisant un partitionneur personnalisé. La seule chose que vous pouvez faire est de partitionner par colonne en utilisant repartition. Vous pouvez également utiliser myDF.rdd.partitionBy() pour partitionner le RDD sous-jacent à votre base de données. – Vektor88