CompletableDeferred、defer 和 Job 关系
1. Job - 协程的基本单位
Job
是 Kotlin 协程中的基本概念,代表一个可取消的、有生命周期的工作单元。
import kotlinx.coroutines.*fun main() = runBlocking {// launch 返回一个 Jobval job: Job = launch {delay(1000)println("Job completed")}// 可以等待 Job 完成job.join()println("All done")
}
Job 的主要特性:
-
可取消 (
job.cancel()
) -
可等待完成 (
job.join()
) -
有状态(New, Active, Completing, Completed, Cancelling, Cancelled)
-
可以有父子关系
2. Deferred - 带结果的 Job
Deferred
是 Job
的子接口,代表一个会产生结果的异步计算。
import kotlinx.coroutines.*fun main() = runBlocking {// async 返回一个 Deferredval deferred: Deferred<Int> = async {delay(1000)42 // 这个结果会被返回}// 等待并获取结果val result: Int = deferred.await()println("Result: $result")
}
Deferred 的特点:
-
继承自
Job
,所以也有取消、等待等功能 -
通过
await()
方法获取结果 -
如果计算过程中出现异常,
await()
会抛出异常
3. CompletableDeferred - 可手动完成的 Deferred
CompletableDeferred
是一个特殊的 Deferred
,你可以手动控制它何时完成以及以什么结果完成。
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*fun main() = runBlocking {// 创建一个可以手动完成的 Deferredval completable = CompletableDeferred<String>()// 启动一个协程来等待结果launch {val result = completable.await()println("Received: $result")}// 在另一个地方手动完成它launch {delay(1000)completable.complete("Hello World!") // 手动设置结果// 或者 completable.completeExceptionally(RuntimeException("Error"))}
}
4. 三者的关系对比
类型 | 特点 | 创建方式 | 使用场景 |
---|---|---|---|
Job | 无返回值的工作单元 | launch , Job() | 执行后台任务,不需要结果 |
Deferred | 有返回值的 Job | async | 执行异步计算,需要获取结果 |
CompletableDeferred | 可手动控制的 Deferred | CompletableDeferred() | 桥接回调式API、手动控制完成时机 |
5. 实际应用场景
场景1:将回调API转换为协程
suspend fun fetchUserData(userId: String): UserData {val deferred = CompletableDeferred<UserData>()// 传统回调式APIsomeCallbackBasedApi.fetchUser(userId,onSuccess = { user -> deferred.complete(user) // 成功时完成},onError = { error ->deferred.completeExceptionally(error) // 失败时完成})return deferred.await() // 等待回调触发
}
注意:需要手动complete,否则 await 会一直挂起。
场景2:多个生产者一个消费者
fun main() = runBlocking {val resultDeferred = CompletableDeferred<Int>()// 多个协程尝试计算结果repeat(5) { i ->launch {try {val result = computeResult(i)// 只有第一个完成的会设置结果if (resultDeferred.complete(result)) {println("Worker $i produced the result")}} catch (e: Exception) {resultDeferred.completeExceptionally(e)}}}val finalResult = resultDeferred.await()println("Final result: $finalResult")
}suspend fun computeResult(index: Int): Int {delay((1000L..2000L).random())return index * 10
}
场景3:超时控制
suspend fun fetchWithTimeout(): String {val deferred = CompletableDeferred<String>()// 设置超时launch {delay(5000)if (!deferred.isCompleted) {deferred.completeExceptionally(TimeoutException("Request timed out"))}}// 模拟网络请求launch {try {delay(3000) // 模拟网络延迟deferred.complete("Data received")} catch (e: Exception) {deferred.completeExceptionally(e)}}return deferred.await()
}
6. 重要注意事项
避免重复完成
val deferred = CompletableDeferred<String>()// 正确:检查是否成功设置
if (deferred.complete("success")) {println("Successfully set the result")
}// 后续的 complete 调用会返回 false,不会覆盖之前的结果
deferred.complete("another attempt") // 返回 false,结果仍然是 "success"
异常处理
val deferred = CompletableDeferred<String>()// 设置异常结果
deferred.completeExceptionally(RuntimeException("Something went wrong"))try {deferred.await()
} catch (e: Exception) {println("Caught exception: ${e.message}")
}
总结
-
Job: 基础的工作单元,无返回值
-
Deferred: 带结果的 Job,通过
async
创建 -
CompletableDeferred: 可手动控制的 Deferred,用于桥接回调API或复杂协调场景
CompletableDeferred
是一个非常强大的工具,特别适合在需要将传统回调式代码集成到协程体系中,或者在多个协程之间需要复杂协调的场景中使用。
CompletableDeferred
确实不适合数据流场景。它设计用于单次的异步结果,而不是持续的数据流。
解释:
为什么 CompletableDeferred 不适合数据流
知识点补充:
CompletableDeferred 使用和注意事项