当前位置: 首页 > news >正文

【Android】kotlin.flow简介

一,概述

kotlin中flow可与协程配合使用,从而可以更加轻量级地处理io任务。flow在kotlin分为冷流和热流,所谓冷流即没有订阅者就不会触发,只有collect操作后,就触发flow。热流是即便不存在订阅者,也可以emit操作。

本文主要讲述冷流和热流的基本使用。

二,冷流

冷流主要由工厂方法创建,每次collect便会触发一次冷流,示例如下:

val coldFlow = flow {emit("准备获取数据")for (i in 1..10) {println("emit$i thread=${Thread.currentThread().name}")delay(200)emit(i)}emit("complete")}.flowOn(Dispatchers.IO)//指定collect触发上下文,直接flow.collect即runBlocking上下文环境,此处mainprintln("开始 coldFlow collect")coldFlow.collect { value ->println("collect: $value thread=${Thread.currentThread().name}")}

日志如下,当collect后才会触发emit,且collect消耗完emit数据后,就会返回。

注意,冷流可重复使用,二次触发collect,会重新开始emit。

冷流在作用上类似Steam流,如下,可作数据过滤操作。

flowOf(1, 2, 3, 4, 5).filter { it >= 2 }.map { it * 2 }.collect {println("collect$it")}

扩展方法flowOn可指定冷流触发的上下文线程。

三,热流

1,SharedFlow

热流是Android常用场景,以SharedFlow为代表。

SharedFlow实现类MutableSharedFlow,示例如下:

//热流,没有collect,也会触发flow内数据
val hotFlow1 = MutableSharedFlow<Int>(replay = 5)
for (i in 1..10) {hotFlow1.tryEmit(i)
}//指定collect触发上下文,直接flow.collect即runBlocking上下文环境,此处main
launch {withContext(Dispatchers.IO) {hotFlow1.collect {println("hot1 flow collect: $it thread=${Thread.currentThread().name}")}}
}

创建一个MutableSharedFlow,由于热流的collect是阻塞的,因此需要新开一个协程collect,这里可通过withContext指定collect上下文。

热流相比冷流多了一个tryEmit方法,这是非阻塞emit方法,当热流已经关闭,或emit缓冲区满了后,会立即返回false,代表emit失败。

在上述例子中,MutableSharedFlow指定replay=5,代表订阅此热流的新订阅者,会收到缓存区最新的5个数据,输入如下

随后重点讲解下MutableSharedFlow构造方法参数,

三个重要参数:

reply:大于等于0,代表新订阅者能从缓冲中获取的数据个数。当reply=1,就等效LiveData,适合处理状态类数据。当reply=0,则等效无sticky,新订阅者只会收到订阅后的消息。

extraBufferCapacity:额外缓冲区,reply+extraBufferCapacity代表总的缓存数量,当无可用缓存,且策略是BufferOverflow.SUSPEND,emit方法会被挂起直到缓冲被消耗,tryEmit则会立即返回false。

onBufferOverflow:缓冲区满了后的策略,提供了如下三种:

SUSPEND :会挂起当前emit,直到emit缓冲-1。如果是tryEmit立即返回false。

DROP_OLDEST:emit立即返回不会被挂起,缓冲区会丢弃最旧的数据。

DROP_LATEST:emit立即返回不会被挂起,缓冲区会丢弃最新的数据。

现给出几个例子体会下这三个参数:

replay = 1, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND
val hotFlow2 = MutableSharedFlow<Int>(replay = 1, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND)
launch {hotFlow2.collect {println("hot2 flow collect: $it thread=${Thread.currentThread().name}")}
}
delay(200)
for(i in 1..10) {println("tryEmit ${hotFlow2.tryEmit(i)}")
}
delay(200)hotFlow2.tryEmit(100)

所谓的emit缓冲,指的是emit到buffer中,还未被Collector消费而暂存的buffer。上述在collect后连续使用tryEmit发送10条消息,但由于buffer=reply+extrabuffer==1,因此在collect消耗完第一条消息前,后续的tryEmit都是失败返回false,输入如下

