2015-08-12 2 views
14

J'utilise PySpark et j'ai une base de données Spark avec un tas de colonnes numériques. Je veux ajouter une colonne qui est la somme de toutes les autres colonnes. Supposons que mes données contiennent les colonnes "a", "b" et "c". Je sais que je peux le faire:Ajouter une somme de colonnes comme nouvelle colonne dans la base de données PySpark

df.withColumn('total_col', df.a + df.b + df.c) 

Le problème est que je ne veux pas taper chaque colonne individuellement et les ajouter, surtout si j'ai beaucoup de colonnes. Je veux être en mesure de le faire automatiquement ou en spécifiant une liste de noms de colonnes que je veux ajouter. Y a-t-il une autre façon de faire cela?

+0

Ceci est beaucoup plus facile avec les RDD que les données par exemple. si data est un tableau représentant une ligne, alors vous pouvez faire 'RDD.map (données lambda: (données, somme (données)))'. La principale raison pour laquelle cela est plus difficile avec une structure de données spark est de déterminer ce qui est autorisé en tant qu'expression de colonne dans 'withColumn'. Cela ne semble pas très bien documenté. – Paul

+0

Cela ne semble pas fonctionner non plus (PySpark 1.6.3): 'dftest.withColumn (" times ", sum ((dftest [c]> 2) .cast (" int ") pour c dans dftest.columns [1:])) 'et puis, ' dftest.select ('a', 'b', 'c', 'd'). Rdd.map (lambda x: (x, somme (x))) .prendre (2) ' Ne semble pas fonctionner –

Répondre

24

Ce n'était pas évident. Je ne vois aucune somme basée sur des lignes des colonnes définies dans l'API Dataframes spark.

Version 2

Cela peut se faire de manière assez simple:

newdf = df.withColumn('total', sum(df[col] for col in df.columns)) 

df.columns est fourni par pyspark comme une liste de chaînes donnant tous les noms de colonnes dans la Spark dataframe. Pour une somme différente, vous pouvez fournir toute autre liste de noms de colonnes à la place.

Je n'ai pas essayé cela comme ma première solution parce que je ne savais pas comment il se comporterait. Mais ça fonctionne.

Version 1

Ceci est trop compliqué, mais fonctionne aussi bien.

Vous pouvez le faire:

  1. utilisation df.columns pour obtenir une liste des noms des colonnes
  2. utilisation que les noms liste pour une liste des colonnes
  3. passe cette liste pour quelque chose qui invoquera la fonction d'ajout de colonne surchargée dans un fold-type functional manner

Avec reduce de python, une certaine connaissance de la façon dont fonctionne la surcharge opérateur, et le code pyspark pour les colonnes here qui devient:

def column_add(a,b): 
    return a.__add__(b) 

newdf = df.withColumn('total_col', 
     reduce(column_add, (df[col] for col in df.columns))) 

RemarqueCet est un python réduire, pas une étincelle RDD réduire, et le terme entre parenthèses dans le second paramètre pour réduire exige la parenthèse parce qu'il est une expression de générateur de liste.

Testé, ça marche!

$ pyspark 
>>> df = sc.parallelize([{'a': 1, 'b':2, 'c':3}, {'a':8, 'b':5, 'c':6}, {'a':3, 'b':1, 'c':0}]).toDF().cache() 
>>> df 
DataFrame[a: bigint, b: bigint, c: bigint] 
>>> df.columns 
['a', 'b', 'c'] 
>>> def column_add(a,b): 
...  return a.__add__(b) 
... 
>>> df.withColumn('total', reduce(column_add, (df[col] for col in df.columns))).collect() 
[Row(a=1, b=2, c=3, total=6), Row(a=8, b=5, c=6, total=19), Row(a=3, b=1, c=0, total=4)] 
+0

@Salmonerd Merci. Il est parfois utile de se rappeler que la classe de données de l'étincelle est immuable, et donc, pour effectuer des changements dans les données, vous devez appeler quelque chose qui renvoie une nouvelle image. – Paul

+3

La version 2 ne fonctionne pas avec Spark 1.5.0 et CDH-5.5.2. et Python version 3.4. Il lance une erreur: "AttributeError: l'objet 'generator' n'a pas d'attribut '_get_object_id" – Hemant

+0

Les deux solutions sont sympas et soignées. Je me demande pourquoi vous n'avez pas utilisé les fonctions définies par l'utilisateur pour cela? – ThePrincess