3

Comment puis-je calculer la médiane du dollar pour une fenêtre de 3 valeurs précédentes?Comment calculer la médiane de roulement dans Pyspark en utilisant Window()?

Données d'entrée

dollars timestampGMT  
25  2017-03-18 11:27:18 
17  2017-03-18 11:27:19 
13  2017-03-18 11:27:20 
27  2017-03-18 11:27:21 
13  2017-03-18 11:27:22 
43  2017-03-18 11:27:23 
12  2017-03-18 11:27:24 

de données de sortie attendues

dollars timestampGMT   rolling_median_dollar 
25  2017-03-18 11:27:18 median(25) 
17  2017-03-18 11:27:19 median(17,25) 
13  2017-03-18 11:27:20 median(13,17,25) 
27  2017-03-18 11:27:21 median(27,13,17) 
13  2017-03-18 11:27:22 median(13,27,13) 
43  2017-03-18 11:27:23 median(43,13,27) 
12  2017-03-18 11:27:24 median(12,43,13) 

Ci-dessous le code ne marche pas, mais le déplacement avg de pyspark ont ​​F.median().

pyspark: rolling average using timeseries data

EDIT 1: Le défi est sortie le carton de la fonction médiane(). Je ne peux pas faire

df = df.withColumn('rolling_average', F.median("dollars").over(w)) 

Si je voulais la moyenne mobile je aurais pu faire

df = df.withColumn('rolling_average', F.avg("dollars").over(w)) 

EDIT 2: Essayé à l'aide approxQuantile()

windfun = Window().partitionBy().orderBy(F.col(date_column)).rowsBetwe‌​en(-3, 0) sdf.withColumn("movingMedian", sdf.approxQuantile(col='a', probabilities=[0.5], relativeError=0.00001).over(windfun)) 

Mais obtenir erreur

AttributeError: 'list' object has no attribute 'over' 

EDIT 3

Veuillez donner une solution sans Udf car elle ne bénéficiera pas de l'optimisation du catalyseur.

+0

Avez-vous essayé de commander par 'timestampGMT' et faire le calcul sur les lignes par fenêtre? Juste curieux de savoir quel est le problème (et je me demande si la mise en œuvre de la médiane pourrait être le seul). –

+0

a modifié la question pour inclure le problème exact –

+0

Vu 'df.stat.approxQuantile' et https://databricks.com/blog/2016/05/19/approximate-algorithms-in-apache-spark-hyperloglog-and-quantiles .html? –

Répondre

2

Une façon est de recueillir la colonne $dollars comme une liste par la fenêtre, puis calculer la médiane des listes résultantes en utilisant un udf:

from pyspark.sql.window import Window 
from pyspark.sql.functions import * 
import numpy as np 
from pyspark.sql.types import FloatType 

w = (Window.orderBy(col("timestampGMT").cast('long')).rangeBetween(-2, 0)) 
median_udf = udf(lambda x: float(np.median(x)), FloatType()) 

df.withColumn("list", collect_list("dollars").over(w)) \ 
    .withColumn("rolling_median", median_udf("list")).show(truncate = False) 
+-------+---------------------+------------+--------------+ 
|dollars|timestampGMT   |list  |rolling_median| 
+-------+---------------------+------------+--------------+ 
|25  |2017-03-18 11:27:18.0|[25]  |25.0   | 
|17  |2017-03-18 11:27:19.0|[25, 17] |21.0   | 
|13  |2017-03-18 11:27:20.0|[25, 17, 13]|17.0   | 
|27  |2017-03-18 11:27:21.0|[17, 13, 27]|17.0   | 
|13  |2017-03-18 11:27:22.0|[13, 27, 13]|13.0   | 
|43  |2017-03-18 11:27:23.0|[27, 13, 43]|27.0   | 
|12  |2017-03-18 11:27:24.0|[13, 43, 12]|13.0   | 
+-------+---------------------+------------+--------------+ 
+1

Merci. Mais pouvons-nous le faire sans Udf car il ne bénéficiera pas de l'optimisation du catalyseur? –

+0

il n'y a pas d'alternative Spark natif j'ai peur. – mtoto

+0

Qu'en est-il de l'utilisation de percentRank() avec la fonction de fenêtre? J'ai lu quelque part mais le code n'a pas été donné. Est-ce que sonner une cloche? –