kotlin协程指北

kotlin协程指北

二月 01, 2019

什么是协程

我们知道在Linux系统中,线程是系统调度的最小单元,线程的切换由操作系统来进行。协程被称为”轻量级的线程”,但协程与线程其实完全是两个概念,协程是通过编译技术实现的,通过挂起机制来实现异步,操作系统根本就不知道它的存在,因此协程完全运行在用户态中,它的切换需要用户自己调度,在同一个线程内的协程切换没有线程上下文切换的开销,因此对于非io型的异步任务来说协程比线程具有更高的执行效率。

初体验

kotlin在语言层面支持了协程,协程库在以kotlinx开头的扩展模块中,如果是一个android项目,你的gradle依赖应该是这样的:

1
2
3
api 'org.jetbrains.kotlin:kotlin-stdlib:1.3.11'
api 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.1.0'
api 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.1.0'

学习一个新技术,在配置完环境之后,自然是run一把helloworld先爽一下^_^:

1
2
3
4
5
6
7
fun main(args: Array<String>) = runBlocking {
launch {
delay(8000)
println("world")
}
println("hello ")
}

这是我们的第一个协程,第一眼看上去可能会有点迷惑,不要紧我们来分析一下:runBlocking在当前线程开启了一个协程1,协程1中的this指向了一个CoroutineScope对象,CoroutineScope的launch方法用于开启一个新的协程2,它是协程1的子协程,会返回一个job对象(在这里我们忽略掉了),延迟一秒后打印world,开启的协程在被delay方法挂起后就会继续执行下面的打印hello的代码,父协程会在子协程全部执行完毕才退出因此hello打印完毕后会block八秒再切换到world的打印。通过协程,我们实现了在一个线程中用同步的形式来写异步代码这个愿望。

如果我们看launch中的代码不爽,想提取出来怎么办,我们可以这样写:

1
2
3
4
5
6
7
8
9
10
11
fun main(args: Array<String>) = runBlocking {
launch {
printWorld()
}
println("hello ")
}

private suspend fun printWorld() {
delay(8000)
println("world")
}

我们看到printWorld有一个suspend关键字,表示了这是一个挂起函数,挂起函数和普通函数的不同就是只有挂起函数才能调用挂起函数,因为delay是一个挂起函数,因此我们的printWorld也要标记为suspend

我们之前说过协程的切换是用户自己控制的,那么具体是怎么控制的呢,就是通过挂起点,让一个协程运行到挂起点的时候就会挂起当前协程转让其他协程,delay函数就是一个挂起点,我们再看个栗子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
fun main(args: Array<String>) = runBlocking {
launch {
println("world")
}
//模拟一段耗时操作
Thread.sleep(4000)
println("hello ")
}
=========
/**
结果依然是
hello
world
*/
========

fun main(args: Array<String>) = runBlocking {
launch {
println("world")
}
//模拟一段耗时操作
Thread.sleep(4000)
delay(1)
println("hello ")
}

=========
/**
结果变为
world
hello
*/
========

这个栗子就佐证了协程是由用户自己来控制而非系统调度这个特性,具有类似功能的函数还有join,yield等等。

创建作用域

coroutineScope函数可以创建一个新的作用域,它的特点是会等待它的所有子协程全部运行结束才会继续执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
fun main() = runBlocking {
// 创建一个新的协程作用域
coroutineScope {

launch {
delay(2000L)
println("launch1")
}

launch {
delay(1000L)
println("launch2")
}

println("coroutineScope end")
}

// 这一行会在coroutineScope所有子协程运行完毕后才会执行
println("Coroutine scope is over")
}
==========
coroutineScope end
launch2
launch1
Coroutine scope is over

指定线程

我们看一下开头的栗子,将其在android主线程中run这段代码你大概率会产生一个anr,因为协程默认都是在当前线程启动的,协程block的当然也会将线程block住

are you kidding me?如果协程不能切换线程,那么异步任务怎么能放心的交给它来处理呢?放心,这种基本操作协程肯定会提供的,我们launch函数有一个可选参数,它是一个协程调度器(CoroutineDispatcher),有几个常用值:

  • Dispatchers.Default 协程会在一个后台线程池中的一个线程中启动
  • Dispatchers.IO 被设计为IO线程池,但实际上和Default共用了一个线程池
  • Dispatchers.Unconfined 一个试验中的api,它的效果是启动的协程会在当前线程启动,但是在被suspend函数挂起后再继续后就会切换到suspend函数所在的线程
  • Dispatchers.Main 代表协程会在与UI交互的主线程中启动,对于android项目来说,需要引进kotlinx-coroutines-android这个模块,否则直接使用会报IllegalStateException异常
  • newSingleThreadContext() 这个方法可以开启一个新的线程

看一个简单的栗子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
fun main() = runBlocking {
repeat(10) {
launch(Dispatchers.Default) {
println("${Thread.currentThread()}")
}
}
Unit
}

