Kotlin 协程之 突破 Flow 限制:Channel 与 Flow 的结合之道
前言
上一篇文章介绍了 Flow
的基本概念和使用,也知道了Flow
强调单一执行流与上下文一致性,不允许跨协程并发 emit
。
但在实际开发中,我们可能会遇到需要"多协程并发生产 + 按需收集",同时还想复用 Flow
的操作符体系的场景。也就是说,需要满足以下条件:
- 可以多协程并发生产数据
- 具备冷流的特性,有消费者消费才开始生产,并且每次消费时,生产逻辑会重新开始执行
- 能利用 flow 强大的操作符
这就需要用到 ChannelFlow
了, 它是将 Flow
的特性与 Channel
的特性结合,实现多协程并发生产 + 按需收集。
ChannelFlow
ChannelFlow
的使用非常简单
使用示例
suspend fun channelFlowExample() {val channelFlow = channelFlow {println("\u001B[32m[ChannelFlow] 开始在独立协程中生产数据\u001B[0m")// 可以在这里启动多个协程launch {repeat(2) { i ->delay(100)send("协程1-数据$i")println("\u001B[32m[ChannelFlow] 协程1发送: 数据$i\u001B[0m")}}launch {repeat(3) { i ->delay(200)send("协程2-数据$i")println("\u001B[32m[ChannelFlow] 协程2发送: 数据$i\u001B[0m")}}}delay(2000)println("当有消费者开始收集数据时,上面的生产才会开始工作")channelFlow.collect { data ->delay(100)println("\u001B[34m[消费者1] 收到: $data\u001B[0m")}println()// 再一次收集,此时生产端会重新执行,得到全新数据channelFlow.collect { data ->delay(100)println("\u001B[34m[消费者2] 收到: $data\u001B[0m")}
}
执行结果:
使用还是非常简单的:像
Channel
一样用send
发送数据,像Flow
一样用collect
收集数据;有收集才生产,每次收集都“从头再来”。
ChannelFlow 的特点
在 API 使用层也能感受到:ChannelFlow
就是 Channel
与 Flow
的结合体,发送用 send
,接收用 collect
。
-
生产层(热):
channelFlow {}
提供ProducerScope
,它既是CoroutineScope
又是SendChannel
,因此可以launch{}
并send()
。 -
消费层(冷):只有调用
collect
时,成产层才真正开始生产;并且每次collect
都是一次全新的生产过程。 -
桥接层:内部用
Channel
承载并发生产的数据,再统一通过Flow
的收集端按序发送出去,天然的支持Flow
的特性
源码分析
channelFlow
构建器与 ProducerScope
// kotlinx-coroutines-core/common/src/flow/Builders.kt
public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> =ChannelFlowBuilder(block)
- 参数类型:
suspend ProducerScope<T>.() -> Unit
- 通过
ChannelFlowBuilder
构建ChannelFlow
// kotlinx-coroutines-core/common/src/channels/Produce.kt
public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {public val channel: SendChannel<E>
}
ProducerScope
既是 CoroutineScope
,又是 SendChannel<T>
, 因此可以在代码块中开启协程并调用 send
。
ChannelFlowBuilder
// kotlinx-coroutines-core/common/src/flow/Builders.kt
private open class ChannelFlowBuilder<T>(private val block: suspend ProducerScope<T>.() -> Unit,context: CoroutineContext = EmptyCoroutineContext,capacity: Int = BUFFERED,onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =ChannelFlowBuilder(block, context, capacity, onBufferOverflow)override suspend fun collectTo(scope: ProducerScope<T>) =block(scope) // 直接执行用户的生产代码块override fun toString(): String ="block[$block] -> ${super.toString()}"
}
-
ChannelFlowBuilder
返回ChannelFlow
类型,同时,持有了block
参数,也就是ProducerScope
-
提供了
collectTo(scope)
,实际上就是执行我们传入的生产逻辑代码块
ChannelFlow
// kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
@InternalCoroutinesApi
public abstract class ChannelFlow<T>(@JvmField public val context: CoroutineContext,@JvmField public val capacity: Int,@JvmField public val onBufferOverflow: BufferOverflow
) : FusibleFlow<T> {//当执行 collect 时,这里的 collector 就是消费逻辑代码块override suspend fun collect(collector: FlowCollector<T>): Unit =coroutineScope {// 先新建一个生产者协程,用它来生产数据,然后调用FlowCollector.emitAll,把 Channel 中的数据发送给 FlowCollectorcollector.emitAll(produceImpl(this))}// 创建一个生产者协程,内部就是个 Channelpublic open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)protected abstract suspend fun collectTo(scope: ProducerScope<T>)
}
internal fun <E> CoroutineScope.produce(context: CoroutineContext = EmptyCoroutineContext,capacity: Int = 0,onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,start: CoroutineStart = CoroutineStart.DEFAULT,onCompletion: CompletionHandler? = null,@BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {val channel = Channel<E>(capacity, onBufferOverflow)val newContext = newCoroutineContext(context)val coroutine = ProducerCoroutine(newContext, channel)if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)coroutine.start(start, coroutine, block)return coroutine
}
-
collect
:每次collect
都会通过produceImpl
调用scope.produce
创建一个新的ReceiveChannel<T>
,并且开始执行生产逻辑。 -
collectToFun
:最终转调到子类(ChannelFlowBuilder
)的collectTo
,执行我们写在channelFlow {}
中的生产逻辑。
emitAllImpl
public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Unit =emitAllImpl(channel, consume = true)private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {ensureActive()var cause: Throwable? = nulltry {// 从 channel 中取数据for (element in channel) {// 再通过 FlowCollector 把数据 emit 出去emit(element)}} catch (e: Throwable) {cause = ethrow e} finally {if (consume) channel.cancelConsumed(cause)}
}
for (element in channel)
:顺序消费 Channel 中的数据并emit
给下游,一句话总结就是把数据从Channel
中取出来,并emit
转发到Flow
收集器
整体流程
channelFlow{}
内部通过ChannelFlowBuilder
构建ChannelFlow
, 并把ProducerScope
传给ChannelFlowBuilder
,此时,不会启动生产逻辑- 外部调用
channelFlow.collect { ... }
,开启消费 ChannelFlow.collect
会先创建ReceiveChannel
(produceImpl
)produce
启动ProducerCoroutine
,在ProducerScope
内执行channelFlow { ... }
的生产逻辑,内部是个Channel
,可并发launch{}
+send()
emitAll(channel)
逐个读取 Channel 中的元素并转发给下游收集器- 收集结束或异常/取消时,Channel 被适当取消或关闭,生产方协程随之停止
如果把 Flow
内部的流程搞清楚后,channelFlow
就会非常容易理解了,其实就是 Channel
跟
Flow
的结合,把数据的生产交给 Channel
,然后把数据消费交给 Flow
,中间做了一层缓冲,channelFlow
的作用就是把数据从Channel
读取出来,并 emit
给下游的 FlowCollector
CallbackFlow:回调转 Flow
channelFlow
解决了多协程并发生产数据的问题。但在实际开发中还有一类高频场景:将传统回调(Listener/Callback)转换为响应式Flow,例如:传感器监听、网络状态变化、UI 事件等。这就需要用到 callbackFlow
了
与“回调转挂起”的区别
我在另一篇文章系统讲解了“回调转挂起”,还不清楚的同学请先阅读 ——回调转挂起
-
suspendCancellableCoroutine(回调→挂起函数)
- 面向:一次性回调结果(single-shot)。
- 产物:一个
suspend
函数;每次调用只产出一次结果。 - 适用:如登录返回一次 Token、文件选择回调一次路径。
-
callbackFlow(回调→多次发射的 Flow)
- 面向:多次/持续性回调(multi-shot)。
- 产物:一个可被收集的
Flow
;回调每次触发就向流中发射一次。 - 适用:如传感器/网络状态/事件总线等持续产生事件的来源。
用哪一个:
- 单次回调 →
suspendCancellableCoroutine
- 多次/持续回调 →
callbackFlow
基本使用
suspend fun callbackFlowExample() {val cf = callbackFlow {println("\u001B[32m[CallbackFlow] 注册定时回调,开始生产数据\u001B[0m")val timer = Timer()val task = object : TimerTask() {override fun run() {val tick = "tick-" + System.currentTimeMillis()//使用trySendval ok = trySend(tick).isSuccessif (ok) println("\u001B[32m[CallbackFlow] 发送: $tick\u001B[0m")}}timer.scheduleAtFixedRate(task, 0L, 200L)// 手动处理关闭资源awaitClose {println("\u001B[31m[CallbackFlow] 取消定时回调并清理资源\u001B[0m")timer.cancel()}}println("\n\u001B[36m[CallbackFlow] 第一次收集(take 5)\u001B[0m")cf.take(5).collect { v -> println("\u001B[34m[消费者] 收到: $v\u001B[0m") }println("\n\u001B[36m[CallbackFlow] 第二次收集(take 3)\u001B[0m")cf.take(3).collect { v -> println("\u001B[34m[消费者] 收到: $v\u001B[0m") }
}
CallbackFlow 使用的套路
- 使用 trySend:在回调中使用
trySend
而不是send
- 必须调用 awaitClose:让生产侧与资源的生命周期与
Flow
收集对齐- 若不调用,
callbackFlow { ... }
的代码块很快返回,内部通道会被关闭,生产协程(或回调注册)被取消,后续发送被丢弃 - 在
awaitClose { ... }
中做资源释放(注销回调/关闭传感器/停止定时器),并保证在收集取消/完成前保持存活
- 若不调用,
- 异常处理:在回调中捕获异常,避免崩溃
- 生命周期:确保 Flow 的生命周期与回调资源绑定
为什么必须 callbackFlow
必须要手动 awaitClose
,而 channelFlow
不需要
-
callbackFlow
的代码块返回即表示生产端搭建完成;内部使用trySend
而不是send
, 如果没有awaitClose
持续挂起等待,下游一旦还未开始或很快结束,内部通道会被关闭,生产协程/回调被取消,后续发送丢失。awaitClose
的作用如下- 保持生产端“存活”,直到收集侧取消/完成;
- 在关闭前执行资源清理
-
channelFlow
:不需要awaitClose
- 生产逻辑完全运行在
ProducerScope
内,使用send/launch/delay
等挂起机制; - 下游
collect
结束/取消时,ProducerScope
被取消,其子协程与内部通道随之关闭并清理; - 代码块返回即代表生产结束,生命周期由作用域自动托管,无需手动“挂住”。
- 生产逻辑完全运行在
简要对比:
// channelFlow:结构化并发,作用域关闭即清理
val f1 = channelFlow {launch { repeat(3) { send(it); delay(100) } }// 无需 awaitClose
}// callbackFlow:外部回调,必须 awaitClose 维持生命周期
val f2 = callbackFlow {val cb = object : Listener {override fun onEvent(v: Int) {trySend(v)}}register(cb)awaitClose { unregister(cb) }
}
应用场景
1. 定时器回调转Flow
val timerFlow = callbackFlow<Long> {val timer = Timer()val task = object : TimerTask() {override fun run() {trySend(System.currentTimeMillis())}}timer.scheduleAtFixedRate(task, 0, 1000)awaitClose {timer.cancel() // 清理定时器资源}
}
2. 网络状态监听
val networkStatusFlow = callbackFlow<NetworkStatus> {val callback = object : NetworkCallback {override fun onStatusChanged(status: NetworkStatus) {trySend(status)}}networkManager.registerCallback(callback)awaitClose {networkManager.unregisterCallback(callback)}
}
源码分析
callbackFlow
并没有“新增能力”,它是 channelFlow
的专用语义版本,核心价值在于:更清晰的意图表达、更一致的最佳实践与更好的可维护性。
回调转 Flow
这件事用 channelFlow
也能实现,但用 callbackFlow
一眼就能看出“这是在包装回调”的意图,团队协作与规范更友好。
由于机制与 channelFlow
完全一致,源码部分就不再赘述了。
- 构建器:
callbackFlow { ... } -> CallbackFlowBuilder(block)
(继承自ChannelFlowBuilder
)。 - 收集链路:
collect -> produceImpl -> scope.produce -> 内部 Channel -> emitAll(channel)
。
Channel 转 Flow
ChannelFlow
和 CallbackFlow
都是对 Channel
的包装:在收集阶段内部创建一个 Channel
,将生产端写入的数据,按序转发给下游的FlowCollector
。
还有另一种常见诉求:将已有的 Channel
直接转换为 Flow
。转换为 Flow
后,就可以使用 Flow
丰富的操作符体系来处理数据。
Kotlin 协程提供了两个扩展函数:consumeAsFlow()
和 receiveAsFlow()
,它们都能将 Channel
转换为 Flow
,但行为存在重要差异。
consumeAsFlow:一次性消费
consumeAsFlow
会消费并关闭 Channel
,转换得到的 Flow
只能使用一次。
基本使用:
suspend fun consumeAsFlowExample() {val channel = Channel<String>(capacity = 5)// 生产者GlobalScope.launch {println("\u001B[32m[生产者][consumeAsFlow] 启动\u001B[0m")repeat(5) { i ->val v = "数据-$i"channel.send(v)println("\u001B[32m[生产者][consumeAsFlow] 发送: $v\u001B[0m")}channel.close()println("\u001B[31m[生产者][consumeAsFlow] 关闭 Channel\u001B[0m")}// 转换为Flow并消费val consumeAsFlow = channel.consumeAsFlow()println("\u001B[36m[系统] 第一次收集 consumeAsFlow\u001B[0m")consumeAsFlow.collect { data ->println("\u001B[34m[消费者][consumeAsFlow] 收到: $data\u001B[0m")}println("\u001B[36m[系统] 第二次收集(预期失败)\u001B[0m")try {consumeAsFlow.collect { println("\u001B[34m[消费者][consumeAsFlow] 收到: $it\u001B[0m") }} catch (e: Throwable) {println("\u001B[31m[错误][consumeAsFlow] ${e.message}\u001B[0m")}
}
源码分析
// 首先限定扩展类型为ReceiveChannel, 保证只能使用 ReceiveChannel,然后把自身作为参数传递给ChannelAsFlow,并且标记是一次性消费
public fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T> = ChannelAsFlow(this, consume = true)
private class ChannelAsFlow<T>(private val channel: ReceiveChannel<T>, // 原始 Channel 实例private val consume: Boolean, // 是否一次性消费:consumeAsFlow() 下为 truecontext: CoroutineContext = EmptyCoroutineContext,capacity: Int = Channel.OPTIONAL_CHANNEL,onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {// 记录是否已被消费private val consumed = atomic(false)private fun markConsumed() {if (consume) {// 已消费则报错,保证一次性语义check(!consumed.getAndSet(true)) { "ReceiveChannel.consumeAsFlow can be collected just once" }}}override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =ChannelAsFlow(channel, consume, context, capacity, onBufferOverflow)override fun dropChannelOperators(): Flow<T> =ChannelAsFlow(channel, consume)override suspend fun collectTo(scope: ProducerScope<T>) =SendingCollector(scope).emitAllImpl(channel, consume) // 直接将 Channel 中的数据逐个转发到下游override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {markConsumed() // 重复收集快速失败return if (capacity == Channel.OPTIONAL_CHANNEL) {channel // 直连通道:不再额外创建缓冲 Channel} else {super.produceImpl(scope) // 走额外缓冲通道路径}}override suspend fun collect(collector: FlowCollector<T>) {if (capacity == Channel.OPTIONAL_CHANNEL) {markConsumed()collector.emitAllImpl(channel, consume) // 直连读取原始 Channel} else {super.collect(collector) // 额外缓冲通道路径,produceImpl 会负责标记已消费}}override fun additionalToStringProps(): String = "channel=$channel"
}
private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {ensureActive()var cause: Throwable? = nulltry {for (element in channel) {emit(element)}} catch (e: Throwable) {cause = ethrow e} finally {// 如果是一次性消费,则取消已消费的元素if (consume) channel.cancelConsumed(cause)}
}
重点
-
consume 标记:
consume
在consumeAsFlow()
下为true
,表示“一次性消费,并在结束后关闭”。
-
保证只能收集一次:
markConsumed()
使用原子标志consumed
抢占置位;若再次收集立刻抛出异常。- 因此第一次
collect
正常,第二次collect
会立即失败。
-
直连优化(不创建额外
Channel
):- 当
capacity == Channel.OPTIONAL_CHANNEL
时,启用“直连通道”优化:直接复用原始channel
,不再重新创建 Channel。 - 在直连模式下,
produceImpl/collect
都会优先读原始channel
,并在入口处调用markConsumed()
,一次性语义即时生效。
- 当
-
为什么限制“只能收集一次”:
- 选择
consumeAsFlow
意味着Flow
对该Channel
拥有“消费并关闭”的所有权;重复收集会引入所有权与关闭语义的冲突,因此在源码层面禁止第二次收集。
- 选择
-
cancelConsumed(cause)
的作用:- 在
emitAllImpl(channel, consume)
的finally
中,如果consume=true
会调用channel.cancelConsumed(cause)
; - 这一步用于在消费完成或出现异常时,自动取消/关闭底层 Channel,并传递可选的异常原因
cause
; - 与
markConsumed()
配合,共同形成“一次性消费 + 自动关闭”的完整语义闭环。
- 在
consumeAsFlow
本质是用ChannelFlow
外壳接管ReceiveChannel
,通过consume
与原子标志强制“一次性消费 + 自动关闭”。
receiveAsFlow:共享接收(竞争消费)
receiveAsFlow
会将 Channel
持续暴露为 Flow
,但不会自动关闭 Channel。你可以多次收集,或并发收集者“竞争式”接收同一条底层通道上的数据。
基本使用:
suspend fun receiveAsFlowExample() {val channel = Channel<Int>(capacity = Channel.UNLIMITED)// 生产者持续生产GlobalScope.launch {println("\u001B[32m[生产者][receiveAsFlow] 启动\u001B[0m")repeat(10) { i ->val value = i + 1channel.send(value)println("\u001B[32m[生产者][receiveAsFlow] 发送: $value\u001B[0m")delay(20)}// 不关闭,交由外部控制println("\u001B[33m[生产者][receiveAsFlow] 生产结束(未关闭通道)\u001B[0m")}val receiveAsFlow = channel.receiveAsFlow()// 第一个消费者GlobalScope.launch {receiveAsFlow.collect {delay(100)println("\u001B[34m[消费者A][receiveAsFlow]: $it\u001B[0m")}}// 第二个消费者GlobalScope.launch {receiveAsFlow.collect {delay(200)println("\u001B[35m[消费者B][receiveAsFlow]: $it\u001B[0m")}}// 一段时间后手动关闭 Channeldelay(1000)println("\u001B[31m[系统] 关闭 Channel\u001B[0m")//自行决定关闭时机channel.close()
}
源码分析
// 直接包装底层 Channel,但不标记一次性消费
public fun <T> ReceiveChannel<T>.receiveAsFlow(): Flow<T> = ChannelAsFlow(this, consume = false)
由于底层同样复用 ChannelAsFlow
,与 consumeAsFlow
的差异点在于 consume=false
:
- 不会通过
markConsumed()
施加“一次性消费”的限制; - 不会在流程结束时自动
cancel/close
底层Channel
; - 仍可能走
capacity == Channel.OPTIONAL_CHANNEL
的“直连通道”优化路径。
进一步结合 emitAllImpl(channel, consume)
看差异:
private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {ensureActive()var cause: Throwable? = nulltry {for (element in channel) {emit(element)}} catch (e: Throwable) {cause = ethrow e} finally {if (consume) channel.cancelConsumed(cause) // receiveAsFlow 下为 false,不会自动关闭}
}
也就是说,在 receiveAsFlow
下,遍历结束后不会触发 cancelConsumed
,生命周期由调用方或生产方控制。这正是它能“持续使用/多次收集/并发收集”的关键差异。
总结
至此,ChannelFlow
, CallbackFlow
, consumeAsFlow
, receiveAsFlow
就都介绍完毕了,来整体对比看下
方式 | 角色定位 | 是否自动关闭 | 是否可多次收集 | 典型场景 |
---|---|---|---|---|
ChannelFlow | 并发生产 + 冷流收集 | 否 | 是(每次重放) | 多协程并发生产且按需收集 |
CallbackFlow | 回调→Flow 语义外壳 | 否 | 是(每次重放) | 回调/监听封装(trySend/awaitClose) |
consumeAsFlow | 一次性消费 + 自动关闭 | 是 | 否 | 批处理/一次性转换后释放资源 |
receiveAsFlow | 共享接收(竞争消费) | 否 | 是(竞争式) | 多消费者竞争同一来源,自行管理生命周期 |
好了, 本篇文章就是这些,希望能帮到你。
感谢阅读,如果对你有帮助请三连(点赞、收藏、加关注)支持。有任何疑问或建议,欢迎在评论区留言讨论。如需转载,请注明出处:喻志强的博客