How to tune a Spark job on EMR to write large amounts of data to S3 quickly

Problem description:

I have a Spark job that does an outer join between two dataframes. The first dataframe is about 260 GB, stored as text files split across 2200 files; the second dataframe is about 2 GB. How can I tune this Spark job on EMR so that it writes this large output to S3 quickly?

Just loading these two inputs into dataframes takes 10 minutes.

Writing the roughly 260 GB dataframe output to S3 then takes about 1 hour.
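(For scale, 260 GB per hour is roughly 74 MB/s across the whole cluster, or about 15 MB/s per core node.)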

Here is my cluster information.

emr-5.9.0 
Master: 1 × m3.2xlarge 
Core: 5 × c3.4xlarge 

Here are the details of each c3.4xlarge machine:

CPU: 16 vCPUs 
RAM: 30 GB 
DISK: 2 × 160 GB SSD 

Zeppelin 0.7.2, Spark 2.2.0, Ganglia 3.7.2 

Here is the cluster configuration I have set:

    [
      {
        "Classification": "spark-defaults",
        "Properties": {
          "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
        }
      },
      {
        "Classification": "emrfs-site",
        "Properties": {
          "fs.s3.maxConnections": "200"
        }
      }
    ]

Here is my code:

val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 

    import org.apache.spark.{ SparkConf, SparkContext } 
    import java.sql.{Date, Timestamp} 
    import org.apache.spark.sql.Row 
    import org.apache.spark.sql.types._ 
    import org.apache.spark.sql.functions.udf 
    import java.io.File 
    import org.apache.hadoop.fs._ 



import org.apache.spark.sql.functions._  // input_file_name, when, rank, concat_ws, lit, col, regexp_replace, etc. used below


val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3)) 
val get_cus_valYear = spark.udf.register("get_cus_valYear", (filePath: String) => filePath.split("\\.")(4)) 


val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load(("s3://trfsdisu/SPARK/FundamentalAnalytic/MAIN")) 
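// Note (untested idea): inferSchema=true makes Spark do an extra full pass over the ~260 GB input
// just to infer column types. Supplying an explicit schema via .schema(...) would skip that pass,
// e.g. val mainSchema = StructType(Seq(StructField("FundamentalSeriesId", StringType, true), ...))
// (mainSchema and its fields are placeholders; the real column list is omitted here).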

val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*) 
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c42"))).toSeq 
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*) 

val df1resultFinal=df1result.withColumn("DataPartition", get_cus_val(input_file_name)) 
val df1resultFinalWithYear=df1resultFinal.withColumn("PartitionYear", get_cus_valYear(input_file_name)) 


val df2 = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FundamentalAnalytic/INCR") 
val df2With_ = df2.toDF(df2.columns.map(_.replace(".", "_")): _*) 
val df2column_to_keep = df2With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c42"))).toSeq 
val df2result = df2With_.select(df2column_to_keep.head, df2column_to_keep.tail: _*) 

import org.apache.spark.sql.expressions._ 
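// Rank the incremental rows within each business key by TimeStamp (newest first) and keep only the
// latest row per key.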
val windowSpec = Window.partitionBy("FundamentalSeriesId", "financialPeriodEndDate","financialPeriodType","sYearToDate","lineItemId").orderBy($"TimeStamp".cast(LongType).desc) 
val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp") 


val dfMainOutput = df1resultFinalWithYear.join(latestForEachKey, Seq("FundamentalSeriesId", "financialPeriodEndDate","financialPeriodType","sYearToDate","lineItemId"), "outer") 
     .select($"FundamentalSeriesId", 
     when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition".cast(DataTypes.StringType)).as("DataPartition"), 
     when($"PartitionYear_1".isNotNull, $"PartitionYear_1").otherwise($"PartitionYear".cast(DataTypes.StringType)).as("PartitionYear"), 
     when($"FundamentalSeriesId_objectTypeId_1".isNotNull, $"FundamentalSeriesId_objectTypeId_1").otherwise($"FundamentalSeriesId_objectTypeId".cast(DataTypes.StringType)).as("FundamentalSeriesId_objectTypeId"), 
     when($"analyticItemInstanceKey_1".isNotNull, $"analyticItemInstanceKey_1").otherwise($"analyticItemInstanceKey").as("analyticItemInstanceKey"), 
     when($"AnalyticValue_1".isNotNull, $"AnalyticValue_1").otherwise($"AnalyticValue").as("AnalyticValue"), 
     when($"AnalyticConceptCode_1".isNotNull, $"AnalyticConceptCode_1").otherwise($"AnalyticConceptCode").as("AnalyticConceptCode"), 
     $"AuditID_1").otherwise($"AuditID").as("AuditID"), 
     when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"), 
     when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|"))).otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction")) 
     .filter(!$"FFAction".contains("D")) 



    val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated")) 

    val dfMainOutputWithoutFinalYear = dfMainOutputFinal.select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutputFinal.schema.fieldNames.filter(_ != "PartitionYear").map(c => col(c)): _*).as("concatenated")) 

    val headerColumn = df.columns.filter(v => (!v.contains("^") && !v.contains("_c42"))).toSeq 

    val header = headerColumn.dropRight(1).mkString("", "|^|", "|!|") 

    val dfMainOutputFinalWithoutNull = dfMainOutputWithoutFinalYear.withColumn("concatenated", regexp_replace(col("concatenated"), "null", "")).withColumnRenamed("concatenated", header) 
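    // Write the final result to S3 as gzip-compressed CSV, partitioned by DataPartition and PartitionYear.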
    dfMainOutputFinalWithoutNull.write.partitionBy("DataPartition","PartitionYear") 
     .format("csv") 
     .option("nullValue", "") 
     .option("header", "true") 
     .option("codec", "gzip") 
     .save("s3://trfsdisu/SPARK/FundamentalAnalytic/output") 

Besides this, I have also tried writing the data to HDFS, but that takes almost the same time (only about 4 minutes less than S3) to write to the HDFS directory.

Added the SQL physical plan: (screenshot)

DAG of all stages: (screenshot)

Added the DAG: (screenshot)

Memory usage during the last hour of my job: (screenshot)

Below are some of the job logs.

I looked into the job execution on this cluster. Here are some of my observations:

The majority of the time is consumed by RDD data spilling to disk.

######### 

17/10/17 15:41:37 INFO UnsafeExternalSorter: Thread 88 spilling sort data of 704.0 MB to disk (0 time so far) 
17/10/17 15:41:37 INFO UnsafeExternalSorter: Thread 90 spilling sort data of 704.0 MB to disk (0 time so far) 
17/10/17 15:41:38 INFO UnsafeExternalSorter: Thread 89 spilling sort data of 704.0 MB to disk (0 time so far) 
17/10/17 15:41:38 INFO UnsafeExternalSorter: Thread 91 spilling sort data of 704.0 MB to disk (0 time so far) 
17/10/17 15:42:17 INFO UnsafeExternalSorter: Thread 88 spilling sort data of 704.0 MB to disk (1 time so far) 
17/10/17 15:42:17 INFO UnsafeExternalSorter: Thread 90 spilling sort data of 704.0 MB to disk (1 time so far) 
17/10/17 15:42:33 INFO UnsafeExternalSorter: Thread 89 spilling sort data of 704.0 MB to disk (1 time so far) 

######### 

This causes ~700 MB sort buffers to be spilled to disk constantly and then read back before the shuffle phase. The same is seen in every container that ran for the job. The reason so much spilling happens is that the executors are launched in containers of this size:

######################### 
    17/10/17 15:20:18 INFO YarnAllocator: Will request 1 executor container(s), each with 4 core(s) and 5632 MB memory (including 512 MB of overhead) 
######################### 

This means each container gets only about 5 GB, so it fills up very quickly, and under that memory pressure the sort data gets spilled to disk.

You will notice the same in the NodeManager logs:

2017-10-17 15:58:21,590 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 7759 for container-id container_1508253273035_0001_01_000003: 5.0 GB of 5.5 GB physical memory used; 8.6 GB of 27.5 GB virtual memory used 
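
Given that each container is only about 5.5 GB, one option would be to request larger executors explicitly through the spark-defaults classification. The values below are only a rough guess for c3.4xlarge nodes (assuming roughly 23 GB per node is available to YARN, which should be verified for this instance type), not something I have validated:

    [
      {
        "Classification": "spark-defaults",
        "Properties": {
          "spark.executor.cores": "5",
          "spark.executor.memory": "9g",
          "spark.yarn.executor.memoryOverhead": "1024"
        }
      }
    ]

With roughly 10 GB per executor, two executors would fit per node, which should reduce how often the ~700 MB sort buffers spill to disk; EMR's dynamic allocation (enabled by default, as far as I know) would then decide how many executors to launch.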
Show the logical plan from the web UI. Post a screenshot so I/people can understand what the code is doing. Thanks. –

I asked for the physical plan of the query from Spark's web UI. It should help me understand what your query is actually doing. –

@JacekLaskowski Oh OK.. can I see those in Ganglia? – SUDARSHAN

You are running 5 c3.4xlarge EC2 instances, each with 30 GB of RAM, so only about 150 GB in total, which is much smaller than the 200+ GB dataframe you are joining. Hence the heavy disk spill. Perhaps you could launch r-type EC2 instances (memory-optimized, rather than the compute-optimized c type) and see whether performance improves.

Even when I use r4.4xlarge it takes almost the same time... and the vCores used is still 1 – SUDARSHAN

@SUDARSHAN Add this property to your cluster configuration: '[{"classification":"spark","properties":{"maximizeResourceAllocation":"true"}}]' – Will
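
For reference, my reading of that suggestion, merged into the cluster configuration shown at the top (which, as far as I know, has to be supplied when the cluster is created), would be roughly:

    [
      {
        "Classification": "spark",
        "Properties": {
          "maximizeResourceAllocation": "true"
        }
      },
      {
        "Classification": "spark-defaults",
        "Properties": {
          "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
        }
      },
      {
        "Classification": "emrfs-site",
        "Properties": {
          "fs.s3.maxConnections": "200"
        }
      }
    ]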