从 LiveData 到 Flow:状态、事件、背压与回调全面实战
前言
在 Android 架构的演进中,我们从 LiveData → SingleLiveEvent → Flow → StateFlow/SharedFlow → Channel → callbackFlow,一步步解决了“粘性问题”、“一次性事件”、“背压控制”、“回调转流”等场景痛点。
本文结合实战问题和例子,带你从历史到现代,全面理解这些工具的定位与使用方式。
1. LiveData 的粘性
-
LiveData 缓存最后一次值,新观察者一注册就立刻收到 → 粘性。
-
适合:状态(开关、电量、加载中)。
-
不适合:一次性事件(Toast、导航)。
val live = MutableLiveData<String>()
live.value = "Hello"
live.observe(this) { println(it) } // 新观察者立刻收到 "Hello"
2. SingleLiveEvent(历史过渡)
-
出自 Google 官方 sample,用
AtomicBoolean
确保只派发一次。 -
局限:单观察者适用,多观察者竞态,高频事件可能丢失。
-
不推荐新项目使用。
3. Kotlin Flow 与“天然背压”
-
Flow 是冷流,只有
collect
时才生产。 -
天然背压:下游慢,
emit()
挂起等待,不再“狂喷”。
常用调速算子:
-
buffer(n)
:缓存 n 条。 -
conflate()
:丢掉中间值,只保最新。 -
collectLatest {}
:新值来就取消旧处理。 -
debounce()/sample()
:防抖/抽样。
Kotlin Flow 与“天然背压”(完整示例)
4. StateFlow 与 SharedFlow
StateFlow —— 状态容器
-
始终有初始值,始终保存最新状态。
-
粘性:新订阅者立刻拿到当前值。
data class UiState(val loading:Boolean=false, val name:String="", val error:String?=null)class ProfileVm: ViewModel() {private val _state = MutableStateFlow(UiState())val state: StateFlow<UiState> = _statefun load() = viewModelScope.launch {_state.update { it.copy(loading = true) }runCatching { repo.fetchName() }.onSuccess { n -> _state.update { it.copy(loading = false, name = n) } }.onFailure { e -> _state.update { it.copy(loading = false, error = e.message) } }}
}
UI 收集:
viewLifecycleOwner.lifecycleScope.launch {repeatOnLifecycle(Lifecycle.State.STARTED) {vm.state.collect { s -> render(s) }}
}
SharedFlow —— 事件广播
-
默认无粘性(
replay=0
),新订阅者拿不到历史值。 -
可配置 replay、extraBufferCapacity、onBufferOverflow。
-
适合:事件(Toast、导航)。
sealed interface UiEvent {data class Toast(val msg: String): UiEventdata object NavigateSettings: UiEvent
}class HomeVm: ViewModel() {private val _events = MutableSharedFlow<UiEvent>(replay = 0, extraBufferCapacity = 1)val events = _events.asSharedFlow()fun saveOk() { _events.tryEmit(UiEvent.Toast("保存成功")) }fun goSettings() = viewModelScope.launch { _events.emit(UiEvent.NavigateSettings) }
}
UI 收集:
viewLifecycleOwner.lifecycleScope.launch {repeatOnLifecycle(Lifecycle.State.STARTED) {vm.events.collect { e -> handleEvent(e) }}
}
5. Channel 与 Actor 模式(重点)
5.1 Channel 基础
-
协程间的线程安全 FIFO 队列。
容量类型:
类型 | 行为 |
---|---|
Channel.RENDEZVOUS (0) | 无缓冲,一发一收,强背压 |
Channel(n) | 有界缓冲,满了 send 挂起、trySend 失败 |
Channel.BUFFERED | 默认小缓冲(实现相关,一般 64) |
Channel.UNLIMITED | 无界,可能 OOM,慎用 |
Channel.CONFLATED | 只保留最新,旧的丢掉 |
背压策略:
-
send(x)
:队列满时挂起,不丢 -
trySend(x)
:队列满时立即失败,可能丢
5.2 Actor 模式 = Channel + 单消费者协程
保证:所有任务顺序处理,避免并发乱序。
(1) 命令队列(严格 FIFO,不可丢)
sealed interface Cmd { data class Write(val bytes:ByteArray): Cmd; data object Close: Cmd }class SerialActor(private val port: SerialPort): Closeable {private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)private val inbox = Channel<Cmd>(64)init {scope.launch {for (c in inbox) when(c) {is Cmd.Write -> port.write(c.bytes)Cmd.Close -> { port.close(); break }}}}suspend fun write(b: ByteArray) = inbox.send(Cmd.Write(b)) // 顺序执行fun tryWrite(b: ByteArray) = inbox.trySend(Cmd.Write(b)).isSuccessoverride fun close() { inbox.trySend(Cmd.Close); inbox.close(); scope.cancel() }
}
(2) 请求-应答(一发一收)
sealed interface Req {data class ReadAngle(val reply: CompletableDeferred<Int>): Req
}class MotorActor(private val port: SerialPort) {private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)private val box = Channel<Req>(32)init {scope.launch {for (r in box) when(r) {is Req.ReadAngle -> {val v = port.readAngleBlocking()r.reply.complete(v)}}}}suspend fun readAngle(): Int {val d = CompletableDeferred<Int>()box.send(Req.ReadAngle(d))return d.await()}
}
(3) 只要最新(高频状态)
private val _angle = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 1)
fun setAngle(d: Int) { _angle.tryEmit(d) }viewModelScope.launch(Dispatchers.IO) {_angle.distinctUntilChanged().collectLatest { d -> motor.goto(d) }
}
5.3 背压与丢弃策略
suspend fun safeSend(ch: Channel<Cmd>, c: Cmd) {var ok = ch.trySend(c).isSuccessvar delayMs = 5Lwhile (!ok && delayMs <= 40) {delay(delayMs)ok = ch.trySend(c).isSuccessdelayMs *= 2}
}
5.4 多消费者与“抢活”
-
电机、串口:必须严格顺序 → 单消费者
-
日志落盘、图像处理:允许乱序 → 多消费者形成工作池
6. callbackFlow
回调转 Flow:
fun motorAngles(): Flow<Int> = callbackFlow {val l = object: AngleListener {override fun onAngle(d:Int) { trySend(d).isSuccess }override fun onError(e:Throwable) { close(e) }}motor.addListener(l)awaitClose { motor.removeListener(l) }
}.buffer(64).conflate().flowOn(Dispatchers.IO)
UI 收集:
motorAngles().collectLatest { d -> motor.goto(d) }
7. lifecycleScope vs viewModelScope
-
生产:ViewModel →
viewModelScope
-
消费:UI →
viewLifecycleOwner.lifecycleScope
class DashboardVm: ViewModel() {private val _state = MutableStateFlow(UiState())val state = _state.asStateFlow()fun load() = viewModelScope.launch { _state.update { it.copy(loading=true) } }
}viewLifecycleOwner.lifecycleScope.launch {repeatOnLifecycle(Lifecycle.State.STARTED) {vm.state.collect { render(it) }}
}
8. 高频命令场景
逐条执行(严格 FIFO):
val channel = Channel<Int>(64)
launch { for (d in channel) motor.goto(d) }
只要最新(丢弃旧的):
val latest = MutableSharedFlow<Int>(replay=0, extraBufferCapacity=1)
fun setAngle(d:Int) { latest.tryEmit(d) }launch {latest.distinctUntilChanged().collectLatest { d -> motor.goto(d) }
}
9. 背压工具清单
-
buffer(n):缓存队列
-
conflate():只保最新
-
collectLatest{}:新来取消旧
-
debounce()/sample():控速
-
flowOn(IO):切线程
10. 高频命令进阶(角度缓存问题)
// 每条角度都执行
val cmd = Channel<Int>(64)
launch { for (d in cmd) motor.goto(d) }// 只执行最新
val latest = MutableSharedFlow<Int>(replay=0, extraBufferCapacity=1)
launch { latest.collectLatest { d -> motor.goto(d) } }
11. 选型速查表
场景 | 工具 | 说明 |
---|---|---|
页面状态 | StateFlow | 粘性 |
一次性事件(单播) | Channel | FIFO |
一次性事件(广播) | SharedFlow(replay=0) | 多观察者 |
严格顺序任务 | Channel + Actor | 顺序执行 |
回调源 | callbackFlow | 回调转流 |
高频状态只要最新 | conflate / collectLatest | 丢旧保新 |
12. 完整模板(VM + UI)
class Vm : ViewModel() {private val _state = MutableStateFlow(UiState())val state: StateFlow<UiState> = _stateprivate val _events = Channel<UiEvent>(Channel.BUFFERED)val events = _events.receiveAsFlow()private val _angleCmd = MutableSharedFlow<Int>(replay=0, extraBufferCapacity=1)fun setAngle(d: Int) { _angleCmd.tryEmit(d) }init {viewModelScope.launch(Dispatchers.IO) {_angleCmd.collectLatest { d -> motor.goto(d) }}}fun goSettings() = viewModelScope.launch { _events.send(UiEvent.NavigateSettings) }
}
viewLifecycleOwner.lifecycleScope.launch {repeatOnLifecycle(Lifecycle.State.STARTED) {launch { vm.state.collect { render(it) } }launch { vm.events.collect { handle(it) } }}
}
结语
-
LiveData 粘性 → SingleLiveEvent 权宜 → Flow 体系全面取代。
-
StateFlow 管状态,SharedFlow 管事件广播,Channel/Actor 管严格队列,callbackFlow 管回调源。
-
Flow 天然背压,配合 buffer/conflate/collectLatest/debounce 灵活控速。
-
UI 用 lifecycleScope 收集,VM 用 viewModelScope 生产,职责清晰,生命周期安全。
一句话总结:状态用 StateFlow,广播事件用 SharedFlow,单播/队列用 Channel(配 Actor),回调用 callbackFlow。