J'ai un fichier texte et je le lis comme fichier CSV dans le cadre de données d'étincelles. Maintenant, après avoir rejoint quand j'écris quand une fonction afin de sélectionner des colonnes, je reçois en dessous de l'exception.Lorsque la fonction ne fonctionne pas dans la trame de données d'étincelles avec le schéma de détection automatique
Voici mes fichiers csv charge de code
val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/MAIN")
val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)
val df2 = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/INCR")
val df2With_ = df2.toDF(df2.columns.map(_.replace(".", "_")): _*)
val df2column_to_keep = df2With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df2result = df2With_.select(df2column_to_keep.head, df2column_to_keep.tail: _*)
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("LineItem_organizationId", "LineItem_lineItemId").orderBy($"TimeStamp".cast(LongType).desc)
val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")
Voici mon schéma
latestForEachKey.printSchema()
root
|-- DataPartiotion: string (nullable = true)
|-- LineItem_organizationId: long (nullable = true)
|-- LineItem_lineItemId: integer (nullable = true)
|-- StatementTypeCode_1: string (nullable = true)
|-- LineItemName_1: string (nullable = true)
|-- LocalLanguageLabel_1: string (nullable = true)
|-- FinancialConceptLocal_1: string (nullable = true)
|-- FinancialConceptGlobal_1: string (nullable = true)
|-- IsDimensional_1: boolean (nullable = true)
|-- InstrumentId_1: string (nullable = true)
|-- LineItemSequence_1: string (nullable = true)
|-- PhysicalMeasureId_1: string (nullable = true)
|-- FinancialConceptCodeGlobalSecondary_1: string (nullable = true)
|-- IsRangeAllowed_1: string (nullable = true)
|-- IsSegmentedByOrigin_1: string (nullable = true)
|-- SegmentGroupDescription_1: string (nullable = true)
|-- SegmentChildDescription_1: string (nullable = true)
|-- SegmentChildLocalLanguageLabel_1: string (nullable = true)
|-- LocalLanguageLabel_languageId_1: string (nullable = true)
|-- LineItemName_languageId_1: integer (nullable = true)
|-- SegmentChildDescription_languageId_1: string (nullable = true)
|-- SegmentChildLocalLanguageLabel_languageId_1: string (nullable = true)
|-- SegmentGroupDescription_languageId_1: string (nullable = true)
|-- SegmentMultipleFundbDescription_1: string (nullable = true)
|-- SegmentMultipleFundbDescription_languageId_1: string (nullable = true)
|-- IsCredit_1: string (nullable = true)
|-- FinancialConceptLocalId_1: string (nullable = true)
|-- FinancialConceptGlobalId_1: string (nullable = true)
|-- FinancialConceptCodeGlobalSecondaryId_1: string (nullable = true)
|-- FFAction_1: string (nullable = true)
df1result.printSchema()
root
|-- LineItem_organizationId: long (nullable = true)
|-- LineItem_lineItemId: integer (nullable = true)
|-- StatementTypeCode: string (nullable = true)
|-- LineItemName: string (nullable = true)
|-- LocalLanguageLabel: string (nullable = true)
|-- FinancialConceptLocal: string (nullable = true)
|-- FinancialConceptGlobal: string (nullable = true)
|-- IsDimensional: boolean (nullable = true)
|-- InstrumentId: string (nullable = true)
|-- LineItemSequence: string (nullable = true)
|-- PhysicalMeasureId: string (nullable = true)
|-- FinancialConceptCodeGlobalSecondary: string (nullable = true)
|-- IsRangeAllowed: boolean (nullable = true)
|-- IsSegmentedByOrigin: boolean (nullable = true)
|-- SegmentGroupDescription: string (nullable = true)
|-- SegmentChildDescription: string (nullable = true)
|-- SegmentChildLocalLanguageLabel: string (nullable = true)
|-- LocalLanguageLabel_languageId: integer (nullable = true)
|-- LineItemName_languageId: integer (nullable = true)
|-- SegmentChildDescription_languageId: integer (nullable = true)
|-- SegmentChildLocalLanguageLabel_languageId: integer (nullable = true)
|-- SegmentGroupDescription_languageId: integer (nullable = true)
|-- SegmentMultipleFundbDescription: string (nullable = true)
|-- SegmentMultipleFundbDescription_languageId: integer (nullable = true)
|-- IsCredit: boolean (nullable = true)
|-- FinancialConceptLocalId: integer (nullable = true)
|-- FinancialConceptGlobalId: integer (nullable = true)
|-- FinancialConceptCodeGlobalSecondaryId: string (nullable = true)
|-- FFAction: string (nullable = true)
C'est là que je reçois erreur
val dfMainOutput = df1result.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer")
.select($"LineItem_organizationId", $"LineItem_lineItemId",
when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"),
when($"LocalLanguageLabel_1".isNotNull, $"LocalLanguageLabel_1").otherwise($"LocalLanguageLabel").as("LocalLanguageLabel"),
when($"FinancialConceptLocal_1".isNotNull, $"FinancialConceptLocal_1").otherwise($"FinancialConceptLocal").as("FinancialConceptLocal"),
when($"FinancialConceptGlobal_1".isNotNull, $"FinancialConceptGlobal_1").otherwise($"FinancialConceptGlobal").as("FinancialConceptGlobal"),
when($"IsDimensional_1".isNotNull, $"IsDimensional_1").otherwise($"IsDimensional").as("IsDimensional"),
when($"InstrumentId_1".isNotNull, $"InstrumentId_1").otherwise($"InstrumentId").as("InstrumentId"),
when($"LineItemLineItemName_1".isNotNull, $"LineItemLineItemName_1").otherwise($"LineItemLineItemName").as("LineItemLineItemName"),
when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"),
when($"FinancialConceptCodeGlobalSecondary_1".isNotNull, $"FinancialConceptCodeGlobalSecondary_1").otherwise($"FinancialConceptCodeGlobalSecondary").as("FinancialConceptCodeGlobalSecondary"),
when($"IsRangeAllowed_1".isNotNull, $"IsRangeAllowed_1").otherwise($"IsRangeAllowed").as("IsRangeAllowed"),
when($"IsSegmentedByOrigin_1".isNotNull, $"IsSegmentedByOrigin_1").otherwise($"IsSegmentedByOrigin").as("IsSegmentedByOrigin"),
when($"SegmentGroupDescription_1".isNotNull, $"SegmentGroupDescription_1").otherwise($"SegmentGroupDescription").as("SegmentGroupDescription"),
when($"SegmentChildDescription_1".isNotNull, $"SegmentChildDescription_1").otherwise($"SegmentChildDescription").as("SegmentChildDescription"),
when($"SegmentChildLocalLanguageLabel_1".isNotNull, $"SegmentChildLocalLanguageLabel_1").otherwise($"SegmentChildLocalLanguageLabel").as("SegmentChildLocalLanguageLabel"),
when($"LocalLanguageLabel_languageId_1".isNotNull, $"LocalLanguageLabel_languageId_1").otherwise($"LocalLanguageLabel_languageId").as("LocalLanguageLabel_languageId"),
when($"LineItemName_languageId_1".isNotNull, $"LineItemName_languageId_1").otherwise($"LineItemName_languageId").as("LineItemName_languageId"),
when($"SegmentChildDescription_languageId_1".isNotNull, $"SegmentChildDescription_languageId_1").otherwise($"SegmentChildDescription_languageId").as("SegmentChildDescription_languageId"),
when($"SegmentChildLocalLanguageLabel_languageId_1".isNotNull, $"SegmentChildLocalLanguageLabel_languageId_1").otherwise($"SegmentChildLocalLanguageLabel_languageId").as("SegmentChildLocalLanguageLabel_languageId"),
when($"SegmentGroupDescription_languageId_1".isNotNull, $"SegmentGroupDescription_languageId_1").otherwise($"SegmentGroupDescription_languageId").as("SegmentGroupDescription_languageId"),
when($"SegmentMultipleFundbDescription_1".isNotNull, $"SegmentMultipleFundbDescription_1").otherwise($"SegmentMultipleFundbDescription").as("SegmentMultipleFundbDescription"),
when($"SegmentMultipleFundbDescription_languageId_1".isNotNull, $"SegmentMultipleFundbDescription_languageId_1").otherwise($"SegmentMultipleFundbDescription_languageId").as("SegmentMultipleFundbDescription_languageId"),
when($"IsCredit_1".isNotNull, $"IsCredit_1").otherwise($"IsCredit").as("IsCredit"),
when($"FinancialConceptLocalId_1".isNotNull, $"FinancialConceptLocalId_1").otherwise($"FinancialConceptLocalId").as("FinancialConceptLocalId"),
when($"FinancialConceptGlobalId_1".isNotNull, $"FinancialConceptGlobalId_1").otherwise($"FinancialConceptGlobalId").as("FinancialConceptGlobalId"),
when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"),
when($"FFAction_1".isNotNull, $"FFAction_1").otherwise($"FFAction").as("FFAction"))
.filter(!$"FFAction".contains("D"))
dfMainOutput.write
.format("csv")
.option("quote", "\uFEFF")
.option("codec", "gzip")
.save("s3://trfsdisu/SPARK/FinancialLineItem/output")
ci-dessous est mon exception
org.apache.spark.sql.AnalysisException: cannot resolve 'CASE WHEN (`IsRangeAllowed_1` IS NOT NULL) THEN `IsRangeAllowed_1` ELSE `IsRangeAllowed` END' due to data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type;
Je n'ai mentionné aucun type lors du chargement de CSV.
Je suis le chargement à la fois en tant que fichier csv sans schéma, mais dans un IsRangeAllowed_1 est une chaîne où, comme dans un autre, il est BooleanType
Et avec cela, il y a une question que je veux poser. Comment pouvons-nous supprimer le délimiteur par défaut dans la sortie de trame de données et mettre notre délimiteur personnalisé avec partition et avec la compression gzip?
dfMainOutput.rdd.saveAsTextFile("s3://trfsdisu/SPARK/FinancialLineItem/output")
Lorsque la fonction fonctionne bien, merci ... Je suis à la recherche sur delimiter maintenant .just une chose Si je fais partitionnement puis la colonne sur laquelle je fais partition apparaît dans la sortie .. –
Oui, la colonne de partition n'apparaîtra pas dans le fichier de sortie. Mais présent dans le chemin. – mrsrinivas
Y at-il un moyen que je peux ajouter dans le fichier de sortie .. Et dans l'étincelle créer un répertoire pas un fichier .. Je dois faire manuellement –