深入理解Kotlin协程(四)——Kotlin协程框架初探(二)

本篇继续探讨协程的取消、异常处理以及作用域问题。

协程的取消

协程的取消本质是协作式的取消,这点跟线程的中断一致,除了自身状态置为取消外,也需要协程体的执行逻辑能够检查状态的变化来响应取消。

完善协程的取消逻辑

我们的Job目前还有两个函数空着没有实现,分别是cancle和invokeOnCancel。后者的实现与doOnCompleted类似,如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
override fun invokeOnCancel(onCancel: OnCancel): Disposable {
val disposable = CancellationHandlerDisposable(this, onCancel)

val newState = state.updateAndGet { prev ->
when (prev) {
is CoroutineState.Incomplete -> {
CoroutineState.Incomplete().from(prev).with(disposable)
}
is CoroutineState.Cancelling,
is CoroutineState.Complete<*> -> {
prev
}
}
}
(newState as? CoroutineState.Cancelling)?.let { onCancel() }
return disposable
}

class CancellationHandlerDisposable(val job: Job, val onCancel: OnCancel) : Disposable {
override fun dispose() {
job.remove(this)
}
}

cancel函数实现如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
override fun cancel() {
val preState = state.getAndUpdate { prev ->
when (prev) {
is CoroutineState.Incomplete -> {
CoroutineState.Cancelling()
}
is CoroutineState.Cancelling,
is CoroutineState.Complete<*> -> prev
}
}
if (preState is CoroutineState.Incomplete) {
preState.notifyCancellation()
preState.clear()
}
parentCancelDisposable?.disposable()
}

注意这里使用了getAndUpdate来流转状态,也就是说我们拿到的是旧状态,旧状态如果是Incomplete则一定发生了状态流转,调用notifyCancellation来通知取消事件。

支持取消的挂起函数

通常来讲,一个发生了事实上挂起的挂起函数如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
suspend fun nonCancellableFunction() = suspendCoroutine<Int> { continuation ->

val completableFuture = CompletableFuture.supplyAsync {
Thread.sleep(1000)
Random.nextInt()
}

completableFuture.thenApply {
continuation.resume(it)
}.exceptionally {
continuation.resumeWithException(it)
}
}

这种情况下,及时所在的协程被取消,我们也无法取消内部的异步任务CompletableFuture。为了能够取消内部的异步任务,我们需要Continuation提供一个取消状态和回调,在协程取消,回调被通知时取消掉CompletableFuture,如下。

1
2
3
4
5
6
7
8
9
10
suspend fun cancellableFunction() = suspendCancellableCoroutine<Int> { continuation ->
val completableFuture = CompletableFuture.supplyAsync {
Thread.sleep(1000)
Random.nextInt()
}
continuation.invokeOnCancellation {
completableFuture.cancel(true)
}
... ...
}

那么这个suspendCancellableCoroutine要如何实现呢?我们可以参考官方框架中的suspendCoroutine实现来做一点修改。

1
2
3
4
5
6
7
8
9
10
@SinceKotlin("1.3")
@InlineOnly
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
return suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
val safe = SafeContinuation(c.intercepted())
block(safe)
safe.getOrThrow()
}
}

suspendCoroutineUninterceptedOrReturn的参数是一个函数,这个函数有一个参数Continuation,实际上就是我们前面文章说的编译后生成的匿名内部类的实例。SafeContinuation的作用是确保传入的Continuation对象的恢复调用只被执行一次。如何确保的呢?当block(safe)执行过程中调用了Continuation的恢复调用时,safe.getOrThrow就会获取到结果,而不是COROUTINE_SUSPENDED,这样协程就不会真正挂起了。

那么我们要对 Continuation进行改造使其支持取消的话,就要替换到SafeContinuation了,即需要实现一下效果:

1
2
3
4
5
6
7
suspend inline fun <T> suspendCancellableCoroutine(
crossinline block: (CancellableContinuation<T>) -> Unit
): T = suspendCoroutineUninterceptedOrReturn { continuation ->
val cancellable = CancellableContinuation(continuation.intercepted())
block(cancellable)
cancellable.getResult()
}

关键就是CancellableContinuation的实现了。

CancellableContinuation的实现

