Kotlin协程Flow流buffer缓冲批量任务或数据,条件筛选任务或数据
Kotlin协程Flow流buffer缓冲批量任务或数据,条件筛选任务或数据
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.runBlockingfun main() {runBlocking {var eachFlag = truevar collectFlag = true(0..50).chunked(5) //切块,分区.asFlow().onStart { println("onStart") }.onEach { itList ->//假设这里是生产者,密集生产数据或任务collectFlag = trueif (eachFlag) {println("--------------------")eachFlag = false}print("onEach [ ")itList.forEach { itData ->print("$itData ")}print("]")println()}.buffer(capacity = 3, onBufferOverflow = BufferOverflow.SUSPEND).collect { itList ->//假设这里是消费者,这里的消费者以一定的耗时完成任务或消费数据eachFlag = trueif (collectFlag) {println("↓")collectFlag = false}print("collect [ ")itList.forEach { itData ->print("$itData ")}print("]")val sortedList = itList.sortedBy { it }print(" 最大值=${sortedList.lastOrNull()} 最小值=${sortedList.firstOrNull()}")println()}}
}
输出:
onStart
--------------------
onEach [ 0 1 2 3 4 ]
onEach [ 5 6 7 8 9 ]
onEach [ 10 11 12 13 14 ]
onEach [ 15 16 17 18 19 ]
onEach [ 20 21 22 23 24 ]
↓
collect [ 0 1 2 3 4 ] 最大值=4 最小值=0
collect [ 5 6 7 8 9 ] 最大值=9 最小值=5
collect [ 10 11 12 13 14 ] 最大值=14 最小值=10
collect [ 15 16 17 18 19 ] 最大值=19 最小值=15
collect [ 20 21 22 23 24 ] 最大值=24 最小值=20
--------------------
onEach [ 25 26 27 28 29 ]
onEach [ 30 31 32 33 34 ]
onEach [ 35 36 37 38 39 ]
onEach [ 40 41 42 43 44 ]
onEach [ 45 46 47 48 49 ]
↓
collect [ 25 26 27 28 29 ] 最大值=29 最小值=25
collect [ 30 31 32 33 34 ] 最大值=34 最小值=30
collect [ 35 36 37 38 39 ] 最大值=39 最小值=35
collect [ 40 41 42 43 44 ] 最大值=44 最小值=40
collect [ 45 46 47 48 49 ] 最大值=49 最小值=45
--------------------
onEach [ 50 ]
↓
collect [ 50 ] 最大值=50 最小值=50
相关:
https://blog.csdn.net/zhangphil/article/details/132527122
https://blog.csdn.net/zhangphil/article/details/139237348
