2017-07-31 1 views
1

J'ai une grande trame de données avec 222 colonnes que je veux faire comme l'exemple suivantvaleur de vente d'une rangée à l'autre scala dataaframe

|id  |day   |col1 |col2 | col3 .................... 
+----------+----------------+-------+-----+ 
|  329|    0| null|2.0 
|  329|    42| null|null 
|  329|    72| 5.55|null 
|  329|    106| null|null 
|  329|    135| null|3.0 
|  329|    168| null|4.0 
|  329|    189| 4.995|null 
|  329|    212| null|6.0 
|  329|    247| null|null 
|  329|    274| null|8.0 



|id  |  day  |col1 |col2 |....................... 
+----------+----------------+-------+-----+ 
|  329|    0| null|2.0 
|  329|    42| null|2.0 
|  329|    72| 5.55|2.0 
|  329|    106| 5.55|2.0 
|  329|    135| 5.55|3.0 
|  329|    168| 5.55|4.0 
|  329|    189| 4.995|4.0 
|  329|    212| 4.995|6.0 
|  329|    247| 4.995|6.0 
|  329|    274| 4.995|8.0 
. 
. 
. 
. 
. 

1.read ligne 1 2.i ont id unique de 85 duodecies et chaque id ont 10 résultats (par exemple montré qu'un seul ID) 3.if dans la ligne 2 données sont présentes pas le prendre à partir de la ligne précédente ID

je suis tel résultat

id   | day   |original_col1 |Result_col1|prevValue| 
+----------+----------------+--------------+-----------+---------+ 
|  329|    0| null  | null |  null| 
|  329|    42| null  | null |  null| 
|  329|    72| 5.55  | 5.55 |  null| 
|  329|    106| null  | 5.55 |  5.55| 
|  329|    135| null  | null |  null| 
|  329|    168| null  | null |  null| 
|  329|    189| 4.995  | 4.995 |  null| 
|  329|    212| null  | 4.995 | 4.995| 
|  330|.................................................... 
|  330|..................................................... 
     . 
+0

Y at-il un moyen déterministe de trier les données pour pouvoir utiliser les fonctions de fenêtrage (lag)? Je comprends que vous voulez appliquer la logique ci-dessus dans une partition définie par col "id" mais à moins que vous ayez un moyen de définir un ordre (en supposant que l'ordre compte), dans le cas de partition avec "1" vous pourriez vous retrouver avec la valeur nulle dans la rangée 1/2/3 et les résultats vont être différents. S'il n'y a pas d'ordre dans les données, vous pouvez essayer d'utiliser la fonction monotonically_increasing_id() pour générer un order_id juste après avoir lu les données du fichier/source. – Traian

+0

j'oublie ajouter une colonne s'il vous plaît vérifier maintenant, ID est unique et ID obligatoire est rien mais l'utilisateur et chaque ID ont moins de 6 enregistrements. –

Répondre

3

Vous ne pouvez pas y parvenir avec la fonction de fenêtrage existante (par exemple décalage). Vous auriez besoin d'utiliser un concept similaire pour le partitionnement et le tri, mais avec une logique personnalisée pour rouler les valeurs non nulles.

case class MyRec(id: Integer, day: Integer, col1: Option[Double], col2: Option[Double]) 

defined class MyRec 

scala> :paste 
// Entering paste mode (ctrl-D to finish) 

val ds = Seq(
    MyRec(329, 0, None, Some(2.0)), 
    MyRec(329, 42, None, None), 
    MyRec(329, 72, Some(5.55), None), 
    MyRec(329, 106, None, None), 
    MyRec(329, 135, None, Some(3.0)), 
    MyRec(329, 168, None, Some(4.0)), 
    MyRec(329, 189, Some(4.995), None), 
    MyRec(329, 212, None, Some(6.0)), 
    MyRec(329, 247, None, None), 
    MyRec(329, 274, None, Some(8.0)) 
).toDS() 

ds.printSchema() 
ds.show(false) 

val updated_ds = ds.repartition('id).sortWithinPartitions('id, 'day) 
    .mapPartitions(iter => { 
    var crtId: Integer = null 
    var prevId: Integer = null 
    var rollingVals = collection.mutable.Map[String, Option[Double]]() 
    for (rec <- iter) yield { 
     crtId = rec.id 

     // 1st record for new id 
     if (prevId == null || crtId != prevId) { 
     rollingVals = collection.mutable.Map[String, Option[Double]]() 
     prevId = crtId 
     } 

     rollingVals("col1") = if (rec.col1.isDefined) rec.col1 else rollingVals.getOrElse("col1", None) 
     rollingVals("col2") = if (rec.col2.isDefined) rec.col2 else rollingVals.getOrElse("col2", None) 
     MyRec(rec.id, rec.day, rollingVals("col1"), rollingVals("col2")) 
    } 
    }) 

updated_ds.printSchema() 
updated_ds.show(false) 

// Exiting paste mode, now interpreting. 

root 
|-- id: integer (nullable = true) 
|-- day: integer (nullable = true) 
|-- col1: double (nullable = true) 
|-- col2: double (nullable = true) 

+---+---+-----+----+ 
|id |day|col1 |col2| 
+---+---+-----+----+ 
|329|0 |null |2.0 | 
|329|42 |null |null| 
|329|72 |5.55 |null| 
|329|106|null |null| 
|329|135|null |3.0 | 
|329|168|null |4.0 | 
|329|189|4.995|null| 
|329|212|null |6.0 | 
|329|247|null |null| 
|329|274|null |8.0 | 
+---+---+-----+----+ 

root 
|-- id: integer (nullable = true) 
|-- day: integer (nullable = true) 
|-- col1: double (nullable = true) 
|-- col2: double (nullable = true) 

+---+---+-----+----+ 
|id |day|col1 |col2| 
+---+---+-----+----+ 
|329|0 |null |2.0 | 
|329|42 |null |2.0 | 
|329|72 |5.55 |2.0 | 
|329|106|5.55 |2.0 | 
|329|135|5.55 |3.0 | 
|329|168|5.55 |4.0 | 
|329|189|4.995|4.0 | 
|329|212|4.995|6.0 | 
|329|247|4.995|6.0 | 
|329|274|4.995|8.0 | 
+---+---+-----+----+ 

ds: org.apache.spark.sql.Dataset[MyRec] = [id: int, day: int ... 2 more fields] 
updated_ds: org.apache.spark.sql.Dataset[MyRec] = [id: int, day: int ... 2 more fields] 
+0

ça marche merci –

1

Utilisez la fonction de fenêtre, puis au cas où:

val df2 = df 
    .withColumn("prevValue", lag('col1, 1).over(Window.partitionBy('id).orderBy('day))) 
    .withColumn("col1", when('col1.isNull, 'prevValue).otherwise('col1)) 

importation spark.implicits._ également

+0

je suis nouveau donc s'il vous plaît comprendre mai il est facile erreur: la méthode surchargée valeur décalage avec des alternatives: (e: org.apache.spark.sql.Column, offset: Int, defaultValue: Any) org.apache.spark.sql .Column (columnName: String, offset: Int, defaultValue: Tous) org.apache.spark.sql.Column (columnName: String, offset: Int) org.apache.spark.sql.Column (e : org.apache.spark.sql.Column, offset: Int) org.apache.spark.sql.Column ne peut pas être appliqué à (Chaîne) –

+0

@RahulNirdhar Désolé, j'ai oublié un argument. Maintenant ça devrait marcher :) –

+0

merci, mais ça ne marche pas comme je veux, ça donne le mauvais résultat s'il vous plaît voir le résultat en question –