2017-10-05 3 views
1

Voici comment je charge mon fichier csv dans le cadre de données d'allumageComment diviser le nom du fichier d'entrée et ajouter de la valeur spécifique dans la colonne de trame de données d'allumage

val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
import sqlContext.implicits._ 

import org.apache.spark.{ SparkConf, SparkContext } 
import java.sql.{Date, Timestamp} 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions.udf 



val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(4)) 

val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*) 
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq 
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*) 
val df1Final=df1result.withColumn("DataPartition", lit(null: String)) 

C'est par exemple d'un de mon nom de fichier d'entrée.

Fundamental.FinancialLineItem.FinancialLineItem.SelfSourcedPrivate.CUS.1.2017-09-07-1056.Full 

Fundamental.FinancialLineItem.FinancialLineItem.Japan.CUS.1.2017-09-07-1056.Full.txt 

Maintenant, je veux lire ce fichier et le diviser par "." opérateur, puis ajoutez CUS comme nouvelle colonne à la place de DataPartition. Puis-je le faire sans UDF?

Voici le schéma de trame de données existantes

root 
|-- LineItem_organizationId: long (nullable = true) 
|-- LineItem_lineItemId: integer (nullable = true) 
|-- StatementTypeCode: string (nullable = true) 
|-- LineItemName: string (nullable = true) 
|-- LocalLanguageLabel: string (nullable = true) 
|-- FinancialConceptLocal: string (nullable = true) 
|-- FinancialConceptGlobal: string (nullable = true) 
|-- IsDimensional: boolean (nullable = true) 
|-- InstrumentId: string (nullable = true) 
|-- LineItemSequence: string (nullable = true) 
|-- PhysicalMeasureId: string (nullable = true) 
|-- FinancialConceptCodeGlobalSecondary: string (nullable = true) 
|-- IsRangeAllowed: boolean (nullable = true) 
|-- IsSegmentedByOrigin: boolean (nullable = true) 
|-- SegmentGroupDescription: string (nullable = true) 
|-- SegmentChildDescription: string (nullable = true) 
|-- SegmentChildLocalLanguageLabel: string (nullable = true) 
|-- LocalLanguageLabel_languageId: integer (nullable = true) 
|-- LineItemName_languageId: integer (nullable = true) 
|-- SegmentChildDescription_languageId: integer (nullable = true) 
|-- SegmentChildLocalLanguageLabel_languageId: integer (nullable = true) 
|-- SegmentGroupDescription_languageId: integer (nullable = true) 
|-- SegmentMultipleFundbDescription: string (nullable = true) 
|-- SegmentMultipleFundbDescription_languageId: integer (nullable = true) 
|-- IsCredit: boolean (nullable = true) 
|-- FinancialConceptLocalId: integer (nullable = true) 
|-- FinancialConceptGlobalId: integer (nullable = true) 
|-- FinancialConceptCodeGlobalSecondaryId: string (nullable = true) 
|-- FFAction: string (nullable = true) 

Mise à jour du code après suggéré réponse

val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 

    import org.apache.spark.{ SparkConf, SparkContext } 
    import java.sql.{Date, Timestamp} 
    import org.apache.spark.sql.Row 
    import org.apache.spark.sql.types._ 
    import org.apache.spark.sql.functions.udf 
    import org.apache.spark.sql.functions.{input_file_name, regexp_extract} 

spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(4)) 

import org.apache.spark.sql.functions.input_file_name 

val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/MAIN") 

val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*) 
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq 
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*) 

df1result.withColumn("cus_val", get_cus_val(input_file_name)) 

df1result.printSchema() 
+0

Est-ce que l'extraction de texte entre '' SelfSourcedPrivate.' et un point (.) 'Fonctionne pour vous? (Supposons que la sous-chaîne 'SelfSourcedPrivate.CUS.' soit donnée) – mrsrinivas

Répondre

0

Vous pouvez obtenir le nom de fichier avec UDF prédéfini à savoir input_file_name(), après que ce soit vous pouvez créer un UDF pour extraire CUS ou utiliser regexp_extract avec UDF.

En utilisant regexp_extract wo UDFregex usage here

import org.apache.spark.sql.functions.input_file_name 
import org.apache.spark.sql.functions.regexp_extract 

df.withColumn("cus_val", 
    regexp_extract(input_file_name, "\.(\w+)\.[0-9]+\.", 1)) 

aide personnalisée UDF

import org.apache.spark.sql.functions.udf 

val get_cus_val = udf(filePath: String => filePath.split("\\.")(4)) 

import org.apache.spark.sql.functions.input_file_name 

df.withColumn("cus_val", get_cus_val(input_file_name))