当前位置: 首页 > news >正文

Kotlin 协程之 SharedFlow 与 StateFlow 深度解析

在这里插入图片描述

前言

在前面的文章中,我们已经深入探讨了 Flow 的基础概念、创建方式以及 ChannelFlow 突破 Flow 限制等知识点。

今天我们来看一下 Kotlin 协程中的两个重要成员:SharedFlowStateFlow

我们通常把 SharedFlow 叫做事件流,把 StateFlow 叫做状态流。在日常开发中,事件状态 是非常聚焦且常见的应用场景,因此 Kotlin 协程 针对这两个场景专门提供了 SharedFlowStateFlow

SharedFlowStateFlow 是对 Flow 的进一步抽象和扩展,但是它们与普通 Flow 有着本质区别

特性Flow(冷流)SharedFlow/StateFlow(热流)
数据生产时机只有消费层触发收集时,生产层才开始工作无论是否有消费者都会生产数据
消费者关系每个消费者独立触发数据生产多个消费者共享同一个数据源
接口设计实现 Flow 接口同样实现 Flow 接口(保持消费端 API 一致性)
生产逻辑基于协程构建器的冷流生产逻辑完全独立实现的热流生产逻辑,采用发布-订阅模式
缓冲机制依赖 Channel 的缓冲策略自定义缓冲区实现,支持重放和额外缓冲

无论叫事件流还是状态流,本质上都是数据流。关键是要弄清楚它们的内部实现机制、缓冲策略以及适用场景。


SharedFlow 详解

SharedFlow热流,但是跟 Channel生产消费模型不同,SharedFlow 采用的是 发布订阅(Publish-Subscribe)模型。当数据发送后,会被 广播 给所有正在收集的订阅者。

继承关系

在这里插入图片描述

从继承关系图可以看出:

  • SharedFlow 本身是一个接口,继承自接口 Flow
  • SharedFlow 有两个子接口:MutableSharedFlowStateFlow,也就是说 StateFlow 是基于 SharedFlow 实现的
  • SharedFlow 核心实现类是 SharedFlowImpl

MutableSharedFlow

我们可以通过 MutableSharedFlow() 这个顶层函数来创建 SharedFlow,返回值类型是 MutableSharedFlow<T>

