Kotlin协程Flow任务流buffer缓冲批量任务,筛选批量中最高优先级任务运行(2)
Kotlin协程Flow任务流buffer缓冲批量任务,筛选批量中最高优先级任务运行(2)
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.runBlocking

/**
 * Demo: a producer emits batches of tasks through a buffered [kotlinx.coroutines.flow.Flow];
 * the consumer receives each batch, picks the task with the highest priority and runs only
 * that one.
 *
 * Ids 0..51 are split into batches of at most 5. Every task gets a random priority in
 * 0..99998. The flow is buffered with capacity 3 and `BufferOverflow.SUSPEND`, so the
 * producer suspends once the buffer is full instead of dropping batches.
 */
fun main() {
    runBlocking {
        // These two flags exist only for the console layout: a separator is printed once
        // each time the output switches from consumer lines to producer lines, and back.
        var eachFlag = true
        var collectFlag = true

        val taskFlow = flow {
            (0..51)
                .chunked(5) // split the ids into batches (partitions) of at most 5
                .forEach { ids ->
                    val batch = ids.map { idx ->
                        val priority = (Math.random() * 99999).toInt()
                        val data = TaskData(idx, priority)
                        // The call site passes the data explicitly, so a plain function
                        // reference is enough; no capturing lambda is needed.
                        EmitData(data, ::myTask)
                    }
                    emit(batch)
                }
        }

        taskFlow
            .onStart { println("onStart") }
            .onEach { batch ->
                // Assume this is the producer: it produces data/tasks in dense bursts.
                collectFlag = true
                if (eachFlag) {
                    println("\n- - - - - - - - - - - - - - - - - - - -")
                    eachFlag = false
                }
                println("onEach [ ${describeBatch(batch)}]")
            }
            .buffer(capacity = 3, onBufferOverflow = BufferOverflow.SUSPEND)
            .collect { batch ->
                // Assume this is the consumer: it needs some time to finish a task or
                // consume the data.
                eachFlag = true
                if (collectFlag) {
                    println("\n↓ ↓ ↓\n")
                    collectFlag = false
                }
                print("collect [ ${describeBatch(batch)}]")

                // Pick the highest-priority task without sorting the whole batch.
                // Scanning the reversed view makes the LAST element win on equal
                // priorities, which matches the previous `sortedBy { }.last()` behaviour.
                val highTask = batch.asReversed().maxByOrNull { it.data.priority }
                if (highTask == null) {
                    // Empty batch: nothing to run, just terminate the output line.
                    println()
                } else {
                    println(" 最高优先级=${highTask.data.priority}")
                    highTask.func(highTask.data)
                    println("*")
                }
            }
    }
}

/** Renders every task of [batch] as `(id=… priority=…) `, concatenated with no extra separator. */
private fun describeBatch(batch: List<EmitData>): String =
    batch.joinToString(separator = "") { "(id=${it.data.id} priority=${it.data.priority}) " }

/**
 * Payload of a single task.
 *
 * @property id sequential task id
 * @property priority task priority; within a batch the largest value is the one that runs
 */
data class TaskData(val id: Int, val priority: Int)

/**
 * Simulated task body: logs the task and then takes about one second.
 *
 * NOTE: `Thread.sleep` blocks the calling thread; it is not a suspending delay.
 */
fun myTask(data: TaskData) {
    println("high priority task (id=${data.id} priority=${data.priority}), run!")
    Thread.sleep(1000)
}

/**
 * A task travelling through the flow: its [data] plus the function [func] that executes it.
 */
data class EmitData(val data: TaskData, val func: (data: TaskData) -> Unit)
输出:
onStart
- - - - - - - - - - - - - - - - - - - -
onEach [ (id=0 priority=44362) (id=1 priority=33932) (id=2 priority=34716) (id=3 priority=54267) (id=4 priority=94424) ]
onEach [ (id=5 priority=66072) (id=6 priority=66137) (id=7 priority=24833) (id=8 priority=91758) (id=9 priority=57628) ]
onEach [ (id=10 priority=52976) (id=11 priority=72277) (id=12 priority=96616) (id=13 priority=78802) (id=14 priority=42246) ]
onEach [ (id=15 priority=25361) (id=16 priority=5490) (id=17 priority=69197) (id=18 priority=18863) (id=19 priority=29917) ]
onEach [ (id=20 priority=87976) (id=21 priority=75357) (id=22 priority=81733) (id=23 priority=1851) (id=24 priority=44115) ]

