当前位置: 首页 > news >正文

从 LiveData 到 Flow:状态、事件、背压与回调全面实战

前言

在 Android 架构的演进中,我们从 LiveData → SingleLiveEvent → Flow → StateFlow/SharedFlow → Channel → callbackFlow,一步步解决了“粘性问题”、“一次性事件”、“背压控制”、“回调转流”等场景痛点。
本文结合实战问题和例子,带你从历史到现代,全面理解这些工具的定位与使用方式。

1. LiveData 的粘性

  • LiveData 缓存最后一次值,新观察者一注册就立刻收到 → 粘性

  • 适合:状态(开关、电量、加载中)。

  • 不适合:一次性事件(Toast、导航)。

val live = MutableLiveData<String>()
live.value = "Hello"
live.observe(this) { println(it) } // 新观察者立刻收到 "Hello"

2. SingleLiveEvent(历史过渡)

  • 出自 Google 官方 sample,用 AtomicBoolean 确保只派发一次。

  • 局限:单观察者适用,多观察者竞态,高频事件可能丢失。

  • 不推荐新项目使用

3. Kotlin Flow 与“天然背压”

  • Flow 是冷流,只有 collect 时才生产。

  • 天然背压:下游慢,emit() 挂起等待,不再“狂喷”。

常用调速算子:

  • buffer(n):缓存 n 条。

  • conflate():丢掉中间值,只保最新。

  • collectLatest {}:新值来就取消旧处理。

  • debounce()/sample():防抖/抽样。

Kotlin Flow 与“天然背压”(完整示例)

4. StateFlow 与 SharedFlow

StateFlow —— 状态容器

  • 始终有初始值,始终保存最新状态。

  • 粘性:新订阅者立刻拿到当前值。

data class UiState(val loading:Boolean=false, val name:String="", val error:String?=null)class ProfileVm: ViewModel() {private val _state = MutableStateFlow(UiState())val state: StateFlow<UiState> = _statefun load() = viewModelScope.launch {_state.update { it.copy(loading = true) }runCatching { repo.fetchName() }.onSuccess { n -> _state.update { it.copy(loading = false, name = n) } }.onFailure { e -> _state.update { it.copy(loading = false, error = e.message) } }}
}

UI 收集:

viewLifecycleOwner.lifecycleScope.launch {repeatOnLifecycle(Lifecycle.State.STARTED) {vm.state.collect { s -> render(s) }}
}

SharedFlow —— 事件广播

  • 默认无粘性(replay=0),新订阅者拿不到历史值。

  • 可配置 replay、extraBufferCapacity、onBufferOverflow。

  • 适合:事件(Toast、导航)。

sealed interface UiEvent {data class Toast(val msg: String): UiEventdata object NavigateSettings: UiEvent
}class HomeVm: ViewModel() {private val _events = MutableSharedFlow<UiEvent>(replay = 0, extraBufferCapacity = 1)val events = _events.asSharedFlow()fun saveOk() { _events.tryEmit(UiEvent.Toast("保存成功")) }fun goSettings() = viewModelScope.launch { _events.emit(UiEvent.NavigateSettings) }
}

UI 收集:

viewLifecycleOwner.lifecycleScope.launch {repeatOnLifecycle(Lifecycle.State.STARTED) {vm.events.collect { e -> handleEvent(e) }}
}

5. Channel 与 Actor 模式(重点)

5.1 Channel 基础

  • 协程间的线程安全 FIFO 队列

容量类型:

类型行为
Channel.RENDEZVOUS (0)无缓冲,一发一收,强背压
Channel(n)有界缓冲,满了 send 挂起、trySend 失败
Channel.BUFFERED默认小缓冲(实现相关,一般 64)
Channel.UNLIMITED无界,可能 OOM,慎用
Channel.CONFLATED只保留最新,旧的丢掉

背压策略:

  • send(x):队列满时挂起,不丢

  • trySend(x):队列满时立即失败,可能丢