============
Thread[DefaultDispatcher-worker-1,5,main]
Thread[DefaultDispatcher-worker-1,5,main]
Thread[DefaultDispatcher-worker-3,5,main]
Thread[DefaultDispatcher-worker-2,5,main]
Thread[DefaultDispatcher-worker-1,5,main]
Thread[DefaultDispatcher-worker-1,5,main]
Thread[DefaultDispatcher-worker-4,5,main]
Thread[DefaultDispatcher-worker-2,5,main]
Thread[DefaultDispatcher-worker-2,5,main]
Thread[DefaultDispatcher-worker-2,5,main]

取消与超时

如果某个协程block太久等的不耐烦了怎么办?我们之前说过launch函数会返回一个job对象,它可以对协程进行cancel,join等操作,我们来看个栗子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
fun main() = runBlocking {
val job = launch {
repeat(1000) { i -> //循环一千次
println("I'm sleeping $i ...")
delay(1000L)
}
}
delay(3000L) // 等待一段时间
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消该任务并等待job结束
println("main: Now I can quit.")
}

================
/**
这段代码运行结果如下:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
*/

这里要对job对象做一下介绍了,Job源码中有一大段对于Job的注释,我们简单总结一下:

Job是一个具有生命周期有父子结构可取消的任务对象,在job中启动的job称为子job,父job会等待所有子job执行完毕后才结束,cancel父job会同时取消所有的子job。cancel一个挂起的job会抛出一个CancellationException异常,但这个异常并不会导致crash而只是静默处理。如果一个子job抛出了一个非CancellationException异常就会导致父job执行失败。
启动job有一个可选参数CoroutineStart.LAZY,带这个参数启动的job会有懒启动特性,在调用了job.start或者job.join才会启动。

job具有一系列的状态值:

1
2
3
4
5
6
7
8
| **State**                        | [isActive] | [isCompleted] | [isCancelled] |
| -------------------------------- | ---------- | ------------- | ------------- |
| _New_ (optional initial state) | `false` | `false` | `false` |
| _Active_ (default initial state) | `true` | `false` | `false` |
| _Completing_ (transient state) | `true` | `false` | `false` |
| _Cancelling_ (transient state) | `false` | `false` | `true` |
| _Cancelled_ (final state) | `false` | `true` | `true` |
| _Completed_ (final state) | `false` | `true` | `false` |

job生命周期图如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
*
* wait children
* +-----+ start +--------+ complete +-------------+ finish +-----------+
* | New | -----> | Active | ---------> | Completing | -------> | Completed |
* +-----+ +--------+ +-------------+ +-----------+
* | cancel / fail |
* | +----------------+
* | |
* V V
* +------------+ finish +-----------+
* | Cancelling | --------------------------------> | Cancelled |
* +------------+ +-----------+
*
*/

回到刚才的问题,job可以通过cancel取消协程,但我们再看下这个栗子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
fun main(args: Array<String>) = runBlocking {
var num = 0
var job = launch(Dispatchers.Default) {
//模拟一段cpu耗时操作
while (true) {
num++
println("num:$num")
}
}
//等待100ms
delay(100)
job.cancelAndJoin()
println("end")
}

============
/**
等待了几秒之后,log如下:
num:1446289
num:1446290
num:1446291
num:1446292
num:1446293
num:1446294
num:1446295
*/

我们发现job并没有被取消..事实上,job的cancel只能取消挂起的协程,那么如果我们想取消一个正在正常运行的协程该怎么办呢?可以这样做:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
fun main(args: Array<String>) = runBlocking {
var num = 0
var job = launch(Dispatchers.Default) {
//模拟一段cpu耗时操作
while (isActive) {
num++
println("num:$num")
}
}
//等待100ms
delay(100)
job.cancelAndJoin()
println("end")
}

==========
/**
这次终于被正常取消了
num:10103
num:10104
num:10105
end
*/

需要注意的是,这种方式取消的协程不会抛出CancellationException异常。

还有一种取消的方式就是通过设定超时时间,超时后自动cancel,一个简单的栗子就明白:

1
2
3
4
5
6
7
8
9
10
fun main() = runBlocking {
var num = 0
withTimeout(100) {
while (isActive) {
num++
}
}
println(num)
Unit
}

获取返回值

读到这里很多读者会想到,我们实际开发大部分的场景都是开启了异步操作之后希望在异步操作结束之后拿到返回值,launch只是开启了一个异步操作,并没有提供拿到结果的方式呀?这个时候就需要async登场了,通过async方法启动的协程返回值是一个Deffered对象,它也是Job的子类,可以通过await方法获取返回值,我们看个demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
fun main() = runBlocking {
val deferred1:Deferred<Int> = async {
delay(1000)
2
}
val deferred2:Deferred<Int> = async {
delay(2000)
3
}
//打印5
println(deferred1.await() + deferred2.await())
Unit
}

上下文与线程间切换

