2017-10-01 12 views
-1

J'ai bien formaté le fichier texte comme ci-dessous.Pouvons-nous charger un fichier texte délimité dans une trame de données spark sans créer de schéma?

TimeStamp|^|LineItem_organizationId|^|LineItem_lineItemId|^|StatementTypeCode|^|LineItemName|^|LocalLanguageLabel|^|FinancialConceptLocal|^|FinancialConceptGlobal|^|IsDimensional|^|InstrumentId|^|LineItemSequence|^|PhysicalMeasureId|^|FinancialConceptCodeGlobalSecondary|^|IsRangeAllowed|^|IsSegmentedByOrigin|^|SegmentGroupDescription|^|SegmentChildDescription|^|SegmentChildLocalLanguageLabel|^|LocalLanguageLabel.languageId|^|LineItemName.languageId|^|SegmentChildDescription.languageId|^|SegmentChildLocalLanguageLabel.languageId|^|SegmentGroupDescription.languageId|^|SegmentMultipleFundbDescription|^|SegmentMultipleFundbDescription.languageId|^|IsCredit|^|FinancialConceptLocalId|^|FinancialConceptGlobalId|^|FinancialConceptCodeGlobalSecondaryId|^|FFAction|!| 
1506702452474|^|4295876606|^|1|^|BAL|^|Cash And Deposits|^|2|^||^|ACAE|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3018759|^||^|I|!| 
1506702452475|^|4295876606|^|4|^|BAL|^|Raw Materials And Supplies|^||^||^|AIRM|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3018830|^||^|I|!| 
1506702452476|^|4295876606|^|10|^|BAL|^|Total current assets|^||^||^|XTCA|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3019590|^||^|I|!| 
1506702452477|^|4295876606|^|53|^|BAL|^|Deferred Assets Total|^||^||^|ADFN|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014598|^||^|I|!| 
1506702452478|^|4295876606|^|54|^|BAL|^|Total Assets|^||^||^|XTOT|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016350|^||^|I|!| 
1506702452479|^|4295876606|^|107|^|BAL|^|Total Number Of Treasury Stock|^||^||^|XTCTI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016331|^||^|I|!| 
1506702452480|^|4295876606|^|108|^|BAL|^|Total Number Of Issued Shares|^||^||^|XTCII|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016326|^||^|I|!| 
1506702452481|^|4295876606|^|109|^|BAL|^|Total Number Of Issued Preferred Stock A|^||^||^|XTPII|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016352|^||^|I|!| 
1506702452482|^|4295876606|^|111|^|CAS|^|Loss before income taxes|^||^||^|ONET|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3019196|^||^|I|!| 
1506702452483|^|4295876606|^|130|^|CAS|^|Subtotal|^||^||^|FFFF|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014929|^||^|I|!| 
1506702452484|^|4295876606|^|132|^|CAS|^|Net cash provided by (used in) operating activities|^||^||^|XTLO|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016344|^||^|I|!| 
1506702452485|^|4295876606|^|133|^|CAS|^|Purchase of property, plant and equipment|^||^||^|ICEX|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014949|^||^|I|!| 
1506702452486|^|4295876606|^|143|^|CAS|^|Net cash provided by (used in) investing activities|^||^||^|XTLI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016342|^||^|I|!| 
1506702452487|^|4295876606|^|145|^|CAS|^|Proceeds from long-term loans payable|^||^||^|FLDI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014931|^||^|I|!| 

Maintenant, je dois charger ce fichier texte dans la trame de données spark.

je peux le faire comme ça

val schema = StructType(Array(

     StructField("OrgId", StringType), 
     StructField("LineItemId", StringType), 
     StructField("SegmentId", StringType), 
     StructField("SequenceId", StringType), 
     StructField("Action", StringType))) 

val textRdd1 = sc.textFile("s3://trfsdisu/SPARK/Text1.txt") 
val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split("\\|\\^\\|", -1))) 
var df1 = sqlContext.createDataFrame(rowRdd1, schema).drop("index") 

Mais de cette façon que j'ai créer le schéma, donc par exemple si j'ai fichier texte qui a 100 colonnes que je dois écrire 100 fois ce. Donc ce que j'ai besoin de charger des fichiers comme CSV.

