2017-06-20 3 views
0

Je suis toujours nouveau sur Spark/PySpark et j'ai la question suivante. Je suis une liste imbriquée avec ID's en elle:Spark/PySpark: Regrouper par n'importe quelle liste imbriquée

result = [[411, 44, 61], [42, 33], [1, 100], [44, 42]] 

La chose je essaie de réaliser est que si un élément de sous-liste correspond à un élément dans un autre sous-liste tous les deux devraient être fusionnés. Le résultat devrait ressembler à ceci:

merged_result = [[411, 44, 61, 42, 33, 44, 42], [1,100]] 

La première liste de "résultat" correspond à la quatrième liste. La quatrième liste correspond à la seconde, donc tous les 3 devraient être fusionnés en une seule liste. La troisième liste ne correspond à aucune autre liste, elle reste donc la même.

Je pourrais réaliser ceci en écrivant des boucles avec Python.

result_after_matching = [] 
for i in result: 
    new_list = i 
    for s in result: 
     if any(x in i for x in s): 
      new_list = new_list + s 
    result_after_matching.append(set(new_list)) 

#merged_result = [[411, 44, 61, 42], [42,33,44], [1, 100], [44,42,33,411,61]] 

Comme ce n'est pas la sortie désirée je besoin de répéter la boucle et faire un autre ensemble() manifeste la « merged_result »)

set([[411,44,61,42,33], [42,33,44,411,61],[1,100], [44,42,33,411,61]]) 
-> [[411, 44, 61, 42, 33], [1,100]] 

Comme la liste des listes, et les sous-listes obtient de plus en plus gros au fur et à mesure que de nouvelles données seront reçues, ce ne sera pas la fonction à utiliser.

Quelqu'un peut-il me dire s'il existe une fonction, dans Spark/Pyspark, pour faire correspondre/fusionner/grouper/réduire ces listes imbriquées beaucoup plus facilement et plus rapidement ?!

Merci beaucoup d'avance! MG

Répondre

2

La plupart des solutions basées sur rdd ou sur base de données seront probablement inefficaces. C'est parce que la nature de votre problème nécessite que chaque élément de votre ensemble de données soit comparé à tous les autres éléments potentiellement plusieurs fois. Cela fait en sorte que la distribution du travail à travers un cluster est au mieux inefficace. Peut-être une façon différente de le faire serait de reformuler ceci comme un problème de graphique. Si vous traitez chaque élément d'une liste comme un nœud sur un graphique, et chaque liste comme un sous-graphe, alors les composants connectés d'un graphique parent construit à partir des sous-graphes seront le résultat souhaité. Voici un exemple en utilisant le package NetworkX en python:

import networkx as nx 

result = [[411, 44, 61], [42, 33], [1, 100], [44, 42]] 

g = nx.DiGraph() 
for subgraph in result: 
    g.add_path(subgraph) 

u = g.to_undirected() 
output=[] 
for component in nx.connected_component_subgraphs(u): 
    output.append(component.nodes()) 

print(output) 
# [[33, 42, 411, 44, 61], [1, 100]] 

Cela devrait être assez efficace, mais si vos données est très grand, il sera judicieux d'utiliser un outil d'analyse graphique plus évolutive. Spark a une bibliothèque de traitement graphique appelé GraphX:

https://spark.apache.org/docs/latest/graphx-programming-guide.html

Malheureusement, la mise en œuvre de pyspark est à la traîne un peu. Donc, si vous avez l'intention d'utiliser quelque chose comme ça, vous pourriez être bloqué en utilisant scala-spark ou un cadre différent entièrement pour le moment.

+0

Votre solution fonctionne très vite! Même avec les listes 3k +. La seule chose qui ne fonctionnait pas sur le vrai exemple était cette liste de valeurs uniques (par exemple [57]) où ne figurait pas la sortie, avez-vous des explications ?! La valeur unique signifie que cet ID n'est dans aucune autre liste. Donc, je viens de diviser la liste en deux parties avant d'utiliser votre code et les fusionner à nouveau après. – mgruber

+0

En outre, j'ai essayé d'utiliser GraphX ​​mais comme vous l'avez dit ne fonctionnait pas en Python. D'une certaine manière, je ne peux utiliser que des scripts basés sur "Python" lors de notre distribution. Je vais parler à notre architecte de distribution. – mgruber

1

Je pense que vous pouvez utiliser l'action aggregate de RDD. Ci-dessous, je mets l'exemple d'implémentation dans Scala. S'il vous plaît noter que j'ai utilisé la récursivité, pour le rendre plus lisible, mais pour améliorer les performances, il est bon de réimplémenter ces fonctions.

def overlap(s1: Seq[Int], s2: Seq[Int]): Boolean = 
    s1.exists(e => s2.contains(e)) 

def mergeSeq(s1: Seq[Int], s2: Seq[Int]): Seq[Int] = 
    s1.union(s2).distinct 

def mergeSeqWithSeqSeq(s: Seq[Int], ss: Seq[Seq[Int]]): Seq[Seq[Int]] = ss match { 
    case Nil => Seq(s) 
    case h +: tail => 
     if(overlap(h, s)) mergeSeqWithSeqSeq(mergeSeq(h, s), tail) 
     else h +: mergeSeqWithSeqSeq(s, tail) 
} 

def mergeSeqSeqWithSeqSeq(s1: Seq[Seq[Int]], s2: Seq[Seq[Int]]): Seq[Seq[Int]] = s1 match { 
    case Nil => s2 
    case h +: tail => mergeSeqWithSeqSeq(h, mergeSeqSeqWithSeqSeq(tail, s2)) 
} 

val result = rdd 
    .aggregate(Seq.empty[Seq[Int]]) (
     {case (ss, s) => mergeSeqWithSeqSeq(s, ss)}, 
     {case (s1, s2) => mergeSeqSeqWithSeqSeq(s1, s2)} 
    )