协程的 callbackFlow 函数的使用和原理
1. callbackFlow 是什么?
基本概念
callbackFlow 是一个函数,用于将基于回调的 API 转换为 Flow。
简单理解
// 传统回调方式(不好用)api.getData { result ->// 处理结果}// 使用 callbackFlow 转换为 Flow(好用)val flow = api.getDataFlow()flow.collect { result ->// 处理结果}
2. callbackFlow 的基本结构
基本语法
fun someFunction(): Flow<DataType> = callbackFlow {// 1. 发送数据trySend(data)// 2. 等待关闭awaitClose {// 清理资源}}
实际例子
// 将网络请求转换为 Flowfun fetchData(): Flow<String> = callbackFlow {// 发送数据trySend("Hello")trySend("World")// 等待关闭awaitClose {println("Flow 被关闭,清理资源")}}// 使用方式fetchData().collect { data ->println("收到数据: $data")}
3. callbackFlow 的工作原理
内部机制
fun connect(url: String): Flow<SSEEvent> = callbackFlow {// callbackFlow 内部创建了一个 Channel// 这个 Channel 可以发送 SSEEvent 类型的数据// 发送数据到 ChanneltrySend(SSEEvent.Message("event", "data"))// 等待 Channel 关闭awaitClose { // 清理资源}}
详细流程
// 步骤 1: callbackFlow 创建 Channelfun connect(): Flow<SSEEvent> = callbackFlow {// 这里创建了一个 Channel<SSEEvent>// 步骤 2: 发送数据到 ChanneltrySend(SSEEvent.Message("event", "data"))// 步骤 3: 等待关闭awaitClose { // 清理代码}}// 步骤 4: 使用 Flowconnect().collect { event ->// 从 Channel 接收数据println("收到事件: $event")}
4. 为什么要用 callbackFlow?
问题:传统回调的缺点
// 传统方式(不好)class TraditionalCallback {fun getData(callback: (String) -> Unit) {// 网络请求callback("数据")}}// 使用方式(复杂)val api = TraditionalCallback()api.getData { data ->// 处理数据println("收到数据: $data")}
解决方案:使用 callbackFlow
// 现代方式(好)fun getData(): Flow<String> = callbackFlow {// 网络请求trySend("数据")awaitClose {// 清理资源}}// 使用方式(简单)getData().collect { data ->println("收到数据: $data")}
5. 你的 SSE 代码中的 callbackFlow
详细分析
fun connect(url: String, token: String): Flow<SSEEvent> = callbackFlow {// 1. callbackFlow 创建了一个 Channel<SSEEvent>val request = Request.Builder().url(url).addHeader("Accept", "text/event-stream").build()call = okHttpClient.newCall(request)call?.enqueue(object : Callback {override fun onResponse(call: Call, response: Response) {val body = response.body ?: returntry {while (true) {val line = body.source().readUtf8Line() ?: breakwhen {line.startsWith("data:") -> {val data = line.substring(5).trim()// 2. 发送数据到 ChanneltrySend(SSEEvent.Message("message", data))}}}// 3. 发送关闭事件trySend(SSEEvent.Closed)} catch (e: Exception) {// 4. 发送错误事件trySend(SSEEvent.Error(e))} finally {response.close()close() // 5. 关闭 Channel}}override fun onFailure(call: Call, e: IOException) {// 6. 发送错误事件trySend(SSEEvent.Error(e))close(e) // 7. 关闭 Channel}})// 8. 等待 Channel 被关闭awaitClose { disconnect() }}
数据流向
HTTP 响应 → 解析数据 → trySend → Channel → Flow → collect
6. callbackFlow 的关键组件
trySend
// 发送数据到 ChanneltrySend(SSEEvent.Message("event", "data"))// trySend 的特点:// - 非阻塞// - 立即返回 true/false// - 如果 Channel 已关闭,返回 false
awaitClose
// 等待 Channel 关闭awaitClose { // 清理代码disconnect()}// awaitClose 的作用:// - 等待 Channel 被关闭// - 在关闭时执行清理代码// - 防止资源泄漏
close
// 关闭 Channelclose() // 正常关闭close(exception) // 异常关闭
7. 实际运行示例
服务器发送数据
data: {"message": "Hello"}
data: {"message": "World"}
callbackFlow 处理流程
// 1. 收到第一行数据data: {"message": "Hello"}// 2. 解析数据line.startsWith("data:") -> {val data = line.substring(5).trim() // "{\"message\": \"Hello\"}"trySend(SSEEvent.Message("message", data)) // 发送到 Channel}// 3. Flow 接收到数据connect().collect { event ->when (event) {is SSEClient.SSEEvent.Message -> {println("收到消息: ${event.data}") // 打印: 收到消息: {"message": "Hello"}}}}// 4. 收到第二行数据data: {"message": "World"}// 5. 重复步骤 2-3
8. callbackFlow 的优势
1. 将回调转换为 Flow
// 传统回调api.getData { result ->// 处理结果}// 使用 callbackFlowapi.getDataFlow().collect { result ->// 处理结果}
2. 支持协程操作
// 可以在 Flow 中使用协程connect().onEach { event ->delay(100) // 协程操作}.collect { event ->// 处理事件}
3. 生命周期管理
// 自动在生命周期结束时取消
connect().launchIn(lifecycleScope)
4. 错误处理
connect().catch { error ->// 处理错误}.collect { event ->// 处理事件}
9. 总结
callbackFlow 的核心作用
- 转换:将回调 API 转换为 Flow
- 简化:简化异步代码
- 统一:统一异步处理方式
- 管理:更好的生命周期管理
在你的 SSE 代码中
// callbackFlow 将 HTTP 回调转换为 Flowfun connect(): Flow<SSEEvent> = callbackFlow {// HTTP 回调在这里处理// 数据通过 trySend 发送到 Flow// 外部通过 collect 接收数据}
这样设计使得你的 SSE 代码更加现代化、易于使用和维护!
扩展知识点:
awaitClose { disconnect() } 是一个很好的写法,因为:
- ✅ disconnect() 方法存在且实现正确
- ✅ 在 Flow 取消时自动清理资源
- ✅ 防止 HTTP 请求泄漏
- ✅ 代码简洁清晰
在 Flow 取消时自动清理资源 ----》在 Flow 取消时自动清理资源-CSDN博客