Kotlin并发请求的一些知识记录
private suspend fun fetchDataConcurrently(list: MutableList<MyType>,onRequestResult: (Int, List<MyType>?) -> Unit //高阶函数回调) {val deferredList = mutableListOf<Deferred<MyType?>>()// 设定任务超时时间为12秒,并使用 async 并发执行请求withTimeoutOrNull(12_000L) {Log.d(TAG, "request size:${list.size}")for ((index, item) in list.withIndex()) {val deferred = async { //对每个item都发起一次异步请求, 这里是并发的请求//通过callbackChannel来传递结果,参数UNLIMITED为无限缓冲,具体的在下面扩展有讲val callbackChannel = Channel<MyType?>(Channel.UNLIMITED)SDKInstance(item.id,object : SDKCallback() {override fun onSuccess(children: List<SDKType>,) {super.onSuccess(children)Log.d(TAG, "success name: ${item.name}")val item = MyType(item.name, item.id)item.list = childrencallbackChannel.trySend(item).isSuccess}override fun onError() {super.onError()callbackChannel.trySend(null).isFailureLog.d(TAG, "error name: ${item.name}")}})callbackChannel.receive()}deferredList.add(deferred)}}// 等待所有请求完成val resultData = mutableListOf<MyType>()var requestSituation: Int = REQUEST_TIME_DEFAULT //超时情况记录for (deferred in deferredList) {try {val result = deferred.await() //这里的await就是等待异步任务完成result?.let {resultData.add(it)requestSituation = REQUEST_TIME_NORMAL}} catch (e: Exception) {// 处理任务异常Log.d(TAG, "error: ${e}, isTimeOut = $requestSituation")if (requestSituation != REQUEST_TIME_NORMAL) { //如果有数据返回成功就无需记录超时requestSituation = REQUEST_TIME_TIMEOUT //如果所有数据获取超时,需要反馈异常}}}Log.d(TAG, "response size: ${resultData.size}")if(requestSituation == REQUEST_TIME_TIMEOUT) {onRequestResult(REQ_ERROR, null)} else if (resultData.isEmpty()) {onRequestResult(REQ_NO_DATA, null)} else {if (list.size - resultData.size > Math.max((list.size - 1) / 2, 1) && resultData.size < 5) {onRequestResult(REQ_ERROR, null)} else {onRequestResult(REQ_SUCCESS, resultData)myData.applyPut { cache ->cache[MyKey] = resultData //这里使用了LruCache,以后再讲}}}}
扩展:
Channel在这段代码中的作用
- 桥接api与协程:将传统的回调式API(SDK的回调)转换为协程友好的异步操作
- 同步时序:确保在SDK回调后,协程能够继续执行
- 结果传递:将回调结果传递回主协程流程
潜在问题
- 使用无限缓冲可能不必要,因为channel开启在for循环中,一次只需要接收一个结果
- channel没有被显式关闭,可能导致资源泄漏
try{
//回调处理
...
} finally {callbackChannel.close() //确保关闭
}
Channel是什么?
它是Kotlin协程中的一个并发通信原语,用于在不同协程之间安全的传递数据。类似阻塞队列,但完全基于协程的非阻塞特性实现。
它是协程间通信的强大工具,特别适合将回调式API转换为挂起函数,使异步代码更线性易读。
Channel的基本特点
生产者-消费者模式:一个协程发送数据,另一个协程接收数据
线程安全:内部已处理好线程同步的问题
可挂起:当Channel满或空时,发生和接收操作会挂起协程而非阻塞线程
Channel在以上代码中的时序关系
-
创建channel:在每次async任务中创建一个channel
-
SDK回调:当收到SDK回调,成功获取数据时,使用trySend发送数据,失败时使用trySend发送null
-
接收结果:通过callbackChannel.receive()等待SDK回调
关键时序点:receive会挂起协程,直到SDK回调触发并发送数据到Channel
Channel的常见用法
- 创建Channel
//创建有缓冲的Channel
val channel = Channel<T>(capacity)//capacity
//RENDEZVOUS(默认,无缓冲)
//UNLIMITED(无限缓冲,MAX_VALUE)
//CONFLATED(只保留最小值)
//具体数字(固定缓冲大小)
- 发送数据
//常规发送(可能挂起)
channel.send(data)//尝试发送(不挂起)
channel.trySend(data).isSuccess
- 接收数据
//常规接收(可能挂起)
val data = channel.receive()//尝试接收(不挂起)
val data = channel.tryReceive().getOrNull()
- 关闭Channel
channel.close() //发送结束信号, 防止资源泄漏
关于这段代码的优化写法
private suspend fun fetchDataConcurrently(list: MutableList<MyType>,onRequestResult: (Int, List<MyType>?) -> Unit
) {val resultData = mutableListOf<MyType>()var requestSituation = REQUEST_TIME_DEFAULTtry {withTimeout(12_000L) {val deferredResults = list.map { item ->async {try {val result = suspendCancellableCoroutine<MyType?> { continuation ->val callback = object : SDKCallback() {override fun onSuccess(children: List<SDKType>,) {val item= MyType(item.name, item.id).apply {list = children}continuation.resume(item)}override fun onError() {continuation.resume(null)}}continuation.invokeOnCancellation {// 如果协程被取消,可以在这里取消SDK请求// 需要SDK支持取消操作}SDKInstance(item.id, callback)}result} catch (e: Exception) {null}}}deferredResults.forEach { deferred ->deferred.await()?.let {resultData.add(it)requestSituation = REQUEST_TIME_NORMAL}}}} catch (e: TimeoutCancellationException) {if (requestSituation != REQUEST_TIME_NORMAL) {requestSituation = REQUEST_TIME_TIMEOUT}Log.w(TAG, "Request timeout: ${e.message}")} catch (e: Exception) {Log.e(TAG, "Unexpected error: ${e.message}", e)}// 结果处理逻辑保持不变when {requestSituation == REQUEST_TIME_TIMEOUT -> {onRequestResult(REQ_ERROR, null)}resultData.isEmpty() -> {onRequestResult(REQ_NO_DATA, null)}list.size - resultData.size > maxOf(list.size / 2, 1) && resultData.size < 5 -> {onRequestResult(REQ_ERROR, null)}else -> {onRequestResult(REQ_SUCCESS, resultData)myData.applyPut { cache ->cache[myKey] = resultData}}}
}
优化点说明
-
替换Channel为suspendCancellableCoroutine:
更直接地将回调API转换为挂起函数
避免了Channel资源管理问题
-
改进资源管理:
使用invokeOnCancellation处理协程取消
确保所有可能的异常都被捕获
-
缓冲策略优化:
完全移除了不必要的Channel缓冲
使用更直接的协程控制流
-
错误处理增强:
明确区分超时和其他异常
更好的日志记录
核心知识点
-
协程与回调的转换:
suspendCancellableCoroutine将回调API转换为挂起函数协程取消处理机制
-
结构化并发:
withTimeout创建有时间限制的作用域async/await并发模式
-
资源管理:
协程取消时的清理工作异常处理边界
-
并发控制:
多个请求的并行执行结果的聚合处理
-
状态管理:
请求状态的跟踪(REQUEST_TIME_NORMAL/TIMEOUT)结果的成功/失败判定逻辑