MutableSharedFlow() 源码
public fun <T> MutableSharedFlow(replay: Int = 0,extraBufferCapacity: Int = 0,onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {// 参数校验require(replay >= 0) { "replay cannot be negative, but was $replay" }require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" }require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND) {"replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy $onBufferOverflow"}// 计算总缓冲容量val bufferCapacity0 = replay + extraBufferCapacityval bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0// 返回具体实现return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}

从源码可以看到,SharedFlow 的缓冲容量是 replayextraBufferCapacity 之和。具体的实现则是由 SharedFlowImpl 完成,SharedFlowImpl 内部的实现还是非常复杂的。

基本使用方式:

因为继承自 Flow,所以在使用上基本上跟 FlowAPI 是一样的:

  • 使用 emit() / tryEmit() 发射数据
  • 使用 collect() 订阅数据

参数说明

MutableSharedFlow() 函数有三个参数,如下:

参数类型默认值作用说明
replayInt0重放缓存大小
• 新订阅者能立即收到的历史数据条数
• 值为0时,新订阅者只能收到订阅后发射的数据
• 值大于0时,新订阅者会先收到最近的N条历史数据
extraBufferCapacityInt0额外缓冲容量
• 除重放缓存外的额外缓冲空间
• 用于减少背压处理,提高发射性能
• 总缓冲容量 = replay + extraBufferCapacity
onBufferOverflowBufferOverflowSUSPEND缓冲区溢出策略
SUSPEND: 挂起等待,直到有空间可用
DROP_LATEST: 丢弃最新发射的数据
DROP_OLDEST: 丢弃最旧的数据,为新数据腾出空间
参数组合规则

根据源码中的参数校验逻辑,参数组合必须满足以下条件:

// 当使用非默认的溢出策略时,必须配置缓冲区
require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND)

规则说明:

  • 如果使用 DROP_LATESTDROP_OLDEST 策略,必须设置 replay > 0extraBufferCapacity > 0
  • 使用 SUSPEND 策略时,可以不设置任何缓冲区

使用示例

1. 热流&数据广播
suspend fun hotFlowDemo() {val hotFlow = MutableSharedFlow<String>()println("\u001B[33m[说明] 热流特性:无论是否有订阅者,数据都会发射\u001B[0m")// 生产者:立即开始发射数据val producer = GlobalScope.launch {repeat(3) { i ->val data = "热流数据$i"println("\u001B[32m[生产者] 发射: $data(此时无订阅者)\u001B[0m")hotFlow.emit(data)delay(300)}}delay(1000) // 等待生产者发射完成// 消费者1:晚加入val consumer1 = GlobalScope.launch {println("\u001B[34m[消费者1] 开始订阅(错过了前面的数据)\u001B[0m")hotFlow.collect { data ->println("\u001B[34m[消费者1] 收到: $data\u001B[0m")}}delay(200)// 继续发射数据val producer2 = GlobalScope.launch {repeat(2) { i ->val data = "新数据${i + 3}"println("\u001B[32m[生产者] 发射: $data\u001B[0m")hotFlow.emit(data)delay(300)}}delay(500)// 消费者2:更晚加入val consumer2 = GlobalScope.launch {println("\u001B[35m[消费者2] 开始订阅(只能收到后续数据)\u001B[0m")hotFlow.collect { data ->println("\u001B[35m[消费者2] 收到: $data\u001B[0m")}}delay(200)// 最后发射一条数据println("\u001B[32m[生产者] 发射: 最终数据\u001B[0m")hotFlow.emit("最终数据")delay(500)producer.join()producer2.join()consumer1.cancel()consumer2.cancel()
}

在这里插入图片描述

  • 热流特性:无论是否有订阅者,数据都会发射
  • 数据广播:数据会广播给所有订阅者
2. 数据重放
suspend fun replayMechanismDemo() {// 设置重放数量为3val sharedFlow = MutableSharedFlow<String>(replay = 3)println("\u001B[33m[说明] 创建replay=3的SharedFlow,新订阅者可获取最近3条数据\u001B[0m")// 生产者:先发射一些数据val producer = GlobalScope.launch {listOf("数据1", "数据2", "数据3", "数据4").forEach { data ->delay(100)println("\u001B[32m[生产者] 发射: $data\u001B[0m")sharedFlow.emit(data)}}delay(500) // 等待生产者发射一些数据// 消费者1:晚加入,应该收到重放数据val consumer1 = GlobalScope.launch {println("\u001B[34m[消费者1] 开始订阅(收到最近3条重放数据)\u001B[0m")sharedFlow.collect { data ->println("\u001B[34m[消费者1] 收到: $data\u001B[0m")}}delay(300)// 继续发射数据GlobalScope.launch {listOf("数据5", "数据6").forEach { data ->delay(200)println("\u001B[32m[生产者] 发射: $data\u001B[0m")sharedFlow.emit(data)}}delay(600)consumer1.cancel()producer.join()
}

在这里插入图片描述

可以看到,如果设置了重放数量为3:

  • 新订阅者可获取最近3条数据
3. 数据缓冲&溢出策略
配置额外缓冲区 + 默认的策略 SUSPEND
private suspend fun suspendStrategyDemo() {println("\u001B[33m[策略1] SUSPEND - 背压时挂起发射者\u001B[0m")val suspendFlow = MutableSharedFlow<Int>(extraBufferCapacity = 2)// SUSPEND消费者val suspendConsumer = GlobalScope.launch {suspendFlow.collect { value ->println("\u001B[34m[SUSPEND消费者] 收到: $value\u001B[0m")delay(500) // 模拟慢消费}}delay(200)// 生产者val suspendProducer = GlobalScope.launch {repeat(5) { i ->println("\u001B[32m[SUSPEND生产者] 准备发射: $i\u001B[0m")suspendFlow.emit(i) // 当缓冲区满时会挂起println("\u001B[32m[SUSPEND生产者] 已发射: $i\u001B[0m")}}delay(2000)suspendProducer.cancel()suspendConsumer.cancel()
}

在这里插入图片描述

配置额外缓冲区,并使用 DROP_OLDEST 策略
private suspend fun dropOldestStrategyDemo() {println("\u001B[33m[策略2] DROP_OLDEST - 丢弃最旧数据\u001B[0m")val dropOldestFlow = MutableSharedFlow<Int>(extraBufferCapacity = 2,onBufferOverflow = BufferOverflow.DROP_OLDEST)val emittedDataOldest = mutableListOf<Int>()val receivedDataOldest = mutableListOf<Int>()// 消费者val dropOldestConsumer = GlobalScope.launch {println("\u001B[35m[DROP_OLDEST消费者] 开始订阅并慢速处理\u001B[0m")dropOldestFlow.collect { value ->receivedDataOldest.add(value)println("\u001B[35m[DROP_OLDEST消费者] 收到: $value\u001B[0m")delay(600) // 模拟慢消费,让缓冲区容易满}}// 生产者val dropOldestProducer = GlobalScope.launch {println("\u001B[32m[DROP_OLDEST生产者] 开始快速发射数据\u001B[0m")repeat(6) { i ->delay(100) // 快速发射val value = i + 1emittedDataOldest.add(value)dropOldestFlow.tryEmit(value)println("\u001B[32m[DROP_OLDEST生产者] 发射: $value\u001B[0m")}}delay(3000)dropOldestProducer.cancel()dropOldestConsumer.cancel()// 显示丢弃效果delay(100)println("\u001B[33m[DROP_OLDEST分析] 发射的数据: $emittedDataOldest\u001B[0m")println("\u001B[33m[DROP_OLDEST分析] 接收的数据: $receivedDataOldest\u001B[0m")val droppedOldest = emittedDataOldest - receivedDataOldest.toSet()if (droppedOldest.isNotEmpty()) {println("\u001B[31m[DROP_OLDEST分析] 被丢弃的旧数据: $droppedOldest\u001B[0m")} else {println("\u001B[32m[DROP_OLDEST分析] 没有数据被丢弃\u001B[0m")}
}

在这里插入图片描述

配置额外缓冲区,并使用 DROP_LATEST 策略
private suspend fun dropLatestStrategyDemo() {println("\u001B[33m[策略3] DROP_LATEST - 丢弃最新数据\u001B[0m")val dropLatestFlow = MutableSharedFlow<Int>(extraBufferCapacity = 2,onBufferOverflow = BufferOverflow.DROP_LATEST)val emittedDataLatest = mutableListOf<Int>()val receivedDataLatest = mutableListOf<Int>()// 消费者val dropLatestConsumer = GlobalScope.launch {println("\u001B[36m[DROP_LATEST消费者] 开始订阅并慢速处理\u001B[0m")dropLatestFlow.collect { value ->delay(800)receivedDataLatest.add(value)println("\u001B[36m[DROP_LATEST消费者] 收到: $value\u001B[0m")}}// 生产者:延迟启动,快速发射数据val dropLatestProducer = GlobalScope.launch {println("\u001B[33m[DROP_LATEST生产者] 开始快速发射数据\u001B[0m")repeat(6) { i ->delay(100) // 快速发射,比消费者处理速度快很多val value = i + 1emittedDataLatest.add(value)dropLatestFlow.tryEmit(value) // tryEmit总是返回true,不能用来判断丢弃println("\u001B[33m[DROP_LATEST生产者] 发射: $value\u001B[0m")}}delay(4000) // 等待演示完成dropLatestProducer.cancel()dropLatestConsumer.cancel()// 显示丢弃效果delay(100)println("\u001B[33m[DROP_LATEST分析] 发射的数据: $emittedDataLatest\u001B[0m")println("\u001B[33m[DROP_LATEST分析] 接收的数据: $receivedDataLatest\u001B[0m")val droppedLatest = emittedDataLatest - receivedDataLatest.toSet()if (droppedLatest.isNotEmpty()) {println("\u001B[31m[DROP_LATEST分析] 被丢弃的新数据: $droppedLatest\u001B[0m")} else {println("\u001B[32m[DROP_LATEST分析] 没有数据被丢弃\u001B[0m")}
}

在这里插入图片描述

sharedIn

如果数据的产生是由自己控制的,那么我们可以直接通过 MutableSharedFlow 来创建一个 SharedFlow
但是如果数据的产生是由一个外部的冷流(Flow)提供,但是我们又想转换为热流使用,那么就可以使用 shareIn 操作符来把一个冷流转换为热流。

public fun <T> Flow<T>.shareIn(scope: CoroutineScope,started: SharingStarted,replay: Int = 0
): SharedFlow<T> {val config = configureSharing(replay)val shared = MutableSharedFlow<T>(replay = replay,extraBufferCapacity = config.extraBufferCapacity,onBufferOverflow = config.onBufferOverflow)@Suppress("UNCHECKED_CAST")val job = scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)return ReadonlySharedFlow(shared, job)
}
参数说明

