Topics covered:
1. Deep thoughts on Spark Streaming Job generation
2. Source-code analysis of Spark Streaming Job generation
I. Deep Thoughts on Spark Streaming Job Generation
1. In big-data processing scenarios, workloads that are not stream processing are usually driven by scheduled tasks, for example triggered every 10 minutes or every hour. That already has the flavor of stream processing. Data that is not stream-processed, or that has nothing to do with stream processing, is essentially worthless. Even the batch processing we used to do was implicitly stream processing; in the end, all processing will be unified under stream processing!
DStreams actually come in three kinds:
The first kind is input DStreams, which can be built from various data sources such as Socket, Kafka, or Flume.
The second kind is output DStreams: an output operation is an action at the logical level; because it still sits at the Spark Streaming framework level, it is ultimately translated into a physical-level action, i.e. an RDD action.
The third kind is transformations, which turn one DStream into another, that is, DStreams derived from other DStreams. The DStreamGraph class records the input (source) DStreams and the output DStreams.
There are two ways a DStream can be produced:
a DStream is created either from a data source or from other DStreams.
Spark Streaming uses time as its trigger, whereas Storm is triggered by events, record by record!
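As a minimal sketch of the time trigger and the three DStream roles above (the app name and the socket host/port are placeholder assumptions, not from the original post), a 5-second batch interval makes the framework generate one Job every 5 seconds from the same DStream lineage, regardless of how many records arrived in that window:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TimeTriggeredDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TimeTriggeredDemo").setMaster("local[2]")
    // The batch interval (Seconds(5)) is the time-based trigger for Job generation.
    val ssc = new StreamingContext(conf, Seconds(5))

    // Input DStream (placeholder socket source), transformations, and an output DStream.
    val lines  = ssc.socketTextStream("localhost", 9999)
    val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    counts.print() // output operation: a logical-level action, translated into an RDD action per batch

    ssc.start()            // JobScheduler / ReceiverTracker / JobGenerator are started here
    ssc.awaitTermination()
  }
}
```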
II. Source-Code Analysis of Spark Streaming Job Generation
Three core components drive the dynamic generation of Spark Streaming jobs:
JobGenerator: responsible for generating Jobs, driven by the received data;
JobScheduler: responsible for scheduling Jobs, driven by the received data;
ReceiverTracker: responsible for obtaining the metadata of the received data.
JobGenerator and ReceiverTracker are members of JobScheduler, as can be seen from JobScheduler's start method.
Tracing the source code gives the following call flow:
StreamingContext.start() --> JobScheduler.start() --> ReceiverTracker.start() / JobGenerator.start() --> startFirstTime() --> graph.start() and timer.start() --> the timer posts GenerateJobs(time) to the EventLoop --> processEvent() --> generateJobs(time) --> jobScheduler.receiverTracker.allocateBlocksToBatch(time) --> graph.generateJobs(time) --> jobScheduler.inputInfoTracker.getInfo(time) --> jobScheduler.submitJobSet(...)
The overall logic is shown in the figure below:
Note: this diagram is taken from the work of fellow Spark customization-course member http://lqding.blog.51cto.com/9123978/1772958, with many thanks.
![Spark Streaming Job generation flow](/default/index/img?u=aHR0cHM6Ly9waWFuc2hlbi5jb20vaW1hZ2VzLzYwOS8wNTExYWNjNmQyMThhYTE1Y2JiZGJlMjNmZTM3NDgwOS5wbmc=)
Below, combining the debugging session with the job-generation diagram, we trace the source code step by step:
1. `val ssc = new StreamingContext(conf, Seconds(5))` followed by `ssc.start()` is the entry point of the program.
2. Step into JobScheduler.start():
```scala
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start() // start the message-loop thread

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc) // tracks metadata of the received data
  receiverTracker.start() // receives the data from the sources
  jobGenerator.start()    // generates Jobs based on that data
  logInfo("Started JobScheduler")
}
```
3. Step into ReceiverTracker.start():
```scala
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }
  if (!receiverInputStreams.isEmpty) {
    // The metadata reported by receivers is handled by the ReceiverTrackerEndpoint
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
```
4. Step into JobGenerator.start():
```scala
/** Start generation of jobs */
// Initializes the checkpoint machinery, instantiates and starts the EventLoop,
// and kicks off the timer that periodically generates Jobs.
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    // anonymous subclass overriding onReceive
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
```
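Where do the periodic GenerateJobs events handled below come from? When no checkpoint is present, start() calls startFirstTime(), which starts the DStreamGraph and the batch-interval timer; the timer then posts GenerateJobs(time) to the EventLoop once per batch interval. In the Spark 1.x JobGenerator this looks roughly as follows (comments added; details may differ slightly between versions):

```scala
// One RecurringTimer per JobGenerator: every batchDuration it posts a GenerateJobs event.
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration) // start the DStreamGraph
  timer.start(startTime.milliseconds)          // start posting GenerateJobs every batch interval
  logInfo("Started JobGenerator at " + startTime)
}
```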
First, let's look at the EventLoop class:
```scala
/**
 * An event loop to receive events from the caller and process all events in the event thread. It
 * will start an exclusive event thread to process all events.
 *
 * Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can
 * handle events in time to avoid the potential OOM.
 */
private[spark] abstract class EventLoop[E](name: String) extends Logging {

  // the queue that stores the events, and the background thread that drains it
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true) // daemon (background) thread
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take() // take the next event off the queue
          try {
            // onReceive is the override in the anonymous subclass,
            // i.e. it delegates to processEvent
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }

  // ... (stop, post, onStart, onStop, onReceive, onError omitted in this excerpt)
```
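For completeness, the post method (omitted in the excerpt above) is what callers such as the JobGenerator timer use to hand an event to the loop; in the Spark source it is essentially just an enqueue (quoted from memory, so treat it as a sketch):

```scala
def post(event: E): Unit = {
  // enqueue the event; the daemon eventThread will pick it up and call onReceive
  eventQueue.put(event)
}
```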
Next, the most important method, processEvent:
```scala
/** Processes all events */
// processEvent pattern-matches on the event type and routes each event to the method that
// handles it. The real handling is usually handed off to another thread; the message loop
// itself never runs time-consuming business logic.
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
```
Step into generateJobs:
Once the data for the batch has been allocated, DStreamGraph's generateJobs is called to produce the Jobs; the steps are annotated below:

```scala
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    // Step 1: take the data received in this batch interval and assign it to the batch.
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // Step 2: generate the Jobs; this walks the DStream DAG and instantiates the RDDs.
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      // Step 3: fetch streamIdToInputInfos -- the data of this batch duration plus the
      // business logic we want to apply to it.
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      // Step 4: hand the generated Jobs over to jobScheduler.
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  // Step 5: checkpoint.
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
```
Step into DStreamGraph.generateJobs:
```scala
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    // outputStreams are the final DStreams of the whole lineage. outputStream.generateJob(time)
    // works backwards from the last DStream, much like RDD lineage is traversed from the final RDD.
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
```
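What does outputStream.generateJob(time) actually do? For an output DStream such as ForEachDStream, it asks its parent for this batch's RDD via getOrCompute(time) and wraps the output function into a jobFunc that is not executed yet. In the Spark 1.x source it looks roughly like this (the call-site / local-property bookkeeping is simplified away here):

```scala
override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      // The RDD action is only *captured* inside jobFunc; it runs when the Job is executed later.
      val jobFunc = () => foreachFunc(rdd, time)
      Some(new Job(time, jobFunc))
    case None => None
  }
}
```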
Looking at the declaration of onReceive:
```scala
/**
 * Invoked in the event thread when polling events from the event queue.
 *
 * Note: Should avoid calling blocking actions in `onReceive`, or the event thread will be blocked
 * and cannot process events in time. If you want to call some blocking actions, run them in
 * another thread.
 */
protected def onReceive(event: E): Unit
```

The event thread keeps pulling events from the queue and processes each one as soon as it arrives. Do not put blocking work inside onReceive, or the event thread will stall and events will pile up. In general a message loop never handles concrete business logic itself; once it receives a message, it routes the message to other threads for processing.
The submitJobSet method simply puts the JobSet into a ConcurrentHashMap and wraps each Job in a JobHandler that is submitted to the jobExecutor thread pool:
```scala
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}

private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]
```
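The jobExecutor is a fixed-size daemon thread pool inside JobScheduler; its size is controlled by the spark.streaming.concurrentJobs configuration (default 1, i.e. one batch's jobs run at a time). In the Spark 1.x source it is declared roughly as follows (quoted from memory):

```scala
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
```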
The JobHandler class implements the Runnable interface; the Job's run method is what finally invokes func, i.e. the business logic defined on the DStream (see the simplified sketch below).
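A simplified sketch of that relationship (not the actual Spark source, which additionally posts JobStarted/JobCompleted events and sets job-description properties): Job stores the translated RDD action as an unevaluated function, and JobHandler is the Runnable the thread pool executes, whose run() finally calls that function.

```scala
// Simplified sketch, not the real Spark classes: it only illustrates how the
// translated RDD action stays dormant inside a function until the thread pool runs it.
class Job(val time: Long, func: () => Unit) {
  def run(): Unit = func() // the wrapped RDD action executes only here
}

class JobHandler(job: Job) extends Runnable {
  override def run(): Unit = {
    // In Spark this is surrounded by JobStarted/JobCompleted event posting and error handling.
    job.run()
  }
}
```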
To conclude, two points:
- An RDD action is what triggers the execution of a job, but here it is wrapped up in a Runnable-style unit: a function is defined whose body builds the RDDs from the DStream dependency chain. During translation, the DStream dependencies become RDD dependencies, and since the last DStream operation is action-level, the last RDD operation after translation is also action-level. If that translation were executed on the spot, the Job would run immediately and there would be no queue to manage, so the translated result is placed inside a function (or method) instead; as long as nothing invokes that function, no action fires and no job runs.
- Spark Streaming keeps managing the jobs it generates on a time basis. Every job contains an action-level operation, but that action is a logical-level operation on a DStream. When each Job is generated and put into the queue, it has already been translated into RDD operations whose final operation is action-level; if translation directly triggered that action, the whole Spark Streaming job would escape the framework's management. We therefore need both the translation and the management: the dependencies between DStreams become dependencies between RDDs, and the action on the last DStream becomes an action on the last RDD, but the entire translated result lives inside one function body. Because that function is merely defined, not executed, the RDD action inside it does not run and no job is triggered. Only when the JobScheduler schedules the Job does a thread taken from the thread pool execute that wrapped function.