ont des cas d'utilisation où nous voulons lire les fichiers de S3 qui a JSON. Ensuite, en fonction d'une valeur de nœud JSON particulière, nous voulons regrouper les données et les écrire dans S3.par colonne dans Cloisonnement Apache Spark à S3
Je suis capable de lire les données, mais pas en mesure de trouver un bon exemple sur la façon dont la partition des données repose sur la clé JSON, puis télécharger vers S3. Quelqu'un peut-il donner un exemple ou me diriger vers un tutoriel qui peut m'aider avec ce cas d'utilisation?
J'ai obtenu le schéma de mes données après la création du dataframe:
root
|-- customer: struct (nullable = true)
| |-- customerId: string (nullable = true)
|-- experiment: string (nullable = true)
|-- expiryTime: long (nullable = true)
|-- partitionKey: string (nullable = true)
|-- programId: string (nullable = true)
|-- score: double (nullable = true)
|-- startTime: long (nullable = true)
|-- targetSets: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- featured: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- data: struct (nullable = true)
| | | | | |-- asinId: string (nullable = true)
| | | | |-- pk: string (nullable = true)
| | | | |-- type: string (nullable = true)
| | |-- reason: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- recommended: array (nullable = true)
| | | |-- element: string (containsNull = true)
Je veux partitionner les données basées sur le hachage aléatoire sur la colonne customerId. Mais quand je fais ceci:
df.write.partitionBy("customerId").save("s3/bucket/location/to/save");
Il donne l'erreur:
org.apache.spark.sql.AnalysisException: Partition column customerId not found in schema StructType(StructField(customer,StructType(StructField(customerId,StringType,true)),true), StructField(experiment,StringType,true), StructField(expiryTime,LongType,true), StructField(partitionKey,StringType,true), StructField(programId,StringType,true), StructField(score,DoubleType,true), StructField(startTime,LongType,true), StructField(targetSets,ArrayType(StructType(StructField(featured,ArrayType(StructType(StructField(data,StructType(StructField(asinId,StringType,true)),true), StructField(pk,StringType,true), StructField(type,StringType,true)),true),true), StructField(reason,ArrayType(StringType,true),true), StructField(recommended,ArrayType(StringType,true),true)),true),true));
S'il vous plaît laissez-moi savoir que je peux accéder à la colonne customerId.