shareIn 函数有三个参数:

参数类型作用说明
scopeCoroutineScope指定共享流的作用域,决定了热流的生命周期
startedSharingStarted控制共享流的启动策略,有三种选择:
Eagerly: 立即开始收集,即使没有订阅者
Lazily: 第一个订阅者出现时开始收集
WhileSubscribed: 有订阅者时收集,无订阅者时停止
replayInt新订阅者可以获得的历史数据数量,类似 SharedFlowreplay 参数
started 启动策略
策略启动时机停止时机适用场景
Eagerly立即启动作用域结束• 数据必须从一开始就收集
• 数据生产成本较低
• 不关心资源消耗
Lazily首个订阅作用域结束• 数据随第一个订阅开始
• 之后保持收集状态
• 允许错过初始数据
WhileSubscribed首个订阅最后订阅取消• 按需收集数据
• 需要节省资源
• 数据生产成本高
使用示例

scopereplay 参数都很好理解,我们重点看下 started 参数的使用。

1. Eagerly 策略示例
    suspend fun sharedInEagerlyDemo() {val scope = CoroutineScope(Dispatchers.Default)println("\u001B[33m[说明] 演示 Eagerly 策略:冷流转热流后立即开始收集数据\u001B[0m")println("\u001B[32m[Eagerly-生产者] 准备创建冷流...\u001B[0m")// 1. 创建冷流(此时不会开始生产数据)val coldFlow = flow {println("\u001B[32m[Eagerly-生产者] 开始生产数据(冷流 -> 热流后立即执行)\u001B[0m")var i = 1try {while (true) {val data = "数据 ${i++}"println("\u001B[32m[Eagerly-生产者] 发射:$data\u001B[0m")emit(data)delay(1000)}} finally {println("\u001B[32m[Eagerly-生产者] 数据生产已停止\u001B[0m")}}println("\u001B[32m[Eagerly-生产者] 冷流已创建,但尚未开始生产数据\u001B[0m")delay(1000)println("\u001B[32m[Eagerly-生产者] 准备将冷流转换为热流...\u001B[0m")// 2. 转换为热流(使用 Eagerly 策略,会立即开始收集)val sharedFlow = coldFlow.shareIn(scope = scope,started = SharingStarted.Eagerly, // 立即开始收集,即使没有订阅者replay = 0 // 不保留历史数据)println("\u001B[32m[Eagerly-生产者] 已转换为热流,此时会触发冷流开始生产数据\u001B[0m")delay(2000) // 等待一会儿,让我们观察到数据生产情况// 3. 添加第一个订阅者(此时生产者已经在发射数据)val job1 = scope.launch {println("\u001B[34m[Eagerly-订阅者1] 开始订阅(生产者已经在发射数据)\u001B[0m")sharedFlow.collect { value ->println("\u001B[34m[Eagerly-订阅者1] 收到:$value\u001B[0m")}}delay(2000)// 4. 添加第二个订阅者(共享之前已经在发射的数据)val job2 = scope.launch {println("\u001B[35m[Eagerly-订阅者2] 开始订阅(与订阅者1共享数据)\u001B[0m")sharedFlow.collect { value ->println("\u001B[35m[Eagerly-订阅者2] 收到:$value\u001B[0m")}}delay(2000)println("\u001B[33m[说明] 即使取消所有订阅者,Eagerly策略下数据收集仍会继续\u001B[0m")job1.cancel()job2.cancel()delay(2000) // 等待一会儿,观察数据是否继续生产
}

在这里插入图片描述

2. Lazily 策略示例
    suspend fun sharedInLazilyDemo() {val scope = CoroutineScope(Dispatchers.Default)println("\u001B[33m[说明] 演示 Lazily 策略:首个订阅者触发数据收集\u001B[0m")println("\u001B[32m[Lazily-生产者] 准备创建冷流...\u001B[0m")// 1. 创建冷流(此时不会开始生产数据)val coldFlow = flow {println("\u001B[32m[Lazily-生产者] 开始生产数据(首个订阅者触发)\u001B[0m")var i = 1try {while (true) {val data = "数据 ${i++}"println("\u001B[32m[Lazily-生产者] 发射:$data\u001B[0m")emit(data)delay(1000)}} finally {println("\u001B[32m[Lazily-生产者] 数据生产已停止\u001B[0m")}}println("\u001B[32m[Lazily-生产者] 冷流已创建,但尚未开始生产数据\u001B[0m")println("\u001B[32m[Lazily-生产者] 准备将冷流转换为热流...\u001B[0m")// 2. 转换为热流(使用 Lazily 策略,等待首个订阅者)val sharedFlow = coldFlow.shareIn(scope = scope,started = SharingStarted.Lazily, // 首个订阅者到来时才开始收集replay = 0 // 不保留历史数据)println("\u001B[32m[Lazily-生产者] 已转换为热流,但要等待首个订阅者才开始生产数据\u001B[0m")delay(2000) // 等待一会儿,证明确实没有开始生产数据// 3. 添加第一个订阅者(此时才开始生产数据)val job1 = scope.launch {println("\u001B[34m[Lazily-订阅者1] 开始订阅(这将触发数据生产)\u001B[0m")sharedFlow.collect { value ->println("\u001B[34m[Lazily-订阅者1] 收到:$value\u001B[0m")}}delay(2000)// 4. 添加第二个订阅者(共享已经在发射的数据)val job2 = scope.launch {println("\u001B[35m[Lazily-订阅者2] 开始订阅(与订阅者1共享数据)\u001B[0m")sharedFlow.collect { value ->println("\u001B[35m[Lazily-订阅者2] 收到:$value\u001B[0m")}}delay(2000)println("\u001B[33m[说明] 取消所有订阅者后,Lazily策略下数据收集仍会继续\u001B[0m")job1.cancel()job2.cancel()delay(2000) // 等待一会儿,观察数据是否继续生产
}

