2017-10-06 3 views
1

J'ai créé des données dans Spark, puis effectué une enfin, opération de jointure je dois enregistrer la sortie des fichiers partitionnés.Comment faire partition personnalisée en dataframe d'allumage avec saveAsTextFile

Je convertis trame de données dans RDD et puis enregistrez sous forme de fichier texte qui me permet d'utiliser plusieurs delimiter-char. Ma question est de savoir comment utiliser les colonnes dataframe en tant que partition personnalisée dans ce cas.

Je ne peux pas utiliser ci-dessous option pour la partition personnalisée, car il ne supporte pas delimiter multi-CHAR:

dfMainOutput.write.partitionBy("DataPartiotion","StatementTypeCode") 
    .format("csv") 
    .option("delimiter", "^") 
    .option("nullValue", "") 
    .option("codec", "gzip") 
    .save("s3://trfsdisu/SPARK/FinancialLineItem/output") 

Pour utiliser delimiter multi-ombles j'ai converti cela en RDD comme ci-dessous le code:

dfMainOutput.rdd.map(x=>x.mkString("|^|")).saveAsTextFile("dir path to store") 

Mais dans l'option ci-dessus comment pourrais-je partition personnalisée basée sur les colonnes « DataPartiotion » et « StatementTypeCode »?

Dois-je reconvertir à nouveau de RDD à un dataframe?

Voici mon code que j'ai essayé

val dfMainOutput = df1result.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer") 
     .select($"LineItem_organizationId", $"LineItem_lineItemId", 
     when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition_1").as("DataPartition_1"), 
     when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"), 
     when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").alias("StatementtypeCode"), 
     when($"LineItemName_1".isNotNull, $"LineItemName_1").otherwise($"LineItemName").as("LineItemName"), 
     when($"LocalLanguageLabel_1".isNotNull, $"LocalLanguageLabel_1").otherwise($"LocalLanguageLabel").as("LocalLanguageLabel"), 
     when($"FinancialConceptLocal_1".isNotNull, $"FinancialConceptLocal_1").otherwise($"FinancialConceptLocal").as("FinancialConceptLocal"), 
     when($"FinancialConceptGlobal_1".isNotNull, $"FinancialConceptGlobal_1").otherwise($"FinancialConceptGlobal").as("FinancialConceptGlobal"), 
     when($"IsDimensional_1".isNotNull, $"IsDimensional_1").otherwise($"IsDimensional").as("IsDimensional"), 
     when($"InstrumentId_1".isNotNull, $"InstrumentId_1").otherwise($"InstrumentId").as("InstrumentId"), 
     when($"LineItemSequence_1".isNotNull, $"LineItemSequence_1").otherwise($"LineItemSequence").as("LineItemSequence"), 
     when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"), 
     when($"FinancialConceptCodeGlobalSecondary_1".isNotNull, $"FinancialConceptCodeGlobalSecondary_1").otherwise($"FinancialConceptCodeGlobalSecondary").as("FinancialConceptCodeGlobalSecondary"), 
     when($"IsRangeAllowed_1".isNotNull, $"IsRangeAllowed_1").otherwise($"IsRangeAllowed".cast(DataTypes.StringType)).as("IsRangeAllowed"), 
     when($"IsSegmentedByOrigin_1".isNotNull, $"IsSegmentedByOrigin_1").otherwise($"IsSegmentedByOrigin".cast(DataTypes.StringType)).as("IsSegmentedByOrigin"), 
     when($"SegmentGroupDescription".isNotNull, $"SegmentGroupDescription").otherwise($"SegmentGroupDescription").as("SegmentGroupDescription"), 
     when($"SegmentChildDescription_1".isNotNull, $"SegmentChildDescription_1").otherwise($"SegmentChildDescription").as("SegmentChildDescription"), 
     when($"SegmentChildLocalLanguageLabel_1".isNotNull, $"SegmentChildLocalLanguageLabel_1").otherwise($"SegmentChildLocalLanguageLabel").as("SegmentChildLocalLanguageLabel"), 
     when($"LocalLanguageLabel_languageId_1".isNotNull, $"LocalLanguageLabel_languageId_1").otherwise($"LocalLanguageLabel_languageId").as("LocalLanguageLabel_languageId"), 
     when($"LineItemName_languageId_1".isNotNull, $"LineItemName_languageId_1").otherwise($"LineItemName_languageId").as("LineItemName_languageId"), 
     when($"SegmentChildDescription_languageId_1".isNotNull, $"SegmentChildDescription_languageId_1").otherwise($"SegmentChildDescription_languageId").as("SegmentChildDescription_languageId"), 
     when($"SegmentChildLocalLanguageLabel_languageId_1".isNotNull, $"SegmentChildLocalLanguageLabel_languageId_1").otherwise($"SegmentChildLocalLanguageLabel_languageId").as("SegmentChildLocalLanguageLabel_languageId"), 
     when($"SegmentGroupDescription_languageId_1".isNotNull, $"SegmentGroupDescription_languageId_1").otherwise($"SegmentGroupDescription_languageId").as("SegmentGroupDescription_languageId"), 
     when($"SegmentMultipleFundbDescription_1".isNotNull, $"SegmentMultipleFundbDescription_1").otherwise($"SegmentMultipleFundbDescription").as("SegmentMultipleFundbDescription"), 
     when($"SegmentMultipleFundbDescription_languageId_1".isNotNull, $"SegmentMultipleFundbDescription_languageId_1").otherwise($"SegmentMultipleFundbDescription_languageId").as("SegmentMultipleFundbDescription_languageId"), 
     when($"IsCredit_1".isNotNull, $"IsCredit_1").otherwise($"IsCredit".cast(DataTypes.StringType)).as("IsCredit"), 
     when($"FinancialConceptLocalId_1".isNotNull, $"FinancialConceptLocalId_1").otherwise($"FinancialConceptLocalId").as("FinancialConceptLocalId"), 
     when($"FinancialConceptGlobalId_1".isNotNull, $"FinancialConceptGlobalId_1").otherwise($"FinancialConceptGlobalId").as("FinancialConceptGlobalId"), 
     when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"), 
     when($"FFAction_1".isNotNull, $"FFAction_1").otherwise((concat(col("FFAction"), lit("|!|"))).as("FFAction"))) 
     .filter(!$"FFAction".contains("D")) 

