spark基础之调度器运行机制简述

一 概述

驱动程序在启动的时候,首先会初始化SparkContext,初始化SparkContext的时候,就会创建DAGScheduler、TaskScheduler、SchedulerBackend等,同时还会向Master注册程序;如果注册没有问题。Master通过集群管理器(cluster manager)会给这个程序分配资源,然后SparkContext根据action触发job。

Job里面有一系列RDD, DAGScheduler从后往前推若发现是宽依赖的话,就划分不同的Stage。

Stage划分完后,Stage提交给底层的调度器TaskScheduler,TaskScheduler拿到这个Task的集合,因为Stage内部都是计算逻辑完全一样的任务,只是数据不一样而已。TaskScheduler就会根据数据本底性,将任务分配到Executor上执行。

Executor在任务运行完毕或者出状况时,肯定要向Driver汇报

最后运行完毕,关闭SparkContext,同时创建的那些对象也被关掉。

 

二 什么是Spark Driver 程序

Driver程序就是运行应用程序的main函数,它会创建SparkContext,准备应用程序的运行环境(初始化各个组件,比如DAGScheduler等),

然后应用程序由SparkContext负责和集群通信,资源的申请以及任务的分配和监控等。当Worker节点的Executor执行完Task之后,Driver同时负责将SparkContext关闭。

 

三 SparkContext

SparkContext是用户和Spark集群进行交互的唯一入口,可以用来在Spark集群中创建RDD,累加器Accumulator和广播变量; 它也是驱动程序至关重要的对象,由它提供应用程序所需要的运行环境。

 

SparkContext的核心作用就是准备应用程序运行环境,所以在初始化的时候会构造一系列对象DAGScheduler, TaskScheduler等,同时负责向Master注册应用程序

 

只可以有一个SparkContext实例运行在一个JVM中,所以在创建SparkContext的时候之前,确保之前的SparkContext已经关闭了,即调用stop方法停止当前JVM中唯一运行的SparkContext

 

四 Spark Job的触发

# 每一个final RDD的action操作会触发一个job,比如count,collect,saveAsTextFile,forEach等都会触发job。这就意味着应用程序如果有多个action操作 .

# 每一个Job根据宽依赖来划分Stage,每一个job可能有一个或者多个Stage,比如reduceByKey,groupByKey等算子,每一个Stage生成一个Task

# 所有的Stage会形成一个DAG(有向无环图),由于RDD的Lazy特性,导致Stage也是Lazy级别的,只有遇到了Action才会真正发生作业的执行,在Action之前,Spark框架只是将要进行的计算记录下来,并没有真的执行。

# 一个作业可能有ResultStage和ShuffleMapStage组成:一个作业如果shuffle操作,那么就只有一个ResultStage;如果有shuffle操作,那么,则存在一个ResultStage和至少一个ShuffleMapStage

spark基础之调度器运行机制简述

spark基础之调度器运行机制简述

五 DAGScheduler

# DAG:Direct Acyclic Graph,spark主要用于RDD关系建模,描述RDD之间的依赖关系,主要用于构建RDD的数据流,即RDD的各个分区数据是从哪里来的和构建基于数据流之上的操作算子流,即RDD各个分区数据总共会经过哪些transformation和action的这两种类型的一系列的操作的调度运行

# DAGScheduler需要解析DAG.它是一个面向stage的高层调度器,它把DAG拆分成很多个Task,每一组task都是一个stage,解析的时候,每当遇到shuffle操作的时候就会产生新的stage,然后以一个个TaskSet的形式提交给底层的调度器TaskScheduler.

# DAGScheduler需要记录哪些RDD需要写入磁盘

# DAGScheduler 需要寻求Task的最优调度,比如stage内部数据的本地性等

# DAGScheduler 需要监视因为shuffle跨节点输出可能导致的失败,如果发现stage失败,可能需要重新提交stage

 

Job、Stage、TaskSet、Task含义和关系:

Job: 一个action操作就会触发一个job,如果有多个action操作就会有多个job.

Stage: 一个Job会被DAGScheduler拆分成多组任务,每一组任务就是由一个Stage封装,stage之间也有依赖关系。如果RDD之间没有shuffle操作那么就只有一个ResultStage;如果有shuffle操作,那么就有一个ResultStage和至少一个ShuffleMapStage

TaskSet: 一组任务就是一个TaskSet,对应着一个Stage,所以也可以理解为一个Stage就是一个TaskSet

Task: 一个独立的工作单元,由驱动程序发送到Executor上去执行。通常情况下,一个Task处理一个RDD的分区的数据,根据返回类型不同,又分为ResultTask和ShuffleMapTask

 

六 TaskScheduler

TaskScheduler主要是提交TaskSet到集群运算并汇报结果

# 为TaskSet创建和维护一个TaskSetManager,并追踪任务本地性及错误信息

# 遇到一些迷路的任务(straggle)会放在其他节点重试

# 向DAGScheduler汇报执行情况,包括shuffle输出丢失时报告fetch failed错误

 

七 SchedulerBackend

调度器的通信终端,以SparkDeploySchedulerBackend在启动时,构造了AppClient实例,并在该实例start时启动ClientEndpoint消息循环体,ClientEndpoint在启动时会向Master注册当前程序。

SparkDeploySchedulerBackend的父类CoarseGrainedSchedulerBackend在start时会实例化类型为DriverEndPoint消息循环体,SparkDeploySchedulerBackend专门负责收集Worker上资源信息,当ExecutorBackend启动时会发送RegisteredExecutor信息向DriverPoint注册,此时SparkDeploySchedulerBackend就掌握了当前应用程序所拥有的计算资源。