在这里插入图片描述

3. WhileSubscribed 策略示例
    suspend fun sharedInWhileSubscribedDemo() {val scope = CoroutineScope(Dispatchers.Default)println("\u001B[33m[说明] 演示 WhileSubscribed 策略:按需收集数据\u001B[0m")println("\u001B[32m[WhileSubscribed-生产者] 准备创建冷流...\u001B[0m")// 1. 创建冷流(此时不会开始生产数据)val coldFlow = flow {println("\u001B[32m[WhileSubscribed-生产者] 开始生产数据(有订阅者时执行)\u001B[0m")var i = 1try {while (true) {val data = "数据 ${i++}"println("\u001B[32m[WhileSubscribed-生产者] 发射:$data\u001B[0m")emit(data)delay(1000)}} finally {println("\u001B[32m[WhileSubscribed-生产者] 数据生产已停止(无订阅者)\u001B[0m")}}println("\u001B[32m[WhileSubscribed-生产者] 冷流已创建,但尚未开始生产数据\u001B[0m")println("\u001B[32m[WhileSubscribed-生产者] 准备将冷流转换为热流...\u001B[0m")// 2. 转换为热流(使用 WhileSubscribed 策略,仅在有订阅者时收集)val sharedFlow = coldFlow.shareIn(scope = scope,started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 1000 // 所有订阅者取消后等待1秒再停止收集,避免订阅者快速重订阅时重复创建上游Flow),replay = 0 // 不保留历史数据)println("\u001B[32m[WhileSubscribed-生产者] 已转换为热流,但要等待订阅者才开始生产数据\u001B[0m")delay(2000) // 等待一会儿,证明确实没有开始生产数据// 3. 添加第一个订阅者(此时开始生产数据)val job1 = scope.launch {println("\u001B[34m[WhileSubscribed-订阅者1] 开始订阅(触发数据生产)\u001B[0m")sharedFlow.collect { value ->println("\u001B[34m[WhileSubscribed-订阅者1] 收到:$value\u001B[0m")}}delay(1200)// 4. 添加第二个订阅者(共享数据)val job2 = scope.launch {println("\u001B[35m[WhileSubscribed-订阅者2] 开始订阅(与订阅者1共享数据)\u001B[0m")sharedFlow.collect { value ->println("\u001B[35m[WhileSubscribed-订阅者2] 收到:$value\u001B[0m")}}delay(1200)println("\u001B[33m[说明] 取消所有订阅者后,等待1秒钟,WhileSubscribed策略将停止数据收集\u001B[0m")job1.cancel()job2.cancel()delay(1500) // 等待超过stopTimeoutMillis,此时成产数据会停止println("\u001B[33m[说明] 现在添加新的订阅者,将重新触发数据收集\u001B[0m")val job3 = scope.launch {println("\u001B[36m[WhileSubscribed-订阅者3] 开始订阅(重新触发数据生产)\u001B[0m")sharedFlow.collect { value ->println("\u001B[36m[WhileSubscribed-订阅者3] 收到:$value\u001B[0m")}}delay(2000)job3.cancel()delay(5000) // 再次等待,观察数据生产停止
}

在这里插入图片描述

实际应用场景中的策略选择示例:

  1. 网络请求结果共享

    • 使用 Lazily 策略:首次订阅时才发起请求
    • 设置 replay = 1:新订阅者可获取最新响应
  2. 传感器数据广播

    • 使用 WhileSubscribed 策略:按需采集数据,节省资源
    • 设置 replay = 0:只关注实时数据
  3. 用户操作事件分发

    • 使用 Eagerly 策略:立即开始监听,不错过任何事件
    • 设置 replay = 0:事件型数据无需重放

SharedFlow 核心特性总结

特性说明
热流无论是否有收集者都会发射数据
多播多个收集者共享同一数据源
重放新订阅者可获取历史数据
缓冲可配置缓冲数据容量,当消费端过慢时用于暂存数据
溢出策略配置缓冲区满时的处理策略

StateFlow

在这里插入图片描述

StateFlow 是特殊版本的 SharedFlow,专门用于状态管理。它在 SharedFlow 的基础上增加了状态语义,确保始终有一个当前状态值,并且只有当新值与当前值不同时才会发射。

StateFlow 非常适合用于 UI 状态管理、配置管理等需要状态保持去重的场景。

MutableStateFlow

我们可以通过 MutableStateFlow() 顶层函数来创建 StateFlow,返回值类型是 MutableStateFlow<T>

public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {/*** 当前状态流的值。** 设置一个与前一个值[相等][Any.equals]的值不会产生任何效果。** 此属性是**线程安全的**,可以在并发协程中安全更新,无需外部同步。*/public override var value: T/*** 原子性地比较当前[value]与[expect],如果相等则将其设置为[update]。* 如果[value]被设置为[update]则返回`true`,否则返回`false`。** 此函数使用[Any.equals]进行常规比较。如果[expect]和[update]都等于当前[value],* 此函数返回`true`,但实际上不会改变存储在[value]中的引用。** 此方法是**线程安全的**,可以在并发协程中安全调用,无需外部同步。*/public fun compareAndSet(expect: T, update: T): Boolean
}public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)

