spark 任务运行机制

(图文结合一起看)
1 spark 任务 从提交任务开始 : spark-submiit --master …
2 会去调用一个脚本 spark-class,然后这个类会调用spark-submit的 main 方法
3 这个main方法启动之后会通过反射调用我们自己写的程序的main方法.
4 自己写的main方法启动之后 首先会 new sparkconf 这个参数主要设置master 和一些运行模
式的 参数
5 然后 new sparkcontext 而sparkcontext 里面会创建下列东西
5-1 创建 taskScheduler(这个是一个特质) 的实现类
5-2 new DAGScheduler (负责调度DAG的)
5-3 new StandaloneSchedulerBackend (Standalone 模式运行下) 然后会去调用
StandaloneSchedulerBackend.start方法 ,start 里面会创建ClientEndPoint(负责与
master通信)
5-4 然后调用 StandaloneSchedulerBackend 父类的start 方法 ,他父类的start方法里面会
创建DriverEndPoint(负责与executor通信)
5-5 ClientEndPoint 向master注册Application 会附带一个command,里面封装了启动
executor 命令
5-6 master 接收到Driver端的任务请求后后遍历自己空闲的worker,根据请求任务的资源
分配资源,向worker发送命令启动executor
5-7 worker启动executor,并向Driver注册
6 解析程序 并记录各个RDD之间的依赖关系.当解析到action算子的时候就会触发任务的提交.
产生逻辑上的DAG(有向无环图)
7 DSGScheduler根据DAG有向无环图切分stage(具体就是 调用DAGScheduler.runJob----->调
用 DAGScheduler.handleJobSubmit(),然后切分DAG有向无图stage)(Stage一共有两类,一类
是shuffleMapStage,一类是ResultStage)
切分原理 :
利用 finalRDD创建一个ResultStage实例: finalStage. 然后从finalStage开始,根据RDD的依赖
关系, 从后往前追溯,发现依赖关系是窄依赖就把父RDD加到Stage中,一旦发现依赖关系是宽
依赖,就终止当前stage,创建一个新的stage,继续往前追溯.直到追溯到第一个RDD,该RDD没有
父依赖,切分结束
8 提交Stage
提交原理:
从finalStage开始,做深度优先遍历,知道发现stage没有父Stage,就开始提交.执行完后继续提交
下后面的stage
9 为提交的stage创建task
创建stage 的原理:
根据stage的类型,创建对应类型的task (shuffleMapStage创建shuffleMaptask,ResultStage
创建Resulttask)
根据stage最后一个RDD的分区数,创建对应数量的task
10 DAGScheduler把Task以TaskSet提交给TaskScheduler —>调用
TaskScheduler.taskSubmit(new TaskSet(里面封装了一个stage的所有task))
11 TaskScheduler提交task给executor执行
提交原理
遍历TaskSet,获取每一个task
把task序列化发送给executor执行
12 executor接收到序列化的task 会做一下事情
12-1 反序列化,得到task
12-2 用taskRunner封装反序列化的task,该taskRuner继承了Runnable接口
12-3 然后把taskRuner 放入executor拥有的线程池中,等待被调度
12-4 一旦有资源taskRuner 的run方法被调用,该run方法调用了task的run方法
12-5 task的run方法,调用runTask方法 (会根据类型调用对应类型的runTask方法)
12-6 runTask方法执行后,才真正执行程序的业务逻辑 是并行执行的
shuffleMaptask—会执行一个shuffleMapStage里面的逻辑
Resulttask—会执行另一个ResultStage里面的逻辑

13 执行shuffleMapStage里的逻辑 读取数据,按照逻辑处理,处理完之后会shufflewrite分区写在内
存或者磁盘上
14 shuffleMapStage 执行完之后会提交ResultStage 执行 步骤12—步骤13 重复执行
15 Resulttask执行后会进行shuffleRead 读取数据过来之后 进行处理,
16 处理完成后分区输出处理结果(可能写在HDFS或者数据库)

(备注 步骤6-10 为DAGScheduler处理 11 为taskScheduler处理 12–16 为executor 处理)
(图来源于 多易教育辉哥(www.51doit.com))
spark 任务运行机制