2015-04-30 1 views
18

Comme tout le monde sait que les partitions de Spark ont ​​un énorme impact sur les performances de toutes les opérations "larges", elles sont généralement personnalisées dans les opérations. J'expérimentait le code suivant:Dans Apache Spark, pourquoi RDD.union ne conserve pas le partitionneur?

val rdd1 = 
    sc.parallelize(1 to 50).keyBy(_ % 10) 
    .partitionBy(new HashPartitioner(10)) 
val rdd2 = 
    sc.parallelize(200 to 230).keyBy(_ % 13) 

val cogrouped = rdd1.cogroup(rdd2) 
println("cogrouped: " + cogrouped.partitioner) 

val unioned = rdd1.union(rdd2) 
println("union: " + unioned.partitioner) 

Je vois que par défaut cogroup() donne toujours un RDD avec le partitionneur personnalisé, mais ne union() pas, il reviendra toujours à défaut. Ceci est contre-intuitif car nous supposons généralement qu'un PairRDD doit utiliser son premier élément comme clé de partition. Existe-t-il un moyen de "forcer" Spark à fusionner 2 PairRDDs pour utiliser la même clé de partition?

Répondre

33

union est une opération très efficace, car elle ne déplace aucune donnée. Si rdd1 a 10 partitions et rdd2 a 20 partitions alors rdd1.union(rdd2) aura 30 partitions: les partitions des deux RDD mis l'une après l'autre. Ceci est juste un changement de comptabilité, il n'y a pas de mélange.

Mais nécessairement, il rejette le partitionneur. Un partitionneur est construit pour un nombre donné de partitions. Le RDD résultant a un nombre de partitions différent de rdd1 et de rdd2. Après avoir pris l'union, vous pouvez exécuter repartition pour mélanger les données et les organiser par clé.


Il existe une exception à ce qui précède. Si rdd1 et rdd2 ont le même partitionneur (avec le même nombre de partitions), union se comporte différemment. Il joindra les partitions des deux RDD par paires, en lui donnant le même nombre de partitions que chacune des entrées. Cela peut impliquer de déplacer des données (si les partitions n'étaient pas co-localisées) mais n'impliquerait pas de mélange. Dans ce cas, le partitionneur est conservé. (Le code pour cela est dans PartitionerAwareUnionRDD.scala.)

+4

Il existe en fait un RDD union-aware-aware que je pense est censé être utilisé automatiquement dans les cas où le partitionnement pourrait être préservé; Je ne sais pas pourquoi il n'est pas appliqué ici. Voir https://github.com/apache/spark/blob/e0628f2fae7f99d096f9dd625876a60d11020d9b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala#L123 et https://github.com/apache/spark /blob/master/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala –

+0

Wow, cool! Je n'ai jamais su ça. On dirait que c'est seulement utilisé quand les deux RDD ont le même partitionneur. Je vais ajouter cela à la réponse, merci! –

+0

Merci beaucoup! C'est une optimisation très importante. BTW si ce n'est pas optimal pour tous les cas, je peux toujours écrire une union zip + dans la partition de toute façon – tribbloid