5.2 Actor 模式 = Channel + 单消费者协程

保证:所有任务顺序处理,避免并发乱序。

(1) 命令队列(严格 FIFO,不可丢)
sealed interface Cmd { data class Write(val bytes:ByteArray): Cmd; data object Close: Cmd }class SerialActor(private val port: SerialPort): Closeable {private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)private val inbox = Channel<Cmd>(64)init {scope.launch {for (c in inbox) when(c) {is Cmd.Write -> port.write(c.bytes)Cmd.Close    -> { port.close(); break }}}}suspend fun write(b: ByteArray) = inbox.send(Cmd.Write(b)) // 顺序执行fun tryWrite(b: ByteArray) = inbox.trySend(Cmd.Write(b)).isSuccessoverride fun close() { inbox.trySend(Cmd.Close); inbox.close(); scope.cancel() }
}

(2) 请求-应答(一发一收)

sealed interface Req {data class ReadAngle(val reply: CompletableDeferred<Int>): Req
}class MotorActor(private val port: SerialPort) {private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)private val box = Channel<Req>(32)init {scope.launch {for (r in box) when(r) {is Req.ReadAngle -> {val v = port.readAngleBlocking()r.reply.complete(v)}}}}suspend fun readAngle(): Int {val d = CompletableDeferred<Int>()box.send(Req.ReadAngle(d))return d.await()}
}

(3) 只要最新(高频状态)

private val _angle = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 1)
fun setAngle(d: Int) { _angle.tryEmit(d) }viewModelScope.launch(Dispatchers.IO) {_angle.distinctUntilChanged().collectLatest { d -> motor.goto(d) }
}

5.3 背压与丢弃策略

suspend fun safeSend(ch: Channel<Cmd>, c: Cmd) {var ok = ch.trySend(c).isSuccessvar delayMs = 5Lwhile (!ok && delayMs <= 40) {delay(delayMs)ok = ch.trySend(c).isSuccessdelayMs *= 2}
}

5.4 多消费者与“抢活”

  • 电机、串口:必须严格顺序 → 单消费者

  • 日志落盘、图像处理:允许乱序 → 多消费者形成工作池

6. callbackFlow

回调转 Flow:

fun motorAngles(): Flow<Int> = callbackFlow {val l = object: AngleListener {override fun onAngle(d:Int) { trySend(d).isSuccess }override fun onError(e:Throwable) { close(e) }}motor.addListener(l)awaitClose { motor.removeListener(l) }
}.buffer(64).conflate().flowOn(Dispatchers.IO)

UI 收集:

motorAngles().collectLatest { d -> motor.goto(d) }

7. lifecycleScope vs viewModelScope

  • 生产:ViewModel → viewModelScope

  • 消费:UI → viewLifecycleOwner.lifecycleScope

class DashboardVm: ViewModel() {private val _state = MutableStateFlow(UiState())val state = _state.asStateFlow()fun load() = viewModelScope.launch { _state.update { it.copy(loading=true) } }
}viewLifecycleOwner.lifecycleScope.launch {repeatOnLifecycle(Lifecycle.State.STARTED) {vm.state.collect { render(it) }}
}

8. 高频命令场景

逐条执行(严格 FIFO):

val channel = Channel<Int>(64)
launch { for (d in channel) motor.goto(d) }

只要最新(丢弃旧的):

val latest = MutableSharedFlow<Int>(replay=0, extraBufferCapacity=1)
fun setAngle(d:Int) { latest.tryEmit(d) }launch {latest.distinctUntilChanged().collectLatest { d -> motor.goto(d) }
}

9. 背压工具清单

  • buffer(n):缓存队列

  • conflate():只保最新

  • collectLatest{}:新来取消旧

  • debounce()/sample():控速

  • flowOn(IO):切线程


10. 高频命令进阶(角度缓存问题)

