2017-05-17 2 views
0

J'ai deux RDD avec les mêmes clés et des valeurs différentes. Je demande aux deux d'entre eux la même .partitionBy(partitioner) puis je les rejoindre:Pourquoi le même HashPartitioner appliqué sur deux RDD avec les mêmes clés ne partitionne pas également

val partitioner = new HashPartitioner(partitions = 4) 

val a = spark.sparkContext.makeRDD(Seq(
    (1, "A"), (2, "B"), (3, "C"), (4, "D"), (5, "E"), (6, "F"), (7, "G"), (8, "H") 
)).partitionBy(partitioner) 

val b = spark.sparkContext.makeRDD(Seq(
    (1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e"), (6, "f"), (7, "g"), (8, "h") 
)).partitionBy(partitioner) 

println("A:") 
a.foreachPartition(p => { 
    p.foreach(t => print(t + " ")) 
    println() 
}) 

println("B:") 
b.foreachPartition(p => { 
    p.foreach(t => print(t + " ")) 
    println() 
}) 

println("Join:") 
a.join(b, partitioner) 
    .foreachPartition(p => { 
    p.foreach(t => print(t + " ")) 
    println() 
}) 

Je reçois:

A: 
(2,B) (3,C) (4,D) (6,F) (7,G) 
(8,H) (1,A) 
(5,E) 

B: 
(3,c) (7,g) 
(1,a) (5,e) 
(2,b) (6,f) 
(4,d) (8,h) 

Join: 
(6,(F,f)) (1,(A,a)) (2,(B,b)) (5,(E,e)) (4,(D,d)) (8,(H,h)) 
(3,(C,c)) (7,(G,g)) 

La première question est pourquoi les partitions A et B différents et pourquoi joinRDD est différent des deux?

Répondre

1

Le partitionnement est exactement le même dans tous les cas. Le problème est la méthode que vous utilisez. N'oubliez pas que chaque partition est traitée dans un thread distinct. Si vous exécutez ce code plusieurs fois, vous verrez que la sortie est en réalité non déterministe.

Essayez par exemple quelque chose comme ceci:

a.glom.collect.map(_.mkString(" ")).foreach(println) 
(4,D) (8,H) 
(1,A) (5,E) 
(2,B) (6,F) 
(3,C) (7,G) 
b.glom.collect.map(_.mkString(" ")).foreach(println) 
(4,d) (8,h) 
(1,a) (5,e) 
(2,b) (6,f) 
(3,c) (7,g) 
a.join(b).glom.collect.map(_.mkString(" ")).foreach(println) 
(4,(D,d)) (8,(H,h)) 
(1,(A,a)) (5,(E,e)) 
(6,(F,f)) (2,(B,b)) 
(3,(C,c)) (7,(G,g)) 

Notez que l'ordre des valeurs dans chaque partition peut encore être non-déterministe si exécuté dans un contexte non local, mais le contenu de chaque partition sera Je suis le même que ci-dessus.