Kotlin Flow 与“天然背压”(完整示例)
关键词:冷流 + 挂起发射 → 下游慢,
emit()
等。需要“让上游不被完全拖慢”时,用buffer / conflate / collectLatest / debounce / sample
。
3.1 基线示例:下游慢 → 上游 emit()
被背压挂起
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*fun main() = runBlocking {val f = flow {repeat(3) { i ->println("emit $i @${System.currentTimeMillis()%100000}")emit(i) // 下游没处理完,这里会挂起等待}}f.collect { v ->delay(300) // 模拟慢消费者println("consume $v @${System.currentTimeMillis()%100000}")}
}
预期日志(顺序交替,证明 emit 在等):
emit 0 @12345
consume 0 @12648
emit 1 @12649
consume 1 @12953
emit 2 @12954
consume 2 @13257
3.2 buffer(n)
:在上/下游之间放小仓库,让上游先跑一段
fun main() = runBlocking {val f = flow {repeat(5) { i ->println("emit $i")emit(i)}}f.buffer(2) // 上游可先放 2 条再等下游.collect { v ->delay(300) // 下游仍慢println("consume $v")}
}
观察点:前两次 emit
基本不会等,下游开始消费后才逐步“跟上”。
3.3 conflate()
:丢弃中间值,只保最新(适合“状态”)
fun main() = runBlocking {val f = flow {(0..5).forEach { i ->println("emit $i"); emit(i); delay(50) // 上游较快}}f.conflate() // 只保最新,可能 0 -> 3 -> 5 这样跳.collect { v ->delay(150) // 下游更慢println("consume $v")}
}
预期现象:consume
会跳号(中间值被合并),只执行最新几次。
3.4 collectLatest{}
:新值到来就取消上一次处理(可覆盖任务)
fun main() = runBlocking {(1..4).asFlow().onEach { println("emit $it") }.collectLatest { v ->println("start $v")try {delay(200) // 模拟耗时处理println("done $v")} finally {println("cancel $v (if not finished)")}}
}
预期现象:对 1、2、3 的处理会在新值到来时被取消,最终只看到 done 4
。
3.5 debounce()/sample()
:防抖与抽样(控频)
fun main() = runBlocking {// 模拟用户输入抖动:每 40ms 一个字符val typing = flow {"android".forEachIndexed { idx, c ->emit("$idx:$c"); delay(40)}}println("---- debounce(100) 只发最后一次稳定输入 ----")typing.debounce(100).collect { println("debounce -> $it") }println("---- sample(120) 每 120ms 采样一次 ----")typing.sample(120).collect { println("sample -> $it") }
}
预期现象:
debounce(100)
:每段输完后只发最后一个;sample(120)
:每 120ms 取一个最近值,可能跳过一些输入。
3.6 flowOn(Dispatchers.IO)
:切换上游线程,但背压语义不变
fun main() = runBlocking {val f = flow {repeat(3) { i ->println("emit $i on ${Thread.currentThread().name}")emit(i)}}f.flowOn(Dispatchers.IO) // 上游在 IO.collect { v ->delay(200)println("consume $v on ${Thread.currentThread().name}")}
}
要点:即便上游在 IO 线程,下游慢时 emit()
仍会等待(天然背压没变)。
3.7 回调源的背压(callbackFlow
+ buffer/conflate
)
回调是“推模式”,要自己决定承压策略:
无
buffer
:trySend
易失败(丢);用
send
:缓冲满会挂起回调线程(主线程慎用)。
fun motorAngles(): Flow<Int> = callbackFlow {val l = object: AngleListener {override fun onAngle(d:Int) { trySend(d).isSuccess } // 非挂起推送}motor.addListener(l)awaitClose { motor.removeListener(l) }
}.buffer(64) // 给回调 64 槽缓冲.conflate() // 只要最新(状态类).flowOn(Dispatchers.IO)
小结(可放在节末的一句话)
天然背压 = 下游慢 → 上游
emit()
挂起等待;想让上游不被完全拖慢:在链路上加
buffer / conflate / collectLatest / debounce / sample
;回调源用
callbackFlow
,配合buffer
与conflate
明确你的承压与合并策略;flowOn(IO)
只换“在哪跑”,不改变背压本质。