2017-09-11 7 views
0

J'ai cette entrée:timeseries de fenêtre avec étape Spark/Scala

timestamp,user 
1,A 
2,B 
5,C 
9,E 
12,F 

Le résultat souhaité est:

timestampRange,userList 
1 to 2,[A,B] 
3 to 4,[] Or null 
5 to 6,[C] 
7 to 8,[] Or null 
9 to 10,[E] 
11 to 12,[F] 

J'ai essayé d'utiliser Window, mais le problème, il ne comprend pas la plage d'horodatage vide.

Des indices seraient utiles.

Répondre

1

Je ne sais pas si la fonction veuvage couvrira les écarts entre les gammes, mais vous pouvez prendre l'approche suivante:

Définir une trame de données, df_ranges:

val ranges = List((1,2), (3,4), (5,6), (7,8), (9,10)) 
val df_ranges = sc.parallelize(ranges).toDF("start", "end") 
+-----+---+ 
|start|end| 
+-----+---+ 
| 1| 2| 
| 3| 4| 
| 5| 6| 
| 7| 8| 
| 9| 10| 
+-----+---+ 

données avec la colonne d'horodatage, df_data :

val data = List((1,"A"), (2,"B"), (5,"C"), (9,"E")) 
val df_data = sc.parallelize(data).toDF("timestamp", "user") 
+---------+----+ 
|timestamp|user| 
+---------+----+ 
|  1| A| 
|  2| B| 
|  5| C| 
|  9| E| 
+---------+----+ 

joindre les deux dataframe sur les start, end, timestamp colonnes:

df_ranges.join(df_data, df_ranges.col("start").equalTo(df_data.col("timestamp")).or(df_ranges.col("end").equalTo(df_data.col("timestamp"))), "left") 

+-----+---+---------+----+ 
|start|end|timestamp|user| 
+-----+---+---------+----+ 
| 1| 2|  1| A| 
| 1| 2|  2| B| 
| 5| 6|  5| C| 
| 9| 10|  9| E| 
| 3| 4|  null|null| 
| 7| 8|  null|null| 
+-----+---+---------+----+ 

Maintenant, faites une simple agrégation avec collect_list fonction:

res4.groupBy("start", "end").agg(collect_list("user")).orderBy("start") 
+-----+---+------------------+ 
|start|end|collect_list(user)| 
+-----+---+------------------+ 
| 1| 2|   [A, B]| 
| 3| 4|    []| 
| 5| 6|    [C]| 
| 7| 8|    []| 
| 9| 10|    [E]| 
+-----+---+------------------+