协程有它自己运行时的上下文(CoroutineContext),前面提到的调度器和Job其实都是上下文的一部分,协程切换它运行所在的线程可以通过切换它的上下文实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main() {
//asCoroutineDispatcher是kotlin对ExecutorService增加的一个扩展函数,返回了一个包含调度器的上下文对象
Executors.newSingleThreadExecutor().asCoroutineDispatcher().use { ctx1 ->
Executors.newSingleThreadExecutor().asCoroutineDispatcher().use { ctx2 ->
//开启一个协程
runBlocking(ctx1) {
log("1")
//通过withContext函数实现协程上下文的切换
withContext(ctx2){
log("2")
}
log("3")
}
}
}
}

============
[pool-1-thread-1] 1
[pool-2-thread-1] 2
[pool-1-thread-1] 3

从结果可以看到协程切换到线程池2中执行完毕后又切换回当前线程

全局协程

通过GlobalScope启动的协程是全局独立的,其启动的协程采用默认的Dispatchers.Default作为调度器,与它所在的协程并没有父子关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
fun main() = runBlocking<Unit> {
var job = launch {
GlobalScope.launch {
delay(2000)
println("global coroutine end")
}
launch {
delay(2000)
println("child coroutine end")
}
}
delay(1000)
job.cancelAndJoin()
delay(2000)
}

============
global coroutine end

异常处理

协程的异常有两种传播方式:

  • 通过launch启动的协程在没有异常处理器的时候会将错误堆栈打印在控制台上
  • 通过async启动的协程会在await的时候抛给虚拟机
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
fun main() = runBlocking {
val job = GlobalScope.launch {
throw IndexOutOfBoundsException() // 我们将在控制台打印 Thread.defaultUncaughtExceptionHandler
}
job.join()
val deferred = GlobalScope.async {
throw ArithmeticException()
}
try {
deferred.await()
} catch (e: ArithmeticException) {
println("Caught ArithmeticException")
}
}

==============
Exception in thread "DefaultDispatcher-worker-1" java.lang.IndexOutOfBoundsException
at CoroutineKt$main$1$job$1.invokeSuspend(coroutine.kt:8)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:32)
at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:233)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:594)
at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:742)
Caught ArithmeticException
```
那如果我们不想将协程中的异常打印而是进行处理呢?协程提供了CoroutineExceptionHandler,它类似Thread.uncaughtExceptionHandler,可以将launch启动的协程中的异常捕获并打印,但是对于async启动的协程没有效果:

```kotlin
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("Caught $exception")
}
val job = GlobalScope.launch(handler) {
throw AssertionError()
}
//给async传入handler没有任何效果
val deferred = GlobalScope.async(handler) {
throw ArithmeticException()
}
joinAll(job, deferred)
}

===============
handler caught java.lang.AssertionError
caught java.lang.ArithmeticException

前面说过某个子任务执行失败会导致父任务进而其他子任务也会被取消,有没有子任务不会影响到父任务的机制呢?有的,它就是监督任务,看个🌰:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("Caught $exception")
}
supervisorScope {
val child = launch(handler) {
println("Child throws an exception")
throw AssertionError()
}
println("Scope is completing")
}
println("Scope is completed")
}

==============
Scope is completing
Child throws an exception
Caught java.lang.AssertionError
Scope is completed

并发问题

由于协程可以运行在不同的线程中,因此多个协程操作共享数据依然可能会产品并发问题,看个🌰:
首先定义一个扩展函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
val jobs = List(n) {
launch {
repeat(k) { action() }
}
}
jobs.forEach { it.join() }
}
println("Completed ${n * k} actions in $time ms")
}

然后运行一下:

1
2
3
4
5
6
7
8
9
10
11
var counter = 0

fun main() = runBlocking<Unit> {
GlobalScope.massiveRun {
counter++
}
println("Counter = $counter")
}
============
Completed 100000 actions in 176 ms
Counter = 98550

可以看到结果并没有得到100000,因为GlobalScope启动的协程是在默认线程池中启动的,实际上还是线程的并发问题。因此线程的并发解决方案和思路这里其实都是可以用的,比如将counter类型变为AtomicInteger,或者加synchronized或ReentrantLock,在协程中有加锁的替代方案:Mutex,将上面代码换成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var counter = 0
val mutex = Mutex()

fun main() = runBlocking<Unit> {
GlobalScope.massiveRun {
mutex.withLock {
counter++
}
}
println("Counter = $counter")
}
==========
Completed 100000 actions in 364 ms
Counter = 100000

这样结果就正常了,还有一种方案就是把协程都放在同一个线程中执行,也就不会有并并发问题了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking<Unit> {
// 使用 DefaultDispathcer 运行每个协程
GlobalScope.massiveRun {
// 但是把每个递增操作都限制在此单线程上下文中
withContext(counterContext) {
counter++
}
}
println("Counter = $counter")
}
=========
Completed 100000 actions in 596 ms
Counter = 100000

可以看到结果正常了,但是速度慢了一些,因为频繁的通过withContext切换效率比较低,更好的方式是直接在同一个线程中启动协程:

1
2
3
4
5
6
7
8
9
10
fun main() = runBlocking<Unit> {
// 在单线程上下文中运行每个协程
CoroutineScope(counterContext).massiveRun {
counter++
}
println("Counter = $counter")
}
============
Completed 100000 actions in 35 ms
Counter = 100000

这次的效率就非常高了。