2017-10-11 11 views
2

ont des cas d'utilisation où nous voulons lire les fichiers de S3 qui a JSON. Ensuite, en fonction d'une valeur de nœud JSON particulière, nous voulons regrouper les données et les écrire dans S3.par colonne dans Cloisonnement Apache Spark à S3

Je suis capable de lire les données, mais pas en mesure de trouver un bon exemple sur la façon dont la partition des données repose sur la clé JSON, puis télécharger vers S3. Quelqu'un peut-il donner un exemple ou me diriger vers un tutoriel qui peut m'aider avec ce cas d'utilisation?

J'ai obtenu le schéma de mes données après la création du dataframe:

root 
|-- customer: struct (nullable = true) 
| |-- customerId: string (nullable = true) 
|-- experiment: string (nullable = true) 
|-- expiryTime: long (nullable = true) 
|-- partitionKey: string (nullable = true) 
|-- programId: string (nullable = true) 
|-- score: double (nullable = true) 
|-- startTime: long (nullable = true) 
|-- targetSets: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- featured: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- data: struct (nullable = true) 
| | | | | |-- asinId: string (nullable = true) 
| | | | |-- pk: string (nullable = true) 
| | | | |-- type: string (nullable = true) 
| | |-- reason: array (nullable = true) 
| | | |-- element: string (containsNull = true) 
| | |-- recommended: array (nullable = true) 
| | | |-- element: string (containsNull = true) 

Je veux partitionner les données basées sur le hachage aléatoire sur la colonne customerId. Mais quand je fais ceci:

df.write.partitionBy("customerId").save("s3/bucket/location/to/save"); 

Il donne l'erreur:

org.apache.spark.sql.AnalysisException: Partition column customerId not found in schema StructType(StructField(customer,StructType(StructField(customerId,StringType,true)),true), StructField(experiment,StringType,true), StructField(expiryTime,LongType,true), StructField(partitionKey,StringType,true), StructField(programId,StringType,true), StructField(score,DoubleType,true), StructField(startTime,LongType,true), StructField(targetSets,ArrayType(StructType(StructField(featured,ArrayType(StructType(StructField(data,StructType(StructField(asinId,StringType,true)),true), StructField(pk,StringType,true), StructField(type,StringType,true)),true),true), StructField(reason,ArrayType(StringType,true),true), StructField(recommended,ArrayType(StringType,true),true)),true),true)); 

S'il vous plaît laissez-moi savoir que je peux accéder à la colonne customerId.

Répondre

3

Prenons par exemple ensemble de données sample.json

{"CUST_ID":"115734","CITY":"San Jose","STATE":"CA","ZIP":"95106"} 
{"CUST_ID":"115728","CITY":"Allentown","STATE":"PA","ZIP":"18101"} 
{"CUST_ID":"115730","CITY":"Allentown","STATE":"PA","ZIP":"18101"} 
{"CUST_ID":"114728","CITY":"San Mateo","STATE":"CA","ZIP":"94401"} 
{"CUST_ID":"114726","CITY":"Somerset","STATE":"NJ","ZIP":"8873"} 

il commencer à bidouiller Maintenant, avec Spark

val jsonDf = spark.read 
    .format("json") 
    .load("path/of/sample.json") 

jsonDf.show() 

+---------+-------+-----+-----+ 
|  CITY|CUST_ID|STATE| ZIP| 
+---------+-------+-----+-----+ 
| San Jose| 115734| CA|95106| 
|Allentown| 115728| PA|18101| 
|Allentown| 115730| PA|18101| 
|San Mateo| 114728| CA|94401| 
| Somerset| 114726| NJ| 8873| 
+---------+-------+-----+-----+ 

Ensuite jeu de données de la partition par colonne "ZIP" et écrire à S3

jsonDf.write 
    .partitionBy("ZIP") 
    .save("s3/bucket/location/to/save") 
    // one liner athentication to s3 
    //.save("s3n://$accessKey:$secretKey" + "@" + s"$buckectName/location/to/save") 

Note: In order this code successfully S3 access and secret key has to be configured properly. Check this answer for Spark/Hadoop integration with S3

Edit: Résolution: colonne de partition customerId ne se trouve pas dans le schéma (comme par commentaire)

customerId existe à l'intérieur customer struct, essayez donc extraire le customerId puis faites partition.

df.withColumn("customerId", $"customer.customerId") 
    .drop("customer") 
    .write.partitionBy("customerId") 
    .save("s3/bucket/location/to/save")