2015-08-04 2 views
11

J'ai deux données appelées gauche et droite.Remplacement des valeurs nulles par 0 après la jointure externe gauche de la structure de l'étincelle

scala> left.printSchema 
root 
|-- user_uid: double (nullable = true) 
|-- labelVal: double (nullable = true) 
|-- probability_score: double (nullable = true) 

scala> right.printSchema 
root 
|-- user_uid: double (nullable = false) 
|-- real_labelVal: double (nullable = false) 

Ensuite, je les rejoins pour obtenir la Dataframe jointe. Il s'agit d'un jointure externe gauche. Toute personne intéressée par la fonction natjoin peut le trouver ici.

https://gist.github.com/anonymous/f02bd79528ac75f57ae8

scala> val joinedData = natjoin(predictionDataFrame, labeledObservedDataFrame, "left_outer") 

scala> joinedData.printSchema 
|-- user_uid: double (nullable = true) 
|-- labelVal: double (nullable = true) 
|-- probability_score: double (nullable = true) 
|-- real_labelVal: double (nullable = false) 

Comme il est une jointure externe gauche, la colonne a real_labelVal lorsque user_uid est nulls pas présent dans le droit.

scala> val realLabelVal = joinedData.select("real_labelval").distinct.collect 
realLabelVal: Array[org.apache.spark.sql.Row] = Array([0.0], [null]) 

je souhaite remplacer les valeurs nulles de la colonne avec 1,0 realLabelVal.

Actuellement je fais ce qui suit:

  1. Je trouve l'index de la colonne real_labelval et utilise l'API spark.sql.Row pour définir les valeurs nulles à 1,0. (Cela me donne un RDD [Row])
  2. Puis j'applique le schéma de la dataframe jointe pour obtenir la frameframe nettoyée.

Le code est le suivant:

val real_labelval_index = 3 
def replaceNull(row: Row) = { 
    val rowArray = row.toSeq.toArray 
    rowArray(real_labelval_index) = 1.0 
    Row.fromSeq(rowArray) 
} 

val cleanRowRDD = joinedData.map(row => if (row.isNullAt(real_labelval_index)) replaceNull(row) else row) 
val cleanJoined = sqlContext.createDataFrame(cleanRowRdd, joinedData.schema) 

Y at-il une façon élégante ou efficace de le faire?

Le lissage n'a pas beaucoup aidé. Merci d'avance.

+0

Que signifie nat dans natjoin? –

+1

@JosiahYoder nat signifie Natural Join. –

Répondre

23

Avez-vous essayé d'utiliser na

joinedData.na.fill(1.0, Seq("real_labelval")) 
+0

Merci pour la réponse rapide. Le problème est que nous utilisons la distribution cloudera et que le cluster a une étincelle 1.3.0. Les fonctions de remplissage ont été introduites dans l'étincelle 1.4 je pense. J'accepte ceci comme réponse. –

+0

Ai-je besoin d'importer quoi que ce soit pour utiliser na? Merci –

+0

@GavinNiu Non, 'na' est une méthode directement sur' DataFrame' –