Je suis très nouveau pour scala (En général, je le fais dans R)Scala/Spark: Conversion de données zéro gonflé à dataframe libsvm
J'ai importé un grand dataframe (colonnes, 2000+ lignes 100000+) qui est zéro-gonflé.
Tâche Pour convertir les données au format libsvm
étapes Si je comprends bien les étapes sont les suivantes
- Assurez-colonnes de fonction sont définies à DoubleType et Target est une Int
- Itérer sur chaque ligne, en conservant chaque valeur> 0 dans un tableau et l'index de sa colonne dans un autre tableau
- Convertir en RDD [LabeledPoint]
- Enregistrer RDD en format libsvm
Je suis coincé sur 3 (mais peut-être) parce que je fais étape 2 mal.
Voici mon code:
Fonction principale:
@Test
def testSpark(): Unit =
{
try
{
var mDF: DataFrame = spark.read.option("header", "true").option("inferSchema", "true").csv("src/test/resources/knimeMergedTRimmedVariables.csv")
val mDFTyped = castAllTypedColumnsTo(mDF, IntegerType, DoubleType)
val indexer = new StringIndexer()
.setInputCol("Majors_Final")
.setOutputCol("Majors_Final_Indexed")
val mDFTypedIndexed = indexer.fit(mDFTyped).transform(mDFTyped)
val mDFFinal = castColumnTo(mDFTypedIndexed,"Majors_Final_Indexed", IntegerType)
//only doubles accepted by sparse vector, so that's what we filter for
val fieldSeq: scala.collection.Seq[StructField] = schema.fields.toSeq.filter(f => f.dataType == DoubleType)
val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name)
val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()
assertTrue(true)
}
catch
{
case ex: Exception =>
{
println(s"There has been an Exception. Message is ${ex.getMessage} and ${ex}")
fail()
}
}
}
Convertir chaque ligne à LabeledPoint:
@throws(classOf[Exception])
private def convertRowToLabeledPoint(rowIn: Row, fieldNameSeq: Seq[String], label:Int): LabeledPoint =
{
try
{
val values: Map[String, Double] = rowIn.getValuesMap(fieldNameSeq)
val sortedValuesMap = ListMap(values.toSeq.sortBy(_._1): _*)
val rowValuesItr: Iterable[Double] = sortedValuesMap.values
var positionsArray: ArrayBuffer[Int] = ArrayBuffer[Int]()
var valuesArray: ArrayBuffer[Double] = ArrayBuffer[Double]()
var currentPosition: Int = 0
rowValuesItr.foreach
{
kv =>
if (kv > 0)
{
valuesArray += kv;
positionsArray += currentPosition;
}
currentPosition = currentPosition + 1;
}
val lp:LabeledPoint = new LabeledPoint(label, org.apache.spark.mllib.linalg.Vectors.sparse(positionsArray.size,positionsArray.toArray, valuesArray.toArray))
return lp
}
catch
{
case ex: Exception =>
{
throw new Exception(ex)
}
}
}
Problème Alors je tente de créer un dataframe de la points d'attache qui peuvent facilement être convertis en un RDD.
val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()
Mais je reçois l'erreur suivante:
SparkTest.scala:285: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for seri alizing other types will be added in future releases. [INFO] val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()
Avez-vous essayé 'spark.implicits._' d'importation comme le message d'erreur mentionné? En outre, habituellement 'return' n'est pas utilisé dans scala, il peut créer des problèmes. – Shaido
Tâche non sérialisable et org.apache.spark.SparkException: Tâche non sérialisable après l'ajout d'implicits – Jake
Je pense que je dois regarder les matrices – Jake