2017-10-14 2 views
0

Je voudrais calculer un mode pour plusieurs colonnes en même temps dans Spark et utiliser ces valeurs calculées pour imputer des erreurs dans un DataFrame. J'ai trouvé comment calculer par exemple un moyen, mais un mode est plus complexe je pense.Calculer un mode pour plusieurs colonnes

Voici un calcul de moyenne:

val multiple_mean = df.na.fill(df.columns.zip(
    df.select(intVars.map(mean(_)): _*).first.toSeq 
).toMap) 

Je suis en mesure de calculer un mode en mode de force brute:

var list = ArrayBuffer.empty[Float] 

for(column <- df.columns){ 
    list += df.select(column).groupBy(col(column)).count().orderBy(desc("count")).first.toSeq(0).asInstanceOf[Float] 
} 

val multiple_mode = df.na.fill(df.columns.zip(list.toSeq).toMap) 

Quelle façon serait la meilleure si l'on considère une performance?

Nous vous remercions de votre aide.

Répondre

2

Vous pouvez utiliser UserDefinedAggregateFunction. Le code ci-dessous est testé dans l'étincelle 1.6.2

Créez d'abord une classe qui étend UserDefinedAggregateFunction.

import org.apache.spark.sql.Row 
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} 
import org.apache.spark.sql.types._ 

class ModeUDAF extends UserDefinedAggregateFunction{ 

    override def dataType: DataType = StringType 

    override def inputSchema: StructType = new StructType().add("input", StringType) 

    override def deterministic: Boolean = true 

    override def bufferSchema: StructType = new StructType().add("mode", MapType(StringType, LongType)) 

    override def initialize(buffer: MutableAggregationBuffer): Unit = { 
    buffer(0) = Map.empty[Any, Long] 
    } 

    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { 
    val buff0 = buffer.getMap[Any, Long](0) 
    val inp = input.get(0) 
    buffer(0) = buff0.updated(inp, buff0.getOrElse(inp, 0L) + 1L) 
    } 

    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { 
    val mp1 = buffer1.getMap[Any, Long](0) 
    val mp2 = buffer2.getMap[Any, Long](0) 

    buffer1(0) = mp1 ++ mp2.map { case (k, v) => k -> (v + mp1.getOrElse(k, 0L)) } 
    } 

    override def evaluate(buffer: Row): Any = { 
    lazy val st = buffer.getMap[Any, Long](0).toStream 
    val mode = st.foldLeft(st.head){case (e, s) => if (s._2 > e._2) s else e} 
    mode._1 
    } 

} 

Après l'avoir utilisé, vous pouvez l'utiliser de la manière suivante.

val modeColumnList = List("some", "column", "names") // or df.columns.toList 
val modeAgg = new ModeUDAF() 
val aggCols = modeColumnList.map(c => modeAgg(df(c))) 
val aggregatedModeDF = df.agg(aggCols.head, aggCols.tail: _*) 
aggregatedModeDF.show() 

Vous pouvez également utiliser .collect sur la dernière image pour collecter le résultat dans une structure de données scala.

Remarque: Les performances de cette solution dépendent de la cardinalité de la colonne d'entrée.

+0

Merci, je vois que c'est seulement résonable lorsque la cardinalité est faible. J'essaye ceci sur mes données générées où chaque catégorie a seulement des valeurs 1,2,3 et cette méthode était très lente. –