2017-09-06 4 views
0

Je voudrais résoudre le problème suivant impliquant des données de série chronologique. Mon Dataset ressemble à ceci:Spark série temporelle Filtre et GroupBy [champs de tableau]

productId |websiteID | price | D |M |Y 
A   |80  | 50,6 |2 |4 |2016 
A   |80  | 51,3 |3 |4 |2016 
A   |789  | 49,6 |2 |4 |2016 
B   |90  | 115,3 |12 |2 |2017 
B   |90  | 113,2 |11 |2 |2017 
B   |250  | 119,6 |12 |2 |2017 
B   |250  | 110,3 |11 |2 |2017 
C   |789  | 80,6 |5 |6 |2015 
C   |789  | 80,6 |6 |6 |2015 
A   |789  | 52,6 |2 |4 |2016 

Chaque produit est vendu par plusieurs sites et j'ai le prix par jour. D'abord, je veux créer un cadre de données où je regroupe tous les prix par produit et site web sous la forme de séries chronologiques. comment créer un attribut List ou Array dans spark?

Je veux ceci:

productId |websiteID | price time series 
A   |80   | [ 50.6,51,3] 
A   |789   | [49.9,52.56] 
B   |90   | [115.3,113.6] 
B   |250   | [119.6,110.3] 
C   |789   | [80.6,80.6 ] 

Une fois la trame de données est créé comme ci-dessus, je veux calculer la distance entre encliean les séries chronologiques de chaque site. Mon dernier dataframe devrait ressembler à celui-ci, la clé est composée de l'ID prodcut et de l'ensemble du site de remorquage Id, et un fichier flottant où je peux stocker la valeur de la distance calculée.

Key    |euclidean distance 
(A,set(80,789)) |1.8867 
(B,set(90,250) |5.4203 
+0

savez-vous comment calculer la distance euclidienne? –

+0

Oui, ce n'est pas le problème. nous pouvons prendre n'importe quelle distance, merci – FiesAtoS

+0

si a et b sont deux vecteurs de la même taille, je calcule la distance comme ça: 'Math.sqrt (une carte zip b (x => Math.pow (x._1-x ._2,2)) sum) ' – FiesAtoS

Répondre

1

Compte tenu de l'entrée dataframe comme

+---------+---------+-----+---+---+----+ 
|productId|websiteID|price|D |M |Y | 
+---------+---------+-----+---+---+----+ 
|A  |80  |50,6 |2 |4 |2016| 
|A  |80  |51,3 |3 |4 |2016| 
|A  |789  |49,6 |2 |4 |2016| 
|B  |90  |115,3|12 |2 |2017| 
|B  |90  |113,2|11 |2 |2017| 
|B  |250  |119,6|12 |2 |2017| 
|B  |250  |110,3|11 |2 |2017| 
|C  |789  |80,6 |5 |6 |2015| 
|C  |789  |80,6 |6 |6 |2015| 
|A  |789  |52,6 |2 |4 |2016| 
+---------+---------+-----+---+---+----+ 

Vous pouvez obtenir le dataframe milieu que vous désirez en utilisant collect_list fonction d'agrégation

import org.apache.spark.sql.functions._ 
df.orderBy("D","M","Y").groupBy("productId","websiteID").agg(collect_list("price").as("price time series")) 

vous devriez avoir

+---------+---------+-----------------+ 
|productId|websiteID|price time series| 
+---------+---------+-----------------+ 
|B  |250  |[119,6, 110,3] | 
|B  |90  |[115,3, 113,2] | 
|A  |789  |[49,6, 52,6]  | 
|C  |789  |[80,6, 80,6]  | 
|A  |80  |[50,6, 51,3]  | 
+---------+---------+-----------------+ 

maintenant le reste des étapes est de calculer euclidean distance

+0

Merci pour votre réponse, cela fonctionne très bien, mais comment prenez-vous en compte les valeurs de temps (D, M, Y), la série de temps de prix devra être triée en fonction de la date – FiesAtoS

+0

vous pouvez user orderBy :) laissez-moi mettre à jour la réponse –

+0

j'ai cette exception: org.apache.spark.sql.AnalysisException: fonction non définie collection_liste; J'ai utilisé spark 1.6.2 – FiesAtoS