脚本宝典收集整理的这篇文章主要介绍了【kotlin从摸索到探究】- 协程的执行流程,脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。
这篇文章将从源码的角度,分析携程的执行流程,我们创建一个携程,系统是怎么进行调度的,什么时候执行的,是否需要创建新线程等等,带着这些疑问,一起往下看吧。
fun main(): UnIT = runBlocking {
launch {
PRintln("${treadName()}======1")
}
GlobalScoPE.launch {
println("${treadName()}======3")
}
launch {
println("${treadName()}======2")
}
println("${treadName()}======4")
Thread.sleep(2000)
}
输出如下F1a;
DefaultDispatcher-worker-1======3
main======4
main======1
main======2
Process finished with exit code 0
根据打印,如果根据单线程执行流程来看,是不是感觉上面的日志打印顺序有点不好理解,下面我们就逐步来进行分解。
runBlocking携程体
这里将其它代码省略到了,我这里都是按照一条简单的执行流程进行讲解。
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
val EventLoop: EventLoop?
val newContext: CoroutineContext
...
if (contextInterceptor == null) {
eventLoop = ThreadLocalEventLoop.eventLoop
newContext = GlobalScope.newCoroutineContext(context + eventLoop)
}
...
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
return coroutine.joinBlocking()
}
看一下eventLoop
的初始化,会 在当前线程(主线程)创建BlockingEventLoop对象。
internal val eventLoop: EventLoop
get() = ref.get() ?: createEventLoop().also { ref.set(it) }
internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread())
看一下newContext
初始化,这里会对携程上下文进行组合,返回新的上下文。最后返回的是一个BlockingEventLoop对象。
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext + context
val debug = if (DEBUG) combined + Coroutineid(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
开始对携程进行调度
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
看一下执行这句代码之前,各变量的值
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-w4u0tSiL-1631697299327)(//upload-images.jianshu.io/upload_images/3341206-0d228d7431b736a4.png?imageMogr2/auto-orient/strip|imageView2/2/w/566/format/webp)]
而上面的代码最终调用的是CoroutineStart.DEFAULT
的invoke
方法。
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(completion)
ATOMIC -> block.startCoroutine(completion)
UNDISPATCHED -> block.startCoroutineUndispatched(completion)
LAZY -> Unit // will start lazily
}
我们使用的是DEFAULT
启动模式。然后会执行resumeCancellableWith
方法。
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
resumeUndispatchedWith(result)
}
}
}
}
dispatcher
是BlockingEventLoop
对象,没有重写isDispatchNeeded
,默认返回true。然后调用dispatch
继续进行分发。BlockingEventLoop
继承了EventLoopImplBase
并调用其dispatch
方法。把任务加入到队列中。
public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)
回到最开始,在coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
执行完,还执行了coroutine.joinBlocking()
看一下实现。
fun joinBlocking(): T {
registerTimeLoopthread()
try {
eventLoop?.incrementUseCount()
try {
while (true) {
@SupPress("DEPRECATION")
if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
// note: process next even may loose unpark flag, so check if completed before parking
if (isCompleted) break
parkNanos(this, parkNanos)
}
} finally { // paranoia
eventLoop?.decrementUseCount()
}
} finally { // paranoia
unregisterTimeLoopThread()
}
// now return result
val state = this.state.unboxState()
(state as? CompletedExceptionally)?.let { throw it.cause }
return state as T
}
执行val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
,取出任务进行执行,也就是runBlocking
携程体。
launch {}
执行流程
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
因为launch
是直接在runBlocking(父携程体)
里新的创建的子携程体,所以执行流程上和之前将的差不多,只不过不会像runBlocking
再去创建BlockingEventLoop
对象,而是直接用runBlocking(父携程体)
的,然后把任务加到里面,所以通过这种方式其实就是单线程对任务的调度
而已。所以在runBlocking(父携程体)
内通过launch
启动再多的携程体,其实都是在同一线程,按照任务
队列的顺序执行的。
根据上面日志输出,并没有先执行两个
launch
携程体,这是为什么呢,根据上面的讲解,应用知道,runBlocking(父携程体)
是第一被添加的队列的任务,其次是launch
,所以是这样的顺序。那可以让launch
立即执行吗?答案是可以的,这就要说携程的启动模式了。
CoroutineStart 是协程的启动模式
,存在以下4种模式:
我们使用
UNDISPATCHED
就可以使携程体马上在当前线程执行。看一下是怎么实现的。看一下实现:
使用这种启动模式执行UNDISPATCHED -> block.startCoroutineUndispatched(completion)
方法。
internal fun <T> (suspend () -> T).startCoroutineUndispatched(completion: Continuation<T>) {
startDirect(completion) { actualCompletion ->
withCoroutineContext(completion.context, null) {
startCoroutineUninterceptedOrReturn(actualCompletion)
}
}
}
大家可以自己点击去看一下,大概就是会立即执行携程体,而不是将任务放入队列。
但是
GlobalScope.launch
却不是按照这样的逻辑,这是因为GlobalScope.launch
启动的全局携程,是一个独立的携程体了,并不是runBlocking(父携程体)
子携程。看一下通过GlobalScope.launch
有什么不同。
GlobalScope.launch执行流程
GlobalScope.launch
```kotlin
`newCoroutineContext(context)`返回`Dispatchers.Default`对象。而defaultscheduler继承了ExperimentalCoroutineDispatcher类。看一下`ExperimentalCoroutineDispatcher`中的`dispatch`代码:
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = … coroutineScheduler.dispatch(block) …
看一下`coroutineScheduler`初始化
```kotlin
private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
CoroutineScheduler
实现了Executor
接口,里面还有两个全局队列和线程池相关的参数。
@JvmField
val globalCpuQueue = GlobalQueue()
@JvmField
val globalBlockingQueue = GlobalQueue()
继续调用CoroutineScheduler
中的dispatch
方法
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
trackTask() // this is needed for virtual time support
val task = createTask(block, taskContext)
// try to submit the task to the local queue and act depending on the result
val currentWorker = currentWorker()
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
if (notAdded != null) {
if (!addToGlobalQueue(notAdded)) {
// Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
throw RejectedExecutionException("$schedulerName was terminated")
}
}
val skipUnpark = tailDispatch && currentWorker != null
// Checking 'task' instead of 'notAdded' is completely okay
if (task.mode == TASK_NON_BLOCKING) {
if (skipUnpark) return
signalCpuWork()
} else {
// Increment blocking tasks anyway
signalBlockingWork(skipUnpark = skipUnpark)
}
}
val task = createTask(block, taskContext)
包装成TaskImpl
对象。
val currentWorker = currentWorker()
当前是主线程,运行程序时由进程创建,肯定不是Worker
对象,Worker
是一个继承了Thread
的类 ,并且在初始化时都指定为守护线程
。
Worker存在5种状态:
CPU_ACQUIred 获取到cpu权限
BLOCKING 正在执行IO阻塞任务
PARKING 已处理完所有任务,线程挂起
DORMANT 初始态
TERMINATED 终止态
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
由于currentWorker是null,直接返回task
对象。addToGlobalQueue(notAdded)
根据任务是否是阻塞任务,将task
添加到全局任务队列中。这里被添加到globalCpuQueue
中。signalCpuWork()
来唤醒一个线程或者启动一个新的线程。 fun signalCpuWork() {
if (tryUnpark()) return
if (tryCreateWorker()) return
tryUnpark()
}
private fun tryCreateWorker(state: Long = controlState.value): Boolean {
val created = createdWorkers(state)// 创建的的线程总数
val blocking = blockingTasks(state)// 处理阻塞任务的线程数量
val cpuWorkers = (created - blocking).coerceAtLeast(0)//得到非阻塞任务的线程数量
if (cpuWorkers < corePoolSize) {// 小于核心线程数量,进行线程的创建
val newCpuWorkers = createNewWorker()
if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()// 当前非阻塞型线程数量为1,同时核心线程数量大于1时,再进行一个线程的创建,
if (newCpuWorkers > 0) return true
}
return false
}
// 创建线程
private fun createNewWorker(): Int {
synchronized(workers) {
...
val created = createdWorkers(state)// 创建的的线程总数
val blocking = blockingTasks(state)// 阻塞的线程数量
val cpuWorkers = (created - blocking).coerceAtLeast(0) // 得到非阻塞线程数量
if (cpuWorkers >= corePoolSize) return 0//超过最大核心线程数,不能进行新线程创建
if (created >= maxPoolSize) return 0// 超过最大线程数限制,不能进行新线程创建
...
val worker = Worker(newIndex)
workers[newIndex] = worker
require(newIndex == incrementCreatedWorkers())
worker.start()// 线程启动
return cpuWorkers + 1
}
}
那么这里面的任务又是怎么调度的呢,当全局任务被执行的时候,看一下Worker
中的run
方法:
override fun run() = runWorker()
执行runWorker
方法,该方法会从队列中找到执行任务,然后开始执行。详细代码,可以自行翻阅。
所以
GlobalScope.launch
使用的就是线程池,没有所谓的性能好。
Dispatchers调度器
Dispatchers是协程中提供的线程调度器,用来切换线程,指定协程所运行的线程。,上面用的是默认调度器Dispatchers.Default
。Dispatchers中提供了4种类型调度器:
Default 默认调度器
:适合CPU密集型任务调度器 比如逻辑计算;Main UI调度器
Unconfined 无限制调度器
:对协程执行的线程不做限制,协程恢复时可以在任意线程;IO调度器
:适合IO密集型任务调度器 比如读写文件,网络请求等。
由于水平有限,**有错误的地方在所难免,未免误导他人,欢迎大佬指正!**码字不易,感谢大家的点赞关注!🙏有一起学习的小伙伴可以关注下我的公众号——【❤️程序猿养成中心❤️
】每周会定期做关于AndROId的技术分享。
以上是脚本宝典为你收集整理的【kotlin从摸索到探究】- 协程的执行流程全部内容,希望文章能够帮你解决【kotlin从摸索到探究】- 协程的执行流程所遇到的问题。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。