Kotlin 协程之 SharedFlow 与 StateFlow 深度解析
前言
在前面的文章中,我们已经深入探讨了 Flow
的基础概念、创建方式以及 ChannelFlow
突破 Flow
限制等知识点。
今天我们来看一下 Kotlin
协程中的两个重要成员:SharedFlow 和 StateFlow。
我们通常把 SharedFlow
叫做事件流,把 StateFlow
叫做状态流。在日常开发中,事件 和 状态 是非常聚焦且常见的应用场景,因此 Kotlin 协程 针对这两个场景专门提供了 SharedFlow
和 StateFlow
。
SharedFlow
和 StateFlow
是对 Flow
的进一步抽象和扩展,但是它们与普通 Flow
有着本质区别
特性 | Flow(冷流) | SharedFlow/StateFlow(热流) |
---|---|---|
数据生产时机 | 只有消费层触发收集时,生产层才开始工作 | 无论是否有消费者都会生产数据 |
消费者关系 | 每个消费者独立触发数据生产 | 多个消费者共享同一个数据源 |
接口设计 | 实现 Flow 接口 | 同样实现 Flow 接口(保持消费端 API 一致性) |
生产逻辑 | 基于协程构建器的冷流生产逻辑 | 完全独立实现的热流生产逻辑,采用发布-订阅模式 |
缓冲机制 | 依赖 Channel 的缓冲策略 | 自定义缓冲区实现,支持重放和额外缓冲 |
无论叫事件流还是状态流,本质上都是数据流。关键是要弄清楚它们的内部实现机制、缓冲策略以及适用场景。
SharedFlow 详解
SharedFlow
是 热流,但是跟 Channel
的生产消费模型不同,SharedFlow
采用的是 发布订阅(Publish-Subscribe)模型。当数据发送后,会被 广播 给所有正在收集的订阅者。
继承关系
从继承关系图可以看出:
SharedFlow
本身是一个接口,继承自接口Flow
SharedFlow
有两个子接口:MutableSharedFlow
和StateFlow
,也就是说StateFlow
是基于SharedFlow
实现的SharedFlow
核心实现类是SharedFlowImpl
MutableSharedFlow
我们可以通过 MutableSharedFlow()
这个顶层函数来创建 SharedFlow
,返回值类型是 MutableSharedFlow<T>
。
MutableSharedFlow() 源码
public fun <T> MutableSharedFlow(replay: Int = 0,extraBufferCapacity: Int = 0,onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {// 参数校验require(replay >= 0) { "replay cannot be negative, but was $replay" }require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" }require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND) {"replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy $onBufferOverflow"}// 计算总缓冲容量val bufferCapacity0 = replay + extraBufferCapacityval bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0// 返回具体实现return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}
从源码可以看到,SharedFlow
的缓冲容量是 replay
与 extraBufferCapacity
之和。具体的实现则是由 SharedFlowImpl
完成,SharedFlowImpl
内部的实现还是非常复杂的。
基本使用方式:
因为继承自 Flow
,所以在使用上基本上跟 Flow
的 API
是一样的:
- 使用
emit()
/tryEmit()
发射数据 - 使用
collect()
订阅数据
参数说明
MutableSharedFlow()
函数有三个参数,如下:
参数 | 类型 | 默认值 | 作用说明 |
---|---|---|---|
replay | Int | 0 | 重放缓存大小 • 新订阅者能立即收到的历史数据条数 • 值为0时,新订阅者只能收到订阅后发射的数据 • 值大于0时,新订阅者会先收到最近的N条历史数据 |
extraBufferCapacity | Int | 0 | 额外缓冲容量 • 除重放缓存外的额外缓冲空间 • 用于减少背压处理,提高发射性能 • 总缓冲容量 = replay + extraBufferCapacity |
onBufferOverflow | BufferOverflow | SUSPEND | 缓冲区溢出策略 • SUSPEND : 挂起等待,直到有空间可用• DROP_LATEST : 丢弃最新发射的数据• DROP_OLDEST : 丢弃最旧的数据,为新数据腾出空间 |
参数组合规则
根据源码中的参数校验逻辑,参数组合必须满足以下条件:
// 当使用非默认的溢出策略时,必须配置缓冲区
require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND)
规则说明:
- 如果使用
DROP_LATEST
或DROP_OLDEST
策略,必须设置replay > 0
或extraBufferCapacity > 0
- 使用
SUSPEND
策略时,可以不设置任何缓冲区
使用示例
1. 热流&数据广播
suspend fun hotFlowDemo() {val hotFlow = MutableSharedFlow<String>()println("\u001B[33m[说明] 热流特性:无论是否有订阅者,数据都会发射\u001B[0m")// 生产者:立即开始发射数据val producer = GlobalScope.launch {repeat(3) { i ->val data = "热流数据$i"println("\u001B[32m[生产者] 发射: $data(此时无订阅者)\u001B[0m")hotFlow.emit(data)delay(300)}}delay(1000) // 等待生产者发射完成// 消费者1:晚加入val consumer1 = GlobalScope.launch {println("\u001B[34m[消费者1] 开始订阅(错过了前面的数据)\u001B[0m")hotFlow.collect { data ->println("\u001B[34m[消费者1] 收到: $data\u001B[0m")}}delay(200)// 继续发射数据val producer2 = GlobalScope.launch {repeat(2) { i ->val data = "新数据${i + 3}"println("\u001B[32m[生产者] 发射: $data\u001B[0m")hotFlow.emit(data)delay(300)}}delay(500)// 消费者2:更晚加入val consumer2 = GlobalScope.launch {println("\u001B[35m[消费者2] 开始订阅(只能收到后续数据)\u001B[0m")hotFlow.collect { data ->println("\u001B[35m[消费者2] 收到: $data\u001B[0m")}}delay(200)// 最后发射一条数据println("\u001B[32m[生产者] 发射: 最终数据\u001B[0m")hotFlow.emit("最终数据")delay(500)producer.join()producer2.join()consumer1.cancel()consumer2.cancel()
}
- 热流特性:无论是否有订阅者,数据都会发射
- 数据广播:数据会广播给所有订阅者
2. 数据重放
suspend fun replayMechanismDemo() {// 设置重放数量为3val sharedFlow = MutableSharedFlow<String>(replay = 3)println("\u001B[33m[说明] 创建replay=3的SharedFlow,新订阅者可获取最近3条数据\u001B[0m")// 生产者:先发射一些数据val producer = GlobalScope.launch {listOf("数据1", "数据2", "数据3", "数据4").forEach { data ->delay(100)println("\u001B[32m[生产者] 发射: $data\u001B[0m")sharedFlow.emit(data)}}delay(500) // 等待生产者发射一些数据// 消费者1:晚加入,应该收到重放数据val consumer1 = GlobalScope.launch {println("\u001B[34m[消费者1] 开始订阅(收到最近3条重放数据)\u001B[0m")sharedFlow.collect { data ->println("\u001B[34m[消费者1] 收到: $data\u001B[0m")}}delay(300)// 继续发射数据GlobalScope.launch {listOf("数据5", "数据6").forEach { data ->delay(200)println("\u001B[32m[生产者] 发射: $data\u001B[0m")sharedFlow.emit(data)}}delay(600)consumer1.cancel()producer.join()
}
可以看到,如果设置了重放数量为3:
- 新订阅者可获取最近3条数据
3. 数据缓冲&溢出策略
配置额外缓冲区 + 默认的策略 SUSPEND
private suspend fun suspendStrategyDemo() {println("\u001B[33m[策略1] SUSPEND - 背压时挂起发射者\u001B[0m")val suspendFlow = MutableSharedFlow<Int>(extraBufferCapacity = 2)// SUSPEND消费者val suspendConsumer = GlobalScope.launch {suspendFlow.collect { value ->println("\u001B[34m[SUSPEND消费者] 收到: $value\u001B[0m")delay(500) // 模拟慢消费}}delay(200)// 生产者val suspendProducer = GlobalScope.launch {repeat(5) { i ->println("\u001B[32m[SUSPEND生产者] 准备发射: $i\u001B[0m")suspendFlow.emit(i) // 当缓冲区满时会挂起println("\u001B[32m[SUSPEND生产者] 已发射: $i\u001B[0m")}}delay(2000)suspendProducer.cancel()suspendConsumer.cancel()
}
配置额外缓冲区,并使用 DROP_OLDEST
策略
private suspend fun dropOldestStrategyDemo() {println("\u001B[33m[策略2] DROP_OLDEST - 丢弃最旧数据\u001B[0m")val dropOldestFlow = MutableSharedFlow<Int>(extraBufferCapacity = 2,onBufferOverflow = BufferOverflow.DROP_OLDEST)val emittedDataOldest = mutableListOf<Int>()val receivedDataOldest = mutableListOf<Int>()// 消费者val dropOldestConsumer = GlobalScope.launch {println("\u001B[35m[DROP_OLDEST消费者] 开始订阅并慢速处理\u001B[0m")dropOldestFlow.collect { value ->receivedDataOldest.add(value)println("\u001B[35m[DROP_OLDEST消费者] 收到: $value\u001B[0m")delay(600) // 模拟慢消费,让缓冲区容易满}}// 生产者val dropOldestProducer = GlobalScope.launch {println("\u001B[32m[DROP_OLDEST生产者] 开始快速发射数据\u001B[0m")repeat(6) { i ->delay(100) // 快速发射val value = i + 1emittedDataOldest.add(value)dropOldestFlow.tryEmit(value)println("\u001B[32m[DROP_OLDEST生产者] 发射: $value\u001B[0m")}}delay(3000)dropOldestProducer.cancel()dropOldestConsumer.cancel()// 显示丢弃效果delay(100)println("\u001B[33m[DROP_OLDEST分析] 发射的数据: $emittedDataOldest\u001B[0m")println("\u001B[33m[DROP_OLDEST分析] 接收的数据: $receivedDataOldest\u001B[0m")val droppedOldest = emittedDataOldest - receivedDataOldest.toSet()if (droppedOldest.isNotEmpty()) {println("\u001B[31m[DROP_OLDEST分析] 被丢弃的旧数据: $droppedOldest\u001B[0m")} else {println("\u001B[32m[DROP_OLDEST分析] 没有数据被丢弃\u001B[0m")}
}
配置额外缓冲区,并使用 DROP_LATEST
策略
private suspend fun dropLatestStrategyDemo() {println("\u001B[33m[策略3] DROP_LATEST - 丢弃最新数据\u001B[0m")val dropLatestFlow = MutableSharedFlow<Int>(extraBufferCapacity = 2,onBufferOverflow = BufferOverflow.DROP_LATEST)val emittedDataLatest = mutableListOf<Int>()val receivedDataLatest = mutableListOf<Int>()// 消费者val dropLatestConsumer = GlobalScope.launch {println("\u001B[36m[DROP_LATEST消费者] 开始订阅并慢速处理\u001B[0m")dropLatestFlow.collect { value ->delay(800)receivedDataLatest.add(value)println("\u001B[36m[DROP_LATEST消费者] 收到: $value\u001B[0m")}}// 生产者:延迟启动,快速发射数据val dropLatestProducer = GlobalScope.launch {println("\u001B[33m[DROP_LATEST生产者] 开始快速发射数据\u001B[0m")repeat(6) { i ->delay(100) // 快速发射,比消费者处理速度快很多val value = i + 1emittedDataLatest.add(value)dropLatestFlow.tryEmit(value) // tryEmit总是返回true,不能用来判断丢弃println("\u001B[33m[DROP_LATEST生产者] 发射: $value\u001B[0m")}}delay(4000) // 等待演示完成dropLatestProducer.cancel()dropLatestConsumer.cancel()// 显示丢弃效果delay(100)println("\u001B[33m[DROP_LATEST分析] 发射的数据: $emittedDataLatest\u001B[0m")println("\u001B[33m[DROP_LATEST分析] 接收的数据: $receivedDataLatest\u001B[0m")val droppedLatest = emittedDataLatest - receivedDataLatest.toSet()if (droppedLatest.isNotEmpty()) {println("\u001B[31m[DROP_LATEST分析] 被丢弃的新数据: $droppedLatest\u001B[0m")} else {println("\u001B[32m[DROP_LATEST分析] 没有数据被丢弃\u001B[0m")}
}
sharedIn
如果数据的产生是由自己控制的,那么我们可以直接通过 MutableSharedFlow
来创建一个 SharedFlow
。
但是如果数据的产生是由一个外部的冷流(Flow
)提供,但是我们又想转换为热流使用,那么就可以使用 shareIn
操作符来把一个冷流转换为热流。
public fun <T> Flow<T>.shareIn(scope: CoroutineScope,started: SharingStarted,replay: Int = 0
): SharedFlow<T> {val config = configureSharing(replay)val shared = MutableSharedFlow<T>(replay = replay,extraBufferCapacity = config.extraBufferCapacity,onBufferOverflow = config.onBufferOverflow)@Suppress("UNCHECKED_CAST")val job = scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)return ReadonlySharedFlow(shared, job)
}
参数说明
shareIn
函数有三个参数:
参数 | 类型 | 作用说明 |
---|---|---|
scope | CoroutineScope | 指定共享流的作用域,决定了热流的生命周期 |
started | SharingStarted | 控制共享流的启动策略,有三种选择: • Eagerly : 立即开始收集,即使没有订阅者• Lazily : 第一个订阅者出现时开始收集• WhileSubscribed : 有订阅者时收集,无订阅者时停止 |
replay | Int | 新订阅者可以获得的历史数据数量,类似 SharedFlow 的 replay 参数 |
started 启动策略
策略 | 启动时机 | 停止时机 | 适用场景 |
---|---|---|---|
Eagerly | 立即启动 | 作用域结束 | • 数据必须从一开始就收集 • 数据生产成本较低 • 不关心资源消耗 |
Lazily | 首个订阅 | 作用域结束 | • 数据随第一个订阅开始 • 之后保持收集状态 • 允许错过初始数据 |
WhileSubscribed | 首个订阅 | 最后订阅取消 | • 按需收集数据 • 需要节省资源 • 数据生产成本高 |
使用示例
scope
和 replay
参数都很好理解,我们重点看下 started
参数的使用。
1. Eagerly 策略示例
suspend fun sharedInEagerlyDemo() {val scope = CoroutineScope(Dispatchers.Default)println("\u001B[33m[说明] 演示 Eagerly 策略:冷流转热流后立即开始收集数据\u001B[0m")println("\u001B[32m[Eagerly-生产者] 准备创建冷流...\u001B[0m")// 1. 创建冷流(此时不会开始生产数据)val coldFlow = flow {println("\u001B[32m[Eagerly-生产者] 开始生产数据(冷流 -> 热流后立即执行)\u001B[0m")var i = 1try {while (true) {val data = "数据 ${i++}"println("\u001B[32m[Eagerly-生产者] 发射:$data\u001B[0m")emit(data)delay(1000)}} finally {println("\u001B[32m[Eagerly-生产者] 数据生产已停止\u001B[0m")}}println("\u001B[32m[Eagerly-生产者] 冷流已创建,但尚未开始生产数据\u001B[0m")delay(1000)println("\u001B[32m[Eagerly-生产者] 准备将冷流转换为热流...\u001B[0m")// 2. 转换为热流(使用 Eagerly 策略,会立即开始收集)val sharedFlow = coldFlow.shareIn(scope = scope,started = SharingStarted.Eagerly, // 立即开始收集,即使没有订阅者replay = 0 // 不保留历史数据)println("\u001B[32m[Eagerly-生产者] 已转换为热流,此时会触发冷流开始生产数据\u001B[0m")delay(2000) // 等待一会儿,让我们观察到数据生产情况// 3. 添加第一个订阅者(此时生产者已经在发射数据)val job1 = scope.launch {println("\u001B[34m[Eagerly-订阅者1] 开始订阅(生产者已经在发射数据)\u001B[0m")sharedFlow.collect { value ->println("\u001B[34m[Eagerly-订阅者1] 收到:$value\u001B[0m")}}delay(2000)// 4. 添加第二个订阅者(共享之前已经在发射的数据)val job2 = scope.launch {println("\u001B[35m[Eagerly-订阅者2] 开始订阅(与订阅者1共享数据)\u001B[0m")sharedFlow.collect { value ->println("\u001B[35m[Eagerly-订阅者2] 收到:$value\u001B[0m")}}delay(2000)println("\u001B[33m[说明] 即使取消所有订阅者,Eagerly策略下数据收集仍会继续\u001B[0m")job1.cancel()job2.cancel()delay(2000) // 等待一会儿,观察数据是否继续生产
}
2. Lazily 策略示例
suspend fun sharedInLazilyDemo() {val scope = CoroutineScope(Dispatchers.Default)println("\u001B[33m[说明] 演示 Lazily 策略:首个订阅者触发数据收集\u001B[0m")println("\u001B[32m[Lazily-生产者] 准备创建冷流...\u001B[0m")// 1. 创建冷流(此时不会开始生产数据)val coldFlow = flow {println("\u001B[32m[Lazily-生产者] 开始生产数据(首个订阅者触发)\u001B[0m")var i = 1try {while (true) {val data = "数据 ${i++}"println("\u001B[32m[Lazily-生产者] 发射:$data\u001B[0m")emit(data)delay(1000)}} finally {println("\u001B[32m[Lazily-生产者] 数据生产已停止\u001B[0m")}}println("\u001B[32m[Lazily-生产者] 冷流已创建,但尚未开始生产数据\u001B[0m")println("\u001B[32m[Lazily-生产者] 准备将冷流转换为热流...\u001B[0m")// 2. 转换为热流(使用 Lazily 策略,等待首个订阅者)val sharedFlow = coldFlow.shareIn(scope = scope,started = SharingStarted.Lazily, // 首个订阅者到来时才开始收集replay = 0 // 不保留历史数据)println("\u001B[32m[Lazily-生产者] 已转换为热流,但要等待首个订阅者才开始生产数据\u001B[0m")delay(2000) // 等待一会儿,证明确实没有开始生产数据// 3. 添加第一个订阅者(此时才开始生产数据)val job1 = scope.launch {println("\u001B[34m[Lazily-订阅者1] 开始订阅(这将触发数据生产)\u001B[0m")sharedFlow.collect { value ->println("\u001B[34m[Lazily-订阅者1] 收到:$value\u001B[0m")}}delay(2000)// 4. 添加第二个订阅者(共享已经在发射的数据)val job2 = scope.launch {println("\u001B[35m[Lazily-订阅者2] 开始订阅(与订阅者1共享数据)\u001B[0m")sharedFlow.collect { value ->println("\u001B[35m[Lazily-订阅者2] 收到:$value\u001B[0m")}}delay(2000)println("\u001B[33m[说明] 取消所有订阅者后,Lazily策略下数据收集仍会继续\u001B[0m")job1.cancel()job2.cancel()delay(2000) // 等待一会儿,观察数据是否继续生产
}
3. WhileSubscribed 策略示例
suspend fun sharedInWhileSubscribedDemo() {val scope = CoroutineScope(Dispatchers.Default)println("\u001B[33m[说明] 演示 WhileSubscribed 策略:按需收集数据\u001B[0m")println("\u001B[32m[WhileSubscribed-生产者] 准备创建冷流...\u001B[0m")// 1. 创建冷流(此时不会开始生产数据)val coldFlow = flow {println("\u001B[32m[WhileSubscribed-生产者] 开始生产数据(有订阅者时执行)\u001B[0m")var i = 1try {while (true) {val data = "数据 ${i++}"println("\u001B[32m[WhileSubscribed-生产者] 发射:$data\u001B[0m")emit(data)delay(1000)}} finally {println("\u001B[32m[WhileSubscribed-生产者] 数据生产已停止(无订阅者)\u001B[0m")}}println("\u001B[32m[WhileSubscribed-生产者] 冷流已创建,但尚未开始生产数据\u001B[0m")println("\u001B[32m[WhileSubscribed-生产者] 准备将冷流转换为热流...\u001B[0m")// 2. 转换为热流(使用 WhileSubscribed 策略,仅在有订阅者时收集)val sharedFlow = coldFlow.shareIn(scope = scope,started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 1000 // 所有订阅者取消后等待1秒再停止收集,避免订阅者快速重订阅时重复创建上游Flow),replay = 0 // 不保留历史数据)println("\u001B[32m[WhileSubscribed-生产者] 已转换为热流,但要等待订阅者才开始生产数据\u001B[0m")delay(2000) // 等待一会儿,证明确实没有开始生产数据// 3. 添加第一个订阅者(此时开始生产数据)val job1 = scope.launch {println("\u001B[34m[WhileSubscribed-订阅者1] 开始订阅(触发数据生产)\u001B[0m")sharedFlow.collect { value ->println("\u001B[34m[WhileSubscribed-订阅者1] 收到:$value\u001B[0m")}}delay(1200)// 4. 添加第二个订阅者(共享数据)val job2 = scope.launch {println("\u001B[35m[WhileSubscribed-订阅者2] 开始订阅(与订阅者1共享数据)\u001B[0m")sharedFlow.collect { value ->println("\u001B[35m[WhileSubscribed-订阅者2] 收到:$value\u001B[0m")}}delay(1200)println("\u001B[33m[说明] 取消所有订阅者后,等待1秒钟,WhileSubscribed策略将停止数据收集\u001B[0m")job1.cancel()job2.cancel()delay(1500) // 等待超过stopTimeoutMillis,此时成产数据会停止println("\u001B[33m[说明] 现在添加新的订阅者,将重新触发数据收集\u001B[0m")val job3 = scope.launch {println("\u001B[36m[WhileSubscribed-订阅者3] 开始订阅(重新触发数据生产)\u001B[0m")sharedFlow.collect { value ->println("\u001B[36m[WhileSubscribed-订阅者3] 收到:$value\u001B[0m")}}delay(2000)job3.cancel()delay(5000) // 再次等待,观察数据生产停止
}
实际应用场景中的策略选择示例:
-
网络请求结果共享
- 使用
Lazily
策略:首次订阅时才发起请求 - 设置
replay = 1
:新订阅者可获取最新响应
- 使用
-
传感器数据广播
- 使用
WhileSubscribed
策略:按需采集数据,节省资源 - 设置
replay = 0
:只关注实时数据
- 使用
-
用户操作事件分发
- 使用
Eagerly
策略:立即开始监听,不错过任何事件 - 设置
replay = 0
:事件型数据无需重放
- 使用
SharedFlow 核心特性总结
特性 | 说明 |
---|---|
热流 | 无论是否有收集者都会发射数据 |
多播 | 多个收集者共享同一数据源 |
重放 | 新订阅者可获取历史数据 |
缓冲 | 可配置缓冲数据容量,当消费端过慢时用于暂存数据 |
溢出策略 | 配置缓冲区满时的处理策略 |
StateFlow
StateFlow
是特殊版本的 SharedFlow
,专门用于状态管理。它在 SharedFlow
的基础上增加了状态语义,确保始终有一个当前状态值,并且只有当新值与当前值不同时才会发射。
StateFlow
非常适合用于 UI 状态管理、配置管理等需要状态保持和去重的场景。
MutableStateFlow
我们可以通过 MutableStateFlow()
顶层函数来创建 StateFlow
,返回值类型是 MutableStateFlow<T>
public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {/*** 当前状态流的值。** 设置一个与前一个值[相等][Any.equals]的值不会产生任何效果。** 此属性是**线程安全的**,可以在并发协程中安全更新,无需外部同步。*/public override var value: T/*** 原子性地比较当前[value]与[expect],如果相等则将其设置为[update]。* 如果[value]被设置为[update]则返回`true`,否则返回`false`。** 此函数使用[Any.equals]进行常规比较。如果[expect]和[update]都等于当前[value],* 此函数返回`true`,但实际上不会改变存储在[value]中的引用。** 此方法是**线程安全的**,可以在并发协程中安全调用,无需外部同步。*/public fun compareAndSet(expect: T, update: T): Boolean
}public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
从源码可以看到:
MutableStateFlow
构造函数只需要一个参数:初始值- 内部实现是
StateFlowImpl
使用示例
// 创建 StateFlow
val stateFlow = MutableStateFlow("初始状态")// 更新状态,直接赋值
stateFlow.value = "新状态"
// 非挂起的发射数据,效果同直接赋值
stateFlow.tryEmit("")
// 或者使用挂起的方式发射数据
stateFlow.emit("新状态")// 获取当前状态
val currentState = stateFlow.value// 订阅状态变化
stateFlow.collect { state ->println("状态变化: $state")
}
整个 api 非常简单,跟 SharedFlow
的 api 一样,只是多了一个 value
属性。
StateFlow 赋值方式对比
赋值方式 | 语法 | 函数类型 | 使用场景 | 特点 |
---|---|---|---|---|
直接赋值 | stateFlow.value = newValue | 属性访问 | 任何地方都可使用 | • 同步操作 • 不需要协程环境 • 最常用的方式 |
tryEmit | stateFlow.tryEmit(newValue) | 普通函数 | 非协程环境 | • 同步操作 • 返回Boolean表示是否成功 • 对StateFlow总是返回true |
emit | stateFlow.emit(newValue) | 挂起函数 | 协程中使用 | • 异步操作 • 必须在协程中调用 • 与Flow保持一致的API |
不管是哪种赋值方式,底层都是走到了 StateFlowImpl
的 updateState
方法中:
// StateFlowImpl 中的核心属性和方法
private val _state = atomic(initialState) // 原子引用,存储当前状态值
private var sequence = 0 // 序列号,用于同步更新操作// value 属性的 getter 和 setter
public override var value: T
get() = NULL.unbox(_state.value) // 获取当前状态值,NULL.unbox 用于处理 null 值
set(value) {// 设置新值时调用 updateState,expectedState 为 null 表示不进行 CAS 检查updateState(null, value ?: NULL)
}// tryEmit 方法:非挂起函数,直接调用 value setter
override fun tryEmit(value: T): Boolean {this.value = value // 直接赋值给 value 属性return true // StateFlow 的 tryEmit 总是返回 true
}// emit 方法:挂起函数,但实际上也是直接调用 value setter
override suspend fun emit(value: T) {this.value = value // 与 tryEmit 相同,直接赋值
}/*** StateFlow 的核心更新方法* @param expectedState 期望的当前状态(用于 CAS 操作),null 表示不检查* @param newState 新的状态值* @return 是否成功更新*/
private fun updateState(expectedState: Any?, newState: Any): Boolean {var curSequence: Intvar curSlots: Array<StateFlowSlot?>? // 当前的订阅者槽位数组// === 第一阶段:在锁内进行状态检查和更新 ===synchronized(this) {val oldState = _state.value// CAS 支持:如果指定了期望状态但与当前状态不匹配,则更新失败if (expectedState != null && oldState != expectedState) {return false // compareAndSet 操作失败}// 去重检查:如果新值与旧值相同,直接返回 true(不需要通知订阅者)if (oldState == newState) {return true // 值没有变化,但 CAS 操作成功}// 更新状态值_state.value = newState// 获取当前序列号curSequence = sequence// 序列号管理:偶数表示静止状态,奇数表示正在更新if (curSequence and 1 == 0) {// 当前是静止状态(偶数),开始新的更新curSequence++ // 变为奇数,表示开始更新sequence = curSequence} else {// 已经有更新在进行中,通知正在进行的更新并返回sequence = curSequence + 2 // 增加2保持奇数,通知有新的更新return true // 更新已提交,由正在进行的更新负责通知}// 在锁内读取当前的订阅者槽位curSlots = slots}// === 第二阶段:在锁外通知所有订阅者 ===/** 在锁外触发值更新,避免与无限制协程产生死锁。* 循环直到完成所有更改的触发。这是一种简单的扁平化合并,* 确保并发更新的顺序触发,避免多线程并发更新时订阅者恢复的风暴。*/while (true) {// 通知所有当前的订阅者(可能存在良性竞争)curSlots?.forEach { slot ->slot?.makePending() // 标记槽位为待处理状态,触发订阅者}// 检查在通知过程中是否有新的更新synchronized(this) {if (sequence == curSequence) {// 序列号没有变化,说明没有新的更新,完成当前更新sequence = curSequence + 1 // 变为偶数,表示回到静止状态return true // 更新完成}// 有新的更新,重新读取序列号和槽位,继续下一轮通知curSequence = sequencecurSlots = slots}}
}
updateState 方法核心逻辑解析
- 线程安全:使用
synchronized
确保状态更新的原子性 - CAS 支持:支持 Compare-And-Set 操作,用于
compareAndSet
方法 - 去重机制:自动过滤相同值,避免不必要的通知
- 序列号管理:
- 偶数序列号:
StateFlow
处于静止状态 - 奇数序列号:
StateFlow
正在进行更新操作
- 偶数序列号:
- 扁平化合并:多个并发更新会被合并,避免订阅者通知风暴
- 锁外通知:在锁外通知订阅者,避免死锁
特性示例
始终保持最新的状态值且订阅者立即获取当前值
suspend fun stateFlowBasicDemo() {// 创建StateFlow,初始值为"初始状态"val stateFlow = MutableStateFlow("初始状态")println("\u001B[33m[说明] StateFlow始终保持当前状态,新订阅者立即获取当前值\u001B[0m")println("\u001B[33m[当前状态] ${stateFlow.value}\u001B[0m")// 订阅者1:立即订阅val subscriber1 = GlobalScope.launch {println("\u001B[34m[订阅者1] 开始订阅StateFlow\u001B[0m")stateFlow.collect { state ->println("\u001B[34m[订阅者1] 收到状态: $state\u001B[0m")}}delay(300)// 更新状态println("\u001B[32m[状态更新] 更新为: 状态A\u001B[0m")stateFlow.value = "状态 A"delay(300)// 订阅者2:晚加入,但能立即获取当前状态val subscriber2 = GlobalScope.launch {println("\u001B[35m[订阅者2] 晚加入,立即获取当前状态\u001B[0m")stateFlow.collect { state ->println("\u001B[35m[订阅者2] 收到状态: $state\u001B[0m")}}delay(300)// 继续更新状态println("\u001B[32m[状态更新] 更新为: 状态B\u001B[0m")stateFlow.tryEmit("状态B")delay(300)println("\u001B[32m[状态更新] 更新为: 最终状态\u001B[0m")stateFlow.emit("最终状态")delay(500)subscriber1.cancel()subscriber2.cancel()
}
具备去重特性
suspend fun stateFlowDistinctDemo() {val stateFlow = MutableStateFlow(0)println("\u001B[33m[说明] StateFlow会自动过滤相同的值,只有状态真正改变时才会通知订阅者\u001B[0m")// 订阅者val subscriber = GlobalScope.launch {println("\u001B[34m[订阅者] 开始监听状态变化\u001B[0m")stateFlow.collect { value ->println("\u001B[34m[订阅者] 状态变化: $value\u001B[0m")}}delay(200)// 更新相同的值println("\u001B[32m[更新] 设置为0(相同值,不会触发)\u001B[0m")stateFlow.value = 0delay(200)println("\u001B[32m[更新] 设置为1(新值,会触发)\u001B[0m")stateFlow.value = 1delay(200)println("\u001B[32m[更新] 再次设置为1(相同值,不会触发)\u001B[0m")stateFlow.value = 1delay(200)println("\u001B[32m[更新] 设置为2(新值,会触发)\u001B[0m")stateFlow.value = 2delay(200)println("\u001B[32m[更新] 设置为2(相同值,不会触发)\u001B[0m")stateFlow.value = 2delay(500)subscriber.cancel()
}
stateIn
同样的,我们可以通过 stateIn
操作符来把一个冷流转换为状态流。
stateIn
与 shareIn
类似,但专门用于状态管理场景,它会自动处理状态的去重和当前值保持。
public fun <T> Flow<T>.stateIn(scope: CoroutineScope,started: SharingStarted,initialValue: T
): StateFlow<T> {val config = configureSharing(1)val state = MutableStateFlow(initialValue)val job = scope.launchSharing(config.context, config.upstream, state, started, initialValue)return ReadonlyStateFlow(state, job)
}
参数说明
stateIn
函数有三个参数:
参数 | 类型 | 作用说明 |
---|---|---|
scope | CoroutineScope | 指定状态流的作用域,决定了状态流的生命周期 |
started | SharingStarted | 控制状态流的启动策略,有三种选择: • Eagerly : 立即开始收集,即使没有订阅者• Lazily : 第一个订阅者出现时开始收集• WhileSubscribed : 有订阅者时收集,无订阅者时停止 |
initialValue | T | 状态流的初始值,这是 StateFlow 必须的参数,确保始终有一个当前状态 |
使用示例
熟悉了 sharedIn
的参数后,stateIn
就更好理解了,started
的作用都是一致的,无非就是需要一个初始值
基本使用方式:
1. Eagerly 策略示例
suspend fun stateInEagerlyDemo() {val scope = CoroutineScope(Dispatchers.Default)println("\u001B[33m[说明] 演示 stateIn Eagerly 策略:冷流转状态流后立即开始收集数据\u001B[0m")println("\u001B[32m[Eagerly-生产者] 准备创建冷流...\u001B[0m")// 1. 创建冷流(此时不会开始生产数据)val coldFlow = flow {println("\u001B[32m[Eagerly-生产者] 开始生产状态数据(冷流 -> 状态流后立即执行)\u001B[0m")var i = 1try {while (true) {val state = "状态 ${i++}"println("\u001B[32m[Eagerly-生产者] 发射状态:$state\u001B[0m")emit(state)delay(1000)}} finally {println("\u001B[32m[Eagerly-生产者] 状态生产已停止\u001B[0m")}}println("\u001B[32m[Eagerly-生产者] 冷流已创建,但尚未开始生产数据\u001B[0m")delay(1000) // 等待一会儿,证明冷流确实没有开始生产println("\u001B[32m[Eagerly-生产者] 准备将冷流转换为状态流...\u001B[0m")// 2. 转换为状态流(使用 Eagerly 策略,会立即开始收集)val stateFlow = coldFlow.stateIn(scope = scope,started = SharingStarted.Eagerly, // 立即开始收集,即使没有订阅者initialValue = "初始状态" // StateFlow 必须有初始值)println("\u001B[32m[Eagerly-生产者] 已转换为状态流,此时已经开始生产数据\u001B[0m")println("\u001B[33m[当前状态] ${stateFlow.value}\u001B[0m")delay(2000)println("\u001B[33m[当前状态] ${stateFlow.value}\u001B[0m")// 3. 添加第一个订阅者(此时生产者已经在发射数据)val job1 = scope.launch {println("\u001B[34m[Eagerly-订阅者1] 开始订阅(生产者已经在发射数据,立即获取当前状态)\u001B[0m")stateFlow.collect { value ->println("\u001B[34m[Eagerly-订阅者1] 收到状态:$value\u001B[0m")}}delay(2000)// 4. 添加第二个订阅者(共享之前已经在发射的数据)val job2 = scope.launch {println("\u001B[35m[Eagerly-订阅者2] 开始订阅(与订阅者1共享数据,立即获取当前状态)\u001B[0m")stateFlow.collect { value ->println("\u001B[35m[Eagerly-订阅者2] 收到状态:$value\u001B[0m")}}delay(2000)println("\u001B[33m[说明] 即使取消所有订阅者,Eagerly策略下状态收集仍会继续\u001B[0m")job1.cancel()job2.cancel()delay(2000) // 等待一会儿,观察数据是否继续生产println("\u001B[33m[当前状态] ${stateFlow.value}\u001B[0m")
}
2. Lazily 策略示例
suspend fun stateInLazilyDemo() {val scope = CoroutineScope(Dispatchers.Default)println("\u001B[33m[说明] 演示 stateIn Lazily 策略:首个订阅者触发状态收集\u001B[0m")println("\u001B[32m[Lazily-生产者] 准备创建冷流...\u001B[0m")// 1. 创建冷流(此时不会开始生产数据)val coldFlow = flow {println("\u001B[32m[Lazily-生产者] 开始生产状态数据(首个订阅者触发)\u001B[0m")var i = 1try {while (true) {val state = "状态 ${i++}"println("\u001B[32m[Lazily-生产者] 发射状态:$state\u001B[0m")emit(state)delay(1000)}} finally {println("\u001B[32m[Lazily-生产者] 状态生产已停止\u001B[0m")}}println("\u001B[32m[Lazily-生产者] 冷流已创建,但尚未开始生产数据\u001B[0m")println("\u001B[32m[Lazily-生产者] 准备将冷流转换为状态流...\u001B[0m")// 2. 转换为状态流(使用 Lazily 策略,等待首个订阅者)val stateFlow = coldFlow.stateIn(scope = scope,started = SharingStarted.Lazily, // 首个订阅者到来时才开始收集initialValue = "初始状态" // StateFlow 必须有初始值)println("\u001B[32m[Lazily-生产者] 已转换为状态流,但要等待首个订阅者才开始生产数据\u001B[0m")println("\u001B[33m[当前状态] ${stateFlow.value}\u001B[0m")delay(2000) // 等待一会儿,证明确实没有开始生产数据println("\u001B[33m[当前状态] ${stateFlow.value}\u001B[0m")// 3. 添加第一个订阅者(此时才开始生产数据)val job1 = scope.launch {println("\u001B[34m[Lazily-订阅者1] 开始订阅(这将触发状态生产,立即获取当前状态)\u001B[0m")stateFlow.collect { value ->println("\u001B[34m[Lazily-订阅者1] 收到状态:$value\u001B[0m")}}delay(2000)// 4. 添加第二个订阅者(共享已经在发射的数据)val job2 = scope.launch {println("\u001B[35m[Lazily-订阅者2] 开始订阅(与订阅者1共享数据,立即获取当前状态)\u001B[0m")stateFlow.collect { value ->println("\u001B[35m[Lazily-订阅者2] 收到状态:$value\u001B[0m")}}delay(2000)println("\u001B[33m[说明] 取消所有订阅者后,Lazily策略下状态收集仍会继续\u001B[0m")job1.cancel()job2.cancel()delay(2000) // 等待一会儿,观察数据是否继续生产println("\u001B[33m[当前状态] ${stateFlow.value}\u001B[0m")
}
3. WhileSubscribed 策略示例
suspend fun stateInWhileSubscribedDemo() {val scope = CoroutineScope(Dispatchers.Default)println("\u001B[33m[说明] 演示 stateIn WhileSubscribed 策略:按需收集状态数据\u001B[0m")println("\u001B[32m[WhileSubscribed-生产者] 准备创建冷流...\u001B[0m")// 1. 创建冷流(此时不会开始生产数据)val coldFlow = flow {println("\u001B[32m[WhileSubscribed-生产者] 开始生产状态数据(有订阅者时执行)\u001B[0m")var i = 1try {while (true) {val state = "状态 ${i++}"println("\u001B[32m[WhileSubscribed-生产者] 发射状态:$state\u001B[0m")emit(state)delay(1000)}} finally {println("\u001B[32m[WhileSubscribed-生产者] 状态生产已停止(无订阅者)\u001B[0m")}}println("\u001B[32m[WhileSubscribed-生产者] 冷流已创建,但尚未开始生产数据\u001B[0m")println("\u001B[32m[WhileSubscribed-生产者] 准备将冷流转换为状态流...\u001B[0m")// 2. 转换为状态流(使用 WhileSubscribed 策略,仅在有订阅者时收集)val stateFlow = coldFlow.stateIn(scope = scope,started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 1000 // 所有订阅者取消后等待1秒再停止收集,避免订阅者快速重订阅时重复创建上游Flow),initialValue = "初始状态" // StateFlow 必须有初始值)println("\u001B[32m[WhileSubscribed-生产者] 已转换为状态流,但要等待订阅者才开始生产数据\u001B[0m")println("\u001B[33m[当前状态] ${stateFlow.value}\u001B[0m")delay(2000) // 等待一会儿,证明确实没有开始生产数据println("\u001B[33m[当前状态] ${stateFlow.value}\u001B[0m")// 3. 添加第一个订阅者(此时开始生产数据)val job1 = scope.launch {println("\u001B[34m[WhileSubscribed-订阅者1] 开始订阅(触发状态生产,立即获取当前状态)\u001B[0m")stateFlow.collect { value ->println("\u001B[34m[WhileSubscribed-订阅者1] 收到状态:$value\u001B[0m")}}delay(1200)// 4. 添加第二个订阅者(共享数据)val job2 = scope.launch {println("\u001B[35m[WhileSubscribed-订阅者2] 开始订阅(与订阅者1共享数据,立即获取当前状态)\u001B[0m")stateFlow.collect { value ->println("\u001B[35m[WhileSubscribed-订阅者2] 收到状态:$value\u001B[0m")}}delay(1200)println("\u001B[33m[说明] 取消所有订阅者后,等待1秒钟,WhileSubscribed策略将停止状态收集\u001B[0m")job1.cancel()job2.cancel()delay(1500) // 等待超过stopTimeoutMillis,此时生产数据会停止println("\u001B[33m[当前状态] ${stateFlow.value}\u001B[0m")println("\u001B[33m[说明] 现在添加新的订阅者,将重新触发状态收集\u001B[0m")val job3 = scope.launch {println("\u001B[36m[WhileSubscribed-订阅者3] 开始订阅(重新触发状态生产,立即获取当前状态)\u001B[0m")stateFlow.collect { value ->println("\u001B[36m[WhileSubscribed-订阅者3] 收到状态:$value\u001B[0m")}}delay(2000)job3.cancel()delay(2000) // 再次等待,观察数据生产停止println("\u001B[33m[当前状态] ${stateFlow.value}\u001B[0m")
}
运行效果分析:
实际应用场景中的策略选择示例:
1. 用户配置状态管理
- 使用
Lazily
策略:首次访问时才加载配置 - 设置合适的
initialValue
:提供默认配置
2. 网络连接状态监控
- 使用
WhileSubscribed
策略:按需监控,节省资源 - 设置
initialValue = false
:默认为未连接状态
3. 实时数据仪表盘
- 使用
Eagerly
策略:立即开始收集,确保数据实时性 - 设置合适的初始值:提供占位数据
StateFlow 核心特性总结
特性 | 说明 |
---|---|
状态保持 | 始终保持一个当前状态值,新订阅者立即收到当前状态 |
自动去重 | 只有当新值与当前值不同时才发射,避免不必要的更新 |
同步访问 | 通过 value 属性可以同步获取当前状态,无需异步操作 |
初始值必需 | 创建时必须提供初始值,确保始终有有效状态 |
固定重放 | 自动重放当前状态给新订阅者(相当于 replay = 1 ) |
总结
SharedFlow
和 StateFlow
都是热流,其中 StateFlow
可以看做是定制版本的 SharedFlow
。
StateFlow 与 SharedFlow 的核心区别
一个简单的代码示例看下 StateFlow
与 SharedFlow
的区别:
suspend fun stateFlowVsSharedFlowDemo() {// StateFlow:状态流,需要初始值val stateFlow = MutableStateFlow("初始状态")// SharedFlow:事件流,不配置重放数量,默认为0val sharedFlow = MutableSharedFlow<String>()println("\u001B[32mStateFlow设置: 状态A\u001B[0m")stateFlow.value = "状态A"println("\u001B[32mSharedFlow发射: 事件A\u001B[0m")sharedFlow.emit("事件A")delay(500)val stateSubscriber = GlobalScope.launch {println("\u001B[34mStateFlow订阅者启动\u001B[0m")stateFlow.collect { state ->println("\u001B[34m StateFlow收到: $state\u001B[0m")}}delay(200)val sharedSubscriber = GlobalScope.launch {println("\u001B[35mSharedFlow订阅者启动\u001B[0m")sharedFlow.collect { event ->println("\u001B[35m SharedFlow收到: $event\u001B[0m")}}delay(800)println("\u001B[33m→ 结果: StateFlow立即收到当前状态,SharedFlow错过历史事件\u001B[0m\n")println("\u001B[32mStateFlow更新: 状态B\u001B[0m")stateFlow.value = "状态B"println("\u001B[32mSharedFlow发射: 事件B\u001B[0m")sharedFlow.emit("事件B")delay(500)println("\u001B[33m→ 结果: 两个订阅者都收到新数据\u001B[0m\n")println("\u001B[32mStateFlow重复设置: 状态B\u001B[0m")stateFlow.value = "状态B"delay(300)println("\u001B[32mSharedFlow重复发射: 事件B\u001B[0m")sharedFlow.emit("事件B")delay(500)println("\u001B[33m→ 结果: StateFlow被过滤无通知,SharedFlow正常通知\u001B[0m\n")println("\u001B[32mStateFlow最终状态: 最终状态\u001B[0m")stateFlow.value = "最终状态"println("\u001B[32mSharedFlow最终事件: 最终事件\u001B[0m")sharedFlow.emit("最终事件")delay(500)println("\u001B[33m→ 结果: 两者都正常发送数据\u001B[0m")delay(1000)stateSubscriber.cancel()sharedSubscriber.cancel()
}
SharedFlow | StateFlow | |
---|---|---|
用途 | 事件流,用于事件广播 | 状态流,用于状态管理 |
初始值 | 无初始值,可以为空 | 必须有初始值 |
当前值访问 | 无法同步获取当前值 | 可通过 value 属性同步获取当前状态 |
去重机制 | 不去重,所有发射的数据都会传递 | 自动去重,只有值改变时才发射 |
重放行为 | 可配置重放数量 | 固定重放最新1个值(当前状态) |
缓冲策略 | 可配置缓冲和溢出策略 | 不可配置 |
好了, 本篇文章就是这些,希望能帮到你。
感谢阅读,如果对你有帮助请三连(点赞、收藏、加关注)支持。有任何疑问或建议,欢迎在评论区留言讨论。如需转载,请注明出处:喻志强的博客