当前位置: 首页 > news >正文

Kotlin协程 -> Job.join() 完整流程图与核心源码分析

🔄 完整调用流程图

👤 用户调用 job.join()↓
🔍 JobSupport.join() → 检查 isActive 状态↓
❓ isActive 状态判断├── ✅ false (已完成) → 检查 this.state│   ├── CompletedExceptionally → throw state.cause│   └── 正常完成 → return (直接返回)│└── ⏳ true (运行中)joinSuspend() 挂起等待↓🔗 suspendCancellableCoroutine { cont -> ... }↓📝 invokeOnCompletion(onCancelling = false) { cause -> ... }↓🔄 loopOnState { state -> ... } 状态循环检查↓❓ 当前状态判断├── 🏃 Incomplete → makeNode() 创建回调节点│   ↓│   🔒 _state.compareAndSet(state, state + node) 原子添加│   ↓│   📦 return node (JobNode) 作为 DisposableHandle│└── ✅ 已完成 → invokeIt(handler, state) 立即执行回调↓📞 handler(cause) 执行用户回调↓🎯 cont.resume(Unit) 或 cont.resumeWithException(cause)↓
⏸️ cancellable.getResult() → COROUTINE_SUSPENDED 挂起标记↓
🕐 等待目标协程完成...↓
🎯 目标协程完成:AbstractCoroutine.resumeWith(result)↓
📍 makeCompletingOnce(result.toState())↓
🔄 loopOnState { state -> ... } 完成状态转换↓
🔧 Completing(state.list, false, proposedUpdate) 创建完成状态↓
🔒 _state.compareAndSet(state, completing) 原子更新状态↓
📢 completeStateFinalization(state, completing)↓
🔔 notifyCompletion(list, cause) 通知所有回调↓
🔄 list.forEach<JobNode<*>> { node -> node.invoke(cause) }↓
🎯 InvokeOnCompletion.invoke()handler(cause) 执行join的回调↓
📞 CancellableContinuationImpl.resumeWith(Result.success(Unit))↓
🔒 tryResumeImpl() → _state.compareAndSet(ACTIVE, result)↓
🚀 dispatchResume(resumeMode) 派发恢复↓
🧵 CoroutineDispatcher.dispatch(context, this)↓
🎯 目标线程执行 DispatchedTask.run()↓
📞 continuation.resumeWith(Result.success(Unit))↓
🔄 BaseContinuationImpl.resumeWith() 状态机恢复↓
⚙️ 状态机.invokeSuspend() 继续执行用户代码↓
🏁 join() 方法返回,用户代码继续执行

🎯 核心源码方法明确讲解

1. 🔍 JobSupport.join() - 入口方法

// 📍 位置:kotlinx.coroutines/JobSupport.kt
public final override suspend fun join() {// 🚀 快速路径:检查协程是否已完成if (!isActive) {val state = this.stateif (state is CompletedExceptionally) {throw state.cause // ❌ 重新抛出异常}return // ✅ 正常完成,直接返回}// ⏳ 慢速路径:协程还在运行,需要挂起等待return joinSuspend()
}// 📊 isActive 属性检查
public val isActive: Boolean get() = state.let { it is Incomplete && it.isActive }

明确要点:
isActive = false 表示协程已完成(正常或异常)
isActive = true 表示协程仍在运行,需要挂起等待

2. 🔗 joinSuspend() - 挂起等待核心

private suspend fun joinSuspend(): Unit = suspendCancellableCoroutine { cont ->// 📝 关键:注册完成回调,当目标协程完成时会被调用val handle = invokeOnCompletion(onCancelling = false) { cause ->// 🎯 这个回调会在目标协程完成时执行if (cause != null) {cont.resumeWithException(cause) // ❌ 异常完成} else {cont.resume(Unit) // ✅ 正常完成}}// 🧹 设置取消处理:如果当前协程被取消,移除回调cont.invokeOnCancellation { handle.dispose() }
}

明确要点:
suspendCancellableCoroutine 创建挂起点,invokeOnCompletion 注册的回调是关键连接点,回调中的 cont.resume() 会恢复等待的协程

3. 📝 invokeOnCompletion() - 回调注册机制