CancellableContinuation需要具备以下能力:

  • 支持通过invokeOnCancellation注册取消回调
  • 支持监听对应协程的取消状态
  • 具备SafeContinuation的功能

这样的话,CancellableContinuation必然是有状态的,同样我们先给出状态的定义。

1
2
3
4
5
6
7
8
9
10
sealed class CancelState {
object InComplete : CancelState()
class CancelHandler(val onCancel: OnCancel): CancelState()
class Complete<T>(val value: T? = null, val exception: Throwable? = null) : CancelState()
object Cancelled : CancelState()
}

enum class CancelDecision {
UNDECIDED, SUSPENDED, RESUMED
}

这里的状态定义跟协程的状态时一致的,这里不多说。CancelDecision枚举用于标记对应的挂起函数是否同步返回了。

CancellableContinuation需要包装一个Continuation,这里我们直接使用接口代理即可,如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class CancellableContinuation<T>(private val continuation: Continuation<T>) : Continuation<T> by continuation {

private val state = AtomicReference<CancelState>(CancelState.InComplete)
private val decision = AtomicReference(CancelDecision.UNDECIDED)

val isCompleted: Boolean
get() = when (state.get()) {
CancelState.InComplete,
is CancelState.CancelHandler -> false
is CancelState.Complete<*>,
CancelState.Cancelled -> true
}
... ...
}

先看invokeOnCancellation的实现,如果当前是Incomplete状态,那么就可以注册回调,如果是Cancelled状态就直接调用回调。

1
2
3
4
5
6
7
8
9
10
11
12
13
fun invokeOnCancellation(onCancel: OnCancel) {
val newState = state.updateAndGet { prev ->
when (prev) {
CancelState.InComplete -> CancelState.CancelHandler(onCancel)
is CancelState.CancelHandler -> throw IllegalStateException("It's prohibited to register multiple handlers.")
is CancelState.Complete<*>,
CancelState.Cancelled -> prev
}
}
if (newState is CancelState.Cancelled) {
onCancel()
}
}

接下来尝试去监听对应协程的取消事件,可以通过协程上下文来获取对应协程。

1
2
3
4
5
6
7
private fun installCancelHandler() {
if (isCompleted) return
val parent = continuation.context[Job] ?: return // 获取协程
parent.invokeOnCancel {
doCancel()
}
}

取消回调中通过doCancel方法来完成状态的流转。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private fun doCancel() {
val prevState = state.getAndUpdate { prev ->
when (prev) {
is CancelState.CancelHandler,
CancelState.InComplete -> {
CancelState.Cancelled
}
CancelState.Cancelled,
is CancelState.Complete<*> -> {
prev
}
}
}
if (prevState is CancelState.CancelHandler) {
prevState.onCancel()
resumeWithException(CancellationException("Cancelled."))
}
}

对于两种未完成的状态,流转为Cancelled,如果刘赚钱有回调注册,就调用回调通知取消事件。

由于挂起点发生了真正的挂起后注册回调才有意义,因此无需急于注册取消回调,在getResult中注册即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fun getResult(): Any? {
installCancelHandler() // 注册取消回调
if(decision.compareAndSet(CancelDecision.UNDECIDED, CancelDecision.SUSPENDED))
return COROUTINE_SUSPENDED
return when (val currentState = state.get()) {
is CancelState.CancelHandler,
CancelState.InComplete -> COROUTINE_SUSPENDED
CancelState.Cancelled -> throw CancellationException("Continuation is cancelled.")
is CancelState.Complete<*> -> {
(currentState as CancelState.Complete<T>).let {
it.exception?.let { throw it } ?: it.value
}
}
}
}

首先注册了协程的取消回调,接着通过CAS操作判断当前decision是否为UNDECIDED,如果是UNDECIDED则表示结果还未就绪,将其设置为SUSPENDED并返回挂起标志位COROUTINE_SUSPENDED。否则decision只可能为RESUMED,即挂起函数没有真正挂起并且结果已经可以获取,那么就会在Complete分支返回结果,如果未完成则会返回挂起标志COROUTINE_SUSPENDED。

