大数据学习笔记(spark资源调度)

1、绪论

上图是Spark程序运行时的一个超级简单的概括。我们运行一个Spark应用程序时,首先第一步肯定是写一个Spark Application应用程序,然后调用资源调度器为Driver申请资源。申请成功后,向master为Application申请资源,申请完毕后,调用资源调度器把任务分发到节点执行。在各个节点进行分布式的并行计算。

2、前置知识

对于Application来说,资源是Executor。对于Executor来说资源是内存、core。
  Master里面有几个对象:workers、waitingDrivers、waitingApps。下面对这几个对象做一个简单介绍,这几个对象是在源码中声明的,如需更详细的认知,可自行查看看源码(源码是使用的是Scala,如需快速了解,请参考《Scala快速学习》)。

val works = new HashSet[WorkInfo]()
val waitingDrivers = new ArrayBuffer[DriverInfo]()
val waitingApps = new ArrayBuffer[ApplicationInfo]()

(可能直接看下面的知识点会有点迷惑,若不理解可以结合第三部分的流程图一起看)
  在上面的代码中,WorkInfo代表的是work节点的节点信息。DriverInfo是Driver发送过来的请求信息。ApplicationInfo是发送过来的Application的信息。

val works = new HashSetWorkInfo
  works 集合采用HashSet数组存储work的节点信息,可以避免存放重复的work节点。为什么要避免重复?首先我们要知道work节点有可能因为某些原因挂掉,挂掉之后下一次与master通信时会报告给master,这个节点挂掉了,然后master会在works对象里把这个节点去掉,等下次再用到这个节点是时候,再加进来。这样来说,理论上是不会有重复的work节点的。可是有一种特殊情况:work挂掉了,在下一次通信前又自己启动了,这时works里面就会有重复的work信息。

val waitingDrivers = new ArrayBufferDriverInfo
  当客户端向master为Driver申请资源时,会将要申请的Driver的相关信息封装到master节点的DriverInfo这个泛型里,然后添加到waitingDrivers 里。master会监控这个waitingDrivers 对象,当waitingDrivers集合中的元素不为空时,说明有客户端向master申请资源了。此时应该先查看一下works集合,找到符合要求的worker节点,启动Driver。当Driver启动成功后,会把这个申请信息从waitingDrivers 对象中移除。

val waitingApps = new ArrayBufferApplicationInfo
  Driver启动成功后,会为application向master申请资源,这个申请信息封存到master节点的waitingApps 对象中。同样的,当waitingApps 集合不为空,说明有Driver向Master为当前的Application申请资源。此时查看workers集合,查找到合适的Worker节点启动Executor进程,默认的情况下每一个Worker只是为每一个Application启动一个Executor,这个Executor会使用1G内存和所有的core。启动Executor后把申请信息从waitingApps 对象中移除。

注意点:上面说到master会监控这三个集合,那么到底是怎么监控的呢???
  master并不是分出来线程专门的对这三个集合进行监控,相对而言这样是比较浪费资源的。master实际上是‘监控’这三个集合的改变,当这三个集合中的某一个集合发生变化时(新增或者删除),那么就会调用schedule()方法。schedule方法中封装了上面提到的处理逻辑。

3、流程图

大数据学习笔记(spark资源调度)

4、详细步骤

1、执行提交命令,会在client客户端启动一个spark-submit进程(用来为Driver申请资源)。
  2、为Driver向Master申请资源,在Master的waitingDrivers 集合中添加这个Driver要申请的信息。Master查看works集合,挑选出合适的Work节点。
  3、在选中的Work节点启动Driver进程(Driver进程已经启动了,spark-submit的使命已经完成了,关闭该进程)。
  4、Driver进程为要运行的Application申请资源(这个资源指的是Executor进程)。此时Master的waitingApps 中要添加这个Application申请的资源信息。这时要根据申请资源的要求去计算查看需要用到哪些Worker节点(每一个节点要用多少资源)。在这些节点启动Executor进程。
  (注:轮询启动Executor。Executor占用这个节点1G内存和这个Worker所能管理的所有的core)
  5、此时Driver就可以分发任务到各个Worker节点的Executor进程中运行了。

5、资源调度结论

1、默认情况下,每一个Executor只会为每一个Application启动一个Executor。每个Executor默认使用1G内存和这个Worker所能管理的所有的core。
  2、如果想要在一个Worker上启动多个Executor,在提交Application的时候要指定Executor使用的core数量。提交命令:spark-submit --executor-cores
  3、默认情况下,Executor的启动方式是轮询启动,一定程度上有利于数据的本地化。

什么是轮询启动???为什么要轮训启动呢???

轮询启动:轮询启动就是一个个的启动。例如这里有5个人,每个人要发一个苹果+一个香蕉。轮询启动的分发思路就是:五个人先一人分一个苹果,分发完苹果再分发香蕉。

为什么要使用轮询启动的方式呢???我们做大数据计算首先肯定想的是计算找数据。在数据存放的地方直接计算,而不是把数据搬过来再计算。我们有n台Worker节点,如果只是在数据存放的节点计算。只用了几台Worker去计算,大部分的worker都是闲置的。这种方案肯定不可行。所以我们就使用轮询方式启动Executor,先在每一台节点都允许一个任务。

存放数据的节点由于不需要网络传输数据,所以肯定速度快,执行的task数量就会比较多。这样不会浪费集群资源,也可以在存放数据的节点进行计算,在一定程度上也有利于数据的本地化。

6、加深理解

这里有几个小问题,益脑游戏。。。
  前提条件:假设我们有5个worker,每个worker节点提供10G内存,10个core。

1、spark-submit --master … --executor-cores 2 --executor-memory 2G … 在集群中会启动多少个Executor进程???
  25

2、spark-submit --master … --executor-cores 3 --executor-memory 4G … 在集群中会启动多少个Executor进程???
  10

3、spark-submit --master … --executor-cores 2 --executor-memory 2G --total-executor-cores 10 … 在集群中会启动多少个Executor进程???(–total-executor-cores:整个Application最多使用的core数)
  5

4、spark-submit --master … --executor-cores 2 --executor-memory 2G --total-executor-cores 4 … 集群中Executor的分布情况???(–total-executor-cores:整个Application最多使用的core数)
  随机找两台Worker节点。

5、启动Executor个数的公式:*min(min(wm/em,wc/ec)wn,tec/ec)
  注:–executor-cores : ec
    --executor-memory : em
    --total-executor-cores : tec
    worker_num : wn
    worker_memory : wm
    worker_core : wc