当前位置: 首页 > news >正文

Flow 笔记

转:https://juejin.cn/post/7476850354796380197

Flow.launchIn()

Flow 有一个拓展函数 Flow.launchIn(),它的原理很简单,就是利用给定的 CoroutineScope 开启一个新协程,然后在这个新协程里面调用 Flow.collect()

Collect.kt public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch { collect()
}

生产出来的数据可能会流经各个中间操作符,collect 只是数据的最后一站。launchIn() 一般是配合中间操作符 onEach() 一起使用的:

this.launch {flow.onEach { println(it) }.collect()}// 等同于上面的写法
flow
.onEach { println(it) }
.launchIn(this)

Flow.shareIn()

Flow 有一个拓展函数 Flow.shareIn() 用于将一个 Flow 转换为 SharedFlow:

// Share.kt
fun <T> Flow<T>.shareIn(scope: CoroutineScope, // [Where] 指定在哪开启新协程调用 Flow.collect()started: SharingStarted, // [When] 指定什么时候调用 Flow.collect()replay: Int = 0
): SharedFlow<T>

SharingStarted.Eagerly,Eagerly 是 "急切地、急迫地" 的意思,它会立刻调用上游 Flow 的 collect() 启动其生产流程,并将收集到的每个数据转发给所有收集者。

SharingStarted.Lazily,Lazy是"懒"的意思,不会立马开始启动上游 Flow 的生产流程,直到第一次调用 SharedFlow.collect()

SharingStarted.WhileSubscribed(),当下游的所有订阅全都结束之后,它会把上游的所有的生产过程也结束掉,而且这时候如果再来新的订阅,它就会重新启动上游的数据流。

SharedFlow 官方文档里面写道:An active collector of a shared flow is calle a subscriber.

意思是普通 Flow 的 collector 称为(数据)收集者,但是 SharedFlow 的 collector 可以被视为订阅者。

SharedFlow 是一种 Hot Flow(热流),而普通的 Flow 是一种 Cold Flow(冷流)。

何谓冷流🥶和热流🥵呢?冷热的概念来源于 Flow 的特性,它们描述了流的行为方式和数据的发出时机。

冷流指的是每次收集(collect)流时,数据生产线会从头开始执行、发出数据。称其为冷流是因为它的行为像是在冷藏的状态下,只有在“激活”它(即调用 collect)时,才开始从头运行并发出数据。

热流指的是流的数据是实时生成并广播给所有观察者的。也就是说,热流的生产者在流创建后就会生产并发射数据,而不管有没有人去收集它,热流的数据生产和消费是独立的。称其为热流是因为它的行为像是一个实时的广播,总是活跃的,像是一个正在加热的设备,不会因是否有人“收听”而改变其状态。

SharedFlow 的 buffer 配合 replay 起到了缓存的作用。其实 buffer 还有另一个作用:在下游来不及消费时缓冲上游的数据。

在生产者-消费者模型中,当生产速度超过消费速度时,系统需要采取措施加以控制,通常的做法是暂停生产或丢弃来不及消费的数据。这种现象在 SharedFlow 中同样存在:当 Subscriber 的处理效率较低时,Buffer 中的数据若无法及时处理,缓冲区总有一刻将被填满。默认情况下,buffer 被填满后数据发射操作会被挂起(onBufferOverflow = SUSPEND)。

suspend fun main(): Unit = coroutineScope { // this: CoroutineScopeflow<Int> {val start = System.currentTimeMillis()(1..100).forEach {emit(it)println("emit $it finished at " +(System.currentTimeMillis() - start).toDuration(DurationUnit.MILLISECONDS).toString())}}.shareIn(scope = this,replay = 1,started = SharingStarted.Eagerly,).collect {delay(3000)}
}

上面的代码里,生产每条数据几乎不耗时,处理每条数据则要花费 3 秒。

  1. Emmiter 发送 1,进入 buffer 被缓存,同时发给 Subscriber 处理(耗时 3 秒);
  2. Emmiter 发送 2,替换掉 buffer 中的 1,替换掉是因为 replay = 1,只需保留最新的 1 条数据;
  3. Emmiter 发送 3,进入 buffer 被缓冲,它不能替换掉 2,因为 2 还没有被现有的所有 Subscriber 消费,还得留着 2,等 Subscriber 处理完 1 了,把 2 给它处理;
  4. ...
  5. Emmiter 发送 66,想要进入 buffer,此时 buffer 已经满了,于是生产流程只能停止(挂起);
  6. 直到第 3 秒,数据 1 被处理完毕,Subscriber 开始处理 buffer 中的 2,此时 2 被现有的所有 Subscriber 消费了,它不需要被缓冲了,同时,因为 replay = 1,2 不是最新那条数据,也没必要留着发送给未来的新订阅者。是时候将 2 逐出 buffer 了,腾出位置让门外的 66 进来。
  7. 生产流程得以继续,Emitter 发送 67,想要进入 buffer,此时 buffer 又已经满了,于是生产流程又只能停止(挂起)了......

