当前位置: 首页 > news >正文

trySend、Channel 和 Flow 的工作原理

1. Channel 和 Flow 基础概念

Channel(通道)

// Channel 是一个可以发送和接收数据的管道val channel = Channel<String>()// 发送数据channel.send("Hello")// 接收数据val data = channel.receive()

Flow(流)

 

// Flow 是一个可以发射数据的流val flow = flow {emit("Hello")emit("World")}

2. callbackFlow 的工作原理

callbackFlow 创建了一个 Channel

 

fun connect(url: String, token: String): Flow<SSEEvent> = callbackFlow {// callbackFlow 内部创建了一个 Channel// 这个 Channel 可以发送 SSEEvent 类型的数据// 发送数据到 ChanneltrySend(SSEEvent.Message("event", "data"))// 等待 Channel 关闭awaitClose { // 清理资源disconnect() }}

callbackFlow 的完整流程

 

fun connect(url: String, token: String): Flow<SSEEvent> = callbackFlow {// 1. callbackFlow 创建一个 Channel// 2. 这个 Channel 可以发送 SSEEvent 类型的数据// 3. callbackFlow 返回一个 Flow,这个 Flow 会从这个 Channel 接收数据val request = Request.Builder().url(url).addHeader("Accept", "text/event-stream").build()call = okHttpClient.newCall(request)call?.enqueue(object : Callback {override fun onResponse(call: Call, response: Response) {val body = response.body ?: returntry {var currentEvent = "message"val dataBuffer = StringBuilder()while (true) {val line = body.source().readUtf8Line() ?: breakwhen {line.startsWith("event:") -> currentEvent = line.substring(6).trim()line.startsWith("data:") -> dataBuffer.append(line.substring(5).trim()).append("\n")line.isEmpty() -> {if (dataBuffer.isNotEmpty()) {val data = dataBuffer.toString().trim()// 4. 发送数据到 ChanneltrySend(SSEEvent.Message(currentEvent, data))dataBuffer.clear()}}}}// 5. 发送关闭事件trySend(SSEEvent.Closed)} catch (e: Exception) {// 6. 发送错误事件trySend(SSEEvent.Error(e))} finally {response.close()close() // 关闭 Channel}}override fun onFailure(call: Call, e: IOException) {// 7. 发送错误事件trySend(SSEEvent.Error(e))close(e) // 关闭 Channel}})// 8. 等待 Channel 被关闭awaitClose { disconnect() }}

3. trySend 的作用

trySend 发送数据到 Channel

 

// trySend 尝试发送数据到 ChanneltrySend(SSEEvent.Message(currentEvent, data))// trySend 的特点:// 1. 非阻塞:不会等待接收者// 2. 返回 Boolean:发送成功返回 true,失败返回 false// 3. 如果 Channel 已关闭,返回 false

与 send 的区别

// send:阻塞式发送,会等待接收者channel.send(data) // 如果 Channel 满了,会阻塞// trySend:非阻塞式发送,立即返回结果val success = channel.trySend(data) // 立即返回 true/false

 

4. Flow 的 onEach 工作原理

onEach 是 Flow 的中间操作符

 

sseClient.connect(sseUrl, token).onEach { event ->  // 对每个事件进行处理when (event) {is SSEClient.SSEEvent.Message -> {// 处理消息事件println("收到消息: ${event.data}")}is SSEClient.SSEEvent.Error -> {// 处理错误事件println("发生错误: ${event.throwable.message}")}SSEClient.SSEEvent.Closed -> {// 处理关闭事件println("连接已关闭")}}}.launchIn(lifecycleScope) // 启动 Flow

onEach 的执行流程

 

// 1. callbackFlow 创建 Channelfun connect(): Flow<SSEEvent> = callbackFlow {// 发送数据到 ChanneltrySend(SSEEvent.Message("event1", "data1"))trySend(SSEEvent.Message("event2", "data2"))trySend(SSEEvent.Closed)}// 2. onEach 对每个数据进行处理.onEach { event ->// 每当 Channel 中有新数据时,这里就会被调用when (event) {is SSEClient.SSEEvent.Message -> {// 处理消息}}}// 3. launchIn 启动 Flow.launchIn(lifecycleScope)

5. 完整的数据流

数据流向图

 

HTTP 响应 → 解析 SSE 数据 → trySend → Channel → Flow → onEach → UI 更新

详细步骤

 

// 步骤 1: HTTP 响应数据data: {"message": "Hello"}// 步骤 2: 解析 SSE 数据line.startsWith("data:") -> dataBuffer.append(line.substring(5).trim())// 步骤 3: 发送到 ChanneltrySend(SSEEvent.Message("message", "{\"message\": \"Hello\"}"))// 步骤 4: Channel 中的数据流向 FlowcallbackFlow { ... } // 返回 Flow// 步骤 5: Flow 的 onEach 处理.onEach { event ->when (event) {is SSEClient.SSEEvent.Message -> {// 处理消息updateUI(event.data)}}}// 步骤 6: 启动 Flow.launchIn(lifecycleScope)

6. 实际运行示例

服务器发送的数据

  

data: {"message": "Hello"}

data: {"message": "World"}

data: {"message": "Test"}

代码执行流程

// 1. 服务器发送第一行数据data: {"message": "Hello"}// 2. 代码解析line.startsWith("data:") -> dataBuffer.append("{\"message\": \"Hello\"}")// 3. 遇到空行,发送数据line.isEmpty() -> {val data = dataBuffer.toString().trim()trySend(SSEEvent.Message("message", data)) // 发送到 Channel}// 4. onEach 接收到数据.onEach { event ->when (event) {is SSEClient.SSEEvent.Message -> {// event.data = "{\"message\": \"Hello\"}"updateUI(event.data)}}}// 5. UI 更新updateUI("{\"message\": \"Hello\"}")

7. 关键概念总结

callbackFlow

  • 创建一个 Channel
  • 返回一个 Flow
  • Flow 从 Channel 接收数据

trySend

  • 向 Channel 发送数据
  • 非阻塞操作
  • 返回发送是否成功

onEach

  • Flow 的中间操作符
  • 对每个数据进行处理
  • 不会改变数据流

launchIn

  • 启动 Flow
  • 在指定的协程作用域中运行
  • 自动管理生命周期

8. 为什么这样设计?

优势

  1. 异步处理:HTTP 响应在后台处理,UI 不会阻塞
  1. 结构化数据:使用 sealed class 定义事件类型
  1. 生命周期管理:自动在 Activity 销毁时取消
  1. 错误处理:统一的错误处理机制
  1. 可扩展:易于添加新的事件类型

数据流的好处

 

// 清晰的数据流向

HTTP 数据 → SSE 解析 → 结构化事件 → UI 更新

// 而不是传统的回调方式

HTTP 数据 → 回调函数 → UI 更新

这样设计使得代码更加清晰、可维护,并且充分利用了 Kotlin 协程和 Flow 的优势!

http://www.dtcms.com/a/277327.html

相关文章:

  • 【CMake】CMake创建、安装、使用静态库和动态库
  • 操作系统-第四章存储器管理和第五章设备管理-知识点整理(知识点学习 / 期末复习 / 面试 / 笔试)
  • 【hivesql 已知维度父子关系加工层级表】
  • C++每日刷题day2025.7.13
  • 什么是RAG(Retrieval-Augmented Generation)?一文读懂检索增强生成
  • RabbitMQ面试精讲 Day 2:RabbitMQ工作模型与消息流转
  • 12.I/O复用
  • 前端性能与可靠性工程:资源优化 - 加载性能的“低垂果实”
  • 从零开始学习深度学习-水果分类之PyQt5App
  • SpringBoot集成Redis、SpringCache
  • C++ 强制类型转换
  • 【操作系统】strace 跟踪系统调用(一)
  • (LeetCode 每日一题) 2410. 运动员和训练师的最大匹配数(排序、双指针)
  • es里为什么node和shard不是一对一的关系
  • Augment AI 0.502.0版本深度解析:Task、Guidelines、Memory三大核心功能实战指南
  • 将 NumPy 数组展平并转换为 Python 列表
  • 1.1.5 模块与包——AI教你学Django
  • OpenLayers 入门指南【二】:坐标系与投影转换
  • 把 DNA 当 PCIe:一条 365 nt 链实现 64 Gbps 片上光互连——基于链式 Förster 共振的分子级波分复用链路
  • 理解 Robots 协议:爬虫该遵守的“游戏规则”
  • MySQL逻辑删除与唯一索引冲突解决
  • M00224-小范围疫情防控元胞自动机模拟matlab
  • 【unitrix】 5.1 第二套类型级二进制数基本结构体(types2.rs)
  • 深入解析Hadoop架构设计:原理、组件与应用
  • OpenLayers使用
  • (2)从零开发 Chrome 插件:实现 API 登录与本地存储功能
  • 音视频学习(三十八):像素与位深
  • 打破并发瓶颈:虚拟线程实现详解与传统线程模型的性能对比
  • QuickUnion优化及Huffman树
  • JS红宝书pdf完整版