从源码可以看到:

  • MutableStateFlow 构造函数只需要一个参数:初始值
  • 内部实现是 StateFlowImpl

使用示例

// 创建 StateFlow
val stateFlow = MutableStateFlow("初始状态")// 更新状态,直接赋值
stateFlow.value = "新状态"
// 非挂起的发射数据,效果同直接赋值
stateFlow.tryEmit("")
// 或者使用挂起的方式发射数据
stateFlow.emit("新状态")// 获取当前状态
val currentState = stateFlow.value// 订阅状态变化
stateFlow.collect { state ->println("状态变化: $state")
}

整个 api 非常简单,跟 SharedFlow 的 api 一样,只是多了一个 value 属性。

StateFlow 赋值方式对比
赋值方式语法函数类型使用场景特点
直接赋值stateFlow.value = newValue属性访问任何地方都可使用• 同步操作
• 不需要协程环境
• 最常用的方式
tryEmitstateFlow.tryEmit(newValue)普通函数非协程环境• 同步操作
• 返回Boolean表示是否成功
• 对StateFlow总是返回true
emitstateFlow.emit(newValue)挂起函数协程中使用• 异步操作
• 必须在协程中调用
• 与Flow保持一致的API

不管是哪种赋值方式,底层都是走到了 StateFlowImplupdateState 方法中:

// StateFlowImpl 中的核心属性和方法
private val _state = atomic(initialState) // 原子引用,存储当前状态值
private var sequence = 0 // 序列号,用于同步更新操作// value 属性的 getter 和 setter
public override var value: T
get() = NULL.unbox(_state.value)  // 获取当前状态值,NULL.unbox 用于处理 null 值
set(value) {// 设置新值时调用 updateState,expectedState 为 null 表示不进行 CAS 检查updateState(null, value ?: NULL)
}// tryEmit 方法:非挂起函数,直接调用 value setter
override fun tryEmit(value: T): Boolean {this.value = value  // 直接赋值给 value 属性return true         // StateFlow 的 tryEmit 总是返回 true
}// emit 方法:挂起函数,但实际上也是直接调用 value setter
override suspend fun emit(value: T) {this.value = value  // 与 tryEmit 相同,直接赋值
}/*** StateFlow 的核心更新方法* @param expectedState 期望的当前状态(用于 CAS 操作),null 表示不检查* @param newState 新的状态值* @return 是否成功更新*/
private fun updateState(expectedState: Any?, newState: Any): Boolean {var curSequence: Intvar curSlots: Array<StateFlowSlot?>? // 当前的订阅者槽位数组// === 第一阶段:在锁内进行状态检查和更新 ===synchronized(this) {val oldState = _state.value// CAS 支持:如果指定了期望状态但与当前状态不匹配,则更新失败if (expectedState != null && oldState != expectedState) {return false // compareAndSet 操作失败}// 去重检查:如果新值与旧值相同,直接返回 true(不需要通知订阅者)if (oldState == newState) {return true // 值没有变化,但 CAS 操作成功}// 更新状态值_state.value = newState// 获取当前序列号curSequence = sequence// 序列号管理:偶数表示静止状态,奇数表示正在更新if (curSequence and 1 == 0) {// 当前是静止状态(偶数),开始新的更新curSequence++           // 变为奇数,表示开始更新sequence = curSequence} else {// 已经有更新在进行中,通知正在进行的更新并返回sequence = curSequence + 2  // 增加2保持奇数,通知有新的更新return true // 更新已提交,由正在进行的更新负责通知}// 在锁内读取当前的订阅者槽位curSlots = slots}// === 第二阶段:在锁外通知所有订阅者 ===/** 在锁外触发值更新,避免与无限制协程产生死锁。* 循环直到完成所有更改的触发。这是一种简单的扁平化合并,* 确保并发更新的顺序触发,避免多线程并发更新时订阅者恢复的风暴。*/while (true) {// 通知所有当前的订阅者(可能存在良性竞争)curSlots?.forEach { slot ->slot?.makePending()  // 标记槽位为待处理状态,触发订阅者}// 检查在通知过程中是否有新的更新synchronized(this) {if (sequence == curSequence) {// 序列号没有变化,说明没有新的更新,完成当前更新sequence = curSequence + 1  // 变为偶数,表示回到静止状态return true // 更新完成}// 有新的更新,重新读取序列号和槽位,继续下一轮通知curSequence = sequencecurSlots = slots}}
}
updateState 方法核心逻辑解析
  1. 线程安全:使用 synchronized 确保状态更新的原子性
  2. CAS 支持:支持 Compare-And-Set 操作,用于 compareAndSet 方法
  3. 去重机制:自动过滤相同值,避免不必要的通知
  4. 序列号管理
    • 偶数序列号:StateFlow 处于静止状态
    • 奇数序列号:StateFlow 正在进行更新操作
  5. 扁平化合并:多个并发更新会被合并,避免订阅者通知风暴
  6. 锁外通知:在锁外通知订阅者,避免死锁

特性示例

始终保持最新的状态值且订阅者立即获取当前值
suspend fun stateFlowBasicDemo() {// 创建StateFlow,初始值为"初始状态"val stateFlow = MutableStateFlow("初始状态")println("\u001B[33m[说明] StateFlow始终保持当前状态,新订阅者立即获取当前值\u001B[0m")println("\u001B[33m[当前状态] ${stateFlow.value}\u001B[0m")// 订阅者1:立即订阅val subscriber1 = GlobalScope.launch {println("\u001B[34m[订阅者1] 开始订阅StateFlow\u001B[0m")stateFlow.collect { state ->println("\u001B[34m[订阅者1] 收到状态: $state\u001B[0m")}}delay(300)// 更新状态println("\u001B[32m[状态更新] 更新为: 状态A\u001B[0m")stateFlow.value = "状态 A"delay(300)// 订阅者2:晚加入,但能立即获取当前状态val subscriber2 = GlobalScope.launch {println("\u001B[35m[订阅者2] 晚加入,立即获取当前状态\u001B[0m")stateFlow.collect { state ->println("\u001B[35m[订阅者2] 收到状态: $state\u001B[0m")}}delay(300)// 继续更新状态println("\u001B[32m[状态更新] 更新为: 状态B\u001B[0m")stateFlow.tryEmit("状态B")delay(300)println("\u001B[32m[状态更新] 更新为: 最终状态\u001B[0m")stateFlow.emit("最终状态")delay(500)subscriber1.cancel()subscriber2.cancel()
}

