J'ai l'application pyspark suivante qui génère des séquences de processus enfants/parents à partir d'un CSV d'ID de processus enfant/parent. Considérant le problème en tant qu'arbre, j'utilise une recherche itérative en profondeur à partir des noeuds feuilles (un processus qui n'a pas d'enfants) et itération à travers mon fichier pour créer ces fermetures où le processus 1 est le parent à traiter 2 qui est le parent du processus 3 et ainsi de suite. En d'autres termes, étant donné un csv comme indiqué ci-dessous, est-il possible d'implémenter une recherche en profondeur (itérative ou récursive) en utilisant des pyspark-isms pyspark & pour générer lesdites fermetures sans avoir à utiliser le .collect () fonction (qui est incroyable cher)?Pyspark - Depth-First Recherche sur Dataframe
from pyspark.sql.functions import monotonically_increasing_id
import copy
from pyspark.sql import SQLContext
from pyspark import SparkContext
class Test():
def __init__(self):
self.process_list = []
def main():
test = Test()
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
df = sc.textFile("<path to csv>")
df = df.map(lambda line: line.split(","))
header = df.first()
data = df.filter(lambda row: row != header)
data = data.toDF(header)
data.createOrReplaceTempView("flat")
data = sqlContext.sql("select doc_process_pid, doc_parent_pid from flat
where doc_parent_pid is not null AND
doc_process_pid is not null")
data = data.select(monotonically_increasing_id().alias("rowId"), "*")
data.createOrReplaceTempView("data")
leaf_df = sqlContext.sql("select doc_process_pid, doc_parent_pid from data
where doc_parent_pid != -1 AND
doc_process_pid == -1")
leaf_df = leaf_df.rdd.collect()
data = sqlContext.sql("select doc_process_pid, doc_parent_pid from data
where doc_process_pid != -1")
data.createOrReplaceTempView("data")
for row in leaf_df:
path = []
rowID = row[0]
data = data.filter(data['rowId'] != rowID)
parentID = row[4]
path.append(parentID)
while (True):
next_df = sqlContext.sql(
"select doc_process_pid, doc_parent_pid from data where
doc_process_pid == " + str(parentID))
next_df_rdd = next_df.collect()
print("parent: ", next_df_rdd[0][1])
parentID = next_df_rdd[0][1]
if (int(parentID) != -1):
path.append(next_df_rdd[0][1])
else:
test.process_list.append(copy.deepcopy(path))
break
print("final: ", test.process_list)
main()
Voici mon csv:
doc_process_pid doc_parent_pid
1 -1
2 1
6 -1
7 6
8 7
9 8
21 -1
22 21
24 -1
25 24
26 25
27 26
28 27
29 28
99 6
107 99
108 -1
109 108
222 109
1000 7
1001 1000
-1 9
-1 22
-1 29
-1 107
-1 1001
-1 222
-1 2
Il représente les relations de processus enfants/parents. Si nous considérons cela comme un arbre, alors les nœuds feuilles sont définis par doc_process_id == -1 et les nœuds racine sont des processus où doc_parent_process == -1.
Le code ci-dessus génère deux trames de données:
nœuds feuilles:
+---------------+--------------+
|doc_process_pid|doc_parent_pid|
+---------------+--------------+
| -1| 9|
| -1| 22|
| -1| 29|
| -1| 107|
| -1| 1001|
| -1| 222|
| -1| 2|
+---------------+--------------+
Les processus enfant/parent restant sans nœuds feuilles:
+---------------+--------------+
|doc_process_pid|doc_parent_pid|
+---------------+--------------+
| 1| -1|
| 2| 1|
| 6| -1|
| 7| 6|
| 8| 7|
| 9| 8|
| 21| -1|
| 22| 21|
| 24| -1|
| 25| 24|
| 26| 25|
| 27| 26|
| 28| 27|
| 29| 28|
| 99| 6|
| 107| 99|
| 108| -1|
| 109| 108|
| 222| 109|
| 1000| 7|
+---------------+--------------+
La sortie serait:
[[1, 2],
[6, 99, 107],
[6, 99, 7, 1000, 1001],
[6, 7, 1000, 8, 9],
[21, 22],
[24, 25, 26, 27, 28, 29],
[108, 109, 222]])
Pensées? Bien que ce soit un peu spécifique, je tiens à souligner la question généralisée d'effectuer des recherches en profondeur en premier pour générer des fermetures de séquences représentées dans ce format DataFrame.
Merci d'avance pour l'aide!
Salut Marie! D'abord, merci beaucoup pour l'aide! C'est génial!!!! Vraie question rapide - qu'est-ce que "psf.array" dans df_end = df.filter ("node = -1"). Drop ('node'). Select (psf.array (* [c pour c en inverse (df.columns) si c! = "noeud"]). alias ("branche")). unionAll ( df_end ) Merci encore pour l'aide !! –
Bonjour Brian. oops my bad J'importe toujours 'pyspark.sql.functions' avec alias' psf'. Je vais modifier la réponse – MaFF
Nm - "psf" est juste un stand pour un pyspark.sql.function.array. MERCI x un million x un million! C'est incroyablement perspicace! –