当前位置: 首页 > news >正文

deepseek Kotlin Flow 全面详解

deepseek总结的文档,十分靠谱!微调了少量内容。

1. Flow 基础概念

1.1 什么是 Flow

Flow 是 Kotlin 协程库中的异步数据流处理工具,类似于 RxJava 的 Observable,但更简洁、与协程深度集成。

1.2 Flow 的特点

  • 冷流(Cold Stream):有收集者时才执行
  • 可取消的:基于协程的取消机制
  • 背压感知:自动处理生产消费速度不匹配
  • 丰富的操作符:类似集合操作但支持异步

2. Flow 的创建方式

2.1 基础创建方法

// 1. flow{} 构建器 - 最常用
fun simpleFlow(): Flow<Int> = flow {for (i in 1..3) {delay(100) // 可以调用挂起函数emit(i)    // 发射值}
}// 2. asFlow() 扩展 - 从集合转换
fun collectionToFlow(): Flow<Int> = listOf(1, 2, 3).asFlow()// 3. flowOf() - 从固定值创建
fun fixedFlow(): Flow<Int> = flowOf(1, 2, 3)// 4. channelFlow - 更灵活的构建器
fun channelFlowExample(): Flow<Int> = channelFlow {send(1)withContext(Dispatchers.IO) {send(2) // 可以在不同上下文发送}send(3)close()
}

2.2 channelFlow{} vs flow{}

channelFlow和flow是Kotlin协程中处理数据流的两种方式,主要区别体现在以下几个方面:

  1. 数据流特性
    flow‌是冷数据流,只有在订阅者开始收集数据时才会触发数据生产,且每个订阅者会独立获得完整的数据序列。它类似于电梯,只有调用collect时才启动数据生产,结束后立即停止。
    channelFlow‌是热数据流的一种扩展,允许在数据生产时切换协程上下文,并内置缓冲区(默认64)。它更像地铁自动扶梯,数据生产独立于消费存在。
  2. 上下文限制
    flow‌严格要求在同一个协程上下文中发射数据,不允许通过withContext切换调度器。
    channelFlow‌通过send方法支持跨协程发送数据,内部使用Channel实现多协程并发生产。允许使用withContext。
  3. 缓冲机制
    flow‌需要显式调用.buffer()操作符添加缓冲功能。
    channelFlow‌天然具备缓冲区(也可通过.buffer()配置大小),且支持异步合并多个流(如通过launch并发收集)。
  4. 生命周期控制
    flow‌的生命周期与收集操作绑定,收集结束即终止。
    channelFlow‌可通过awaitClose保持通道开放,直到显式关闭。
    典型使用场景
    flow‌适用于单次数据提供场景(如数据库查询,API请求)。
    channelFlow‌适合事件合并、跨协程数据生产或需要背压控制的场景(如合并多个API响应)。

3. Flow 的终端操作符

3.1 收集操作符

suspend fun terminalOperators() {val numberFlow = (1..5).asFlow()// 1. collect - 基本收集numberFlow.collect { value -> println("收集: $value") }// 2. launchIn - 在指定作用域启动收集numberFlow.onEach { println("launchIn: $it") }.launchIn(CoroutineScope(Dispatchers.IO))// 3. toList/toSet - 转换为集合val list = numberFlow.toList()val set = numberFlow.toSet()println("转换为List: $list")// 4. first/firstOrNull - 获取第一个元素val first = numberFlow.first()val firstOrNull = emptyFlow<Int>().firstOrNull()println("第一个元素: $first")// 5. single/singleOrNull - 期望只有一个元素val single = flowOf(42).single()println("唯一元素: $single")// 6. count - 计数val count = numberFlow.count()println("元素数量: $count")// 7. reduce/fold - 聚合操作val sum = numberFlow.reduce { acc, value -> acc + value }val sumWithStart = numberFlow.fold(10) { acc, value -> acc + value }println("累加和: $sum, 带初始值的和: $sumWithStart")
}

4. Flow 的中间操作符

4.1 转换操作符

suspend fun transformOperators() {val flow = (1..3).asFlow()// 1. map - 转换每个元素flow.map { it * it }.collect { println("平方: $it") }// 2. transform - 更灵活的转换,可以发射多个值flow.transform { value ->if (value % 2 == 1) {emit(value)emit(value * 10)}}.collect { println("transform: $it") }// 3. mapLatest - 取消之前的转换,只处理最新的flow.mapLatest { value ->delay(100) // 模拟耗时操作"映射后的$value"}.collect { println("mapLatest: $it") }
}

4.2 过滤操作符

suspend fun filterOperators() {val flow = (1..10).asFlow()// 1. filter - 条件过滤flow.filter { it % 2 == 0 }.collect { println("偶数: $it") }// 2. filterNot - 反向过滤flow.filterNot { it > 5 }.collect { println("小于等于5: $it") }// 3. filterIsInstance - 类型过滤flowOf(1, "hello", 2, "world", 3).filterIsInstance<String>().collect { println("字符串: $it") }// 4. take - 取前n个flow.take(3).collect { println("前3个: $it") }// 5. drop - 丢弃前n个flow.drop(5).collect { println("丢弃前5个后: $it") }// 6. distinctUntilChanged - 去重连续重复flowOf(1, 1, 2, 2, 3, 3, 1).distinctUntilChanged().collect { println("去重连续重复: $it") }
}

4.3 组合操作符

suspend fun combinationOperators() {val flow1 = flowOf("A", "B", "C")val flow2 = flowOf(1, 2, 3)// 1. zip - 一对一组合flow1.zip(flow2) { a, b -> "$a$b" }.collect { println("zip: $it") } // A1, B2, C3// 2. combine - 最新值组合val numbers = flowOf(1, 2, 3).onEach { delay(100) }val letters = flowOf("A", "B", "C").onEach { delay(150) }numbers.combine(letters) { number, letter -> "$number$letter" }.collect { println("combine: $it") } // 1A, 2A, 2B, 3B, 3C// 3. merge - 合并多个流merge(flow1, flow2.map { it.toString() }).collect { println("merge: $it") } // A, B, C, 1, 2, 3// 4. flatMapConcat - 顺序展开flow1.flatMapConcat { value -> flowOf("$value1", "$value2") }.collect { println("flatMapConcat: $it") } // A1, A2, B1, B2, C1, C2// 5. flatMapMerge - 并发展开flow1.flatMapMerge { value ->flow {delay(100)emit("$value-快速")delay(200)emit("$value-慢速")}}.collect { println("flatMapMerge: $it") }// 6. flatMapLatest - 最新值展开flow1.flatMapLatest { value ->flow {emit("开始$value")delay(200) // 如果新值到来,会取消这个流emit("完成$value")}}.collect { println("flatMapLatest: $it") }
}

4.4 异常处理操作符

suspend fun exceptionHandling() {val flow = flow {emit(1)throw RuntimeException("模拟错误")emit(2) // 不会执行}// 1. catch - 捕获异常flow.catch { e -> println("捕获异常: ${e.message}")emit(-1) // 可以恢复发射}.collect { println("catch: $it") }// 2. retry - 重试var attempt = 0flow.retry(2) { cause ->attempt++println("第${attempt}次重试,原因: ${cause.message}")attempt < 2 // 条件重试}.catch { e -> emit(999) }.collect { println("retry: $it") }// 3. retryWhen - 更灵活的重试flow.retryWhen { cause, attempt ->if (attempt < 3 && cause is RuntimeException) {delay(1000 * attempt)true} else {false}}.catch { e -> println("最终失败: ${e.message}")}.collect { println("retryWhen: $it") }
}

4.5 回调操作符

suspend fun callbackOperators() {val flow = (1..3).asFlow()// 1. onEach - 每个元素处理前的回调flow.onEach { println("即将发射: $it") }.collect { println("收集到: $it") }// 2. onStart - 流开始前的回调flow.onStart { println("流开始执行")emit(0) // 可以提前发射值}.collect { println("onStart: $it") }// 3. onCompletion - 流完成后的回调flow.onCompletion { cause ->if (cause == null) {println("流正常完成")} else {println("流异常完成: ${cause.message}")}}.collect { println("onCompletion: $it") }// 4. onEmpty - 流为空时的回调emptyFlow<Int>().onEmpty { println("流为空,发射默认值")emit(-1)}.collect { println("onEmpty: $it") }
}

5. 上下文和调度

5.1 flowOn 操作符

suspend fun flowOnExample() {// flowOn 影响上游操作的上下文flow {// 这个块在 IO 线程执行println("发射线程: ${Thread.currentThread().name}")emit(1)emit(2)}.flowOn(Dispatchers.IO) // 指定上游上下文.map { // 这个map在默认上下文执行(因为后面没有flowOn)println("映射线程: ${Thread.currentThread().name}")it * 2 }.flowOn(Dispatchers.Default) // 可以再次改变上游上下文.collect { // 收集在调用协程的上下文println("收集线程: ${Thread.currentThread().name}, 值: $it")}
}

5.2 buffer 操作符 - 背压处理

suspend fun bufferOperators() {val slowFlow = flow {for (i in 1..3) {delay(100) // 模拟慢生产者emit(i)println("发射: $i")}}println("=== 无缓冲 ===")val time1 = measureTimeMillis {slowFlow.collect { value ->delay(300) // 模拟慢消费者println("处理: $value")}}println("总时间: ${time1}ms")println("=== 有缓冲 ===")val time2 = measureTimeMillis {slowFlow.buffer() // 添加缓冲,生产消费可以并发.collect { value ->delay(300)println("处理: $value")}}println("总时间: ${time2}ms")// 其他缓冲策略slowFlow.conflate() // 合并,只处理最新值.collect { println("conflate: $it") }slowFlow.collectLatest { value -> // 取消慢的处理,处理最新值println("开始处理: $value")delay(400)println("完成处理: $value") // 可能被取消}
}

6. 共享流(SharedFlow 和 StateFlow)

6.1 shareIn - 冷流转热流

class DataRepository {// 冷流:每次收集都会重新执行private val dataFlow = flow {println("执行数据获取")emit(fetchDataFromNetwork())}// 使用 shareIn 转换为热流private val sharedFlow = dataFlow.shareIn(scope = CoroutineScope(Dispatchers.IO),started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 5000, // 最后一个订阅者取消后5秒停止replayExpirationMillis = 0 // 立即过期重放数据),replay = 1 // 新订阅者重放1个最新值)fun getData(): Flow<String> = sharedFlowprivate suspend fun fetchDataFromNetwork(): String {delay(1000)return "网络数据 ${System.currentTimeMillis()}"}
}suspend fun shareInExample() {val repository = DataRepository()// 第一个订阅者val job1 = launch {repository.getData().collect { println("订阅者1: $it") }}delay(500)// 第二个订阅者 - 共享同一个数据流val job2 = launch {repository.getData().collect { println("订阅者2: $it") }}delay(2000)job1.cancel()job2.cancel()
}

6.2 stateIn - 转换为状态流

class CounterViewModel : ViewModel() {private val tickFlow = flow {var count = 0while (true) {delay(1000)emit(count++)}}// 使用 stateIn 转换为状态流val counterState = tickFlow.stateIn(scope = viewModelScope,started = SharingStarted.WhileSubscribed(5000),initialValue = 0)
}suspend fun stateInExample() {val viewModel = CounterViewModel()// 状态流总是有当前值viewModel.counterState.collect { value ->println("当前计数: $value")}
}

6.3 launchIn - 在指定作用域启动

suspend fun launchInExample() {val eventFlow = flow {repeat(5) {delay(500)emit("事件$it")}}val scope = CoroutineScope(Dispatchers.IO + CoroutineName("MyScope"))// 使用 launchIn 在指定作用域启动收集val job = eventFlow.onEach { event -> println("处理事件: $event 在线程: ${Thread.currentThread().name}")}.launchIn(scope) // 返回Job可以取消delay(2000)println("取消收集")job.cancel()
}

7. 高级特性

7.1 自定义操作符

// 自定义过滤操作符
fun <T> Flow<T>.filterWithLog(predicate: (T) -> Boolean): Flow<T> = flow {collect { value ->if (predicate(value)) {println("过滤通过: $value")emit(value)} else {println("过滤拒绝: $value")}}
}// 自定义转换操作符
fun Flow<Int>.runningSum(): Flow<Int> = flow {var sum = 0collect { value ->sum += valueemit(sum)}
}suspend fun customOperators() {(1..5).asFlow().filterWithLog { it % 2 == 0 }.runningSum().collect { println("运行和: $it") }
}

7.2 复杂组合示例

suspend fun complexFlowExample() {// 模拟用户输入流val userInputs = flowOf("A", "B", "C", "D", "E").onEach { delay(200) } // 模拟用户输入间隔// 模拟网络请求流fun simulateNetworkRequest(input: String): Flow<String> = flow {delay(500) // 模拟网络延迟emit("响应: $input")}userInputs.onStart { println("开始监听用户输入") }.onEach { println("用户输入: $it") }.flatMapMerge(concurrency = 3) { input -> // 并发处理最多3个请求simulateNetworkRequest(input).onStart { println("开始请求: $input") }.catch { e -> emit("错误: ${e.message}") }}.onEach { println("收到响应: $it") }.buffer() // 防止背压.catch { e -> println("流异常: ${e.message}") }.onCompletion { println("流完成") }.collect()
}

8. 实际应用案例

8.1 搜索建议功能

class SearchViewModel : ViewModel() {private val searchQuery = MutableStateFlow("")val searchResults = searchQuery.debounce(300) // 防抖300ms.filter { it.length >= 3 } // 至少3个字符.distinctUntilChanged() // 查询相同时不重复搜索.flatMapLatest { query -> // 取消之前的搜索performSearch(query).catch { e -> emit(emptyList()) }}.stateIn(scope = viewModelScope,started = SharingStarted.WhileSubscribed(5000),initialValue = emptyList())fun onSearchQueryChanged(query: String) {searchQuery.value = query}private fun performSearch(query: String): Flow<List<String>> = flow {// 模拟网络搜索delay(1000)val results = listOf("${query}结果1", "${query}结果2", "${query}结果3")emit(results)}
}

8.2 实时数据更新

class StockMonitor {private val stockPrices = MutableSharedFlow<StockPrice>(replay = 10, // 重放最近10个价格extraBufferCapacity = 50)// 模拟实时价格更新fun startMonitoring() {CoroutineScope(Dispatchers.IO).launch {var price = 100.0while (true) {delay(1000)price += (Math.random() - 0.5) * 10 // 随机波动stockPrices.emit(StockPrice("AAPL", price, System.currentTimeMillis()))}}}fun getPriceUpdates(): Flow<StockPrice> = stockPrices// 价格变化率计算fun getPriceChanges(): Flow<Double> = stockPrices.take(2) // 取最近两个价格.map { it.price }.runningReduce { old, new -> ((new - old) / old) * 100 } // 计算变化率
}data class StockPrice(val symbol: String, val price: Double, val timestamp: Long)

9. 性能优化和最佳实践

9.1 性能优化技巧

suspend fun performanceTips() {val flow = (1..1000).asFlow()// 1. 合理使用 bufferflow.map { heavyOperation(it) }.buffer() // 允许map并发执行.collect { println(it) }// 2. 避免不必要的操作链flow.filter { it % 2 == 0 }.map { it * 2 } // 先过滤再映射,减少操作次数.collect()// 3. 使用 flowOn 合理分配上下文flow.map { cpuIntensiveOperation(it) }.flowOn(Dispatchers.Default) // CPU密集型在Default调度器.map { ioOperation(it) }.flowOn(Dispatchers.IO) // IO操作在IO调度器.collect()
}suspend fun heavyOperation(value: Int): Int {delay(10)return value * 2
}suspend fun cpuIntensiveOperation(value: Int): Int {// 模拟CPU密集型操作return value * value
}suspend fun ioOperation(value: Int): String {delay(50) // 模拟IO操作return "结果: $value"
}

9.2 测试 Flow

@Test
fun `test flow operations`() = runTest { // 使用TestScopeval flow = flowOf(1, 2, 3, 4, 5)val result = flow.filter { it % 2 == 0 }.map { it * 2 }.toList()assertEquals(listOf(4, 8), result)
}// 测试超时和异常
@Test
fun `test flow timeout`() = runTest {val slowFlow = flow {emit(1)delay(1000) // 长时间延迟emit(2)}assertFailsWith<TimeoutCancellationException> {withTimeout(500) { // 设置超时slowFlow.collect()}}
}

10. Flow 与 Channel 的比较

特性FlowChannel
数据模式数据流(可能无限)点对点通信
背压处理自动处理需要手动配置缓冲
冷热性冷流(默认)热流
使用场景数据处理管道协程间通信
多个消费者每个消费者独立流共享数据

通过全面掌握 Flow 的各种操作符和特性,你可以构建出高效、响应式的异步数据处理管道,充分发挥 Kotlin 协程在异步编程中的优势。

http://www.dtcms.com/a/415163.html

相关文章:

  • MTK-Android13-Dialer 通话界面定制修改
  • 化妆品电子商务网站开发流程描述中山网站建设推荐
  • 宿州移动网站建设广州模板网站
  • 旅游景区网站建设哈尔滨发布信息的网站
  • RVC WebUI(Retrieval-based-Voice-Conversion-WebUI)配置
  • 在线制作简历网站网页结构布局
  • 建网站要备案东莞网站制作品牌祥奔科技
  • 棋盘覆盖问题
  • 大邑网站建设百合居装饰公司官网
  • C++基础(3)-类的6个默认成员函数
  • 做营销型网站需要注意哪些点开发小程序费用
  • AI“点亮”萤火虫:边缘机器学习让微光成像走进4K时代
  • 【手撕机器学习 02】手撕算法的基石:精通NumPy与Pandas向量化思维
  • 一种好用开发的轻量级 Markdown 编辑器
  • 网站用户管理系统徐州市城乡建设局网站
  • 花店网站首页模版帝国cms使用教程
  • React-router v6学生管理系统笔记
  • 手写签名太麻烦?智能签名生成器免费实测 智能签名生成器、智能签名生成器使用、免费电子签名工具、Windows 电子签名软件、办公效率工具
  • 建设银行六安市分行网站hreflang wordpress
  • N8N Workflow Collection - 专业级自动化工作流库
  • 有没有专业做特产的网站小企业如何建网站
  • Android 6.0+ 动态权限请求模块,这个模块会包含 权限检查、请求、结果处理 等核心功能,并且支持 单个 / 多个权限请求、权限拒绝后的引导
  • Android -自定义Binding Adapter实战应用
  • 网站优化提升速度网站建设权利义务
  • 【复现】一种基于价格弹性矩阵的居民峰谷分时电价激励策略【需求响应】
  • 【怎么复制cmd命令行里面的文字输出和报错】
  • Oracle体系结构-RECO详解
  • 2025.9.27总结
  • 做网站怎么跟客户谈话搜狗优化好的网站
  • 乐清网站推广公司怎样取消2345网址导航