6

J'ai un travail d'étincelle où je fais la jointure externe entre deux cadres de données. La taille de la première trame de données est de 260 Go, le format de fichier est de fichiers texte qui est divisé en 2200 fichiers et la taille de la deuxième trame de données est de 2 Go.Comment régler le travail d'étincelle sur EMR pour écrire des données énormes rapidement sur S3

Le chargement de ce fichier de deux dans le cadre de données lui-même prend 10 minutes.

L'écriture d'une trame de données d'environ 260 Go dans S3 prend environ 1 heure.

Voici mes informations de cluster.

emr-5.9.0 
Master:1m3.2xlarge 
Core:c3.4large 5 machines 

voici les détails de chaque machine c3.4xlarge

CPU:16 
RAM:30 
DISK:2 × 160 GB SSD 

Zeppelin 0.7.2, Spark 2.2.0, Ganglia 3.7.2 

Voici ma config de cluster que je suis en train

[ 
    { 
     "Classification": "spark-defaults" 
     , "Properties": { 
      "spark.serializer": "org.apache.spark.serializer.KryoSerializer" 
     } 
    },{ 
    "Classification": "emrfs-site", 
    "Properties": { 
     "fs.s3.maxConnections": "200" 
    } 
    } 
] 

Voici mon code

val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 

    import org.apache.spark.{ SparkConf, SparkContext } 
    import java.sql.{Date, Timestamp} 
    import org.apache.spark.sql.Row 
    import org.apache.spark.sql.types._ 
    import org.apache.spark.sql.functions.udf 
    import java.io.File 
    import org.apache.hadoop.fs._ 



import org.apache.spark.sql.functions.input_file_name 
import org.apache.spark.sql.functions.regexp_extract 


val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3)) 
val get_cus_valYear = spark.udf.register("get_cus_valYear", (filePath: String) => filePath.split("\\.")(4)) 


val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load(("s3://trfsdisu/SPARK/FundamentalAnalytic/MAIN")) 

val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*) 
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c42"))).toSeq 
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*) 

val df1resultFinal=df1result.withColumn("DataPartition", get_cus_val(input_file_name)) 
val df1resultFinalWithYear=df1resultFinal.withColumn("PartitionYear", get_cus_valYear(input_file_name)) 


val df2 = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FundamentalAnalytic/INCR") 
val df2With_ = df2.toDF(df2.columns.map(_.replace(".", "_")): _*) 
val df2column_to_keep = df2With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c42"))).toSeq 
val df2result = df2With_.select(df2column_to_keep.head, df2column_to_keep.tail: _*) 

import org.apache.spark.sql.expressions._ 
val windowSpec = Window.partitionBy("FundamentalSeriesId", "financialPeriodEndDate","financialPeriodType","sYearToDate","lineItemId").orderBy($"TimeStamp".cast(LongType).desc) 
val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp") 


val dfMainOutput = df1resultFinalWithYear.join(latestForEachKey, Seq("FundamentalSeriesId", "financialPeriodEndDate","financialPeriodType","sYearToDate","lineItemId"), "outer") 
     .select($"FundamentalSeriesId", 
     when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition".cast(DataTypes.StringType)).as("DataPartition"), 
     when($"PartitionYear_1".isNotNull, $"PartitionYear_1").otherwise($"PartitionYear".cast(DataTypes.StringType)).as("PartitionYear"), 
     when($"FundamentalSeriesId_objectTypeId_1".isNotNull, $"FundamentalSeriesId_objectTypeId_1").otherwise($"FundamentalSeriesId_objectTypeId".cast(DataTypes.StringType)).as("FundamentalSeriesId_objectTypeId"), 
     when($"analyticItemInstanceKey_1".isNotNull, $"analyticItemInstanceKey_1").otherwise($"analyticItemInstanceKey").as("analyticItemInstanceKey"), 
     when($"AnalyticValue_1".isNotNull, $"AnalyticValue_1").otherwise($"AnalyticValue").as("AnalyticValue"), 
     when($"AnalyticConceptCode_1".isNotNull, $"AnalyticConceptCode_1").otherwise($"AnalyticConceptCode").as("AnalyticConceptCode"), 
     $"AuditID_1").otherwise($"AuditID").as("AuditID"), 
     when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"), 
     when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|"))).otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction")) 
     .filter(!$"FFAction".contains("D")) 



    val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated")) 

    val dfMainOutputWithoutFinalYear = dfMainOutputFinal.select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutputFinal.schema.fieldNames.filter(_ != "PartitionYear").map(c => col(c)): _*).as("concatenated")) 

    val headerColumn = df.columns.filter(v => (!v.contains("^") && !v.contains("_c42"))).toSeq 

    val header = headerColumn.dropRight(1).mkString("", "|^|", "|!|") 

    val dfMainOutputFinalWithoutNull = dfMainOutputWithoutFinalYear.withColumn("concatenated", regexp_replace(col("concatenated"), "null", "")).withColumnRenamed("concatenated", header) 
    dfMainOutputFinalWithoutNull.write.partitionBy("DataPartition","PartitionYear") 
     .format("csv") 
     .option("nullValue", "") 
     .option("header", "true") 
     .option("codec", "gzip") 
     .save("s3://trfsdisu/SPARK/FundamentalAnalytic/output") 

