当前位置: 首页 > news >正文

Kotlin协程(六)Flow流

在 Kotlin 中,Flow 是一种 异步数据流,用于连续或延迟生成数据,并在不同协程之间传输数据。我一般做定时任务或者倒计时会用到这个类。

一、为什么需要 Flow

在协程中,如果我们要连续返回多个值,可以使用:

  1. 集合 (List):一次性返回所有数据,但不能支持异步或无限数据流。
  2. 回调 (Callback):可以异步获取数据,但不够优雅、易用。
  3. Flow:支持异步、序列化、流式数据处理,更适用于长时间运行的任务

二、Flow的基本用法

在创建 Flow 对象的时候我们也需要调用 emit 方法发射数据,collect 方法用来消费收集数据。

  • emit(value):收集上游的值并发出。不是线程安全,不应该并发调用。线程安全请使用channelFlow而不是flow
  • collect():接收给定收集器emit()发出的值。它是一个挂起函数,在所在作用域的线程上执行。
  • flow的代码块只有调用collected()才开始运行
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    // 创建 Flow
    val flow = flow {
        for (i in 1..5) {
            delay(1000) // 模拟异步操作
            emit(i) // 发送数据
        }
    }

    // 收集 Flow
    flow.collect { value ->
        println("Received: $value")
    }
}

三、Flow 的特点

  1. 顺序执行(上游 emit() 之后,下游 collect() 才会接收)

  2. 冷流Flow 只有在 collect() 时才会开始执行),所谓冷数据流,就是只有消费时才会生产的数据流

  3. 可异步(可使用 delay() 等挂起函数)

  4. 背压控制(Flow 内部可以控制数据流速)

四、Flow 的构建方式

1.flow {},最常见的方式,使用 emit(value) 发送数据。

val myFlow = flow {
    emit(1)
    emit(2)
    emit(3)
}

2.flowOf(vararg elements),直接创建一个 Flow

val flow = flowOf(1, 2, 3, 4, 5)

3.asFlow()把集合或序列转换成 Flow

val flow = listOf(1, 2, 3, 4, 5).asFlow()

五、Flow 的基础操作符

map {} - 数据转换
flowOf(1, 2, 3, 4)
    .map { it * 10 }
    .collect { println(it) }  // 输出: 10, 20, 30, 40
===============================
filter {} - 过滤数据
flowOf(1, 2, 3, 4, 5)
    .filter { it % 2 == 0 }
    .collect { println(it) }  // 输出: 2, 4
===============================
transform {} - 自定义转换
flowOf(1, 2, 3, 4)
    .transform { emit("Number: $it") }
    .collect { println(it) }
//输出Number: 1,Number: 2,Number: 3,Number: 4
===============================
take(n) - 只获取前 n 个
flowOf(1, 2, 3, 4, 5)
    .take(3)
    .collect { println(it) }  // 输出: 1, 2, 3

六、Flow 取消 & 超时

6.1 cancel取消

Flow没有提供取消操作,Flow的消费依赖于collect末端操作符,而它们又必须在协程当中调用,因此Flow的取消主要依赖于末端操作符所在的协程的状态。

val job = launch {
    flow {
        for (i in 1..10) {
            delay(500)
            emit(i)
        }
    }.collect { println(it) }
}

// 2 秒后取消
delay(2000)
job.cancel()
println("Flow cancelled")

6.2 timeout() - 设置超时

flow {
    for (i in 1..10) {
        delay(500)
        emit(i)
    }
}.timeout(2000) // 2 秒超时
.collect { println(it) }

七、线程切换

RxJava 也是一个基于响应式编程模型的异步框架,牛逼的地方就是切换线程。提供了两个切换调度器的 API 分别是 subscribeOn 和 observeOnFlow也可以设定它运行时所使用的调度器,它更加简单,只需使用flowOn就可以了:

lifecycleScope.launch {
    //创建一个Flow<T>
    flow {
        for (i in 1..3) {
            delay(200)
            emit(i)//从流中发出值
        }
    }.flowOn(Dispatchers.IO)//将上面的数据发射操作放到 IO 线程中的协程
            .collect { value ->
                // 具体数据的消费处理
            }
}

