Spark源码解析系列(二、SparkContext内部初始化)
SparkContext
上一节我们介绍了spark-sumbit
方式的任务提交。讲到了SparkContext初始化这里。我们都知道sc是spark应用最为核心的对象,那么今天我们就来看看它是如何进行初始化的。我们通过new SparkContext(conf)
方法进入源码。
// 它是Spark功能的主要入口点。SparkContext表示与Spark集群的连接,
// 可用于在该集群上创建RDDs、累加器和广播变量
class SparkContext(config: SparkConf) extends Logging {
... // 此次省略 2351 多行
}
主构造函数,它是和类的定义混杂在一起的,scala特有写法。里面也包括许多方法的定义,主要逻辑在try里面大概300行!好吧,我们挑重点看看。这个构造函数里最重要的做了以下事情:
// 1.创建出spark运行环境
_env = createSparkEnv(_conf, isLocal, listenerBus)
...
// 2.绑定spark ui的url和端口
_ui.foreach(_.bind())
...
// 3.创建createTaskScheduler,是本章节的重点,会详细介绍
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
// 4.创建 DAGScheduler
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
// 5.启动创建好的TaskScheduler
_taskScheduler.start()
TaskScheduler的实现
我们跳到createTaskScheduler的具体实现。根据master
类型有很多实现,"local"
最简单。 但生产环境中我们一般会用自己指定master和deploy-mode,如使用spark自带的集群模式standalone,则在submit后设置--master spark://xxx:port
,yarn方式则配置为--master
。这里我们就看下standalone的机器模式。
// master = spark://xxx:port ,通过正则匹配 ""spark://(.*)""
case SPARK_REGEX(sparkUrl) =>
// TaskScheduler具体的实现类
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
// 创建backend,实际会负责与master注册,Executor的反注册,task发送到executor等操作
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
// 初始化scheduler
scheduler.initialize(backend)
// 返回 backend 和 scheduler
(backend, scheduler)
SPI
另外再补充一个知识点,现在先提一个问题,就是使用yarn方式提交时会怎么走?下面是master的正则匹配集合,我们发现会yarn没有可匹配的。
/**
* 用于从 master 字符串中提取信息的正则表达式集合。
*/
private object SparkMasterRegex {
// Regular expression used for local[N] and local[*] master formats
val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
// Regular expression for local[N, maxRetries], used in tests with failing tasks
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
// Regular expression for simulating a Spark cluster of [N, cores, memory] locally
val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
// Regular expression for connecting to Spark deploy clusters
val SPARK_REGEX = """spark://(.*)""".r
}
那样就会走到最后的case masterUrl
,重点getClusterManager(masterUrl)
方法。
case masterUrl =>
// 通过Option[ExternalClusterManager] 返回确定的ExternalClusterManager,否则就抛异常
val cm = getClusterManager(masterUrl) match {
case Some(clusterMgr) => clusterMgr
case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
}
...
我们看下getClusterManager(masterUrl)
实现:
private def getClusterManager(url: String): Option[ExternalClusterManager] = {
val loader = Utils.getContextOrSparkClassLoader
val serviceLoaders =
ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
...
}
这里用到了java.util下的ServiceLoader,是SPI(Service Provider Interface)的是一种实现。简单理解是允许用户使用自定义一些服务替换现有框架的实现,动态替换掉默认的实现类,可以增强框架的扩展或者替换一些组件。 (dubbo等框架都有类似是思想,实现 @Adaptive 接口就好)。这里我们如果想扩展自定义调度框架,就只需要实现ExternalClusterManager特征就好了。比如Yarn里面的实现:
class YarnClusterManager extends ExternalClusterManager
...
// 具体实现
override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
sc.deployMode match {
case "cluster" => new YarnClusterScheduler(sc)
case "client" => new YarnScheduler(sc)
case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
}
}
如果你本地无法找到YarnClusterManager类,只许要添加spark-yarn的依赖就好。(当时我也没找到yarn的实现,后面想起它是独立于spark的调度框架才找到原因的)
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_2.11</artifactId>
<version>2.2.0</version>
</dependency>
多说了会,让我们回到createTaskScheduler
,新建TaskSchedulerImpl,里面就是初始定义一堆东西,然后新建StandaloneSchedulerBackend,它继承自CoarseGrainedSchedulerBackend,这个backend在Spark作业期间保留每个执行程序,而不是放弃。当一个任务完成,并要求调度程序启动一个新的执行器。接下初始化backend,scheduler.initialize(backend)
这里会创建任务池SchedulePool,默认先进先出。
def initialize(backend: SchedulerBackend) {
this.backend = backend
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
case _ =>
throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
s"$schedulingMode")
}
}
schedulableBuilder.buildPools()
}
scheduler.start()
之后会返回初始化好的backend和scheduler,回到最初的SparkContext构造函数。然后会创建DAGScheduler,它是任务的高层调度器,主要作用就是将DAG根据RDD之间的宽窄依赖关系划分为一个个的Stage,然后将这些Stage以TaskSet的形式提交给TaskScheduler。接着会调用_taskScheduler.start()
启动刚才创建好的scheduler,里面会首先启动backend。通过一系列的调用。会创建一个 DriverEndpoint (老版本DriverActor),主要作用就是和worker节点的Executor通信。也就是我们常听到的Driver。
// StandaloneSchedulerBackend
-> super.start()
-> driverEndpoint = createDriverEndpointRef(properties)
-> rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
// 创建Driver和worker通信
-> new DriverEndpoint(rpcEnv, properties)
...
// 这个ApplicationDescription非常重要它代表了当前application执行状况,
-> val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
// (Reactor模式)和Master通信,老版本ClientActor 。
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
waitForRegistration()
launcherBackend.setState(SparkAppHandle.State.RUNNING)
ApplicationDescription中包含了最大需要多少cup core,每个slave上需要多少内存等信息,并将其发送给master,master会通过schedule()分配任务到worker中。
client会注册任务到master ,然后等到master发送消息,下一节会接着client的任务注册进行详解。来个图解可能更直观。
这节涉及的内容也比较多,一些重要细节没注意到的也希望大家指正。下一节我们再看看再简单的过下client是如何进行注册的。