public final override fun invokeOnCompletion(onCancelling: Boolean,invokeImmediately: Boolean = true,handler: CompletionHandler
): DisposableHandle {var nodeCache: JobNode<*>? = null// 🔄 状态循环:处理并发状态变化loopOnState { state ->when (state) {is Incomplete -> {// 🏃 协程还在运行:需要注册回调节点val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }// 🔒 原子操作:将回调节点添加到状态链表中if (_state.compareAndSet(state, state + node)) {return node // 返回可取消的句柄}// CAS失败,说明状态被其他线程修改,重试}else -> {// ✅ 协程已完成:立即执行回调if (invokeImmediately) {invokeIt(handler, state) // 立即调用回调}return NonDisposableHandle // 返回空句柄}}}
}

明确要点:
Incomplete 状态:协程运行中,注册回调节点,其他状态:协程已完成,立即执行回调,使用 CAS 操作保证线程安全

5. 🎯 invokeIt() - 立即执行回调

// 📍 位置:kotlinx.coroutines/JobSupport.kt
private fun invokeIt(handler: CompletionHandler, state: Any?) {try {// 🔍 提取异常原因(如果是异常完成)val cause = (state as? CompletedExceptionally)?.cause// 🎯 关键调用:执行join()注册的回调函数handler(cause) // 这里的handler就是joinSuspend中的 { cause -> ... }} catch (ex: Throwable) {// 🛡️ 回调异常处理:防止回调异常影响协程状态handleOnCompletionException(ex)}
}// 🛡️ 回调异常处理
protected open fun handleOnCompletionException(exception: Throwable) {// 将异常传递给全局异常处理器,避免影响协程状态GlobalScope.launch(Dispatchers.Unconfined) {throw exception}
}

明确要点:
handler(cause) 就是执行 joinSuspend 中注册的回调,异常隔离:回调中的异常不会影响协程的完成状态

6. 📞 执行join的回调 - 恢复等待协程

// joinSuspend中注册的回调函数
val handle = invokeOnCompletion(onCancelling = false) { cause ->// 🎯 这个回调现在被执行if (cause != null) {// ❌ 目标协程异常完成:将异常传递给等待的协程cont.resumeWithException(cause)} else {// ✅ 目标协程正常完成:恢复等待的协程cont.resume(Unit)}
}

明确要点:
contCancellableContinuationImpl 实例,cause != null 表示目标协程异常完成,cause == null 表示目标协程正常完成

7. 📞 CancellableContinuationImpl.resume() - 启动恢复

// 📍 位置:kotlinx.coroutines/CancellableContinuationImpl.kt
public override fun resume(value: T) {// 🔄 包装成功结果resumeWith(Result.success(value))
}public override fun resumeWithException(exception: Throwable) {// 🔄 包装异常结果resumeWith(Result.failure(exception))
}// 📞 核心恢复方法
override fun resumeWith(result: Result<T>) {val state = result.toState() // 转换为内部状态// 🔒 尝试原子性恢复if (tryResumeImpl(state) === TryResumeToken) {// ✅ 恢复成功:派发到目标线程执行dispatchResume(resumeMode)}// 如果tryResumeImpl失败,说明已经被恢复或取消,忽略
}

明确要点:
resume(Unit) 最终调用 resumeWith(Result.success(Unit))tryResumeImpl 确保 continuation 只能被恢复一次

8. 🔒 tryResumeImpl() - 原子恢复检查

private fun tryResumeImpl(proposedUpdate: Any?): Symbol? {// 🔄 状态循环:处理并发loopOnState { state ->when (state) {ACTIVE -> {// 🔄 从ACTIVE状态转换为恢复状态if (_state.compareAndSet(ACTIVE, proposedUpdate)) {return TryResumeToken // ✅ 恢复成功}// CAS失败,重试}is CancelledContinuation -> {// 🚫 已被取消,检查是否可以覆盖取消if (state.makeResumed()) {return TryResumeToken // ✅ 可以恢复}return null // ❌ 不能恢复}else -> return null // 已经被恢复或其他状态}}
}

明确要点:
ACTIVE → proposedUpdate:正常恢复路径,CancelledContinuation:处理取消与恢复的竞争,返回 TryResumeToken 表示恢复成功

9. 🚀 dispatchResume() - 派发恢复执行

fun dispatchResume(mode: Int) {// 🔍 再次检查是否成功恢复(防止竞争条件)if (tryResume() != TryResumeToken) return// 📍 根据恢复模式选择执行策略when (mode) {MODE_CANCELLABLE -> {// 🧵 可取消模式:检查是否需要线程切换val dispatcher = context[ContinuationInterceptor] as? CoroutineDispatcherif (dispatcher?.isDispatchNeeded(context) == true) {// 🚀 派发到指定调度器线程dispatcher.dispatch(context, this)} else {// ⚡ 当前线程执行,避免线程切换开销executeUnconfined()}}MODE_ATOMIC -> {// ⚡ 原子模式:直接在当前线程执行resumeUndispatchedWith(getSuccessfulResult())}MODE_UNDISPATCHED -> {// 🚀 不派发模式:直接执行executeUnconfined()}}
}

明确要点:
MODE_CANCELLABLEjoin 使用的默认模式,isDispatchNeeded 判断是否需要线程切换,优化:相同线程时避免不必要的派发

10. 🧵 CoroutineDispatcher.dispatch() - 线程派发

// 📍 位置:kotlinx.coroutines/CoroutineDispatcher.kt
public abstract fun dispatch(context: CoroutineContext, block: Runnable)// 🧵 典型实现(如Dispatchers.Main)
override fun dispatch(context: CoroutineContext, block: Runnable) {// 📋 将任务提交到目标线程的任务队列handler.post(block) // Android Main线程// 或者executor.execute(block) // 线程池
}

明确要点:
block 就是 DispatchedTask(即当前的 continuation),任务被放入目标线程的执行队列,实际执行时机由调度器决定

11. 🎯 DispatchedTask.run() - 目标线程执行

// 📍 位置:kotlinx.coroutines/DispatchedTask.kt
public final override fun run() {assert { resumeMode != MODE_UNINITIALIZED }val taskContext = this.taskContextvar fatalException: Throwable? = nulltry {// 🎯 获取要恢复的Continuationval delegate = delegate as DispatchedContinuation<T>val continuation = delegate.continuation// 🔄 在正确的上下文中恢复协程withContinuationContext(continuation, countOrElement) {val context = continuation.contextval state = takeState() // 获取恢复状态(Unit或异常)// 🔍 检查协程是否在执行过程中被取消val exception = getExceptionalResult(state)val job = if (exception == null && resumeMode.isCancellableMode) {context[Job]} else nullif (job != null && !job.isActive) {// 🚫 等待协程已被取消val cause = job.getCancellationException()cancelCompletedResult(state, cause)continuation.resumeWithStackTrace(cause)} else {// ✅ 正常恢复协程if (exception != null) {// ❌ 传递异常continuation.resumeWithException(exception)} else {// ✅ 传递成功结果continuation.resume(getSuccessfulResult(state))}}}} catch (e: Throwable) {// 🛡️ 处理执行异常fatalException = e} finally {// 🧹 清理工作val result = runCatching { taskContext?.afterTask() }handleFatalException(fatalException, result.exceptionOrNull())}
}

明确要点:
continuation 是等待 join 的协程的 continuationstate 包含要传递的结果(Unit 表示成功),取消检查:即使目标协程完成,等待协程可能已被取消

12. 📞 continuation.resume() - 恢复等待协程

// 这里的continuation是等待join的协程的continuation
// 最终会调用到BaseContinuationImpl.resumeWith()// 📍 位置:kotlin-stdlib/BaseContinuationImpl.kt
public final override fun resumeWith(result: Result<Any?>) {var current = thisvar param = result// 🔄 协程状态机执行循环while (true) {probeCoroutineResumed(current) // 调试探针with(current) {val completion = completion!! // 下一个continuationval outcome: Result<Any?> = try {// ⚙️ 执行当前状态机步骤val outcome = invokeSuspend(param)if (outcome === COROUTINE_SUSPENDED) {// ⏸️ 需要再次挂起,退出循环return}Result.success(outcome) // ✅ 继续执行} catch (exception: Throwable) {// ❌ 状态机执行异常Result.failure(exception)}// 🧹 释放当前帧的拦截器引用releaseIntercepted()if (completion is BaseContinuationImpl) {// 📈 继续执行下一个状态机帧current = completionparam = outcome} else {// 🏁 到达顶层,完成整个协程completion.resumeWith(outcome)return}}}
}

明确要点:
这是协程状态机恢复的核心循环,invokeSuspend 是编译器生成的状态机方法,循环处理多个状态机帧的恢复

13. ⚙️ 状态机.invokeSuspend() - 用户代码继续执行

// 编译器生成的状态机代码(简化版)
override fun invokeSuspend(result: Result<Any?>): Any? {when (label) {0 -> {// 🔍 检查前一步的结果throwOnFailure(result)// 🎯 这里是join()调用点label = 1val joinResult = targetJob.join() // 挂起点if (joinResult === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED}1 -> {// ✅ join()完成,继续执行后续代码throwOnFailure(result) // 检查join是否有异常// 🏁 用户代码继续执行println("Target job completed!")return Unit}}
}

明确要点:
label = 1 表示从 join 挂起点恢复,throwOnFailure(result) 处理 join 过程中的异常,状态机继续执行 join 之后的用户代码

14. 🏁 join() 方法返回,用户代码继续

// 用户代码示例
suspend fun example() {val job = launch {delay(1000)println("Target job finished")}println("Before join")job.join() // ⏸️ 在这里挂起等待println("After join") // 🎯 从这里继续执行
}

明确要点:
join() 方法"返回"实际上是状态机的恢复
用户感知上就像是一个普通的挂起函数调用,后续代码在目标协程完成后继续执行

🤔问题 & 回答

Q1: join() 如何实现线程挂起而不阻塞线程?

A1: 通过协程状态机和 Continuation 机制实现真正的非阻塞挂起

🧵 传统阻塞 vs 协程挂起对比
// ❌ 传统阻塞方式(会阻塞线程)
fun blockingJoin() {while (job.isActive) {Thread.sleep(10) // 🚫 线程被阻塞,无法处理其他任务}
}// ✅ 协程挂起方式(不阻塞线程)
suspend fun suspendingJoin() {job.join() // ⏸️ 协程挂起,线程可以处理其他任务
}
⚙️ 挂起机制的核心原理
// 🎯 协程状态机转换
class JoinStateMachine : BaseContinuationImpl {var label = 0 // 状态标记override fun invokeSuspend(result: Result<Any?>): Any? {when (label) {0 -> {label = 1 // 📍 标记挂起点val joinResult = job.join() // 调用joinif (joinResult === COROUTINE_SUSPENDED) {return COROUTINE_SUSPENDED // ⏸️ 告诉调度器:我需要挂起}}1 -> {// 🔄 从挂起点恢复,继续执行println("Join completed!")return Unit}}}
}
🔄 线程不阻塞的关键机制
// 🧵 线程调度器的处理逻辑
class CoroutineScheduler {fun executeCoroutine(continuation: Continuation<*>) {val result = try {// ⚙️ 执行协程状态机continuation.invokeSuspend(lastResult)} catch (e: Throwable) {Result.failure(e)}when (result) {COROUTINE_SUSPENDED -> {// ⏸️ 协程挂起:不继续执行,线程可以处理其他任务// 📋 将continuation保存起来,等待后续恢复// 🧵 线程立即返回,可以执行其他协程return // 线程不阻塞!}else -> {// ✅ 协程完成:处理结果handleResult(result)}}}
}

Q2: join() 的挂起和恢复具体是如何实现的?

A2: 通过 Continuation 保存和恢复机制

📦 Continuation 保存机制
// 📦 挂起时的状态保存
suspend fun join() = suspendCancellableCoroutine<Unit> { cont ->// 🎯 关键:将continuation保存到目标Job的回调列表中val handle = invokeOnCompletion { cause ->// 📞 这个回调会在目标协程完成时被调用if (cause != null) {cont.resumeWithException(cause) // 🔄 恢复并传递异常} else {cont.resume(Unit) // 🔄 恢复并传递成功结果}}// 🧹 设置取消清理cont.invokeOnCancellation { handle.dispose() }// ⏸️ 此时函数返回COROUTINE_SUSPENDED,协程挂起// 📋 continuation被保存在Job的回调列表中// 🧵 当前线程可以去执行其他任务
}
🔄 恢复机制详解
// 🔄 恢复时的执行流程
class CancellableContinuationImpl<T> {fun resume(value: T) {// 🔒 原子性检查和更新状态if (tryResumeImpl(Result.success(value)) === TryResumeToken) {// ✅ 恢复成功,派发到目标线程dispatchResume(MODE_CANCELLABLE)}}private fun dispatchResume(mode: Int) {val dispatcher = context[ContinuationInterceptor] as? CoroutineDispatcherif (dispatcher?.isDispatchNeeded(context) == true) {// 🚀 派发到目标线程的任务队列dispatcher.dispatch(context, this)} else {// ⚡ 在当前线程直接执行executeUnconfined()}}
}

Q3: 为什么协程挂起不会导致线程泄漏?

A3: 协程与线程是 M:N 的映射关系,线程可以复用

🧵 线程复用机制
// 🧵 线程池复用示例
class CoroutineDispatcher {private val threadPool = Executors.newFixedThreadPool(4) // 固定4个线程override fun dispatch(context: CoroutineContext, block: Runnable) {// 📋 将任务提交到线程池队列threadPool.execute {try {block.run() // 🏃 执行协程任务} catch (e: Throwable) {handleException(e)}// 🔄 任务完成后,线程返回池中等待下一个任务}}
}// 📊 协程与线程的关系
// 1000个协程 → 4个线程 (M:N映射)
// 协程挂起 → 线程继续执行其他协程
// 协程恢复 → 可能在不同线程上执行

Q4: join() 如何处理协程取消的传播?

A4: 通过结构化并发和取消令牌传播

🚫 取消传播机制
// 🚫 取消传播示例
suspend fun parentCoroutine() = coroutineScope {val childJob = launch {delay(5000) // 长时间运行println("Child completed")}launch {delay(1000)println("Before join")childJob.join() // 🔗 等待子协程println("After join") // 如果父协程被取消,这里不会执行}// 🚫 2秒后取消整个scopedelay(2000)throw CancellationException("Parent cancelled")
}// 取消传播流程:
// 1. 父协程抛出CancellationException
// 2. coroutineScope捕获异常并取消所有子协程
// 3. childJob被取消
// 4. join()等待的协程收到CancellationException
// 5. join()抛出异常,等待协程也被取消

Q5: join() 与其他等待机制的区别?

A5: join() 是结构化并发的一部分,与传统等待机制有本质区别

📊 对比分析
// 📊 不同等待机制对比// ❌ Thread.join() - 阻塞线程
thread.join() // 线程被阻塞,无法处理其他任务// ❌ CountDownLatch - 阻塞线程  
latch.await() // 线程被阻塞// ❌ Future.get() - 阻塞线程
future.get() // 线程被阻塞// ✅ Coroutine.join() - 挂起协程,不阻塞线程
job.join() // 协程挂起,线程可以执行其他协程// ✅ Channel.receive() - 挂起协程
channel.receive() // 协程挂起等待数据// ✅ Deferred.await() - 挂起协程
deferred.await() // 协程挂起等待结果

Q6: join() 在异常情况下的行为?

A6: 遵循结构化并发原则,异常会正确传播

🛡️ 异常处理机制
suspend fun exceptionHandling() {val job = launch {delay(1000)throw RuntimeException("Something went wrong")}try {job.join() // 🎯 会重新抛出子协程的异常} catch (e: RuntimeException) {println("Caught exception from child: ${e.message}")// ✅ 异常被正确传播和处理}
}// 🔄 异常传播流程
// 1. 子协程抛出异常
// 2. 异常被包装为CompletedExceptionally
// 3. join()的回调收到异常
// 4. cont.resumeWithException(cause)
// 5. 等待协程收到异常并重新抛出

🎯 核心要点总结

💡 关键理解

  • 挂起 ≠ 阻塞:协程挂起释放线程,阻塞占用线程
  • 状态机机制:通过 labelcontinuation 实现挂起恢复
  • 回调转换:将异步回调转换为同步代码风格
  • 线程复用:M:N 映射实现高并发低资源消耗
  • 结构化并发:异常和取消的正确传播

🚀 性能优势

  • 零拷贝挂起:状态保存在堆栈帧中
  • 快速路径优化:已完成任务直接返回
  • 智能调度:避免不必要的线程切换
  • 内存效率:协程开销远小于线程

🛡️ 安全保障

  • 异常安全:异常正确传播不丢失
  • 取消安全:支持协作式取消
  • 资源安全:自动清理和资源管理
  • 线程安全:无锁并发设计

文章转载自:

http://vYeNtxmY.pjwfs.cn
http://xiw1xheL.pjwfs.cn
http://qQXKzQUx.pjwfs.cn
http://1sWza8DO.pjwfs.cn
http://tEzUBbcg.pjwfs.cn
http://1uM0MMOs.pjwfs.cn
http://ilDpHHFZ.pjwfs.cn
http://1vhJW8Ag.pjwfs.cn
http://Hk1GowHt.pjwfs.cn
http://oFodXs9j.pjwfs.cn
http://iRTbvYgE.pjwfs.cn
http://bhNFRy6Z.pjwfs.cn
http://iRyUdOyP.pjwfs.cn
http://muic9JCu.pjwfs.cn
http://ECAGzkjM.pjwfs.cn
http://HifNiQcZ.pjwfs.cn
http://eReKaEHB.pjwfs.cn
http://spVV428y.pjwfs.cn
http://nWLr2AuC.pjwfs.cn
http://TB0sAAOm.pjwfs.cn
http://hVXQKEFr.pjwfs.cn
http://zAckJFXm.pjwfs.cn
http://E1oLDfrO.pjwfs.cn
http://ZaAL3yAc.pjwfs.cn
http://c0W7HFGT.pjwfs.cn
http://x2x95oKP.pjwfs.cn
http://2yPOayYT.pjwfs.cn
http://NxyxFQEx.pjwfs.cn
http://olHdbT38.pjwfs.cn
http://B6EkRv8l.pjwfs.cn
http://www.dtcms.com/a/373735.html

相关文章:

  • [优选算法专题二滑动窗口——串联所有单词的子串]
  • VR森林防火模拟进行零风险演练,成本降低​
  • 玩转Docker | 使用Docker部署Kener状态页监控工具
  • Oracle 官网账号登不了?考过的证书还能下载吗?
  • Oracle 数据库高级查询语句方法
  • WSD3075DN56高性能MOS管在汽车电动助力转向系统(EPS)中的应用
  • 1.1 汽车运行滚动阻力
  • LinuxC++项目开发日志——高并发内存池(3-thread cache框架开发)
  • Android 自定义 TagView
  • 下沉一线强赋能!晓商圈多维帮扶护航城市共建者
  • YOLO12 改进、魔改|通道自注意力卷积块CSA-ConvBlock,通过动态建模特征图通道间的依赖关系,优化通道权重分配,在强化有效特征、抑制冗余信息
  • 提升数据库性能的秘密武器:深入解析慢查询、连接池与Druid监控
  • 中间件的日志分析
  • 机器宠物外壳设计的详细流程
  • OpenCV C++ 二值图像分析:从连通组件到轮廓匹配
  • Java分页 Element—UI
  • Flow-GRPO: Training Flow Matching Models via Online RL
  • C#中解析XML时遇到注释节点报错
  • 联邦学习辅导流程
  • MySQL MVCC原理
  • QSS加载失败的奇葩问题--已解决
  • 一体化伺服电机在管道焊缝检测爬行机器人中的应用案例
  • flowable发起申请后无法查看申请记录
  • 鸿蒙实现APP和网页跳转方案总结
  • 【数据结构与算符Trip第2站】稀疏数组
  • 国产EtherCAT从站芯片FCE1353与N32G435 MCU功能板测试流程
  • 0908 C++标准模板库和异常处理
  • 【PostgreSQL内核学习:基于 ExprState 的哈希计算优化—— GROUP BY 与 SubPlan 的性能提升】
  • Hive基础简介
  • Hive实战(一)