2017-10-04 2 views
1

J'ai un fichier texte et je le lis comme fichier CSV dans le cadre de données d'étincelles. Maintenant, après avoir rejoint quand j'écris quand une fonction afin de sélectionner des colonnes, je reçois en dessous de l'exception.Lorsque la fonction ne fonctionne pas dans la trame de données d'étincelles avec le schéma de détection automatique

Voici mes fichiers csv charge de code

val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/MAIN") 

val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*) 
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq 
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*) 


val df2 = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/INCR") 
val df2With_ = df2.toDF(df2.columns.map(_.replace(".", "_")): _*) 
val df2column_to_keep = df2With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq 
val df2result = df2With_.select(df2column_to_keep.head, df2column_to_keep.tail: _*) 



import org.apache.spark.sql.expressions._ 
val windowSpec = Window.partitionBy("LineItem_organizationId", "LineItem_lineItemId").orderBy($"TimeStamp".cast(LongType).desc) 
val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp") 

Voici mon schéma

latestForEachKey.printSchema() 

root 
|-- DataPartiotion: string (nullable = true) 
|-- LineItem_organizationId: long (nullable = true) 
|-- LineItem_lineItemId: integer (nullable = true) 
|-- StatementTypeCode_1: string (nullable = true) 
|-- LineItemName_1: string (nullable = true) 
|-- LocalLanguageLabel_1: string (nullable = true) 
|-- FinancialConceptLocal_1: string (nullable = true) 
|-- FinancialConceptGlobal_1: string (nullable = true) 
|-- IsDimensional_1: boolean (nullable = true) 
|-- InstrumentId_1: string (nullable = true) 
|-- LineItemSequence_1: string (nullable = true) 
|-- PhysicalMeasureId_1: string (nullable = true) 
|-- FinancialConceptCodeGlobalSecondary_1: string (nullable = true) 
|-- IsRangeAllowed_1: string (nullable = true) 
|-- IsSegmentedByOrigin_1: string (nullable = true) 
|-- SegmentGroupDescription_1: string (nullable = true) 
|-- SegmentChildDescription_1: string (nullable = true) 
|-- SegmentChildLocalLanguageLabel_1: string (nullable = true) 
|-- LocalLanguageLabel_languageId_1: string (nullable = true) 
|-- LineItemName_languageId_1: integer (nullable = true) 
|-- SegmentChildDescription_languageId_1: string (nullable = true) 
|-- SegmentChildLocalLanguageLabel_languageId_1: string (nullable = true) 
|-- SegmentGroupDescription_languageId_1: string (nullable = true) 
|-- SegmentMultipleFundbDescription_1: string (nullable = true) 
|-- SegmentMultipleFundbDescription_languageId_1: string (nullable = true) 
|-- IsCredit_1: string (nullable = true) 
|-- FinancialConceptLocalId_1: string (nullable = true) 
|-- FinancialConceptGlobalId_1: string (nullable = true) 
|-- FinancialConceptCodeGlobalSecondaryId_1: string (nullable = true) 
|-- FFAction_1: string (nullable = true) 

df1result.printSchema() 

root 
|-- LineItem_organizationId: long (nullable = true) 
|-- LineItem_lineItemId: integer (nullable = true) 
|-- StatementTypeCode: string (nullable = true) 
|-- LineItemName: string (nullable = true) 
|-- LocalLanguageLabel: string (nullable = true) 
|-- FinancialConceptLocal: string (nullable = true) 
|-- FinancialConceptGlobal: string (nullable = true) 
|-- IsDimensional: boolean (nullable = true) 
|-- InstrumentId: string (nullable = true) 
|-- LineItemSequence: string (nullable = true) 
|-- PhysicalMeasureId: string (nullable = true) 
|-- FinancialConceptCodeGlobalSecondary: string (nullable = true) 
|-- IsRangeAllowed: boolean (nullable = true) 
|-- IsSegmentedByOrigin: boolean (nullable = true) 
|-- SegmentGroupDescription: string (nullable = true) 
|-- SegmentChildDescription: string (nullable = true) 
|-- SegmentChildLocalLanguageLabel: string (nullable = true) 
|-- LocalLanguageLabel_languageId: integer (nullable = true) 
|-- LineItemName_languageId: integer (nullable = true) 
|-- SegmentChildDescription_languageId: integer (nullable = true) 
|-- SegmentChildLocalLanguageLabel_languageId: integer (nullable = true) 
|-- SegmentGroupDescription_languageId: integer (nullable = true) 
|-- SegmentMultipleFundbDescription: string (nullable = true) 
|-- SegmentMultipleFundbDescription_languageId: integer (nullable = true) 
|-- IsCredit: boolean (nullable = true) 
|-- FinancialConceptLocalId: integer (nullable = true) 
|-- FinancialConceptGlobalId: integer (nullable = true) 
|-- FinancialConceptCodeGlobalSecondaryId: string (nullable = true) 
|-- FFAction: string (nullable = true) 

C'est là que je reçois erreur

val dfMainOutput = df1result.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer") 
    .select($"LineItem_organizationId", $"LineItem_lineItemId", 
    when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"), 
    when($"LocalLanguageLabel_1".isNotNull, $"LocalLanguageLabel_1").otherwise($"LocalLanguageLabel").as("LocalLanguageLabel"), 
    when($"FinancialConceptLocal_1".isNotNull, $"FinancialConceptLocal_1").otherwise($"FinancialConceptLocal").as("FinancialConceptLocal"), 
    when($"FinancialConceptGlobal_1".isNotNull, $"FinancialConceptGlobal_1").otherwise($"FinancialConceptGlobal").as("FinancialConceptGlobal"), 
    when($"IsDimensional_1".isNotNull, $"IsDimensional_1").otherwise($"IsDimensional").as("IsDimensional"), 
    when($"InstrumentId_1".isNotNull, $"InstrumentId_1").otherwise($"InstrumentId").as("InstrumentId"), 
    when($"LineItemLineItemName_1".isNotNull, $"LineItemLineItemName_1").otherwise($"LineItemLineItemName").as("LineItemLineItemName"), 
    when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"), 
    when($"FinancialConceptCodeGlobalSecondary_1".isNotNull, $"FinancialConceptCodeGlobalSecondary_1").otherwise($"FinancialConceptCodeGlobalSecondary").as("FinancialConceptCodeGlobalSecondary"), 
    when($"IsRangeAllowed_1".isNotNull, $"IsRangeAllowed_1").otherwise($"IsRangeAllowed").as("IsRangeAllowed"), 
    when($"IsSegmentedByOrigin_1".isNotNull, $"IsSegmentedByOrigin_1").otherwise($"IsSegmentedByOrigin").as("IsSegmentedByOrigin"), 
    when($"SegmentGroupDescription_1".isNotNull, $"SegmentGroupDescription_1").otherwise($"SegmentGroupDescription").as("SegmentGroupDescription"), 
    when($"SegmentChildDescription_1".isNotNull, $"SegmentChildDescription_1").otherwise($"SegmentChildDescription").as("SegmentChildDescription"), 
    when($"SegmentChildLocalLanguageLabel_1".isNotNull, $"SegmentChildLocalLanguageLabel_1").otherwise($"SegmentChildLocalLanguageLabel").as("SegmentChildLocalLanguageLabel"), 
    when($"LocalLanguageLabel_languageId_1".isNotNull, $"LocalLanguageLabel_languageId_1").otherwise($"LocalLanguageLabel_languageId").as("LocalLanguageLabel_languageId"), 
    when($"LineItemName_languageId_1".isNotNull, $"LineItemName_languageId_1").otherwise($"LineItemName_languageId").as("LineItemName_languageId"), 
    when($"SegmentChildDescription_languageId_1".isNotNull, $"SegmentChildDescription_languageId_1").otherwise($"SegmentChildDescription_languageId").as("SegmentChildDescription_languageId"), 
    when($"SegmentChildLocalLanguageLabel_languageId_1".isNotNull, $"SegmentChildLocalLanguageLabel_languageId_1").otherwise($"SegmentChildLocalLanguageLabel_languageId").as("SegmentChildLocalLanguageLabel_languageId"), 
    when($"SegmentGroupDescription_languageId_1".isNotNull, $"SegmentGroupDescription_languageId_1").otherwise($"SegmentGroupDescription_languageId").as("SegmentGroupDescription_languageId"), 
    when($"SegmentMultipleFundbDescription_1".isNotNull, $"SegmentMultipleFundbDescription_1").otherwise($"SegmentMultipleFundbDescription").as("SegmentMultipleFundbDescription"), 
    when($"SegmentMultipleFundbDescription_languageId_1".isNotNull, $"SegmentMultipleFundbDescription_languageId_1").otherwise($"SegmentMultipleFundbDescription_languageId").as("SegmentMultipleFundbDescription_languageId"), 
    when($"IsCredit_1".isNotNull, $"IsCredit_1").otherwise($"IsCredit").as("IsCredit"), 
    when($"FinancialConceptLocalId_1".isNotNull, $"FinancialConceptLocalId_1").otherwise($"FinancialConceptLocalId").as("FinancialConceptLocalId"), 
    when($"FinancialConceptGlobalId_1".isNotNull, $"FinancialConceptGlobalId_1").otherwise($"FinancialConceptGlobalId").as("FinancialConceptGlobalId"), 
    when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"), 
    when($"FFAction_1".isNotNull, $"FFAction_1").otherwise($"FFAction").as("FFAction")) 
    .filter(!$"FFAction".contains("D")) 


dfMainOutput.write 
    .format("csv") 
    .option("quote", "\uFEFF") 
    .option("codec", "gzip") 
    .save("s3://trfsdisu/SPARK/FinancialLineItem/output") 

ci-dessous est mon exception

org.apache.spark.sql.AnalysisException: cannot resolve 'CASE WHEN (`IsRangeAllowed_1` IS NOT NULL) THEN `IsRangeAllowed_1` ELSE `IsRangeAllowed` END' due to data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type; 

Je n'ai mentionné aucun type lors du chargement de CSV.

Je suis le chargement à la fois en tant que fichier csv sans schéma, mais dans un IsRangeAllowed_1 est une chaîne où, comme dans un autre, il est BooleanType

Et avec cela, il y a une question que je veux poser. Comment pouvons-nous supprimer le délimiteur par défaut dans la sortie de trame de données et mettre notre délimiteur personnalisé avec partition et avec la compression gzip?

dfMainOutput.rdd.saveAsTextFile("s3://trfsdisu/SPARK/FinancialLineItem/output") 

Répondre

1

Pour la première question, i, e QUAND THEN et ELSE expressions doivent tous être le même type ou un type commun;

Mais ici IsRangeAllowed est Boolean et IsRangeAllowed_1 est String. Convertissez donc l'une des colonnes en String ou Boolean. Ainsi, le changement de code pourrait être

import org.apache.spark.sql.types.DataTypes 

when($"IsRangeAllowed_1".isNotNull, $"IsRangeAllowed_1") 
    .otherwise($"IsRangeAllowed".cast(DataTypes.StringType)) 
    .as("IsRangeAllowed") 

Comment peut-on supprimer delimiter par défaut dans la sortie de trame de données et de mettre notre délimiteur personnalisé avec partition et avec une compression gzip?

Un DataFrame peut être sauvé directement avec delimiter et codec, sans avoir besoin d'appeler la rdddfMainOutput.rdd sous-jacente par exemple. à savoir:

dfMainOutput.write 
    .format("csv") 
    .option("delimiter", "!") 
    .option("codec", "gzip") 
    .save("s3://trfsdisu/SPARK/FinancialLineItem/output") 

Edit: selon le commentaire CONCAT_WS exemple

df.withColumn("colmn", concat_ws("|!|", $"IsRangeAllowed_1", "IsRangeAllowed", ...) 
    .selectExpr("colmn") 
    .show() 

//to add all columns in df 
df.withColumn("colmn", concat_ws("|!|", df.cols:_*)) 
    .selectExpr("colmn") 
    .show() 
+0

Lorsque la fonction fonctionne bien, merci ... Je suis à la recherche sur delimiter maintenant .just une chose Si je fais partitionnement puis la colonne sur laquelle je fais partition apparaît dans la sortie .. –

+0

Oui, la colonne de partition n'apparaîtra pas dans le fichier de sortie. Mais présent dans le chemin. – mrsrinivas

+0

Y at-il un moyen que je peux ajouter dans le fichier de sortie .. Et dans l'étincelle créer un répertoire pas un fichier .. Je dois faire manuellement –