2017-10-16 3 views
0

J'ai une trame de données dans pyspark:colonne Ajout à dataframe et mise à jour à pyspark

ratings = spark.createDataFrame(
    sc.textFile("transactions.json").map(lambda l: json.loads(l)), 
) 
ratings.show() 

+--------+-------------------+------------+----------+-------------+-------+ 
|click_id|   created_at|   ip|product_id|product_price|user_id| 
+--------+-------------------+------------+----------+-------------+-------+ 
|  123|2016-10-03 12:50:33| 10.10.10.10|  98373|  220.5|  1| 
|  124|2017-02-03 11:51:33| 10.13.10.10|  97373|  320.5|  1| 
|  125|2017-10-03 12:52:33| 192.168.2.1|  96373|   20.5|  1| 
|  126|2017-10-03 13:50:33|172.16.11.10|  88373|  220.5|  2| 
|  127|2017-10-03 13:51:33| 10.12.15.15|  87373|  320.5|  2| 
|  128|2017-10-03 13:52:33|192.168.1.10|  86373|   20.5|  2| 
|  129|2017-08-03 14:50:33| 10.13.10.10|  78373|  220.5|  3| 
|  130|2017-10-03 14:51:33| 12.168.1.60|  77373|  320.5|  3| 
|  131|2017-10-03 14:52:33| 10.10.30.30|  76373|   20.5|  3| 
+--------+-------------------+------------+----------+-------------+-------+ 

ratings.registerTempTable("transactions") 
final_df = sqlContext.sql("select * from transactions"); 

Je veux ajouter une nouvelle colonne à cette trame de données appelé status puis mettre à jour la colonne d'état en fonction de created_at et user_id.

Le created_at et user_id sont lues à partir de la table donnée transations et passé à une fonction qui renvoie le get_status(user_id,created_at)status. Ce status doit être mis dans la table de transaction comme une nouvelle colonne pour le correspondant user_id et created_at

Puis-je exécuter et modifier la commande de mise à jour pyspark? Comment cela peut-il être fait en utilisant pyspark?

Répondre

0

Vous ne savez pas exactement ce que vous voulez faire exactement. Vous devriez vérifier window functions ils vous permettent de comparer, somme ... des lignes dans un cadre.

Par exemple

import pyspark.sql.functions as psf 
from pyspark.sql import Window 
w = Window.partitionBy("user_id").orderBy(psf.desc("created_at")) 
ratings.withColumn(
    "status", 
    psf.when(psf.row_number().over(w) == 1, "active").otherwise("inactive")).sort("click_id").show() 

+--------+-------------------+------------+----------+-------------+-------+--------+ 
|click_id|   created_at|   ip|product_id|product_price|user_id| status| 
+--------+-------------------+------------+----------+-------------+-------+--------+ 
|  123|2016-10-03 12:50:33| 10.10.10.10|  98373|  220.5|  1|inactive| 
|  124|2017-02-03 11:51:33| 10.13.10.10|  97373|  320.5|  1|inactive| 
|  125|2017-10-03 12:52:33| 192.168.2.1|  96373|   20.5|  1| active| 
|  126|2017-10-03 13:50:33|172.16.11.10|  88373|  220.5|  2|inactive| 
|  127|2017-10-03 13:51:33| 10.12.15.15|  87373|  320.5|  2|inactive| 
|  128|2017-10-03 13:52:33|192.168.1.10|  86373|   20.5|  2| active| 
|  129|2017-08-03 14:50:33| 10.13.10.10|  78373|  220.5|  3|inactive| 
|  130|2017-10-03 14:51:33| 12.168.1.60|  77373|  320.5|  3|inactive| 
|  131|2017-10-03 14:52:33| 10.10.30.30|  76373|   20.5|  3| active| 
+--------+-------------------+------------+----------+-------------+-------+--------+ 

Il vous donne le dernier clic de chaque utilisateur

Si vous voulez passer un UDF pour créer une nouvelle colonne de deux autres existants. Disons que vous avez une fonction qui prend la user_id et created_at comme arguments

from pyspark.sql.types import * 
def get_status(user_id,created_at): 
    ... 

get_status_udf = psf.udf(get_status, StringType()) 

StringType() ou selon vos dataType sorties de fonction

ratings.withColumn("status", get_status_udf("user_id", "created_at")) 
+0

Le '' created_at' et user_id' sont lues à partir de la table donnée 'transations 'et passé à une fonction' get_status (user_id, created_at) 'qui renvoie le' status'. Ce 'status' doit être placé dans la table de transaction en tant que nouvelle colonne pour les' user_id' et 'created_at' correspondants – Firstname