I En plus de cela, j'ai essayé d'écrire des données dans HDFS d'abord, mais il prend également le même temps (4 minutes de moins que le S3) pour écrire dans le répertoire HDFS.

Ajout SQL physique Plan

enter image description here

toutes les étapes DAG enter image description here

Ajout DAG

enter image description here

Mémoire utilisée la dernière heure de mon travail

enter image description here

Voici quelques journaux d'emploi

I looking into the job execution on the below clusters .Here were some on my observations : 

Majority time is being consumed at RDD getting spilling in the disk . 

######### 

17/10/17 15:41:37 INFO UnsafeExternalSorter: Thread 88 spilling sort data of 704.0 MB to disk (0 time so far) 
17/10/17 15:41:37 INFO UnsafeExternalSorter: Thread 90 spilling sort data of 704.0 MB to disk (0 time so far) 
17/10/17 15:41:38 INFO UnsafeExternalSorter: Thread 89 spilling sort data of 704.0 MB to disk (0 time so far) 
17/10/17 15:41:38 INFO UnsafeExternalSorter: Thread 91 spilling sort data of 704.0 MB to disk (0 time so far) 
17/10/17 15:42:17 INFO UnsafeExternalSorter: Thread 88 spilling sort data of 704.0 MB to disk (1 time so far) 
17/10/17 15:42:17 INFO UnsafeExternalSorter: Thread 90 spilling sort data of 704.0 MB to disk (1 time so far) 
17/10/17 15:42:33 INFO UnsafeExternalSorter: Thread 89 spilling sort data of 704.0 MB to disk (1 time so far) 

######### 

This is causing spilling of 700MB memory pages constantly on the disk and then reading it back before shuffle phase . The same is see in all the containers ran for the job . The reason why lot of spilling is happening is because the executor are launched in a container with size : 

######################### 
    17/10/17 15:20:18 INFO YarnAllocator: Will request 1 executor container(s), each with 4 core(s) and 5632 MB memory (including 512 MB of overhead) 
######################### 

Which means each containers are on 5GB and hence they are getting full very quickly .and because of memory pressure they are getting spilled . 

You will notice the same in the nodemanager Logs : 

2017-10-17 15:58:21,590 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 7759 for container-id container_1508253273035_0001_01_000003: 5.0 GB of 5.5 GB physical memory used; 8.6 GB of 27.5 GB virtual memory used 
+0

Afficher les plans logiques à partir de l'interface utilisateur Web. Faites des captures d'écran pour me donner une idée de ce que fait le code. Merci. –

+0

J'ai demandé des informations sur les plans physiques pour les requêtes de l'interface Web de Spark. Cela devrait m'aider à savoir exactement ce que font vos requêtes. –

+0

@JacekLaskowski Oh, d'accord .. Puis-je voir ceux de Gangelia? – SUDARSHAN

Répondre

1

Vous exécutez cinq instances de c3.4large EC2, qui a 30 Go de RAM chacun. Donc, ce n'est que 150Go au total, ce qui est beaucoup plus petit que votre dataframe> 200Go à rejoindre. D'où beaucoup de déversement de disque. Vous pouvez peut-être lancer des instances de type EC2 (mémoire optimisée par rapport au type c qui est optimisé pour le calcul) et voir s'il y a une amélioration des performances.

+0

Même si j'utilise r4.4xlarge alors aussi cela prend presque la même quantité de temps ...Le noyau V utilisé est toujours 1 – SUDARSHAN

+1

@SUDARSHAN ajoutez cette propriété à votre configuration de cluster '[" "classification": "spark", "properties": {"maximizeResourceAllocation": "true"}}] ' – Will