通过flowOn()改变的是Flow函数内部发射数据时的线程,而在collect收集数据时会自动切回创建Flow时的线程。
Tips:flowOn() 只影响它之前(上游)的 flow 代码,所以只能用在 flow{} 之后,不能放在 collect{} 之后。

八、异常捕获

8.1使用flow的catch方法

  • catch不会影响 collect() 的执行,如果发生异常,它会在 collect() 之前处理异常。

  • catch不能捕获 collect代码中的异常!

lifecycleScope.launch {
    flow {
        emit(1)
        emit(2)
        throw RuntimeException("Flow 出错了!") // ❌ 故意抛异常
    }
    .catch { e -> // ✅ 只会捕获 Flow emit() 的异常
        println("Caught exception: ${e.message}")
    }
    .collect { value ->
        println("Collected: $value")
    }
}
Collected: 1
Collected: 2
Caught exception: Flow 出错了!

8.2 try-catch和catch的比较

方式能捕获 emit() 的异常?能捕获 collect() 的异常?
catch {}不能
try-catch

九、retryWhen()重试

lifecycleScope.launch {
    flow {
        emit(1)
        throw RuntimeException("Flow 出错了!") // ❌ 故意抛异常
    }
    .retryWhen { e, attempt -> // attempt: 第几次重试
        println("Retrying... attempt $attempt")
        attempt < 3 // ✅ 只重试 3 次
    }
    .catch { e ->
        println("Caught exception: ${e.message}")
    }
    .collect { value ->
        println("Collected: $value")
    }
}
打印:
Collected: 1
Retrying... attempt 0
Collected: 1
Retrying... attempt 1
Collected: 1
Retrying... attempt 2
Caught exception: Flow 出错了!

十、Flow、Channel、RxJava 对比

10.1 Channel(热流)

  • 热流(Hot Stream):启动后就会产生数据,即使没有 collect() 也会持续运行。
  • 支持多生产者-多消费者(类似消息队列)。
  • 需要手动关闭,否则会一直等待数据。
  • 适用于高频数据流,但 默认不支持背压(可以用 BufferedChannel 处理)。
val channel = Channel<Int>()

lifecycleScope.launch {
    for (i in 1..3) {
        delay(200)
        channel.send(i) // 发送数据
    }
    channel.close() // 需要手动关闭
}

lifecycleScope.launch {
    for (value in channel) {
        println("Received: $value")
    }
}

10.2 RxJava(响应式编程)

  • 功能丰富map()flatMap()combineLatest()throttle())。
  • 支持背压Flowable)。
  • 支持线程切换observeOn()subscribeOn())。
  • 支持生命周期管理CompositeDisposable)。
val observable = Observable.create<Int> { emitter ->
    for (i in 1..3) {
        Thread.sleep(200) // 模拟数据产生
        emitter.onNext(i) // 发送数据
    }
    emitter.onComplete()
}

observable
    .subscribeOn(Schedulers.io()) // 切换线程
    .observeOn(AndroidSchedulers.mainThread()) // 观察线程
    .subscribe { value ->
        println("Received: $value")
    }

RxJava概括起来说,就是很牛,他的功能绝不是我上面的介绍的Flow就能概括的,但是API有点复杂,生命周期还需要自己去管理,用了一两个项目后就用协程,viewmodel这些东西了。


10.3 比较详情