使用 SharingStarted.WhileSubscribed(),还可以配置上游 Flow 结束和重启的规则。当下游的所有订阅全都结束之后,它会把上游的所有的生产过程也结束掉,而且这时候如果再来新的订阅,它就会重新启动上游的数据流。

suspend fun main(): Unit = coroutineScope { // this: CoroutineScopeval flow = flow {println("start emit")(1..3).forEach {delay(1000)emit(it)}}val sharedFlow = flow.shareIn(scope = this,started = SharingStarted.WhileSubscribed(),replay = 1,)val job = launch {println("collect1 start")sharedFlow.collectIndexed { index, value ->println("collect1: $value")if (index == 1) cancel() // 收集完第二个数据后停止收集}}job.join()println("collect1 finished")delay(1000)launch {println("collect2 start")sharedFlow.collect {println("collect2: $it")}}
}

第一次收集的时候,收集了两个数据就结束收集了,此时唯的一 collect 已经结束,所以上游的 Flow 生产流程也结束了,随后再次调用 SharedFlow.collect(),上游 Flow 生产流程再次被启动了,所以打印了两次 "start emit".

请注意,第二次调用 SharedFlow.collect() 时,上游的生产还没被重启,就先打印了一条数据(2),这是因为参数 replay = 1,那是 SharedFlow 缓存下来的数据。

WhileSubscribed() 有两个参数,它们都是用于配置延时

stopTimeoutMillis 用于配置结束的延时。具体来说,如果 SharedFlow 的所有 collect 都结束了,此时先不结束上游 Flow 的生产流程,从现在开始计时,如果计时过程中,有新的 SharedFlow.collect() 被调用,那么就不取消上游 Flow 的生产流程了。当然,如果计时结束了仍没有新的 SharedFlow.collect() 被调用,那么就会正式结束上游 Flow 的生产。

suspend fun main(): Unit = coroutineScope { // this: CoroutineScopeval flow = flow {println("start emit")(1..3).forEach {delay(1000)emit(it)}}val sharedFlow = flow.shareIn(scope = this,started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 1000,),replay = 1,)val job = launch {println("collect1 start")sharedFlow.collectIndexed { index, value ->println("collect1: $value")if (index == 1) cancel()}}job.join()println("collect1 finished")delay(500)launch {println("collect2 start")sharedFlow.collect {println("collect2: $it")}}
}

"start emit" 只打印了一次,可见上游 Flow 确实没有重启。

第二个参数, replayExpirationMillis,

它设置的是缓存的失效时间。在上游 Flow 结束了,并且设置的 replayExpirationMillis 也超时之后(期间没有新的 SharedFlow.collect() 调用),SharedFlow 缓存下来的那些最近数据就会被清空。

replayExpirationMillis 的默认值是无限大,也就是永远不会在上游 Flow 结束后丢弃缓存的最近数据,如果设置成 0,它就是在上游 Flow 结束的一瞬间就清空缓存的最近数据。

suspend fun main(): Unit = coroutineScope { // this: CoroutineScopeval flow = flow {println("start emit")(1..3).forEach {delay(1000)emit(it)}}val sharedFlow = flow.shareIn(scope = this,started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 0,replayExpirationMillis = 250,),replay = 1,)val job = launch {println("collect1 start")sharedFlow.collectIndexed { index, value ->println("collect1: $value")if (index == 1) cancel()}}job.join()println("collect1 finished")delay(500)launch {println("collect2 start")sharedFlow.collect {println("collect2: $it")}}
}

结果:

collect1 start
start emit
collect1: 1
collect1: 2
collect1 finished
collect2 start
collect2: 2     👈这句没输出
start emit
collect2: 1
collect2: 2
collect2: 3

stopTimeoutMillis 和 replayExpirationMillis 两个参数都是配置延时,一个是对于结束 Flow 的延时,一个是 Flow 结束之后,清空缓存数据的延时。

如果程序里有一个可以被订阅的事件流,该事件流会在多个地方被订阅,同时这个事件流的生产流程非常消耗资源,需要在所有订阅都结束的时候,及时的结束 Flow 的生产,这种场景下就非常适合用 WhileSubscribed() 来配置这种自动结束自动重启的 SharedFlow 了。

我们发现 MutableSharedFlow 是 SharedFlow 的子接口,同时它也是 FlowCollector 的子接口:

// SharedFlow.kt    
public interface MutableSharedFlow<T>: SharedFlow<T>, FlowCollector<T> {override suspend fun emit(value: T) // 📌public fun tryEmit(value: T): Booleanpublic val subscriptionCount: StateFlow<Int>public fun resetReplayCache()
}

它重写了 emit 方法

有什么不同

FlowCollector 的 emit 方法是 Flow 数据传递的基础,无法自主发射新事件,无法合并多个数据源,而 SharedFlow 的 emit 方法则是为了支持多个订阅者之间的数据共享和广播