简单将上述代码改为

replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST

后,效果就不一样了,DROP策略不会挂起,因此tryEmit一定返回true。

emit buffer满了后,会丢弃OLDEST消息,因此接收到10;

再将上述代码改为

replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_LATEST

emit buffer满了后,会丢弃LATEST消息,因此接收到1;

如果想要使用SUSPEND策略,并且在buffer满了后,挂起emit,则直接起一个协程emit即可,这可以实现TransformQueue和BlockQueue效果

如消费速度决定生产速度的flow

val hotFlow2 = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.SUSPEND)
launch {hotFlow2.collect {println("hot2 flow collect: $it thread=${Thread.currentThread().name}")}
}
delay(200)launch {for(i in 1..10) {println("hot flow2 emit ${i}")hotFlow2.emit(i)}
}

输出如下:只有collect后,后续的emit才会执行。

一般的阻塞队列呢?很简单,定义如下:

val hotFlow2 = MutableSharedFlow<Int>(replay = 5, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND)

replay参数可以理解为队列大小参数。

基于以上,可知

Android官方LiveData

LiveData = MutableSharedFlow<Int>(replay = 1, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.DROP_OLDEST)

无Sticky的LiveData

无stickyLiveData = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)

TransformQueue

queue = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.SUSPEND)

BlockQueue

queue MutableSharedFlow<Int>(replay = 5/*队列大小*/, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND)

2,StateFlow

如果想简单使用Flow类似LiveData,还可以直接使用StateFlow。

val stateFLow = MutableStateFlow(1)launch {stateFLow.collect {println("state flow collect: $it thread=${Thread.currentThread().name}")}}delay(1000)for (i in 1..10) {stateFLow.tryEmit(i)}

这里tryEmit和emit方法实现相同,都是立即返回不会挂起。StateFlow必须有一个初始状态,且collect只会接受到最新的值,因为StateFlow没有replayCache。

http://www.dtcms.com/a/418241.html

相关文章:

  • 如何在电脑上编辑三星联系人
  • Java开发环境搭建之 9.使用Docker Compose 安装部署RabbitMQ
  • 智能家居:从设备互联到智慧感知的技术演进
  • 做网站是个什么行业网站设计示例
  • D018 vue+django 旅游图谱推荐问答系统|neo4j数据库|智能问答
  • 11. Jmeter性能与优化
  • 水脉织城・文脉映画:泰州城市旅游宣传片的专业化叙事路径
  • QT文件解析与乱码问题
  • 医疗编程AI技能树与培训技能树报告(国内外一流大学医疗AI相关专业分析2025版,下)
  • seo网站快速排名企业域名怎么填写
  • 谈谈数学和式的理解和应用
  • 【Linux指令 (一)】Linux 命令行入门:从零开始理解Linux系统理论核心概念与基础指令
  • 网站建设案例赏析网站制作比较好的制作公司
  • 线上JVM问题定位常用命令
  • 通过配置 GitLab 自动触发项目自动化构建与部署
  • 【qml-12】Quick3D实现机器人鼠标拖拽转换视角(无限角度)与滚轮缩放
  • h5实现一个吸附在键盘上的工具栏
  • 解决 sqlplus / as sysdba 登录缓慢问题
  • Mysql DBA学习笔记(主从复制)
  • 网站开发的交付文档企业策划是什么意思
  • 个人网站主机选择上海品牌全案设计公司
  • 布谷娱乐直播系统源码开发实用功能:技术驱动更迭的创新体验
  • ArcGIS JSAPI 高级教程 - 高亮效果优化之开启使用多高亮样式
  • 元宇宙的搜索引擎:如何在虚拟世界中查找信息
  • Unity-AB包
  • Rust开发环境配置
  • 【mdBook】5.2.1 通用配置
  • Rust自定义函数引用
  • Java 集合体系 —— List 篇
  • 操作系统应用开发(十一)RustDesk在线编译自己客户端——东方仙盟筑基期