2017-08-21 3 views
0

J'ai un dataframe qui a quelques colonnes comme ci-dessous:Comment modifier une valeur de cellule/s basée sur une condition dans Pyspark dataframe

 category| category_id| bucket| prop_count| event_count | accum_prop_count | accum_event_count 
----------------------------------------------------------------------------------------------------- 
nation | nation  | 1  | 222  |  444  | 555    | 6677

Cette trame de données commence à partir de 0 lignes et chaque fonction de mon script ajoute une rangée à ceci.

Il existe une fonction qui doit modifier les valeurs de 1 ou 2 cellules en fonction des conditions. Comment faire ça?

code:

schema = StructType([StructField("category", StringType()), StructField("category_id", StringType()), StructField("bucket", StringType()), StructField("prop_count", StringType()), StructField("event_count", StringType()), StructField("accum_prop_count",StringType())]) 
a_df = sqlContext.createDataFrame([],schema) 

a_temp = sqlContext.createDataFrame([("nation","nation",1,222,444,555)],schema) 
a_df = a_df.unionAll(a_temp) 

lignes ajoutées d'une autre fonction:

a_temp3 = sqlContext.createDataFrame([("nation","state",2,222,444,555)],schema) 
a_df = a_df.unionAll(a_temp3) 

maintenant à modifier, je suis en train de se joindre à une condition.

a_temp4 = sqlContext.createDataFrame([("state","state",2,444,555,666)],schema) 
a_df = a_df.join(a_temp4, [(a_df.category_id == a_temp4.category_id) & (some other cond here)], how = "inner") 

Mais ce code ne fonctionne pas. Je reçois une erreur:

 
+--------+-----------+------+----------+-----------+----------------+--------+-----------+------+----------+-----------+----------------+ 
|category|category_id|bucket|prop_count|event_count|accum_prop_count|category|category_id|bucket|prop_count|event_count|accum_prop_count| 
+--------+-----------+------+----------+-----------+----------------+--------+-----------+------+----------+-----------+----------------+ 
| nation|  state|  2|  222|  444|    555| state|  state|  2|  444|  555|    666| 
+--------+-----------+------+----------+-----------+----------------+--------+-----------+------+----------+-----------+----------------+ 

Comment résoudre ce problème? La sortie correcte doit avoir 2 lignes et la seconde ligne doit avoir une valeur mise à jour

Répondre

1

1). Une jointure interne supprimera les lignes de votre base de données initiale, si vous voulez avoir le même nombre de lignes que a_df (sur la gauche), vous avez besoin d'une jointure à gauche.

2). une condition == dupliquera les colonnes si vos colonnes ont les mêmes noms que vous pouvez utiliser une liste à la place.

3). J'imagine que "une autre condition" fait référence à bucket

4). Vous voulez garder la valeur de a_temp4 si elle existe (la jointure définira ses valeurs à vide si elle ne fonctionne pas), psf.coalesce vous permet de le faire

import pyspark.sql.functions as psf 
a_df = a_df.join(a_temp4, ["category_id", "bucket"], how="leftouter").select(
    psf.coalesce(a_temp4.category, a_df.category).alias("category"), 
    "category_id", 
    "bucket", 
    psf.coalesce(a_temp4.prop_count, a_df.prop_count).alias("prop_count"), 
    psf.coalesce(a_temp4.event_count, a_df.event_count).alias("event_count"), 
    psf.coalesce(a_temp4.accum_prop_count, a_df.accum_prop_count).alias("accum_prop_count") 
    ) 

+--------+-----------+------+----------+-----------+----------------+ 
|category|category_id|bucket|prop_count|event_count|accum_prop_count| 
+--------+-----------+------+----------+-----------+----------------+ 
| state|  state|  2|  444|  555|    666| 
| nation|  nation|  1|  222|  444|    555| 
+--------+-----------+------+----------+-----------+----------------+ 

Si vous ne travaillez avec une ligne dataframes vous devriez envisager de coder la mise à jour directement au lieu d'utiliser la jointure:

def update_col(category_id, bucket, col_name, col_val): 
    return psf.when((a_df.category_id == category_id) & (a_df.bucket == bucket), col_val).otherwise(a_df[col_name]).alias(col_name) 

a_df.select(
    update_col("state", 2, "category", "nation"), 
    "category_id", 
    "bucket", 
    update_col("state", 2, "prop_count", 444), 
    update_col("state", 2, "event_count", 555), 
    update_col("state", 2, "accum_prop_count", 666) 
).show()