当前位置: 首页 > news >正文

协程的 callbackFlow 函数的使用和原理

1. callbackFlow 是什么?

基本概念

callbackFlow 是一个函数,用于将基于回调的 API 转换为 Flow。

简单理解

// 传统回调方式(不好用)api.getData { result ->// 处理结果}// 使用 callbackFlow 转换为 Flow(好用)val flow = api.getDataFlow()flow.collect { result ->// 处理结果}

2. callbackFlow 的基本结构

基本语法

fun someFunction(): Flow<DataType> = callbackFlow {// 1. 发送数据trySend(data)// 2. 等待关闭awaitClose {// 清理资源}}

实际例子

// 将网络请求转换为 Flowfun fetchData(): Flow<String> = callbackFlow {// 发送数据trySend("Hello")trySend("World")// 等待关闭awaitClose {println("Flow 被关闭,清理资源")}}// 使用方式fetchData().collect { data ->println("收到数据: $data")}

3. callbackFlow 的工作原理

内部机制

fun connect(url: String): Flow<SSEEvent> = callbackFlow {// callbackFlow 内部创建了一个 Channel// 这个 Channel 可以发送 SSEEvent 类型的数据// 发送数据到 ChanneltrySend(SSEEvent.Message("event", "data"))// 等待 Channel 关闭awaitClose { // 清理资源}}

详细流程

 // 步骤 1: callbackFlow 创建 Channelfun connect(): Flow<SSEEvent> = callbackFlow {// 这里创建了一个 Channel<SSEEvent>// 步骤 2: 发送数据到 ChanneltrySend(SSEEvent.Message("event", "data"))// 步骤 3: 等待关闭awaitClose { // 清理代码}}// 步骤 4: 使用 Flowconnect().collect { event ->// 从 Channel 接收数据println("收到事件: $event")}

4. 为什么要用 callbackFlow?

问题:传统回调的缺点

// 传统方式(不好)class TraditionalCallback {fun getData(callback: (String) -> Unit) {// 网络请求callback("数据")}}// 使用方式(复杂)val api = TraditionalCallback()api.getData { data ->// 处理数据println("收到数据: $data")}

解决方案:使用 callbackFlow

 // 现代方式(好)fun getData(): Flow<String> = callbackFlow {// 网络请求trySend("数据")awaitClose {// 清理资源}}// 使用方式(简单)getData().collect { data ->println("收到数据: $data")}

5. 你的 SSE 代码中的 callbackFlow

详细分析

fun connect(url: String, token: String): Flow<SSEEvent> = callbackFlow {// 1. callbackFlow 创建了一个 Channel<SSEEvent>val request = Request.Builder().url(url).addHeader("Accept", "text/event-stream").build()call = okHttpClient.newCall(request)call?.enqueue(object : Callback {override fun onResponse(call: Call, response: Response) {val body = response.body ?: returntry {while (true) {val line = body.source().readUtf8Line() ?: breakwhen {line.startsWith("data:") -> {val data = line.substring(5).trim()// 2. 发送数据到 ChanneltrySend(SSEEvent.Message("message", data))}}}// 3. 发送关闭事件trySend(SSEEvent.Closed)} catch (e: Exception) {// 4. 发送错误事件trySend(SSEEvent.Error(e))} finally {response.close()close() // 5. 关闭 Channel}}override fun onFailure(call: Call, e: IOException) {// 6. 发送错误事件trySend(SSEEvent.Error(e))close(e) // 7. 关闭 Channel}})// 8. 等待 Channel 被关闭awaitClose { disconnect() }}

数据流向

HTTP 响应 → 解析数据 → trySend → Channel → Flow → collect

6. callbackFlow 的关键组件

trySend

// 发送数据到 ChanneltrySend(SSEEvent.Message("event", "data"))// trySend 的特点:// - 非阻塞// - 立即返回 true/false// - 如果 Channel 已关闭,返回 false

awaitClose

// 等待 Channel 关闭awaitClose { // 清理代码disconnect()}// awaitClose 的作用:// - 等待 Channel 被关闭// - 在关闭时执行清理代码// - 防止资源泄漏

close

// 关闭 Channelclose() // 正常关闭close(exception) // 异常关闭

7. 实际运行示例

服务器发送数据

data: {"message": "Hello"}

data: {"message": "World"}

callbackFlow 处理流程

// 1. 收到第一行数据data: {"message": "Hello"}// 2. 解析数据line.startsWith("data:") -> {val data = line.substring(5).trim() // "{\"message\": \"Hello\"}"trySend(SSEEvent.Message("message", data)) // 发送到 Channel}// 3. Flow 接收到数据connect().collect { event ->when (event) {is SSEClient.SSEEvent.Message -> {println("收到消息: ${event.data}") // 打印: 收到消息: {"message": "Hello"}}}}// 4. 收到第二行数据data: {"message": "World"}// 5. 重复步骤 2-3

8. callbackFlow 的优势

1. 将回调转换为 Flow

// 传统回调api.getData { result ->// 处理结果}// 使用 callbackFlowapi.getDataFlow().collect { result ->// 处理结果}

2. 支持协程操作

// 可以在 Flow 中使用协程connect().onEach { event ->delay(100) // 协程操作}.collect { event ->// 处理事件}

3. 生命周期管理

// 自动在生命周期结束时取消

connect().launchIn(lifecycleScope)

4. 错误处理

connect().catch { error ->// 处理错误}.collect { event ->// 处理事件}

9. 总结

callbackFlow 的核心作用

  1. 转换:将回调 API 转换为 Flow
  1. 简化:简化异步代码
  1. 统一:统一异步处理方式
  1. 管理:更好的生命周期管理

在你的 SSE 代码中 

// callbackFlow 将 HTTP 回调转换为 Flowfun connect(): Flow<SSEEvent> = callbackFlow {// HTTP 回调在这里处理// 数据通过 trySend 发送到 Flow// 外部通过 collect 接收数据}

这样设计使得你的 SSE 代码更加现代化、易于使用和维护!

扩展知识点:

 awaitClose { disconnect() } 是一个很好的写法,因为:

  • ✅ disconnect() 方法存在且实现正确
  • ✅ 在 Flow 取消时自动清理资源
  • ✅ 防止 HTTP 请求泄漏
  • ✅ 代码简洁清晰

在 Flow 取消时自动清理资源 ----》在 Flow 取消时自动清理资源-CSDN博客

http://www.dtcms.com/a/277920.html

相关文章:

  • 认识数据分析
  • 第一,二次作业
  • LAN-401 linux操作系统的移植
  • DHS及HTTPS工作过程
  • 【Claude Code】 AI 编程指南
  • sql初学见解
  • 多线程死锁
  • 飞算Java AI开发助手:引领智能编程新风尚
  • Llama系列:Llama1, Llama2,Llama3内容概述
  • 【读书笔记】《C++ Software Design》第九章:The Decorator Design Pattern
  • HTML 基本骨架
  • [GWCTF 2019]我有一个数据库
  • SOMEIP协议与测试
  • LeetCode 2401.最长优雅子数组
  • C++数组指针与函数指针
  • 为什么要有延时回调?
  • 2024-2025-2 山东大学《软件工程与实践》期末(回忆版)
  • p4 大小写检查
  • C++高级编程,类模版成员函数类外实现
  • windows10如何安装vue开发环境
  • JAVA-springboot 整合Activemq
  • ECU(电子控制单元)是什么?
  • C++中顶层const与底层const
  • JSX 语法
  • 【前端知识】移动端APP原生应用与H5交互底层逻辑
  • Dubbo跨越分布式事务的最终一致性陷阱
  • 有效感受野(ERF)可视化工具
  • hash表的模拟--开放定址法
  • 如何将本地代码同步到远程Github仓库
  • 【Docker基础】Dockerfile指令速览:环境与元数据指令详解