当前位置: 首页 > news >正文

Android 16 Kotlin协程 第二部分


返回多个值-集合-序列-挂起函数:
如何表示多个值?
挂起函数可以异步的返回单个值,但是该如何异步返回多个计算好的值呢?

异步返回多个值的方案:
集合
序列
挂起函数
Flow

class CoroutineTest01 {
// 返回多个值,但不是异步
fun simpleList(): List<Int> = listOf<Int>(1,2,3)

// 返回多个值,是同步
fun simpleSequence(): Sequence<Int> = sequence {
for(i in 1..3) {
// Thread.sleep(1000)   // 阻塞,假装再计算
// delay(1000)  // 报错,只能写它接口扩展支持的挂起函数
yield(i)
}
}

// 返回了多个值,是异步,一次性返回了多个值
suspend fun simpleList2(): List<Int>{
delay(1000)
return listOf<Int>(1,2,3)
}

// 返回多个值,而且是异步的,并不是一次性返回的
suspend fun simpleFlow() = flow<int>{
for(i in 1..3) {
delay(1000)  // 假装在做一些重要的事情
emit(i)  // 发射,产生一个元素
}
}

    @Test
fun `test multiple values`() {
// simpleList().forEach {value -> println(value)}
// simpleSequence().forEach {value -> println(value)}
}

@Test
fun `test multiple values2`() = runBlocking<Unit>{
simpleList2.forEach {value -> println(value)}
}

@Test
fun `test multiple values3`() = runBlocking<Unit>{
launch{
for(k in 1..3) {
println("I'm not blocked $k")
delay(1500)
}
}
// collect:末端操作符
simpleFlow().collect {value -> println(value)}
}
}

通过Flow异常返回多个值:
// 返回多个值,而且是异步的,并不是一次性返回的
suspend fun simpleFlow() = flow<int>{
for(i in 1..3) {
delay(1000)  // 假装在做一些重要的事情
emit(i)  // 发射,产生一个元素
}
}

Flow与其他方式的区别:
名为flow的Flow类型构建器函数。
flow{...}构建快的代码可以挂起。
函数simpleFlow不再标有suspend修饰符。
流使用emit函数发射值。
流使用collect函数收集值。
元素 emit --->  Flow(元素 - 元素 - 元素 。。。)   --->collect 元素 

Flow的一个典型应用场景:
在Android当中,文件下载是Flow的一个非常典型的应用.
Main Thread  <--- collect (element element ...)  <--- BackgroundThread(down file -->emit)

什么是冷流:
Flow是一种类似于序列的冷流,flow构建器中的代码直到流被收集的时候才运行。
fun simpleFlow() = flow<int>{
println("Flow started")
for(i in 1..3) {
delay(3000)  // 假装在做一些重要的事情
emit(i)  // 发射,产生一个元素
}
}
@Test
fun `test flow is cold`() = runBlocking<Unit> {
val flow = simpleFlow()
println("Calling collect...")
flow.collect{value -> println(value)}
println("Calling collect again...")
flow.collect{value -> println(value)}
}
冷流,冷启动,临阵磨枪
热流,热启动,创业,之前有积累

流的连续性:
流的每次单独手机都是按照顺序执行的,除非使用特殊操作符。
从上游到下游每个过渡操作符都会处理每个发射出的值,然后再交给末端操作符。
@Test
fun `test flow continuation`() = runBlocking<Unit> {
(1..5).asFlow().filter {
println("Filter")
it % 2 == 0
}.map {
"string $it"
}.collect {
println("Collect $it") // string 2 string 4
}
}

流构建器:
flowOf构建器定义了一个发射固定值集的流。
使用.asFlow()扩展函数,可以将各种集合与序列转换为流。
@Test
fun `test flow builder`() = runBlocking<Unit> {
flowOf("one","two","three")
.onEach{delay(1000)}
}.collect {value ->
println("Collect $value")
}

(1..5).asFlow()..collect {value ->
println("Collect $value")
}
}

流上下文:
流的收集总是在调用协程的上下文中发生,流的该属性称为上下文保存。
flow{...}构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发射(emit)。
flowOn操作符,该函数用于更改流发射的上下文。
fun simpleFlow3() = flow(Int) {
println("Flow started ${Thread.currentThread().name}")
for (i in 1..3) {
delay(1000)
emit(i)
}
}

