2017-08-01 4 views
0

Quelle est la meilleure solution pour généraliser la conversion de RDD [Vector] en DataFrame avec scala/spark 1.6. Les entrées sont différentes RDD [Vector]. Le nombre de colonnes dans Vector peut être compris entre 1 et n pour différents RDD.Spark - Convertir RDD [Vector] en DataFrame avec des colonnes variables

J'ai essayé d'utiliser une bibliothèque sans forme, en leur indiquant le nombre et le type des colonnes déclarées. ES:

val df = rddVector.map(_.toArray.toList) 
    .collect { 
      case t: List[Double] if t.length == 3 => t.toHList[Double :: Double :: Double :: HNil].get.tupled.productArity 
    } 
    .toDF("column_1", "column_2", "column_3") 

Merci!

+0

D'après ce que je comprends, répondis-je quelque chose de similaire ici: https://stackoverflow.com/a/45009516/7224597 Pouvez-vous vérifier si cela fonctionne pour vous? – philantrovert

Répondre

2

Cela a fonctionné pour moi.

// Create a vector rdd 
    val vectorRDD = sc.parallelize(Seq(Seq(123L, 345L), Seq(567L, 789L), Seq(567L, 789L, 233334L))). 
    map(s => Vectors.dense(s.toSeq.map(_.toString.toDouble).toArray)) 

    // Calculate the maximum length of the vector to create a schema 
    val vectorLength = vectorRDD.map(x => x.toArray.length).max() 

    // create the dynamic schema 
    var schema = new StructType() 
    var i = 0 
    while (i < vectorLength) { 
    schema = schema.add(StructField(s"val${i}", DoubleType, true)) 
    i = i + 1 
    } 

    // create a rowRDD variable and make each row have the same arity 
    val rowRDD = vectorRDD.map { x => 
    var row = new Array[Double](vectorLength) 
    val newRow = x.toArray 

    System.arraycopy(newRow, 0, row, 0, newRow.length); 

    println(row.length) 

    Row.fromSeq(row) 
    } 

    // create your dataframe 
    val dataFrame = sqlContext.createDataFrame(rowRDD, schema) 

Sortie:

root 
|-- val0: double (nullable = true) 
|-- val1: double (nullable = true) 
|-- val2: double (nullable = true) 

+-----+-----+--------+ 
| val0| val1| val2| 
+-----+-----+--------+ 
|123.0|345.0|  0.0| 
|567.0|789.0|  0.0| 
|567.0|789.0|233334.0| 
+-----+-----+--------+ 
+0

Merci, dans cette solution, vous devez créer un schéma fixe. Je ne connais pas le schéma. Le schéma est variable. Ma version Spark est 1.6, non 2.0. –

+1

J'ai mis à jour la réponse pour accommoder la condition que vous avez fournie. Ce n'est pas le plus net de la solution mais ça marcherait :) –