2016-07-27 1 views
0

En supposant que j'ai les données suivantes:prendre N valeurs de chaque partition Spark

val DataSort = Seq(("a",5),("b",13),("b",2),("b",1),("c",4),("a",1),("b",15),("c",3),("c",1)) 
val DataSortRDD = sc.parallelize(DataSort,2) 

Et maintenant il y a deux partitions avec:

scala>DataSortRDD.glom().take(2).head 
res53: Array[(String,Int)] = Array(("a",5),("b",13),("b",2),("b",1),("c",4)) 
scala>DataSortRDD.glom().take(2).tail 
res54: Array[(String,Int)] = Array(Array(("a",1),("b",15),("c",3),("c",2),("c",1))) 

On suppose que dans chaque partition les données est déjà trié en utilisant quelque chose comme sortWithinPartitions(col("src").desc,col("rank").desc) (c'est pour un dataframe mais est juste pour illustrer). Ce que je veux est de chaque partition obtenir pour chaque lettre les deux premières valeurs (s'il y a plus de 2 valeurs). Ainsi, dans cet exemple, le résultat dans chaque partition doit être:

scala>HypotheticalRDD.glom().take(2).head 
Array(("a",5),("b",13),("b",2),("c",4)) 
scala>HypotheticalRDD.glom().take(2).tail 
Array(Array(("a",1),("b",15),("c",3),("c",2))) 

Je sais que je dois utiliser la fonction mapPartition mais son dans mon esprit ne sait pas comment puis-je itérer les valeurs de chaque partition et obtenir le premier 2. Un conseil?

Modifier: Plus précisément. Je sais que dans chaque partition les données sont déjà triées par 'lettre' d'abord et après par 'compte'. Donc, mon idée principale est que la fonction d'entrée dans mapPartition devrait itérer à travers la partition et yield les deux premières valeurs de chaque lettre. Et ceci pourrait être fait en vérifiant chaque itération la valeur .next(). Voilà comment je pourrais écrire en python:

def limit_on_sorted(iterator): 
    oldKey = None 
    cnt = 0 
    while True: 
     elem = iterator.next() 
     if not elem: 
      return 
     curKey = elem[0] 
     if curKey == oldKey: 
      cnt +=1 
      if cnt >= 2: 
       yield None 
     else: 
      oldKey = curKey 
      cnt = 0 
     yield elem 

DataSortRDDpython.mapPartitions(limit_on_sorted,preservesPartitioning=True).filter(lambda x:x!=None) 
+0

-t-il quelle que soit la le résultat final est _partitioned_? En d'autres termes - si vous obtenez les mêmes résultats mais partitionnés différemment, cela serait-il toujours OK? Le filtrage serait toujours basé sur le partitionnement d'origine comme prévu. –

Répondre

1

En supposant vous ne se soucient pas vraiment au sujet de la répartition du résultat , vous pouvez utiliser mapPartitionsWithIndex pour incorporer l'ID de partition dans la clé par que vous groupBy, alors vous pouvez facilement prendre les deux premiers éléments pour chaque clé:

val result: RDD[(String, Int)] = DataSortRDD 
    .mapPartitionsWithIndex { 
    // add the partition ID into the "key" of every record: 
    case (partitionId, itr) => itr.map { case (k, v) => ((k, partitionId), v) } 
    } 
    .groupByKey() // groups by letter and partition id 
    // take only first two records, and drop partition id 
    .flatMap { case ((k, _), itr) => itr.take(2).toArray.map((k, _)) } 

println(result.collect().toList) 
// prints: 
// List((a,5), (b,15), (b,13), (b,2), (a,1), (c,4), (c,3)) 

Notez que la le résultat final n'est pas partitionné de la même façon (groupByKey change le partitionnement), je suis en supposant que ce n'est pas critique à ce que vous essayez de faire (ce qui, franchement, m'échappe).

EDIT: si vous voulez éviter et brassage effectuer toutes les opérations au sein de chaque partition:

val result: RDD[(String, Int)] = DataSortRDD 
    .mapPartitions(_.toList.groupBy(_._1).mapValues(_.take(2)).values.flatten.iterator, true) 
+0

Merci pour la réponse. Peut-être devrais-je le mentionner dans la question. La raison pour laquelle je veux utiliser 'mapPartition' est parce que je veux éviter le brassage entre les partitions pour des raisons d'efficacité. Dans votre solution avec 'groupByKey', il y a mélange. –

+0

Je vois. Editer ma réponse pour inclure une solution sans mélange (préserve les partitions) –

+0

Votre réponse est correcte. Ma préoccupation concerne le 'groupBy (_._ 1)'. Pourquoi dois-je regrouper quand je sais que les valeurs sont déjà triées par lettre et après compte? J'ai mis à jour ma question pour clarifier mon idée. –