fun simpleFlow4() = flow(Int) {
// 这种方式是后台线程,崩溃了 java.lang.IllegalStateException: Flow invariant is violated
withContext(Dispatcher.IO) {
println("Flow started ${Thread.currentThread().name}")
for (i in 1..3) {
delay(1000)
emit(i)
}
}
}

fun simpleFlow5() = flow(Int) {
println("Flow started ${Thread.currentThread().name}")
for (i in 1..3) {
delay(1000)
emit(i)
}
}.flowOn(Dispatchers.Default)

@Test 
fun `test flow context`() = runBlocking<Unit> {
simpleFlow3().collect{value -> println("Collected ${value} ${Thread.currentThread().name}")}
}

@Test 
fun `test flow on`() = runBlocking<Unit> {
simpleFlow5().collect{value -> println("Collected ${value} ${Thread.currentThread().name}")}
}

在指定协程中收集流:
使用launchIn替换collect我们可以在单独的协程中启动流的收集。
// 事件源
fun events() = (1.3)
.asFlow()
.onEach{ delay(100)}
.flowOn(Dispatchers.Default)

@Test
fun `test flow launch`() = runBlocking<Unit> {
val job = events()
.onEach{event -> println("Event: $event ${Thread.currentThread().name}")}
//.collect{}
.launchIn(CoroutineScope(Dispatchers.IO))  // 返回的是Job对象
//.join()
// .launchIn(this) // 主协程
delay(200)
job.cancelAndJoin()
}

流的取消:
流采用与协程同样的协作取消。像往常一样,流的收集可以是当流在一个可取消的挂起函数(例如delay)中挂起的时候取消。
fun simpleFlow6() = flow(Int) {
for (i in 1..3) {
delay(1000)
emit(i)
println("Emitting $i")
}
}

@Test
fun `test cancel flow`() = runBlocking<Unit> {
withTimeoutOrNull(2500) {
simpleFlow6().collect {value -> println(value)}
}
println("Done")
}

流的取消检测:
为方便起见,流构建器对每个发射值执行附加的ensureActive 检测以进行取消,这意味着从 flow{...}发出的繁忙循环是可以取消的。
出于性能原因,大多数其他流操作不会自行执行其他取消检擦,在协程处于繁忙循环的情况下,必须明确检测是否取消。
通过cancellable操作符来执行此操作。
fun simpleFlow7() = flow(Int) {
for (i in 1..5) {
emit(i)
println("Emitting $i")
}
}

@Test
fun `test cancel flow check`() = runBlocking<Unit> {
simpleFlow7().collect {value ->
println(value)
if(value == 3) cancel
}

// 繁忙任务取消失败 加上cancellable
(1..5).asFlow().cancellable().collect {value ->
println(value)
if(value == 3) cancel
}
}

使用缓冲与flowOn处理背压:
背压: back pressure
响应式编程,基于生产者消费者模式,都有背压
生产者【入口】  =====》 水流收到与流动方向一致的压力,背压   ---》消费者【出口】
生产者生产效率大于消费者消费效率。 要么降低生产效率,要么提高消费效率。
buffer(),并发运行流种发射元素的代码。
conflate(),合并发射项,不对每个值进行处理。
collectLatest(),取消并重新发射最后一个值。
当必须更改CoroutineDispatcher时,flowOn操作符使用了相同的缓冲机制,但是buffer函数显式地请求缓冲而不改变执行上下文。

fun simpleFlow8() = flow(Int) {
for (i in 1..3) {
delay(100)
emit(i)
println("Emitting $i ${Thread.currentThread().name}")
}
}

