深入理解Kotlin协程(三)——Kotlin协程框架初探(一)

前面四篇文章我们了解了如何使用createCoroutinestartCoroutine这两个API来创建和启动简单协程,并且可以使用它们来构造复合协程。但客观的说,这两个API并不太适合直接在业务开发中使用,所以Kotlin协程为开发者提供了一些更贴近业务的复合协程API,即官方协程框架kotlinx.coroutines。后面几篇文章我们就来尝试窥探一下这个庞大的框架的一角。

协程的描述

对于协程的创建,官方框架中根据不同的目的提供了不同的构造器。

协程的描述类

Java中提供了线程的Java描述类Thread,通过调用Thread#start方法我们就可以通知系统启动一个线程,我们也知道Thread#run方法即为线程的执行代码,同时Thread也提供了类型join、interrupt、isAlive等方法来方便我们操作线程。类似的,我们也需要这样一个类来描述协程,按照官方框架的做法把它命名为Job,API设计如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
interface Job : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<Job>

override val key: CoroutineContext.Key<*>
get() = Job

/**
* 判断协程是否扔在执行
*/
val isActive: Boolean

/**
* 协程取消回调
*/
fun invokeOnCancel(onCancel: OnCancel): Disposable

/**
* 协程完成回调
*/
fun invokeOnCompletion(onComplete: OnCompelete): Disposable

/**
* 取消协程
*/
fun cancel()

/**
* 移除回调
*/
fun remove(disposable: Disposable)

/**
* 与[Thread.join]类似,挂起外部协程直到当前协程完成
*/
suspend fun join()
}

协程的状态

我们对协程的状态进行封装,让其状态管理更加简便。主要是未完成已取消已完成这三种状态。状态的定义如下所示。

1
2
3
4
5
sealed class CoroutineState {
class Incomplete : CoroutineState()
class Cancelling : CoroutineState()
class Complete<T>(val value: T? = null, val exception: Throwable? = null) : CoroutineState()
}

进一步解释下这三种状态:

  • Incomplete:协程启动后立即进入该状态,直到完成或者被取消。
  • Cancelling:协程执行中被取消后进入该状态。进入该状态后,要等待协程体内部的挂起函数调用相应取消,相应后协程成功被取消或者抛出CancellationException取消,最终会流转为Complete状态。
  • Complete:协程执行完成进入该状态。

支持回调的状态

注册回调时,需要根据当前状态的不同采取不同的处理方式,回调注册的操作也必须是原子操作,否则会有状态不一致问题。跟上篇文章相同,我们在状态流转时采用元子类来处理原子操作,比加锁性能会有较大提升。

1
2
3
4
5
6
7
8
9
protected val state = AtomicReference<CoroutineState>()

override fun cancel() {
val newState = state.updateAndGet { prev ->
when (prev) {
// 返回新状态
}
}
}

调用updateAndGet,在Lambda表达式中返回新的状态,内部会采用CAS操作来更新新状态,如果更新不成功,Lambda表达式会重复调用。

用于存放回调的数据结构也必须支持并发安全。这里我们提供一个递归列表,它具有不变形,如下所示。

1
2
3
4
5
6
7
sealed class DisposableList {
object Nil : DisposableList()
class Cons(
val head: Disposable,
val tail: DisposableList
) : DisposableList()
}

通过递归来实现对该列表的访问。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
fun DisposableList.remove(disposable: Disposable): DisposableList {
return when (this) {
DisposableList.Nil -> this
is DisposableList.Cons -> {
if (head == disposable) tail
else DisposableList.Cons(head, tail.remove(disposable))
}
}
}

tailrec fun DisposableList.forEach(action: (Disposable) -> Unit): Unit = when (this) {
DisposableList.Nil -> Unit
is DisposableList.Cons -> {
action(this.head)
this.tail.forEach(action)
}
}

inline fun <reified T : Disposable> DisposableList.loopOn(crossinline action: (T) -> Unit) =
forEach {
when (it) {
is T -> action(it)
}
}

我们把这个不变列表添加到状态中,在状态发生变化时,上一个状态的回调可以传递给新状态,确保已注册的回调不丢失。代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
sealed class CoroutineState {

private var disposableList: DisposableList = DisposableList.Nil

/**
* 拿到上一个状态的所有回调
*/
fun from(state: CoroutineState): CoroutineState {
disposableList = state.disposableList
return this
}

/**
* 添加一个回调
*/
fun with(disposable: Disposable): CoroutineState {
disposableList = DisposableList.Cons(disposable, disposableList)
return this
}

/**
* 移除一个回调
*/
fun without(disposable: Disposable): CoroutineState {
disposableList = this.disposableList.remove(disposable)
return this
}

/**
* 清除所有回调
*/
fun clear() {
disposableList = DisposableList.Nil
}
}

协程的初步实现

