2017-10-04 13 views
0

En utilisant Spark dataframe, je dois calculer le pourcentage en utilisant le dessous formule complexe:Spark dataframe GroupBy et calculer la fonction d'agrégat complexe

Groupe par « KEY » et calculer « re_pct » comme (somme (sa)/somme (sa/(PCT/100))) * 100

Par exemple, l'entrée est dataframe

val values1 = List(List("01", "20000", "45.30"), List("01", "30000", "45.30")) 
    .map(row => (row(0), row(1), row(2))) 

val DS1 = values1.toDF("KEY", "SA", "PCT") 
DS1.show() 

+---+-----+-----+ 
|KEY| SA| PCT| 
+---+-----+-----+ 
| 01|20000|45.30| 
| 01|30000|45.30| 
+---+-----+-----+ 

Résultat attendu:

+---+-----+--------------+ 
|KEY| re_pcnt   | 
+---+-----+--------------+ 
| 01| 45.30000038505 | 
+---+-----+--------------+ 

J'ai essayé de calculer comme ci-dessous

val result = DS1.groupBy("KEY").agg(((sum("SA").divide(
    sum(
    ("SA").divide(
     ("PCT").divide(100) 
    ) 
) 
)) * 100).as("re_pcnt")) 

Mais face Erreur: (36, 16) fracture de la valeur n'est pas membre de chaîne ("SA") diviser ({

Toute suggestion. mettre en œuvre la logique ci-dessus?

Répondre

1

Vous pouvez essayer d'importer spark.implicits._ puis utilisez $ de se référer à une colonne.

val spark = SparkSession.builder.getOrCreate() 
import spark.implicits._ 

val result = DS1.groupBy("KEY") 
    .agg(((sum($"SA").divide(sum(($"SA").divide(($"PCT").divide(100))))) * 100) 
    .as("re_pcnt")) 

Ce qui vous donnera la sortie demandée. Si vous ne souhaitez pas importer, vous pouvez toujours utiliser la commande col() au lieu de $.


Il est possible d'utiliser une chaîne en entrée à la fonction agg() avec l'utilisation de expr(). Cependant, la chaîne d'entrée doit être modifiée un peu. Ce qui suit donne exactement le même résultat que précédemment, mais utilise une chaîne à la place:

val opr = "sum(SA)/(sum(SA/(PCT/100))) * 100" 
val df = DS1.groupBy("KEY").agg(expr(opr).as("re_pcnt")) 

Notez que .as("re_pcnt") doivent être à l'intérieur de la méthode agg(), il ne peut pas être à l'extérieur.

+0

oui ... ça marche. Merci beaucoup. Maintenant je veux passer le calcul en utilisant une variable comme ci-dessous val formulaCal = "((somme (col (\" SA \ ")). Divide (somme ((col (\" SA \ ")). diviser ((col (\ "PCT \")). diviser (100))))) * 100" val result2 = DS1.groupBy ("clé") .agg (formulaCal) .as ("re_pcnt ")) \t Mais cela ne fonctionne pas – raam

+0

@raam Ajout d'informations supplémentaires à la réponse indiquant comment utiliser une expression en tant que variable (de légères modifications sont nécessaires pour l'expression). Si la réponse vous a aidé, pensez à [accepter/upvoting] (https://stackoverflow.com/help/someone-answers). – Shaido

+1

... Merci beaucoup .. c'est exactement ce que je cherche. – raam

0

Votre code fonctionne presque parfaitement. Il suffit de mettre le symbole « $ » afin de vous indiquer vous faire passer une colonne:

val result = DS1.groupBy($"KEY").agg(((sum($"SA").divide(
    sum(
    ($"SA").divide(
     ($"PCT").divide(100) 
    ) 
) 
)) * 100).as("re_pcnt")) 

Voici la sortie:

result.show() 
+---+-------+                 
|KEY|re_pcnt| 
+---+-------+ 
| 01| 45.3| 
+---+-------+