【Android】kotlin.flow简介
一,概述
kotlin中flow可与协程配合使用,从而可以更加轻量级地处理io任务。flow在kotlin分为冷流和热流,所谓冷流即没有订阅者就不会触发,只有collect操作后,就触发flow。热流是即便不存在订阅者,也可以emit操作。
本文主要讲述冷流和热流的基本使用。
二,冷流
冷流主要由工厂方法创建,每次collect便会触发一次冷流,示例如下:
val coldFlow = flow {emit("准备获取数据")for (i in 1..10) {println("emit$i thread=${Thread.currentThread().name}")delay(200)emit(i)}emit("complete")}.flowOn(Dispatchers.IO)//指定collect触发上下文,直接flow.collect即runBlocking上下文环境,此处mainprintln("开始 coldFlow collect")coldFlow.collect { value ->println("collect: $value thread=${Thread.currentThread().name}")}
日志如下,当collect后才会触发emit,且collect消耗完emit数据后,就会返回。
注意,冷流可重复使用,二次触发collect,会重新开始emit。
冷流在作用上类似Steam流,如下,可作数据过滤操作。
flowOf(1, 2, 3, 4, 5).filter { it >= 2 }.map { it * 2 }.collect {println("collect$it")}
扩展方法flowOn可指定冷流触发的上下文线程。
三,热流
1,SharedFlow
热流是Android常用场景,以SharedFlow为代表。
SharedFlow实现类MutableSharedFlow,示例如下:
//热流,没有collect,也会触发flow内数据
val hotFlow1 = MutableSharedFlow<Int>(replay = 5)
for (i in 1..10) {hotFlow1.tryEmit(i)
}//指定collect触发上下文,直接flow.collect即runBlocking上下文环境,此处main
launch {withContext(Dispatchers.IO) {hotFlow1.collect {println("hot1 flow collect: $it thread=${Thread.currentThread().name}")}}
}
创建一个MutableSharedFlow,由于热流的collect是阻塞的,因此需要新开一个协程collect,这里可通过withContext指定collect上下文。
热流相比冷流多了一个tryEmit方法,这是非阻塞emit方法,当热流已经关闭,或emit缓冲区满了后,会立即返回false,代表emit失败。
在上述例子中,MutableSharedFlow指定replay=5,代表订阅此热流的新订阅者,会收到缓存区最新的5个数据,输入如下
随后重点讲解下MutableSharedFlow构造方法参数,
三个重要参数:
reply:大于等于0,代表新订阅者能从缓冲中获取的数据个数。当reply=1,就等效LiveData,适合处理状态类数据。当reply=0,则等效无sticky,新订阅者只会收到订阅后的消息。
extraBufferCapacity:额外缓冲区,reply+extraBufferCapacity代表总的缓存数量,当无可用缓存,且策略是BufferOverflow.SUSPEND,emit方法会被挂起直到缓冲被消耗,tryEmit则会立即返回false。
onBufferOverflow:缓冲区满了后的策略,提供了如下三种:
SUSPEND :会挂起当前emit,直到emit缓冲-1。如果是tryEmit立即返回false。
DROP_OLDEST:emit立即返回不会被挂起,缓冲区会丢弃最旧的数据。
DROP_LATEST:emit立即返回不会被挂起,缓冲区会丢弃最新的数据。
现给出几个例子体会下这三个参数:
replay = 1, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND
val hotFlow2 = MutableSharedFlow<Int>(replay = 1, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND)
launch {hotFlow2.collect {println("hot2 flow collect: $it thread=${Thread.currentThread().name}")}
}
delay(200)
for(i in 1..10) {println("tryEmit ${hotFlow2.tryEmit(i)}")
}
delay(200)hotFlow2.tryEmit(100)
所谓的emit缓冲,指的是emit到buffer中,还未被Collector消费而暂存的buffer。上述在collect后连续使用tryEmit发送10条消息,但由于buffer=reply+extrabuffer==1,因此在collect消耗完第一条消息前,后续的tryEmit都是失败返回false,输入如下
简单将上述代码改为
replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST
后,效果就不一样了,DROP策略不会挂起,因此tryEmit一定返回true。
emit buffer满了后,会丢弃OLDEST消息,因此接收到10;
再将上述代码改为
replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_LATEST
emit buffer满了后,会丢弃LATEST消息,因此接收到1;
如果想要使用SUSPEND策略,并且在buffer满了后,挂起emit,则直接起一个协程emit即可,这可以实现TransformQueue和BlockQueue效果
如消费速度决定生产速度的flow
val hotFlow2 = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.SUSPEND)
launch {hotFlow2.collect {println("hot2 flow collect: $it thread=${Thread.currentThread().name}")}
}
delay(200)launch {for(i in 1..10) {println("hot flow2 emit ${i}")hotFlow2.emit(i)}
}
输出如下:只有collect后,后续的emit才会执行。
一般的阻塞队列呢?很简单,定义如下:
val hotFlow2 = MutableSharedFlow<Int>(replay = 5, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND)
replay参数可以理解为队列大小参数。
基于以上,可知
Android官方LiveData
LiveData = MutableSharedFlow<Int>(replay = 1, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.DROP_OLDEST)
无Sticky的LiveData
无stickyLiveData = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
TransformQueue
queue = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.SUSPEND)
BlockQueue
queue = MutableSharedFlow<Int>(replay = 5/*队列大小*/, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND)
2,StateFlow
如果想简单使用Flow类似LiveData,还可以直接使用StateFlow。
val stateFLow = MutableStateFlow(1)launch {stateFLow.collect {println("state flow collect: $it thread=${Thread.currentThread().name}")}}delay(1000)for (i in 1..10) {stateFLow.tryEmit(i)}
这里tryEmit和emit方法实现相同,都是立即返回不会挂起。StateFlow必须有一个初始状态,且collect只会接受到最新的值,因为StateFlow没有replayCache。