2017-09-01 4 views
0

Je suis très nouveau pour scala (En général, je le fais dans R)Scala/Spark: Conversion de données zéro gonflé à dataframe libsvm

J'ai importé un grand dataframe (colonnes, 2000+ lignes 100000+) qui est zéro-gonflé.

Tâche Pour convertir les données au format libsvm

étapes Si je comprends bien les étapes sont les suivantes

  1. Assurez-colonnes de fonction sont définies à DoubleType et Target est une Int
  2. Itérer sur chaque ligne, en conservant chaque valeur> 0 dans un tableau et l'index de sa colonne dans un autre tableau
  3. Convertir en RDD [LabeledPoint]
  4. Enregistrer RDD en format libsvm

Je suis coincé sur 3 (mais peut-être) parce que je fais étape 2 mal.

Voici mon code:

Fonction principale:

@Test 
def testSpark(): Unit = 
{ 
try 
{ 

    var mDF: DataFrame = spark.read.option("header", "true").option("inferSchema", "true").csv("src/test/resources/knimeMergedTRimmedVariables.csv") 


    val mDFTyped = castAllTypedColumnsTo(mDF, IntegerType, DoubleType) 

    val indexer = new StringIndexer() 
    .setInputCol("Majors_Final") 
    .setOutputCol("Majors_Final_Indexed") 
    val mDFTypedIndexed = indexer.fit(mDFTyped).transform(mDFTyped) 
    val mDFFinal = castColumnTo(mDFTypedIndexed,"Majors_Final_Indexed", IntegerType) 



    //only doubles accepted by sparse vector, so that's what we filter for 
    val fieldSeq: scala.collection.Seq[StructField] = schema.fields.toSeq.filter(f => f.dataType == DoubleType) 

    val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name) 


    val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF() 


    assertTrue(true) 
} 
catch 
{ 
    case ex: Exception => 
    { 

    println(s"There has been an Exception. Message is ${ex.getMessage} and ${ex}") 
    fail() 
    } 
    } 
} 

Convertir chaque ligne à LabeledPoint:

@throws(classOf[Exception]) 
private def convertRowToLabeledPoint(rowIn: Row, fieldNameSeq: Seq[String], label:Int): LabeledPoint = 
{ 
    try 
    { 
    val values: Map[String, Double] = rowIn.getValuesMap(fieldNameSeq) 

    val sortedValuesMap = ListMap(values.toSeq.sortBy(_._1): _*) 
    val rowValuesItr: Iterable[Double] = sortedValuesMap.values 

    var positionsArray: ArrayBuffer[Int] = ArrayBuffer[Int]() 
    var valuesArray: ArrayBuffer[Double] = ArrayBuffer[Double]() 
    var currentPosition: Int = 0 
    rowValuesItr.foreach 
    { 
     kv => 
     if (kv > 0) 
     { 
      valuesArray += kv; 
      positionsArray += currentPosition; 
     } 
     currentPosition = currentPosition + 1; 
    } 

    val lp:LabeledPoint = new LabeledPoint(label, org.apache.spark.mllib.linalg.Vectors.sparse(positionsArray.size,positionsArray.toArray, valuesArray.toArray)) 

    return lp 

    } 
    catch 
    { 
    case ex: Exception => 
    { 
     throw new Exception(ex) 
    } 
    } 
} 

Problème Alors je tente de créer un dataframe de la points d'attache qui peuvent facilement être convertis en un RDD.

val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF() 

Mais je reçois l'erreur suivante:

SparkTest.scala:285: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for seri alizing other types will be added in future releases. [INFO] val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()

+1

Avez-vous essayé 'spark.implicits._' d'importation comme le message d'erreur mentionné? En outre, habituellement 'return' n'est pas utilisé dans scala, il peut créer des problèmes. – Shaido

+0

Tâche non sérialisable et org.apache.spark.SparkException: Tâche non sérialisable après l'ajout d'implicits – Jake

+0

Je pense que je dois regarder les matrices – Jake

Répondre

0

OK, donc j'Ignoré dataframe et créé un tableau de LabeledPoints Whish est facilement converti en un RDD. Le reste est facile.

Je souligne que, même si cela fonctionne, je suis nouveau à scala et il peut y avoir des façons plus efficaces de le faire.

Fonction principale est maintenant comme suit:

val mDF: DataFrame = spark.read.option("header", "true").option("inferSchema", "true").csv("src/test/resources/knimeMergedTRimmedVariables.csv") 
    val mDFTyped = castAllTypedColumnsTo(mDF, IntegerType, DoubleType) 

    val indexer = new StringIndexer() 
    .setInputCol("Majors_Final") 
    .setOutputCol("Majors_Final_Indexed") 
    val mDFTypedIndexed = indexer.fit(mDFTyped).transform(mDFTyped) 
    val mDFFinal = castColumnTo(mDFTypedIndexed,"Majors_Final_Indexed", IntegerType) 

    mDFFinal.show() 
    //only doubles accepted by sparse vector, so that's what we filter for 
    val fieldSeq: scala.collection.Seq[StructField] = mDFFinal.schema.fields.toSeq.filter(f => f.dataType == DoubleType) 
    val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name) 

    var positionsArray: ArrayBuffer[LabeledPoint] = ArrayBuffer[LabeledPoint]() 

    mDFFinal.collect().foreach 
    { 

    row => positionsArray+=convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed")); 

    } 

    val mRdd:RDD[LabeledPoint]= spark.sparkContext.parallelize(positionsArray.toSeq) 

    MLUtils.saveAsLibSVMFile(mRdd, "./output/libsvm")