0

J'ai créé un parquet structure à partir d'un fichier csv en utilisant étincelle:incompatibilité de type de données tout en transformant les données dans les données étincelle

Dataset<Row> df = park.read().format("com.databricks.spark.csv").option("inferSchema", "true") 
      .option("header", "true").load("sample.csv"); 
df.write().parquet("sample.parquet"); 

Je lis le parquet structure et je suis en train de transformer les données en un jeu de données:

Dataset<org.apache.spark.sql.Row> df = spark.read().parquet("sample.parquet"); 
df.createOrReplaceTempView("tmpview"); 
Dataset<Row> namesDF = spark.sql("SELECT *, md5(station_id) as hashkey FROM tmpview"); 

Malheureusement, j'obtiens une erreur de concordance de type de données. Dois-je attribuer explicitement des types de données?

17/04/12 09:21:52 INFO SparkSqlParser: commande d'analyse: SELECT *, md5 (station_id) comme à partir de hashkey tmpview Exception in thread "main" org.apache.spark.sql.AnalysisException : impossible de résoudre 'md5 (tmpview station_id)' en raison de la non-concordance de type de données: argument 1 nécessite le type binaire, cependant, 'tmpview. station_id 'est de type int .; ligne 1 pos 10; 'Projet [station_id # 0, vélos_available # 1, docks_available # 2, heure # 3, md5 (station_id # 0) AS hashkey # 16] + - Sous-requête tmpview, tmpview + - Relation [station_id # 0, vélos_available # 1, docks_available # 2, # 3 fois] parquet

Répondre

1

Oui, comme par Spark documentation, la fonction md5 ne fonctionne que sur binary (text/string) colonnes de sorte que vous devez jeter station_id en string avant d'appliquer md5. Dans SQL Spark, vous pouvez enchaîner les deux md5 et cast ensemble, .: par exemple

Dataset<Row> namesDF = spark.sql("SELECT *, md5(cast(station_id as string)) as hashkey FROM tmpview"); 

vous pouvez également créer une nouvelle colonne dans dataframe et appliquer md5 dessus, .: par exemple

val newDf = df.withColumn("station_id_str", df.col("station_id").cast(StringType)) 
newDf.createOrReplaceTempView("tmpview"); 
Dataset<Row> namesDF = spark.sql("SELECT *, md5(station_id_str) as hashkey FROM tmpview");