↓ ↓ ↓

collect [ (id=0 priority=44362) (id=1 priority=33932) (id=2 priority=34716) (id=3 priority=54267) (id=4 priority=94424) ] 最高优先级=94424
high priority task (id=4 priority=94424), run!
*
collect [ (id=5 priority=66072) (id=6 priority=66137) (id=7 priority=24833) (id=8 priority=91758) (id=9 priority=57628) ] 最高优先级=91758
high priority task (id=8 priority=91758), run!
*
collect [ (id=10 priority=52976) (id=11 priority=72277) (id=12 priority=96616) (id=13 priority=78802) (id=14 priority=42246) ] 最高优先级=96616
high priority task (id=12 priority=96616), run!
*
collect [ (id=15 priority=25361) (id=16 priority=5490) (id=17 priority=69197) (id=18 priority=18863) (id=19 priority=29917) ] 最高优先级=69197
high priority task (id=17 priority=69197), run!
*
collect [ (id=20 priority=87976) (id=21 priority=75357) (id=22 priority=81733) (id=23 priority=1851) (id=24 priority=44115) ] 最高优先级=87976
high priority task (id=20 priority=87976), run!
*

- - - - - - - - - - - - - - - - - - - -
onEach [ (id=25 priority=11767) (id=26 priority=7471) (id=27 priority=97271) (id=28 priority=34503) (id=29 priority=84445) ]
onEach [ (id=30 priority=71684) (id=31 priority=14763) (id=32 priority=40293) (id=33 priority=68775) (id=34 priority=54138) ]
onEach [ (id=35 priority=11276) (id=36 priority=43112) (id=37 priority=38698) (id=38 priority=88547) (id=39 priority=82780) ]
onEach [ (id=40 priority=75921) (id=41 priority=3175) (id=42 priority=30385) (id=43 priority=33549) (id=44 priority=85067) ]
onEach [ (id=45 priority=17563) (id=46 priority=6782) (id=47 priority=33834) (id=48 priority=60989) (id=49 priority=2547) ]

↓ ↓ ↓

collect [ (id=25 priority=11767) (id=26 priority=7471) (id=27 priority=97271) (id=28 priority=34503) (id=29 priority=84445) ] 最高优先级=97271
high priority task (id=27 priority=97271), run!
*
collect [ (id=30 priority=71684) (id=31 priority=14763) (id=32 priority=40293) (id=33 priority=68775) (id=34 priority=54138) ] 最高优先级=71684
high priority task (id=30 priority=71684), run!
*
collect [ (id=35 priority=11276) (id=36 priority=43112) (id=37 priority=38698) (id=38 priority=88547) (id=39 priority=82780) ] 最高优先级=88547
high priority task (id=38 priority=88547), run!
*
collect [ (id=40 priority=75921) (id=41 priority=3175) (id=42 priority=30385) (id=43 priority=33549) (id=44 priority=85067) ] 最高优先级=85067
high priority task (id=44 priority=85067), run!
*
collect [ (id=45 priority=17563) (id=46 priority=6782) (id=47 priority=33834) (id=48 priority=60989) (id=49 priority=2547) ] 最高优先级=60989
high priority task (id=48 priority=60989), run!
*

- - - - - - - - - - - - - - - - - - - -
onEach [ (id=50 priority=81466) (id=51 priority=44705) ]

↓ ↓ ↓

collect [ (id=50 priority=81466) (id=51 priority=44705) ] 最高优先级=81466
high priority task (id=50 priority=81466), run!
*
相关:
https://blog.csdn.net/zhangphil/article/details/154840841
