Je le code ci-dessous qui crée une trame de données comme ci-dessous:Parsing FRAM de données pour ajouter une nouvelle colonne et mettre à jour la colonne pyspark
ratings = spark.createDataFrame(
sc.textFile("myfile.json").map(lambda l: json.loads(l)),
)
ratings.registerTempTable("mytable")
final_df = sqlContext.sql("select * from mytable");
The data frame look something like this
Je stocker les created_at
et user_id
en liste:
user_id_list = final_df.select('user_id').rdd.flatMap(lambda x: x).collect()
created_at_list = final_df.select('created_at').rdd.flatMap(lambda x: x).collect()
et l'analyse syntaxique par l'un de la liste pour appeler une autre fonction:
for i in range(len(user_id_list)):
status=get_status(user_id_list[I],created_at_list[I])
Je veux créer une nouvelle colonne dans mon cadre de données appelé état et mettre à jour la valeur correspondant user_id_list
et created_at_list value
Je sais que je dois utiliser cette fonctionnalité - mais pas sûr de savoir comment procéder
final_df.withColumn('status', 'give the condition here')