j'ai essayé de mettre en œuvre pivotant similaire au serveur sql étincelleScala Spark - agrégation et Pivotant Sur la base de temps Période

A partir de maintenant, j'utilise SqlContext et l'application de toute la transformation dans le sql. Je voudrais savoir si je peux tirer directement du serveur sql et implémenter la fonction de pivot en utilisant spark.

Voici un exemple de ce que je suis en train de requêtes SQL Achievement Server dessous-

create table #temp(ID Int, MonthPrior int, Amount float);

insert into #temp values (100,1,10),(100,2,20),(100,3,30),(100,4,10),(100,5,20),(100,6,60),(200,1,10),(200,2,20),(200,3,30),(300,4,10),(300,5,20),(300,6,60);

select * from #temp;

| ID | MoisPrix | Montant |
| ------- | ---------- | ------ |
| 100 | 1 | 10 |
| 100 | 2 | 20 |
| 100 | 3 | 30 |
| 100 | 4 | 10 |
| 100 | 5 | 20 |
| 100 | 6 | 60 |
| 200 | 1 | 10 |
| 200 | 2 | 20 |
| 200 | 3 | 30 |
| 300 | 4 | 10 |
| 300 | 5 | 20 |
| 300 | 6 | 60 |

Select ID,coalesce([1],0) as Amount1Mth, coalesce([1],0)+coalesce([2],0)+coalesce([3],0) as Amount1to3Mth, coalesce([1],0)+coalesce([2],0)+coalesce([3],0)+coalesce([4],0)+coalesce([5],0)+coalesce([6],0) as Amount_AllMonths from (select * from #temp) A pivot (sum(Amount) for MonthPrior in ([1],[2],[3],[4],[5],[6])) as Pvt

| ID | Amount1Mth | Amount1to3Mth | Amount_AllMonths |
| ------- | ------- | ------- | --- |
| 100 | 10 | 60 | 150 |
| 200 | 10 | 60 | 60 |
| 300 | 0 | 0 | 90 |



Si votre colonne Amount est de Pour le type Decimal, il est préférable d'utiliser java.math.BigDecimal comme type d'argument correspondant. Notez que les méthodes + et sum ne sont plus applicables par conséquent sont remplacés par add et reduce respectivement.

import org.apache.spark.sql.functions._ 
import java.math.BigDecimal 

val df = Seq(
    (100, 1, new BigDecimal(10)), 
    (100, 2, new BigDecimal(20)), 
    (100, 3, new BigDecimal(30)), 
    (100, 4, new BigDecimal(10)), 
    (100, 5, new BigDecimal(20)), 
    (100, 6, new BigDecimal(60)), 
    (200, 1, new BigDecimal(10)), 
    (200, 2, new BigDecimal(20)), 
    (200, 3, new BigDecimal(30)), 
    (300, 4, new BigDecimal(10)), 
    (300, 5, new BigDecimal(20)), 
    (300, 6, new BigDecimal(60)) 
).toDF("ID", "MonthPrior", "Amount") 

// UDF to combine 2 array-type columns to map 
def arrayToMap = udf(
    (a: Seq[Int], b: Seq[BigDecimal]) => (a zip b).toMap 

// Create array columns which get zipped into a map 
val df2 = df.groupBy("ID").agg(
    col("ID"), arrayToMap(col("MonthList"), col("AmountList")).as("MthAmtMap") 

// UDF to sum map values for keys from 1 thru n (0 for all) 
def sumMapValues = udf(
    (m: Map[Int, BigDecimal], n: Int) => 
    if (n > 0) 
     m.collect{ case (k, v) => if (k <= n) v else new BigDecimal(0) }.reduce(_ add _) 
     m.collect{ case (k, v) => v }.reduce(_ add _) 

val df3 = df2.withColumn("Amount1Mth", sumMapValues(col("MthAmtMap"), lit(1))). 
    withColumn("Amount1to3Mth", sumMapValues(col("MthAmtMap"), lit(3))). 
    withColumn("Amount_AllMonths", sumMapValues(col("MthAmtMap"), lit(0))). 
    select(col("ID"), col("Amount1Mth"), col("Amount1to3Mth"), col("Amount_AllMonths")) 

| ID|   Amount1Mth|  Amount1to3Mth| Amount_AllMonths| 
|300|    0E-18|    0E-18|90.00000000000000...| 

Une approche serait de créer une colonne de tableaux de MonthPrior et Amount-type de carte, et appliquer une UDF que la somme des valeurs de carte basée sur un paramètre entier:

val df = Seq(
    (100, 1, 10), 
    (100, 2, 20), 
    (100, 3, 30), 
    (100, 4, 10), 
    (100, 5, 20), 
    (100, 6, 60), 
    (200, 1, 10), 
    (200, 2, 20), 
    (200, 3, 30), 
    (300, 4, 10), 
    (300, 5, 20), 
    (300, 6, 60) 
).toDF("ID", "MonthPrior", "Amount") 

import org.apache.spark.sql.functions._ 

// UDF to combine 2 array-type columns to map 
def arrayToMap = udf(
    (a: Seq[Int], b: Seq[Int]) => (a zip b).toMap 

// Aggregate columns into arrays and apply arrayToMap UDF to create map column 
val df2 = df.groupBy("ID").agg(
    col("ID"), arrayToMap(col("MonthList"), col("AmountList")).as("MthAmtMap") 

// UDF to sum map values for keys from 1 thru n (0 for all) 
def sumMapValues = udf(
    (m: Map[Int, Int], n: Int) => 
    if (n > 0) m.collect{ case (k, v) => if (k <= n) v else 0 }.sum else 
     m.collect{ case (k, v) => v }.sum 

// Apply sumMapValues UDF to the map column 
val df3 = df2.withColumn("Amount1Mth", sumMapValues(col("MthAmtMap"), lit(1))). 
    withColumn("Amount1to3Mth", sumMapValues(col("MthAmtMap"), lit(3))). 
    withColumn("Amount_AllMonths", sumMapValues(col("MthAmtMap"), lit(0))). 
    select(col("ID"), col("Amount1Mth"), col("Amount1to3Mth"), col("Amount_AllMonths")) 

| ID|Amount1Mth|Amount1to3Mth|Amount_AllMonths| 
|300|   0|   0|    90| 
|100|  10|   60|    150| 
|200|  10|   60|    60| 

Merci, @LeoC Je vais analyser cette approche. Semble fonctionner –


Heureux que cela aide.Veuillez fermer la question en acceptant la réponse si elle résout le problème affiché. –


(un: Seq [Int], b: Seq [Int]) => (un zip b) .toMap pose des problèmes. Je travaille sur ce 'ne peut pas résoudre 'UDF (col_1, col_2)' en raison de la non-concordance de type de données: l'argument 2 requiert le type de tableau, cependant, 'col_2' est de type array . ' Essayé en utilisant nombre/décimal dans udf. N'a toujours pas fonctionné –


Merci @LeoC. solution ci-dessus travaillé. J'ai également essayé le suivant -

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.Column 

lazy val months = (((df select ($"MonthPrior") distinct) sort 
($"MonthPrior".asc)).rdd map (_.getAs[Int](0)) collect).toList 

lazy val sliceSpec = List((0, 2, "1-2"), (0, 3, "1-3"), (0, 4, "1-4"), (0, 5, "1-5"), (0, 6, "1-6")) 

lazy val createGroup: List[Any] => ((Int, Int, String) => Column) = sliceMe => (start, finish, aliasName) => 
    sliceMe slice (start, finish) map (value => col(value.toString)) reduce (_ + _) as aliasName 

lazy val grouper = createGroup(months).tupled 

lazy val groupedCols = sliceSpec map (group => grouper(group)) 

val pivoted = df groupBy ($"ID") pivot ("MonthPrior") agg (sum($"Amount")) 

val writeMe = pivoted select ((pivoted.columns map col) ++ (groupedCols): _*) 

z.show(writeMe sort ($"ID".asc))