Kotlin 协程之 Flow 操作符大全
前言
在 专栏 中我们掌握了 Flow
、SharedFlow
、StateFlow
的概念和基本用法之后,我们再来看一看 Flow
里的常见操作符。
按照官方文档的习惯,可以把操作符分成以下几类:
- 创建操作符:用于创建和构建
Flow
实例,是数据流的起点。 - 中间操作符:对数据做处理/转换/过滤,返回一个新的
Flow
,后续可以接着链式调用其他 API。 - 终端操作符:真正触发收集,把
Flow
里的数据送到下游去处理,不能再继续链式调用其他 API。
Flow
自带的操作符已经很丰富了,大部分场景都够用;实在不够,还可以自定义操作符。
Flow 创建操作符
先简要回顾一下 Flow
的创建方式。这些创建操作符在专栏之前的文章中已经详细介绍过,本篇文章就不过多赘述了。
常用创建操作符
操作符 | 功能描述 | 示例 |
---|---|---|
flow { } | 最基础的 Flow 构建器,通过 emit 发射值 | flow { emit(1); emit(2) } |
flowOf() | 从可变参数创建 Flow | flowOf(1, 2, 3) |
asFlow() | 将集合、数组、序列等转换为 Flow | listOf(1, 2, 3).asFlow() |
channelFlow { } | 基于 Channel 的 Flow 构建器,支持并发发射 | channelFlow { send(1) } |
callbackFlow { } | 用于回调 API 的 Flow 构建器,常用于 Android | callbackFlow { callback.register() } |
emptyFlow() | 创建空的 Flow | emptyFlow<Int>() |
特殊创建操作符
MutableSharedFlow()
- 创建可变的热流MutableStateFlow()
- 创建可变的状态流SharedFlow.asSharedFlow()
- 转换为只读SharedFlowStateFlow.asStateFlow()
- 转换为只读StateFlow
操作符分类概览
分类 | 操作符 | 详细功能描述 | 类型 | 注意事项 |
---|---|---|---|---|
基础操作符 | collect | 启动 Flow 收集,触发上游数据流开始执行 | 终端 | 会挂起当前协程;需在协程作用域中调用 |
transform | 通用转换;可发射0个、1个或多个值 | 中间 | 功能强、需保证发射与完成语义一致 | |
map | 一对一转换,将每个输入值映射为一个输出值 | 中间 | 常用且安全;不改变元素数量 | |
onEach | 执行附加处理(如日志/打印/统计),不改变数据 | 中间 | 仅做附加处理;避免在此做业务转换 | |
onStart | 收集前执行一次,常用于显示加载状态 | 中间 | 只执行一次;适合初始化 | |
onCompletion | 流完成时执行,无论正常完成还是异常结束 | 中间 | 可检测异常但不处理异常 | |
onEmpty | 空流时执行备用逻辑或发射默认值 | 中间 | 仅当上游未发射任何值时触发 | |
过滤操作符 | filter | 保留满足条件的元素 | 中间 | 简单直观,性能良好 |
filterNot | 保留不满足条件的元素 | 中间 | 等价于 filter { !cond } | |
filterIsInstance | 类型过滤,只保留指定类型的元素 | 中间 | 自动类型转换;类型安全 | |
take | 取前N个元素,随后完成流 | 中间 | 会提前结束流;注意上游资源清理 | |
takeWhile | 条件为真持续取;遇到第一个为假即停止 | 中间 | 一旦为假即终止流 | |
drop | 跳过前N个元素,发射剩余 | 中间 | 常用于分页或跳过头部数据 | |
dropWhile | 跳过满足条件元素直到遇到第一个不满足 | 中间 | 仅在开头判断;后续不再判断 | |
distinctUntilChanged | 去除连续重复,仅在值变化时发射 | 中间 | 仅比较相邻元素;非全局去重 | |
累积操作符 | scan | 累积计算并发射每个中间结果(含初始值) | 中间 | 发射初始值;结果数量 = 输入数量 + 1 |
runningFold | 带初始值的累积;发射每次折叠结果 | 中间 | 与 scan 行为一致 | |
runningReduce | 使用第一个元素作为初始值进行累积 | 中间 | 空流会抛异常;结果数量 = 输入数量 | |
组合操作符 | combine | 组合多个流的最新值;任一上游发射都会触发 | 中间 | 所有上游至少发射一次才开始;可能引入并发 |
combineTransform | 组合并在转换块中自定义发射逻辑 | 中间 | 转换块可发射多个值;所有上游至少发射一次 | |
merge | 并发收集多个流,按到达顺序发射元素;不做配对 | 中间 | 不等待配对;可能乱序;注意控制上游数量 | |
zip | 一一配对;等待所有上游都有新值才发射 | 中间 | 速度受最慢上游限制;可能造成背压 | |
withIndex | 为每个元素添加从0开始的索引 | 中间 | 返回 IndexedValue<T> | |
展开操作符 | flatMapConcat | 顺序展开内部流,等待前一个完成再处理下一个 | 中间 | 串行执行;保证顺序但可能较慢 |
flatMapMerge | 并发展开内部流;可同时处理多个内部流 | 中间 | 并发执行;控制并发数量避免资源耗尽 | |
flatMapLatest | 只处理最新的内部流;新到来时取消旧的 | 中间 | 会取消之前的内部流;适合搜索、实时输入 | |
最新值操作符 | mapLatest | 映射最新值;新值到来时取消之前的映射 | 中间 | 适合耗时转换;避免过时计算 |
transformLatest | 转换最新值;转换块可发射多个结果 | 中间 | 比 mapLatest 更灵活;注意取消逻辑 | |
transformWhile | 条件转换;当条件为false时终止整个流 | 中间 | 终止的是整个流而非仅当前元素 | |
分块操作符 | chunked | 将连续元素分组成指定大小的块 | 中间 | 最后一块可能不满;注意边界 |
上下文控制 | flowOn | 切换上游执行上下文(线程) | 中间 | 仅影响上游;下游仍在原线程 |
buffer | 添加缓冲区;允许生产者与消费者异步执行 | 中间 | 提升吞吐;增加内存占用;与 conflate /zip 交互需理解 | |
conflate | 合并快速发射的值,只保留最新 | 中间 | 会丢失中间值;适合 UI 等只关心最新值场景 | |
cancellable | 让阻塞处理可取消;定期检查取消 | 中间 | 仅对阻塞/CPU密集处理有意义;与 map /transform 搭配使用 | |
时间控制 | debounce | 防抖;在指定时间内没有新值时才发射 | 中间 | 会延迟发射;最后一个值总会发射 |
sample | 采样;按固定时间间隔发射最新值 | 中间 | 可能丢失值;适合定期更新 | |
timeout | 指定时限内未发射则抛 TimeoutCancellationException | 中间 | 下游延迟不计入;非正值立即超时 | |
异常处理 | catch | 捕获上游异常;可发射默认值或重新抛出 | 中间 | 仅捕获上游异常;不处理下游 |
retry | 异常自动重试指定次数 | 中间 | 注意退出条件;避免无限重试 | |
retryWhen | 基于异常类型与次数的条件化重试 | 中间 | 可实现指数退避、延迟等策略 | |
终端操作符 | collectLatest | 收集最新值;新值到来时取消之前处理 | 终端 | 会取消慢速处理;适合 UI 更新 |
collectIndexed | 收集时携带索引(从0开始) | 终端 | 与 collectLatest 不同;不会取消块 | |
launchIn | 在指定协程作用域中启动收集;返回 Job | 终端 | 非阻塞;可取消;绑定生命周期 | |
produceIn | 在作用域中生产到 ReceiveChannel ;返回通道 | 终端 | 作用域取消会关闭通道;适合跨 API 边界消费 | |
toList | 收集所有元素到 List | 终端 | 需等待流完成;注意内存 | |
toSet | 收集所有元素到 Set (自动去重) | 终端 | 全局去重;需等待流完成 | |
first/firstOrNull | 获取第一个元素;随后立即终止流 | 终端 | first 空流抛异常;firstOrNull 更安全 | |
last/lastOrNull | 获取最后一个元素 | 终端 | 必须等待流完成;空流异常(lastOrNull 更安全) | |
single/singleOrNull | 获取唯一元素;数量不为1则出错 | 终端 | 适用于严格唯一场景;否则使用集合或 first /last | |
reduce | 归约为单个值;使用第一个元素作为初始值 | 终端 | 空流抛异常;与 fold 的选择取决于是否需要初始值 | |
fold | 带初始值的归约;可返回不同类型 | 终端 | 空流安全;返回初始值 | |
count | 计算元素数量(可带条件) | 终端 | 必须遍历所有元素;大流注意性能 | |
all | 是否所有元素都满足条件 | 终端 | 短路求值;遇到 false 立即返回;空流返回 true | |
any | 是否存在任意满足条件的元素 | 终端 | 短路求值;遇到 true 立即返回;空流返回 false |
基础操作符
概览
操作符 | 关键特性 | 类型 | 注意事项 |
---|---|---|---|
collect | 触发收集、挂起当前协程 | 终端 | 在协程作用域中调用;正确处理取消 |
transform | 多发射、多完成控制 | 中间 | 保持发射/完成语义一致;避免复杂副作用 |
map | 一对一映射 | 中间 | 纯函数;避免阻塞计算 |
onEach | 附加处理、不改数据 | 中间 | 仅做附加处理;尽量轻量 |
onStart | 收集前执行一次 | 中间 | 适合初始化/展示加载 |
onCompletion | 完成时执行 | 中间 | 可检测异常但不处理异常 |
onEmpty | 空流时触发 | 中间 | 可发射默认值或备选数据 |
collect - 启动收集
先看最常用的终端操作符 collect
。
所有的 Flow
最终都会落到一个 collect
方法上。collect
的作用就是"开始收集",把上游发来的值一个个交给我们定义的处理逻辑(下游)。
示例代码:
fun demoCollect() {println("collect: 开始收集 Flow")val flow = flowOf(1, 2, 3).onEach { println("collect: Flow 发射了 $it") }// 注意:collect 是挂起函数,必须在协程中调用CoroutineScope(Dispatchers.IO).launch {flow.collect { value ->println("collect: 接收到值 $value")delay(100) // 模拟处理时间println("collect: 处理了 $value")}}println("执行完毕")// 这里需要等待,否则 collect 的协程可能还没跑完,进程就结束了TimeUnit.MILLISECONDS.sleep(1000 * 5)
}
需要注意:终端操作符(比如 collect
)都是挂起函数,必须在挂起函数里或者协程里调用。
这里说一下为什么示例里要 TimeUnit.MILLISECONDS.sleep(1000 * 5)
?
- 在 JVM 控制台程序里,进程的生命周期跟
main()
主线程绑在一起。主线程一结束,如果剩下的线程都是“守护线程”(协程默认就跑在这样的线程池上),那进程就直接退出了,协程可能还没来得及跑完。 - 在 Android 里一般不用担心这个问题:主线程(UI 线程)有 Looper,属于非守护线程;只要应用还在前台或没被系统杀,协程就有机会跑完。
如果你不想用 sleep
,还可以:
- 用
runBlocking { ... }
包住示例代码; - 或者保存
launch
返回的Job
,再job.join()
等待它结束;
示例是为了演示 collect
终端操作符的用法,所以 sleep
了几秒
transform - 通用转换
transform
可以说是 Flow
操作符中最灵活、最强大的通用操作符。通过源码可以看到,它通过暴露 FlowCollector
给调用者,我们可以:
- 自由决定是否发射元素
- 发射多个元素
- 插入任意逻辑
绝大多数常见操作符(map
、filter
、take
等)都是基于 transform
实现的。
另外,transform
会通过 flow {}
创建一个新的 SafeFlow
实例,因此每次调用 transform
都会返回一个新的
Flow
。这也是为什么你可以无限链式调用 Flow
操作符,而不会相互干扰。
示例代码:
suspend fun demoSuspendTransform() {flowOf("用户A", "用户B", "用户C").transform { userName ->emit("开始查询 $userName 的信息...")// 模拟异步网络请求delay(500)val userInfo = "[$userName: ID=${userName.hashCode()}, 状态=在线]"emit("查询完成: $userInfo")}.collect { println(" $it") }
}
map - 一对一转换
map
用于把每个元素做一个"一进一出"的转换,返回一个新的 Flow
,里面是转换后的结果。是最常用、最直观的转换操作符。
示例代码:
suspend fun demoTypeConversion() {flowOf(1, 2, 3, 4, 5).map { "数字: $it" } // Int -> String.collect { println(" $it") }
}
onEach - 数据遍历
也是基于 transform
实现,flow
生产的每个数据都会经过 onEach
,可以执行附加处理(例如日志、埋点、计时),但不改变数据本身。
示例代码:
suspend fun demoOnEach() {flowOf("任务A", "任务B", "任务C").onEach { task ->val startTime = System.currentTimeMillis()println(" 开始处理: $task (${startTime})")}.map { task ->delay(100) // 模拟处理时间"完成-$task"}.onEach { result ->val endTime = System.currentTimeMillis()println(" 处理完成: $result (${endTime})")}.collect { println(" 收集到: $it") }
}
注意事项:
onEach
里的代码可以是挂起函数,比如网络请求、数据库操作等- 不要在
onEach
里做耗时操作,会影响整个流的性能 - 如果
onEach
里抛异常,会中断整个流 - 适合日志、打印、统计、埋点等旁路操作(不改变数据)
onStart - 开始收集前的回调
从源码可以看到,onStart
在 Flow
开始被收集之前 触发(先执行 onStart 的代码,然后才执行的 collect
),接收者是
FlowCollector
,所以可以通过 emit()
发射额外的元素。
示例代码:
suspend fun demoOnStart() {// 模拟一个可能为空的数据源val dataFlow = flow<String> {delay(500) // 模拟延迟emit("真实数据")}dataFlow.onStart {emit("默认占位数据") // 立即发射默认值println(" 💡 发射了默认值,用户不会看到空白页面")}.collect { println(" 收到: $it") }
}
使用场景:
- 显示加载状态:
onStart { _loading.value = true }
- 发射默认值:
onStart { emit(defaultValue) }
- 记录开始时间:
onStart { startTime = System.currentTimeMillis() }
- 初始化资源:
onStart { initializeResources() }
注意点:
- 是在
flow
开始收集之前执行的 - 由于
SharedFlow
是热流,onStart
执行时 SharedFlow 可能已经在发射数据,所以可能会错过一些值。如果需要确保不错过
SharedFlow
的发射,应该考虑使用onSubscription
onCompletion - 完成时的回调
onCompletion
在 Flow
完成后触发(无论是正常完成还是异常完成),常用来做清理工作。
参数说明:
cause
为null
表示正常完成cause
不为null
表示异常完成,可以获取异常信息
示例代码:
suspend fun demoOnCompletion() {flowOf(1, 2, 3).onStart { println(" 🚀 开始执行") }.map {delay(100)it * 2}.onCompletion { cause ->if (cause == null) {println(" ✅ 正常完成!cause = null")} else {println(" ❌ 异常完成:$cause")}}.collect { println(" 收到: $it") }
}
使用场景:
- 隐藏加载状态:
onCompletion { _loading.value = false }
- 清理资源:
onCompletion { cleanup() }
- 记录完成时间:
onCompletion { endTime = System.currentTimeMillis() }
onEmpty - 空流处理
onEmpty
用于处理空的 Flow
,当上游 Flow
没有发射任何元素时,可以发射默认值或执行特定操作。它与 onStart
、onCompletion
一样,都属于生命周期回调操作符。
触发时机:
- 当上游
Flow
完成但没有发射任何元素时触发。 - 可以在其
action
中发射一个或多个默认值。
示例代码:
suspend fun demoOnEmpty() {println("1. 空 `Flow` 处理示例:")
// 空 `Flow` 的情况emptyFlow<Int>().onEmpty { emit(0) } // 空 `Flow` 时发射默认值 0.collect { println(" 空 `Flow` 结果: $it") } // 输出: 0println("2. 非空 `Flow` 不受影响:")flowOf(1, 2, 3).onEmpty { emit(0) } // 非空 `Flow`,onEmpty 不会执行.collect { println(" 非空 `Flow` 结果: $it") } // 输出: 1, 2, 3println("3. 过滤后变空的 `Flow`:")flowOf(1, 2, 3, 4, 5).filter { it > 10 } // 过滤后变成空 `Flow`.onEmpty { emit(-1) } // 发射默认值.collect { println(" 过滤后空 `Flow` 结果: $it") } // 输出: -1
}
使用场景:
- 默认值处理:当数据流为空时提供一个默认值,避免下游出现空状态。
- 用户体验优化:在搜索无结果时,显示“未找到结果”的提示信息。
- 数据兜底:确保下游总能收到一个有效的数据,防止因空数据流导致逻辑中断。
- 状态管理:在 MVVM/MVI 架构中,处理空数据状态,更新 UI。
注意事项:
onEmpty
只在Flow
完全没有发射任何元素时才会触发。- 如果
Flow
发射了元素但后来被filter
等操作符过滤掉了,onEmpty
仍然会触发。 onEmpty
的action
中可以发射多个值,但要注意这可能会影响下游的预期。
过滤操作符
概览
操作符 | 关键特性 | 类型 | 注意事项 |
---|---|---|---|
filter | 条件过滤 | 中间 | 条件尽量轻量,提升性能 |
filterNot | 反向过滤 | 中间 | 等价 filter { !cond } |
filterIsInstance | 类型过滤 | 中间 | 自动类型转换,类型安全 |
take | 取前 N 个后完成 | 中间 | 提前结束流;清理上游资源 |
takeWhile | 条件为真持续取 | 中间 | 第一次为假后终止整个流 |
drop | 跳过前 N 个 | 中间 | 常用于分页/跳过头部 |
dropWhile | 开头满足条件跳过 | 中间 | 仅开头判断;后续不再判断 |
distinctUntilChanged | 相邻去重 | 中间 | 非全局去重;可自定义比较 |
filter - 基础过滤
filter
用来过滤数据,只让满足条件的元素通过,也是基于 transform
实现的。
示例代码:
suspend fun demoFilter() {flowOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).filter { it % 2 == 0 } // 只保留偶数.collect { println(" $it") }
}
filterNot - 反向过滤
filterNot
是 filter
的反向操作,过滤掉满足条件的元素。
示例代码:
suspend fun demoFilterNot() {flowOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).filterNot { it % 2 == 0 } // 过滤掉偶数,保留奇数.collect { println(" 奇数: $it") }
}
filterIsInstance - 类型过滤
filterIsInstance
用来过滤特定类型的元素,在处理混合类型的流时很有用。实际上就是对 filter
进行了类型检查,只保留符合类型的元素。
示例代码:
suspend fun demoFilterIsInstance() {flowOf(1, "hello", 2.5, 3, "world", true, 4L).filterIsInstance<String>() // 只保留String类型.collect { println(" 字符串: $it") }
}
泛型类型过滤注意事项:
对于泛型类型的过滤,特别是集合类型,涉及到泛型擦除的情况时,需要额外的处理才能精确的判断:
suspend fun demoFilterIsInstanceWithGenerics() {val mixedCollections = flowOf(listOf(1, 2, 3), // List<Int>listOf("a", "b", "c"), // List<String>setOf("x", "y", "z"), // Set<String>setOf(4, 5, 6), // Set<Int>mapOf("key" to "value"), // Map<String, String>mapOf(1 to 2), // Map<Int, Int>arrayOf(4, 5, 6),"not a collection",mutableListOf(7, 8, 9))println(" 所有List类型 (无论泛型参数):")mixedCollections.filterIsInstance<List<*>>().collect { println(" List: $it") }println(" ❌ 错误做法 - 这样无法区分List<Int>和List<String>:")println(" filterIsInstance<List<Int>>() // 运行时无效,还是会把所有 List 类型的数据都过滤出来")println(" ✅ 正确做法 - 需要额外的类型检查:")mixedCollections.filterIsInstance<List<*>>().filter { list ->// 检查列表是否包含整数list.isNotEmpty() && list.first() is Int}.collect { println(" List<Int>: $it") }
}
take - 取前N个
take
用来只取前 N 个元素,取够了就自动完成。
示例代码:
suspend fun demoTake() {flow {var i = 1while (true) {emit(i++)delay(100) // 模拟数据产生的延迟}}.take(5) // 只取前5个,避免无限循环.onCompletion {if (it != null) {println(" 流异常: ${it.message}")} else {println(" 流正常完成")}}.collect { println(" 无限流中的第 $it 个数字") }
}
takeWhile - 条件取值
源码内部是调用了 collectWhile
takeWhile
会一直取元素,直到遇到第一个不满足条件的元素整个流就停止,后面的元素都不会处理了。
示例代码:
suspend fun demoTakeWhile() {println(" 取小于5的连续元素:")flowOf(1, 2, 3, 4, 5, 6, 7, 8).takeWhile { it < 5 }.onCompletion {println("流结束")}.collect { println(" $it") } // 输出: 1, 2, 3, 4println(" 取偶数的连续元素:")flowOf(2, 4, 6, 7, 8, 10).takeWhile { it % 2 == 0 }.onCompletion {println("流结束")}.collect { println(" $it") } // 输出: 2, 4, 6 (遇到7就停止)
}
drop - 跳过前N个
drop
用来跳过前 N 个元素,从第 N+1 个开始处理。
示例代码:
suspend fun demoDrop() {flowOf(1, 2, 3, 4, 5, 6, 7, 8).drop(3) // 跳过前3个: 从4开始.collect { println(" $it") } // 输出: 4, 5, 6, 7, 8
}
dropWhile - 条件跳过
dropWhile
会一直跳过满足条件的元素,对于不满足条件的元素,才会开始正常处理。
示例代码:
suspend fun demoDropWhile() {flowOf(1, 2, 3, 4, 5, 6, 7, 8).dropWhile { it < 5 } // 跳过小于5的元素: 从5开始.collect { println(" $it") } // 输出: 5, 6, 7, 8
}
distinctUntilChanged - 去重连续相同值
distinctUntilChanged
用来过滤掉连续重复的值,只有当值真正发生变化时才会发射。
示例代码:
data class User(val id: Int, val name: String, val status: String)suspend fun demoObjectComparison() {println("2. 对象比较示例:")val users = flowOf(User(1, "Alice", "online"),User(1, "Alice", "online"), // 重复User(2, "Bob", "offline"),User(2, "Bob", "offline"), // 重复User(3, "Charlie", "online"),User(1, "Alice", "online") // 不连续,不会被去重)println(" 原始用户数据:")users.collect { println(" $it") }println(" 去重连续相同用户:")users.distinctUntilChanged().collect { println(" $it") }println(" 按用户ID去重:")users.distinctUntilChanged { old, new ->old.id == new.id}.collect { println(" $it") }println(" 按用户状态去重:")users.distinctUntilChanged { old, new ->old.status == new.status}.collect { println(" $it") }
}
两种用法:
- 无参数:默认使用
equals
方法比较连续的元素是否相等 - 有参数:可以自定义比较函数,根据对象的某个属性去重
累积操作符
概览
操作符 | 关键特性 | 类型 | 注意事项 |
---|---|---|---|
scan | 累积并包含初始值 | 中间 | 发射初始值;数量 = 输入 + 1 |
runningFold | 带初始值累积 | 中间 | 与 scan 行为一致 |
runningReduce | 无初始值累积 | 中间 | 空流抛异常;数量 = 输入 |
scan - 累积并发射中间结果
scan
会对每个元素进行累积计算,并把每次的中间结果都发射出来。可以看到,实际就是调用了 runningFold
示例代码:
suspend fun demoScan() {flowOf(1, 2, 3, 4, 5).scan(0) { acc, value ->acc + value}.collect { println(" $it") } // 输出: 0, 1, 3, 6, 10, 15
}
runningFold - 带初始值的累积
runningFold
和 scan
类似,但语义更明确,表示"运行时累积"。
示例代码:
suspend fun demoRunningFold() {flowOf(1, 2, 3, 4, 5).runningFold(0) { acc, value ->acc + value}.collect { println(" $it") } // 输出: 0, 1, 3, 6, 10, 15
}
runningReduce - 不带初始值的累积
runningReduce
不需要初始值,直接用第一个元素作为初始累积值。
示例代码:
suspend fun demoRunningReduce() {flowOf(1, 2, 3, 4, 5).runningReduce { acc, value ->acc + value}.collect { println(" $it") } // 输出: 1, 3, 6, 10, 15
}
组合操作符
概览
操作符 | 关键特性 | 类型 | 注意事项 |
---|---|---|---|
combine | 最新值组合,任一上游驱动 | 中间 | 所有上游至少发一次;可能并发 |
combineTransform | 组合并自定义发射 | 中间 | 转换块可多发射;语义更灵活 |
merge | 并发合并,不做配对 | 中间 | 乱序;控制上游数量避免资源耗尽 |
zip | 一一配对 | 中间 | 受最慢上游限制;可能背压 |
withIndex | 附带索引 | 中间 | 返回 IndexedValue<T> |
combine - 多流中任一发射新值,则组合所有流的最新值
combine
用来 组合多个流的最新值。当任意一个流发射新值时,就会用所有流的最新值进行组合。
示例代码:
suspend fun demoCombine() {val temperatureFlow = flow {val temps = listOf(25.5, 26.0, 25.8, 26.2)temps.forEach { temp ->emit(temp)delay(200)}}val humidityFlow = flow {val humidity = listOf(60, 65, 62)humidity.forEach { h ->emit(h)delay(300)}}println(" 环境监控数据:")temperatureFlow.combine(humidityFlow) { temp, humidity ->"温度: ${temp}°C, 湿度: ${humidity}%"}.collect { println(" $it") }
}
combineTransform - 组合并转换(可发射多次或跳过)
combineTransform
比 combine
更加灵活 ,它在每次源流发来新元素时给你一个 emit
的挂起函数,从而可以:
- 发射 0 次(跳过当前组合)或多次(比如先发进度、再发结果)
- 执行更复杂的逻辑与条件分支
- 在组合过程中调用挂起函数(如网络/数据库)再决定如何发射
函数签名要点:
combineTransform(f1, f2) { v1, v2 -> emit(...) }
- 提供了 2~5 个源流的重载;当任一源流发出新值时,块会被调用一次
- 与
combine { a, b -> result }
不同,combineTransform
的块内可以多次emit
或者不emit
适用场景:
- 需要发射“进度 + 最终结果”的场景
- 需要根据条件跳过某些组合(例如数据不完整时不发)
- 需要在组合过程中做挂起操作(如校验/补充数据)
示例代码
suspend fun demoConditionalAndMultipleEmits() {val numbers = flowOf(1, 2, 3, 4)val letters = flowOf("A", "B", "C").onEach { delay(50) }numbers.combineTransform(letters) { n, l ->if (n % 2 == 0) {emit("偶数: $n$l")emit("偶数平方: ${n * n}$l")} else {// 奇数时跳过(不发射)}}.collect { println(" $it") }
}
执行结果
注意事项:
combineTransform
每次源流有新值都会触发一次块执行;块内是否发射、发射几次由你决定- 如果只是简单二元合并且总是发射一次,优先
combine
,它更简洁 - 当需要复杂逻辑、条件跳过或多次发射时,选择
combineTransform
merge - 并发合并多个流
merge
会并发收集多个上游流,哪个流先发就先下发给下游。它不做配对或组合(不同于 combine
/ zip
),只是将各个源流的元素按到达顺序汇聚到一起。
适用场景:
- 多数据源同时更新 UI:谁先到就先显示,不等待其他数据源
- 合并多个事件流/日志流,保证尽快处理每条事件
- 将
Flow<Flow<T>>
扁平化为单一Flow<T>
并发收集
示例代码:
两个不同速率的流,合并后按到达顺序处理
suspend fun demoMergeBasic() {val start = System.currentTimeMillis()val left: Flow<String> = flow {listOf("L1", "L2", "L3").forEach { v ->delay(100)println("\u001B[36m[${System.currentTimeMillis() - start}ms] left -> $v\u001B[0m")emit(v)}}val right: Flow<String> = flow {listOf("R1", "R2").forEach { v ->delay(150)println("\u001B[34m[${System.currentTimeMillis() - start}ms] right -> $v\u001B[0m")emit(v)}}merge(left, right).collect { println("\u001B[32m[${System.currentTimeMillis() - start}ms] merge 收到: $it\u001B[0m") }
}
zip - 一一配对
zip
会把两个流的元素一一配对,只有当两个流都有新元素时才会发射组合结果。
示例代码:
suspend fun demoBasicZip() {println("1. 基础 zip 操作:")val numbers = flowOf(1, 2, 3, 4, 5)val letters = flowOf("A", "B", "C")println(" Flow1 (数字):")numbers.collect { print("$it ") }println()println(" Flow2 (字母):")letters.collect { print("$it ") }println()println(" zip 结果 (一对一配对):")numbers.zip(letters) { num, letter -> "$num$letter" }.collect { println(" $it") }println(" 说明: zip 严格按顺序配对,较短的流决定输出长度")
}
和 combine
的区别:
zip
是一一配对,较短的流结束时整个组合就结束combine
是组合最新值,任意流发射时都会组合
withIndex - 添加索引
withIndex
给流中的每个元素添加索引,返回 IndexedValue
对象。
示例代码:
suspend fun demoWithIndex() {println("1. withIndex 操作符示例")println("=" * 30)// 基本用法println("基本用法:")flowOf("Apple", "Banana", "Cherry", "Date").withIndex().collect { indexedValue ->println("索引: ${indexedValue.index}, 值: ${indexedValue.value}")}
}
展开操作符
概览
操作符 | 关键特性 | 类型 | 注意事项 |
---|---|---|---|
flatMapConcat | 顺序展开 | 中间 | 串行保证顺序;可能较慢 |
flatMapMerge | 并发展开 | 中间 | 控制并发数量以防资源耗尽 |
flatMapLatest | 只处理最新内部流 | 中间 | 新到来取消旧流;适合搜索 |
flatMapConcat - 顺序展开处理流
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =map(transform).flattenConcat()
接收一个参数:transform: suspend (value: T) -> Flow<R>
,用于将每个元素转换为一个流。
flatMapConcat
是一个顺序处理内部流的操作符,会把数据按照顺序跟我们传递进去的 transform
函数进行转换,然后按顺序执行。
flatMapConcat
实际上是 map(transform).flattenConcat()
的简写。
关键特性:
- 顺序执行:严格按照输入顺序处理,前一个流完成后才处理下一个
- 有序输出:输出顺序与输入顺序完全一致
- 串行处理:同一时间只有一个内部流在运行
- 背压友好:不会产生过多的并发流,内存使用更可控
- 异常传播:任何内部流的异常都会中断整个处理过程
示例代码:
suspend fun demoBasicFlatMapConcat() {val orders = listOf("订单001", "订单002", "订单003")flowOf(*orders.toTypedArray()).flatMapConcat { orderId ->flow {try {// 步骤1:锁定库存println(" $orderId: 锁定库存...")delay(50)emit("$orderId: 库存锁定成功")// 步骤2:扣减余额println(" $orderId: 扣减余额...")delay(30)emit("$orderId: 余额扣减成功")// 步骤3:创建订单println(" $orderId: 创建订单...")delay(40)emit("$orderId: 订单创建成功")// 步骤4:发送通知println(" $orderId: 发送通知...")delay(20)emit("$orderId: 通知发送成功")} catch (e: Exception) {emit("$orderId: 处理失败 - ${e.message}")}}}.collect { println(" $it") }
}
flatMapMerge - 并发展开处理流
public fun <T, R> Flow<T>.flatMapMerge(concurrency: Int = DEFAULT_CONCURRENCY, // 默认为 16transform: suspend (value: T) -> Flow<R>
): Flow<R> = map(transform).flattenMerge(concurrency)
flatMapMerge
是一个非常强大的并发操作符,接收两个参数:
concurrency
:并发执行的内部流数量,默认为 16transform
:Flow
类型,用于将每个元素转换为一个流
flatMapMerge
会并发地执行内部流,并将结果合并到输出流中。
实际上是 map(transform).flattenMerge(concurrency)
的简写。
关键特性:
- 并发执行:多个内部流可以同时运行,不需要等待前一个完成
- 无序输出:由于并发执行,输出顺序可能与输入顺序不同
- 可控并发:通过
concurrency
参数控制同时运行的流数量 - 背压处理:当并发数达到限制时,会等待有空闲位置
- 异常隔离:单个内部流的异常不会影响其他流
示例代码:
suspend fun demoFlatMapMerge() {val userIds = listOf("user1", "user2", "user3", "user4")userIds.asFlow().flatMapMerge(concurrency = 3) { userId ->flow {// 模拟 API 调用val delay = (100..300).random().toLong()delay(delay)emit("API响应: $userId (耗时: ${delay}ms)")}}.collect { println(" $it") }
}
flatMapLatest - 最新值展开
flatMapLatest
只关注最新的内部流,当新元素到来时会取消之前的内部流。
使用场景: 搜索建议,用户快速输入时取消之前的搜索请求。
示例代码:
suspend fun demoFlatMapLatest() {(1..3).asFlow().onEach {delay(150)println("生产了数据:${it}")} // 上游每隔 150ms 发射一个新值.flatMapLatest { number ->println("flatMapLatest 收到数据:${number}")flow {println("内部流开始处理:${number}")delay(200)println("${number} 处理完毕")emit(number)}.onCompletion {if (it != null) {println("${number} 处理异常" + it.message)} else {println("${number} 处理完成")}}}.collect { println("collect:${it}") }
}
最新值操作符
概览
操作符 | 关键特性 | 类型 | 注意事项 |
---|---|---|---|
mapLatest | 最新值映射 | 中间 | 取消旧映射;适合耗时转换 |
transformLatest | 最新值转换(多发射) | 中间 | 比 mapLatest 更灵活;注意取消 |
transformWhile | 条件转换终止流 | 中间 | 条件为 false 时终止整个流 |
mapLatest - 最新值映射
mapLatest
类似 flatMapLatest
,但用于简单的一对一转换。
示例代码:
suspend fun demoBasicMapLatest() {val startTime = System.currentTimeMillis()(1..4).asFlow().onEach {delay(150)println("\u001B[36m[${System.currentTimeMillis() - startTime}ms] 上游发出: $it\u001B[0m")}.mapLatest { n ->println("\u001B[34m[${System.currentTimeMillis() - startTime}ms] mapLatest 开始处理: $n\u001B[0m")try {repeat(3) { step ->delay(120)println("[${System.currentTimeMillis() - startTime}ms] $n -> 第${step + 1}步")}val result = "完成: $n"println("\u001B[32m[${System.currentTimeMillis() - startTime}ms] $result\u001B[0m")result} finally {// 若被新值打断或完成,都会到 finally,可区分取消:isActive=false 代表取消if (!currentCoroutineContext().isActive) {println("\u001B[33m[${System.currentTimeMillis() - startTime}ms] mapLatest $n 被新值取消\u001B[0m")} else {println("\u001B[32m[${System.currentTimeMillis() - startTime}ms] mapLatest $n 正常完成\u001B[0m")}}}.collect { println("\u001B[32m[${System.currentTimeMillis() - startTime}ms] 下游收到: $it\u001B[0m") }
}
transformLatest - 最新值转换
transformLatest
是最灵活的"最新值"操作符,可以发射多个值。参数类型是 FlowCollector
,因此,自由度很高。
例如 mapLatest
,flatMapLatest
都是基于 transformLatest
实现的。
示例代码:
suspend fun demoBasicTransformLatest() {val startTime = System.currentTimeMillis()(1..3).asFlow().onEach {delay(200)println("\u001B[36m[${System.currentTimeMillis() - startTime}ms] 上游发出: $it\u001B[0m")}.transformLatest { n ->println("\u001B[34m[${System.currentTimeMillis() - startTime}ms] 开始处理: $n\u001B[0m")try {// 模拟耗时操作,分阶段处理并发出结果println("\u001B[37m[${System.currentTimeMillis() - startTime}ms] 第1阶段:初始化处理 $n\u001B[0m")delay(100)emit("初始化-$n") // 业务需要:发出初始化完成信号println("\u001B[37m[${System.currentTimeMillis() - startTime}ms] 第2阶段:核心计算 $n\u001B[0m")delay(100)emit("计算-$n") // 业务需要:发出计算结果println("\u001B[37m[${System.currentTimeMillis() - startTime}ms] 第3阶段:最终处理 $n\u001B[0m")delay(100)emit("完成-$n") // 业务需要:发出最终结果println("\u001B[32m[${System.currentTimeMillis() - startTime}ms] ✓ 变换 $n 全部完成\u001B[0m")} catch (e: CancellationException) {println("\u001B[33m[${System.currentTimeMillis() - startTime}ms] ✗ 变换 $n 被新值取消\u001B[0m")}}.collect {println("\u001B[32m[${System.currentTimeMillis() - startTime}ms] 下游收到业务数据: $it\u001B[0m")}
}
transformWhile - 条件转换
transformWhile
会一直处理元素,直到函数返回 false
后会停止流。
示例代码:
suspend fun demoBasicTransformWhile() {val startTime = System.currentTimeMillis()(1..3).asFlow().onEach {delay(100)println("\u001B[36m[${System.currentTimeMillis() - startTime}ms] 上游发出: $it\u001B[0m")}.transformWhile { n ->println("\u001B[34m[${System.currentTimeMillis() - startTime}ms] 检查条件: $n <= 5\u001B[0m")if (n <= 2) {println("\u001B[37m[${System.currentTimeMillis() - startTime}ms] 条件满足,开始处理 $n\u001B[0m")// 业务处理:计算平方和立方val square = n * nval cube = n * n * nprintln("\u001B[37m[${System.currentTimeMillis() - startTime}ms] 计算结果:$n² = $square, $n³ = $cube\u001B[0m")emit("平方: $square") // 业务需要:发出平方结果emit("立方: $cube") // 业务需要:发出立方结果println("\u001B[32m[${System.currentTimeMillis() - startTime}ms] ✓ 处理 $n 完成\u001B[0m")true // 继续处理下一个值} else {println("\u001B[33m[${System.currentTimeMillis() - startTime}ms] ✗ 条件不满足 ($n > 5),停止整个流\u001B[0m")false // 停止整个流}}.collect {println("\u001B[32m[${System.currentTimeMillis() - startTime}ms] 下游收到业务数据: $it\u001B[0m")}
}
分块操作符
概览
操作符 | 关键特性 | 类型 | 注意事项 |
---|---|---|---|
chunked | 分块 | 中间 | 最后一块可能不满;注意边界 |
chunked - 分块
chunked
把流中的元素按指定数量分成块后,再发到下游去。
示例代码:
suspend fun demoBasicChunked() {println("\n\u001B[34m1) 基础行为:按大小分块处理\u001B[0m")val startTime = System.currentTimeMillis()(1..10).asFlow().onEach {delay(100)println("\u001B[36m[${System.currentTimeMillis() - startTime}ms] 上游发出: $it\u001B[0m")}.chunked(3) // 每3个元素分为一块.collect { chunk ->println("\u001B[34m[${System.currentTimeMillis() - startTime}ms] 收到数据块: $chunk\u001B[0m")// 业务处理:对每个块进行统计分析val sum = chunk.sum()val avg = chunk.average()val max = chunk.maxOrNull()println("\u001B[37m[${System.currentTimeMillis() - startTime}ms] 块统计 - 大小: ${chunk.size}, 总和: $sum, 平均: %.1f, 最大: $max\u001B[0m".format(avg))println("\u001B[32m[${System.currentTimeMillis() - startTime}ms] ✓ 块 $chunk 处理完成\u001B[0m")}
}
上下文控制操作符
概览
操作符 | 关键特性 | 类型 | 注意事项 |
---|---|---|---|
flowOn | 切换上游上下文 | 中间 | 仅影响上游;下游不受影响 |
buffer | 缓冲、生产消费解耦 | 中间 | 增内存;与 zip /conflate 交互需理解 |
conflate | 只保留最新值 | 中间 | 丢失中间值;适合UI更新 |
cancellable | 让阻塞处理可取消 | 中间 | 配合 map /transform 使用 |
flowOn - 切换执行上下文
flowOn
用来指定上游操作在哪个调度器上执行。
注意: flowOn
只影响它上游的操作,不影响下游。
示例代码:
suspend fun demoBasicFlowOn() {println("\n\u001B[34m1) 基础行为:切换上游执行上下文\u001B[0m")val startTime = System.currentTimeMillis()flow {println("\u001B[36m[${System.currentTimeMillis() - startTime}ms] Flow构建器执行线程: ${Thread.currentThread().name}\u001B[0m")repeat(5) { i ->println("\u001B[37m[${System.currentTimeMillis() - startTime}ms] 生成数据 $i (线程: ${Thread.currentThread().name})\u001B[0m")delay(200) // 模拟耗时的数据生成emit(i)}}.map { value ->println("\u001B[37m[${System.currentTimeMillis() - startTime}ms] map变换 $value -> ${value * 2} (线程: ${Thread.currentThread().name})\u001B[0m")delay(100) // 模拟耗时的变换操作value * 2}.flowOn(Dispatchers.IO) // 上游操作切换到IO线程池.map { value ->println("\u001B[37m[${System.currentTimeMillis() - startTime}ms] 下游map $value -> $value² (线程: ${Thread.currentThread().name})\u001B[0m")value * value // 这个操作在collect的线程执行}.collect { result ->println("\u001B[32m[${System.currentTimeMillis() - startTime}ms] 收集结果: $result (线程: ${Thread.currentThread().name})\u001B[0m")}
}
buffer - 缓冲
buffer
用来在生产者和消费者之间添加缓冲区,避免生产者生产过快而消费者处理不过来的情况。
可以传递以下参数:
capacity: Int = BUFFERED
- 缓冲数量onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
- 溢出策略
效果: 生产者不用等待消费者处理完,可以继续生产到缓冲区。
示例代码:
suspend fun demoWithBuffer() {println("\n\u001B[34m2) 有缓冲:异步执行模式\u001B[0m")val startTime = System.currentTimeMillis()flow {repeat(5) { i ->println("\u001B[36m[${System.currentTimeMillis() - startTime}ms] 生产者开始生产数据 $i\u001B[0m")delay(200) // 模拟生产耗时println("\u001B[36m[${System.currentTimeMillis() - startTime}ms] 生产者完成数据 $i,放入缓冲区\u001B[0m")emit(i)}println("\u001B[36m[${System.currentTimeMillis() - startTime}ms] 🏁 生产者全部完成,无需等待消费者\u001B[0m")}.buffer(capacity = 3) // 设置缓冲区大小为3.collect { value ->println("\u001B[32m[${System.currentTimeMillis() - startTime}ms] 消费者开始处理数据 $value\u001B[0m")delay(300) // 模拟消费耗时println("\u001B[32m[${System.currentTimeMillis() - startTime}ms] 消费者完成处理数据 $value\u001B[0m")}println("\u001B[35m💡 性能提升:生产者无需等待消费者,可以并行工作\u001B[0m")
}
conflate - 合并,只保留最新
conflate
当消费者处理不过来时,会丢弃中间的值,只保留最新的。实际上就是调用的 buffer
,只不过缓冲策略是 CONFLATED
。
示例代码:
suspend fun demoWithConflate() {println("\n\u001B[34m2) 有conflate:只处理最新数据\u001B[0m")val startTime = System.currentTimeMillis()flow {repeat(5) { i ->println("\u001B[36m[${System.currentTimeMillis() - startTime}ms] 快速生产数据 $i\u001B[0m")delay(100) // 生产速度快emit(i)println("\u001B[36m[${System.currentTimeMillis() - startTime}ms] 快速生产数据 $i 完成 \u001B[0m")}println("\u001B[36m[${System.currentTimeMillis() - startTime}ms] 🏁 生产者完成\u001B[0m")}.conflate() // 启用conflate:跳过积压的中间值.collect { value ->println("\u001B[32m[${System.currentTimeMillis() - startTime}ms] 消费者开始处理数据 $value\u001B[0m")delay(300) // 消费速度慢println("\u001B[32m[${System.currentTimeMillis() - startTime}ms] 消费者完成处理数据 $value\u001B[0m")}
}
cancellable - 使其可取消
cancellable
操作符确保一个原本不可取消的 Flow
变得可以响应取消操作。这在处理那些不遵循协程协作取消机制的 Flow
时非常有用。
使用场景:
- 当一个
Flow
在一个繁忙的循环中发射元素,而没有调用任何挂起函数时,它可能不会检查当前的协程是否已被取消。cancellable
会在每次发射前检查取消状态。
示例代码:
suspend fun demoCancellable() {val flow = flow {for (i in 1..5) {// 模拟一个繁忙的循环,不调用挂起函数Thread.sleep(100)emit(i)}}val job = CoroutineScope(Dispatchers.Default).launch {flow.cancellable().collect { value ->println("Collecting $value")}}delay(350)println("Cancelling the job")job.cancelAndJoin()println("Job cancelled")
}
输出:
Collecting 1
Collecting 2
Collecting 3
Cancelling the job
Job cancelled
如果没有 cancellable()
,Flow
将会继续运行直到完成,即使作业已经被取消。
时间控制操作符
对比总结(本类别)
操作符 | 关键特性 | 类型 | 注意事项 |
---|---|---|---|
debounce | 防抖 | 中间 | 延迟发射;最后一个值必发 |
sample | 采样 | 中间 | 可能丢值;适合定期刷新 |
timeout | 上游发射超时抛异常(可 catch 兜底) | 中间 | 下游延迟不触发;非正值立即超时 |
timeout - 超时未发射则抛异常
- 概念:如果上游在给定时长内没有发射元素,则流会以
TimeoutCancellationException
结束;可在catch
中转为默认值或补偿信号。 - 签名:
fun <T> Flow<T>.timeout(timeout: kotlin.time.Duration): Flow<T>
- 关键特性:
- 下游延迟不计入超时(例如在
onEach { delay(...) }
中的处理不会触发超时)。 - 非正时长会立即超时。
- 下游延迟不计入超时(例如在
示例代码:
suspend fun demoTimeoutBasic() {val flow = flow {emit(1)delay(100)emit(2)delay(100)emit(3)delay(1000) // 超过 100ms 的间隔,触发超时emit(4)}flow.timeout(100.milliseconds).catch { e ->if (e is TimeoutCancellationException) {emit(-1) // 超时后可以发这个默认值,也可以做其他处理} else throw e}.onEach { delay(300) } // 下游延迟不计入超时.collect { println(" 收到: $it") }
}
debounce - 防抖
debounce
只有在指定时间内没有新值时,才会发射最后一个值。
使用场景: 搜索输入框,用户停止输入一段时间后才执行搜索。
示例代码:
suspend fun demoWithDebounce() {println("\n\u001B[34m2) 有debounce:等待输入稳定\u001B[0m")val startTime = System.currentTimeMillis()flow {// 模拟用户的输入val inputs = listOf("a", "ab", "abc", "abcd", "abcde")inputs.forEachIndexed { index, input ->val delayTime = if (index < 3) 150L else 600L // 前3个快速输入,后2个慢速输入delay(delayTime)println("\u001B[36m[${System.currentTimeMillis() - startTime}ms] 用户输入: '$input'\u001B[0m")emit(input)}}.debounce(300) // 300ms内无新输入才会发出值.collect { query ->println("\u001B[32m[${System.currentTimeMillis() - startTime}ms] 执行搜索: '$query' (输入已稳定)\u001B[0m")delay(200L) // 模拟搜索API调用println("\u001B[32m[${System.currentTimeMillis() - startTime}ms] 搜索完成: '$query' -> 找到${query.length}个结果\u001B[0m")}
}
sample - 采样
sample
按固定时间间隔采样最新值,发射给下游。
示例代码:
suspend fun demoWithSample() {println("\n\u001B[34m2) 有sample:定期采样最新数据\u001B[0m")val startTime = System.currentTimeMillis()flow {repeat(10) { i ->delay(100L) // 每100ms产生一个数据val value = "数据-$i"println("\u001B[36m[${System.currentTimeMillis() - startTime}ms] 生产: $value\u001B[0m")emit(value)}}.sample(250L) // 每250ms采样一次最新值.collect { data ->println("\u001B[32m[${System.currentTimeMillis() - startTime}ms] UI更新: $data (采样值)\u001B[0m")delay(50L) // 模拟UI更新耗时}
}
异常处理操作符
概览
操作符 | 关键特性 | 类型 | 注意事项 |
---|---|---|---|
catch | 捕获上游异常 | 中间 | 仅捕获上游;位置要在可能抛出异常之后 |
retry | 简单重试 | 中间 | 需设置上限;避免无限重试 |
retryWhen | 条件化重试 | 中间 | 可实现退避/延迟策略 |
catch - 捕获异常
catch
用来捕获上游的异常,可以发射默认值或重新抛出。
注意: catch
只能捕获上游的异常,不能捕获下游(collect)的异常。
示例代码:
suspend fun demoWithCatch() {val startTime = System.currentTimeMillis()flow {repeat(5) { i ->delay(200L)if (i == 3) {println("\u001B[31m[${System.currentTimeMillis() - startTime}ms] 💥 模拟网络异常!\u001B[0m")throw RuntimeException("网络连接失败")}val data = "数据-$i"println("\u001B[36m[${System.currentTimeMillis() - startTime}ms] 生产: $data\u001B[0m")emit(data)}}.catch { exception ->println("\u001B[33m[${System.currentTimeMillis() - startTime}ms] 🛡️ 捕获异常: ${exception.message}\u001B[0m")println("\u001B[33m[${System.currentTimeMillis() - startTime}ms] 🔄 执行恢复策略...\u001B[0m")// 发出默认值或恢复数据emit("恢复数据-默认值")emit("恢复数据-缓存值")}.collect { data ->println("\u001B[32m[${System.currentTimeMillis() - startTime}ms] 处理: $data\u001B[0m")}
}
retry - 重试
retry
在发生异常时自动重试。可以配置重试次数。
示例代码:
suspend fun demoWithRetry() {val startTime = System.currentTimeMillis()var attemptCount = 0flow {repeat(3) { i ->delay(300L)attemptCount++println("\u001B[36m[${System.currentTimeMillis() - startTime}ms] 🔄 第${attemptCount}次尝试...\u001B[0m")if (attemptCount <= 2) {println("\u001B[31m[${System.currentTimeMillis() - startTime}ms] 💥 第${attemptCount}次尝试失败!\u001B[0m")throw RuntimeException("网络不稳定")}val data = "成功数据-$i"println("\u001B[36m[${System.currentTimeMillis() - startTime}ms] ✅ 生产: $data\u001B[0m")emit(data)}}.retry(3) { exception ->println("\u001B[33m[${System.currentTimeMillis() - startTime}ms] 🛡️ 捕获异常: ${exception.message}\u001B[0m")println("\u001B[33m[${System.currentTimeMillis() - startTime}ms] ⏳ 准备重试...\u001B[0m")delay(500L) // 重试前等待500mstrue // 返回true表示需要重试}.catch { exception ->println("\u001B[31m[${System.currentTimeMillis() - startTime}ms] ❌ 重试次数用完,最终失败: ${exception.message}\u001B[0m")emit("降级数据-缓存值") // 提供降级方案}.collect { data ->println("\u001B[32m[${System.currentTimeMillis() - startTime}ms] 处理: $data\u001B[0m")}
}
retryWhen - 条件重试
retryWhen
可以根据异常类型和重试次数决定是否重试。
示例代码:
suspend fun demoBasicRetryWhen() {var attemptCount = 0flow {attemptCount++println("\u001B[36m🔄 第${attemptCount}次尝试...\u001B[0m")if (attemptCount == 5) {emit(attemptCount)} else {println("\u001B[31m💥 网络异常\u001B[0m")throw RuntimeException("网络不稳定")}}.retryWhen { cause, attempt ->println("\u001B[33m🛡️ 捕获异常: ${cause.message}, 重试次数: $attempt\u001B[0m")if (attempt <= 2) {println("\u001B[33m⏳ 1秒后重试\u001B[0m")delay(1000L)true // 继续重试} else {println("\u001B[31m❌ 超过最大重试次数\u001B[0m")false // 停止重试}}.catch {println("\u001B[31m[异常] ${it.message}\u001B[0m")emit(888)}.onCompletion {println("\u001B[32m✅ 完成,共尝试 ${attemptCount} 次\u001B[0m")}.collect { data ->println("\u001B[32m✅ 处理: $data\u001B[0m")}
}
终端操作符
概览
操作符 | 关键特性 | 类型 | 注意事项 |
---|---|---|---|
collectLatest | 收集最新,取消旧处理 | 终端 | 适合UI更新;避免长时间阻塞 |
collectIndexed | 带索引收集 | 终端 | 索引从0开始;不取消块 |
launchIn | 在作用域启动收集 | 终端 | 返回 Job ;非阻塞;可取消 |
produceIn | 转为 ReceiveChannel | 终端 | 作用域取消关闭通道;热化消费 |
toList | 收集为 List | 终端 | 等待流完成;注意内存 |
toSet | 收集为 Set | 终端 | 全局去重;等待完成 |
first/firstOrNull | 获取第一个元素 | 终端 | first 空流异常;*OrNull 更安全 |
last/lastOrNull | 获取最后一个元素 | 终端 | 必须等待完成;空流异常(用 *OrNull 更安全) |
single/singleOrNull | 获取唯一元素 | 终端 | 数量不为1抛异常;仅适用严格唯一场景 |
reduce | 归约为单值 | 终端 | 空流抛异常;需要至少一个元素 |
fold | 带初始值归约 | 终端 | 空流安全;返回初始值 |
count | 统计数量 | 终端 | 遍历所有元素;大流注意性能 |
all | 是否全部满足 | 终端 | 短路;空流返回 true |
any | 是否任意满足 | 终端 | 短路;空流返回 false |
collectLatest - 收集最新值
collectLatest
当新值到来时,会取消之前的收集操作,只处理最新的值。
示例代码:
suspend fun demoWithCollectLatest() {flow {repeat(5) { index ->println("\u001B[36m📤 发送数据: $index\u001B[0m")emit(index)delay(300L) // 快速发送}}.collectLatest { value ->println("\u001B[33m🔄 开始处理: $value\u001B[0m")delay(1000L) // 慢速处理println("\u001B[32m✅ 完成处理: $value\u001B[0m")}
}
launchIn - 在指定作用域启动
launchIn
在指定的协程作用域中启动流的收集,实际上就是 collect
的封装。
示例代码:
suspend fun demoWithLaunchIn() {val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())val flow = flow {repeat(3) { index ->println("\u001B[36m📤 发送数据: $index\u001B[0m")emit(index)delay(500L)}}println("\u001B[33m🚀 启动Flow收集...\u001B[0m")val job = flow.onEach { value ->println("\u001B[32m✅ 处理数据: $value\u001B[0m")}.launchIn(scope) // 在指定作用域中启动println("\u001B[35m💡 Flow在后台运行,当前协程可以继续执行其他任务\u001B[0m")// 模拟执行其他任务repeat(2) { index ->println("\u001B[33m🔧 执行其他任务: $index\u001B[0m")delay(600L)}job.join() // 等待Flow完成println("\u001B[33m⏹️ Flow收集完成\u001B[0m")scope.cancel() // 清理作用域
}
produceIn - flow 转为 ReceiveChannel
produceIn
在指定的 CoroutineScope
中启动对上游 Flow
的收集,并将元素发送到返回的ReceiveChannel
中。适合需要以Channel
方式消费的场景。
关键特性:
- 非阻塞:立即返回
ReceiveChannel
,生产与消费可并行。 - 作用域取消:
scope.cancel()
会关闭通道并取消上游收集。 - 通道协作:可用
for (x in channel)
、consumeEach {}
、select {}
等方式消费。
示例代码:(摘自 ProduceInExample.demoProduceInBasic
)
suspend fun demoProduceInBasic() {coroutineScope {val numbers: Flow<Int> = flow {repeat(5) { i ->delay(100)println("\u001B[36m📤 上游发射: $i\u001B[0m")emit(i)}}val channel = numbers.produceIn(this)for (value in channel) {println("\u001B[32m✅ 下游收到: $value\u001B[0m")}println("\u001B[33m⏹️ 上游完成,Channel 关闭\u001B[0m")}
}
collectIndexed - 带索引的收集
collectIndexed
是一个终端操作符,它在收集流中每个元素的同时,还提供该元素在流中的索引。这对于需要知道元素位置的场景非常有用。
函数签名:
public suspend inline fun <T> Flow<T>.collectIndexed(crossinline action: suspend (index: Int, value: T) -> Unit
): Unit
action
: 一个挂起 lambda,接收两个参数:index
:Int
类型,表示当前元素的索引,从 0 开始。value
:T
类型,表示当前元素的值。
使用场景:
- 当处理列表数据并需要显示其位置时(例如,“第 x 项:…”)。
- 在日志记录或调试中,需要知道事件发生的顺序。
- 当后续处理逻辑依赖于元素的位置时。
示例代码:
suspend fun demoCollectIndexed() {flowOf("A", "B", "C", "D").collectIndexed { index, value ->println("✅ 索引 $index: 收到值 '$value'")}
}
执行结果:
✅ 索引 0: 收到值 'A'
✅ 索引 1: 收到值 'B'
✅ 索引 2: 收到值 'C'
✅ 索引 3: 收到值 'D'
与 withIndex()
的区别:
withIndex()
是一个中间操作符,它将Flow<T>
转换为Flow<IndexedValue<T>>
。需要继续使用终端操作符(如collect
)来处理。collectIndexed
是一个终端操作符,它直接消费流并执行操作,代码更简洁。
选择哪个取决于你的需求。如果你需要在 collect
之前对带索引的数据进行进一步的转换(如 map
、filter
),withIndex()
更灵活。如果只是为了在最终收集中获取索引,collectIndexed
更直接。
toList - 转为List
toList
收集所有元素到一个 List 中。
示例代码:
suspend fun demoBasicToList() {println("\n\u001B[34m1) 基础toList:收集所有数据\u001B[0m")val numberFlow = flow {repeat(5) { index ->println("\u001B[36m📤 发送数据: $index\u001B[0m")emit(index)delay(200L)}}println("\u001B[33m🔄 开始收集Flow到List...\u001B[0m")val resultList = numberFlow.toList()println("\u001B[32m✅ 收集完成,结果: $resultList\u001B[0m")
}
toSet - 转为Set
toSet
收集所有元素到一个 Set 中,自动去重。
示例代码:
suspend fun demoBasicToSet() {println("\n\u001B[34m1) 基础toSet:收集数据并自动去重\u001B[0m")val numberFlow = flow {val numbers = listOf(1, 2, 3, 2, 4, 1, 5, 3)numbers.forEach { number ->println("\u001B[36m📤 发送数据: $number\u001B[0m")emit(number)delay(200L)}}println("\u001B[33m🔄 开始收集Flow到Set...\u001B[0m")val resultSet = numberFlow.toSet()println("\u001B[32m✅ 收集完成,结果: $resultSet\u001B[0m")
}
first/firstOrNull - 获取第一个
first
获取第一个元素,如果流为空会抛异常;获取后流会结束。
firstOrNull
尝试获取第一个元素,如果流为空,返回 null。获取后流会结束。
两种用法:
- 直接获取第一个数据:
suspend fun demoBasicFirst() {println("\n\u001B[34m1) 基础first:获取第一个元素\u001B[0m")val numberFlow = flow {repeat(5) { index ->println("\u001B[36m📤 发送数据: $index\u001B[0m")emit(index)delay(200L)}}println("\u001B[33m🔄 获取第一个元素...\u001B[0m")val firstElement = numberFlow.first()println("\u001B[32m✅ 第一个元素: $firstElement\u001B[0m")println("\u001B[35m💡 特点:获取到第一个元素后立即停止Flow\u001B[0m")
}
- 根据条件获取第一个数据:
suspend fun demoFirstWithPredicate() {println("\n\u001B[34m2) firstWithPredicate:根据条件获取第一个元素\u001B[0m")val numberFlow = flow {repeat(5) { index ->println("\u001B[36m📤 发送数据: $index\u001B[0m")emit(index)delay(200L)}}println("\u001B[33m🔄 获取第一个偶数...\u001B[0m")val firstEven = numberFlow.first { it % 2 == 0 }println("\u001B[32m✅ 第一个偶数: $firstEven\u001B[0m")println("\u001B[35m💡 特点:根据条件获取第一个元素后立即停止Flow\u001B[0m")
}
注意点: 使用 first
时要注意异常处理。优先考虑使用 firstOrNull
。
last/lastOrNull - 获取最后一个
last
获取最后一个元素。如果流为空,会抛异常。获取后流会结束。
lastOrNull
获取最后一个元素,获取不到则返回 null。获取后流会结束。
示例代码:
suspend fun demoBasicLast() {val numberFlow = flow {repeat(5) { index ->println("\u001B[36m📤 发送数据: $index\u001B[0m")emit(index)delay(200L)}}val lastElement = runCatching {numberFlow.last()}.getOrNull()val lastElementOrNull = numberFlow.lastOrNull()
}
single/singleOrNull - 获取唯一值
single
:流中只有一个元素,获取该元素。如果流为 null 或者流中有多个元素,抛异常singleOrNull
:流中只有一个元素,获取该元素。如果流为 null 或者流中有多个元素,返回 null
示例代码:
suspend fun demoBasicSingle() {// 只有一个元素的Flowval singleFlow = flowOf("唯一元素")val result = singleFlow.single()val singleOrNull = singleFlow.singleOrNull()
}
reduce - 归约
reduce
把流中的元素归约成一个值,不需要初始值。
特点:
- 归约操作:将
Flow
中的所有元素归约为单个值 - 累积计算:使用第一个元素作为初始累积值
- 顺序处理:按顺序处理每个元素进行累积
示例代码:
suspend fun demoBasicReduce() {val numberFlow = flowOf(1, 2, 3, 4, 5)println("\u001B[33m🔄 对数字进行求和归约...\u001B[0m")val sum = numberFlow.reduce { accumulator, value ->println("\u001B[36m📊 累加: $accumulator + $value = ${accumulator + value}\u001B[0m")accumulator + value}println("\u001B[32m✅ 最终结果: $sum\u001B[0m")println("\u001B[35m💡 特点:使用第一个元素作为初始值\u001B[0m")
}
fold - 带初始值归约
fold
类似 reduce
,但可以指定初始值和返回不同类型。
注意点: 针对空流时要处理异常。
示例代码:
suspend fun demoTypeConversion() {val numberFlow = flowOf(10, 25, 30, 45, 50)println("\u001B[33m📊 原始数据: 10, 25, 30, 45, 50\u001B[0m")// 将数字累积成统计信息字符串val statistics = numberFlow.fold("统计信息:\n") { acc, value ->val newAcc = acc + "- 数值: $value (${if (value >= 30) "高" else "低"})\n"println("\u001B[36m🔄 添加统计: $value\u001B[0m")newAcc}println("\u001B[32m✅ 最终结果:\u001B[0m")println(statistics)
}
count - 计数
count
计算流中元素的数量,可以带条件。
示例代码:
suspend fun demoConditionalCount() {println("\n\u001B[34m2) 条件计数:统计满足条件的元素\u001B[0m")val numberFlow = flowOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)println("\u001B[33m📊 原始数据: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10\u001B[0m")val totalCount = numberFlow.count()println("\u001B[32m✅ 元素总数: $totalCount\u001B[0m")// 统计偶数val evenCount = numberFlow.count { it % 2 == 0 }println("\u001B[32m✅ 偶数个数: $evenCount\u001B[0m")// 统计大于5的数val greaterThan5Count = numberFlow.count { it > 5 }println("\u001B[32m✅ 大于5的数: $greaterThan5Count\u001B[0m")// 统计能被3整除的数val divisibleBy3Count = numberFlow.count { it % 3 == 0 }println("\u001B[32m✅ 能被3整除的数: $divisibleBy3Count\u001B[0m")
}
all - 全部满足条件
all
检查是否所有元素都满足条件,返回 Boolean
值。
特点:
- 短路求值:遇到第一个不满足条件的元素立即返回 false
- 空
Flow
处理:空Flow
返回 true
示例代码:
suspend fun demoBasicAll() {val positiveNumbers = flowOf(1, 2, 3, 4, 5)val allPositive = positiveNumbers.all { it > 0 }println("所有数字都是正数: $allPositive")
}
any - 任一满足条件
any
检查是否有任意元素满足条件。
特点:
- 终端操作符:返回 Boolean 类型结果
- 短路求值:遇到第一个满足条件的元素立即返回 true
- 空
Flow
处理:空Flow
返回 false(空集合的存在量词为假)
示例代码:
suspend fun demoBasicAny() {val mixedNumbers = flowOf(1, 3, 5, 7, 8, 9)// 检查是否有偶数val hasEven = mixedNumbers.any { it % 2 == 0 }println("是否包含偶数: $hasEven")
}
自定义操作符
如果有一些个性化的需求官方的操作符满足不了,我们还可以自定义操作符。
自定义操作符的基本步骤:
- 中间操作符:使用
flow { }
构建器,内部collect
并emit
,返回值还是个Flow
- 终端操作符:直接对
Flow
调用collect
并返回结果,返回值非Flow
类型
自定义中间操作符
// 自定义中间操作符 - 过滤出偶数
fun <T : Number> Flow<T>.filterEven(): Flow<T> = flow {collect { value ->if (value.toInt() % 2 == 0) {emit(value)}}
}
自定义终端操作符
// 自定义终端操作符 - 计算所有元素的总和
suspend fun <T : Number> Flow<T>.sum(): Int {var sum = 0collect { value ->sum += value.toInt()}return sum
}
总结
Flow
的操作符体系非常丰富且设计精良,而且这些操作符基本上跟 kotlin
中集合的一些扩展 API 非常相似,概念甚至用法几乎都一样。
掌握这些操作符,对我们构建出高效、优雅、可维护的响应式数据处理系统有很大的帮助。记住:理解语义比记住 API 更重要,组合使用比单独使用更强大。
这么多操作符,其实常用的也就那些,比如 map
、filter
、reduce
、count
、all
、any
等。其中,带 transform
字样的操作符相对通用,且更加的灵活强,其他的操作符大多都是基于这些操作符的组合实现的。
记不住也没关系,有个印象即可,需要的时候翻阅文档就是。