当前位置: 首页 > news >正文

Kotlin 协程之 Flow 的理解使用及源码解析

在这里插入图片描述

前言

在前面的文章中,我们已经讨论了 Channel 的概念和基本使用以及 Channel 的高阶应用。这篇我们来看日常开发中更常用的Flow


“冷流” 和 “热流” 的本质

先来梳理一下所谓的 “冷流” 和 “热流”。

核心概念

我们已经知道 Channel 是 “热流”,而用 flow{} 构建器创建的 Flow 是 “冷流”(Flow 也有热流形式,如 SharedFlow
StateFlow,这个后面文章会详细介绍)。

所谓的热和冷,本质上是指"数据生产"和"数据消费"是否解耦为两套独立逻辑。

  • 热流:不依赖消费端,提前生产数据,即使没有消费者也会持续工作

  • 冷流:当有消费端触发消费事件时,才开始生产数据,懒加载的思想

对比示例

让我们通过代码来直观感受一下:

Channel(热流)示例

suspend fun channelHotExample() {val channel = Channel<String>(Channel.BUFFERED)// 生产者开始工作,不管有没有消费者val producer = GlobalScope.launch {repeat(3) { i ->delay(1000)channel.send("热流数据-$i")println("\u001B[32m[Channel生产者] 数据已发送: 热流数据-$i\u001B[0m")}channel.close()}// 等待2秒后才开始消费delay(2500)println("\u001B[34m[Channel消费者] 2秒后开始消费...\u001B[0m")for (data in channel) {println("\u001B[34m[Channel消费者] 收到: $data\u001B[0m")}producer.join()
}

在这里插入图片描述

Flow(冷流)示例

Flow 的创建和收集很简单:用 flow {} 创建,用 collect 收集。

suspend fun flowColdExample() {// 定义Flow,但此时还没有开始生产数据val flow = flow {repeat(3) { i ->delay(1000)println("\u001B[32m[Flow生产者] 数据开始发送: 冷流数据-$i\u001B[0m")emit("冷流数据-$i")println("\u001B[32m[Flow生产者] 数据发送完毕: 冷流数据-$i\u001B[0m")println()}}println("\u001B[36mFlow已定义,但还没有开始生产数据\u001B[0m")println()delay(2000)println("\u001B[34m[Flow消费者] 开始收集,此时才开始生产数据...\u001B[0m")flow.collect { data ->println("\u001B[34m[Flow消费者] 收到: $data\u001B[0m")// 模拟处理逻辑delay(500)println("\u001B[34m[Flow消费者] 数据处理完毕: $data...\u001B[0m")}
}

热流 vs 冷流对比

特性热流冷流
数据生产立即开始,不管有没有消费者只有在被收集时才开始生产
数据共享多个消费者共享同一份数据每个收集器都有独立的数据流
资源消耗持续消耗资源按需消耗资源
背压处理通过缓冲区和挂起机制天然支持背压,生产速度跟随消费
生命周期独立于消费者与收集器生命周期绑定
内存使用需要缓冲区存储数据按需生产,内存友好

Flow 的基本使用

Flow 的创建有多种方式,不同方式背后的实现原理和适用场景也不同。

flow 构建器

flow{} 构建器是最常用的也是很重要的一种方式。后面提到的 flowOfasFlow、以及 channelFlow,本质上都是对 flow{}
的封装或扩展。

使用示例

先来看个最简单的例子:

suspend fun basicFlowBuilder() {val numberFlow = flow {repeat(5) { i ->delay(500)emit(i) // 发射数据}}numberFlow.collect { value ->println("[消费者] 收到: $value")}
}

使用起来非常简单,用 flow{} 创建,用 collect 收集即可。

flowOf

当你有一组已知的静态数据需要转成 Flow,flowOf 最方便:

suspend fun flowOfExample() {val staticFlow = flowOf("Apple", "Banana", "Cherry")staticFlow.collect { fruit ->println("[消费者] 水果: $fruit")}}

源码

// Flow.kt
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {for (element in elements) {emit(element)}
}

可以看到,flowOf 本质就是对 flow 的封装:遍历所有元素并逐个 emit

asFlow 扩展函数

将各类数据结构直接转换为 Flow

在这里插入图片描述

示例:

suspend fun asFlowExample() {
// 集合转Flowval listFlow = listOf(1, 2, 3, 4, 5).asFlow()listFlow.collect { number ->println("\u001B[34m[消费者] 数字: $number\u001B[0m")}// 区间转Flowval rangeFlow = (1..3).asFlow()rangeFlow.collect { value ->println("\u001B[34m[消费者] 区间值: $value\u001B[0m")}// 数组转Flowval arrayFlow = arrayOf("A", "B", "C").asFlow()arrayFlow.collect { letter ->println("\u001B[34m[消费者] 字母: $letter\u001B[0m")}}

在这里插入图片描述

源码


// Iterable -> Flow
public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {forEach { value ->emit(value)}
}// Array -> Flow
public fun <T> Array<T>.asFlow(): Flow<T> = flow {forEach { value ->emit(value)}
}

核心特点:同步转换、按需发射、内存友好。


Flow 源码分析

Flow 内部的实现还是很有意思的, 我们就基于上述示例代码结合源码进行分析,来看一看 flow 内部的执行流程。

1. 创建阶段

例如:


val numberFlow = flow {repeat(5) { i ->delay(500)emit(i)}
}
flow{}
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
  • 参数类型suspend FlowCollector<T>.() -> Unit(一个可挂起的扩展函数类型)

  • 返回值:返回 SafeFlow

FlowCollector
public fun interface FlowCollector<in T> {/*** Collects the value emitted by the upstream.* This method is not thread-safe and should not be invoked concurrently.*/public suspend fun emit(value: T)
}

可以看到,FlowCollector 接口提供了 emit 方法,因此我们才能在 flow{} 代码块里直接调用 emit(),因为这个代码块本身就是FlowCollector 的扩展函数。

SafeFlow

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {override suspend fun collectSafely(collector: FlowCollector<T>) {collector.block()//这里的 block,实际上就是我们写的生产逻辑的代码块}
}

关键点:

  1. flow { ... } 返回 SafeFlow 实例
  2. 传入的 block 实际就是我们写的生产逻辑代码块
  3. 此时生产逻辑并没有被执行,等待 collectSafely 被调用后才会执行生产逻辑

2. 收集阶段

numberFlow.collect { value ->println("[消费者] 收到: $value")
}

numberFlowSafeFlow 的实例

AbstractFlow.collect()

SafeFlow 继承自 AbstractFlow,当我们调用 numberFlow.collect 时,实际上是走到了 AbstractFlow.collect()

// kotlinx-coroutines-core/common/src/flow/AbstractFlow.kt
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {/*** 这里的 collector,就是我们写的消费逻辑的代码块*/public final override suspend fun collect(collector: FlowCollector<T>) {// 创建 SafeCollector ,并且把消费逻辑代码块作为参数传递进去val safeCollector = SafeCollector(collector, coroutineContext)try {//调用实现类(SafeFlow)的collectSafely,此时的safeCollector是包含了消费逻辑代码块的collectSafely(safeCollector)} finally {safeCollector.releaseIntercepted()}}
}

关键点:

  • safeCollector 创建时,把 collector 传入,并且把 collector 包装成 SafeCollector,也就是保存了消费逻辑代码块
  • 然后,调用了 collectSafely,这里会走到实现类(SafeFlow)的collectSafely

SafeFlow.collectSafely()

//这里的 block 参数上面说过了,是我们生产逻辑的代码块
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {//这里的collector,则是把消费逻辑包装后的SafeCollector,当触发 collect 后,会触发生产逻辑,此时,collector也是包含消费逻辑了的。override suspend fun collectSafely(collector: FlowCollector<T>) {collector.block()}
}

关键点:

  • collector.block() 中的 collectorSafeCollector 实例

  • block 是我们写的生产逻辑(flow { ... } 中的代码),也就是示例中的:

repeat(5) { i ->delay(500)emit(i)
}

当执行到 emit(i) 时,实际上调用的是 SafeCollector.emit(i)

SafeCollector.emit()


internal actual class SafeCollector<T> actual constructor(@JvmField internal actual val collector: FlowCollector<T>,//保存的消费代码块@JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {/*** This is a crafty implementation of state-machine reusing.* First it checks that it is not used concurrently (which we explicitly prohibit) and* then just cache an instance of the completion in order to avoid extra allocation on each emit,* making it effectively garbage-free on its hot-path.*/override suspend fun emit(value: T) {return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->try {emit(uCont, value)//执行下面的私有方法} catch (e: Throwable) {// Save the fact that exception from emit (or even check context) has been thrown// Note, that this can the first emit and lastEmissionContext may not be saved yet,// hence we use `uCont.context` here.lastEmissionContext = DownstreamExceptionContext(e, uCont.context)throw e}}}private fun emit(uCont: Continuation<Unit>, value: T): Any? {val currentContext = uCont.contextcurrentContext.ensureActive()//检查协程是否处于活跃状态// This check is triggered once per flow on happy path.//上下文检查,flow 不允许跨协程发送数据,这个后面会讲到val previousContext = lastEmissionContextif (previousContext !== currentContext) {checkContext(currentContext, previousContext, value)lastEmissionContext = currentContext}completion = uCont//真正执行消费的地方:collector 是我们写的消费逻辑代码块,value 就是 生产逻辑中发送的数据val result = emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)/** If the callee hasn't suspended, that means that it won't (it's forbidden) call 'resumeWith` (-> `invokeSuspend`)* and we don't have to retain a strong reference to it to avoid memory leaks.*/if (result != COROUTINE_SUSPENDED) {completion = null}return result}
}

关键点:

  • SafeCollector 保存了消费逻辑代码块,也就是参数 collector
  • emit 最终会调用到 emitFun
  • emitFun(collector, value, this) 中的 collector 是我们传入的消费逻辑代码块,就是示例代码中的
    { value -> println("[消费者] 收到: $value") }

emitFun

@Suppress("UNCHECKED_CAST")
private val emitFun =FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>

这里要转换来看

// emitFun 的类型是 Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
// 相当于下面代码:
fun emitFun(collector: FlowCollector<Any?>, value: Any?, completion: Continuation<Unit>): Any? {return collector.emit(value, completion)  // 调用消费者的 emit 方法
} 

当执行 emitFun(collector, value, this) 时:

  • collector 是我们的消费逻辑({ value -> println("收到: $value") }

  • value 是数据(比如 0, 1, 2)

  • this 是 SafeCollector 自己作为 Continuation

我们的消费逻辑代码块实际上就是一个 FlowCollector 的实现

// 当我们写 collect { value -> println("收到: $value") } 时
// 实际上创建了一个匿名类,实现了 FlowCollector 接口:val consumer = object : FlowCollector<Int> {override suspend fun emit(value: Int) {println("收到: $value")  // 我们的消费逻辑}
}

所以当调用 emitFun(collector, value, completion) 时,实际上是在调用我们的消费逻辑

换句话说,当我们调用 emit 时,相当于是把 emit() 替换为消费代码块里的代码。

例如:

suspend fun basicFlowBuilder() {val numberFlow = flow {repeat(5) { i ->delay(500)emit(i) // 发射数据}}numberFlow.collect { value ->println("[消费者] 收到: $value")}
}

就相当于:

suspend fun basicFlowBuilder() {repeat(5) { i ->delay(500)println("[消费者] 收到: $value")//消费代码块替换掉 emit}
}

整体执行流程

1.numberFlow.collect { value -> println("[消费者] 收到: $value") }2.AbstractFlow.collect(collector) 被调用-collector = { value -> println("[消费者] 收到: $value") }(我们的消费逻辑)3.SafeCollector(collector, coroutineContext) 被创建-SafeCollector.collector = 我们的消费逻辑
-SafeCollector 自己也实现了 FlowCollector<T>4.SafeFlow.collectSafely(safeCollector) 被调用-safeCollector 是 SafeCollector 实例
↓
5.safeCollector.block() 执行-block 是我们的生产逻辑 : repeat (5) { i -> delay(500); emit(i) }
-当执行到 emit (i) 时 , 调用 SafeCollector.emit(i)6.SafeCollector.emit(i) 执行-检查上下文一致性
-调用 emitFun (SafeCollector.collector, i, continuation)
-SafeCollector.collector 就是我们的消费逻辑
↓
7.我们的消费逻辑被执行:println("[消费者] 收到: $i")8.继续下一次循环,直到 repeat(5) 完成

总结

  • SafeFlow 负责执行生产逻辑,SafeCollector 负责执行消费逻辑。

  • collector.block() 中的 collectorSafeCollector 实例,保存了消费逻辑代码块

  • block 是我们的生产逻辑(flow { ... } 中的代码)

  • 当生产逻辑调用 emit(value) 时,会触发 SafeCollector.emit(value)

  • SafeCollector.emit(value) 最终会调用我们传入的消费逻辑

以上就是 Flow 内部的实现机制。这就是为什么说 Flow 是"冷流",因为生产逻辑只有在被收集时才开始执行,而且每次收集都是全新的执行。

到这里,Flow 内部的执行机制就搞明白了。


Flow 的重要限制

在看其他创建方式之前,先明确一个非常重要的限制:Flow 不允许在不同协程中调用 emit()

示例

suspend fun wrongConcurrentEmitExample() {val errorFlow = flow {println("\u001B[32m[主协程] 开始创建Flow\u001B[0m")//在新协程中调用emitlaunch {delay(1000)emit("来自子协程的数据")  // 这里会抛出异常!}emit("正常数据")  // 这个是正常的}try {errorFlow.collect { data ->println("\u001B[34m[消费者] 收到: $data\u001B[0m")}} catch (e: Exception) {println("\u001B[31m[异常] ${e.message}\u001B[0m")}
}

在这里插入图片描述

抛出的异常表明:FlowCollector 不是线程安全的,禁止并发发送数据

报错堆栈:

在这里插入图片描述

SafeCollector 的检查机制

collect 内部会构建 SafeCollector,这个我们前面分析源码的时候已经知道了

在这里插入图片描述

SafeCollector 在执行 emit 时会检查上下文:

在这里插入图片描述

当协程上下文不一致时,会抛出异常:

在这里插入图片描述

关键代码:

if (emissionParentJob !== collectJob) {error("Flow invariant is violated:\n" +"\t\tEmission from another coroutine is detected.\n" +"\t\tChild of $emissionParentJob, expected child of $collectJob.\n" +"\t\tFlowCollector is not thread-safe and concurrent emissions are prohibited.\n" +"\t\tTo mitigate this restriction please use 'channelFlow' builder instead of 'flow'")
}

关键点emit 发生时的 Job 必须与 collectJob 一致(或为其子层级)。因此在另一协程中调用 emit 会触发异常。

为什么要有这个限制?

根因在于 Flow 的冷流本质

冷流的设计理念

Flow 是冷流,每次 collect 都会重新执行 flow{} 代码块,就像一次函数调用:

  • 调用者(collect)与被调用者(flow{})在同一执行上下文中

  • 生产与消费是同步协作,而非并发竞争

  • 过程需要顺序、可预测

协程上下文的一致性

Flow 中,emit 的协程上下文应当与 collect 保持一致。否则:

  • 上下文一致性被破坏,可能产生线程切换问题

  • 异常无法被正确传播到消费者

  • 取消机制失效,子协程取消无法正确传递

源码分析的时候也提到过,emit() 的代码相当于是把消费逻辑代码块给替换掉 emit(),如果说 Flow
生产逻辑可以跨协程并发执行,那么,消费逻辑逻辑代码块就会出现跟预期不符的逻辑。

例如,我原本在消费逻辑默认是在主线程中运行,如果可以跨协程 emit,例如,切到了 IO 线程,那么,消费逻辑就会跑到 emit
所在的IO线程中执行,无法保证上下文一致。


总结

Flow 在使用上还是很简单的,关键是要搞清楚概念特性以及底层实现原理,以及结合特性用在合适的场景中去。

核心原则:Flow 是用来处理数据流的,如果你的场景是一次性的数据获取,直接用 suspend fun 就够了。记住,合适的工具做合适的事情,这样代码才会既清晰又高效。

Flow 虽然在生产端存在限制,不能跨协程并发地生产数据,但 Kotlin 还给我们提供了其他的解决方案,具备更加灵活的生产端。下一篇文章,我们将深入探讨如何突破Flow 的限制,看看 ChannelFlow 的结合之道。


好了, 本篇文章就是这些,希望能帮到你。下一篇:突破 Flow 限制:Channel 与 Flow 的结合之道

感谢阅读,如果对你有帮助请三连(点赞、收藏、加关注)支持。有任何疑问或建议,欢迎在评论区留言讨论。如需转载,请注明出处:喻志强的博客


文章转载自:

http://aXLhrNwp.pLxhq.cn
http://hL8jb35J.pLxhq.cn
http://GuogtAaZ.pLxhq.cn
http://pMb4wG1h.pLxhq.cn
http://L0ZXfeHh.pLxhq.cn
http://VTT4D8WA.pLxhq.cn
http://Pzvddb0L.pLxhq.cn
http://MHLNdpmN.pLxhq.cn
http://pQWhMVS3.pLxhq.cn
http://6le9rpvk.pLxhq.cn
http://dQiT2mQ0.pLxhq.cn
http://A1bx4bKA.pLxhq.cn
http://WJqikFuU.pLxhq.cn
http://KnDFA9uC.pLxhq.cn
http://Vu8qwyv4.pLxhq.cn
http://BldsK19X.pLxhq.cn
http://JjDLcE7W.pLxhq.cn
http://cvFqIvae.pLxhq.cn
http://ErIrr4l7.pLxhq.cn
http://3HjeiWlX.pLxhq.cn
http://fdyzVDoi.pLxhq.cn
http://vVOPJJ5s.pLxhq.cn
http://HsgaQAk0.pLxhq.cn
http://8vFakv9V.pLxhq.cn
http://6M4M1JYv.pLxhq.cn
http://SyIspuOV.pLxhq.cn
http://56eIpQ2z.pLxhq.cn
http://sMSpXWtM.pLxhq.cn
http://BzM2BP0B.pLxhq.cn
http://n0bErrii.pLxhq.cn
http://www.dtcms.com/a/371944.html

相关文章:

  • Vue2.x核心技术与实战(六)-Vuex
  • 认知篇#12:基于非深度学习方法的图像特征提取
  • 软考备考①
  • 信息安全工程师软考攻坚:第三章网络安全技术深度解析与实战训练
  • JDK17日期格式‘MMM’导致九月Sept/Sep格式化异常问题❗❗❗
  • Vulkan 学习(20)---- UniformBuffer 的使用
  • 微信小程序中实现AI对话、生成3D图像并使用xr-frame演示
  • 【不背八股】9.MySQL知识点汇总
  • MySQL6
  • 论文阅读:ICLR 2021 BAG OF TRICKS FOR ADVERSARIAL TRAINING
  • GD32自学笔记:4.ADC
  • LeetCode 522.最长特殊序列2
  • CentOS 7.2 虚机 ssh 登录报错在重启后无法进入系统
  • 腾讯混元 3D 2.0 Windows 便携版:低显存需求下的高效文/图生3D体验
  • 火山 RTC 引擎15 拉流 推流 地址生成器 、合流转推 开关
  • CesiumJS详解:打造专业级Web 3D地球仪与地图的JavaScript库
  • 数据结构:顺序表与链表
  • C++ 前缀和 高频笔试考点 实用技巧 牛客 DP34 [模板] 前缀和 题解 每日一题
  • kotlin - 平板分屏,左右拖动,2个Activity计算宽度,使用ActivityOptions、Rect(三)
  • 【软考架构】第七章 系统架构设计基础知识-7.2基于架构的软件开发方法:Architecture-Based Software Design,ABSD
  • Dify 从入门到精通(第 81/100 篇):Dify 的多模态模型监控(高级篇)
  • 2019年11月系统架构设计师真题及解析摘要
  • 基于Django的“社区爱心养老管理系统”设计与开发(源码+数据库+文档+PPT)
  • IO性能篇(二):文件读写的四种分类
  • 超越模仿,探寻智能的本源:从人类认知机制到下一代自然语言处理
  • 计算机视觉(十二):人工智能、机器学习与深度学习
  • 去中心化投票系统开发教程 第五章:测试与部署
  • 自然语言处理之第一课语言转换方法
  • 移动端代理配置:iOS和Android设备代理设置完全指南
  • 【面试向】区块链介绍