定义好状态后,接下来要为状态机输入事件,定义一个Job的抽象子类如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
abstract class AbstractCoroutine<T>(override var context: CoroutineContext) : Job, Continuation<T> {
/**
* 原子状态机
*/
protected val state = AtomicReference<CoroutineState>()

override val context: CoroutineContext

val isCompleted
get() = state.get() is CoroutineState.Complete<*>

override val isActive: Boolean
get() = when (state.get()) {
is CoroutineState.Complete<*>,
is CoroutineState.Cancelling -> false
else -> true
}

init {
state.set(CoroutineState.Incomplete()) // 初始化为Incomplete状态
this.context = context + this
}
... ...
}

AbstractCoroutine同时实现了Continuation接口,这个我们已经熟悉了,是为了能够传入作为completion在启动时传入协程,以监听协程的完成事件。

协程的创建

我们在协程的描述中定义好了协程应该具备哪些能力,接下来要考虑如何封装协程的创建。

无返回值的launch

如果协程的返回值是为Unit,那我们可以认为它无返回值,对于这种协程只需要启动它即可。

1
2
3
4
5
launch {
println(1)
delay(1000)
println(2)
}

launch的实现如下。

1
2
3
4
5
6
7
fun launch(context: CoroutineContext = EmptyCoroutineContext, block: suspend () -> Unit): Job {
val completion = StandaloneCoroutine(context)
block.startCoroutine(completion)
return completion
}

class StandaloneCoroutine(context: CoroutineContext) : AbstractCoroutine<Unit>(context)

实现invokeOnCompletion

用launch创建的协程可以立即运行起来,如果我们知道它什么时候结束,可以通过注册OnComplete回调来做到这一点。我们需要做两件事:

  1. 将回调注册到协程中。
  2. 在协程完成时通知回调。

Job接口中定义的OnComplete实际只是一个函数,声明如下:

1
typealias OnComplete = () -> Unit

这里并没有携带任何参数,因为协程执行完成的结果我们有更好的方式去获取,这里的OnComplete仅仅用于通知协程执行结束。但对于协程内部来说,我们需要获取结果来进行状态流转,所以这里定义一个doOnCompleted函数在注册获取结果的回调,如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
override fun invokeOnCompletion(onComplete: OnComplete): Disposable {
return doOnCompleted { _ -> onComplete() }
}

protected fun doOnCompleted(block: (Result<T>) -> Unit): Disposable {
val disposable = CompletionHandlerDisposable(this, block)
val newState = state.updateAndGet { prev ->
when (prev) {
is CoroutineState.Incomplete -> {
CoroutineState.Incomplete().from(prev).with(disposable)
}
is CoroutineState.Cancelling -> {
CoroutineState.Cancelling().from(prev).with(disposable)
}
is CoroutineState.Complete<*> -> prev
}
}
(newState as? CoroutineState.Complete<T>)?.let {
block(
when {
it.value != null -> Result.success(it.value)
it.exception != null -> Result.failure(it.exception)
else -> throw IllegalStateException("Won't happen.")
}
)
}
return disposable
}

class CompletionHandlerDisposable<T>(
private val job: Job,
private val OnComplete: (Result<T>) -> Unit
) : Disposable {

override fun dispose() {
job.remove(this)
}
}

这里需要注意的是,除了Complete状态时我们可以直接回调OnComplete,其它状态的流转我们都需要构造一个新的状态对象来确保并发安全

注册回调的过程分为以下三步:

  1. 构造一个CompletionHandlerDisposable对象。它有一个disposable函数,用于将对应的回调移除。
  2. 检查状态,并将回调添加到状态中。
  3. 在状态流转成功后,获得最终的状态,如果是Complete状态则立即回调OnComplete。

Job的remove函数还未实现,来看看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
override fun remove(disposable: Disposable) {
state.updateAndGet { prev ->
when (prev) {
is CoroutineState.Incomplete -> {
CoroutineState.Incomplete().from(prev).without(disposable)
}
is CoroutineState.Cancelling -> {
CoroutineState.Cancelling().from(prev).without(disposable)
}
is CoroutineState.Complete<*> -> {
prev
}
}
}
}

实现与doOnCompleted刚好相反。

接下来思考下如何通知回调。想一下如何知道协程执行完毕呢,没错就是当AbstractCoroutine#resumeWith函数调用时,协程执行完毕。因此我们只需要在AbstractCoroutine#resumeWith函数中将协程流转为完成状态,并且通知此前注册的完成回调即可。代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
override fun resumeWith(result: Result<T>) {
val newState = state.updateAndGet { prev ->
when (prev) {
is CoroutineState.Cancelling,
is CoroutineState.Incomplete -> {
CoroutineState.Complete(result.getOrNull(), result.exceptionOrNull()).from(prev)
}
is CoroutineState.Complete<*> -> throw IllegalStateException("Already completed!")
}
}
// 通知完成回调
newState.notifyCompletion(result)
newState.clear()
}

[CoroutineState.kt]
fun <T> notifyCompletion(result: Result<T>) {
this.disposableList.loopOn<CompletionHandlerDisposable<T>> {
it.onComplete(result)
}
}

这里Cancelling会流转为Complete的关键是,协程被取消后并不会立即停止执行,而是要等待内部的挂起点相应,这个我们后面再详细讨论。