val dfMainOutputFinal = dfMainOutput.select(concat_ws("|^|", columns.map(c => col(c)): _*).as("concatenated")) 

    dfMainOutputFinal.write.partitionBy("DataPartition_1","StatementTypeCode") 
    .format("csv") 
    .option("codec", "gzip") 
    .save("s3://trfsdisu/SPARK/FinancialLineItem/output") 

Répondre

1

Cela peut être fait en utilisant concat_ws, cette fonction fonctionne de façon similaire à mkString mais peut être effectuée sur directement sur dataframe. Cela rend l'étape de conversion à redondante rdd et la méthode df.write.partitionBy() peut être utilisée. Un petit exemple qui concaténer toutes les colonnes disponibles,

import org.apache.spark.sql.functions._ 
import spark.implicits._ 

val df = Seq(("01", "20000", "45.30"), ("01", "30000", "45.30")) 
    .toDF("col1", "col2", "col3") 

val df2 = df.select($"DataPartiotion", $"StatementTypeCode", 
    concat_ws("|^|", df.schema.fieldNames.map(c => col(c)): _*).as("concatenated")) 

Cela vous donnera un résultat dataframe comme celui-ci,

+--------------+-----------------+------------------+ 
|DataPartiotion|StatementTypeCode|  concatenated| 
+--------------+-----------------+------------------+ 
|   01|   20000|01|^|20000|^|45.30| 
|   01|   30000|01|^|30000|^|45.30| 
+--------------+-----------------+------------------+ 
+1

@Anupam vous pouvez simplement remplacer "col1", "col2", "col3" avec les colonnes que vous voulez utiliser. Tout faire directement sur la base de données devrait être meilleur du point de vue de la performance, mais la reconvertir devrait toujours être rapide (puisqu'il ne s'agit que d'une seule colonne). – Shaido

+0

Il y a un problème avec ceci. Là où ',' est trouvé dans les disques j'obtiens '" "' dans la sortie finale de csv ... J'ai créé la question distincte pour ceci https://stackoverflow.com/questions/ 47002414/add-custom-delimiter-ajoute-double-quotes-dans-le-dernier-spark-data-frame-csv-out/47003011? Noredirect = 1 # comment80963283_47003011 –