2017-05-10 1 views
1

Je veux mapper un tableau dans tous les 3 éléments, la sortie est de paires [k, v], par exemple:carte et réduire un tableau

input: array(1,2,3,4,5,6,7,8,9,7,12,11) 
output: (1 => 2,3) (4 => 5,6)(7 => 8,9) (7 => 12,11) 

Et je veux aussi réduire ces paires par touches, par exemple, si je veux récupérer les données avec une clé = 7, la sortie sera (7=> 8,9,12,11).

Merci beaucoup.

+0

Cela sonne comme un bon endroit pour simplement itérer pour chaque paire de trois («i + = 3»), et utiliser un multimap pour ajouter des valeurs. Généralement dans un magasin de clé/valeur, vous n'auriez pas de toute façon des valeurs clés en double – Rogue

+5

Avez-vous une question à nous poser? "Je veux" n'est pas une question. –

+1

Quelle est la relation avec Apache Spark? –

Répondre

3

Je pense que ce dont vous avez besoin est suivant

val input = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 7, 12, 11) 
val output = input.toSeq.grouped(3) 
    .map(g => (g.head, g.tail)).toList 
    .groupBy(_._1) 
    .mapValues(l => l.flatMap(_._2)) 

Résultat serait

Carte (4 -> Liste (5, 6), 7 -> Liste (8, 9, 12, 11), 1 -> Liste (2, 3))

+0

modifier bon avec groupped() – BDR

+0

Bonjour, j'ai un autre problème, si le tableau d'entrée est une partition de RDD, ce qui signifie que je dois créer [k, v] paires dans une partition, il ça va? – miaoiao

+0

@miaoiao, je ne suis pas sûr de ce que vous essayez de faire, mais avec RDD ce serait vraiment difficile à réaliser. RDD est comme "Resilient ** Distributed ** Dataset" qui signifie par défaut il n'y a aucune garantie sur la commande et même sur la division entre les noeuds (exemple: vous pourriez avoir RDD 6-éléments qui est divisé 4: 2 entre deux Spark-nœuds et votre opération n'a aucun sens après une telle séparation). Vous devriez probablement agréger des données sur une étape précédente au lieu de la jeter dans une liste uniforme et d'essayer de la grouper à partir de là. – SergGr

1

Essayez ceci:

res0 = list.grouped(3).map {x => (x(0), List(x(1),x(2)))}.toList 
// you must dump your converted data format into your storage eg hdfs. 
// And not the entire thing in the form of array. Transform in form of 
// (key,value) and dump in hdfs. That will save a lot of computation. 

res1 = sc.parallelize(res0) 
res2 = res1.reduceByKey(_++_).collect 

Mais je ne suis pas sûr à quel point cette solution évolutive serait.

EDIT

val res1 = sc.parallelize(arr) 
// (1,2,3,4,5,6,7,8,9,7,12,11) 
val res2 = res1.zipWithIndex.map(x._2/3,List(x._1)) 
// (1,0),(2,1),...(12,10),(11,11) -> (0,1),(0,2),(0,3),(1,4),(1,5),(1,6) 
val res3 = res2.reduceByKey(_++_).map(_._2) 
//(0,List(1,2,3)),(1,List(4,5,6)) -> List(1,2,3),List(4,5,6) 
val res4 = res3.map(x => x match { 
    case x1::xs => (x1,xs) 
}).reduceByKey(_++_) 

//List(1,2,3) - > (1,List(2,3)) -> reduceByKey 
//(1,List(2,3)),(4,List(5,6)),(7,List(8,9,12,11)) 
+0

Merci beaucoup! J'apprécie vraiment votre aide! – miaoiao

+0

Bonjour, j'ai un autre problème, si l'entrée Array est une partition de RDD, ce qui signifie que j'ai besoin de créer des [k, v] paires à l'intérieur d'une partition, c'est ok? – miaoiao

+0

C'est ce que j'ai dit dans la réponse, de pré-traiter le tableau à [K, V] avant. –