实现join

join是一个挂起函数,它被调用时会有两种情况:

  • 被等待的协程已经完成,join不会挂起而是立即返回。
  • 被等待的协程未完成,join立即挂起,直到协程完成。

由于上面已经实现了完成回调,因此join只需要判断是否挂起,以及在挂起后被等待协程完成时回调中使用resume恢复挂起即可,如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
override suspend fun join() {
when (state.get()) {
is CoroutineState.Incomplete,
is CoroutineState.Cancelling -> return joinSuspend()
is CoroutineState.Complete<*> -> return
}
}

private suspend fun joinSuspend() = suspendCoroutine<Unit> { continuation ->
doOnCompleted { result ->
continuation.resume(Unit)
}
}

有返回值的async

现在我们已经知道如何启动协程并等待协程执行完成,不过很多时候我们更想拿到协程的返回值,因此我们基于Job再定义一个接口Deferred如下。

1
2
3
interface Deferred<T> : Job {
suspend fun await(): T
}

这里多了一个泛型参数T,T表示返回值类型,通过它的await函数也可以拿到这个返回值,await的作用主要是:

  1. 在协程已经执行完成时,立即返回协程的结果,或者异常。
  2. 如果协程未完成,则挂起直到当前协程执行完成。

该方法与join类似:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
override suspend fun await(): T {
val currentState = state.get()
return when (currentState) {
is CoroutineState.Incomplete,
is CoroutineState.Cancelling -> awaitSuspend()
is CoroutineState.Complete<*> -> {
currentState.exception?.let { throw it } ?: (currentState.value as T)
}
}
}

private suspend fun awaitSuspend() = suspendCoroutine<T> { continuation ->
doOnCompleted { result -> continuation.resumeWith(result) }
}

接下来可以写出有返回值的async函数的实现了,如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fun <T> async(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend () -> T
): Deferred<T> {
val completion = DeferredCoroutine<T>(context)
block.startCoroutine(completion)
return completion
}

// 使用举例
val deferred = async {
getValue()
}
val result = deferred.await()
println(result)

这与我们前几篇实现的async/await仅有细节的差别。

协程的调度

我们已经大致实现了一个比较完整的复合协程,不过还要一个问题,如何指定协程的运行线程,或者说如何对协程进行线程调度?

协程的调度位置

协程在挂起点位置可能需要进行调度,为什么说可能,前几篇文章已经说过了,只有发生了事实上的异步行为时,才需要调度。我们再来回顾一下什么是事实上的异步:

  • 挂起点对应的挂起函数内部切换了线程,并在该线程内部调用Continuation#resume来恢复。
  • 挂起函数内部通过事件循环机制将Continuation的恢复调用转移到了新的线程调用栈上,例如Android平台上的Handler#post。
  • 挂起函数内部将Continuation实例白村,在后续某个时间再恢复调用。

综上所述,只有恢复和挂起不在同一个函数调用栈执行的情况,我们才有机会实现调度,而实现调度需要使用协程的拦截器

协程的调度器设计

调度的本质是利用拦截器将协程的恢复调用转移到一个特定的线程上,由此可以写出下列代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
interface Dispatcher {
fun dispatch(block: () -> Unit)
}

open class DispatcherContext(private val dispatcher: Dispatcher) :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatcherContinuation(continuation, dispatcher)
}

private class DispatcherContinuation<T>(val delegate: Continuation<T>, val dispatcher: Dispatcher) :
Continuation<T> {

override val context: CoroutineContext
get() = delegate.context

override fun resumeWith(result: Result<T>) {
dispatcher.dispatch {
delegate.resumeWith(result)
}
}
}

基于线程池的调度器

我们最常见的调度场景就是制定代码的执行线程,而在Java平台上使用线程池是比较好的做法,官方调度器也是如此。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
object DefaultDispatcher : Dispatcher {

private val threadGroup = ThreadGroup("DefaultDispatcher")

private val threadIndex = AtomicInteger(0)

private val executor =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1) { runnable ->
Thread(
threadGroup,
runnable,
"${threadGroup.name}-worker-${threadIndex.getAndDecrement()}"
).apply { isDaemon = true }
}

override fun dispatch(block: () -> Unit) {
executor.submit(block)
}
}

object Dispatchers {
val Default by lazy {
DispatcherContext(DefaultDispatcher)
}
}

这里我们创建了一个CPU密集型线程池,并且线程全部设置为守护线程。使用方法如下:

1
2
3
4
5
launch(Dispatchers.Default) {
println(1)
delay(2000)
println(2)
}

println(1)和println(2)都将运行在Default调度器对应线程上。

基于UI事件循环的调度器

Android开发者比较关心如何将协程调度到主线程上。这个比较简单,直接使用Handler#post即可将协程体发送到主线程消息循环中。

1
2
3
4
5
6
7
8
object AndroidMainDispatcher : Dispatcher {

private val handler = Handler(Looper.getMainLooper())

override fun dispatch(block: () -> Unit) {
handler.post(block)
}
}

参考