1

J'utilise la source XML de databricks. Voici mes données d'exemple XML.Source XML pour fonctionnement Spark et groupby

<ds Name="abc"> 
    <node begin="18" end="22" val="Organic" type="type1"> 
     <hs id="0" begin="18" end="91" /> 
    </node> 
    <node begin="22" end="23" val="Cereal"> 
     <hs id="0" begin="18" end="91" /> 
    </node> 
    <node begin="23" end="25" val="Kellogs" type="type2"> 
     <hs id="0" begin="18" end="91" /> 
    </node> 
    <node begin="22" end="23" val="Harry" type="type1"> 
     <hs id="1" begin="108" end="520" /> 
    </node> 
    <node begin="23" end="25" val="Potter" type="type1"> 
     <hs id="1" begin="108" end="520" /> 
    </node> 
</ds> 

Je veux combiner toutes les node.val (dans le même ordre que dans le fichier XML] regroupés par le hs id).

Par exemple, o/p pour les données ci-dessus doit être:

Nom hs id Val

abc 0 Céréales Bio

abc 1 Harry Potter

Voici où je charge la source XML à partir de databricks:

Je ne suis pas sûr comment grouper l'ensemble de données par hs id et m'assurer que la commande est retenue.

val df_ds = sqlContext.sql("SELECT Name, node.type from ds") 

Répondre

1

Essayez:

import scala.collection.mutable.LinkedHashMap 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.functions.udf 

val comb = udf((rows: Seq[Row]) => { 
    val result = LinkedHashMap[Long, Array[String]]() 
    for (row <- rows) { 
    val id = row.getAs[Row]("hs").getAs[Long]("id") 
    result(id) = result.getOrElse(id, Array[String]()) :+ row.getAs[String]("val") 
    } 
    result.values.toArray.map(x => x.mkString(" ")) 
}) 

df.printSchema 
root 
|-- Name: string (nullable = true) 
|-- node: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- begin: long (nullable = true) 
| | |-- end: long (nullable = true) 
| | |-- hs: struct (nullable = true) 
| | | |-- #VALUE: string (nullable = true) 
| | | |-- begin: long (nullable = true) 
| | | |-- end: long (nullable = true) 
| | | |-- id: long (nullable = true) 
| | |-- type: string (nullable = true) 
| | |-- val: string (nullable = true) 

df.withColumn("comb", comb(df("node"))) 
+0

Il y avait un petit bug dans le code: je devais remplacer la ligne avec ceci: résultat (id) = result.getOrElse (id, Array [chaîne] (id .toString)): + row.getAs [String] ("pos") – user3803714