接着是resumeWith的实现。该函数被调用表示挂起函数恢复执行,此时如果dicision为UNDECIDED,表示挂起函数同步返回了,后面通过getResult来获取结果,否则只能为dicision只能为SUSPENDED,即已挂起。代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
override fun resumeWith(result: Result<T>) {
when {
decision.compareAndSet(CancelDecision.UNDECIDED, CancelDecision.RESUMED) -> { // 如果是UNDECIDED,则不用考虑并发安全问题
state.set(CancelState.Complete(result.getOrNull(), result.exceptionOrNull()))
}
decision.compareAndSet(CancelDecision.SUSPENDED, CancelDecision.RESUMED) -> { // 如果是SUSPENDED,使用updateAndGet来更新状态并获取结果
state.updateAndGet { prev ->
when (prev) {
is CancelState.Complete<*> -> {
throw IllegalStateException("Already completed.")
}
else -> {
CancelState.Complete(result.getOrNull(), result.exceptionOrNull())
}
}
}
// 恢复delegate
continuation.resumeWith(result)
}
}
}

CancellableContinuation的状态转移如下图。

CancellableContinuation的状态转移

改造挂起函数

我们之前定义的挂起函数都是使用suspendCoroutine函数来实现挂起,若要响应协程的取消,需要将其替换成上面实现好的suspendCancellableCoroutine。

以delay函数为例,改造后的delay函数如下。

1
2
3
4
5
6
7
8
suspend fun delay(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS) {
if (time < 0) return

suspendCancellableCoroutine<Unit> { continuation ->
val future = executor.schedule({ continuation.resume(Unit) }, time, unit)
continuation.invokeOnCancellation { future.cancel(true) }
}
}

协程的异常处理

异常处理是异步程序需要解决的关键问题。

处理协程的未捕获异常

先定义一个异常处理器。

1
2
3
4
5
interface CoroutineExceptionHandler : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>

fun handleException(context: CoroutineContext, exception: Throwable)
}

要处理协程的未捕获异常,我们需要在AbstractCoroutine中定义一个子类可见的函数,提供给子类复写,返回值为true表示异常已处理。

1
protected open fun handleJobException(e: Throwable) = false

我们以AbstractCoroutine子类StandaloneCoroutine为例,StandaloneCoroutine由launch启动,协会本身没有返回结果。我们虚妄它能够在遇到未捕获异常时,调用自身的异常处理器进行处理,如果没有异常处理器就抛出给所在线程的uncaughtExceptionHandler来处理,代码如下所示。

1
2
3
4
5
6
override fun handleJobException(e: Throwable): Boolean {
super.handleJobException(e)
context[CoroutineExceptionHandler]?.handleException(context, e) ?: Thread.currentThread()
.let { it.uncaughtExceptionHandler.uncaughtException(it, e) }
return true
}

取消异常的特别处理

协程的取消类似于线程的中断,取消时通过抛出取消异常来实现对取消状态的相应,因此上节的未捕获异常不应该包含取消异常。只需要定义一个函数来分发异常即可。

1
2
3
4
private fun tryHandleException(e: Throwable) = when(e){
is CancellationException -> false
else -> handleJobException(e)
}

接着在resumeWith中添加异常处理逻辑。

1
2
3
4
override fun resumeWith(result: Result<T>) {
··· ···
(newState as CoroutineState.Complete<T>).exception?.let(this::tryHandleException)
}

协程的作用域

协程作用域用来描述协程的作用范围,作用域既有约束作用又可以提供额外的能力。

作用域的种类

官方框架中作用域包括以下三种:

  1. 顶级作用域:没有父协程的协程所在的作用域。
  2. 协同作用域:协程中启动新的协程,新协程未所在协程的子协程,这种情况下子协程所在的作用域默认为协同作用域。子协程抛出的未捕获异常都将传递给父协程处理,同时父协程也会被取消。
  3. 主从作用域:与协程作用域在协程的父子关系一致,区别是子协程的未捕获异常将不会向上传递给父协程。

父子协程之间还存在以下规则:

  • 父协程被取消,所有子协程都会被取消。
  • 父协程需要等待子协程执行完毕才会最终进入完成状态。
  • 子协程会继承父协程的协程上下文中的元素,如果自身有相同Key的成员,将会覆盖该Key。