特性FlowChannelRxJava
类型冷流(Cold Stream)热流(Hot Stream)热流(Hot Stream)
数据生产方式按需生产(Lazy)持续生产(Eager)持续生产(Eager)
是否支持背压自动支持默认不支持(需 capacityFlowable 支持
是否支持多生产者-多消费者不支持支持支持
生命周期管理协程自动管理协程自动管理需要手动 dispose()
数据转换(map、flatMap)支持不支持强大
线程切换flowOn()不支持(手动切换)subscribeOn() observeOn()
适用于 Android?现代化、推荐适用于协程通信老项目更常用
学习难度简单⭐⭐⭐ 中等⭐⭐⭐⭐ 复杂

10.4 使用场景

你想要的功能推荐的方案
按需获取数据(如数据库、API 响应)Flow
生产者-消费者模式(如任务队列、多生产者)Channel
组合数据流、事件流(如 UI 事件、按钮点击)RxJava
支持复杂操作符(如 debounce, throttle, merge)RxJava
简单、现代化、协程友好Flow

十一、背压

背压(Backpressure) 是指在数据流中,数据生产速度大于消费速度 时,如何处理过量的数据的问题。
如果 数据生产者(Producer) 太快,而 数据消费者(Consumer) 处理不过来,就会导致 OOM(内存溢出)数据丢失

11.1 Flow 背压

Flow 是挂起(suspend)式流,当 collect() 处理不过来时,Flow 会自动暂停,等消费者处理完再继续。

val flow = flow {
    repeat(1000) { i ->
        emit(i) // 生产数据
        println("Produced: $i")
    }
}.flowOn(Dispatchers.IO) // 生产者运行在 IO 线程

lifecycleScope.launch {
    flow.collect { value ->
        delay(100) // 模拟慢速消费
        println("Consumed: $value")
    }
}
打印:
Produced: 0
Consumed: 0
Produced: 1
Consumed: 1
...
  • Flow 自动挂起,等 collect() 处理完一个数据后,再生产下一个数据。
  • 不会内存溢出,因为它是 按需生产(Cold Stream)

Flow 默认是 顺序执行(类似同步代码),即:

  • 生产者(emit)会被挂起,直到 消费者(collect)处理完当前值 才能继续。
  • 不会丢弃数据,但可能会导致生产者等待过久,降低吞吐量。

💡 问题:如果 emit 很快,collect 很慢,生产者就会一直等待。

如何优化?使用 buffer/conflate/collectLatest

buffer(capacity) - 让生产者继续工作

  • 让生产者不被挂起,数据会先存到 缓冲区(默认容量 64)。
  • 消费者依旧一个个处理,但不会影响生产者的速度。
flow {
    for (i in 1..5) {
        println("Emitting $i")
        emit(i)
    }
}.buffer() // 加 buffer,让 emit 不会被挂起
.collect { value ->
    delay(1000) // 模拟耗时处理
    println("Collected $value")
}
打印:
Emitting 1
Emitting 2
Emitting 3
Emitting 4
Emitting 5
Collected 1
Collected 2
...

🔹 适用场景:生产者很快,消费者较慢,但不想丢失数据。

conflate() - 丢弃旧值,只保留最新

  • 生产者继续发射新数据,不会等待消费者
  • 如果消费者来不及处理,就丢弃旧数据,只处理最新的。
flow {
    for (i in 1..5) {
        println("Emitting $i")
        emit(i)
    }
}.conflate() // 只保留最新值
.collect { value ->
    delay(1000) // 处理较慢
    println("Collected $value")
}
打印:
Emitting 1
Emitting 2
Emitting 3
Emitting 4
Emitting 5
Collected 1
Collected 5

 conflate() 不会缓存所有数据,如果 emit 速度快,collect 速度慢,中间的值会被丢弃,消费者只会收到最新的值。

🔹 适用场景

  • UI 频繁更新(比如滑动时的坐标、传感器数据),只关心最新数据,不需要每个中间值

collectLatest - 取消上一个,处理最新值

  • 每次有新数据到来,取消上一个任务,只处理最新的。
  • 适用于耗时操作,避免处理过时数据。
flow {
    for (i in 1..5) {
        println("Emitting $i")
        emit(i)
    }
}.collectLatest { value ->
    println("Start collecting $value")
    delay(1000) // 模拟耗时操作
    println("Collected $value") // 只有最新的值才会被完整处理
}
打印:
Emitting 1
Start collecting 1
Emitting 2
Start collecting 2  // 取消 1
Emitting 3
Start collecting 3  // 取消 2
Emitting 4
Start collecting 4  // 取消 3
Emitting 5
Start collecting 5  // 取消 4
Collected 5

适用场景

  • 搜索框输入(只处理最新的查询)
  • 按钮点击事件(只执行最后一次点击)
  • 网络请求(上次请求没完成,新的请求来了,取消旧的)

总结:Flow 背压处理方式对比

方法生产者是否等待消费者?数据丢失?适用场景
默认是(生产者被挂起)一般情况
buffer()否(有缓存)高速生产,低速消费
conflate()否(不缓存旧数据)是(丢弃旧值)UI 刷新、传感器数据
collectLatest()否(取消前一个)是(取消未完成的)搜索、按钮点击、网络请求
11.2 Channel 背压

Channel 默认不支持背压,生产者会无限制地发送数据,可能导致 OOM。但可以手动设置 capacity(缓冲区大小) 解决背压问题。

关键代码在第一行哈!capacity = 10 让生产者最多存 10 个数据,多了就 自动阻塞

val channel = Channel<Int>(capacity = 10) // 设置缓冲区大小

lifecycleScope.launch {
    repeat(1000) { i ->
        channel.send(i) // 如果缓冲区满了,send() 会挂起等待
        println("Produced: $i")
    }
}

lifecycleScope.launch {
    for (value in channel) {
        delay(100) // 模拟慢速消费
        println("Consumed: $value")
    }
}
11.3 RxJava 背压

RxJava 默认不支持背压,但 Flowable 可以用不同的策略处理背压:

Flowable.create<Int>({ emitter ->
    repeat(1000) { i ->
        emitter.onNext(i) // 快速生产数据
        println("Produced: $i")
    }
}, BackpressureStrategy.DROP) // 选择背压策略
    .observeOn(Schedulers.io()) // 线程切换
    .subscribe { value ->
        Thread.sleep(100) // 模拟慢速消费
        println("Consumed: $value")
    }

背压策略

策略作用
BUFFER无限制存储,可能导致 OOM
DROP丢弃过量数据
LATEST只保留最新数据,丢弃旧数据
11.4 背压对比总结
方案背压支持处理方式适用场景
Flow天然支持挂起(suspend),生产者等待消费者API 响应、数据库查询
Channel默认不支持需手动 capacity 处理消息队列、多生产者-多消费者
RxJavaFlowable 支持BUFFERDROPLATESTUI 事件流、高频数据流

 

相关文章:

  • 掌握高效大模型任务流搭建术(二):链式流程如何赋能 AI 处理能力提升
  • Chapter 1 Introduction
  • qt open3dAlpha重建
  • proto3语法
  • 修改git fetch后引用没更新
  • 1493. 删掉一个元素以后全为 1 的最长子数组
  • Redis - 核心原理深度解析:线程模型、持久化与高可用性
  • TensorFlow深度学习实战(10)——迁移学习详解
  • Swagger UI界面的使用
  • 系统架构设计师教材:数据库设计基础知识
  • 从零开始的 Kafka 学习(二)| 集群启动
  • 深入解析 Umi-OCR:高效的免费开源 OCR 文字识别工具
  • OpenCV计算摄影学(15)无缝克隆(Seamless Cloning)调整图像颜色的函数colorChange()
  • Spring实战spring-ai运行
  • fastapi+mysql实现问卷调查系统
  • DeepSeek+知识库+鸿蒙,助力鸿蒙高效开发
  • Java集合面试题(持续更新)
  • 动态规划01背包问题系列一>最后一块石头的重量II
  • Spring Boot使用JDBC /JPA访问达梦数据库
  • 安卓免费多功能工具:一站式解决 PDF 阅读、编辑、转换等需求
  • 益阳通报“河水颜色异常有死鱼”:未发现排污,原因待鉴定
  • 全球前瞻|特朗普19日将与俄乌总统分别通话,英国脱欧后首开英欧峰会
  • 家庭医生可提前5天预约三甲医院号源,上海常住人口签约率达45%,
  • 全中国最好的十个博物馆展陈选出来了!
  • 新华每日电讯:博物馆正以可亲可近替代“高冷范儿”
  • 被围观的“英之园”,谁建了潮汕天价违建?