当前位置: 首页 > news >正文

Flow原理

fun main() {runBlocking {launch {flow4.collect{println("---collect-4")}println("---flow4")}}val flow4 = flow<Boolean>{delay(5000)emit(false)
}

我们分析下整个流程 

1.flow为什么之后在collect之后才会发送数据

2.collect的调用流程

我们先看创建flow的方法

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

可以看见是一个方法,返回的是一个SafeFlow对象,然后把我们传入 {
      delay(5000)
       emit(false)
}   
表达式 
的传入到了这个对象中。

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {override suspend fun collectSafely(collector: FlowCollector<T>) {collector.block()}
}

然后,这个对象提供一个方法collectSafely,这个时候可以看到,要发送数据,必须得调用SafeFlow的collectSafely方法才行。

接下来我们分析下collect方法。看源码发现需要传入一个FlowCollector接口实现类

public suspend fun collect(collector: FlowCollector<T>)
public fun interface FlowCollector<in T> {//注意这个emit方法public suspend fun emit(value: T)
}

因为我们是使用flow方法返回的SafeFlow对象去调用的,所以我们看下SafeFlow的collect方法。SafeFlow是继承AbstractFlow的类,所以我们看这个类就行

public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {public final override suspend fun collect(collector: FlowCollector<T>) {//创建SafeCollector对象val safeCollector = SafeCollector(collector, coroutineContext)try {//调用实现类的collectSafely方法,把SafeCollector对象传递过去collectSafely(safeCollector)} finally {safeCollector.releaseIntercepted()}}public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

再接着查看collectSafely方法,发现调用到了我们传入的闭包

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {override suspend fun collectSafely(collector: FlowCollector<T>) {//调用我们传入的闭包,而且他是FlowCollector的扩展函数collector.block()}
}

而我们传入的闭包是suspend FlowCollector<T>.() -> Unit扩展函数,这个对象就是我们collect传入的接口实现类,所以在

val flow4 = flow<Boolean>{
      delay(5000)
       emit(false)
}

方法中调用emit()实际就是调用collect传入接口实现类的emit方法


 

相关文章:

  • 使用TortoiseGit进行文件比较
  • JAVA-ArrayList使用方法
  • RecoNIC 入门:SmartNIC 上支持 RDMA 的计算卸载-FPGA-智能网卡-AMD-Xilinx
  • 实战指南:搭建AIRIOT全场景智慧养老管理平台系统全流程解析
  • 使用手机录制rosbag包
  • 高性价比手机如何挑选?
  • 基于 SSE 和分块传输的 Uniapp 微信小程序 实现 流式传输 对话
  • 第十二节:性能优化高频题-shallowRef/shallowReactive使用场景
  • Kotlin await等待多个异步任务都完成后才进行下一步操作
  • web技术与nginx网站环境部署
  • docker搭建swarm集群
  • node.js 实战——mongoDB
  • 【Docker】——在Docker工具上安装创建容器并完成项目部署
  • Flink HA 总结
  • 人工智能大语言模型与AI芯片新进展:技术演进与商业化路径
  • 【3D基础】深入解析OBJ与MTL文件格式:Blender导出模型示例及3D开发应用
  • 【Linux】第十二章 安装和更新软件包
  • deepseek对IBM MQ SSL 证书算法的建议与解答
  • 自动驾驶L4级技术落地:特斯拉、Waymo与华为的路线之争
  • [三分钟]web自动化测试(三):selenium自动化测试常用函数(下)
  • 云南一餐馆收购长江野生鱼加工为菜品,被查处罚款
  • 看展览|2025影像上海艺博会:市场与当代媒介中的摄影
  • 国家税务总局泰安市税务局:山东泰山啤酒公司欠税超536万元
  • 司法部:民营经济促进法明确禁止违规异地执法、利用行政或者刑事手段违法干预经济纠纷
  • 北京:下调个人住房公积金贷款利率
  • 复旦设立新文科发展基金,校友曹国伟、王长田联合捐赠1亿元