J'ai créé des données dans Spark, puis effectué une enfin, opération de jointure je dois enregistrer la sortie des fichiers partitionnés.Comment faire partition personnalisée en dataframe d'allumage avec saveAsTextFile
Je convertis trame de données dans RDD et puis enregistrez sous forme de fichier texte qui me permet d'utiliser plusieurs delimiter-char. Ma question est de savoir comment utiliser les colonnes dataframe en tant que partition personnalisée dans ce cas.
Je ne peux pas utiliser ci-dessous option pour la partition personnalisée, car il ne supporte pas delimiter multi-CHAR:
dfMainOutput.write.partitionBy("DataPartiotion","StatementTypeCode")
.format("csv")
.option("delimiter", "^")
.option("nullValue", "")
.option("codec", "gzip")
.save("s3://trfsdisu/SPARK/FinancialLineItem/output")
Pour utiliser delimiter multi-ombles j'ai converti cela en RDD comme ci-dessous le code:
dfMainOutput.rdd.map(x=>x.mkString("|^|")).saveAsTextFile("dir path to store")
Mais dans l'option ci-dessus comment pourrais-je partition personnalisée basée sur les colonnes « DataPartiotion » et « StatementTypeCode »?
Dois-je reconvertir à nouveau de RDD à un dataframe?
Voici mon code que j'ai essayé
val dfMainOutput = df1result.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer")
.select($"LineItem_organizationId", $"LineItem_lineItemId",
when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition_1").as("DataPartition_1"),
when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"),
when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").alias("StatementtypeCode"),
when($"LineItemName_1".isNotNull, $"LineItemName_1").otherwise($"LineItemName").as("LineItemName"),
when($"LocalLanguageLabel_1".isNotNull, $"LocalLanguageLabel_1").otherwise($"LocalLanguageLabel").as("LocalLanguageLabel"),
when($"FinancialConceptLocal_1".isNotNull, $"FinancialConceptLocal_1").otherwise($"FinancialConceptLocal").as("FinancialConceptLocal"),
when($"FinancialConceptGlobal_1".isNotNull, $"FinancialConceptGlobal_1").otherwise($"FinancialConceptGlobal").as("FinancialConceptGlobal"),
when($"IsDimensional_1".isNotNull, $"IsDimensional_1").otherwise($"IsDimensional").as("IsDimensional"),
when($"InstrumentId_1".isNotNull, $"InstrumentId_1").otherwise($"InstrumentId").as("InstrumentId"),
when($"LineItemSequence_1".isNotNull, $"LineItemSequence_1").otherwise($"LineItemSequence").as("LineItemSequence"),
when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"),
when($"FinancialConceptCodeGlobalSecondary_1".isNotNull, $"FinancialConceptCodeGlobalSecondary_1").otherwise($"FinancialConceptCodeGlobalSecondary").as("FinancialConceptCodeGlobalSecondary"),
when($"IsRangeAllowed_1".isNotNull, $"IsRangeAllowed_1").otherwise($"IsRangeAllowed".cast(DataTypes.StringType)).as("IsRangeAllowed"),
when($"IsSegmentedByOrigin_1".isNotNull, $"IsSegmentedByOrigin_1").otherwise($"IsSegmentedByOrigin".cast(DataTypes.StringType)).as("IsSegmentedByOrigin"),
when($"SegmentGroupDescription".isNotNull, $"SegmentGroupDescription").otherwise($"SegmentGroupDescription").as("SegmentGroupDescription"),
when($"SegmentChildDescription_1".isNotNull, $"SegmentChildDescription_1").otherwise($"SegmentChildDescription").as("SegmentChildDescription"),
when($"SegmentChildLocalLanguageLabel_1".isNotNull, $"SegmentChildLocalLanguageLabel_1").otherwise($"SegmentChildLocalLanguageLabel").as("SegmentChildLocalLanguageLabel"),
when($"LocalLanguageLabel_languageId_1".isNotNull, $"LocalLanguageLabel_languageId_1").otherwise($"LocalLanguageLabel_languageId").as("LocalLanguageLabel_languageId"),
when($"LineItemName_languageId_1".isNotNull, $"LineItemName_languageId_1").otherwise($"LineItemName_languageId").as("LineItemName_languageId"),
when($"SegmentChildDescription_languageId_1".isNotNull, $"SegmentChildDescription_languageId_1").otherwise($"SegmentChildDescription_languageId").as("SegmentChildDescription_languageId"),
when($"SegmentChildLocalLanguageLabel_languageId_1".isNotNull, $"SegmentChildLocalLanguageLabel_languageId_1").otherwise($"SegmentChildLocalLanguageLabel_languageId").as("SegmentChildLocalLanguageLabel_languageId"),
when($"SegmentGroupDescription_languageId_1".isNotNull, $"SegmentGroupDescription_languageId_1").otherwise($"SegmentGroupDescription_languageId").as("SegmentGroupDescription_languageId"),
when($"SegmentMultipleFundbDescription_1".isNotNull, $"SegmentMultipleFundbDescription_1").otherwise($"SegmentMultipleFundbDescription").as("SegmentMultipleFundbDescription"),
when($"SegmentMultipleFundbDescription_languageId_1".isNotNull, $"SegmentMultipleFundbDescription_languageId_1").otherwise($"SegmentMultipleFundbDescription_languageId").as("SegmentMultipleFundbDescription_languageId"),
when($"IsCredit_1".isNotNull, $"IsCredit_1").otherwise($"IsCredit".cast(DataTypes.StringType)).as("IsCredit"),
when($"FinancialConceptLocalId_1".isNotNull, $"FinancialConceptLocalId_1").otherwise($"FinancialConceptLocalId").as("FinancialConceptLocalId"),
when($"FinancialConceptGlobalId_1".isNotNull, $"FinancialConceptGlobalId_1").otherwise($"FinancialConceptGlobalId").as("FinancialConceptGlobalId"),
when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"),
when($"FFAction_1".isNotNull, $"FFAction_1").otherwise((concat(col("FFAction"), lit("|!|"))).as("FFAction")))
.filter(!$"FFAction".contains("D"))
val dfMainOutputFinal = dfMainOutput.select(concat_ws("|^|", columns.map(c => col(c)): _*).as("concatenated"))
dfMainOutputFinal.write.partitionBy("DataPartition_1","StatementTypeCode")
.format("csv")
.option("codec", "gzip")
.save("s3://trfsdisu/SPARK/FinancialLineItem/output")
@Anupam vous pouvez simplement remplacer "col1", "col2", "col3" avec les colonnes que vous voulez utiliser. Tout faire directement sur la base de données devrait être meilleur du point de vue de la performance, mais la reconvertir devrait toujours être rapide (puisqu'il ne s'agit que d'une seule colonne). – Shaido
Il y a un problème avec ceci. Là où ',' est trouvé dans les disques j'obtiens '" "' dans la sortie finale de csv ... J'ai créé la question distincte pour ceci https://stackoverflow.com/questions/ 47002414/add-custom-delimiter-ajoute-double-quotes-dans-le-dernier-spark-data-frame-csv-out/47003011? Noredirect = 1 # comment80963283_47003011 –