Il semble que lorsque je demande CONCAT
sur un dataframe
dans spark sql
et magasin qui dataframe
comme fichier csv dans un endroit HDFS
, alors il y a des guillemets doubles supplémentaires ajoutés à cette seule colonne concat
dans le résultat fichier .Spark dataframe databricks csv ajoute guillemets supplémentaires
Cette double guillemets ne sont pas ajoutées lorsque j'Appy show.This guillemets doubles sont ajoutées que lorsque je stocke que dataframe
en tant que fichier csv
Il semble que je dois enlever les doubles supplémentaires citations qui est ajouté quand je enregistrez le dataframe
en tant que fichier csv.
J'utilise le pot com.databricks:spark-csv_2.10:1.1.0
Spark Version 1.5.0 est-cdh5.5.1
Entrée:
campaign_file_name_1, campaign_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89, 1
campaign_file_name_1, campaign_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk, 2
Sortie prévue:
campaign_file_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89, campaign_name_1"="1, 2017-06-06 17:09:31
campaign_file_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk, campaign_name_1"="2, 2017-06-06 17:09:31
Code Spark :
object campaignResultsMergerETL extends BaseETL {
val now = ApplicationUtil.getCurrentTimeStamp()
val conf = new Configuration()
val fs = FileSystem.get(conf)
val log = LoggerFactory.getLogger(this.getClass.getName)
def main(args: Array[String]): Unit = {
//---------------------
code for sqlContext Initialization
//---------------------
val campaignResultsDF = sqlContext.read.format("com.databricks.spark.avro").load(campaignResultsLoc)
campaignResultsDF.registerTempTable("campaign_results")
val campaignGroupedDF = sqlContext.sql(
"""
|SELECT campaign_file_name,
|campaign_name,
|tracker_id,
|SUM(campaign_measure) AS campaign_measure
|FROM campaign_results
|GROUP BY campaign_file_name,campaign_name,tracker_id
""".stripMargin)
campaignGroupedDF.registerTempTable("campaign_results_full")
val campaignMergedDF = sqlContext.sql(
s"""
|SELECT campaign_file_name,
|tracker_id,
|CONCAT(campaign_name,'\"=\"' ,campaign_measure),
|"$now" AS audit_timestamp
|FROM campaign_results_full
""".stripMargin)
campaignMergedDF.show(20)
saveAsCSVFiles(campaignMergedDF, campaignResultsExportLoc, numPartitions)
}
def saveAsCSVFiles(campaignMeasureDF:DataFrame,hdfs_output_loc:String,numPartitions:Int): Unit =
{
log.info("saveAsCSVFile method started")
if (fs.exists(new Path(hdfs_output_loc))){
fs.delete(new Path(hdfs_output_loc), true)
}
campaignMeasureDF.repartition(numPartitions).write.format("com.databricks.spark.csv").save(hdfs_output_loc)
log.info("saveAsCSVFile method ended")
}
}
Résultat de campaignMergedDF.show(20)
est correct et fonctionne très bien.
campaign_file_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89, campaign_name_1"="1, 2017-06-06 17:09:31
campaign_file_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk, campaign_name_1"="2, 2017-06-06 17:09:31
Résultat de saveAsCSVFiles
: Ceci est incorrect.
campaign_file_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89, "campaign_name_1""=""1", 2017-06-06 17:09:31
campaign_file_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk, "campaign_name_1""=""2", 2017-06-06 17:09:31
Quelqu'un peut-il m'aider sur ce problème?
I changé mon code selon votre suggestion et cela a fonctionné .. parfait .. Merci beaucoup .. –
@SurenderRaja - Super! :-) – Yaron