2017-10-20 40 views
1

Assez nouveau pour spark/scala. Je me demande s'il existe un moyen facile d'agréger un tableau [Double] d'une manière colonne-sage. Voici un exemple:Aggregating array element wise

c1 c2 c3 
------------------------- 
1  1 [1.0, 1.0, 3.4] 
1  2 [1.0, 0,0, 4.3] 
2  1 [0.0, 0.0, 0.0] 
2  3 [1.2, 1.1, 1.1] 

Ensuite, lors de l'agrégation, je finirais avec une table qui ressemble à:

c1 c3prime 
------------- 
1  [2.0, 1.0, 7.7] 
2  [1.2, 1.1, 1.1] 

En regardant UDAF maintenant, mais je me demandais si je dois coder du tout?

Merci de votre considération.

Répondre

0

prenant les valeurs de tableau de c3 sont de la même taille, vous pouvez la somme des élément par élément colonne au moyen d'une UDF comme ci-dessous:

val df = Seq(
    (1, 1, Seq(1.0, 1.0, 3.4)), 
    (1, 2, Seq(1.0, 0.0, 4.3)), 
    (2, 1, Seq(0.0, 0.0, 0.0)), 
    (2, 3, Seq(1.2, 1.1, 1.1)) 
).toDF("c1", "c2", "c3") 

def elementSum = udf(
    (a: Seq[Seq[Double]]) => { 
    val zeroSeq = Seq.fill[Double](a(0).size)(0.0) 
    a.foldLeft(zeroSeq)(
     (a, x) => (a zip x).map{ case (u, v) => u + v } 
    ) 
    } 
) 

val df2 = df.groupBy("c1").agg(
    elementSum(collect_list("c3")).as("c3prime") 
) 

df2.show(truncate=false) 
// +---+-----------------------------+ 
// |c1 |c3prime      | 
// +---+-----------------------------+ 
// |1 |[2.0, 1.0, 7.699999999999999]| 
// |2 |[1.2, 1.1, 1.1]    | 
// +---+-----------------------------+ 
+0

Salut Leo, c'est exceptionnel. Merci beaucoup. L'exemple artificiel fonctionne exactement comme vous l'avez spécifié et comme prévu. – Kirby

0

est ici un sans UDF. Il utilise les fonctions de fenêtre de Spark. Je ne sais pas comment il est efficace, car il implique plusieurs groupBy s

df.show 

// +---+---+---------------+ 
// | c1| c2|    c3| 
// +---+---+---------------+ 
// | 1| 1|[1.0, 1.0, 3.4]| 
// | 1| 2|[1.0, 0.0, 4.3]| 
// | 2| 1|[0.0, 0.0, 0.0]| 
// | 2| 2|[1.2, 1.1, 1.1]| 
// +---+---+---------------+ 

import org.apache.spark.sql.expressions.Window 

val window = Window.partitionBy($"c1", $"c2").orderBy($"c1", $"c2") 

df.withColumn("c3", explode($"c3")) 
    .withColumn("rn", row_number() over window) 
    .groupBy($"c1", $"rn").agg(sum($"c3").as("c3")) 
    .orderBy($"c1", $"rn") 
    .groupBy($"c1") 
    .agg(collect_list($"c3").as("c3prime")).show 

// +---+--------------------+ 
// | c1|    c3prime| 
// +---+--------------------+ 
// | 1|[2.0, 1.0, 7.6999...| 
// | 2|  [1.2, 1.1, 1.1]| 
// +---+--------------------+ 
+0

Merci philantrovert. C'est une perspective intéressante aussi. Je vais jouer avec ça pour voir si ça aide. – Kirby

0

Vous pouvez combiner certains inbuilt functions tels que groupBy, agg, sum, array, alias (as), etc. pour obtenir la finale désirée dataframe.

import org.apache.spark.sql.functions._ 
df.groupBy("c1") 
    .agg(sum($"c3"(0)).as("c3_1"), sum($"c3"(1)).as("c3_2"), sum($"c3"(2)).as("c3_3")) 
    .select($"c1", array("c3_1","c3_2","c3_3").as("c3prime")) 

J'espère que la réponse est utile.