Comme tout le monde sait que les partitions de Spark ont un énorme impact sur les performances de toutes les opérations "larges", elles sont généralement personnalisées dans les opérations. J'expérimentait le code suivant:Dans Apache Spark, pourquoi RDD.union ne conserve pas le partitionneur?
val rdd1 =
sc.parallelize(1 to 50).keyBy(_ % 10)
.partitionBy(new HashPartitioner(10))
val rdd2 =
sc.parallelize(200 to 230).keyBy(_ % 13)
val cogrouped = rdd1.cogroup(rdd2)
println("cogrouped: " + cogrouped.partitioner)
val unioned = rdd1.union(rdd2)
println("union: " + unioned.partitioner)
Je vois que par défaut cogroup()
donne toujours un RDD avec le partitionneur personnalisé, mais ne union()
pas, il reviendra toujours à défaut. Ceci est contre-intuitif car nous supposons généralement qu'un PairRDD doit utiliser son premier élément comme clé de partition. Existe-t-il un moyen de "forcer" Spark à fusionner 2 PairRDDs pour utiliser la même clé de partition?
Il existe en fait un RDD union-aware-aware que je pense est censé être utilisé automatiquement dans les cas où le partitionnement pourrait être préservé; Je ne sais pas pourquoi il n'est pas appliqué ici. Voir https://github.com/apache/spark/blob/e0628f2fae7f99d096f9dd625876a60d11020d9b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala#L123 et https://github.com/apache/spark /blob/master/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala –
Wow, cool! Je n'ai jamais su ça. On dirait que c'est seulement utilisé quand les deux RDD ont le même partitionneur. Je vais ajouter cela à la réponse, merci! –
Merci beaucoup! C'est une optimisation très importante. BTW si ce n'est pas optimal pour tous les cas, je peux toujours écrire une union zip + dans la partition de toute façon – tribbloid