val df1 = spark.read.format("csv").option("header", "true").option("mode", "DROPMALFORMED").load("s3://sdi/SPARK/FinancialLineItem/MAIN") 

Mais cela ne fonctionne pas pour moi parce que j'ai un fichier texte qui pas au format csv.

Alors est-il possible de charger le fichier texte dans le style csv dans la trame de données spark?

val dfMainOutput = df1result.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer") 
     .select($"LineItem_organizationId", $"LineItem_lineItemId",$"DataPartiotion", 
     when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"), 
     when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").alias("StatementtypeCode"), 
     when($"LineItemName_1".isNotNull, $"LineItemName_1").otherwise($"LineItemName").as("LineItemName"), 
     when($"LocalLanguageLabel_1".isNotNull, $"LocalLanguageLabel_1").otherwise($"LocalLanguageLabel").as("LocalLanguageLabel"), 
     when($"FinancialConceptLocal_1".isNotNull, $"FinancialConceptLocal_1").otherwise($"FinancialConceptLocal").as("FinancialConceptLocal"), 
     when($"FinancialConceptGlobal_1".isNotNull, $"FinancialConceptGlobal_1").otherwise($"FinancialConceptGlobal").as("FinancialConceptGlobal"), 
     when($"IsDimensional_1".isNotNull, $"IsDimensional_1").otherwise($"IsDimensional").as("IsDimensional"), 
     when($"InstrumentId_1".isNotNull, $"InstrumentId_1").otherwise($"InstrumentId").as("InstrumentId"), 
     when($"LineItemSequence_1".isNotNull, $"LineItemSequence_1").otherwise($"LineItemSequence").as("LineItemSequence"), 
     when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"), 
     when($"FinancialConceptCodeGlobalSecondary_1".isNotNull, $"FinancialConceptCodeGlobalSecondary_1").otherwise($"FinancialConceptCodeGlobalSecondary").as("FinancialConceptCodeGlobalSecondary"), 
     when($"IsRangeAllowed_1".isNotNull, $"IsRangeAllowed_1").otherwise($"IsRangeAllowed".cast(DataTypes.StringType)).as("IsRangeAllowed"), 
     when($"IsSegmentedByOrigin_1".isNotNull, $"IsSegmentedByOrigin_1").otherwise($"IsSegmentedByOrigin".cast(DataTypes.StringType)).as("IsSegmentedByOrigin"), 
     when($"SegmentGroupDescription".isNotNull, $"SegmentGroupDescription").otherwise($"SegmentGroupDescription").as("SegmentGroupDescription"), 
     when($"SegmentChildDescription_1".isNotNull, $"SegmentChildDescription_1").otherwise($"SegmentChildDescription").as("SegmentChildDescription"), 
     when($"SegmentChildLocalLanguageLabel_1".isNotNull, $"SegmentChildLocalLanguageLabel_1").otherwise($"SegmentChildLocalLanguageLabel").as("SegmentChildLocalLanguageLabel"), 
     when($"LocalLanguageLabel_languageId_1".isNotNull, $"LocalLanguageLabel_languageId_1").otherwise($"LocalLanguageLabel_languageId").as("LocalLanguageLabel_languageId"), 
     when($"LineItemName_languageId_1".isNotNull, $"LineItemName_languageId_1").otherwise($"LineItemName_languageId").as("LineItemName_languageId"), 
     when($"SegmentChildDescription_languageId_1".isNotNull, $"SegmentChildDescription_languageId_1").otherwise($"SegmentChildDescription_languageId").as("SegmentChildDescription_languageId"), 
     when($"SegmentChildLocalLanguageLabel_languageId_1".isNotNull, $"SegmentChildLocalLanguageLabel_languageId_1").otherwise($"SegmentChildLocalLanguageLabel_languageId").as("SegmentChildLocalLanguageLabel_languageId"), 
     when($"SegmentGroupDescription_languageId_1".isNotNull, $"SegmentGroupDescription_languageId_1").otherwise($"SegmentGroupDescription_languageId").as("SegmentGroupDescription_languageId"), 
     when($"SegmentMultipleFundbDescription_1".isNotNull, $"SegmentMultipleFundbDescription_1").otherwise($"SegmentMultipleFundbDescription").as("SegmentMultipleFundbDescription"), 
     when($"SegmentMultipleFundbDescription_languageId_1".isNotNull, $"SegmentMultipleFundbDescription_languageId_1").otherwise($"SegmentMultipleFundbDescription_languageId").as("SegmentMultipleFundbDescription_languageId"), 
     when($"IsCredit_1".isNotNull, $"IsCredit_1").otherwise($"IsCredit".cast(DataTypes.StringType)).as("IsCredit"), 
     when($"FinancialConceptLocalId_1".isNotNull, $"FinancialConceptLocalId_1").otherwise($"FinancialConceptLocalId").as("FinancialConceptLocalId"), 
     when($"FinancialConceptGlobalId_1".isNotNull, $"FinancialConceptGlobalId_1").otherwise($"FinancialConceptGlobalId").as("FinancialConceptGlobalId"), 
     when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"), 
     when($"FFAction_1".isNotNull, $"FFAction_1").otherwise($"FFAction").as("FFAction")) 
     .filter(!$"FFAction".contains("D")) 