// .buffer(50) 一次性发射 100ms
// .flowOn(Dispatchers.Default)  一次性发射 100ms 效果和.buffer(50)一致
@Test
fun `test flow back pressure`() = runBlocking<Unit> {
val time = measureTimeMillis {
simpleFlow8()
.flowOn(Dispatchers.Default)
.buffer(50).collect {value ->
delay(300) // 处理这个元素消耗300ms
println("Collected $value ${Thread.currentThread().name}")
}
println("Coillected in $time ms")  //1200ms
}

合并与处理最新值:
.conflate()  //消费的时候过滤某些元素,处理最新的值,跳过中间的过渡值
.collectLatest()  // 直接处理最后一个值


过渡流操作符
可以使用操作符转换流,就像使用集合与序列一样。
过渡操作符应用于上游流,并返回下游流。
这些操作符也是冷操作符,就像流一样。这类操作符本身不是挂起函数。
它运行的速度很快,返回新的转换流的定义。

转换操作符:
suspend fun performRequest(request: Int) : String {
delay(1000)
return "response $request"
}

@Test
fun `test transform flow operator`() = runBlocking<Unit> {
(1..3).asFlow()
.map {request -> performRequest(request)}
.collect {value -> println(value)}

(1..3).asFlow()
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect {value -> println(value)}
}


限长操作符:
fun numbers() = flow<Int> {
try{
emit(1)
emit(2)
println("This line will not execute")  // 不打印
emit(3)
}finally{
println("Finally in numbers")
}
}

@Test
fun `test limit length operator`() = runBlocking<Unit> {
numbers().take(2).collect {value -> println(value)}
}

末端流操作符:
末端操作符是在流上用户启动流收集的挂起函数。collect是最基础的末端操作符,但是还有另外一些更方便使用的末端操作符。
转换为各种集合,例如toList与toSet。
获取第一个(first)值与确保流发射单个(single)值的操作符。
使用reduce与fold将流规约到单个值。

@Test
fun `test terminal operator`() = runBlocking<Unit> {
val sum = (1..5).asFlow()
.map { it * it }
.reduce {a, b -> a + b}
println(sum) // 55
}

组合操作符:
就像Kotlin标准库中的Sequence.zip扩展函数一样,流拥有一个zip操作符用于组合两个流中的相关值。
@Test
fun `test zip operator`() = runBlocking<Unit> {
val numbers = (1..3).asFlow()
val strs = flowOf("One", "Two", "Three")
numbers.zip(strs) {a, b -> "$a -> $b"}.collect {println(it)}
}

@Test
fun `test zip operator 02`() = runBlocking<Unit> {
val numbers = (1..3).asFlow().onEach {delay(300)}
val strs = flowOf("One", "Two", "Three").onEach {delay(400)}
val startTime = System.currentTimeMillis()
numbers.zip(strs) {a, b -> "$a -> $b"}.collect {
println("$it at ${System.currentTimeMillis() - startTime} ms from start")
}
}


展平操作符:
流表示异步接收的值序列,所以很容易遇到这样的情况:每个值都会触发对另一个值序列的请求,然而,由于流具有异步的性质,因此需要不同的展平模式,为此,存在一系列的流展平操作符。
flatMapConcat 连接模式
flatMapMerge 合并模式
flatMapLatest 最新展平模式

fun requestFlow(i: Int) = flow<String> {
emit("$i : First")
delay(500)
emit("$i : Second")
}

// 两个流具备关联性
// 连接展平模式
@Test
fun `test flatMapConcat`() = runBlocking<Unit> {
// Flow<Flow<String>>
val startTime = System.currentTimeMillis()
(1..3).asFlow()
.onEach{ delay(100)}
//.map {requestFlow(it)}  // 得到一个Flow<Flow<String>> 元素
.flatMapConcat{requestFlow(it)}  // 展平,从二维到一维,得到一个Flow<String>
.collect {value -> print("$value at ${System.currentTimeMillis() - startTime} ms from start")}
// 1 1 2 2 3 3 输出这种格式
}

// 合并展平模式
@Test
fun `test flatMapMerge`() = runBlocking<Unit> {
// Flow<Flow<String>>
val startTime = System.currentTimeMillis()
(1..3).asFlow()
.onEach{ delay(100)}
//.map {requestFlow(it)}  // 得到一个Flow<Flow<String>> 元素
.flatMapMerge{requestFlow(it)}  // 展平,从二维到一维,得到一个Flow<String>
.collect {value -> print("$value at ${System.currentTimeMillis() - startTime} ms from start")}
// 1 2 3 1 2 3 输出这种格式
}

// latest最新展平模式
@Test
fun `test flatMapLatest`() = runBlocking<Unit> {
// Flow<Flow<String>>
val startTime = System.currentTimeMillis()
(1..3).asFlow()
.onEach{ delay(100)}
//.map {requestFlow(it)}  // 得到一个Flow<Flow<String>> 元素
.flatMapLatest{requestFlow(it)}  // 展平,从二维到一维,得到一个Flow<String>
.collect {value -> print("$value at ${System.currentTimeMillis() - startTime} ms from start")}
// 1 2 3 3 输出这种格式  最后输出的中间的值不要了


流的异常处理:
当运算符中的发射器或代码抛出异常时,有几种处理异常的方法:
try/catch块
catch函数
class CoroutineTest03 {
fun simpleFlow() = flow<Int> {
for(i in 1..3) {
println("Emitting $i")
emit(i)
}
}

@Test
fun `test flow exception`() = runBlocking<Unit> {
try {
simpleFlow().collect{ value ->
println(value)
check(value <= 1) {"Collected $value"}
}
}catch(e: Throwable) {
println("Caught $e")
}
}

@Test
fun `test flow exception2`() = runBlocking<Unit> {
// flow的设计原则
flow{
emit(1)
throw ArithmeticException("Div 0")
}catch{ // 上游异常
e: Throwable -> println("Caught $e")}
.flowOn(Dispatchers.IO)
.collect{ println(it)}


flow{
emit(1)
throw ArithmeticException("Div 0")
}catch{ e: Throwable ->// 上游异常
println("Caught $e")
emit(10)
}.flowOn(Dispatchers.IO).collect{ println(it)}
}
}

流的完成:
当流收集完成时(普通情况或异常情况),它可能需要执行一个动作。
命令式finally块
onCompletion声明式处理
fun simpleFlow2() = (1..3).asFlow()

@Test
fun `test flow complete in finally`() = runBlocking<Unit> {
try{
simpleFlow2().collect {println(it)}
}finally{
println("Done")
}
}

@Test
fun `test flow complete in onCompletion`() = runBlocking<Unit> {
try{
simpleFlow2().onCompletion {println("Done")}.collect {println(it)}
}finally{
println("Done")
}
}

fun simpleFlow3() = flow<Int> {
emit(1)
throw RuntimeException()
}

@Test
fun `test flow complete in onCompletion exception`() = runBlocking<Unit> {
simpleFlow2()
.onCompletion {exception ->
if(exception != null)  println("Flow completed exceptionally")
}
.catch{ exception -> println("Caught $exception")} // 上游的异常
.collect {println(it)}
}

@Test
fun `test flow complete in onCompletion exception2`() = runBlocking<Unit> {
simpleFlow2()
.onCompletion {exception ->
if(exception != null)  println("Flow completed exceptionally")  // 需要try-catch了
}
.collect {value ->
println(value)
check(value <= 1) {"Collected $value"}
}
}

http://www.dtcms.com/a/585283.html

相关文章:

  • 网站建设公司兴田德润电话新县城乡规划建设局网站
  • Claude Code使用指南
  • 如何进行MSSQL提权?默认库,xp_cmdshell提权
  • 第三章 布局
  • 「数据获取」《中国口岸年鉴》(2001-2024)(2002未出版)
  • Visual Studio笔记
  • 【开题答辩全过程】以 二手手机交易平台的设计与实现为例,包含答辩的问题和答案
  • “AI+XR”赋能智慧研创中心,预见职业教育“新双高”的未来
  • 保障房建设网站首页河北信息门户网站定制
  • MySQL的IFNULL函数介绍
  • 【数据结构】从零开始认识图论 --- 单源/多源最短路算法
  • 基于PyTorch的动物识别模型训练与应用实战
  • JS之BOM与DOM操作
  • 品牌企业网站案例wordpress 漂浮广告
  • 【人工智能学习笔记 三】 AI教学之前端跨栈一:React整体分层架构
  • 【ZeroRange WebRTC】WebRTC 在 IPC(网络摄像头)中的应用:架构、实现与实践(深入指南)
  • WiFi 热点启动失败问题排查与解决
  • 手写序列化与反序列化
  • T41NQ/T41N高性能低功耗SOC芯片 软硬件资料T41NQ适用于各种AIoT应用,适用于智能安防、智能家居,机器视觉等领域方案
  • 购物网站建设要求用wordpress改
  • vector 底层模拟实现(上):核心机制全解析 + 迭代器失效深度剖析
  • mysql内置函数——了解常用的函数
  • 网站建设步骤ppt一个企业seo网站的优化流程
  • 技术演进中的开发沉思-178 JSP :前世今生(下)
  • 做网站学什么软件网页美工实例教程
  • 深入理解 Spring Boot Actuator:构建可观测性与运维友好的应用
  • 现代C++的AI革命:C++20/C++23核心特性解析与实战应用
  • 【数据结构】单链表的经典算法题
  • 网站优化要用什么软件做公司网站哪家好
  • 【DaisyUI】select 和 dropdown 怎么选择?