2017-10-04 3 views
0

J'utilise Spark 1.6 en scala.Spark scala - Conversion StructType imbriquée en Map

J'ai créé un index dans ElasticSearch avec un objet. L'objet "params" a été créé en tant que Map [String, Map [String, String]]. Exemple:

val params : Map[String, Map[String, String]] = ("p1" -> ("p1_detail" -> "table1"), "p2" -> (("p2_detail" -> "table2"), ("p2_filter" -> "filter2")), "p3" -> ("p3_detail" -> "table3")) 

Cela me donne des documents qui ressemblent à ce qui suit:

{ 
     "_index": "x", 
     "_type": "1", 
     "_id": "xxxxxxxxxxxx", 
     "_score": 1, 
     "_timestamp": 1506537199650, 
     "_source": { 
      "a": "toto", 
      "b": "tata", 
      "c": "description", 
      "params": { 
       "p1": { 
       "p1_detail": "table1" 
       }, 
       "p2": { 
       "p2_detail": "table2", 
       "p2_filter": "filter2" 
       }, 
       "p3": { 
       "p3_detail": "table3" 
       } 
      } 
     } 
    }, 

Je suis en train de lire l'index ElasticSearch afin de mettre à jour les valeurs.

Spark lit l'index avec le schéma suivant:

|-- a: string (nullable = true) 
|-- b: string (nullable = true) 
|-- c: string (nullable = true) 
|-- params: struct (nullable = true) 
| |-- p1: struct (nullable = true) 
| | |-- p1_detail: string (nullable = true) 
| |-- p2: struct (nullable = true) 
| | |-- p2_detail: string (nullable = true) 
| | |-- p2_filter: string (nullable = true) 
| |-- p3: struct (nullable = true) 
| | |-- p3_detail: string (nullable = true) 

Mon problème est que l'objet est lu comme un struct. Afin de gérer et de mettre à jour facilement les champs, je veux avoir une carte car je ne suis pas très familier avec StructType.

J'ai essayé d'obtenir l'objet dans une UDF comme une carte, mais je l'erreur suivante:

User class threw exception: org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(params)' due to data type mismatch: argument 1 requires map<string,map<string,string>> type, however, 'params' is of struct<p1:struct<p1_detail:string>,p2:struct<p2_detail:string,p2_filter:string>,p3:struct<p3_detail:string>> type.; 

Code UDF extrait:

val getSubField : Map[String, Map[String, String]] => String = (params : Map[String, Map[String, String]]) => { val return_string = (params ("p1") getOrElse("p1_detail", null.asInstanceOf[String]) return_string } 

Ma question: Comment peut-on convertir cette structure à une carte? J'ai déjà lu vu la méthode toMap disponible dans la documentation mais ne trouve pas comment l'utiliser (pas très familier avec les paramètres implicites) comme je suis un débutant scala.

Merci à l'avance,

+0

pouvez-vous s'il vous plaît ajouter UDF extrait de code? –

+0

L'UDF ne va pas beaucoup aider car j'essaie juste d'obtenir une carte [String, Map [String, String]] où un Struct est attendu. –

+0

'val getSubField: Carte [Chaîne, Carte [Chaîne, Chaîne]] => Chaîne = (paramètres: Carte [Chaîne, Carte [Chaîne, Chaîne]]) => { \t val return_string = (params (" p1 ") getOrElse ("p1_detail", null.asInstanceOf [String]) \t return_string } ' –

Répondre

0

Vous ne pouvez pas spécifier le type de param comme objet StructType, au lieu de spécifier le type de ligne.

//Schema of parameter 
def schema:StructType = (new StructType).add("p1", (new StructType).add("p1_detail", StringType)) 
     .add("p2", (new StructType).add("p2_detail", StringType).add("p2_filter",StringType)) 
     .add("p3", (new StructType).add("p3_detail", StringType)) 

//Not allowed 
val extractVal: schema => collection.Map[Nothing, Nothing] = _.getMap(0) 

Solution:

// UDF example to process struct column 
val extractVal: (Row) => collection.Map[Nothing, Nothing] = _.getMap(0) 

// You would implement something similar 
    val getSubField : Map[String, Map[String, String]] => String = 
    (params : Row) => 
    { 
    val p1 = params.getAs[Row]("p1") 
    ......... 
    return null; 
    } 

J'espère que cela aide!

+0

Je vais essayer, merci beaucoup pour votre aide. –

0

J'ai finalement résolu le problème comme suit:

def convertRowToMap[T](row : Row) : Map[String, T] = { 
    row.schema.fieldNames.filter(field => !row.isNullAt(row.fieldIndex(field))).map(field => field -> row.getAs[T](field)).toMap 
} 

/* udf that converts Row to Map */ 
    val rowToMap : Row => Map[String, Map[String, String]] = (row:Row) => { 
    val map_temp = convertRowToMap[Row](row) 

    val map_to_return = map_temp.map{case(k,v) => k -> convertRowToMap[String](v)} 

    map_to_return 
} 
    val udfrowToMap = udf(rowToMap)