当前位置: 首页 > news >正文

Kotlin Flow 与“天然背压”(完整示例)

关键词:冷流 + 挂起发射下游慢,emit()。需要“让上游不被完全拖慢”时,用 buffer / conflate / collectLatest / debounce / sample

3.1 基线示例:下游慢 → 上游 emit() 被背压挂起

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*fun main() = runBlocking {val f = flow {repeat(3) { i ->println("emit $i @${System.currentTimeMillis()%100000}")emit(i)                      // 下游没处理完,这里会挂起等待}}f.collect { v ->delay(300)                     // 模拟慢消费者println("consume $v @${System.currentTimeMillis()%100000}")}
}

预期日志(顺序交替,证明 emit 在等):

emit 0 @12345
consume 0 @12648
emit 1 @12649
consume 1 @12953
emit 2 @12954
consume 2 @13257

3.2 buffer(n):在上/下游之间放小仓库,让上游先跑一段

fun main() = runBlocking {val f = flow {repeat(5) { i ->println("emit $i")emit(i)}}f.buffer(2)              // 上游可先放 2 条再等下游.collect { v ->delay(300)           // 下游仍慢println("consume $v")}
}

观察点:前两次 emit 基本不会等,下游开始消费后才逐步“跟上”。


3.3 conflate():丢弃中间值,只保最新(适合“状态”)

fun main() = runBlocking {val f = flow {(0..5).forEach { i ->println("emit $i"); emit(i); delay(50) // 上游较快}}f.conflate()            // 只保最新,可能 0 -> 3 -> 5 这样跳.collect { v ->delay(150)          // 下游更慢println("consume $v")}
}

预期现象consume 会跳号(中间值被合并),只执行最新几次。


3.4 collectLatest{}:新值到来就取消上一次处理(可覆盖任务)

fun main() = runBlocking {(1..4).asFlow().onEach { println("emit $it") }.collectLatest { v ->println("start $v")try {delay(200)        // 模拟耗时处理println("done $v")} finally {println("cancel $v (if not finished)")}}
}

预期现象:对 1、2、3 的处理会在新值到来时被取消,最终只看到 done 4


3.5 debounce()/sample():防抖与抽样(控频)

fun main() = runBlocking {// 模拟用户输入抖动:每 40ms 一个字符val typing = flow {"android".forEachIndexed { idx, c ->emit("$idx:$c"); delay(40)}}println("---- debounce(100) 只发最后一次稳定输入 ----")typing.debounce(100).collect { println("debounce -> $it") }println("---- sample(120) 每 120ms 采样一次 ----")typing.sample(120).collect { println("sample  -> $it") }
}

预期现象

  • debounce(100):每段输完后只发最后一个;

  • sample(120):每 120ms 取一个最近值,可能跳过一些输入。


3.6 flowOn(Dispatchers.IO):切换上游线程,但背压语义不变

fun main() = runBlocking {val f = flow {repeat(3) { i ->println("emit $i on ${Thread.currentThread().name}")emit(i)}}f.flowOn(Dispatchers.IO)        // 上游在 IO.collect { v ->delay(200)println("consume $v on ${Thread.currentThread().name}")}
}

要点:即便上游在 IO 线程,下游慢时 emit() 仍会等待(天然背压没变)。


3.7 回调源的背压(callbackFlow + buffer/conflate

回调是“推模式”,要自己决定承压策略:

  • buffertrySend 易失败(丢);

  • send:缓冲满会挂起回调线程(主线程慎用)。

fun motorAngles(): Flow<Int> = callbackFlow {val l = object: AngleListener {override fun onAngle(d:Int) { trySend(d).isSuccess } // 非挂起推送}motor.addListener(l)awaitClose { motor.removeListener(l) }
}.buffer(64)        // 给回调 64 槽缓冲.conflate()        // 只要最新(状态类).flowOn(Dispatchers.IO)

小结(可放在节末的一句话)
  • 天然背压 = 下游慢 → 上游 emit() 挂起等待;

  • 想让上游不被完全拖慢:在链路上加 buffer / conflate / collectLatest / debounce / sample

  • 回调源用 callbackFlow,配合 bufferconflate 明确你的承压与合并策略;

  • flowOn(IO) 只换“在哪跑”,不改变背压本质

http://www.dtcms.com/a/449504.html

相关文章:

  • Kotlin invoke 函数调用重载
  • 郑州网站建设培训学校昆明怎样优化网站
  • XMLHttpRequest 异步请求servlet 上传文件并且带有参数
  • Python私教FastAPI+React构建Web应用01 概述
  • 深入理解操作系统进程:管理的本质与“先描述,再组织“的核心逻辑
  • 网站手机自适应无锡产品排名优化
  • 深度学习(十五):Dropout
  • 收录提交大全成都百度seo推广
  • wordpress本地更换为网站域名龙华区网站建设
  • 高佣金返利平台的数据一致性挑战:基于Seata的分布式事务解决方案与补偿机制设计
  • 外包网站开发多少钱安监局网站做应急预案备案
  • go build命令
  • Go语言入门(22)-goroutine
  • 网站建设及编辑岗位职责网站做查赚钱
  • 开源革命下的研发突围:Meta Llama系列模型的知识整合实践与启示
  • 做的网站怎样更新排名优化网站seo排名
  • 鸿蒙NEXT网络通信实战:使用HTTP协议进行网络请求
  • FastApi项目启动失败 got an unexpected keyword argument ‘loop_factory‘
  • 杭州有专业做网站的吗用.net做购物网站
  • 什么是 mesh 组网
  • 网站建设什么行业创建个人网站教案
  • 十五、深入理解 SELinux
  • 10.6作业
  • 《投资-70》投资、投机、赌博的比较,一个靠企业内在的价值增值、一个靠市场的价格波动、一个全靠随机性的运气。
  • 前端知识详解——HTML/CSS/Javascript/ES5+/Typescript篇/算法篇
  • 【MySQL】 索引特性详解
  • FreeRTOS实现微秒级时间同步(基于1588V2)
  • 网站基本要素网站建设 主要学是么
  • Java包的命名,常见的包类,如何导入包
  • 滑动窗口题目:替换后的最长重复字符