En supposant que j'ai les données suivantes:prendre N valeurs de chaque partition Spark
val DataSort = Seq(("a",5),("b",13),("b",2),("b",1),("c",4),("a",1),("b",15),("c",3),("c",1))
val DataSortRDD = sc.parallelize(DataSort,2)
Et maintenant il y a deux partitions avec:
scala>DataSortRDD.glom().take(2).head
res53: Array[(String,Int)] = Array(("a",5),("b",13),("b",2),("b",1),("c",4))
scala>DataSortRDD.glom().take(2).tail
res54: Array[(String,Int)] = Array(Array(("a",1),("b",15),("c",3),("c",2),("c",1)))
On suppose que dans chaque partition les données est déjà trié en utilisant quelque chose comme sortWithinPartitions(col("src").desc,col("rank").desc)
(c'est pour un dataframe mais est juste pour illustrer). Ce que je veux est de chaque partition obtenir pour chaque lettre les deux premières valeurs (s'il y a plus de 2 valeurs). Ainsi, dans cet exemple, le résultat dans chaque partition doit être:
scala>HypotheticalRDD.glom().take(2).head
Array(("a",5),("b",13),("b",2),("c",4))
scala>HypotheticalRDD.glom().take(2).tail
Array(Array(("a",1),("b",15),("c",3),("c",2)))
Je sais que je dois utiliser la fonction mapPartition
mais son dans mon esprit ne sait pas comment puis-je itérer les valeurs de chaque partition et obtenir le premier 2. Un conseil?
Modifier: Plus précisément. Je sais que dans chaque partition les données sont déjà triées par 'lettre' d'abord et après par 'compte'. Donc, mon idée principale est que la fonction d'entrée dans mapPartition
devrait itérer à travers la partition et yield
les deux premières valeurs de chaque lettre. Et ceci pourrait être fait en vérifiant chaque itération la valeur .next()
. Voilà comment je pourrais écrire en python:
def limit_on_sorted(iterator):
oldKey = None
cnt = 0
while True:
elem = iterator.next()
if not elem:
return
curKey = elem[0]
if curKey == oldKey:
cnt +=1
if cnt >= 2:
yield None
else:
oldKey = curKey
cnt = 0
yield elem
DataSortRDDpython.mapPartitions(limit_on_sorted,preservesPartitioning=True).filter(lambda x:x!=None)
-t-il quelle que soit la le résultat final est _partitioned_? En d'autres termes - si vous obtenez les mêmes résultats mais partitionnés différemment, cela serait-il toujours OK? Le filtrage serait toujours basé sur le partitionnement d'origine comme prévu. –