2017-04-26 1 views
3

Tous,Comment utiliser quantilediscretizer d'allumage sur plusieurs colonnes

J'ai une configuration de pipeline ml comme ci-dessous

import org.apache.spark.ml.feature.QuantileDiscretizer 
import org.apache.spark.sql.types.{StructType,StructField,DoubleType}  
import org.apache.spark.ml.Pipeline 
import org.apache.spark.rdd.RDD 
import org.apache.spark.sql._ 
import scala.util.Random 

val nRows = 10000 
val nCols = 1000 
val data = sc.parallelize(0 to nRows-1).map { _ => Row.fromSeq(Seq.fill(nCols)(Random.nextDouble)) } 
val schema = StructType((0 to nCols-1).map { i => StructField("C" + i, DoubleType, true) }) 
val df = spark.createDataFrame(data, schema) 
df.cache() 

//Get continuous feature name and discretize them 
val continuous = df.dtypes.filter(_._2 == "DoubleType").map (_._1) 
val discretizers = continuous.map(c => new QuantileDiscretizer().setInputCol(c).setOutputCol(s"${c}_disc").setNumBuckets(3).fit(df)) 
val pipeline = new Pipeline().setStages(discretizers) 
val model = pipeline.fit(df) 

Quand je lance ce, étincelle semble configurer chaque discretizer comme une tâche séparée. Existe-t-il un moyen d'exécuter tous les discrétiseurs en tant que travail unique avec ou sans pipeline? Merci pour l'aide, j'apprécie.

Répondre

0
import org.apache.spark.ml.feature.QuantileDiscretizer 

val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)) 
val df = spark.createDataFrame(data).toDF("id", "hour") 

val discretizer = new QuantileDiscretizer() 
.setInputCol("hour") 
.setOutputCol("result") 
.setNumBuckets(3) 

val result = discretizer.fit(df).transform(df) 
result.show() 

pris de quantilediscretizer

il fonctionne comme un seul travail pour une seule colonne, au-dessous il fonctionne aussi comme un seul travail, mais pour plusieurs colonnes:

def discretizerFun (col: String, bucketNo: Int): 
org.apache.spark.ml.feature.QuantileDiscretizer = { 

val discretizer = new QuantileDiscretizer() 

discretizer 
.setInputCol(col) 
.setOutputCol(s"${col}_result") 
.setNumBuckets(bucketNo) 
} 


val data = Array((0, 18.0, 2.1), (1, 19.0, 14.1), (2, 8.0, 63.7), (3, 5.0, 
88.3), (4, 2.2, 0.8)) 

val df = spark.createDataFrame(data).toDF("id", "hour", "temp") 

df.show

val res = discretizerFun("temp", 4).fit(discretizerFun("hour", 2).fit(df).transform(df)).transform(discretizerFun("hour", 2).fit(df).transform(df)) 

df.show

meilleur moyen est de convertir cette fonction en udf mais il est peut-être la question ayant trait à org.apache.spark.ml.feature.QuantileDiscretizer - type, si on peut le faire, alors vous aurez moyen agréable et propre de faire la transformation paresseuse

+0

C'est un colonne, je parle de plusieurs colonnes – sramalingam24

+0

ah, désolé, édité le code, cela fonctionne, pas très soignée cependant, vous aurez besoin de l'envelopper à nouveau avec quelque chose pour le rendre plus réutilisable –

+0

Il fonctionne de la même façon que l'utilisation d'un pipeline ml, mais bon effort. Cela ne semble pas être le cas, espérons qu'ils y ajouteront setInputCols dans un futur proche ou qu'ils devront écrire votre propre sélection de fonctionnalités. – sramalingam24