2015-07-19 5 views
1

j'ai une trame de données Spark qui ressemble à ceci (horodatage simplifiant et les valeurs de colonne id pour plus de clarté):Suppression de lignes redondantes dans une trame de données Spark avec des données de séries chronologiques

| Timestamp | id |  status | 
-------------------------------- 
|   1 | 1 |  pending | 
|   2 | 2 |  pending | 
|   3 | 1 | in-progress | 
|   4 | 1 | in-progress | 
|   5 | 3 | in-progress | 
|   6 | 1 |  pending | 
|   7 | 4 |  closed | 
|   8 | 1 |  pending | 
|   9 | 1 | in-progress | 

Il est une série chronologique des événements d'état . Ce que je voudrais finir est seulement les lignes représentant un changement d'état. En ce sens, le problème peut être considéré comme un problème de suppression des lignes redondantes - par ex. les entrées aux heures 4 et 8 - toutes deux pour id = 1 - doivent être supprimées car elles ne représentent pas un changement de statut pour un identifiant donné.

Pour l'ensemble au-dessus de lignes, cela donnerait (ordre étant sans importance):

| Timestamp | id |  status | 
-------------------------------- 
|   1 | 1 |  pending | 
|   2 | 2 |  pending | 
|   3 | 1 | in-progress | 
|   5 | 3 | in-progress | 
|   6 | 1 |  pending | 
|   7 | 4 |  closed | 
|   9 | 1 | in-progress | 

plan original était de partitionner par id et de l'état, commande par horodatage, et choisissez la première ligne pour chaque partition - cependant cela donnerait

| Timestamp | id |  status | 
-------------------------------- 
|   1 | 1 |  pending | 
|   2 | 2 |  pending | 
|   3 | 1 | in-progress | 
|   5 | 3 | in-progress | 
|   7 | 4 |  closed | 

c'est-à-dire qu'il perd des changements d'état répétés.

N'importe quel pointeur apprécié, je suis nouveau dans les blocs de données et peut manquer un tour ou deux.

Répondre

1

Utilisation de la fonction de fenêtre lag devrait faire l'affaire

case class Event(timestamp: Int, id: Int, status: String) 

val events = sqlContext.createDataFrame(sc.parallelize(
    Event(1, 1, "pending") :: Event(2, 2, "pending") :: 
    Event(3, 1, "in-progress") :: Event(4, 1, "in-progress") :: 
    Event(5, 3, "in-progress") :: Event(6, 1, "pending") :: 
    Event(7, 4, "closed") :: Event(8, 1, "pending") :: 
    Event(9, 1, "in-progress") :: Nil 
)) 

events.registerTempTable("events") 

val query = """SELECT timestamp, id, status FROM (
    SELECT timestamp, id, status, lag(status) OVER (
     PARTITION BY id ORDER BY timestamp 
    ) AS prev_status FROM events) tmp 
    WHERE prev_status IS NULL OR prev_status != status 
    ORDER BY timestamp, id""" 

sqlContext.sql(query).show 

requête interne

SELECT timestamp, id, status, lag(status) OVER (
    PARTITION BY id ORDER BY timestamp 
) AS prev_status FROM events 

crée le tableau ci-dessous où prev_status est une valeur précédente de status pour une donnée id et commandé par timestamp.

+---------+--+-----------+-----------+ 
|timestamp|id|  status|prev_status| 
+---------+--+-----------+-----------+ 
|  1| 1| pending|  null| 
|  3| 1|in-progress| pending| 
|  4| 1|in-progress|in-progress| 
|  6| 1| pending|in-progress| 
|  8| 1| pending| pending| 
|  9| 1|in-progress| pending| 
|  2| 2| pending|  null| 
|  5| 3|in-progress|  null| 
|  7| 4|  closed|  null| 
+---------+--+-----------+-----------+ 

requête externe

SELECT timestamp, id, status FROM (...) 
WHERE prev_status IS NULL OR prev_status != status 
ORDER BY timestamp, id 

filtres simplement les lignes où prev_status est-NULL (première ligne pour une donnée id) ou prev_status est différente de status (il y a eu un changement d'état entre les horodatages consécutifs). Commande ajoutée juste pour faciliter l'inspection visuelle.

+0

Nice, merci - lag FTW! –