// 每条角度都执行
val cmd = Channel<Int>(64)
launch { for (d in cmd) motor.goto(d) }// 只执行最新
val latest = MutableSharedFlow<Int>(replay=0, extraBufferCapacity=1)
launch { latest.collectLatest { d -> motor.goto(d) } }

11. 选型速查表

场景工具说明
页面状态StateFlow粘性
一次性事件(单播)ChannelFIFO
一次性事件(广播)SharedFlow(replay=0)多观察者
严格顺序任务Channel + Actor顺序执行
回调源callbackFlow回调转流
高频状态只要最新conflate / collectLatest丢旧保新

12. 完整模板(VM + UI)

class Vm : ViewModel() {private val _state = MutableStateFlow(UiState())val state: StateFlow<UiState> = _stateprivate val _events = Channel<UiEvent>(Channel.BUFFERED)val events = _events.receiveAsFlow()private val _angleCmd = MutableSharedFlow<Int>(replay=0, extraBufferCapacity=1)fun setAngle(d: Int) { _angleCmd.tryEmit(d) }init {viewModelScope.launch(Dispatchers.IO) {_angleCmd.collectLatest { d -> motor.goto(d) }}}fun goSettings() = viewModelScope.launch { _events.send(UiEvent.NavigateSettings) }
}

viewLifecycleOwner.lifecycleScope.launch {repeatOnLifecycle(Lifecycle.State.STARTED) {launch { vm.state.collect { render(it) } }launch { vm.events.collect { handle(it) } }}
}

结语

  • LiveData 粘性SingleLiveEvent 权宜Flow 体系全面取代

  • StateFlow 管状态,SharedFlow 管事件广播,Channel/Actor 管严格队列,callbackFlow 管回调源。

  • Flow 天然背压,配合 buffer/conflate/collectLatest/debounce 灵活控速。

  • UI 用 lifecycleScope 收集,VM 用 viewModelScope 生产,职责清晰,生命周期安全。

一句话总结:状态用 StateFlow,广播事件用 SharedFlow,单播/队列用 Channel(配 Actor),回调用 callbackFlow。

http://www.dtcms.com/a/449346.html

相关文章:

  • 数据库与缓存数据一致性的全部方案
  • 算命公司网站建设制作开发方案网站商城怎么做app
  • 遗传算法解决TSP问题
  • MVC的含义
  • DBSCAN 密度聚类算法
  • 【极客日常】用Eino+Ollama低成本研发LLM的Agent
  • 《深入 Django ORM:select_related 与 prefetch_related 的实战剖析与性能优化指南》
  • 男科医院网站模板视频加字幕软件app
  • 网站开发自荐信江门专业网站制作费用
  • nat address-group 概念及题目
  • 深度学习模型构建的本质——“核心四要素+任务适配逻辑”
  • 基于SpringBoot+Vue的志行交通法规在线模拟考试(AI问答、WebSocket即时通讯、Echarts图形化分析、随机测评)
  • 厦门建网站费用一览表网站设计流行趋势
  • Docker Compose 搭建 LNMP 环境并部署 WordPress 论坛
  • 无锡企业网站制作哪家好前端的网站重构怎么做
  • TensorFlow2 Python深度学习 - 深度学习概述
  • Davor的北极探险资金筹集:数学建模与算法优化(洛谷P4956)
  • Web Components 的开发过程举例
  • 【Algorithm】Day-1
  • 提示工程深度解析:驾驭大语言模型的艺术与科学
  • 网站开发证书是什么中国建设学会查询网站
  • java代码随想录day50|图论理论基础
  • 【模型量化迁移】详解:让AI大模型在端侧“轻装上阵”的核心技术
  • 【Proteus仿真】虚拟终端出现乱码问题解决
  • 深入理解HarmonyOS ArkTS语法:从基础到高级应用开发
  • Photoshop - Photoshop 工具栏(5)多边套索工具
  • 做彩票网站空间去哪买网站主播
  • JavaWeb--Ajax
  • 网站建设与维护报告总结许昌网站建设汉狮套餐
  • [初学C语言]关于scanf和printf函数