deepseek Kotlin Flow 全面详解
deepseek总结的文档,十分靠谱!微调了少量内容。
1. Flow 基础概念
1.1 什么是 Flow
Flow 是 Kotlin 协程库中的异步数据流处理工具,类似于 RxJava 的 Observable,但更简洁、与协程深度集成。
1.2 Flow 的特点
- 冷流(Cold Stream):有收集者时才执行
- 可取消的:基于协程的取消机制
- 背压感知:自动处理生产消费速度不匹配
- 丰富的操作符:类似集合操作但支持异步
2. Flow 的创建方式
2.1 基础创建方法
// 1. flow{} 构建器 - 最常用
fun simpleFlow(): Flow<Int> = flow {for (i in 1..3) {delay(100) // 可以调用挂起函数emit(i) // 发射值}
}// 2. asFlow() 扩展 - 从集合转换
fun collectionToFlow(): Flow<Int> = listOf(1, 2, 3).asFlow()// 3. flowOf() - 从固定值创建
fun fixedFlow(): Flow<Int> = flowOf(1, 2, 3)// 4. channelFlow - 更灵活的构建器
fun channelFlowExample(): Flow<Int> = channelFlow {send(1)withContext(Dispatchers.IO) {send(2) // 可以在不同上下文发送}send(3)close()
}
2.2 channelFlow{} vs flow{}
channelFlow和flow是Kotlin协程中处理数据流的两种方式,主要区别体现在以下几个方面:
- 数据流特性
flow是冷数据流,只有在订阅者开始收集数据时才会触发数据生产,且每个订阅者会独立获得完整的数据序列。它类似于电梯,只有调用collect时才启动数据生产,结束后立即停止。
channelFlow是热数据流的一种扩展,允许在数据生产时切换协程上下文,并内置缓冲区(默认64)。它更像地铁自动扶梯,数据生产独立于消费存在。 - 上下文限制
flow严格要求在同一个协程上下文中发射数据,不允许通过withContext切换调度器。
channelFlow通过send方法支持跨协程发送数据,内部使用Channel实现多协程并发生产。允许使用withContext。 - 缓冲机制
flow需要显式调用.buffer()操作符添加缓冲功能。
channelFlow天然具备缓冲区(也可通过.buffer()配置大小),且支持异步合并多个流(如通过launch并发收集)。 - 生命周期控制
flow的生命周期与收集操作绑定,收集结束即终止。
channelFlow可通过awaitClose保持通道开放,直到显式关闭。
典型使用场景
flow适用于单次数据提供场景(如数据库查询,API请求)。
channelFlow适合事件合并、跨协程数据生产或需要背压控制的场景(如合并多个API响应)。
3. Flow 的终端操作符
3.1 收集操作符
suspend fun terminalOperators() {val numberFlow = (1..5).asFlow()// 1. collect - 基本收集numberFlow.collect { value -> println("收集: $value") }// 2. launchIn - 在指定作用域启动收集numberFlow.onEach { println("launchIn: $it") }.launchIn(CoroutineScope(Dispatchers.IO))// 3. toList/toSet - 转换为集合val list = numberFlow.toList()val set = numberFlow.toSet()println("转换为List: $list")// 4. first/firstOrNull - 获取第一个元素val first = numberFlow.first()val firstOrNull = emptyFlow<Int>().firstOrNull()println("第一个元素: $first")// 5. single/singleOrNull - 期望只有一个元素val single = flowOf(42).single()println("唯一元素: $single")// 6. count - 计数val count = numberFlow.count()println("元素数量: $count")// 7. reduce/fold - 聚合操作val sum = numberFlow.reduce { acc, value -> acc + value }val sumWithStart = numberFlow.fold(10) { acc, value -> acc + value }println("累加和: $sum, 带初始值的和: $sumWithStart")
}
4. Flow 的中间操作符
4.1 转换操作符
suspend fun transformOperators() {val flow = (1..3).asFlow()// 1. map - 转换每个元素flow.map { it * it }.collect { println("平方: $it") }// 2. transform - 更灵活的转换,可以发射多个值flow.transform { value ->if (value % 2 == 1) {emit(value)emit(value * 10)}}.collect { println("transform: $it") }// 3. mapLatest - 取消之前的转换,只处理最新的flow.mapLatest { value ->delay(100) // 模拟耗时操作"映射后的$value"}.collect { println("mapLatest: $it") }
}
4.2 过滤操作符
suspend fun filterOperators() {val flow = (1..10).asFlow()// 1. filter - 条件过滤flow.filter { it % 2 == 0 }.collect { println("偶数: $it") }// 2. filterNot - 反向过滤flow.filterNot { it > 5 }.collect { println("小于等于5: $it") }// 3. filterIsInstance - 类型过滤flowOf(1, "hello", 2, "world", 3).filterIsInstance<String>().collect { println("字符串: $it") }// 4. take - 取前n个flow.take(3).collect { println("前3个: $it") }// 5. drop - 丢弃前n个flow.drop(5).collect { println("丢弃前5个后: $it") }// 6. distinctUntilChanged - 去重连续重复flowOf(1, 1, 2, 2, 3, 3, 1).distinctUntilChanged().collect { println("去重连续重复: $it") }
}
4.3 组合操作符
suspend fun combinationOperators() {val flow1 = flowOf("A", "B", "C")val flow2 = flowOf(1, 2, 3)// 1. zip - 一对一组合flow1.zip(flow2) { a, b -> "$a$b" }.collect { println("zip: $it") } // A1, B2, C3// 2. combine - 最新值组合val numbers = flowOf(1, 2, 3).onEach { delay(100) }val letters = flowOf("A", "B", "C").onEach { delay(150) }numbers.combine(letters) { number, letter -> "$number$letter" }.collect { println("combine: $it") } // 1A, 2A, 2B, 3B, 3C// 3. merge - 合并多个流merge(flow1, flow2.map { it.toString() }).collect { println("merge: $it") } // A, B, C, 1, 2, 3// 4. flatMapConcat - 顺序展开flow1.flatMapConcat { value -> flowOf("$value1", "$value2") }.collect { println("flatMapConcat: $it") } // A1, A2, B1, B2, C1, C2// 5. flatMapMerge - 并发展开flow1.flatMapMerge { value ->flow {delay(100)emit("$value-快速")delay(200)emit("$value-慢速")}}.collect { println("flatMapMerge: $it") }// 6. flatMapLatest - 最新值展开flow1.flatMapLatest { value ->flow {emit("开始$value")delay(200) // 如果新值到来,会取消这个流emit("完成$value")}}.collect { println("flatMapLatest: $it") }
}
4.4 异常处理操作符
suspend fun exceptionHandling() {val flow = flow {emit(1)throw RuntimeException("模拟错误")emit(2) // 不会执行}// 1. catch - 捕获异常flow.catch { e -> println("捕获异常: ${e.message}")emit(-1) // 可以恢复发射}.collect { println("catch: $it") }// 2. retry - 重试var attempt = 0flow.retry(2) { cause ->attempt++println("第${attempt}次重试,原因: ${cause.message}")attempt < 2 // 条件重试}.catch { e -> emit(999) }.collect { println("retry: $it") }// 3. retryWhen - 更灵活的重试flow.retryWhen { cause, attempt ->if (attempt < 3 && cause is RuntimeException) {delay(1000 * attempt)true} else {false}}.catch { e -> println("最终失败: ${e.message}")}.collect { println("retryWhen: $it") }
}
4.5 回调操作符
suspend fun callbackOperators() {val flow = (1..3).asFlow()// 1. onEach - 每个元素处理前的回调flow.onEach { println("即将发射: $it") }.collect { println("收集到: $it") }// 2. onStart - 流开始前的回调flow.onStart { println("流开始执行")emit(0) // 可以提前发射值}.collect { println("onStart: $it") }// 3. onCompletion - 流完成后的回调flow.onCompletion { cause ->if (cause == null) {println("流正常完成")} else {println("流异常完成: ${cause.message}")}}.collect { println("onCompletion: $it") }// 4. onEmpty - 流为空时的回调emptyFlow<Int>().onEmpty { println("流为空,发射默认值")emit(-1)}.collect { println("onEmpty: $it") }
}
5. 上下文和调度
5.1 flowOn 操作符
suspend fun flowOnExample() {// flowOn 影响上游操作的上下文flow {// 这个块在 IO 线程执行println("发射线程: ${Thread.currentThread().name}")emit(1)emit(2)}.flowOn(Dispatchers.IO) // 指定上游上下文.map { // 这个map在默认上下文执行(因为后面没有flowOn)println("映射线程: ${Thread.currentThread().name}")it * 2 }.flowOn(Dispatchers.Default) // 可以再次改变上游上下文.collect { // 收集在调用协程的上下文println("收集线程: ${Thread.currentThread().name}, 值: $it")}
}
5.2 buffer 操作符 - 背压处理
suspend fun bufferOperators() {val slowFlow = flow {for (i in 1..3) {delay(100) // 模拟慢生产者emit(i)println("发射: $i")}}println("=== 无缓冲 ===")val time1 = measureTimeMillis {slowFlow.collect { value ->delay(300) // 模拟慢消费者println("处理: $value")}}println("总时间: ${time1}ms")println("=== 有缓冲 ===")val time2 = measureTimeMillis {slowFlow.buffer() // 添加缓冲,生产消费可以并发.collect { value ->delay(300)println("处理: $value")}}println("总时间: ${time2}ms")// 其他缓冲策略slowFlow.conflate() // 合并,只处理最新值.collect { println("conflate: $it") }slowFlow.collectLatest { value -> // 取消慢的处理,处理最新值println("开始处理: $value")delay(400)println("完成处理: $value") // 可能被取消}
}
6. 共享流(SharedFlow 和 StateFlow)
6.1 shareIn - 冷流转热流
class DataRepository {// 冷流:每次收集都会重新执行private val dataFlow = flow {println("执行数据获取")emit(fetchDataFromNetwork())}// 使用 shareIn 转换为热流private val sharedFlow = dataFlow.shareIn(scope = CoroutineScope(Dispatchers.IO),started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 5000, // 最后一个订阅者取消后5秒停止replayExpirationMillis = 0 // 立即过期重放数据),replay = 1 // 新订阅者重放1个最新值)fun getData(): Flow<String> = sharedFlowprivate suspend fun fetchDataFromNetwork(): String {delay(1000)return "网络数据 ${System.currentTimeMillis()}"}
}suspend fun shareInExample() {val repository = DataRepository()// 第一个订阅者val job1 = launch {repository.getData().collect { println("订阅者1: $it") }}delay(500)// 第二个订阅者 - 共享同一个数据流val job2 = launch {repository.getData().collect { println("订阅者2: $it") }}delay(2000)job1.cancel()job2.cancel()
}
6.2 stateIn - 转换为状态流
class CounterViewModel : ViewModel() {private val tickFlow = flow {var count = 0while (true) {delay(1000)emit(count++)}}// 使用 stateIn 转换为状态流val counterState = tickFlow.stateIn(scope = viewModelScope,started = SharingStarted.WhileSubscribed(5000),initialValue = 0)
}suspend fun stateInExample() {val viewModel = CounterViewModel()// 状态流总是有当前值viewModel.counterState.collect { value ->println("当前计数: $value")}
}
6.3 launchIn - 在指定作用域启动
suspend fun launchInExample() {val eventFlow = flow {repeat(5) {delay(500)emit("事件$it")}}val scope = CoroutineScope(Dispatchers.IO + CoroutineName("MyScope"))// 使用 launchIn 在指定作用域启动收集val job = eventFlow.onEach { event -> println("处理事件: $event 在线程: ${Thread.currentThread().name}")}.launchIn(scope) // 返回Job可以取消delay(2000)println("取消收集")job.cancel()
}
7. 高级特性
7.1 自定义操作符
// 自定义过滤操作符
fun <T> Flow<T>.filterWithLog(predicate: (T) -> Boolean): Flow<T> = flow {collect { value ->if (predicate(value)) {println("过滤通过: $value")emit(value)} else {println("过滤拒绝: $value")}}
}// 自定义转换操作符
fun Flow<Int>.runningSum(): Flow<Int> = flow {var sum = 0collect { value ->sum += valueemit(sum)}
}suspend fun customOperators() {(1..5).asFlow().filterWithLog { it % 2 == 0 }.runningSum().collect { println("运行和: $it") }
}
7.2 复杂组合示例
suspend fun complexFlowExample() {// 模拟用户输入流val userInputs = flowOf("A", "B", "C", "D", "E").onEach { delay(200) } // 模拟用户输入间隔// 模拟网络请求流fun simulateNetworkRequest(input: String): Flow<String> = flow {delay(500) // 模拟网络延迟emit("响应: $input")}userInputs.onStart { println("开始监听用户输入") }.onEach { println("用户输入: $it") }.flatMapMerge(concurrency = 3) { input -> // 并发处理最多3个请求simulateNetworkRequest(input).onStart { println("开始请求: $input") }.catch { e -> emit("错误: ${e.message}") }}.onEach { println("收到响应: $it") }.buffer() // 防止背压.catch { e -> println("流异常: ${e.message}") }.onCompletion { println("流完成") }.collect()
}
8. 实际应用案例
8.1 搜索建议功能
class SearchViewModel : ViewModel() {private val searchQuery = MutableStateFlow("")val searchResults = searchQuery.debounce(300) // 防抖300ms.filter { it.length >= 3 } // 至少3个字符.distinctUntilChanged() // 查询相同时不重复搜索.flatMapLatest { query -> // 取消之前的搜索performSearch(query).catch { e -> emit(emptyList()) }}.stateIn(scope = viewModelScope,started = SharingStarted.WhileSubscribed(5000),initialValue = emptyList())fun onSearchQueryChanged(query: String) {searchQuery.value = query}private fun performSearch(query: String): Flow<List<String>> = flow {// 模拟网络搜索delay(1000)val results = listOf("${query}结果1", "${query}结果2", "${query}结果3")emit(results)}
}
8.2 实时数据更新
class StockMonitor {private val stockPrices = MutableSharedFlow<StockPrice>(replay = 10, // 重放最近10个价格extraBufferCapacity = 50)// 模拟实时价格更新fun startMonitoring() {CoroutineScope(Dispatchers.IO).launch {var price = 100.0while (true) {delay(1000)price += (Math.random() - 0.5) * 10 // 随机波动stockPrices.emit(StockPrice("AAPL", price, System.currentTimeMillis()))}}}fun getPriceUpdates(): Flow<StockPrice> = stockPrices// 价格变化率计算fun getPriceChanges(): Flow<Double> = stockPrices.take(2) // 取最近两个价格.map { it.price }.runningReduce { old, new -> ((new - old) / old) * 100 } // 计算变化率
}data class StockPrice(val symbol: String, val price: Double, val timestamp: Long)
9. 性能优化和最佳实践
9.1 性能优化技巧
suspend fun performanceTips() {val flow = (1..1000).asFlow()// 1. 合理使用 bufferflow.map { heavyOperation(it) }.buffer() // 允许map并发执行.collect { println(it) }// 2. 避免不必要的操作链flow.filter { it % 2 == 0 }.map { it * 2 } // 先过滤再映射,减少操作次数.collect()// 3. 使用 flowOn 合理分配上下文flow.map { cpuIntensiveOperation(it) }.flowOn(Dispatchers.Default) // CPU密集型在Default调度器.map { ioOperation(it) }.flowOn(Dispatchers.IO) // IO操作在IO调度器.collect()
}suspend fun heavyOperation(value: Int): Int {delay(10)return value * 2
}suspend fun cpuIntensiveOperation(value: Int): Int {// 模拟CPU密集型操作return value * value
}suspend fun ioOperation(value: Int): String {delay(50) // 模拟IO操作return "结果: $value"
}
9.2 测试 Flow
@Test
fun `test flow operations`() = runTest { // 使用TestScopeval flow = flowOf(1, 2, 3, 4, 5)val result = flow.filter { it % 2 == 0 }.map { it * 2 }.toList()assertEquals(listOf(4, 8), result)
}// 测试超时和异常
@Test
fun `test flow timeout`() = runTest {val slowFlow = flow {emit(1)delay(1000) // 长时间延迟emit(2)}assertFailsWith<TimeoutCancellationException> {withTimeout(500) { // 设置超时slowFlow.collect()}}
}
10. Flow 与 Channel 的比较
特性 | Flow | Channel |
---|---|---|
数据模式 | 数据流(可能无限) | 点对点通信 |
背压处理 | 自动处理 | 需要手动配置缓冲 |
冷热性 | 冷流(默认) | 热流 |
使用场景 | 数据处理管道 | 协程间通信 |
多个消费者 | 每个消费者独立流 | 共享数据 |
通过全面掌握 Flow 的各种操作符和特性,你可以构建出高效、响应式的异步数据处理管道,充分发挥 Kotlin 协程在异步编程中的优势。