2017-10-17 12 views
0

Je le code ci-dessous qui crée une trame de données comme ci-dessous:Parsing FRAM de données pour ajouter une nouvelle colonne et mettre à jour la colonne pyspark

ratings = spark.createDataFrame(
    sc.textFile("myfile.json").map(lambda l: json.loads(l)), 
) 



ratings.registerTempTable("mytable") 

final_df = sqlContext.sql("select * from mytable"); 

The data frame look something like this

Je stocker les created_at et user_id en liste:

user_id_list = final_df.select('user_id').rdd.flatMap(lambda x: x).collect() 
created_at_list = final_df.select('created_at').rdd.flatMap(lambda x: x).collect() 

et l'analyse syntaxique par l'un de la liste pour appeler une autre fonction:

for i in range(len(user_id_list)): 
    status=get_status(user_id_list[I],created_at_list[I]) 

Je veux créer une nouvelle colonne dans mon cadre de données appelé état et mettre à jour la valeur correspondant user_id_list et created_at_list value

Je sais que je dois utiliser cette fonctionnalité - mais pas sûr de savoir comment procéder

final_df.withColumn('status', 'give the condition here') 

Répondre

0

Ne créez pas de listes. Donnez simplement une fonction UDF à la base de données

import pyspark.sql.functions as F 
status_udf = F.udf(lambda x: get_status(x[0], x[1])) 
df = df.select(df.columns + [status_udf(F.col('user_id_list'), \ 
       F.col('created_at_list value')).alias('status')])