public fun <T> Flow<T>.shareIn(scope: CoroutineScope,started: SharingStarted,replay: Int = 0
): SharedFlow<T> {val config = configureSharing(replay)val shared = MutableSharedFlow<T>(replay = replay,extraBufferCapacity = config.extraBufferCapacity,onBufferOverflow = config.onBufferOverflow)@Suppress("UNCHECKED_CAST")val job = scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)return ReadonlySharedFlow(shared, job)
}

发现了么,返回的是 ReadonlySharedFlow, shareIn() 生成的 SharedFlow 只能被动转发原始 Flow 的数据

MutableSharedFlow 继承 FlowCollector 是为了让它具备接收和处理元素的能力。

1. 遵循 Flow 框架的统一设计理念

在 Kotlin 的协程 Flow 框架里,FlowCollector 是一个核心接口,其定义了如何收集(也就是接收和处理)流中的元素。

MutableSharedFlow 继承 FlowCollector 能让它遵循 Flow 框架的统一设计,成为一个标准的流收集器。这意味着 MutableSharedFlow 可以和其他 Flow 操作符、构建器以及相关的工具无缝集成。

2. 便于作为收集器使用

由于 MutableSharedFlow 继承了 FlowCollector,所以它能够直接当作收集器传递给其他 Flow 的操作。例如,你可以把 MutableSharedFlow 实例传递给 Flow 的 collect 方法,从而把一个 Flow 中的元素收集到 MutableSharedFlow 里,示例代码如下:

fun main() = runBlocking {val mutableSharedFlow = MutableSharedFlow<Int>()val flow = flow {for (i in 1..5) {emit(i)delay(100)}}// 将 MutableSharedFlow 作为收集器使用flow.collect(mutableSharedFlow)
}

3. 解耦发射逻辑

尽管 SharedFlow 里有 emit 方法,但 MutableSharedFlow 继承 FlowCollector 能把发射逻辑进一步抽象和解耦。FlowCollector 接口只定义了 emit 操作,而 MutableSharedFlow 实现了这个接口,这样就把发射逻辑封装在 MutableSharedFlow 内部,使得代码的结构更加清晰,也更易于维护和扩展。

4. 提高代码的灵活性和可复用性

继承 FlowCollector 能让 MutableSharedFlow 在不同的场景下被复用。无论是在简单的数据流处理中,还是在复杂的异步编程架构里,MutableSharedFlow 都可以作为一个通用的收集器来使用,从而提高了代码的灵活性和可复用性。

综上所述,MutableSharedFlow 继承 FlowCollector 是为了更好地融入 Flow 框架,实现统一的设计理念,方便作为收集器使用,解耦发射逻辑,以及提高代码的灵活性和可复用性。

replay 的侧重点是“缓存重放”,如果你不想要重放,只想提升 Emitter 吞吐量,那么就应该使用 extraBufferCapacity

asSharedFlow

MutableSharedFlow 提供了一个 asSharedFlow() 扩展函数,它可以将 MutableSharedFlow 转换为 SharedFlow。当你希望将 MutableSharedFlow 对外暴露供订阅使用,但又不希望外部有权限发送数据时,就可以利用该函数创建一个只读的 SharedFlow:

class MyViewModel: ViewModel() {private val _event = MutableSharedFlow()val event: SharedFlow = _event.asSharedFlow()
}

相关文章:

  • 什么是cnas认证?认证cnas证书有什么好处?
  • 《基于神经网络实现手写数字分类》
  • 用最简单的方式了解单例模式
  • PDF多功能转换编辑及扫描仪 iLovePDF 3.10.0
  • 【verilog】多个 if 控制同一个变量 是否不是标准的语言规范,标准的语言规范应该如何写?
  • 在时间的缝隙里雕刻故事:龙臻的影像语言与全球认可
  • opencv图片颜色识别
  • 【linux模块化编译驱动Makefile异常引发的惨案】
  • 方案精读:华为数字化转型实践案例合集【附全文阅读】
  • 关于Diamond机械手的运动学与动力学的推导
  • Java 开发玩转 MCP:从 Claude 自动化到 Spring AI Alibaba 生态整合
  • list的使用
  • 基于C++(MFC)实现的文件管理系统
  • Visual Studio C++引入第三方库
  • Spring HTTP Interface 入门案例介绍
  • sentinel安装部署及测试--实践
  • 在Vmware15(虚拟机免费) 中安装纯净win10详细过程
  • RK3588 实现音视频对讲
  • Oracle 12.1.0.2补丁安装全流程
  • 如何使用3DMAX插件PFSpliner将3D对象转化为艺术样条线?
  • 网站建设教程视频/百度产品推广怎么收费
  • 企业官方网站格式/全国病毒感染最新消息
  • 公司网站开发费用计入什么科目/电脑优化大师下载安装
  • 关于我们网站设计/营销策划公司名字
  • 兰州新站点seo加盟/线上推广平台有哪些
  • 爱搜索中级网站建设/大连网络推广