2017-08-31 4 views
0

J'ai l'application pyspark suivante qui génère des séquences de processus enfants/parents à partir d'un CSV d'ID de processus enfant/parent. Considérant le problème en tant qu'arbre, j'utilise une recherche itérative en profondeur à partir des noeuds feuilles (un processus qui n'a pas d'enfants) et itération à travers mon fichier pour créer ces fermetures où le processus 1 est le parent à traiter 2 qui est le parent du processus 3 et ainsi de suite. En d'autres termes, étant donné un csv comme indiqué ci-dessous, est-il possible d'implémenter une recherche en profondeur (itérative ou récursive) en utilisant des pyspark-isms pyspark & pour générer lesdites fermetures sans avoir à utiliser le .collect () fonction (qui est incroyable cher)?Pyspark - Depth-First Recherche sur Dataframe

from pyspark.sql.functions import monotonically_increasing_id 
import copy 
from pyspark.sql import SQLContext 
from pyspark import SparkContext 

class Test(): 
    def __init__(self): 
     self.process_list = [] 

def main(): 

    test = Test() 
    sc = SparkContext.getOrCreate() 
    sqlContext = SQLContext(sc) 
    df = sc.textFile("<path to csv>") 
    df = df.map(lambda line: line.split(",")) 
    header = df.first() 
    data = df.filter(lambda row: row != header) 
    data = data.toDF(header) 

    data.createOrReplaceTempView("flat") 
    data = sqlContext.sql("select doc_process_pid, doc_parent_pid from flat 
          where doc_parent_pid is not null AND 
          doc_process_pid is not null") 
    data = data.select(monotonically_increasing_id().alias("rowId"), "*") 

    data.createOrReplaceTempView("data") 
    leaf_df = sqlContext.sql("select doc_process_pid, doc_parent_pid from data 
           where doc_parent_pid != -1 AND 
           doc_process_pid == -1") 
    leaf_df = leaf_df.rdd.collect() 
    data = sqlContext.sql("select doc_process_pid, doc_parent_pid from data 
          where doc_process_pid != -1") 
    data.createOrReplaceTempView("data") 

    for row in leaf_df: 
     path = [] 
     rowID = row[0] 
     data = data.filter(data['rowId'] != rowID) 
     parentID = row[4] 
     path.append(parentID) 
     while (True): 
      next_df = sqlContext.sql(
       "select doc_process_pid, doc_parent_pid from data where 
       doc_process_pid == " + str(parentID)) 

      next_df_rdd = next_df.collect() 
      print("parent: ", next_df_rdd[0][1]) 
      parentID = next_df_rdd[0][1] 

      if (int(parentID) != -1): 
       path.append(next_df_rdd[0][1]) 
      else: 
       test.process_list.append(copy.deepcopy(path)) 
       break 

     print("final: ", test.process_list) 


main() 

Voici mon csv:

doc_process_pid doc_parent_pid 
    1    -1 
    2    1 
    6    -1 
    7    6 
    8    7 
    9    8 
    21   -1 
    22   21 
    24   -1 
    25   24 
    26   25 
    27   26 
    28   27 
    29   28 
    99    6 
    107   99 
    108   -1 
    109   108 
    222   109 
    1000   7 
    1001  1000 
    -1   9 
    -1   22 
    -1   29 
    -1   107 
    -1   1001 
    -1   222 
    -1   2 

Il représente les relations de processus enfants/parents. Si nous considérons cela comme un arbre, alors les nœuds feuilles sont définis par doc_process_id == -1 et les nœuds racine sont des processus où doc_parent_process == -1.

Le code ci-dessus génère deux trames de données:

nœuds feuilles:

+---------------+--------------+ 
|doc_process_pid|doc_parent_pid| 
+---------------+--------------+ 
|    -1|    9| 
|    -1|   22| 
|    -1|   29| 
|    -1|   107| 
|    -1|   1001| 
|    -1|   222| 
|    -1|    2| 
+---------------+--------------+ 

Les processus enfant/parent restant sans nœuds feuilles:

+---------------+--------------+ 
|doc_process_pid|doc_parent_pid| 
+---------------+--------------+ 
|    1|   -1| 
|    2|    1| 
|    6|   -1| 
|    7|    6| 
|    8|    7| 
|    9|    8| 
|    21|   -1| 
|    22|   21| 
|    24|   -1| 
|    25|   24| 
|    26|   25| 
|    27|   26| 
|    28|   27| 
|    29|   28| 
|    99|    6| 
|   107|   99| 
|   108|   -1| 
|   109|   108| 
|   222|   109| 
|   1000|    7| 
+---------------+--------------+ 

La sortie serait:

[[1, 2], 
[6, 99, 107], 
[6, 99, 7, 1000, 1001], 
[6, 7, 1000, 8, 9], 
[21, 22], 
[24, 25, 26, 27, 28, 29], 
[108, 109, 222]]) 

Pensées? Bien que ce soit un peu spécifique, je tiens à souligner la question généralisée d'effectuer des recherches en profondeur en premier pour générer des fermetures de séquences représentées dans ce format DataFrame.

Merci d'avance pour l'aide!

Répondre

1

Je ne pense pas que pyspark c'est la meilleure langue pour le faire.

Une solution consisterait à itérer à travers les niveaux de nœud d'arbre rejoignant la trame de données avec elle-même à chaque fois.

Créons notre dataframe, pas besoin de le diviser en feuilles et d'autres noeuds, nous allons simplement garder le dataframe d'origine:

data = spark.createDataFrame(
    sc.parallelize(
     [[1, -1], [2, 1], [6, -1], [7, 6], [8, 7], [9, 8], [21,-1], [22,21], [24,-1], [25,24], [26,25], [27,26], [28,27], 
     [29,28], [99, 6], [107,99], [108,-1], [109,108], [222,109], [1000,7], [1001,1000], [ -1,9], [ -1,22], [ -1,29], 
     [ -1,107], [ -1, 1001], [ -1,222], [ -1,2]] 
    ), 
    ["doc_process_pid", "doc_parent_pid"] 
) 

Nous allons maintenant créer deux dataframes de cet arbre, on notre base de construction et l'autre sera nos briques de construction:

df1 = data.filter("doc_parent_pid = -1").select(data.doc_process_pid.alias("node")) 
df2 = data.select(data.doc_process_pid.alias("son"), data.doc_parent_pid.alias("node")).filter("node != -1") 

Définissons une fonction pour l'étape i de la construction:

def add_node(df, i): 
    return df.filter("node != -1").join(df2, "node", "inner").withColumnRenamed("node", "node" + str(i)).withColumnRenamed("son", "node") 

Définissons notre état initial:

from pyspark.sql.types import * 
df = df1 
i = 0 
df_end = spark.createDataFrame(
    sc.emptyRDD(), 
    StructType([StructField("branch", ArrayType(LongType()), True)]) 
) 

Lorsqu'une branche est entièrement construite, nous prenons de df et le mettre dans df_end:

import pyspark.sql.functions as psf 
while df.count() > 0: 
    i = i + 1 
    df = add_node(df, i) 
    df_end = df.filter("node = -1").drop('node').select(psf.array(*[c for c in reversed(df.columns) if c != "node"]).alias("branch")).unionAll(
     df_end 
    ) 
    df = df.filter("node != -1") 

A la fin, df est vide et nous avons

df_end.show(truncate=False) 
    +------------------------+ 
    |branch     | 
    +------------------------+ 
    |[24, 25, 26, 27, 28, 29]| 
    |[6, 7, 8, 9]   | 
    |[6, 7, 1000, 1001]  | 
    |[108, 109, 222]   | 
    |[6, 99, 107]   | 
    |[21, 22]    | 
    |[1, 2]     | 
    +------------------------+ 

Le pire des cas pour cet algorithme m est autant de jointures que la longueur maximale de la branche.

+0

Salut Marie! D'abord, merci beaucoup pour l'aide! C'est génial!!!! Vraie question rapide - qu'est-ce que "psf.array" dans df_end = df.filter ("node = -1"). Drop ('node'). Select (psf.array (* [c pour c en inverse (df.columns) si c! = "noeud"]). alias ("branche")). unionAll ( df_end ) Merci encore pour l'aide !! –

+0

Bonjour Brian. oops my bad J'importe toujours 'pyspark.sql.functions' avec alias' psf'. Je vais modifier la réponse – MaFF

+0

Nm - "psf" est juste un stand pour un pyspark.sql.function.array. MERCI x un million x un million! C'est incroyablement perspicace! –