前面几篇文章我们已经熟悉了如何去创建一个作用域了,通常我们会将协程的启动函数定义在作用域中,同时让协程的描述类来实现作用域充当Receiver的角色,基于这两点来改造下launch函数。

1
2
3
4
5
6
7
8
9
// 定义为CoroutineScope的扩展函数
fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> Unit
): Job {
val completion = StandaloneCoroutine(context)
block.startCoroutine(completion, completion) // 带Receiver的方式启动协程
return completion
}

建立父子关系

前面说了,父协程取消之后,子协程也需要被取消,因此我们要对AbstractCoroutine稍加改造,如下。

1
2
3
4
5
6
7
8
9
10
protected val parentJob = context[Job]

private var parentCancelDisposable : Disposable? = null

init {
··· ···
parentCancelDisposable = parentJob?.invokeOnCancel {
cancel()
}
}

通过协程上下文来获取父协程,如果父协程存在,那么就需要注册一个取消回调,当父协程取消时取消掉当前协程。

顶级作用域

我们对launch函数进行改造之后,需要一个作用域来调用launch函数才能启动一个协程,但作用域又是在创建协程过程中产生的。针对这个问题,我们需要一个特殊的作用域,这个作用域不需要依赖父作用域来产生,这个作用域就叫做顶级作用域

1
2
3
4
5
6
7
8
object GlobalScope : CoroutineScope {
override val scopeContext: CoroutineContext
get() = EmptyCoroutineContext
}

GlobalScope.launch{
··· ···
}

由于协程描述类AbstractCoroutine本身实现了CoroutineScope,因此由GlobalScope.launch启动的协程体内部我们可以直接创建新的子协程。

实现异常传播

接下来考虑一下子协程如何将异常向上传递。按照现有的实现,我们已经将该异常处理传递到tryHandleException中,对于非取消异常的情况都交给了handleJobException来处理。按照协同作用域的设计,协程遇到未捕获的异常时应当优先向上传播,如果没有父协程才自行处理。因此我们添加一个函数handleChildException用于接收子协程的异常,改造一下tryHandleException:

1
2
3
4
5
6
7
8
9
10
11
protected open fun handleChildException(e: Throwable): Boolean {
cancel() // 取消父协程
return tryHandleException(e) // 继续向上传递或者自己处理
}

private fun tryHandleException(e: Throwable) = when (e) {
is CancellationException -> false
else -> (parentJob as? AbstractCoroutine<*>)?.handleChildException(e) // 交给父协程处理
?.takeIf { it }
?: handleJobException(e) // 没有父协程 自行处理
}

注释已经很完善了,当出现未捕获异常时,先尝试调用父协程的handleChildException来处理,否则自行处理。父协程也优先调用父协程的父协程来处理。

主从作用域

协同作用域的效果就是父子协程绑定,父取消则子取消,子异常则父连坐。而主从作用域则可以避免子协程出现异常而导致父协程取消的情况。

我们只需要将上面的handleChildException方法返回false即可避免子协程将异常传递给父协程,如下。

1
2
3
4
private class SupervisorCoroutine<T>(context: CoroutineContext, continuation: Continuation<T>) :
ScopeCoroutine<T>(context, continuation) {
override fun handleChildException(e: Throwable) = false
}

也很容易去创建这样一个作用域,如下。

1
2
3
4
5
6
suspend fun <R> supervisorScope(
block: suspend CoroutineScope.() -> R
): R = suspendCoroutine { continuation ->
val coroutine = SupervisorCoroutine(continuation.context, continuation)
block.startCoroutine(coroutine, coroutine)
}

主从作用域的应用场景多见于子协程为独立对等的任务实体的情况,例如Android平台的Jetpack开发包中ViewModel内置的viewModelScope就是主从作用域实现。

完整的异常处理流程

引入作用域后完整的异常处理流程如下如所示。

完整的异常处理流程

父协程等待子协程完成

作用域要求父协程必须等待子协程执行完才可以进入完成状态,因此父协程的resumeWith执行完成后需要检查子协程是否完成,未完成则需要注册完成回调,等待所有子协程状态流转为完成状态父协程才能触发完成回调并且流转为完成态。我们只需要新增一个等待的状态即可,状态流转如下所示。

完整的异常处理流程

参考