Spark SQL工作原理剖析和性能优化
一、工作原理剖析:
Spark SQL 架构中主要有这几个关键的组件:SqlParser(Sql分析程序) ,Analyser(分析器) ,Optimizer(优化器) ,SparkPlan(Spark计划)
SparkSQL大致的执行流程是这样的:
1. SQL 语句经过SqlParser 完成sql 语句的语法解析功能,解析成Unresolved LogicalPlan(未解析的逻辑计划);
2. 使用Analyser将不同来源的Unresolved LogicalPlan和元数据(如hive metastore、Schema catalog)进行绑定,生成Resolved LogicalPlan(解析后的逻辑计划);
3. 使用Optimizer 对Resolved LogicalPlan 进行优化,生成Optimized LogicalPlan(优化后的逻辑计划);
4. 使用SparkPlan 将LogicalPlan(逻辑计划) 转换成PhysicalPlan(物理计划);
5. 使用prepareForExecution() 将PhysicalPlan转换成可执行物理计划;
6. 使用execute() 执行可执行物理计划,生成SchemaRDD 即Dataset或DataFrame。
具体流程如下图所示:
二、性能优化
1. 设置Shuffle过程中的并行度:spark.sql.shuffle.partitions。默认并行度是200。
如果并行度过低,意味着每个task需要处理的数据量过大,特别是在发生数据倾斜的时候可能会造成某个task运行特别慢,最终影响Spark SQL作业的完成。
适当增多分区可以有效的减轻每个task处理的数据量,同时如果有数据倾斜发生,还能缓解数据倾斜。
调整任务并行度,代码如下:
2. 优化sql语句。例如:列裁剪,尽量给出明确的列名,比如select name from
students。不要写select *的方式。
3. 并行处理查询结果:对于Spark SQL查询的结果,如果数据量比较大,比如超过1000条,那么就不要一次性collect()到Driver再处理。因为数据collect到Driver端会造成Driver端内存的压力过大,同时数据还需要通过网络到达Driver端,这样会增加网络流量的消耗。
可以使用foreachPartition()算子,并行处理查询结果。由于数据并行保存,数据不需要拉回到Driver端,那么就减少了网络流量的消耗,同时也不会给Driver的内存带来压力。
4. spark.sql.autoBroadcastJoinThreshold,默认10485760 (10 MB)。在内存够用
的情况下,可以增加其大小,该参数设置了一个表在join的时候,最大在多大
以内,可以被广播出去优化性能。
如果是Reduce Side Join可能会出现以下问题:
1.数据倾斜问题,导致单个task任务很难计算完成,影响Spark SQL作用整体完成。
2.由于在Reduce端进行Join,意味着需要发生Shuffle,而Shuffle又是最消耗性能的,这样会降低Spark作业运行效率,增加Spark作业运行时间。
如果是Map Side Join,由于是在Map端进行Join,省去了Shuffle,没有Shuffle,Spark的作业效率会大大提高,也能避免数据倾斜。
设置spark.sql.autoBroadcastJoinThreshold参数,代码如下: