Channel 的核心特点 (Channel vs SharedFlow 选择对比)
Channel 的核心特点
Channel 的本质是一个线程安全的队列,强调点对点的精确传递和消费者竞争。
1. 任务队列场景 ✅
工作协程模式
class TaskProcessor {private val taskChannel = Channel<Task>(capacity = Channel.UNLIMITED)init {// 启动固定数量的工作协程repeat(4) { workerId ->launch {for (task in taskChannel) { // 每个任务只被一个worker处理processTask(task, workerId)}}}}suspend fun submitTask(task: Task) {taskChannel.send(task)}private suspend fun processTask(task: Task, workerId: Int) {println("Worker $workerId 开始处理: $task")delay(task.duration) // 模拟处理时间println("Worker $workerId 完成: $task")}
}// 使用
fun main() = runBlocking {val processor = TaskProcessor()// 提交多个任务repeat(10) { i ->processor.submitTask(Task("任务$i", 1000L))}delay(5000)
}
2. 请求-响应模式 ✅
RPC 式通信
class RequestResponseHandler {private val requestChannel = Channel<Request>(Channel.UNLIMITED)init {launch {for (request in requestChannel) {val response = processRequest(request)// 将响应发送回特定的响应通道request.responseChannel.send(response)}}}suspend fun sendRequest(request: Request): Response {val responseChannel = Channel<Response>(Channel.RENDEZVOUS)request.responseChannel = responseChannelrequestChannel.send(request)return responseChannel.receive() // 等待特定响应}private suspend fun processRequest(request: Request): Response {// 处理请求...return Response("处理结果")}
}data class Request(val data: String,var responseChannel: Channel<Response>? = null
)
3. 背压敏感的生产者-消费者 ✅
有界 Channel 控制内存
class ImageProcessor {// 使用有界Channel防止内存溢出private val imageChannel = Channel<ImageData>(capacity = 10)// 慢速消费者private val processorJob = launch {for (image in imageChannel) {processImageHeavy(image) // 耗时处理}}// 快速生产者fun onCameraFrame(frame: ImageData) {if (!imageChannel.trySend(frame).isSuccess) {// Channel已满,丢弃帧或采取其他策略println("丢弃帧,消费者太慢")}}private suspend fun processImageHeavy(image: ImageData) {delay(200) // 模拟重处理saveToDisk(image)}
}
4. 协程间精确协调 ✅
同步点协调
class PipelineProcessor {private val stage1Channel = Channel<Data>(1)private val stage2Channel = Channel<Data>(1)private val stage3Channel = Channel<Data>(1)fun startPipeline() {// 阶段1:数据加载launch {for (data in loadData()) {stage1Channel.send(data)}stage1Channel.close()}// 阶段2:数据处理launch {for (data in stage1Channel) {val processed = processStage1(data)stage2Channel.send(processed)}stage2Channel.close()}// 阶段3:数据保存launch {for (data in stage2Channel) {saveData(data)}}}
}
5. 替代回调的挂起函数 ✅
将回调API转换为协程
class DialogManager {private val resultChannel = Channel<DialogResult>(Channel.RENDEZVOUS)suspend fun showConfirmationDialog(): Boolean {showDialog()val result = resultChannel.receive() // 挂起直到用户操作return result == DialogResult.CONFIRMED}fun onConfirmClicked() {resultChannel.trySend(DialogResult.CONFIRMED)}fun onCancelClicked() {resultChannel.trySend(DialogResult.CANCELLED)}
}
6. 多个生产者的负载均衡 ✅
class LoadBalancer {private val workChannel = Channel<WorkItem>(Channel.UNLIMITED)init {// 多个消费者竞争处理repeat(3) { workerId ->launch {for (work in workChannel) {processWork(work, workerId)}}}}// 多个生产者fun producer1() = launch {repeat(10) { i ->workChannel.send(WorkItem("Producer1-Item$i"))}}fun producer2() = launch {repeat(10) { i ->workChannel.send(WorkItem("Producer2-Item$i"))}}
}
Channel vs SharedFlow 决策指南
场景特征 | 选择 Channel | 选择 SharedFlow |
---|---|---|
每个消息只需要一个消费者处理 | ✅ | ❌ |
需要任务队列、工作分配 | ✅ | ❌ |
请求-响应模式 | ✅ | ❌ |
需要精确的背压控制 | ✅ | ⚠️ |
所有消费者都需要所有消息 | ❌ | ✅ |
状态广播、事件通知 | ❌ | ✅ |
UI状态管理 | ❌ | ✅ |
实时数据流 | ❌ | ✅ |
具体判断标准
选择 Channel 当:
"这个工作只需要做一次" - 任务处理
"谁有空谁处理" - 负载均衡
"我发请求,等你的响应" - RPC模式
"不要堆积太多,控制内存" - 背压控制
"步骤A完成后再开始步骤B" - 流水线协调
选择 SharedFlow 当:
"所有人都需要知道这个状态" - 状态广播
"新来的也要看到最新状态" - 状态保持
"这个事件发生了,感兴趣的人自己处理" - 事件发布
"实时更新显示" - UI同步
总结
Channel 最适合的场景:
🎯 任务分发和工作队列
🎯 请求-响应通信
🎯 生产者-消费者 with 背压控制
🎯 协程间精确协调
🎯 负载均衡
记住这个简单原则:
用 Channel 当你想"分配工作"
用 SharedFlow 当你想"广播通知"
对于人脸跟随场景,由于需要多个组件(显示、控制、记录)同时处理同一帧数据,所以 SharedFlow 是正确的选择。但如果将来你需要将人脸检测任务分发给多个工作协程并行处理,那时候就可以考虑使用 Channel 了!