在这里插入图片描述

具备去重特性
suspend fun stateFlowDistinctDemo() {val stateFlow = MutableStateFlow(0)println("\u001B[33m[说明] StateFlow会自动过滤相同的值,只有状态真正改变时才会通知订阅者\u001B[0m")// 订阅者val subscriber = GlobalScope.launch {println("\u001B[34m[订阅者] 开始监听状态变化\u001B[0m")stateFlow.collect { value ->println("\u001B[34m[订阅者] 状态变化: $value\u001B[0m")}}delay(200)// 更新相同的值println("\u001B[32m[更新] 设置为0(相同值,不会触发)\u001B[0m")stateFlow.value = 0delay(200)println("\u001B[32m[更新] 设置为1(新值,会触发)\u001B[0m")stateFlow.value = 1delay(200)println("\u001B[32m[更新] 再次设置为1(相同值,不会触发)\u001B[0m")stateFlow.value = 1delay(200)println("\u001B[32m[更新] 设置为2(新值,会触发)\u001B[0m")stateFlow.value = 2delay(200)println("\u001B[32m[更新] 设置为2(相同值,不会触发)\u001B[0m")stateFlow.value = 2delay(500)subscriber.cancel()
}

在这里插入图片描述

stateIn

同样的,我们可以通过 stateIn 操作符来把一个冷流转换为状态流。
stateInshareIn 类似,但专门用于状态管理场景,它会自动处理状态的去重和当前值保持。

public fun <T> Flow<T>.stateIn(scope: CoroutineScope,started: SharingStarted,initialValue: T
): StateFlow<T> {val config = configureSharing(1)val state = MutableStateFlow(initialValue)val job = scope.launchSharing(config.context, config.upstream, state, started, initialValue)return ReadonlyStateFlow(state, job)
}
参数说明

stateIn 函数有三个参数:

参数类型作用说明
scopeCoroutineScope指定状态流的作用域,决定了状态流的生命周期
startedSharingStarted控制状态流的启动策略,有三种选择:
Eagerly: 立即开始收集,即使没有订阅者
Lazily: 第一个订阅者出现时开始收集
WhileSubscribed: 有订阅者时收集,无订阅者时停止
initialValueT状态流的初始值,这是 StateFlow 必须的参数,确保始终有一个当前状态
使用示例

熟悉了 sharedIn 的参数后,stateIn 就更好理解了,started 的作用都是一致的,无非就是需要一个初始值

基本使用方式:

1. Eagerly 策略示例
suspend fun stateInEagerlyDemo() {val scope = CoroutineScope(Dispatchers.Default)println("\u001B[33m[说明] 演示 stateIn Eagerly 策略:冷流转状态流后立即开始收集数据\u001B[0m")println("\u001B[32m[Eagerly-生产者] 准备创建冷流...\u001B[0m")// 1. 创建冷流(此时不会开始生产数据)val coldFlow = flow {println("\u001B[32m[Eagerly-生产者] 开始生产状态数据(冷流 -> 状态流后立即执行)\u001B[0m")var i = 1try {while (true) {val state = "状态 ${i++}"println("\u001B[32m[Eagerly-生产者] 发射状态:$state\u001B[0m")emit(state)delay(1000)}} finally {println("\u001B[32m[Eagerly-生产者] 状态生产已停止\u001B[0m")}}println("\u001B[32m[Eagerly-生产者] 冷流已创建,但尚未开始生产数据\u001B[0m")delay(1000) // 等待一会儿,证明冷流确实没有开始生产println("\u001B[32m[Eagerly-生产者] 准备将冷流转换为状态流...\u001B[0m")// 2. 转换为状态流(使用 Eagerly 策略,会立即开始收集)val stateFlow = coldFlow.stateIn(scope = scope,started = SharingStarted.Eagerly, // 立即开始收集,即使没有订阅者initialValue = "初始状态" // StateFlow 必须有初始值)println("\u001B[32m[Eagerly-生产者] 已转换为状态流,此时已经开始生产数据\u001B[0m")println("\u001B[33m[当前状态] ${stateFlow.value}\u001B[0m")delay(2000)println("\u001B[33m[当前状态] ${stateFlow.value}\u001B[0m")// 3. 添加第一个订阅者(此时生产者已经在发射数据)val job1 = scope.launch {println("\u001B[34m[Eagerly-订阅者1] 开始订阅(生产者已经在发射数据,立即获取当前状态)\u001B[0m")stateFlow.collect { value ->println("\u001B[34m[Eagerly-订阅者1] 收到状态:$value\u001B[0m")}}delay(2000)// 4. 添加第二个订阅者(共享之前已经在发射的数据)val job2 = scope.launch {println("\u001B[35m[Eagerly-订阅者2] 开始订阅(与订阅者1共享数据,立即获取当前状态)\u001B[0m")stateFlow.collect { value ->println("\u001B[35m[Eagerly-订阅者2] 收到状态:$value\u001B[0m")}}delay(2000)println("\u001B[33m[说明] 即使取消所有订阅者,Eagerly策略下状态收集仍会继续\u001B[0m")job1.cancel()job2.cancel()delay(2000) // 等待一会儿,观察数据是否继续生产println("\u001B[33m[当前状态] ${stateFlow.value}\u001B[0m")
}

在这里插入图片描述

