为什么 CompletableDeferred 不适合数据流---》数据流的正确解决方案
1. 单次性限制
val deferred = CompletableDeferred<String>()// 只能完成一次
deferred.complete("第一个值")
deferred.complete("第二个值") // 这个会被忽略!val result = deferred.await() // 永远只能得到 "第一个值"
2. 无法处理多个值
// 错误用法:试图用多个 CompletableDeferred
val deferred1 = CompletableDeferred<String>()
val deferred2 = CompletableDeferred<String>()
val deferred3 = CompletableDeferred<String>()// 这会导致复杂的协调和资源管理问题
数据流的正确解决方案
对于数据流场景,Kotlin 提供了专门的解决方案:
1. Flow - 最推荐的方案
// 创建数据流
fun dataStream(): Flow<String> = flow {emit("数据1")delay(1000)emit("数据2") delay(1000)emit("数据3")
}// 收集数据流
fun main() = runBlocking {dataStream().collect { value ->println("收到: $value")}
}
2. Channel - 用于热数据流
fun main() = runBlocking {val channel = Channel<String>()// 生产者launch {channel.send("消息1")delay(1000)channel.send("消息2")delay(1000)channel.send("消息3")channel.close() // 结束流}// 消费者launch {for (msg in channel) {println("消费: $msg")}}
}
3. StateFlow - 状态管理
class DataManager {private val _state = MutableStateFlow("初始状态")val state: StateFlow<String> = _state.asStateFlow()fun updateData() {_state.value = "更新后的状态"}
}// 监听状态变化
fun main() = runBlocking {val manager = DataManager()launch {manager.state.collect { state ->println("状态变化: $state")}}delay(1000)manager.updateData()
}
4. SharedFlow - 事件总线模式
object EventBus {private val _events = MutableSharedFlow<String>()val events = _events.asSharedFlow()suspend fun emitEvent(event: String) {_events.emit(event)}
}// 多个订阅者
fun main() = runBlocking {// 订阅者1launch {EventBus.events.collect { event ->println("订阅者1收到: $event")}}// 订阅者2 launch {EventBus.events.collect { event ->println("订阅者2收到: $event")}}// 发布事件EventBus.emitEvent("事件1")delay(1000)EventBus.emitEvent("事件2")
}
场景对比表
场景 | 适合的方案 | 不适合的方案 |
---|---|---|
单次异步结果 | CompletableDeferred , async | Flow , Channel |
持续数据流 | Flow , Channel | CompletableDeferred |
状态管理 | StateFlow , LiveData | CompletableDeferred |
事件广播 | SharedFlow , BroadcastChannel | CompletableDeferred |
请求-响应 | CompletableDeferred , suspend函数 |
|
实际例子对比
❌ 错误:用 CompletableDeferred 处理数据流
// 反模式!
class WrongStreamProcessor {private var currentDeferred: CompletableDeferred<String>? = nullfun processStream() {// 复杂的协调逻辑,容易出错// 需要管理多个 CompletableDeferred// 资源泄漏风险高}
}
✅ 正确:用 Flow 处理数据流
class StreamProcessor {fun dataStream(): Flow<String> = flow {var count = 0while (true) {emit("数据${count++}")delay(1000)}}suspend fun process() {dataStream().take(10) // 只取前10个.collect { data ->println("处理: $data")}}
}
总结
判断:
CompletableDeferred
:适合单次的异步结果(如网络请求响应、用户确认)Flow
/Channel
:适合持续的数据流(如实时数据更新、事件流、状态变化)
选择正确的工具很重要:
需要一次性结果?用
CompletableDeferred
需要持续的数据流?用
Flow
或Channel
需要状态管理?用
StateFlow
需要事件广播?用
SharedFlow
这样可以避免很多不必要的复杂性和潜在的错误!