2017-07-25 2 views
0

J'essaie d'utiliser Spark DataSet pour charger des données assez importantes de (disons) personnes où les données du sous-ensemble se présentent comme suit.Transformations relationnelles dans Spark

|age|maritalStatus| name|sex| 
+---+-------------+--------+---+ 
| 35|   M| Joanna| F| 
| 25|   S|Isabelle| F| 
| 19|   S| Andy| M| 
| 70|   M| Robert| M| 
+---+-------------+--------+---+ 

Mon besoin est d'avoir des transformations relationnelles où une colonne tire sa valeur d'autre colonne (s). Par exemple, en fonction de l'âge & "sexe" de chaque enregistrement de personne, je dois mettre M. ou Mme/Mme devant chaque attribut "nom". Un autre exemple, c'est que pour une personne avec "l'âge" de plus de 60 ans, je dois le marquer comme un Senior Citizen (colonne dérivée "seniorCitizen" comme Y).

Mon besoin final de données transformées est la suivante:

+---+-------------+---------------------------+---+ 
|age|maritalStatus|   name|seniorCitizen|sex| 
+---+-------------+---------------------------+---+ 
| 35|   M| Mrs. Joanna|   N| F| 
| 25|   S| Ms. Isabelle|   N| F| 
| 19|   S|  Mr. Andy|   N| M| 
| 70|   M| Mr. Robert|   Y| M| 
+---+-------------+--------+------------------+---+ 

La plupart des transformations que Spark fournit sont assez statiques et non dyanmic. Par exemple, comme défini dans les exemples here et here. J'utilise des jeux de données Spark parce que je charge à partir d'une source de données relationnelle, mais si vous pouvez suggérer une meilleure façon de le faire en utilisant des RDD simples, veuillez le faire.

+1

vous pouvez utiliser Dataframes et UDFs pour le faire. –

+0

Je suppose que votre transformation de nom devrait dépendre de maritalStatus et non de l'âge, n'est-ce pas? –

Répondre

2

Vous pouvez utiliser withColumn pour ajouter une nouvelle colonne, pour seniorCitizen à l'utilisation where clause et pour la mise à jour name vous pouvez utiliser une fonction définie par l'utilisateur (udf) comme ci-dessous

import spark.implicits._ 

import org.apache.spark.sql.functions._ 
//create a dummy data 
val df = Seq((35, "M", "Joanna", "F"), 
    (25, "S", "Isabelle", "F"), 
    (19, "S", "Andy", "M"), 
    (70, "M", "Robert", "M") 
).toDF("age", "maritalStatus", "name", "sex") 

// create a udf to update name according to age and sex 
val append = udf((name: String, maritalStatus:String, sex: String) => { 
    if (sex.equalsIgnoreCase("F") && maritalStatus.equalsIgnoreCase("M")) s"Mrs. ${name}" 
    else if (sex.equalsIgnoreCase("F")) s"Ms. ${name}" 
    else s"Mr. ${name}" 
}) 

//add two new columns with using withColumn 
df.withColumn("name", append($"name", $"maritalStatus", $"sex")) 
    .withColumn("seniorCitizen", when($"age" < 60, "N").otherwise("Y")).show 

Sortie:

+---+-------------+------------+---+-------------+ 
|age|maritalStatus|  name|sex|seniorCitizen| 
+---+-------------+------------+---+-------------+ 
| 35|   M| Mrs. Joanna| F|   N| 
| 25|   S|Ms. Isabelle| F|   N| 
| 19|   S| Mr. Andy| M|   N| 
| 70|   M| Mr. Robert| M|   Y| 
+---+-------------+------------+---+-------------+ 

EDIT:

Voici la sortie sans utiliser UDF

df.withColumn("name", 
    when($"sex" === "F", when($"maritalStatus" === "M", concat(lit("Ms. "), df("name"))).otherwise(concat(lit("Ms. "), df("name")))) 
    .otherwise(concat(lit("Ms. "), df("name")))) 
    .withColumn("seniorCitizen", when($"age" < 60, "N").otherwise("Y")) 

Espérons que cela aide!

+0

J'espère que cela a aidé :) –

1

Spark functions peut vous aider à faire votre travail. Vous pouvez combiner when, concat, lit fonctions comme indiqué ci-dessous

val updateName = when(lower($"maritalStatus") === "m" && lower($"sex") === "f", concat(lit("Mrs. "), $"name")) 
         .otherwise(when(lower($"maritalStatus") === "s" && lower($"sex") === "f", concat(lit("Ms. "), $"name")) 
         .otherwise(when(lower($"sex") === "m", concat(lit("Mr. "), $"name")))) 

val updatedDataSet = dataset.withColumn("name", updateName) 
    .withColumn("seniorCitizen", when($"age" > 60, "Y").otherwise("N")) 

updatedDataSet est votre nécessaire dataset