How to tune a Spark job on EMR so that it writes a large amount of data to S3 quickly
Problem description:
I have a Spark job in which I do an outer join between two dataframes. The first dataframe is 260 GB of text data split across 2,200 files, and the second dataframe is 2 GB. How can I tune the Spark job on EMR so that the large output is written to S3 quickly?
Loading these two inputs into dataframes takes 10 minutes by itself.
Writing the roughly 260 GB dataframe output to S3 then takes about 1 hour.
Here is my cluster information.
emr-5.9.0
Master: 1 × m3.2xlarge
Core: 5 × c3.4xlarge
Here are the details of each c3.4xlarge machine:
CPU: 16 vCPUs
RAM: 30 GB
DISK: 2 × 160 GB SSD
Zeppelin 0.7.2, Spark 2.2.0, Ganglia 3.7.2
Here is the cluster configuration I set:
[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
    }
  },
  {
    "Classification": "emrfs-site",
    "Properties": {
      "fs.s3.maxConnections": "200"
    }
  }
]
Here is my code:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import java.io.File
import org.apache.hadoop.fs._
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract
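// UDFs that derive DataPartition and PartitionYear from the input file name
// (the 4th and 5th dot-separated tokens of the path)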
val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3))
val get_cus_valYear = spark.udf.register("get_cus_valYear", (filePath: String) => filePath.split("\\.")(4))
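// Load the ~260 GB pipe-delimited main data from S3, replace dots in column names
// with underscores, and drop unwanted columns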
val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load(("s3://trfsdisu/SPARK/FundamentalAnalytic/MAIN"))
val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c42"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)
val df1resultFinal=df1result.withColumn("DataPartition", get_cus_val(input_file_name))
val df1resultFinalWithYear=df1resultFinal.withColumn("PartitionYear", get_cus_valYear(input_file_name))
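// Load the ~2 GB incremental data and apply the same column clean-up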
val df2 = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FundamentalAnalytic/INCR")
val df2With_ = df2.toDF(df2.columns.map(_.replace(".", "_")): _*)
val df2column_to_keep = df2With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c42"))).toSeq
val df2result = df2With_.select(df2column_to_keep.head, df2column_to_keep.tail: _*)
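// Keep only the latest record (by TimeStamp) for each key in the incremental data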
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("FundamentalSeriesId", "financialPeriodEndDate","financialPeriodType","sYearToDate","lineItemId").orderBy($"TimeStamp".cast(LongType).desc)
val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")
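// Full outer join the main and incremental data on the composite key, preferring the
// incremental (_1) values when present, and drop rows whose FFAction contains "D"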
val dfMainOutput = df1resultFinalWithYear.join(latestForEachKey, Seq("FundamentalSeriesId", "financialPeriodEndDate","financialPeriodType","sYearToDate","lineItemId"), "outer")
.select($"FundamentalSeriesId",
when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition".cast(DataTypes.StringType)).as("DataPartition"),
when($"PartitionYear_1".isNotNull, $"PartitionYear_1").otherwise($"PartitionYear".cast(DataTypes.StringType)).as("PartitionYear"),
when($"FundamentalSeriesId_objectTypeId_1".isNotNull, $"FundamentalSeriesId_objectTypeId_1").otherwise($"FundamentalSeriesId_objectTypeId".cast(DataTypes.StringType)).as("FundamentalSeriesId_objectTypeId"),
when($"analyticItemInstanceKey_1".isNotNull, $"analyticItemInstanceKey_1").otherwise($"analyticItemInstanceKey").as("analyticItemInstanceKey"),
when($"AnalyticValue_1".isNotNull, $"AnalyticValue_1").otherwise($"AnalyticValue").as("AnalyticValue"),
when($"AnalyticConceptCode_1".isNotNull, $"AnalyticConceptCode_1").otherwise($"AnalyticConceptCode").as("AnalyticConceptCode"),
$"AuditID_1").otherwise($"AuditID").as("AuditID"),
when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"),
when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|"))).otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction"))
.filter(!$"FFAction".contains("D"))
val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated"))
val dfMainOutputWithoutFinalYear = dfMainOutputFinal.select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutputFinal.schema.fieldNames.filter(_ != "PartitionYear").map(c => col(c)): _*).as("concatenated"))
val headerColumn = df.columns.filter(v => (!v.contains("^") && !v.contains("_c42"))).toSeq
val header = headerColumn.dropRight(1).mkString("", "|^|", "|!|")
val dfMainOutputFinalWithoutNull = dfMainOutputWithoutFinalYear.withColumn("concatenated", regexp_replace(col("concatenated"), "null", "")).withColumnRenamed("concatenated", header)
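// Write the result to S3 as gzip-compressed CSV, partitioned by DataPartition and PartitionYear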
dfMainOutputFinalWithoutNull.write.partitionBy("DataPartition","PartitionYear")
.format("csv")
.option("nullValue", "")
.option("header", "true")
.option("codec", "gzip")
.save("s3://trfsdisu/SPARK/FundamentalAnalytic/output")
Apart from this, I also tried writing the data to HDFS, but it takes about the same time (4 minutes less than S3) to write to the HDFS directory.
Added the SQL physical plan.
Added the DAG.
Memory usage over the last hour of my job.
Below are some job logs.
I looked into the job execution on the cluster above. Here are some of my observations:
The majority of the time is being consumed by RDD data spilling to disk.
#########
17/10/17 15:41:37 INFO UnsafeExternalSorter: Thread 88 spilling sort data of 704.0 MB to disk (0 time so far)
17/10/17 15:41:37 INFO UnsafeExternalSorter: Thread 90 spilling sort data of 704.0 MB to disk (0 time so far)
17/10/17 15:41:38 INFO UnsafeExternalSorter: Thread 89 spilling sort data of 704.0 MB to disk (0 time so far)
17/10/17 15:41:38 INFO UnsafeExternalSorter: Thread 91 spilling sort data of 704.0 MB to disk (0 time so far)
17/10/17 15:42:17 INFO UnsafeExternalSorter: Thread 88 spilling sort data of 704.0 MB to disk (1 time so far)
17/10/17 15:42:17 INFO UnsafeExternalSorter: Thread 90 spilling sort data of 704.0 MB to disk (1 time so far)
17/10/17 15:42:33 INFO UnsafeExternalSorter: Thread 89 spilling sort data of 704.0 MB to disk (1 time so far)
#########
This causes 700 MB memory pages to be constantly spilled to disk and then read back before the shuffle phase. The same is seen in all the containers run for the job. The reason so much spilling happens is that the executors are launched in containers of this size:
#########################
17/10/17 15:20:18 INFO YarnAllocator: Will request 1 executor container(s), each with 4 core(s) and 5632 MB memory (including 512 MB of overhead)
#########################
This means each container gets only about 5 GB, so they fill up very quickly, and under that memory pressure the data gets spilled.
You will notice the same in the NodeManager logs:
2017-10-17 15:58:21,590 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 7759 for container-id container_1508253273035_0001_01_000003: 5.0 GB of 5.5 GB physical memory used; 8.6 GB of 27.5 GB virtual memory used
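Since each container only gets about 5.5 GB, one direction would be to size the executors explicitly so that the sort buffers fit in memory instead of spilling. Below is only a sketch of how the spark-defaults classification could be extended for this cluster; the values (4 cores and 6g of heap per executor, 14 executors across the 5 c3.4xlarge core nodes, 400 shuffle partitions) are assumptions and would need to be checked against the YARN memory limits on the nodes.
[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
      "spark.executor.cores": "4",
      "spark.executor.memory": "6g",
      "spark.executor.instances": "14",
      "spark.sql.shuffle.partitions": "400"
    }
  }
]
With larger executor heaps, the repeated 704 MB sort spills should shrink or disappear; whether the shuffle partition count matters depends on what the physical plan shows for the join.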
Show the logical plan from the web UI. Take a screenshot so I/people can understand what the code does. Thanks. –
I asked for the physical plan of the query from Spark's web UI. That should help me understand what your query is actually doing. –
@JacekLaskowski Oh okay... Can I see those in Ganglia? – SUDARSHAN