Répondre

2

Tout en essayant de résoudre votre question, le premier problème est que je faisais face avec spark-csv, vous ne pouvez utiliser un séparateur de caractères et non un séparateur de chaîne.

La solution que je trouve est un peu délicat:

Charger les données de CSV à l'aide | comme séparateur.

import org.apache.spark.sql.SQLContext 
val sqlContext = new SQLContext(sc); 

val df = sqlContext.read.format("csv") 
      .option("header", "true") 
      .option("delimiter", "|") 
      .option("inferSchema",true") 
      .load("/home/robin/Bureau/Spark/csv_strange_delimiter.csv") 

Cela va créer une trame de données qui ressemble à ceci:

df.show 
+----------+---+----------+---+---------+---+----------+---+------+ 
|  OrgId| ^1|LineItemId| ^3|SegmentId| ^5|SequenceId| ^7|Action| 
+----------+---+----------+---+---------+---+----------+---+------+ 
|4295877341| ^|  136| ^|  4| ^|   1| ^|  I| 
|4295877346| ^|  136| ^|  4| ^|   1| ^|  I| 
|4295877341| ^|  138| ^|  2| ^|   1| ^|  I| 
|4295877341| ^|  141| ^|  4| ^|   1| ^|  I| 
|4295877341| ^|  143| ^|  2| ^|   1| ^|  I| 
|4295877341| ^|  145| ^|  14| ^|   1| ^|  I| 
| 123456789| ^|  145| ^|  14| ^|   1| ^|  I| 
+----------+---+----------+---+---------+---+----------+---+------+ 

2: Supprimer les colonnes contenant "^"

val column_to_keep = df.columns.filter(v => (!v.contains("^"))).toSeq 
val result = df.select(column_to_keep.head, column_to_keep.tail: _*) 

result.show 
+----------+----------+---------+----------+------+ 
|  OrgId|LineItemId|SegmentId|SequenceId|Action| 
+----------+----------+---------+----------+------+ 
|4295877341|  136|  4|   1| I|!|| 
|4295877346|  136|  4|   1| I|!|| 
|4295877341|  138|  2|   1| I|!|| 
|4295877341|  141|  4|   1| I|!|| 
|4295877341|  143|  2|   1| I|!|| 
|4295877341|  145|  14|   1| I|!|| 
| 123456789|  145|  14|   1| I|!|| 
+----------+----------+---------+----------+------+ 
+0

pouvez-vous aussi s'il vous plaît me dire comment puis-je ajouter |! | dans les colonnes d'action pour tous les enregistrements j'ai mis à jour mon code – SUDARSHAN

+1

Vous pouvez utiliser la fonction concate comme expliqué ici: https://stackoverflow.com/a/31452109/6138873 – jeanr

+0

Laissez-nous [continuer cette discussion dans le chat] (http: // chat.stackoverflow.com/rooms/156010/discussion-entre-sudarshan-and-jeanr). – SUDARSHAN