Kotlin Coroutines Tutorial (3): How launch Works
Internals (2): Tracing CoroutineScope.launch{}
The launch flow
Here is the source of launch:
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}
Step 1 Creating the CoroutineContext
The code executed is:
val newContext = newCoroutineContext(context)
launch first takes the CoroutineContext passed in by the caller. Since we did not pass one, the default value EmptyCoroutineContext is used as the argument.
// CoroutineScope.newCoroutineContext()
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    val combined = coroutineContext + context
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug
}
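newCoroutineContext() combines the context passed as the argument (EmptyCoroutineContext here) with the coroutineContext of the current scope, which in this example belongs to a BlockingCoroutine.
Next, in debug mode it adds a CoroutineId to the context so that coroutines are easier to tell apart while debugging. For how to turn debug mode on, see the explanation of DEBUG_PROPERTY_NAME in Debug.kt.
Finally it checks whether the combined context is something other than Dispatchers.Default itself and contains no ContinuationInterceptor. If both hold, Dispatchers.Default is added to the context; otherwise the existing interceptor is kept. This walkthrough assumes no dispatcher was set, so the new context ends up with Dispatchers.Default.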
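The merge-then-fallback logic can be tried out with nothing but the standard library. The following is a minimal sketch, not the kotlinx.coroutines code: NamedInterceptor, fallback, combine, and interceptorName are hypothetical names made up for illustration, and fallback merely plays the role that Dispatchers.Default plays above.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical stand-in for a dispatcher; it does not actually intercept anything.
class NamedInterceptor(val name: String) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        continuation
}

// Plays the role of Dispatchers.Default in this sketch (an assumption, not the real object).
val fallback = NamedInterceptor("fallback")

// Same shape as newCoroutineContext(): merge the two contexts,
// then add the fallback only if no interceptor is present.
fun combine(scopeContext: CoroutineContext, extra: CoroutineContext): CoroutineContext {
    val combined = scopeContext + extra
    return if (combined[ContinuationInterceptor] == null) combined + fallback else combined
}

// Helper that reports which interceptor ended up in the context.
fun interceptorName(context: CoroutineContext): String? =
    (context[ContinuationInterceptor] as? NamedInterceptor)?.name
```

Combining two empty contexts yields the fallback, while a context that already carries NamedInterceptor("mine") keeps it, because the key lookup for ContinuationInterceptor is no longer null.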
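Step 2 Creating the Coroutine
val coroutine = if (start.isLazy)
    LazyStandaloneCoroutine(newContext, block) else
    StandaloneCoroutine(newContext, active = true)
Next, the CoroutineStart mode decides whether the coroutine is a StandaloneCoroutine or a LazyStandaloneCoroutine.
The two differ only in the following ways. In lazy mode:
- The constructor argument active is false, whereas the non-lazy case passes true. The value is forwarded to the JobSupport constructor and determines the initial value of the Job state machine: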
private val _state = atomic<Any?>(if (active) EMPTY_ACTIVE else EMPTY_NEW)
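In other words, a lazy coroutine starts in the EMPTY_NEW state and a non-lazy one starts in EMPTY_ACTIVE. Put simply, a job in the new state needs some action to move it to active. See the Job state machine chapter for details.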
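- In lazy mode the code block is stored in the block field. Only when start(), join(), await(), or a similar call triggers the onStart() callback does the block actually run.
The source is as follows: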
// In Builders.common.kt
private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
    override val cancelsParent: Boolean get() = true
    override fun handleJobException(exception: Throwable) = handleExceptionViaHandler(parentContext, exception)
}
private class LazyStandaloneCoroutine(
    parentContext: CoroutineContext,
    block: suspend CoroutineScope.() -> Unit
) : StandaloneCoroutine(parentContext, active = false) {
    private var block: (suspend CoroutineScope.() -> Unit)? = block
    override fun onStart() {
        val block = checkNotNull(this.block) { "Already started" }
        this.block = null
        block.startCoroutineCancellable(this, this)
    }
}
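The "store the block, run it on the first start" idea behind LazyStandaloneCoroutine can be sketched with the standard library alone. LazyRunner below is a hypothetical class for illustration; unlike the real onStart(), which fails with "Already started", this sketch simply returns false on a second start.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical, single-threaded sketch of a lazily started block.
class LazyRunner(block: suspend () -> Unit) {
    // The block is held here until the first start() and cleared afterwards.
    private var block: (suspend () -> Unit)? = block

    var completed = false
        private set

    // Returns true if this call started the block, false if it had been started before.
    fun start(): Boolean {
        val pending = block ?: return false
        block = null
        pending.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { result ->
            result.getOrThrow() // rethrow a failure of the block, if any
            completed = true
        })
        return true
    }
}
```

Constructing a LazyRunner runs nothing. The body executes only inside the first start() call, and later calls are no-ops.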
Step 3 Initializing the parent Job
The next line calls start() to begin running the coroutine:
coroutine.start(start, coroutine, block)
This actually invokes AbstractCoroutine.start(), whose source is:
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    initParentJob()
    start(block, receiver, this)
}
So the third step is initParentJob(). Let's see what it does.
Again starting from the source, the call goes through initParentJob() and then initParentJobInternal():
internal fun initParentJob() {
    initParentJobInternal(parentContext[Job])
}
internal fun initParentJobInternal(parent: Job?) {
    check(parentHandle == null)
    if (parent == null) {
        parentHandle = NonDisposableHandle
        return
    }
    parent.start() // make sure the parent is started
    @Suppress("DEPRECATION")
    val handle = parent.attachChild(this)
    parentHandle = handle
    // now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
    if (isCompleted) {
        handle.dispose()
        parentHandle = NonDisposableHandle // release it just in case, to aid GC
    }
}
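It first checks that parentHandle is still null. If it has already been set, check() throws an IllegalStateException.
Then it looks at whether parent is null. In this example the parent is the BlockingCoroutine (per the analysis in step 1, the CoroutineContext is BlockingCoroutine + EmptyCoroutineContext). If parent were null, the function would return right there and parent.start() would never run.
Otherwise parent.start() runs. The call is only there to make sure the parent Job's state machine is in an active state (for an Empty state, isActive is true). The logic lives in JobSupport: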
// returns: RETRY/FALSE/TRUE:
//   FALSE when not new,
//   TRUE  when started
//   RETRY when need to retry
private fun startInternal(state: Any?): Int {
    when (state) {
        is Empty -> { // EMPTY_X state -- no completion handlers
            if (state.isActive) return FALSE // already active
            if (!_state.compareAndSet(state, EMPTY_ACTIVE)) return RETRY
            onStartInternal()
            return TRUE
        }
        is InactiveNodeList -> { // LIST state -- inactive with a list of completion handlers
            if (!_state.compareAndSet(state, state.list)) return RETRY
            onStartInternal()
            return TRUE
        }
        else -> return FALSE // not a new state
    }
}
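There are three possible outcomes:
- FALSE: the state is already active (or is not a new state at all), meaning the Job was started earlier.
- RETRY: the state is not active yet, but the compareAndSet to EMPTY_ACTIVE (or to the active list) lost a race, so the caller has to try again.
- TRUE: the compareAndSet succeeded and onStartInternal(), that is, the onStart() callback, has been invoked.
After that, initParentJobInternal() does some further setup and checks, such as storing parentHandle. The handle lets a child notify its parent when the child is cancelled.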
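The FALSE/TRUE/RETRY protocol is an ordinary compare-and-set loop. Here is a minimal sketch using java.util.concurrent.atomic.AtomicReference; TinyJob, State, and StartResult are hypothetical names, and the state set is far smaller than what JobSupport really tracks.

```kotlin
import java.util.concurrent.atomic.AtomicReference

enum class State { NEW, ACTIVE }
enum class StartResult { FALSE, TRUE, RETRY }

// Hypothetical two-state job: NEW -> ACTIVE, started at most once.
class TinyJob(initiallyActive: Boolean) {
    private val state = AtomicReference(if (initiallyActive) State.ACTIVE else State.NEW)

    // Counts how often the "onStart" hook ran; a plain field is enough for this sketch
    // because only the thread that wins the compareAndSet ever touches it.
    var onStartCalls = 0
        private set

    private fun startInternal(current: State): StartResult {
        if (current != State.NEW) return StartResult.FALSE            // already active
        if (!state.compareAndSet(State.NEW, State.ACTIVE)) return StartResult.RETRY
        onStartCalls++                                                // the onStart() hook
        return StartResult.TRUE
    }

    // Keeps calling startInternal() until it gives a definite answer.
    fun start(): Boolean {
        while (true) {
            when (startInternal(state.get())) {
                StartResult.FALSE -> return false
                StartResult.TRUE -> return true
                StartResult.RETRY -> Unit // lost a race; loop and re-read the state
            }
        }
    }
}
```

A job created with initiallyActive = false returns true from its first start() and false from every later one, so the start hook runs exactly once no matter how many callers race.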
Step 4 Starting the coroutine
The code executed is:
// AbstractCoroutine.kt
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    initParentJob()
    start(block, receiver, this)
}
The call start(block, receiver, this) invokes the operator function invoke() of CoroutineStart:
// CoroutineStart.kt
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
    when (this) {
        CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
        CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
        CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
        CoroutineStart.LAZY -> Unit // will start lazily
    }
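An enum with an operator invoke() is what makes the start(block, receiver, this) call syntax possible. The sketch below reduces the idea to two modes and uses only the standard library's startCoroutine; StartMode and tryStart are hypothetical names, not part of kotlinx.coroutines.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical two-mode version of CoroutineStart, for illustration only.
enum class StartMode {
    EAGER, LAZY;

    // Because this is an operator, callers can write mode(block, completion).
    operator fun invoke(block: suspend () -> Unit, completion: Continuation<Unit>) {
        when (this) {
            EAGER -> block.startCoroutine(completion) // start right away
            LAZY -> Unit                              // do nothing; something else starts it later
        }
    }
}

// Runs a trivial block under the given mode and reports what happened.
fun tryStart(mode: StartMode): List<String> {
    val log = mutableListOf<String>()
    val block: suspend () -> Unit = { log += "body" }
    mode(block, Continuation<Unit>(EmptyCoroutineContext) { log += "completed" })
    return log
}
```

With EAGER the log contains "body" followed by "completed"; with LAZY it stays empty, which mirrors the `CoroutineStart.LAZY -> Unit` branch above.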
The Coroutine starts in the start state and moves to create once it gets here. See the Coroutine chapter for the details.
Step 5 Dispatching the coroutine
The code executed is:
// In Cancellable.kt
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellable(Unit)
This calls resumeCancellable() on the Continuation. The implementation that actually runs is the one in DispatchedContinuation, shown below:
// In other words, the call eventually reaches Dispatcher.dispatch()
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
inline fun resumeCancellable(value: T) {
    // isDispatchNeeded usually returns true, meaning the execution has to be dispatched to another thread
    if (dispatcher.isDispatchNeeded(context)) {
        _state = value
        resumeMode = MODE_CANCELLABLE
        dispatcher.dispatch(context, this)
    } else {
        executeUnconfined(value, MODE_CANCELLABLE) {
            if (!resumeCancelled()) {
                resumeUndispatched(value)
            }
        }
    }
}
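The "dispatch or resume in place" decision can be reproduced with a hand-written ContinuationInterceptor from the standard library. This is a single-threaded sketch under stated assumptions: QueueInterceptor, its dispatchNeeded flag, drain(), and startBody() are hypothetical, and the pending list stands in for a real thread pool.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.startCoroutine

// Hypothetical interceptor: either queues each resumption or performs it in place.
class QueueInterceptor(private val dispatchNeeded: Boolean) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    private val pending = mutableListOf<() -> Unit>()

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        Continuation(continuation.context) { result ->
            if (dispatchNeeded) {
                // "dispatch": remember the resumption so it can run later
                pending.add { continuation.resumeWith(result) }
            } else {
                // "unconfined": resume right here on the caller's stack
                continuation.resumeWith(result)
            }
        }

    // Runs everything that was queued, in submission order.
    fun drain() {
        while (pending.isNotEmpty()) pending.removeAt(0).invoke()
    }
}

// Starts a trivial coroutine whose context contains the interceptor.
fun startBody(interceptor: QueueInterceptor, log: MutableList<String>) {
    val body: suspend () -> Unit = { log += "body" }
    body.startCoroutine(Continuation<Unit>(interceptor) { log += "done" })
}
```

With dispatchNeeded = true, startBody() returns before the body has run, and the body only executes once drain() is called. With dispatchNeeded = false the body has already run by the time startBody() returns.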
The dispatcher hands the work to CoroutineScheduler, which then runs it on its thread pool.
Step 6 Dispatching the block to a thread in the pool
The block has now reached CoroutineScheduler.dispatch(), whose source is:
/**
 * Dispatches execution of a runnable [block] with a hint to a scheduler whether
 * this [block] may execute blocking operations (IO, system calls, locking primitives etc.)
 *
 * @param block runnable to be dispatched
 * @param taskContext concurrency context of given [block]
 * @param fair whether the task should be dispatched fairly (strict FIFO) or not (semi-FIFO)
 */
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) {
    timeSource.trackTask() // this is needed for virtual time support
    val task = createTask(block, taskContext)
    // try to submit the task to the local queue and act depending on the result
    when (submitToLocalQueue(task, fair)) {
        ADDED -> return
        NOT_ADDED -> {
            // try to offload task to global queue
            if (!globalQueue.addLast(task)) {
                // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
                throw RejectedExecutionException("$schedulerName was terminated")
            }
            requestCpuWorker()
        }
        else -> requestCpuWorker() // ask for help
    }
}
First, the block and the taskContext are wrapped in a TaskImpl, which is simply a wrapper around a Runnable:
// TaskImpl
internal class TaskImpl(
    @JvmField val block: Runnable,
    submissionTime: Long,
    taskContext: TaskContext
) : Task(submissionTime, taskContext) {
    override fun run() {
        try {
            block.run()
        } finally {
            taskContext.afterTask()
        }
    }
}
// Task
internal abstract class Task(
    @JvmField var submissionTime: Long,
    @JvmField var taskContext: TaskContext
) : Runnable {
    constructor() : this(0, NonBlockingContext)
    val mode: TaskMode get() = taskContext.taskMode
}
// Runnable
public actual inline fun Runnable(crossinline block: () -> Unit): Runnable =
    java.lang.Runnable { block() }
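In other words, TaskImpl is a Runnable whose run() simply runs the wrapped block. Once the block has finished, taskContext.afterTask() is called. The TaskContext here may be something like NonBlockingContext, whose afterTask() does nothing, whereas a context such as LimitingDispatcher may do extra work there.
Next, submitToLocalQueue() tries to add the Task to the localQueue of the current thread. It returns ADDED on success and NOT_ADDED on failure (or ADDED_REQUIRES_HELP when the task was queued but another worker should be woken up):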
/**
 * Returns [ADDED], or [NOT_ADDED], or [ADDED_REQUIRES_HELP].
 */
private fun submitToLocalQueue(task: Task, fair: Boolean): Int {
    // Check whether the current thread is a Worker. as? is the safe cast operator: it returns null when the cast fails
    val worker = Thread.currentThread() as? Worker
        ?: return NOT_ADDED
    if (worker.scheduler !== this) return NOT_ADDED // different scheduler's worker (!!!)
    /*
     * This worker could have been already terminated from this thread by close/shutdown and it should not
     * accept any more tasks into its local queue.
     */
    if (worker.state === WorkerState.TERMINATED) return NOT_ADDED
    // The checks above cover the scheduler and the state; if either fails, the task cannot be added here
    var result = ADDED
    if (task.mode == TaskMode.NON_BLOCKING) {
        /*
         * If the worker is currently executing blocking task and tries to dispatch non-blocking task, it's one the following reasons:
         * 1) Blocking worker is finishing its block and resumes non-blocking continuation
         * 2) Blocking worker starts to create non-blocking jobs
         *
         * First use-case is expected (as recommended way of using blocking contexts),
         * so we add non-blocking task to local queue, but also request CPU worker to mitigate second case
         */
        if (worker.isBlocking) {
            result = ADDED_REQUIRES_HELP
        } else {
            /*
             * If thread is not blocking, then it's just tries to finish its
             * local work in order to park (or grab another blocking task), do not add non-blocking tasks
             * to its local queue if it can't acquire CPU
             */
            val hasPermit = worker.tryAcquireCpuPermit()
            if (!hasPermit) {
                return NOT_ADDED
            }
        }
    }
    val noOffloadingHappened = if (fair) {
        worker.localQueue.addLast(task, globalQueue)
    } else {
        worker.localQueue.add(task, globalQueue)
    }
    if (noOffloadingHappened) {
        // When we're close to queue capacity, wake up anyone to steal work
        // Note: non-atomic bufferSize here is Ok (it is just a performance optimization)
        if (worker.localQueue.bufferSize > QUEUE_SIZE_OFFLOAD_THRESHOLD) {
            return ADDED_REQUIRES_HELP
        }
        return result
    }
    return ADDED_REQUIRES_HELP
}
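The first decision in submitToLocalQueue(), "am I running on one of my own workers?", can be isolated in a few lines. The sketch below is a simplification with hypothetical names (MiniScheduler, submit, submitFromWorker) and no permits, blocking modes, or offloading; it only shows the safe cast and the scheduler identity check.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical scheduler with one global queue and per-worker local queues.
class MiniScheduler {
    val globalQueue = ConcurrentLinkedQueue<Runnable>()

    // A worker thread that owns a local queue and remembers its scheduler.
    inner class Worker(body: Runnable) : Thread(body) {
        val localQueue = ConcurrentLinkedQueue<Runnable>()
        val scheduler: MiniScheduler get() = this@MiniScheduler
    }

    // Returns "local" or "global" depending on where the task was queued.
    fun submit(task: Runnable): String {
        // as? yields null when the current thread is not a Worker
        val worker = Thread.currentThread() as? Worker
        if (worker == null || worker.scheduler !== this) {
            globalQueue.add(task)
            return "global"
        }
        worker.localQueue.add(task)
        return "local"
    }
}

// Submits one task from inside a worker thread and reports where it went.
fun submitFromWorker(scheduler: MiniScheduler): String {
    var where = ""
    val worker = scheduler.Worker(Runnable { where = scheduler.submit(Runnable { }) })
    worker.start()
    worker.join() // join() makes the write to `where` visible to this thread
    return where
}
```

Calling submit() from an ordinary thread falls through to the global queue, while the same call made from inside one of the scheduler's own Worker threads lands in that worker's local queue.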
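There is a lot going on here. The details are in the source above, where I added comments at the key points. In short, if the current thread is a Worker of this scheduler (Worker is a daemon thread), the block is added to that Worker's localQueue. The localQueue is a WorkQueue, and the global queue it overflows into is built on LockFreeTaskQueue, which the documentation describes as:
Lock-free Multiply-Producer xxx-Consumer Queue for task scheduling purposes.
The local queue is semi-FIFO: the most recently submitted task is taken first, and the older ones then leave in the order they were submitted. For example, tasks enqueued as [1, 2, 3, 4] are dequeued as [4, 1, 2, 3]. It also follows a single-producer, multiple-consumer pattern: only the owning worker adds tasks, while other workers may take them.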
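The [4, 1, 2, 3] ordering is easiest to see in a toy model: one "last scheduled" slot in front of a plain FIFO buffer. The class below is a hypothetical, non-thread-safe sketch (SemiFifoQueue is not a kotlinx.coroutines type), and its overflow handling moves a single task to the overflow list, which is a simplification of whatever the real queue does.

```kotlin
// Hypothetical single-threaded model of a semi-FIFO queue with an overflow list.
class SemiFifoQueue<T : Any>(
    private val capacity: Int,
    private val overflow: MutableList<T> // stands in for the global queue
) {
    private var lastScheduled: T? = null // the newest task, served first
    private val buffer = ArrayList<T>()  // older tasks, served in FIFO order

    // Returns true if nothing had to be moved to the overflow list.
    fun add(task: T): Boolean {
        val previous = lastScheduled
        lastScheduled = task
        if (previous == null) return true
        if (buffer.size < capacity) {
            buffer.add(previous)
            return true
        }
        overflow.add(previous)
        return false
    }

    // Takes the newest task if there is one, otherwise the oldest buffered task.
    fun poll(): T? {
        val last = lastScheduled
        if (last != null) {
            lastScheduled = null
            return last
        }
        return if (buffer.isEmpty()) null else buffer.removeAt(0)
    }
}
```

Adding 1, 2, 3, 4 and then polling four times yields 4, 1, 2, 3: the newest task jumps the line once, and everything older keeps its submission order. The Boolean returned by add() plays the role of noOffloadingHappened in the source above.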
Step 7 Worker.run() keeps taking tasks and running them
In step 6 the Task was submitted to Worker.localQueue or to CoroutineScheduler.globalQueue. Dispatching through CoroutineDispatcher.dispatch() plays the same role as execute() on a thread pool. Along the way, createNewWorker() may create a new Worker thread and start it right away:
// In CoroutineScheduler.createNewWorker()
val worker = Worker(newIndex).apply { start() }
The run() method of the Worker thread then keeps polling tasks and executing each one:
private fun runSafely(task: Task) {
    try {
        task.run()
    } catch (e: Throwable) {
        val thread = Thread.currentThread()
        thread.uncaughtExceptionHandler.uncaughtException(thread, e)
    } finally {
        timeSource.unTrackTask()
    }
}
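A worker loop built around the same "run safely" idea might look like the sketch below. It uses a LinkedBlockingQueue instead of the scheduler's own queues, and LoopWorker, STOP, and runAll are hypothetical names; failures are collected in a list here rather than passed to the uncaught exception handler.

```kotlin
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical marker task that tells the loop to exit.
val STOP = Runnable { }

// Hypothetical worker thread: takes tasks one by one until it sees STOP.
class LoopWorker(private val queue: LinkedBlockingQueue<Runnable>) : Thread() {
    // Only written by the worker thread; read by others after join().
    val failures = mutableListOf<Throwable>()

    override fun run() {
        while (true) {
            val task = queue.take() // blocks until a task is available
            if (task === STOP) return
            runSafely(task)
        }
    }

    // One failing task must not kill the worker thread.
    private fun runSafely(task: Runnable) {
        try {
            task.run()
        } catch (e: Throwable) {
            failures += e
        }
    }
}

// Runs the given tasks on a fresh worker and waits for it to finish.
fun runAll(tasks: List<Runnable>): LoopWorker {
    val queue = LinkedBlockingQueue<Runnable>()
    val worker = LoopWorker(queue)
    worker.start()
    tasks.forEach { queue.put(it) }
    queue.put(STOP)
    worker.join()
    return worker
}
```

If the second of three tasks throws, the first and third still run and the exception ends up in failures, which is the property runSafely() is there to guarantee.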
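That makes the execution of launch{} reasonably clear. The last part to look at is how the thread pool is torn down.
Step 8 Destroying the thread pool
If Dispatcher.close() is eventually called, the thread pool calls shutdown() to destroy its threads and itself:
// In CoroutineScheduler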
fun shutdown(timeout: Long) {
    // atomically set termination flag which is checked when workers are added or removed
    if (!_isTerminated.compareAndSet(0, 1)) return
    // make sure we are not waiting for the current thread
    val currentWorker = Thread.currentThread() as? Worker
    // Capture # of created workers that cannot change anymore (mind the synchronized block!)
    val created = synchronized(workers) { createdWorkers }
    // Shutdown all workers with the only exception of the current thread
    for (i in 1..created) {
        val worker = workers[i]!!
        if (worker !== currentWorker) {
            while (worker.isAlive) {
                LockSupport.unpark(worker)
                worker.join(timeout)
            }
            val state = worker.state
            check(state === WorkerState.TERMINATED) { "Expected TERMINATED state, but found $state"}
            worker.localQueue.offloadAllWork(globalQueue)
        }
    }
    // Make sure no more work is added to GlobalQueue from anywhere
    globalQueue.close()
    // Finish processing tasks from globalQueue and/or from this worker's local queue
    while (true) {
        val task = currentWorker?.findTask() ?: globalQueue.removeFirstOrNull() ?: break
        runSafely(task)
    }
    // Shutdown current thread
    currentWorker?.tryReleaseCpu(WorkerState.TERMINATED)
    // check & cleanup state
    assert(cpuPermits.availablePermits() == corePoolSize)
    parkedWorkersStack.value = 0L
    controlState.value = 0L
}
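shutdown() first stops every Worker thread other than the current one, then makes sure all remaining tasks have been run, and finally resets the pool's state.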
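Two details of shutdown() are worth isolating: the compare-and-set on the termination flag, which makes a second call a no-op, and the final loop that runs whatever is still queued. The sketch below shows just those two with JDK classes; MiniPool is a hypothetical name and there are no worker threads in it at all.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical pool that only models the termination flag and the final drain.
class MiniPool {
    private val terminated = AtomicBoolean(false)
    private val queue = ConcurrentLinkedQueue<Runnable>()

    // Rejects new work once the pool has been shut down.
    fun submit(task: Runnable): Boolean {
        if (terminated.get()) return false
        return queue.add(task)
    }

    // Returns the number of leftover tasks that were run, or -1 if already shut down.
    fun shutdown(): Int {
        // Only the first caller flips the flag; everyone else returns immediately.
        if (!terminated.compareAndSet(false, true)) return -1
        var drained = 0
        while (true) {
            val task = queue.poll() ?: break
            task.run()
            drained++
        }
        return drained
    }
}
```

After two tasks are submitted, the first shutdown() runs both and returns 2, a second shutdown() returns -1, and submit() refuses further work.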
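Together with the flow chart below, that covers the whole launch{} process. (The chart divides the steps differently from the text, purely to make the drawing look nicer.) A full-size version of the chart is available for viewing.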