2017-10-01 10 views
1

Je voudrais créer une UDF qui effectue les opérations suivantes:Comment écrire UDF avec des valeurs comme références à d'autres colonnes?

A DataFrame a 5 colonnes et souhaitez créer la 6ème colonne avec la somme que la valeur qui contiennent le nom de la première et la deuxième colonne.

Permettez-moi d'imprimer le DataFrame et expliquer avec qui:

case class salary(c1: String, c2: String, c3: Int, c4: Int, c5: Int) 

val df = Seq(
    salary("c3", "c4", 7, 5, 6), 
    salary("c5", "c4", 8, 10, 20), 
    salary("c5", "c3", 1, 4, 9)) 
    .toDF() 

DataFrame résultat

+---+---+---+---+---+ 
| c1| c2| c3| c4| c5| 
+---+---+---+---+---+ 
| c3| c4| 7| 5| 6| 
| c5| c4| 8| 10| 20| 
| c5| c3| 1| 4| 9| 
+---+---+---+---+---+ 

df.withColumn("c6",UDFName(c1,c2)) 

Et le résultat de cette colonne doit être:

1º rang (C3, C4) Puis 7 + 5 = 12

2º Rangée (C5, C4) Puis 2 0 + 10 = 30

3e rangée (C5, C3) Ensuite, 9 + 1 = 10

+0

Veuillez considérer [accepter] (https://meta.stackexchange.com/questions/5234/how-does-accepting-an-answer-work) une réponse aux questions que vous posez. – Shaido

Répondre

1

Une fonction définie par l'utilisateur (UDF) a accès aux valeurs qui sont transmises directement en tant que paramètres d'entrée.

Si vous souhaitez accéder aux autres colonnes, un UDF n'aura accès qu'aux et vous les passerez en tant que paramètres d'entrée. Avec cela, vous devriez facilement réaliser ce que vous recherchez.

Je recommande fortement d'utiliser la fonction struct pour combiner toutes les autres colonnes.

struct (Col.: colonne *): Colonne Crée une nouvelle colonne de struct. Vous pouvez également utiliser la méthode Dataset.columns pour accéder aux colonnes à struct

colonnes: Array [chaîne] Retours tous les noms de colonnes comme un tableau.

2

Il n'y a vraiment pas besoin d'UDF ici. Il suffit d'utiliser la colonne MapType virtuelle:

import org.apache.spark.sql.functions.{col, lit, map} 

// We use an interleaved list of column name and column value 
val values = map(Seq("c3", "c4", "c5").flatMap(c => Seq(lit(c), col(c))): _*) 

// Check the first row 
df.select(values).limit(1).show(false) 
+------------------------------+ 
|map(c3, c3, c4, c4, c5, c5) | 
+------------------------------+ 
|Map(c3 -> 7, c4 -> 5, c5 -> 6)| 
+------------------------------+ 

et de l'utiliser dans l'expression:

df.withColumn("c6", values($"c1") + values($"c2")) 
+---+---+---+---+---+---+ 
| c1| c2| c3| c4| c5| c6| 
+---+---+---+---+---+---+ 
| c3| c4| 7| 5| 6| 12| 
| c5| c4| 8| 10| 20| 30| 
| c5| c3| 1| 4| 9| 10| 
+---+---+---+---+---+---+ 

Il est beaucoup plus propre, plus rapide et plus sûr que de traiter avec UDFs et Rows:

import org.apache.spark.sql.functions.{struct, udf} 
import org.apache.spark.sql.Row 

val f = udf((row: Row) => for { 
    // Use Options to avoid problems with null columns 
    // Explicit null checks should be faster, but much more verbose 
    c1 <- Option(row.getAs[String]("c1")) 
    c2 <- Option(row.getAs[String]("c2")) 

    // In this case we could (probably) skip Options below 
    // but Ints in Spark SQL can get null 
    x <- Option(row.getAs[Int](c1)) 
    y <- Option(row.getAs[Int](c2)) 
} yield x + y) 

df.withColumn("c6", f(struct(df.columns map col: _*))) 
+---+---+---+---+---+---+ 
| c1| c2| c3| c4| c5| c6| 
+---+---+---+---+---+---+ 
| c3| c4| 7| 5| 6| 12| 
| c5| c4| 8| 10| 20| 30| 
| c5| c3| 1| 4| 9| 10| 
+---+---+---+---+---+---+