2017-10-03 2 views
1

J'utilise PySpark pour exécuter un grand nombre de calculs dans un RDD assez large dont chaque aspect de bloc comme ceci:PySpark fenêtre Fonction Comprehension

ID CHK C1 Flag1 V1 V2 C2 Flag2 V3 V4 
341 10 100 TRUE 10 10 150 FALSE 10 14 
341 9 100 TRUE 10 10 150 FALSE 10 14 
341 8 100 TRUE 14 14 150 FALSE 10 14 
341 7 100 TRUE 14 14 150 FALSE 10 14 
341 6 100 TRUE 14 14 150 FALSE 10 14 
341 5 100 TRUE 14 14 150 FALSE 10 14 
341 4 100 TRUE 14 14 150 FALSE 12 14 
341 3 100 TRUE 14 14 150 FALSE 14 14 
341 2 100 TRUE 14 14 150 FALSE 14 14 
341 1 100 TRUE 14 14 150 FALSE 14 14 
341 0 100 TRUE 14 14 150 FALSE 14 14 

J'ai beaucoup d'occurrences de l'ID (cela dépend de C1 valeurs, par exemple de 100 à 130 et ainsi de suite pour beaucoup de C1, pour chaque entier j'ai un ensemble de 11 lignes comme celles ci-dessus) et j'ai beaucoup d'identifiants. Ce que je dois faire est d'appliquer une formule dans le groupe de chaque lignes et ajouter deux colonnes qui calculera:

D1 = ((row.V1 - prev_row.V1)/2)/((row.V2 + prev_row.V2)/2) 
D2 = ((row.V3 - prev_row.V3)/2)/((row.V4 + prev_row.V4)/2) 

Ce que je l'ai fait (comme je l'ai trouvé dans cet article utile: https://arundhaj.com/blog/calculate-difference-with-previous-row-in-pyspark.html) est de définir une fenêtre:

my_window = Window.partitionBy().orderBy(desc("CHK")) 

et pour chaque calcul intermédiaire I créé une colonne "temp":

df = df.withColumn("prev_V1", lag(df.V1).over(my_window)) 
df = df.withColumn("prev_V21", lag(df.TA1).over(my_window)) 
df = df.withColumn("prev_V3", lag(df.SSQ2).over(my_window)) 
df = df.withColumn("prev_V4", lag(df.TA2).over(my_window)) 
df = df.withColumn("Sub_V1", F.when(F.isnull(df.V1 - df.prev_V1), 0).otherwise((df.V1 - df.prev_V1)/2)) 
df = df.withColumn("Sub_V2", (df.V2 + df.prev_V2)/2) 
df = df.withColumn("Sub_V3", F.when(F.isnull(df.V3 - df.prev_V3), 0).otherwise((df.V3 - df.prev_V3)/2)) 
df = df.withColumn("Sub_V4", (df.V4 + df.prev_V4)/2) 
df = df.withColumn("D1", F.when(F.isnull(df.Sub_V1/df.Sub_V2), 0).otherwise(df.Sub_V1/df.Sub_V2)) 
df = df.withColumn("D2", F.when(F.isnull(df.Sub_V3/df.Sub_V4), 0).otherwise(df.Sub_V3/df.Sub_V4)) 

Enfin, je me suis débarrassé des colonnes temporaires:

final_df = df.select(*columns_needed) 

Il a fallu beaucoup de temps et je continué à obtenir:

WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. 

Je sais que je ne fais pas cela correctement comme le bloc de code ci-dessus est à l'intérieur d'un couple de pour les boucles afin de faire les calculs pour tous les ID, c.-à-boucle en utilisant:

unique_IDs = list(df1.toPandas()['ID'].unique()) 

mais après avoir examiné plus sur les fonctions de la fenêtre PySpark je crois que en réglant correctement la fenêtre partitionBy() je pouvais obtenir le même résultat moyen plus facile.

Je jetais un coup d'oeil sur Avoid performance impact of a single partition mode in Spark window functions mais je ne suis toujours pas sûr de savoir comment régler correctement ma partition de fenêtre pour que cela fonctionne. Est-ce que quelqu'un peut me fournir de l'aide ou un aperçu sur la façon dont je pourrais y remédier?

Merci

+1

Ce que je dois faire est d'appliquer une formule dans chaque groupe de lignes - vous voulez dire groupe d'ID et C1? Si oui, alors votre partition devrait être sur ces colonnes. – Suresh

Répondre

0

Je suppose que la formule doit être appliquée à chaque groupe d'identité (c'est la raison pour laquelle j'ai choisi de partition sur « ID »).

Vous pouvez éviter d'utiliser les colonnes « temporaires » avec quelque chose comme ceci:

# used to define the lag of a specific column 
w_lag=Window.partitionBy("id","C1").orderBy(desc('chk')) 


df = df.withColumn('D1',((df.V1-F.lag(df.V1).over(w_lag))/2)\ 
         /((df.V2+F.lag(df.V2).over(w_lag))/2)) 

df = df.withColumn('D2',((df.V3-F.lag(df.V3).over(w_lag))/2)\ 
         /((df.V4+F.lag(df.V4).over(w_lag))/2)) 

Le résultat est:

+---+---+---+-----+---+---+---+-----+---+---+-------------------+-------------------+ 
| id|chk| C1|Flag1| V1| V2| C2|Flag2| V3| V4|     D1|     D2| 
+---+---+---+-----+---+---+---+-----+---+---+-------------------+-------------------+ 
|341| 10|100| true| 10| 10|150| true| 10| 14|    null|    null| 
|341| 9|100| true| 10| 10|150| true| 10| 14|    0.0|    0.0| 
|341| 8|100| true| 14| 14|150| true| 10| 14|0.16666666666666666|    0.0| 
|341| 7|100| true| 14| 14|150| true| 10| 14|    0.0|    0.0| 
|341| 6|100| true| 14| 14|150| true| 10| 14|    0.0|    0.0| 
|341| 5|100| true| 14| 14|150| true| 10| 14|    0.0|    0.0| 
|341| 4|100| true| 14| 14|150| true| 12| 14|    0.0|0.07142857142857142| 
|341| 3|100| true| 14| 14|150| true| 14| 14|    0.0|0.07142857142857142| 
|341| 2|100| true| 14| 14|150| true| 14| 14|    0.0|    0.0| 
|341| 1|100| true| 14| 14|150| true| 14| 14|    0.0|    0.0| 
|341| 0|100| true| 14| 14|150| true| 14| 14|    0.0|    0.0| 
+---+---+---+-----+---+---+---+-----+---+---+-------------------+-------------------+ 
+1

Cela semble proche de ce que j'avais à l'esprit, ma seule préoccupation est de savoir si cela fonctionnerait pour l'ensemble de données entier comme par exemple. le prochain "bloc de données" aura aussi id = 341 et chk = 10 -> 0 mais le C1 sera 110. Penses-tu que faire w_lag = Window.partitionBy ("id", "C1"). orderBy (desc ('chk')) résoudrait cela? Enfin, je pense que vous avez quelque chose d'un peu comme D1 = 0,166666 il aurait dû être: ((14-10)/2)/(14 + 14/2) = 2/14 = 0.1428 – Swan87

+0

Je viens d'éditer ma réponse . En effet: Window.partitionBy ("id", "C1") devrait être mieux pour ce que vous voulez. Comme pour D1 = 0.16666: Je pense que c'est ((14-10)/2)/((14 + 10)/2) = 2/12 = 0.1666 – plalanne

+0

Vous avez raison, ma mauvaise! Merci beaucoup pour la réponse! – Swan87