trySend、Channel 和 Flow 的工作原理
1. Channel 和 Flow 基础概念
Channel(通道)
// Channel 是一个可以发送和接收数据的管道val channel = Channel<String>()// 发送数据channel.send("Hello")// 接收数据val data = channel.receive()
Flow(流)
// Flow 是一个可以发射数据的流val flow = flow {emit("Hello")emit("World")}
2. callbackFlow 的工作原理
callbackFlow 创建了一个 Channel
fun connect(url: String, token: String): Flow<SSEEvent> = callbackFlow {// callbackFlow 内部创建了一个 Channel// 这个 Channel 可以发送 SSEEvent 类型的数据// 发送数据到 ChanneltrySend(SSEEvent.Message("event", "data"))// 等待 Channel 关闭awaitClose { // 清理资源disconnect() }}
callbackFlow 的完整流程
fun connect(url: String, token: String): Flow<SSEEvent> = callbackFlow {// 1. callbackFlow 创建一个 Channel// 2. 这个 Channel 可以发送 SSEEvent 类型的数据// 3. callbackFlow 返回一个 Flow,这个 Flow 会从这个 Channel 接收数据val request = Request.Builder().url(url).addHeader("Accept", "text/event-stream").build()call = okHttpClient.newCall(request)call?.enqueue(object : Callback {override fun onResponse(call: Call, response: Response) {val body = response.body ?: returntry {var currentEvent = "message"val dataBuffer = StringBuilder()while (true) {val line = body.source().readUtf8Line() ?: breakwhen {line.startsWith("event:") -> currentEvent = line.substring(6).trim()line.startsWith("data:") -> dataBuffer.append(line.substring(5).trim()).append("\n")line.isEmpty() -> {if (dataBuffer.isNotEmpty()) {val data = dataBuffer.toString().trim()// 4. 发送数据到 ChanneltrySend(SSEEvent.Message(currentEvent, data))dataBuffer.clear()}}}}// 5. 发送关闭事件trySend(SSEEvent.Closed)} catch (e: Exception) {// 6. 发送错误事件trySend(SSEEvent.Error(e))} finally {response.close()close() // 关闭 Channel}}override fun onFailure(call: Call, e: IOException) {// 7. 发送错误事件trySend(SSEEvent.Error(e))close(e) // 关闭 Channel}})// 8. 等待 Channel 被关闭awaitClose { disconnect() }}
3. trySend 的作用
trySend 发送数据到 Channel
// trySend 尝试发送数据到 ChanneltrySend(SSEEvent.Message(currentEvent, data))// trySend 的特点:// 1. 非阻塞:不会等待接收者// 2. 返回 Boolean:发送成功返回 true,失败返回 false// 3. 如果 Channel 已关闭,返回 false
与 send 的区别
// send:阻塞式发送,会等待接收者channel.send(data) // 如果 Channel 满了,会阻塞// trySend:非阻塞式发送,立即返回结果val success = channel.trySend(data) // 立即返回 true/false
4. Flow 的 onEach 工作原理
onEach 是 Flow 的中间操作符
sseClient.connect(sseUrl, token).onEach { event -> // 对每个事件进行处理when (event) {is SSEClient.SSEEvent.Message -> {// 处理消息事件println("收到消息: ${event.data}")}is SSEClient.SSEEvent.Error -> {// 处理错误事件println("发生错误: ${event.throwable.message}")}SSEClient.SSEEvent.Closed -> {// 处理关闭事件println("连接已关闭")}}}.launchIn(lifecycleScope) // 启动 Flow
onEach 的执行流程
// 1. callbackFlow 创建 Channelfun connect(): Flow<SSEEvent> = callbackFlow {// 发送数据到 ChanneltrySend(SSEEvent.Message("event1", "data1"))trySend(SSEEvent.Message("event2", "data2"))trySend(SSEEvent.Closed)}// 2. onEach 对每个数据进行处理.onEach { event ->// 每当 Channel 中有新数据时,这里就会被调用when (event) {is SSEClient.SSEEvent.Message -> {// 处理消息}}}// 3. launchIn 启动 Flow.launchIn(lifecycleScope)
5. 完整的数据流
数据流向图
HTTP 响应 → 解析 SSE 数据 → trySend → Channel → Flow → onEach → UI 更新
详细步骤
// 步骤 1: HTTP 响应数据data: {"message": "Hello"}// 步骤 2: 解析 SSE 数据line.startsWith("data:") -> dataBuffer.append(line.substring(5).trim())// 步骤 3: 发送到 ChanneltrySend(SSEEvent.Message("message", "{\"message\": \"Hello\"}"))// 步骤 4: Channel 中的数据流向 FlowcallbackFlow { ... } // 返回 Flow// 步骤 5: Flow 的 onEach 处理.onEach { event ->when (event) {is SSEClient.SSEEvent.Message -> {// 处理消息updateUI(event.data)}}}// 步骤 6: 启动 Flow.launchIn(lifecycleScope)
6. 实际运行示例
服务器发送的数据
data: {"message": "Hello"}
data: {"message": "World"}
data: {"message": "Test"}
代码执行流程
// 1. 服务器发送第一行数据data: {"message": "Hello"}// 2. 代码解析line.startsWith("data:") -> dataBuffer.append("{\"message\": \"Hello\"}")// 3. 遇到空行,发送数据line.isEmpty() -> {val data = dataBuffer.toString().trim()trySend(SSEEvent.Message("message", data)) // 发送到 Channel}// 4. onEach 接收到数据.onEach { event ->when (event) {is SSEClient.SSEEvent.Message -> {// event.data = "{\"message\": \"Hello\"}"updateUI(event.data)}}}// 5. UI 更新updateUI("{\"message\": \"Hello\"}")
7. 关键概念总结
callbackFlow
- 创建一个 Channel
- 返回一个 Flow
- Flow 从 Channel 接收数据
trySend
- 向 Channel 发送数据
- 非阻塞操作
- 返回发送是否成功
onEach
- Flow 的中间操作符
- 对每个数据进行处理
- 不会改变数据流
launchIn
- 启动 Flow
- 在指定的协程作用域中运行
- 自动管理生命周期
8. 为什么这样设计?
优势
- 异步处理:HTTP 响应在后台处理,UI 不会阻塞
- 结构化数据:使用 sealed class 定义事件类型
- 生命周期管理:自动在 Activity 销毁时取消
- 错误处理:统一的错误处理机制
- 可扩展:易于添加新的事件类型
数据流的好处
// 清晰的数据流向
HTTP 数据 → SSE 解析 → 结构化事件 → UI 更新
// 而不是传统的回调方式
HTTP 数据 → 回调函数 → UI 更新
这样设计使得代码更加清晰、可维护,并且充分利用了 Kotlin 协程和 Flow 的优势!