2. Lazily 策略示例
    suspend fun stateInLazilyDemo() {val scope = CoroutineScope(Dispatchers.Default)println("\u001B[33m[说明] 演示 stateIn Lazily 策略:首个订阅者触发状态收集\u001B[0m")println("\u001B[32m[Lazily-生产者] 准备创建冷流...\u001B[0m")// 1. 创建冷流(此时不会开始生产数据)val coldFlow = flow {println("\u001B[32m[Lazily-生产者] 开始生产状态数据(首个订阅者触发)\u001B[0m")var i = 1try {while (true) {val state = "状态 ${i++}"println("\u001B[32m[Lazily-生产者] 发射状态:$state\u001B[0m")emit(state)delay(1000)}} finally {println("\u001B[32m[Lazily-生产者] 状态生产已停止\u001B[0m")}}println("\u001B[32m[Lazily-生产者] 冷流已创建,但尚未开始生产数据\u001B[0m")println("\u001B[32m[Lazily-生产者] 准备将冷流转换为状态流...\u001B[0m")// 2. 转换为状态流(使用 Lazily 策略,等待首个订阅者)val stateFlow = coldFlow.stateIn(scope = scope,started = SharingStarted.Lazily, // 首个订阅者到来时才开始收集initialValue = "初始状态" // StateFlow 必须有初始值)println("\u001B[32m[Lazily-生产者] 已转换为状态流,但要等待首个订阅者才开始生产数据\u001B[0m")println("\u001B[33m[当前状态] ${stateFlow.value}\u001B[0m")delay(2000) // 等待一会儿,证明确实没有开始生产数据println("\u001B[33m[当前状态] ${stateFlow.value}\u001B[0m")// 3. 添加第一个订阅者(此时才开始生产数据)val job1 = scope.launch {println("\u001B[34m[Lazily-订阅者1] 开始订阅(这将触发状态生产,立即获取当前状态)\u001B[0m")stateFlow.collect { value ->println("\u001B[34m[Lazily-订阅者1] 收到状态:$value\u001B[0m")}}delay(2000)// 4. 添加第二个订阅者(共享已经在发射的数据)val job2 = scope.launch {println("\u001B[35m[Lazily-订阅者2] 开始订阅(与订阅者1共享数据,立即获取当前状态)\u001B[0m")stateFlow.collect { value ->println("\u001B[35m[Lazily-订阅者2] 收到状态:$value\u001B[0m")}}delay(2000)println("\u001B[33m[说明] 取消所有订阅者后,Lazily策略下状态收集仍会继续\u001B[0m")job1.cancel()job2.cancel()delay(2000) // 等待一会儿,观察数据是否继续生产println("\u001B[33m[当前状态] ${stateFlow.value}\u001B[0m")
}

在这里插入图片描述

3. WhileSubscribed 策略示例
    suspend fun stateInWhileSubscribedDemo() {val scope = CoroutineScope(Dispatchers.Default)println("\u001B[33m[说明] 演示 stateIn WhileSubscribed 策略:按需收集状态数据\u001B[0m")println("\u001B[32m[WhileSubscribed-生产者] 准备创建冷流...\u001B[0m")// 1. 创建冷流(此时不会开始生产数据)val coldFlow = flow {println("\u001B[32m[WhileSubscribed-生产者] 开始生产状态数据(有订阅者时执行)\u001B[0m")var i = 1try {while (true) {val state = "状态 ${i++}"println("\u001B[32m[WhileSubscribed-生产者] 发射状态:$state\u001B[0m")emit(state)delay(1000)}} finally {println("\u001B[32m[WhileSubscribed-生产者] 状态生产已停止(无订阅者)\u001B[0m")}}println("\u001B[32m[WhileSubscribed-生产者] 冷流已创建,但尚未开始生产数据\u001B[0m")println("\u001B[32m[WhileSubscribed-生产者] 准备将冷流转换为状态流...\u001B[0m")// 2. 转换为状态流(使用 WhileSubscribed 策略,仅在有订阅者时收集)val stateFlow = coldFlow.stateIn(scope = scope,started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 1000 // 所有订阅者取消后等待1秒再停止收集,避免订阅者快速重订阅时重复创建上游Flow),initialValue = "初始状态" // StateFlow 必须有初始值)println("\u001B[32m[WhileSubscribed-生产者] 已转换为状态流,但要等待订阅者才开始生产数据\u001B[0m")println("\u001B[33m[当前状态] ${stateFlow.value}\u001B[0m")delay(2000) // 等待一会儿,证明确实没有开始生产数据println("\u001B[33m[当前状态] ${stateFlow.value}\u001B[0m")// 3. 添加第一个订阅者(此时开始生产数据)val job1 = scope.launch {println("\u001B[34m[WhileSubscribed-订阅者1] 开始订阅(触发状态生产,立即获取当前状态)\u001B[0m")stateFlow.collect { value ->println("\u001B[34m[WhileSubscribed-订阅者1] 收到状态:$value\u001B[0m")}}delay(1200)// 4. 添加第二个订阅者(共享数据)val job2 = scope.launch {println("\u001B[35m[WhileSubscribed-订阅者2] 开始订阅(与订阅者1共享数据,立即获取当前状态)\u001B[0m")stateFlow.collect { value ->println("\u001B[35m[WhileSubscribed-订阅者2] 收到状态:$value\u001B[0m")}}delay(1200)println("\u001B[33m[说明] 取消所有订阅者后,等待1秒钟,WhileSubscribed策略将停止状态收集\u001B[0m")job1.cancel()job2.cancel()delay(1500) // 等待超过stopTimeoutMillis,此时生产数据会停止println("\u001B[33m[当前状态] ${stateFlow.value}\u001B[0m")println("\u001B[33m[说明] 现在添加新的订阅者,将重新触发状态收集\u001B[0m")val job3 = scope.launch {println("\u001B[36m[WhileSubscribed-订阅者3] 开始订阅(重新触发状态生产,立即获取当前状态)\u001B[0m")stateFlow.collect { value ->println("\u001B[36m[WhileSubscribed-订阅者3] 收到状态:$value\u001B[0m")}}delay(2000)job3.cancel()delay(2000) // 再次等待,观察数据生产停止println("\u001B[33m[当前状态] ${stateFlow.value}\u001B[0m")
}

