葵花宝典--Spark内核和优化

一、Spark内核

1、内核概述

Driver:将用户程序转换为job、在executor之间进行调度、跟踪executor的运行情况、通过UI展示运行情况

Executor:运行spark任务并返回结果,通过自身BlockManager为RDD提供存储和,并且加快RDD的计算

通用运行流程

葵花宝典--Spark内核和优化

2、部署模式

Standalone模式

  • Driver:是一个进程,我们编写的Spark应用程序就运行在Driver上,由Driver进程执行
  • Master:是一个进程,主要负责资源的调度和分配,并进行集群的监控等职责
  • Worker:是一个进程,一个Worker运行在集群中的一台服务器上,主要负责两个职责,一个是用自己的内存存储RDD的某个或某些partition;另一个是启动其他进程和线程(Executor),对RDD上的partition进行并行的处理和计算
  • Executor:是一个进程,一个Worker上可以运行多个Executor,Executor通过启动多个线程(task)来执行对RDD的partition进行并行计算,也就是执行我们对RDD定义的例如map、flatMap、reduce等算子操作

客户端模式

葵花宝典--Spark内核和优化

集群模式

葵花宝典--Spark内核和优化

yarn模式

客户端模式

葵花宝典--Spark内核和优化

集群模式

葵花宝典--Spark内核和优化

3、通讯架构

spark2以后使用Netty通讯框架作为内部通讯组件。Spark通讯框架中各个组件(Client/Master/Worker)可以认为是一个个独立的实体,各个实体之间通过消息来进行通信。Endpoint(Client/Master/Worker)有1个InBoxN个OutBox(N>=1,N取决于当前Endpoint与多少其他的Endpoint进行通信,一个与其通讯的其他Endpoint对应一个OutBox),Endpoint接收到的消息被写入InBox,发送出去的消息写入OutBox并被发送到其他Endpoint的InBox中。

葵花宝典--Spark内核和优化

Spark通讯架构

葵花宝典--Spark内核和优化

 

  1. RpcEndpoint:RPC端点,Spark针对每个节点(Client/Master/Worker)都称之为一个Rpc端点,且都实现RpcEndpoint接口,内部根据不同端点的需求,设计不同的消息和不同的业务处理,如果需要发送(询问)则调用Dispatcher;
  2. RpcEnv:RPC上下文环境,每个RPC端点运行时依赖的上下文环境称为RpcEnv;
  3. Dispatcher:消息分发器,针对于RPC端点需要发送消息或者从远程RPC接收到的消息,分发至对应的指令收件箱/发件箱。如果指令接收方是自己则存入收件箱,如果指令接收方不是自己,则放入发件箱;
  4. Inbox:指令消息收件箱,一个本地RpcEndpoint对应一个收件箱,Dispatcher在每次向Inbox存入消息时,都将对应EndpointData加入内部ReceiverQueue中,另外Dispatcher创建时会启动一个单独线程进行轮询ReceiverQueue,进行收件箱消息消费;
  5. RpcEndpointRef:RpcEndpointRef是对远程RpcEndpoint的一个引用。当我们需要向一个具体的RpcEndpoint发送消息时,一般我们需要获取到该RpcEndpoint的引用,然后通过该应用发送消息。
  6. OutBox:指令消息发件箱,对于当前RpcEndpoint来说,一个目标RpcEndpoint对应一个发件箱,如果向多个目标RpcEndpoint发送信息,则有多个OutBox。当消息放入Outbox后,紧接着通过TransportClient将消息发送出去。消息放入发件箱以及发送过程是在同一个线程中进行;
  7. RpcAddress:表示远程的RpcEndpointRef的地址,Host + Port。
  8. TransportClient:Netty通信客户端,一个OutBox对应一个TransportClient,TransportClient不断轮询OutBox,根据OutBox消息的receiver信息,请求对应的远程TransportServer;

葵花宝典--Spark内核和优化

4、shuffe解析

reduce端读取数据

  • map阶段结束后,计算状态和小文件封装到MapState对象,并交给MapOutPutTrackerWorke管理
  • reduce阶段开始前从MapOutPutTrackerWorke中获取文件信息
  • 完成操作后,BlockTransforService去Executor0所在的节点拉数据,默认会启动五个子线程。每次拉取的数据量不能超过48M(reduce task每次最多拉取48M数据,将拉来的数据存储到Executor内存的20%内存中)

HashShuffe解析(已废弃)

未经优化:

葵花宝典--Spark内核和优化

优化后(spark.shuffle. consolidateFiles):

葵花宝典--Spark内核和优化

SortShuffe解析

普通机制:

葵花宝典--Spark内核和优化

bypass机制:

前提:不是聚合类的算子、task的数量少于spark.shuffle.sort.bypassMergeThreshold参数的值(默认200)

葵花宝典--Spark内核和优化

5、内存管理

堆内和堆外内存

  • 堆内内存:由 Spark 应用程序启动时的 –executor-memory 或 spark.executor.memory 参数配置。
  • 堆外内存:默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled 参数启用,并spark.memory.offHeap.size 参数设定堆外空间的大小

内存空间分配

  • storage:RDD缓存数据、广播变量
  • executor:shuffe数据
  • other:用户自定义数据结构或spark内部元数据

静态内存:

葵花宝典--Spark内核和优化

统一内存:storage和executor之间的内存可以相互借用,但如果executor占用了storage的内存,是不会返还的

葵花宝典--Spark内核和优化

存储内存

  • RDD的持久化机制:RDD 的每个 Partition 经过处理后唯一对应一个 Block。缓存可以为内存、磁盘等方式。
  • RDD的缓存机制:RDD 在缓存到存储内存之前,Partition 中的数据一般以迭代器(Iterator)的数据结构来访问,同一 Partition 的不同 Record 的存储空间并不连续,将Partition由不连续的存储空间转换为连续存储空间的过程,Spark称之为"展开"(Unroll)

每个 Executor 的 Storage 模块用一个链式 Map 结构(LinkedHashMap)来管理堆内和堆外存储内存中所有的 Block 对象的实例,对这个 LinkedHashMap 新增和删除间接记录了内存的申请和释放。

葵花宝典--Spark内核和优化

6、核心组件

BlockManager:是整个Spark底层负责数据存储与管理的一个组件,Driver和Executor的所有数据都由对应的BlockManager进行管理。Driver上为BlockManagerMaster,Executor为BlockManager,类似主从的关系。

葵花宝典--Spark内核和优化

广播变量:分布式只读变量-Broadcas

累加器:分布式只写变量-Accumulator

二、优化

1、性能优化和常见故障

葵花宝典--Spark内核和优化

葵花宝典--Spark内核和优化

2、数据倾斜

葵花宝典--Spark内核和优化