当前位置: 首页 > news >正文

Kotlin并发请求的一些知识记录

    private suspend fun fetchDataConcurrently(list: MutableList<MyType>,onRequestResult: (Int, List<MyType>?) -> Unit	//高阶函数回调) {val deferredList = mutableListOf<Deferred<MyType?>>()// 设定任务超时时间为12秒,并使用 async 并发执行请求withTimeoutOrNull(12_000L) {Log.d(TAG, "request size:${list.size}")for ((index, item) in list.withIndex()) {val deferred = async {	//对每个item都发起一次异步请求, 这里是并发的请求//通过callbackChannel来传递结果,参数UNLIMITED为无限缓冲,具体的在下面扩展有讲val callbackChannel = Channel<MyType?>(Channel.UNLIMITED)SDKInstance(item.id,object : SDKCallback() {override fun onSuccess(children: List<SDKType>,) {super.onSuccess(children)Log.d(TAG, "success name: ${item.name}")val item = MyType(item.name, item.id)item.list = childrencallbackChannel.trySend(item).isSuccess}override fun onError() {super.onError()callbackChannel.trySend(null).isFailureLog.d(TAG, "error name: ${item.name}")}})callbackChannel.receive()}deferredList.add(deferred)}}// 等待所有请求完成val resultData = mutableListOf<MyType>()var requestSituation: Int = REQUEST_TIME_DEFAULT	//超时情况记录for (deferred in deferredList) {try {val result = deferred.await()	//这里的await就是等待异步任务完成result?.let {resultData.add(it)requestSituation = REQUEST_TIME_NORMAL}} catch (e: Exception) {// 处理任务异常Log.d(TAG, "error: ${e}, isTimeOut = $requestSituation")if (requestSituation != REQUEST_TIME_NORMAL) { //如果有数据返回成功就无需记录超时requestSituation = REQUEST_TIME_TIMEOUT	//如果所有数据获取超时,需要反馈异常}}}Log.d(TAG, "response size: ${resultData.size}")if(requestSituation == REQUEST_TIME_TIMEOUT) {onRequestResult(REQ_ERROR, null)} else if (resultData.isEmpty()) {onRequestResult(REQ_NO_DATA, null)} else {if (list.size - resultData.size > Math.max((list.size - 1) / 2, 1) && resultData.size < 5) {onRequestResult(REQ_ERROR, null)} else {onRequestResult(REQ_SUCCESS, resultData)myData.applyPut { cache ->cache[MyKey] = resultData	//这里使用了LruCache,以后再讲}}}}

扩展:

Channel在这段代码中的作用

  1. 桥接api与协程:将传统的回调式API(SDK的回调)转换为协程友好的异步操作
  2. 同步时序:确保在SDK回调后,协程能够继续执行
  3. 结果传递:将回调结果传递回主协程流程

潜在问题

  1. 使用无限缓冲可能不必要,因为channel开启在for循环中,一次只需要接收一个结果
  2. channel没有被显式关闭,可能导致资源泄漏
try{
//回调处理
...
}	finally {callbackChannel.close()	//确保关闭
}

Channel是什么?

它是Kotlin协程中的一个并发通信原语,用于在不同协程之间安全的传递数据。类似阻塞队列,但完全基于协程的非阻塞特性实现。
它是协程间通信的强大工具,特别适合将回调式API转换为挂起函数,使异步代码更线性易读。

Channel的基本特点

生产者-消费者模式:一个协程发送数据,另一个协程接收数据
线程安全:内部已处理好线程同步的问题
可挂起:当Channel满或空时,发生和接收操作会挂起协程而非阻塞线程

Channel在以上代码中的时序关系

  1. 创建channel:在每次async任务中创建一个channel

  2. SDK回调:当收到SDK回调,成功获取数据时,使用trySend发送数据,失败时使用trySend发送null

  3. 接收结果:通过callbackChannel.receive()等待SDK回调

    关键时序点:receive会挂起协程,直到SDK回调触发并发送数据到Channel

Channel的常见用法

  1. 创建Channel
//创建有缓冲的Channel
val channel = Channel<T>(capacity)//capacity
//RENDEZVOUS(默认,无缓冲)
//UNLIMITED(无限缓冲,MAX_VALUE)
//CONFLATED(只保留最小值)
//具体数字(固定缓冲大小)
  1. 发送数据
//常规发送(可能挂起)
channel.send(data)//尝试发送(不挂起)
channel.trySend(data).isSuccess
  1. 接收数据
//常规接收(可能挂起)
val data = channel.receive()//尝试接收(不挂起)
val data = channel.tryReceive().getOrNull()
  1. 关闭Channel
channel.close()	//发送结束信号, 防止资源泄漏

关于这段代码的优化写法

private suspend fun fetchDataConcurrently(list: MutableList<MyType>,onRequestResult: (Int, List<MyType>?) -> Unit
) {val resultData = mutableListOf<MyType>()var requestSituation = REQUEST_TIME_DEFAULTtry {withTimeout(12_000L) {val deferredResults = list.map { item ->async {try {val result = suspendCancellableCoroutine<MyType?> { continuation ->val callback = object : SDKCallback() {override fun onSuccess(children: List<SDKType>,) {val item= MyType(item.name, item.id).apply {list = children}continuation.resume(item)}override fun onError() {continuation.resume(null)}}continuation.invokeOnCancellation {// 如果协程被取消,可以在这里取消SDK请求// 需要SDK支持取消操作}SDKInstance(item.id, callback)}result} catch (e: Exception) {null}}}deferredResults.forEach { deferred ->deferred.await()?.let {resultData.add(it)requestSituation = REQUEST_TIME_NORMAL}}}} catch (e: TimeoutCancellationException) {if (requestSituation != REQUEST_TIME_NORMAL) {requestSituation = REQUEST_TIME_TIMEOUT}Log.w(TAG, "Request timeout: ${e.message}")} catch (e: Exception) {Log.e(TAG, "Unexpected error: ${e.message}", e)}// 结果处理逻辑保持不变when {requestSituation == REQUEST_TIME_TIMEOUT -> {onRequestResult(REQ_ERROR, null)}resultData.isEmpty() -> {onRequestResult(REQ_NO_DATA, null)}list.size - resultData.size > maxOf(list.size / 2, 1) && resultData.size < 5 -> {onRequestResult(REQ_ERROR, null)}else -> {onRequestResult(REQ_SUCCESS, resultData)myData.applyPut { cache ->cache[myKey] = resultData}}}
}

优化点说明

  • 替换Channel为suspendCancellableCoroutine:

    更直接地将回调API转换为挂起函数

    避免了Channel资源管理问题

  • 改进资源管理:

    使用invokeOnCancellation处理协程取消

    确保所有可能的异常都被捕获

  • 缓冲策略优化:

    完全移除了不必要的Channel缓冲

    使用更直接的协程控制流

  • 错误处理增强:

    明确区分超时和其他异常

    更好的日志记录

核心知识点

  1. 协程与回调的转换:

    suspendCancellableCoroutine将回调API转换为挂起函数协程取消处理机制
    
  2. 结构化并发:

    withTimeout创建有时间限制的作用域async/await并发模式
    
  3. 资源管理:

    协程取消时的清理工作异常处理边界
    
  4. 并发控制:

    多个请求的并行执行结果的聚合处理
    
  5. 状态管理:

    请求状态的跟踪(REQUEST_TIME_NORMAL/TIMEOUT)结果的成功/失败判定逻辑
    

相关文章:

  • 掌握Multi-Agent实践(七):基于AgentScope分布式模式实现多智能体高效协作[并行加速大模型辅助搜索、分布式多用户协同辩论赛]
  • 详细分析python 中的deque 以及和list 的用法区别
  • 【深度剖析】安踏体育的数字化转型(上篇1)
  • 嵌入式学习的第二十一天-数据结构-双向链表
  • js关于number类型的计算问题
  • RabbitMQ工作流程及使用方法
  • c/c++消息队列库RabbitMQ的使用
  • 动态库和静态库的区别
  • 以项目的方式学QT开发(二)
  • 哲学物理:太极图和莫比乌斯环有什么关系?
  • OkHttp用法-Java调用http服务
  • 【Linux系列】Linux 系统下 SSD 磁盘识别
  • 【油藏地球物理正演软件ColchisFM】基于数据驱动的油藏参数叠前地震反演研究进展
  • 操作系统学习笔记第3章 内存管理(灰灰题库)
  • javaSE.QueueDeque
  • python打卡打印26
  • Git 常用命令详解
  • 进程替换讲解
  • 【day01】 Chroma 核心操作流程
  • IT系统的基础设施:流量治理、服务治理、资源治理,还有数据治理。
  • 英国6月初将公布对华关系的审计报告,外交部:望英方树立正确政策导向
  • 在本轮印巴冲突的舆论场上也胜印度一筹,巴基斯坦靠什么?
  • 警方通报男子广州南站持刀伤人:造成1人受伤,嫌疑人被控制
  • 泰山、华海、中路等山东险企综合成本率均超100%,承保业务均亏损
  • 视频|王弘治:王太后,“先天宫斗圣体”?
  • 新能源汽车,告别混乱创新