20

Je me demande comment je peux obtenir ce qui suit dans Spark (Pyspark)Spark Ajouter une nouvelle colonne à dataframe avec une valeur de ligne précédente

initiale dataframe:

+--+---+ 
|id|num| 
+--+---+ 
|4 |9.0| 
+--+---+ 
|3 |7.0| 
+--+---+ 
|2 |3.0| 
+--+---+ 
|1 |5.0| 
+--+---+ 

résultant dataframe:

+--+---+-------+ 
|id|num|new_Col| 
+--+---+-------+ 
|4 |9.0| 7.0 | 
+--+---+-------+ 
|3 |7.0| 3.0 | 
+--+---+-------+ 
|2 |3.0| 5.0 | 
+--+---+-------+ 

J'arrive généralement à "ajouter" de nouvelles colonnes à une base de données en utilisant quelque chose comme: df.withColumn("new_Col", df.num * 10)

Cependant, je n'ai aucune idée sur la façon dont je peux réaliser ce "déplacement de lignes" pour la nouvelle colonne, de sorte que la nouvelle colonne a la valeur d'un champ de la ligne précédente (comme indiqué dans l'exemple). Je n'ai pas non plus trouvé quoi que ce soit dans la documentation de l'API sur la façon d'accéder à une certaine ligne dans un DF par index.

Toute aide serait appréciée.

Répondre

24

Vous pouvez utiliser la fonction de fenêtre lag comme suit

from pyspark.sql.functions import lag, col 
from pyspark.sql.window import Window 

df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"]) 
w = Window().partitionBy().orderBy(col("id")) 
df.select("*", lag("num").over(w).alias("new_col")).na.drop().show() 

## +---+---+-------+ 
## | id|num|new_col| 
## +---+---+-------| 
## | 2|3.0| 5.0| 
## | 3|7.0| 3.0| 
## | 4|9.0| 7.0| 
## +---+---+-------+ 

mais certaines questions importantes:

  1. si vous avez besoin d'une opération globale (non partitionné par d'autres colonnes/colonnes), il est extrêmement inefficace.
  2. vous avez besoin d'un moyen naturel de commander vos données.

Alors que le deuxième problème n'est presque jamais un problème, le premier peut être un deal-breaker. Si tel est le cas, vous devez simplement convertir votre DataFrame en RDD et calculer manuellement lag. Voir par exemple:

Autres liens utiles: