Kotlin协程(六)Flow流
在 Kotlin 中,Flow
是一种 异步数据流,用于连续或延迟生成数据,并在不同协程之间传输数据。我一般做定时任务或者倒计时会用到这个类。
一、为什么需要 Flow
?
在协程中,如果我们要连续返回多个值,可以使用:
- 集合 (
List
):一次性返回所有数据,但不能支持异步或无限数据流。 - 回调 (
Callback
):可以异步获取数据,但不够优雅、易用。 Flow
:支持异步、序列化、流式数据处理,更适用于长时间运行的任务。
二、Flow的基本用法
在创建 Flow
对象的时候我们也需要调用 emit
方法发射数据,collect
方法用来消费收集数据。
emit(value):
收集上游的值并发出。不是线程安全,不应该并发调用。线程安全请使用channelFlow
而不是flow
。collect()
:接收给定收集器emit()
发出的值。它是一个挂起函数,在所在作用域的线程上执行。flow
的代码块只有调用collected()
才开始运行
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
// 创建 Flow
val flow = flow {
for (i in 1..5) {
delay(1000) // 模拟异步操作
emit(i) // 发送数据
}
}
// 收集 Flow
flow.collect { value ->
println("Received: $value")
}
}
三、Flow 的特点
-
顺序执行(上游
emit()
之后,下游collect()
才会接收) -
冷流(
Flow
只有在collect()
时才会开始执行),所谓冷数据流,就是只有消费时才会生产的数据流 -
可异步(可使用
delay()
等挂起函数) -
背压控制(Flow 内部可以控制数据流速)
四、Flow
的构建方式
1.flow {}
,最常见的方式,使用 emit(value)
发送数据。
val myFlow = flow {
emit(1)
emit(2)
emit(3)
}
2.flowOf(vararg elements)
,直接创建一个 Flow
。
val flow = flowOf(1, 2, 3, 4, 5)
3.asFlow()
把集合或序列转换成 Flow
。
val flow = listOf(1, 2, 3, 4, 5).asFlow()
五、Flow 的基础操作符
map {} - 数据转换
flowOf(1, 2, 3, 4)
.map { it * 10 }
.collect { println(it) } // 输出: 10, 20, 30, 40
===============================
filter {} - 过滤数据
flowOf(1, 2, 3, 4, 5)
.filter { it % 2 == 0 }
.collect { println(it) } // 输出: 2, 4
===============================
transform {} - 自定义转换
flowOf(1, 2, 3, 4)
.transform { emit("Number: $it") }
.collect { println(it) }
//输出Number: 1,Number: 2,Number: 3,Number: 4
===============================
take(n) - 只获取前 n 个
flowOf(1, 2, 3, 4, 5)
.take(3)
.collect { println(it) } // 输出: 1, 2, 3
六、Flow 取消 & 超时
6.1 cancel取消
Flow
没有提供取消操作,Flow
的消费依赖于collect
末端操作符,而它们又必须在协程当中调用,因此Flow
的取消主要依赖于末端操作符所在的协程的状态。
val job = launch {
flow {
for (i in 1..10) {
delay(500)
emit(i)
}
}.collect { println(it) }
}
// 2 秒后取消
delay(2000)
job.cancel()
println("Flow cancelled")
6.2 timeout()
- 设置超时
flow {
for (i in 1..10) {
delay(500)
emit(i)
}
}.timeout(2000) // 2 秒超时
.collect { println(it) }
七、线程切换
RxJava 也是一个基于响应式编程模型的异步框架,牛逼的地方就是切换线程。提供了两个切换调度器的 API 分别是 subscribeOn
和 observeOn
,Flow
也可以设定它运行时所使用的调度器,它更加简单,只需使用flowOn
就可以了:
lifecycleScope.launch {
//创建一个Flow<T>
flow {
for (i in 1..3) {
delay(200)
emit(i)//从流中发出值
}
}.flowOn(Dispatchers.IO)//将上面的数据发射操作放到 IO 线程中的协程
.collect { value ->
// 具体数据的消费处理
}
}
通过flowOn()
改变的是Flow
函数内部发射数据时的线程,而在collect
收集数据时会自动切回创建Flow
时的线程。
Tips:flowOn()
只影响它之前(上游)的 flow
代码,所以只能用在 flow{}
之后,不能放在 collect{}
之后。
八、异常捕获
8.1使用flow的catch方法
-
catch
不会影响 collect() 的执行,如果发生异常,它会在collect()
之前处理异常。 -
但
catch
不能捕获collect
代码中的异常!
lifecycleScope.launch {
flow {
emit(1)
emit(2)
throw RuntimeException("Flow 出错了!") // ❌ 故意抛异常
}
.catch { e -> // ✅ 只会捕获 Flow emit() 的异常
println("Caught exception: ${e.message}")
}
.collect { value ->
println("Collected: $value")
}
}
Collected: 1
Collected: 2
Caught exception: Flow 出错了!
8.2 try-catch和catch的比较
方式 | 能捕获 emit() 的异常? | 能捕获 collect() 的异常? |
---|---|---|
catch {} | ✅ 能 | ❌ 不能 |
try-catch | ✅ 能 | ✅ 能 |
九、retryWhen()重试
lifecycleScope.launch {
flow {
emit(1)
throw RuntimeException("Flow 出错了!") // ❌ 故意抛异常
}
.retryWhen { e, attempt -> // attempt: 第几次重试
println("Retrying... attempt $attempt")
attempt < 3 // ✅ 只重试 3 次
}
.catch { e ->
println("Caught exception: ${e.message}")
}
.collect { value ->
println("Collected: $value")
}
}
打印:
Collected: 1
Retrying... attempt 0
Collected: 1
Retrying... attempt 1
Collected: 1
Retrying... attempt 2
Caught exception: Flow 出错了!
十、Flow、Channel、RxJava 对比
10.1 Channel(热流)
- 热流(Hot Stream):启动后就会产生数据,即使没有
collect()
也会持续运行。 - 支持多生产者-多消费者(类似消息队列)。
- 需要手动关闭,否则会一直等待数据。
- 适用于高频数据流,但 默认不支持背压(可以用
BufferedChannel
处理)。
val channel = Channel<Int>()
lifecycleScope.launch {
for (i in 1..3) {
delay(200)
channel.send(i) // 发送数据
}
channel.close() // 需要手动关闭
}
lifecycleScope.launch {
for (value in channel) {
println("Received: $value")
}
}
10.2 RxJava(响应式编程)
- 功能丰富(
map()
、flatMap()
、combineLatest()
、throttle()
)。 - 支持背压(
Flowable
)。 - 支持线程切换(
observeOn()
、subscribeOn()
)。 - 支持生命周期管理(
CompositeDisposable
)。
val observable = Observable.create<Int> { emitter ->
for (i in 1..3) {
Thread.sleep(200) // 模拟数据产生
emitter.onNext(i) // 发送数据
}
emitter.onComplete()
}
observable
.subscribeOn(Schedulers.io()) // 切换线程
.observeOn(AndroidSchedulers.mainThread()) // 观察线程
.subscribe { value ->
println("Received: $value")
}
RxJava概括起来说,就是很牛,他的功能绝不是我上面的介绍的Flow就能概括的,但是API有点复杂,生命周期还需要自己去管理,用了一两个项目后就用协程,viewmodel这些东西了。
10.3 比较详情
特性 | Flow | Channel | RxJava |
---|---|---|---|
类型 | 冷流(Cold Stream) | 热流(Hot Stream) | 热流(Hot Stream) |
数据生产方式 | 按需生产(Lazy) | 持续生产(Eager) | 持续生产(Eager) |
是否支持背压 | ✅ 自动支持 | ❌ 默认不支持(需 capacity ) | ✅ Flowable 支持 |
是否支持多生产者-多消费者 | ❌ 不支持 | ✅ 支持 | ✅ 支持 |
生命周期管理 | ✅ 协程自动管理 | ✅ 协程自动管理 | ❌ 需要手动 dispose() |
数据转换(map、flatMap) | ✅ 支持 | ❌ 不支持 | ✅ 强大 |
线程切换 | ✅ flowOn() | ❌ 不支持(手动切换) | ✅ subscribeOn() observeOn() |
适用于 Android? | ✅ 现代化、推荐 | ✅ 适用于协程通信 | ✅ 老项目更常用 |
学习难度 | ⭐ 简单 | ⭐⭐⭐ 中等 | ⭐⭐⭐⭐ 复杂 |
10.4 使用场景
你想要的功能 | 推荐的方案 |
---|---|
按需获取数据(如数据库、API 响应) | ✅ Flow |
生产者-消费者模式(如任务队列、多生产者) | ✅ Channel |
组合数据流、事件流(如 UI 事件、按钮点击) | ✅ RxJava |
支持复杂操作符(如 debounce, throttle, merge) | ✅ RxJava |
简单、现代化、协程友好 | ✅ Flow |
十一、背压
背压(Backpressure) 是指在数据流中,数据生产速度大于消费速度 时,如何处理过量的数据的问题。
如果 数据生产者(Producer) 太快,而 数据消费者(Consumer) 处理不过来,就会导致 OOM(内存溢出) 或 数据丢失。
11.1 Flow 背压
Flow 是挂起(suspend)式流,当 collect()
处理不过来时,Flow 会自动暂停,等消费者处理完再继续。
val flow = flow {
repeat(1000) { i ->
emit(i) // 生产数据
println("Produced: $i")
}
}.flowOn(Dispatchers.IO) // 生产者运行在 IO 线程
lifecycleScope.launch {
flow.collect { value ->
delay(100) // 模拟慢速消费
println("Consumed: $value")
}
}
打印:
Produced: 0
Consumed: 0
Produced: 1
Consumed: 1
...
- Flow 自动挂起,等
collect()
处理完一个数据后,再生产下一个数据。 - 不会内存溢出,因为它是 按需生产(Cold Stream)。
Flow 默认是 顺序执行(类似同步代码),即:
- 生产者(emit)会被挂起,直到 消费者(collect)处理完当前值 才能继续。
- 不会丢弃数据,但可能会导致生产者等待过久,降低吞吐量。
💡 问题:如果 emit
很快,collect
很慢,生产者就会一直等待。
如何优化?使用 buffer/conflate/collectLatest
buffer(capacity)
- 让生产者继续工作
- 让生产者不被挂起,数据会先存到 缓冲区(默认容量 64)。
- 消费者依旧一个个处理,但不会影响生产者的速度。
flow {
for (i in 1..5) {
println("Emitting $i")
emit(i)
}
}.buffer() // 加 buffer,让 emit 不会被挂起
.collect { value ->
delay(1000) // 模拟耗时处理
println("Collected $value")
}
打印:
Emitting 1
Emitting 2
Emitting 3
Emitting 4
Emitting 5
Collected 1
Collected 2
...
🔹 适用场景:生产者很快,消费者较慢,但不想丢失数据。
conflate()
- 丢弃旧值,只保留最新
- 生产者继续发射新数据,不会等待消费者。
- 如果消费者来不及处理,就丢弃旧数据,只处理最新的。
flow {
for (i in 1..5) {
println("Emitting $i")
emit(i)
}
}.conflate() // 只保留最新值
.collect { value ->
delay(1000) // 处理较慢
println("Collected $value")
}
打印:
Emitting 1
Emitting 2
Emitting 3
Emitting 4
Emitting 5
Collected 1
Collected 5
conflate()
不会缓存所有数据,如果 emit
速度快,collect
速度慢,中间的值会被丢弃,消费者只会收到最新的值。
🔹 适用场景:
- UI 频繁更新(比如滑动时的坐标、传感器数据),只关心最新数据,不需要每个中间值
collectLatest
- 取消上一个,处理最新值
- 每次有新数据到来,取消上一个任务,只处理最新的。
- 适用于耗时操作,避免处理过时数据。
flow {
for (i in 1..5) {
println("Emitting $i")
emit(i)
}
}.collectLatest { value ->
println("Start collecting $value")
delay(1000) // 模拟耗时操作
println("Collected $value") // 只有最新的值才会被完整处理
}
打印:
Emitting 1
Start collecting 1
Emitting 2
Start collecting 2 // 取消 1
Emitting 3
Start collecting 3 // 取消 2
Emitting 4
Start collecting 4 // 取消 3
Emitting 5
Start collecting 5 // 取消 4
Collected 5
适用场景:
- 搜索框输入(只处理最新的查询)
- 按钮点击事件(只执行最后一次点击)
- 网络请求(上次请求没完成,新的请求来了,取消旧的)
总结:Flow 背压处理方式对比
方法 | 生产者是否等待消费者? | 数据丢失? | 适用场景 |
---|---|---|---|
默认 | 是(生产者被挂起) | 否 | 一般情况 |
buffer() | 否(有缓存) | 否 | 高速生产,低速消费 |
conflate() | 否(不缓存旧数据) | 是(丢弃旧值) | UI 刷新、传感器数据 |
collectLatest() | 否(取消前一个) | 是(取消未完成的) | 搜索、按钮点击、网络请求 |
11.2 Channel 背压
Channel 默认不支持背压,生产者会无限制地发送数据,可能导致 OOM。但可以手动设置 capacity
(缓冲区大小) 解决背压问题。
关键代码在第一行哈!capacity = 10
让生产者最多存 10 个数据,多了就 自动阻塞。
val channel = Channel<Int>(capacity = 10) // 设置缓冲区大小
lifecycleScope.launch {
repeat(1000) { i ->
channel.send(i) // 如果缓冲区满了,send() 会挂起等待
println("Produced: $i")
}
}
lifecycleScope.launch {
for (value in channel) {
delay(100) // 模拟慢速消费
println("Consumed: $value")
}
}
11.3 RxJava 背压
RxJava 默认不支持背压,但 Flowable
可以用不同的策略处理背压:
Flowable.create<Int>({ emitter ->
repeat(1000) { i ->
emitter.onNext(i) // 快速生产数据
println("Produced: $i")
}
}, BackpressureStrategy.DROP) // 选择背压策略
.observeOn(Schedulers.io()) // 线程切换
.subscribe { value ->
Thread.sleep(100) // 模拟慢速消费
println("Consumed: $value")
}
背压策略
策略 | 作用 |
---|---|
BUFFER | 无限制存储,可能导致 OOM |
DROP | 丢弃过量数据 |
LATEST | 只保留最新数据,丢弃旧数据 |
11.4 背压对比总结
方案 | 背压支持 | 处理方式 | 适用场景 |
---|---|---|---|
Flow | ✅ 天然支持 | 挂起(suspend),生产者等待消费者 | API 响应、数据库查询 |
Channel | ❌ 默认不支持 | 需手动 capacity 处理 | 消息队列、多生产者-多消费者 |
RxJava | ✅ Flowable 支持 | BUFFER 、DROP 、LATEST | UI 事件流、高频数据流 |