Kotlin协程 -> Job.join() 完整流程图与核心源码分析
🔄 完整调用流程图
👤 用户调用 job.join()↓
🔍 JobSupport.join() → 检查 isActive 状态↓
❓ isActive 状态判断├── ✅ false (已完成) → 检查 this.state│ ├── CompletedExceptionally → throw state.cause│ └── 正常完成 → return (直接返回)│└── ⏳ true (运行中) → joinSuspend() 挂起等待↓🔗 suspendCancellableCoroutine { cont -> ... }↓📝 invokeOnCompletion(onCancelling = false) { cause -> ... }↓🔄 loopOnState { state -> ... } 状态循环检查↓❓ 当前状态判断├── 🏃 Incomplete → makeNode() 创建回调节点│ ↓│ 🔒 _state.compareAndSet(state, state + node) 原子添加│ ↓│ 📦 return node (JobNode) 作为 DisposableHandle│└── ✅ 已完成 → invokeIt(handler, state) 立即执行回调↓📞 handler(cause) 执行用户回调↓🎯 cont.resume(Unit) 或 cont.resumeWithException(cause)↓
⏸️ cancellable.getResult() → COROUTINE_SUSPENDED 挂起标记↓
🕐 等待目标协程完成...↓
🎯 目标协程完成:AbstractCoroutine.resumeWith(result)↓
📍 makeCompletingOnce(result.toState())↓
🔄 loopOnState { state -> ... } 完成状态转换↓
🔧 Completing(state.list, false, proposedUpdate) 创建完成状态↓
🔒 _state.compareAndSet(state, completing) 原子更新状态↓
📢 completeStateFinalization(state, completing)↓
🔔 notifyCompletion(list, cause) 通知所有回调↓
🔄 list.forEach<JobNode<*>> { node -> node.invoke(cause) }↓
🎯 InvokeOnCompletion.invoke() → handler(cause) 执行join的回调↓
📞 CancellableContinuationImpl.resumeWith(Result.success(Unit))↓
🔒 tryResumeImpl() → _state.compareAndSet(ACTIVE, result)↓
🚀 dispatchResume(resumeMode) 派发恢复↓
🧵 CoroutineDispatcher.dispatch(context, this)↓
🎯 目标线程执行 DispatchedTask.run()↓
📞 continuation.resumeWith(Result.success(Unit))↓
🔄 BaseContinuationImpl.resumeWith() 状态机恢复↓
⚙️ 状态机.invokeSuspend() 继续执行用户代码↓
🏁 join() 方法返回,用户代码继续执行
🎯 核心源码方法明确讲解
1. 🔍 JobSupport.join()
- 入口方法
// 📍 位置:kotlinx.coroutines/JobSupport.kt
public final override suspend fun join() {// 🚀 快速路径:检查协程是否已完成if (!isActive) {val state = this.stateif (state is CompletedExceptionally) {throw state.cause // ❌ 重新抛出异常}return // ✅ 正常完成,直接返回}// ⏳ 慢速路径:协程还在运行,需要挂起等待return joinSuspend()
}// 📊 isActive 属性检查
public val isActive: Boolean get() = state.let { it is Incomplete && it.isActive }
明确要点:
isActive = false
表示协程已完成(正常或异常)
isActive = true
表示协程仍在运行,需要挂起等待
2. 🔗 joinSuspend()
- 挂起等待核心
private suspend fun joinSuspend(): Unit = suspendCancellableCoroutine { cont ->// 📝 关键:注册完成回调,当目标协程完成时会被调用val handle = invokeOnCompletion(onCancelling = false) { cause ->// 🎯 这个回调会在目标协程完成时执行if (cause != null) {cont.resumeWithException(cause) // ❌ 异常完成} else {cont.resume(Unit) // ✅ 正常完成}}// 🧹 设置取消处理:如果当前协程被取消,移除回调cont.invokeOnCancellation { handle.dispose() }
}
明确要点:
suspendCancellableCoroutine
创建挂起点,invokeOnCompletion
注册的回调是关键连接点,回调中的 cont.resume()
会恢复等待的协程
3. 📝 invokeOnCompletion()
- 回调注册机制
public final override fun invokeOnCompletion(onCancelling: Boolean,invokeImmediately: Boolean = true,handler: CompletionHandler
): DisposableHandle {var nodeCache: JobNode<*>? = null// 🔄 状态循环:处理并发状态变化loopOnState { state ->when (state) {is Incomplete -> {// 🏃 协程还在运行:需要注册回调节点val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }// 🔒 原子操作:将回调节点添加到状态链表中if (_state.compareAndSet(state, state + node)) {return node // 返回可取消的句柄}// CAS失败,说明状态被其他线程修改,重试}else -> {// ✅ 协程已完成:立即执行回调if (invokeImmediately) {invokeIt(handler, state) // 立即调用回调}return NonDisposableHandle // 返回空句柄}}}
}
明确要点:
Incomplete
状态:协程运行中,注册回调节点,其他状态:协程已完成,立即执行回调,使用 CAS
操作保证线程安全
5. 🎯 invokeIt()
- 立即执行回调
// 📍 位置:kotlinx.coroutines/JobSupport.kt
private fun invokeIt(handler: CompletionHandler, state: Any?) {try {// 🔍 提取异常原因(如果是异常完成)val cause = (state as? CompletedExceptionally)?.cause// 🎯 关键调用:执行join()注册的回调函数handler(cause) // 这里的handler就是joinSuspend中的 { cause -> ... }} catch (ex: Throwable) {// 🛡️ 回调异常处理:防止回调异常影响协程状态handleOnCompletionException(ex)}
}// 🛡️ 回调异常处理
protected open fun handleOnCompletionException(exception: Throwable) {// 将异常传递给全局异常处理器,避免影响协程状态GlobalScope.launch(Dispatchers.Unconfined) {throw exception}
}
明确要点:
handler(cause)
就是执行 joinSuspend
中注册的回调,异常隔离:回调中的异常不会影响协程的完成状态
6. 📞 执行join
的回调 - 恢复等待协程
// joinSuspend中注册的回调函数
val handle = invokeOnCompletion(onCancelling = false) { cause ->// 🎯 这个回调现在被执行if (cause != null) {// ❌ 目标协程异常完成:将异常传递给等待的协程cont.resumeWithException(cause)} else {// ✅ 目标协程正常完成:恢复等待的协程cont.resume(Unit)}
}
明确要点:
cont
是 CancellableContinuationImpl
实例,cause != null
表示目标协程异常完成,cause == null
表示目标协程正常完成
7. 📞 CancellableContinuationImpl.resume()
- 启动恢复
// 📍 位置:kotlinx.coroutines/CancellableContinuationImpl.kt
public override fun resume(value: T) {// 🔄 包装成功结果resumeWith(Result.success(value))
}public override fun resumeWithException(exception: Throwable) {// 🔄 包装异常结果resumeWith(Result.failure(exception))
}// 📞 核心恢复方法
override fun resumeWith(result: Result<T>) {val state = result.toState() // 转换为内部状态// 🔒 尝试原子性恢复if (tryResumeImpl(state) === TryResumeToken) {// ✅ 恢复成功:派发到目标线程执行dispatchResume(resumeMode)}// 如果tryResumeImpl失败,说明已经被恢复或取消,忽略
}
明确要点:
resume(Unit)
最终调用 resumeWith(Result.success(Unit))
,tryResumeImpl
确保 continuation
只能被恢复一次
8. 🔒 tryResumeImpl()
- 原子恢复检查
private fun tryResumeImpl(proposedUpdate: Any?): Symbol? {// 🔄 状态循环:处理并发loopOnState { state ->when (state) {ACTIVE -> {// 🔄 从ACTIVE状态转换为恢复状态if (_state.compareAndSet(ACTIVE, proposedUpdate)) {return TryResumeToken // ✅ 恢复成功}// CAS失败,重试}is CancelledContinuation -> {// 🚫 已被取消,检查是否可以覆盖取消if (state.makeResumed()) {return TryResumeToken // ✅ 可以恢复}return null // ❌ 不能恢复}else -> return null // 已经被恢复或其他状态}}
}
明确要点:
ACTIVE → proposedUpdate:
正常恢复路径,CancelledContinuation:
处理取消与恢复的竞争,返回 TryResumeToken
表示恢复成功
9. 🚀 dispatchResume()
- 派发恢复执行
fun dispatchResume(mode: Int) {// 🔍 再次检查是否成功恢复(防止竞争条件)if (tryResume() != TryResumeToken) return// 📍 根据恢复模式选择执行策略when (mode) {MODE_CANCELLABLE -> {// 🧵 可取消模式:检查是否需要线程切换val dispatcher = context[ContinuationInterceptor] as? CoroutineDispatcherif (dispatcher?.isDispatchNeeded(context) == true) {// 🚀 派发到指定调度器线程dispatcher.dispatch(context, this)} else {// ⚡ 当前线程执行,避免线程切换开销executeUnconfined()}}MODE_ATOMIC -> {// ⚡ 原子模式:直接在当前线程执行resumeUndispatchedWith(getSuccessfulResult())}MODE_UNDISPATCHED -> {// 🚀 不派发模式:直接执行executeUnconfined()}}
}
明确要点:
MODE_CANCELLABLE
是 join
使用的默认模式,isDispatchNeeded
判断是否需要线程切换,优化:相同线程时避免不必要的派发
10. 🧵 CoroutineDispatcher.dispatch() - 线程派发
// 📍 位置:kotlinx.coroutines/CoroutineDispatcher.kt
public abstract fun dispatch(context: CoroutineContext, block: Runnable)// 🧵 典型实现(如Dispatchers.Main)
override fun dispatch(context: CoroutineContext, block: Runnable) {// 📋 将任务提交到目标线程的任务队列handler.post(block) // Android Main线程// 或者executor.execute(block) // 线程池
}
明确要点:
block
就是 DispatchedTask
(即当前的 continuation
),任务被放入目标线程的执行队列,实际执行时机由调度器决定
11. 🎯 DispatchedTask.run()
- 目标线程执行
// 📍 位置:kotlinx.coroutines/DispatchedTask.kt
public final override fun run() {assert { resumeMode != MODE_UNINITIALIZED }val taskContext = this.taskContextvar fatalException: Throwable? = nulltry {// 🎯 获取要恢复的Continuationval delegate = delegate as DispatchedContinuation<T>val continuation = delegate.continuation// 🔄 在正确的上下文中恢复协程withContinuationContext(continuation, countOrElement) {val context = continuation.contextval state = takeState() // 获取恢复状态(Unit或异常)// 🔍 检查协程是否在执行过程中被取消val exception = getExceptionalResult(state)val job = if (exception == null && resumeMode.isCancellableMode) {context[Job]} else nullif (job != null && !job.isActive) {// 🚫 等待协程已被取消val cause = job.getCancellationException()cancelCompletedResult(state, cause)continuation.resumeWithStackTrace(cause)} else {// ✅ 正常恢复协程if (exception != null) {// ❌ 传递异常continuation.resumeWithException(exception)} else {// ✅ 传递成功结果continuation.resume(getSuccessfulResult(state))}}}} catch (e: Throwable) {// 🛡️ 处理执行异常fatalException = e} finally {// 🧹 清理工作val result = runCatching { taskContext?.afterTask() }handleFatalException(fatalException, result.exceptionOrNull())}
}
明确要点:
continuation
是等待 join
的协程的 continuation
,state
包含要传递的结果(Unit 表示成功),取消检查:即使目标协程完成,等待协程可能已被取消
12. 📞 continuation.resume()
- 恢复等待协程
// 这里的continuation是等待join的协程的continuation
// 最终会调用到BaseContinuationImpl.resumeWith()// 📍 位置:kotlin-stdlib/BaseContinuationImpl.kt
public final override fun resumeWith(result: Result<Any?>) {var current = thisvar param = result// 🔄 协程状态机执行循环while (true) {probeCoroutineResumed(current) // 调试探针with(current) {val completion = completion!! // 下一个continuationval outcome: Result<Any?> = try {// ⚙️ 执行当前状态机步骤val outcome = invokeSuspend(param)if (outcome === COROUTINE_SUSPENDED) {// ⏸️ 需要再次挂起,退出循环return}Result.success(outcome) // ✅ 继续执行} catch (exception: Throwable) {// ❌ 状态机执行异常Result.failure(exception)}// 🧹 释放当前帧的拦截器引用releaseIntercepted()if (completion is BaseContinuationImpl) {// 📈 继续执行下一个状态机帧current = completionparam = outcome} else {// 🏁 到达顶层,完成整个协程completion.resumeWith(outcome)return}}}
}
明确要点:
这是协程状态机恢复的核心循环,invokeSuspend
是编译器生成的状态机方法,循环处理多个状态机帧的恢复
13. ⚙️ 状态机.invokeSuspend()
- 用户代码继续执行
// 编译器生成的状态机代码(简化版)
override fun invokeSuspend(result: Result<Any?>): Any? {when (label) {0 -> {// 🔍 检查前一步的结果throwOnFailure(result)// 🎯 这里是join()调用点label = 1val joinResult = targetJob.join() // 挂起点if (joinResult === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED}1 -> {// ✅ join()完成,继续执行后续代码throwOnFailure(result) // 检查join是否有异常// 🏁 用户代码继续执行println("Target job completed!")return Unit}}
}
明确要点:
label = 1
表示从 join
挂起点恢复,throwOnFailure(result)
处理 join
过程中的异常,状态机继续执行 join
之后的用户代码
14. 🏁 join()
方法返回,用户代码继续
// 用户代码示例
suspend fun example() {val job = launch {delay(1000)println("Target job finished")}println("Before join")job.join() // ⏸️ 在这里挂起等待println("After join") // 🎯 从这里继续执行
}
明确要点:
join()
方法"返回"实际上是状态机的恢复
用户感知上就像是一个普通的挂起函数调用,后续代码在目标协程完成后继续执行
🤔问题 & 回答
Q1: join()
如何实现线程挂起而不阻塞线程?
A1: 通过协程状态机和 Continuation
机制实现真正的非阻塞挂起
🧵 传统阻塞 vs 协程挂起对比
// ❌ 传统阻塞方式(会阻塞线程)
fun blockingJoin() {while (job.isActive) {Thread.sleep(10) // 🚫 线程被阻塞,无法处理其他任务}
}// ✅ 协程挂起方式(不阻塞线程)
suspend fun suspendingJoin() {job.join() // ⏸️ 协程挂起,线程可以处理其他任务
}
⚙️ 挂起机制的核心原理
// 🎯 协程状态机转换
class JoinStateMachine : BaseContinuationImpl {var label = 0 // 状态标记override fun invokeSuspend(result: Result<Any?>): Any? {when (label) {0 -> {label = 1 // 📍 标记挂起点val joinResult = job.join() // 调用joinif (joinResult === COROUTINE_SUSPENDED) {return COROUTINE_SUSPENDED // ⏸️ 告诉调度器:我需要挂起}}1 -> {// 🔄 从挂起点恢复,继续执行println("Join completed!")return Unit}}}
}
🔄 线程不阻塞的关键机制
// 🧵 线程调度器的处理逻辑
class CoroutineScheduler {fun executeCoroutine(continuation: Continuation<*>) {val result = try {// ⚙️ 执行协程状态机continuation.invokeSuspend(lastResult)} catch (e: Throwable) {Result.failure(e)}when (result) {COROUTINE_SUSPENDED -> {// ⏸️ 协程挂起:不继续执行,线程可以处理其他任务// 📋 将continuation保存起来,等待后续恢复// 🧵 线程立即返回,可以执行其他协程return // 线程不阻塞!}else -> {// ✅ 协程完成:处理结果handleResult(result)}}}
}
Q2: join() 的挂起和恢复具体是如何实现的?
A2: 通过 Continuation 保存和恢复机制
📦 Continuation 保存机制
// 📦 挂起时的状态保存
suspend fun join() = suspendCancellableCoroutine<Unit> { cont ->// 🎯 关键:将continuation保存到目标Job的回调列表中val handle = invokeOnCompletion { cause ->// 📞 这个回调会在目标协程完成时被调用if (cause != null) {cont.resumeWithException(cause) // 🔄 恢复并传递异常} else {cont.resume(Unit) // 🔄 恢复并传递成功结果}}// 🧹 设置取消清理cont.invokeOnCancellation { handle.dispose() }// ⏸️ 此时函数返回COROUTINE_SUSPENDED,协程挂起// 📋 continuation被保存在Job的回调列表中// 🧵 当前线程可以去执行其他任务
}
🔄 恢复机制详解
// 🔄 恢复时的执行流程
class CancellableContinuationImpl<T> {fun resume(value: T) {// 🔒 原子性检查和更新状态if (tryResumeImpl(Result.success(value)) === TryResumeToken) {// ✅ 恢复成功,派发到目标线程dispatchResume(MODE_CANCELLABLE)}}private fun dispatchResume(mode: Int) {val dispatcher = context[ContinuationInterceptor] as? CoroutineDispatcherif (dispatcher?.isDispatchNeeded(context) == true) {// 🚀 派发到目标线程的任务队列dispatcher.dispatch(context, this)} else {// ⚡ 在当前线程直接执行executeUnconfined()}}
}
Q3: 为什么协程挂起不会导致线程泄漏?
A3: 协程与线程是 M:N
的映射关系,线程可以复用
🧵 线程复用机制
// 🧵 线程池复用示例
class CoroutineDispatcher {private val threadPool = Executors.newFixedThreadPool(4) // 固定4个线程override fun dispatch(context: CoroutineContext, block: Runnable) {// 📋 将任务提交到线程池队列threadPool.execute {try {block.run() // 🏃 执行协程任务} catch (e: Throwable) {handleException(e)}// 🔄 任务完成后,线程返回池中等待下一个任务}}
}// 📊 协程与线程的关系
// 1000个协程 → 4个线程 (M:N映射)
// 协程挂起 → 线程继续执行其他协程
// 协程恢复 → 可能在不同线程上执行
Q4: join()
如何处理协程取消的传播?
A4: 通过结构化并发和取消令牌传播
🚫 取消传播机制
// 🚫 取消传播示例
suspend fun parentCoroutine() = coroutineScope {val childJob = launch {delay(5000) // 长时间运行println("Child completed")}launch {delay(1000)println("Before join")childJob.join() // 🔗 等待子协程println("After join") // 如果父协程被取消,这里不会执行}// 🚫 2秒后取消整个scopedelay(2000)throw CancellationException("Parent cancelled")
}// 取消传播流程:
// 1. 父协程抛出CancellationException
// 2. coroutineScope捕获异常并取消所有子协程
// 3. childJob被取消
// 4. join()等待的协程收到CancellationException
// 5. join()抛出异常,等待协程也被取消
Q5: join()
与其他等待机制的区别?
A5: join()
是结构化并发的一部分,与传统等待机制有本质区别
📊 对比分析
// 📊 不同等待机制对比// ❌ Thread.join() - 阻塞线程
thread.join() // 线程被阻塞,无法处理其他任务// ❌ CountDownLatch - 阻塞线程
latch.await() // 线程被阻塞// ❌ Future.get() - 阻塞线程
future.get() // 线程被阻塞// ✅ Coroutine.join() - 挂起协程,不阻塞线程
job.join() // 协程挂起,线程可以执行其他协程// ✅ Channel.receive() - 挂起协程
channel.receive() // 协程挂起等待数据// ✅ Deferred.await() - 挂起协程
deferred.await() // 协程挂起等待结果
Q6: join()
在异常情况下的行为?
A6: 遵循结构化并发原则,异常会正确传播
🛡️ 异常处理机制
suspend fun exceptionHandling() {val job = launch {delay(1000)throw RuntimeException("Something went wrong")}try {job.join() // 🎯 会重新抛出子协程的异常} catch (e: RuntimeException) {println("Caught exception from child: ${e.message}")// ✅ 异常被正确传播和处理}
}// 🔄 异常传播流程
// 1. 子协程抛出异常
// 2. 异常被包装为CompletedExceptionally
// 3. join()的回调收到异常
// 4. cont.resumeWithException(cause)
// 5. 等待协程收到异常并重新抛出
🎯 核心要点总结
💡 关键理解
- 挂起 ≠ 阻塞:协程挂起释放线程,阻塞占用线程
- 状态机机制:通过
label
和continuation
实现挂起恢复 - 回调转换:将异步回调转换为同步代码风格
- 线程复用:M:N 映射实现高并发低资源消耗
- 结构化并发:异常和取消的正确传播
🚀 性能优势
- 零拷贝挂起:状态保存在堆栈帧中
- 快速路径优化:已完成任务直接返回
- 智能调度:避免不必要的线程切换
- 内存效率:协程开销远小于线程
🛡️ 安全保障
- 异常安全:异常正确传播不丢失
- 取消安全:支持协作式取消
- 资源安全:自动清理和资源管理
- 线程安全:无锁并发设计