J'ai une trame de données dans pyspark:colonne Ajout à dataframe et mise à jour à pyspark
ratings = spark.createDataFrame(
sc.textFile("transactions.json").map(lambda l: json.loads(l)),
)
ratings.show()
+--------+-------------------+------------+----------+-------------+-------+
|click_id| created_at| ip|product_id|product_price|user_id|
+--------+-------------------+------------+----------+-------------+-------+
| 123|2016-10-03 12:50:33| 10.10.10.10| 98373| 220.5| 1|
| 124|2017-02-03 11:51:33| 10.13.10.10| 97373| 320.5| 1|
| 125|2017-10-03 12:52:33| 192.168.2.1| 96373| 20.5| 1|
| 126|2017-10-03 13:50:33|172.16.11.10| 88373| 220.5| 2|
| 127|2017-10-03 13:51:33| 10.12.15.15| 87373| 320.5| 2|
| 128|2017-10-03 13:52:33|192.168.1.10| 86373| 20.5| 2|
| 129|2017-08-03 14:50:33| 10.13.10.10| 78373| 220.5| 3|
| 130|2017-10-03 14:51:33| 12.168.1.60| 77373| 320.5| 3|
| 131|2017-10-03 14:52:33| 10.10.30.30| 76373| 20.5| 3|
+--------+-------------------+------------+----------+-------------+-------+
ratings.registerTempTable("transactions")
final_df = sqlContext.sql("select * from transactions");
Je veux ajouter une nouvelle colonne à cette trame de données appelé status
puis mettre à jour la colonne d'état en fonction de created_at
et user_id
.
Le created_at
et user_id
sont lues à partir de la table donnée transations
et passé à une fonction qui renvoie le get_status(user_id,created_at)
status
. Ce status
doit être mis dans la table de transaction comme une nouvelle colonne pour le correspondant user_id
et created_at
Puis-je exécuter et modifier la commande de mise à jour pyspark? Comment cela peut-il être fait en utilisant pyspark?
Le '' created_at' et user_id' sont lues à partir de la table donnée 'transations 'et passé à une fonction' get_status (user_id, created_at) 'qui renvoie le' status'. Ce 'status' doit être placé dans la table de transaction en tant que nouvelle colonne pour les' user_id' et 'created_at' correspondants – Firstname