Kotlin协程 -> Deferred.await() 完整流程图与核心源码分析
🔄 Deferred.await()
完整调用流程图
👤 用户调用 deferred.await()↓
🔍 DeferredCoroutine.await() → 检查 getCompleted() 状态↓
❓ 完成状态判断├── ✅ 已完成 → getCompleted() 直接返回结果│ ├── CompletedExceptionally → throw state.cause│ └── 正常完成 → return result (直接返回)│└── ⏳ 未完成 → awaitSuspend() 挂起等待↓🔗 suspendCancellableCoroutine { cont -> ... }↓📝 invokeOnCompletion(onCancelling = false) { cause -> ... }↓🔄 loopOnState { state -> ... } 状态循环检查↓❓ 当前状态判断├── 🏃 Incomplete → makeNode() 创建回调节点│ ↓│ 🔒 _state.compareAndSet(state, state + node) 原子添加│ ↓│ 📦 return node (JobNode) 作为 DisposableHandle│└── ✅ 已完成 → invokeIt(handler, state) 立即执行回调↓📞 handler(cause) 执行用户回调↓🎯 提取结果:getCompleted() 获取实际值↓🎯 cont.resume(result) 或 cont.resumeWithException(cause)↓
⏸️ cancellable.getResult() → COROUTINE_SUSPENDED 挂起标记↓
🕐 等待目标协程完成...↓
🎯 目标协程完成:AbstractCoroutine.resumeWith(result)↓
📍 makeCompletingOnce(result.toState())↓
🔄 loopOnState { state -> ... } 完成状态转换↓
🔧 Completing(state.list, false, proposedUpdate) 创建完成状态↓
🔒 _state.compareAndSet(state, completing) 原子更新状态↓
📢 completeStateFinalization(state, completing)↓
🔔 notifyCompletion(list, cause) 通知所有回调↓
🔄 list.forEach<JobNode<*>> { node -> node.invoke(cause) }↓
🎯 AwaitContinuation.invoke() → handler(cause) 执行await的回调↓
📦 val result = getCompleted() 获取实际结果值↓
📞 CancellableContinuationImpl.resumeWith(Result.success(result))↓
🔒 tryResumeImpl() → _state.compareAndSet(ACTIVE, result)↓
🚀 dispatchResume(resumeMode) 派发恢复↓
🧵 CoroutineDispatcher.dispatch(context, this)↓
🎯 目标线程执行 DispatchedTask.run()↓
📞 continuation.resumeWith(Result.success(result))↓
🔄 BaseContinuationImpl.resumeWith() 状态机恢复↓
⚙️ 状态机.invokeSuspend() 继续执行用户代码↓
🏁 await() 方法返回实际结果值,用户代码继续执行
🎯 核心源码方法明确讲解
1. 🔍 DeferredCoroutine.await()
- 入口方法
// 📍 位置:kotlinx.coroutines/DeferredCoroutine.kt
public final override suspend fun await(): T {// 🚀 快速路径:检查是否已完成val state = this.stateif (state !is Incomplete) {// ✅ 已完成,直接获取结果if (state is CompletedExceptionally) {throw state.cause // ❌ 重新抛出异常}// 🎯 关键差异:返回实际结果值,而不是Unitreturn state as T // 直接返回计算结果}// ⏳ 慢速路径:协程还在运行,需要挂起等待return awaitSuspend()
}// 📊 与join()的核心区别
// join(): return Unit (只关心完成状态)
// await(): return T (需要获取实际结果值)
明确要点:
await()
需要返回实际的计算结果,而不是 Unit- 快速路径直接返回已完成的结果值
- 状态检查逻辑与
join()
基本相同
2. 🔗 awaitSuspend()
- 挂起等待核心
private suspend fun awaitSuspend(): T = suspendCancellableCoroutine { cont ->// 📝 关键:注册完成回调,当目标协程完成时会被调用val handle = invokeOnCompletion(onCancelling = false) { cause ->// 🎯 这个回调会在目标协程完成时执行val state = this@DeferredCoroutine.stateif (cause != null) {// ❌ 异常完成cont.resumeWithException(cause)} else {// ✅ 正常完成:获取实际结果值if (state is CompletedExceptionally) {cont.resumeWithException(state.cause)} else {// 🎯 关键:提取并返回实际结果val result = getCompleted()cont.resume(result)}}}// 🧹 设置取消处理cont.invokeOnCancellation { handle.dispose() }
}
明确要点:
- 回调中需要调用
getCompleted()
获取实际结果 - 异常处理逻辑与
join()
相同 cont.resume(result)
传递的是实际结果值
3. 📦 getCompleted()
- 结果提取核心
// 📍 位置:kotlinx.coroutines/JobSupport.kt
public val isCompleted: Boolean get() = state !is Incompleteinternal fun getCompleted(): Any? {val state = this.statecheck(state !is Incomplete) { "This job has not completed yet" }// 🔍 处理异常完成状态if (state is CompletedExceptionally) {throw state.cause}// 🎯 返回实际结果值return state
}// 📊 类型安全的结果获取
public suspend fun await(): T {// ...挂起逻辑...@Suppress("UNCHECKED_CAST")return getCompleted() as T // 类型转换为期望类型
}
明确要点:
getCompleted()
在协程完成后提取实际结果- 异常状态会重新抛出异常
- 正常状态返回存储的结果值
4. 📝 AwaitContinuation
- 专用回调节点
// 📍 位置:kotlinx.coroutines/JobSupport.kt
private class AwaitContinuation<T>(private val continuation: CancellableContinuation<T>
) : JobNode<Job>() {override fun invoke(cause: Throwable?) {val state = job.stateif (cause != null) {// ❌ 异常完成:传播异常continuation.resumeWithException(cause)} else {// ✅ 正常完成:提取并传递结果if (state is CompletedExceptionally) {continuation.resumeWithException(state.cause)} else {// 🎯 关键:获取实际结果值@Suppress("UNCHECKED_CAST")val result = state as Tcontinuation.resume(result)}}}
}
明确要点:
AwaitContinuation
是专门为await()
设计的回调节点- 负责在协程完成时提取和传递实际结果
- 类型安全转换确保结果类型正确
5. 🎯 目标协程完成 - 结果存储
// 📍 位置:kotlinx.coroutines/AbstractCoroutine.kt
public final override fun resumeWith(result: Result<T>) {// 🔄 将结果转换为内部状态val state = result.toState()// 📍 完成协程并存储结果if (makeCompletingOnce(state)) {return}// 处理重复完成的情况
}// 🔄 结果状态转换
internal fun <T> Result<T>.toState(): Any? = fold(onSuccess = { value -> value }, // ✅ 成功:直接存储值onFailure = { exception -> CompletedExceptionally(exception) } // ❌ 失败:包装异常
)
明确要点:
- 成功结果直接存储为状态值
- 异常结果包装为
CompletedExceptionally
makeCompletingOnce
确保原子性完成
6. 📢 notifyCompletion()
- 通知等待者
private fun notifyCompletion(list: NodeList, cause: Throwable?) {// 🔄 遍历所有等待的回调节点list.forEach<JobNode<*>> { node ->try {// 🎯 执行回调:await()的回调会在这里被调用node.invoke(cause)} catch (ex: Throwable) {// 🛡️ 回调异常处理handleOnCompletionException(ex)}}
}// 🎯 AwaitContinuation.invoke() 在这里被调用
// 📦 回调中会调用 getCompleted() 获取结果
// 📞 然后调用 continuation.resume(result) 恢复等待协程
明确要点:
- 所有注册的回调节点都会被通知
await()
的回调会提取实际结果值- 异常隔离防止回调异常影响其他等待者
7. 📞 CancellableContinuation.resume(result)
- 传递结果
// 📍 位置:kotlinx.coroutines/CancellableContinuationImpl.kt
public override fun resume(value: T) {// 🔄 包装成功结果,注意这里的value是实际结果值resumeWith(Result.success(value))
}// 📞 核心恢复方法
override fun resumeWith(result: Result<T>) {val state = result.toState() // 转换为内部状态// 🔒 尝试原子性恢复if (tryResumeImpl(state) === TryResumeToken) {// ✅ 恢复成功:派发到目标线程执行dispatchResume(resumeMode)}
}
明确要点:
- value 参数是从
getCompleted()
获取的实际结果 - 结果被包装为
Result.success(value)
- 后续派发和恢复逻辑与
join()
相同
8. ⚙️ 状态机 .invokeSuspend()
- 返回实际结果
// 编译器生成的状态机代码(简化版)
override fun invokeSuspend(result: Result<Any?>): Any? {when (label) {0 -> {// 🔍 检查前一步的结果throwOnFailure(result)// 🎯 这里是await()调用点label = 1val awaitResult = targetDeferred.await() // 挂起点if (awaitResult === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED}1 -> {// ✅ await()完成,继续执行后续代码throwOnFailure(result) // 检查await是否有异常// 🎯 关键:result包含实际的计算结果val actualResult = result.getOrThrow() // 获取实际结果值// 🏁 用户代码继续执行,可以使用实际结果println("Got result: $actualResult")return actualResult // 返回给调用者}}
}
明确要点:
result.getOrThrow()
获取await()
返回的实际值- 状态机恢复时携带具体的计算结果
- 用户代码可以直接使用返回的结果值
9. 🏁 await()
方法返回实际结果
// 用户代码示例
suspend fun example() {val deferred = async {delay(1000)"Hello, World!" // 🎯 实际的计算结果}println("Before await")val result = deferred.await() // ⏸️ 在这里挂起等待println("Got result: $result") // 🎯 result = "Hello, World!"// 🏁 可以直接使用计算结果
}
明确要点:
- a
wait()
返回泛型类型T
的实际值 - 用户可以直接使用返回的计算结果
- 类型安全:编译时确保类型正确
🔄 await() vs join()
核心差异对比
📊 关键区别总结
特性 | join() | await() |
---|---|---|
返回类型 | Unit | T (泛型) |
用途 | 等待完成 | 获取结果 |
适用对象 | Job | Deferred |
快速路径 | 检查完成状态 | 检查完成状态 + 提取结果 |
回调处理 | cont.resume(Unit) | cont.resume(getCompleted()) |
状态机恢复 | 携带 | Unit 携带实际结果值 |
📊 关键区别总结
// 🔄 join() 的核心逻辑
suspend fun join() {if (!isActive) return // ✅ 只关心是否完成return joinSuspend() // ⏳ 挂起等待完成
}// 🎯 await() 的核心逻辑
suspend fun await(): T {val state = this.stateif (state !is Incomplete) {return state as T // ✅ 返回实际结果值}return awaitSuspend() // ⏳ 挂起等待并获取结果
}// 📞 回调差异
// join() 回调
{ cause -> if (cause != null) cont.resumeWithException(cause)else cont.resume(Unit) // 只传递完成信号
}// await() 回调
{ cause ->if (cause != null) cont.resumeWithException(cause) else {val result = getCompleted() // 🎯 提取实际结果cont.resume(result) // 传递实际值}
}
🤔 问题 & 回答
Q1: await()
如何保证类型安全?
A1: 通过泛型约束和编译时检查确保类型安全
🎯 类型安全机制
// 🔒 泛型约束
interface Deferred<out T> : Job {suspend fun await(): T // 🎯 返回类型与泛型参数一致
}// 🏗️ 创建时确定类型
val stringDeferred: Deferred<String> = async { "Hello" }
val intDeferred: Deferred<Int> = async { 42 }// ✅ 编译时类型检查
val str: String = stringDeferred.await() // ✅ 类型匹配
val num: Int = intDeferred.await() // ✅ 类型匹配// ❌ 编译错误
// val wrong: Int = stringDeferred.await() // 编译错误!// 🔄 运行时类型转换
@Suppress("UNCHECKED_CAST")
return getCompleted() as T // 安全转换,编译时已验证
Q2: await()
如何处理 null
结果?
A2: null
是合法的结果值,正常传递
🎯 null 值处理
// ✅ null 是合法结果
val nullableDeferred: Deferred<String?> = async { if (Random.nextBoolean()) "Hello" else null
}val result: String? = nullableDeferred.await() // 可能为 null// 🔄 内部处理逻辑
fun getCompleted(): Any? {val state = this.stateif (state is CompletedExceptionally) throw state.causereturn state // 🎯 state 可能为 null,这是合法的
}// 📦 状态存储
// null 值直接作为状态存储,不需要特殊包装
// CompletedExceptionally 用于异常,null 用于正常的 null 结果
Q3: await()
的性能特点?
A3: 与 join()
性能相近,额外开销主要在结果提取
📊 性能分析
// 🚀 快速路径性能(已完成)
// join(): 1 次状态检查
// await(): 1 次状态检查 + 1 次结果提取 (getCompleted)// ⏳ 挂起路径性能(未完成)
// 两者基本相同:
// - 相同的挂起机制
// - 相同的回调注册
// - 相同的恢复派发
// - await() 额外的结果提取开销很小// 🧠 内存开销
// join(): 存储 Unit (0 字节)
// await(): 存储实际结果 (取决于结果大小)// 🎯 优化建议
// 1. 如果只需要等待完成,使用 join()
// 2. 如果需要结果,使用 await()
// 3. 避免不必要的 await() 调用
Q4: await()
与 runBlocking
的区别?
A4: await()
是挂起函数,runBlocking
是阻塞函数
🔄 挂起 vs 阻塞对比
// ✅ await() - 挂起函数
suspend fun suspendingWay() {val deferred = async { computeValue() }val result = deferred.await() // ⏸️ 挂起,不阻塞线程println("Result: $result")
}// ❌ runBlocking - 阻塞函数
fun blockingWay() {val result = runBlocking {val deferred = async { computeValue() }deferred.await() // 🚫 阻塞当前线程}println("Result: $result")
}// 🧵 线程使用对比
// await(): 线程可以执行其他协程
// runBlocking: 线程被阻塞,无法处理其他任务
Q5: 多个 await()
的并发处理?
A5: 可以并发等待多个 Deferred
,提高效率
🚀 并发 await 策略
// ❌ 串行等待 - 效率低
suspend fun sequential() {val deferred1 = async { computeValue1() }val deferred2 = async { computeValue2() }val result1 = deferred1.await() // 等待1完成val result2 = deferred2.await() // 等待2完成// 总时间 = time1 + time2
}// ✅ 并发等待 - 高效
suspend fun concurrent() {val deferred1 = async { computeValue1() }val deferred2 = async { computeValue2() }// 🚀 同时等待多个结果val results = awaitAll(deferred1, deferred2)// 或者val result1 = deferred1.await()val result2 = deferred2.await()// 总时间 = max(time1, time2)
}// 🎯 awaitAll 实现原理
suspend fun <T> awaitAll(vararg deferreds: Deferred<T>): List<T> {return deferreds.map { it.await() } // 并发等待所有结果
}
🎯 核心要点总结
💡 关键理解
- 结果传递:
await()
传递实际计算结果,join()
只传递完成信号 - 类型安全:泛型约束确保编译时类型正确性
- 性能相近:挂起机制相同,额外开销主要在结果提取
- 并发友好:支持多个
await()
并发执行 - 异常处理:异常传播机制与
join()
完全相同