2017-10-16 1 views
0

Say, j'ai deux DataFrames Spark avec colonne some_colQuelles propriétés possède Spark DataFrame après la jointure de deux DataFrames avec le même partitionnement?

df_1 = df_1.repartition(50, 'some_col') 
df_2 = df_2.repartition(50, 'some_col') 

df_3 = df_1.join(df_2, on='some_col') 

Je pensais que df_3 devrait également être divisé par some_col et dispose de 50 partitions, mais mes expériences montrent qu'au moins la dernière condition n'est pas vrai. Pourquoi cela arrive-t-il?

Que se passe en termes de chronophages opérations (re-partitionnement ou re-localisation) peut se produire après

df_3 = df_3.repartition(50, 'some_col') 
+0

Pourquoi pensez-vous que 'df_3' devrait avoir 50 partitions? Que dit 'df_3.rdd.getNumPartitions()' juste après la jointure? – Mariusz

+0

J'ai trouvé que 'df_3.rdd.getNumPartitions()' est égal 'spark.default.parallelism'. Cela a l'air étrange. Pourquoi cela arrive-t-il? Il semble beaucoup plus facile de faire la jointure dans les mêmes partitions! – AlexanderLedovsky

+0

Il est toujours égal à 'spark.sql.shuffle.partitions', vous pouvez lire plus ici: https://stackoverflow.com/questions/41359344/why-is-the-number-of-partitions-after-groupby-200 -quoi-est-ce-200-pas-un-autre – Mariusz

Répondre

2

condition que « df_3 doit également être divisée par some_col et dispose de 50 partitions » ne sera true si df_1 et df_2 ont les partitions avec les mêmes valeurs pour "some_col" ie si df_1 a 2 partitions: [(1,2)], [(3,1), (3,7)], (de sorte que les valeurs de some_col sont 1, 3) alors df_2 doit avoir des partitions avec des valeurs de some_col 1,3. Si tel est le cas, en rejoignant df_1 et df_2, il produira df_3 avec le même nombre de partitions que dans df_1 ou df_2.

Dans tous les autres cas, il va essayer de créer 200 partitions par défaut et de mélanger l'ensemble de l'opération de jointure.

pour plus de clarté, vous pouvez essayer exemple suivant:

rdd1 = sc.parallelize([(1,2), (1,9), (2, 3), (3,4)]) 
df1 = rdd1.toDF(['a', 'b']) 
df1 = df1.repartition(3, 'a') 
df1.rdd.glom().collect() #outputs like: 
>> [[Row(a=2,b=3)], [Row(a=3,b=4)], [Row(a=1,b=2), Row(a=1,b=9)]] 

df1.rdd.getNumPartitions() 
>>3 

rdd2 = sc.parallelize([(1,21), (1,91), (2, 31), (3,41)]) 
df2 = rdd2.toDF(['a', 'b']) 
df2 = df2.repartition(3, 'a') 
df2.rdd.glom().collect() #outputs like: 
>> [[Row(a=2,b=31)], [Row(a=3,b=41)], [Row(a=1,b=21), Row(a=1,b=91)]] 

df2.rdd.getNumPartitions() 
>>3 


df3 = df1.join(df2, on='a') 
df3.rdd.glom().collect() #outputs like: 
>> [[Row(a=2,b=3,b=31)], [Row(a=3,b=4,b=41)], [Row(a=1,b=2,b=21), Row(a=1,b=9,b=91)]] 
df21.rdd.getNumPartitions() 
>>3 
+0

Thx! En fait, c'est beaucoup plus clair maintenant. Ai-je raison de dire que si some_col dans df1 et df2 n'a pas les mêmes valeurs (par exemple [1, 3] et [1, 2, 3]) cela provoquera toujours un shuffle? – AlexanderLedovsky