在这里插入图片描述

运行效果分析:

实际应用场景中的策略选择示例:

1. 用户配置状态管理

  • 使用 Lazily 策略:首次访问时才加载配置
  • 设置合适的 initialValue:提供默认配置

2. 网络连接状态监控

  • 使用 WhileSubscribed 策略:按需监控,节省资源
  • 设置 initialValue = false:默认为未连接状态

3. 实时数据仪表盘

  • 使用 Eagerly 策略:立即开始收集,确保数据实时性
  • 设置合适的初始值:提供占位数据

StateFlow 核心特性总结

特性说明
状态保持始终保持一个当前状态值,新订阅者立即收到当前状态
自动去重只有当新值与当前值不同时才发射,避免不必要的更新
同步访问通过 value 属性可以同步获取当前状态,无需异步操作
初始值必需创建时必须提供初始值,确保始终有有效状态
固定重放自动重放当前状态给新订阅者(相当于 replay = 1

总结

SharedFlowStateFlow 都是热流,其中 StateFlow 可以看做是定制版本的 SharedFlow

StateFlow 与 SharedFlow 的核心区别

一个简单的代码示例看下 StateFlowSharedFlow 的区别:

suspend fun stateFlowVsSharedFlowDemo() {// StateFlow:状态流,需要初始值val stateFlow = MutableStateFlow("初始状态")// SharedFlow:事件流,不配置重放数量,默认为0val sharedFlow = MutableSharedFlow<String>()println("\u001B[32mStateFlow设置: 状态A\u001B[0m")stateFlow.value = "状态A"println("\u001B[32mSharedFlow发射: 事件A\u001B[0m")sharedFlow.emit("事件A")delay(500)val stateSubscriber = GlobalScope.launch {println("\u001B[34mStateFlow订阅者启动\u001B[0m")stateFlow.collect { state ->println("\u001B[34m  StateFlow收到: $state\u001B[0m")}}delay(200)val sharedSubscriber = GlobalScope.launch {println("\u001B[35mSharedFlow订阅者启动\u001B[0m")sharedFlow.collect { event ->println("\u001B[35m  SharedFlow收到: $event\u001B[0m")}}delay(800)println("\u001B[33m→ 结果: StateFlow立即收到当前状态,SharedFlow错过历史事件\u001B[0m\n")println("\u001B[32mStateFlow更新: 状态B\u001B[0m")stateFlow.value = "状态B"println("\u001B[32mSharedFlow发射: 事件B\u001B[0m")sharedFlow.emit("事件B")delay(500)println("\u001B[33m→ 结果: 两个订阅者都收到新数据\u001B[0m\n")println("\u001B[32mStateFlow重复设置: 状态B\u001B[0m")stateFlow.value = "状态B"delay(300)println("\u001B[32mSharedFlow重复发射: 事件B\u001B[0m")sharedFlow.emit("事件B")delay(500)println("\u001B[33m→ 结果: StateFlow被过滤无通知,SharedFlow正常通知\u001B[0m\n")println("\u001B[32mStateFlow最终状态: 最终状态\u001B[0m")stateFlow.value = "最终状态"println("\u001B[32mSharedFlow最终事件: 最终事件\u001B[0m")sharedFlow.emit("最终事件")delay(500)println("\u001B[33m→ 结果: 两者都正常发送数据\u001B[0m")delay(1000)stateSubscriber.cancel()sharedSubscriber.cancel()
}

在这里插入图片描述

SharedFlowStateFlow
用途事件流,用于事件广播状态流,用于状态管理
初始值无初始值,可以为空必须有初始值
当前值访问无法同步获取当前值可通过 value 属性同步获取当前状态
去重机制不去重,所有发射的数据都会传递自动去重,只有值改变时才发射
重放行为可配置重放数量固定重放最新1个值(当前状态)
缓冲策略可配置缓冲和溢出策略不可配置

好了, 本篇文章就是这些,希望能帮到你。

感谢阅读,如果对你有帮助请三连(点赞、收藏、加关注)支持。有任何疑问或建议,欢迎在评论区留言讨论。如需转载,请注明出处:喻志强的博客

http://www.dtcms.com/a/393603.html

相关文章:

  • python爬虫(请求+解析+案例)
  • 111-Christopher-Dall_Arm-Timers-and-Fire:Arm架构计时器与半虚拟化时间
  • switch缺少break出现bug
  • 【自然语言处理】(3) --RNN循环神经网络
  • C# 中的 ReferenceEquals 方法
  • BERT:用于语言理解的深度双向Transformer预训练【简单分析】
  • 力扣hot100:两数相加(模拟竖式加法详解)(2)
  • Zotero + Word 插件管理参考文献的引用
  • 用Python一键整理文件:自动分类DOCX与PDF,告别文件夹杂乱
  • Ubuntu部署Elasticsearch教程
  • 61.【.NET8 实战--孢子记账--从单体到微服务--转向微服务】--新增功能--提取金额
  • 一款基于 .NET 开源、免费、命令行式的哔哩哔哩视频内容下载工具
  • Win Semi宣布推出线性优化的GaN工艺
  • 考研408计算机网络2025年第38题真题解析
  • C++编写的经典贪吃蛇游戏
  • 风险预测模型原理
  • PS练习5:利用翻转制作图像倒影
  • 平替Jenkins,推荐一款国产开源免费的CICD工具 - Arbess
  • aws 实战小bug
  • NumPy 系列(一):numpy 数组基础
  • VSCode 的 launch.json 配置
  • OpenLayers地图交互 -- 章节六:范围交互详解
  • 分布式专题——15 ZooKeeper特性与节点数据类型详解
  • 分布式专题——16 ZooKeeper经典应用场景实战(上)
  • Torch-Rechub学习笔记-task2
  • Hadoop分布式计算平台
  • hive调优系列-1.调优须知
  • 爆炸特效:Unity+Blender-01
  • 解决切换 Node 版本后 “pnpm 不是内部或外部命令”问题
  • flag使用错误出现bug