2017-10-12 1 views
0

Désolé pour le titre vague, je ne peux pas trouver de meilleure façon de le mettre. Je comprends un peu de python et j'ai de l'expérience avec les dataframes Pandas, mais récemment, j'ai été chargé de regarder quelque chose impliquant Spark et j'ai du mal à faire avancer mon projet.PySpark - Renvoyer une valeur dans une base de données, si une valeur survient sur une autre zone de données où deux champs correspondent

Je suppose que la meilleure façon d'expliquer cela est avec un petit exemple. Imaginez que j'ai dataframe A:

id | Name | 
-------------- 
1 | Random | 
2 | Random | 
3 | Random | 

En plus dataframe B:

id | Fruit | 
------------- 
1 | Pear | 
2 | Pear | 
2 | Apple | 
2 | Banana | 
3 | Pear | 
3 | Banana | 

Maintenant ce que je suis en train de faire est dataframe match A avec B (basé sur la correspondance identifiant) et itérer la colonne des fruits dans la base de données B. Si une valeur apparaît (disons Banana), je veux l'ajouter en tant que colonne à la base de données. Pourrait être une somme simple (à chaque fois que la banane apparaît ajouter 1 à une colonne), ou tout simplement classer si elle arrive une fois. Ainsi, par exemple, une sortie pourrait ressembler à ceci:

id | Name | Banana 
--------------------- 
1 | Random | 0 
2 | Random | 1 
3 | Random | 1 

Ma question est Itère dataframes Spark, et comment je peux connecter les deux si le match ne se produit. J'essayais de faire quelque chose à cet effet:

def fruit(input): 

    fruits = {"Banana" : "B"} 

    return fruits[input] 

fruits = df.withColumn("Output", fruit("Fruit")) 

Mais ça ne marche pas vraiment. Des idées? Excuses d'avance mon expérience avec Spark est très petite.

Répondre

0

Espérons que cela aide!

#sample data 
A = sc.parallelize([(1,"Random"), (2,"Random"), (3,"Random")]).toDF(["id", "Name"]) 
B = sc.parallelize([(1,"Pear"), (2,"Pear"), (2,"Apple"), (2,"Banana"), (3,"Pear"), (3,"Banana")]).toDF(["id", "Fruit"]) 

df_temp = A.join(B, A.id==B.id, 'inner').drop(B.id) 
df = df_temp.groupby(df_temp.id, df_temp.Name).\ 
    pivot("Fruit").\ 
    count().\ 
    na.fill(0) 
df.show() 

sortie est

+---+------+-----+------+----+ 
| id| Name|Apple|Banana|Pear| 
+---+------+-----+------+----+ 
| 1|Random| 0|  0| 1| 
| 3|Random| 0|  1| 1| 
| 2|Random| 1|  1| 1| 
+---+------+-----+------+----+ 

Modifier la note: Si vous ne souhaitez que peu de fruits puis

from pyspark.sql.functions import col 
#list of fruits you are interested in 
fruit_list = ["Pear", "Banana"] 
df = df_temp.\ 
    filter(col('fruit').isin(fruit_list)).\ 
    groupby(df_temp.id, df_temp.Name).\ 
    pivot("Fruit").\ 
    count().\ 
    na.fill(0) 
df.show() 


+---+------+------+----+ 
| id| Name|Banana|Pear| 
+---+------+------+----+ 
| 1|Random|  0| 1| 
| 3|Random|  1| 1| 
| 2|Random|  1| 1| 
+---+------+------+----+ 
+0

Merci pour cela, très utile! Question de suivi rapide, sauriez-vous comment je pourrais compter le champ pivoté basé sur les valeurs de la colonne plutôt que la colonne entière elle-même? Dans l'exemple ci-dessus j'ai eu quelques fruits mais dans le contexte réel j'ai des milliers de valeurs et je ne suis intéressé que par quelques-unes, serait génial de faire quelque chose comme une instruction if pour compter seulement si la valeur de la colonne correspond x – rubs90

+0

Heureux que cela a aidé! SVP vérifier ma mise à jour ci-dessus pour une réponse à votre question de suivi. – Prem