当前位置: 首页 > news >正文

Kotlin 协程异步任务工具类:高效处理异步操作与超时控制

下面是完整的 Kotlin 协程异步任务工具类代码,包含详细注释和使用示例。

import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext/*** 异步任务回调接口* 提供任务执行的各种状态回调** @param T 任务返回结果的类型*/
interface TaskCallback<T> {/*** 任务执行成功回调* @param result 任务执行结果*/fun onSuccess(result: T)/*** 任务执行失败回调* @param error 异常信息*/fun onFailure(error: Throwable)/*** 任务执行超时回调*/fun onTimeout()/*** 任务被取消回调*/fun onCancelled()
}/*** Kotlin 协程异步任务工具类* 提供强大的异步任务执行能力,支持超时控制、重试机制、并行执行等功能*/
class CoroutineTaskUtil private constructor() : CoroutineScope {// 使用 SupervisorJob 允许子协程独立失败而不影响其他协程private val job = SupervisorJob()// 协程上下文配置override val coroutineContext: CoroutineContextget() = Dispatchers.Default + job + CoroutineExceptionHandler { _, throwable ->println("未捕获的协程异常: ${throwable.message}")}companion object {@Volatileprivate var instance: CoroutineTaskUtil? = null/*** 获取工具类单例实例* @return CoroutineTaskUtil 实例*/fun getInstance(): CoroutineTaskUtil {return instance ?: synchronized(this) {instance ?: CoroutineTaskUtil().also { instance = it }}}}/*** 执行异步任务(支持超时控制)** @param task 要执行的挂起任务* @param timeoutMs 超时时间(毫秒),null 表示不设置超时* @param callback 回调接口* @return Job 对象,可用于取消任务*/fun <T> executeWithTimeout(task: suspend () -> T,timeoutMs: Long? = null,callback: TaskCallback<T>? = null): Job {return launch {try {val result = if (timeoutMs != null) {withTimeout(timeoutMs) { task() }} else {task()}callback?.onSuccess(result)} catch (e: TimeoutCancellationException) {callback?.onTimeout()} catch (e: CancellationException) {callback?.onCancelled()} catch (e: Exception) {callback?.onFailure(e)}}}/*** 执行异步任务(简化版,不带超时)** @param task 要执行的挂起任务* @param callback 回调接口* @return Job 对象,可用于取消任务*/fun <T> execute(task: suspend () -> T,callback: TaskCallback<T>? = null): Job {return executeWithTimeout(task, null, callback)}/*** 在IO线程执行任务,在主线程回调** @param task 要执行的挂起任务* @param callback 回调接口* @return Job 对象,可用于取消任务*/fun <T> executeOnIoWithMainCallback(task: suspend () -> T,callback: TaskCallback<T>? = null): Job {return CoroutineScope(Dispatchers.IO).launch {try {val result = task()withContext(Dispatchers.Main) {callback?.onSuccess(result)}} catch (e: Exception) {withContext(Dispatchers.Main) {when (e) {is CancellationException -> callback?.onCancelled()else -> callback?.onFailure(e)}}}}}/*** 执行多个并行任务,等待所有任务完成** @param tasks 要执行的多个任务* @param callback 回调接口* @return Job 对象,可用于取消任务*/fun <T> executeAll(vararg tasks: suspend () -> T,callback: TaskCallback<List<T>>? = null): Job {return launch {try {val deferredResults = tasks.map { async { it() } }val results = deferredResults.awaitAll()callback?.onSuccess(results)} catch (e: Exception) {callback?.onFailure(e)}}}/*** 执行带重试机制的任务** @param task 要执行的任务* @param retries 重试次数* @param delayMs 重试间隔(毫秒)* @param callback 回调接口* @return Job 对象,可用于取消任务*/fun <T> executeWithRetry(task: suspend () -> T,retries: Int = 3,delayMs: Long = 1000,callback: TaskCallback<T>? = null): Job {return launch {var currentRetry = 0var lastError: Throwable? = nullwhile (currentRetry <= retries) {try {val result = task()callback?.onSuccess(result)return@launch} catch (e: Exception) {lastError = ecurrentRetry++if (currentRetry > retries) breakdelay(delayMs)}}callback?.onFailure(lastError ?: Exception("在 $retries 次重试后仍失败"))}}/*** 取消所有正在执行的任务*/fun cancelAll() {job.cancel("手动取消所有任务")}/*** 关闭工具类,释放资源*/fun shutdown() {cancelAll()}
}/*** 简化使用的顶层扩展函数*//*** 为任意类型添加异步执行能力*/
suspend fun <T> T.executeAsync(timeoutMs: Long? = null,callback: TaskCallback<T>? = null
): Job {return CoroutineTaskUtil.getInstance().executeWithTimeout(task = { this },timeoutMs = timeoutMs,callback = callback)
}/*** 简化版的异步任务执行函数*/
suspend fun <T> executeTask(timeoutMs: Long? = null,task: suspend () -> T,onSuccess: (T) -> Unit = {},onFailure: (Throwable) -> Unit = {},onTimeout: () -> Unit = {},onCancelled: () -> Unit = {}
): Job {return CoroutineTaskUtil.getInstance().executeWithTimeout(task = task,timeoutMs = timeoutMs,callback = object : TaskCallback<T> {override fun onSuccess(result: T) = onSuccess(result)override fun onFailure(error: Throwable) = onFailure(error)override fun onTimeout() = onTimeout()override fun onCancelled() = onCancelled()})
}// ==================================================
// 使用示例
// ==================================================/*** 工具类使用示例*/
class CoroutineTaskExamples {/*** 示例1:基本用法*/fun exampleBasicUsage() {CoroutineTaskUtil.getInstance().execute(task = {delay(1000)"异步任务完成"},callback = object : TaskCallback<String> {override fun onSuccess(result: String) {println("任务成功: $result")}override fun onFailure(error: Throwable) {println("任务失败: ${error.message}")}override fun onTimeout() {println("任务超时")}override fun onCancelled() {println("任务被取消")}})}/*** 示例2:带超时的任务*/fun exampleWithTimeout() {CoroutineTaskUtil.getInstance().executeWithTimeout(task = {delay(5000)"长时间任务完成"},timeoutMs = 3000,callback = object : TaskCallback<String> {override fun onSuccess(result: String) {println("成功: $result")}override fun onFailure(error: Throwable) {println("失败: ${error.message}")}override fun onTimeout() {println("任务超时,已中断")}override fun onCancelled() {println("任务被取消")}})}/*** 示例3:带重试的任务*/fun exampleWithRetry() {var attemptCount = 0CoroutineTaskUtil.getInstance().executeWithRetry(task = {attemptCount++if (attemptCount < 3) {throw RuntimeException("第${attemptCount}次尝试失败")}"第${attemptCount}次尝试成功"},retries = 5,delayMs = 500,callback = object : TaskCallback<String> {override fun onSuccess(result: String) {println("重试成功: $result")}override fun onFailure(error: Throwable) {println("重试失败: ${error.message}")}override fun onTimeout() {}override fun onCancelled() {}})}/*** 示例4:并行执行多个任务*/fun exampleParallelTasks() {CoroutineTaskUtil.getInstance().executeAll({ delay(1000)"任务1结果" },{ delay(2000)"任务2结果" },{ delay(1500)"任务3结果" },callback = object : TaskCallback<List<String>> {override fun onSuccess(result: List<String>) {println("所有任务完成: $result")}override fun onFailure(error: Throwable) {println("任务执行失败: ${error.message}")}override fun onTimeout() {}override fun onCancelled() {}})}/*** 示例5:使用简化API*/suspend fun exampleSimplifiedAPI() {executeTask(timeoutMs = 5000,task = {delay(2000)"简化API示例结果"},onSuccess = { result ->println("简化API成功: $result")},onFailure = { error ->println("简化API失败: ${error.message}")})}
}/*** 实际应用场景示例*/
class PracticalExamples {/*** 场景1:网络请求处理*/fun fetchUserData(userId: String) {CoroutineTaskUtil.getInstance().executeWithTimeout(task = {// 模拟网络请求delay(2000)"用户${userId}的数据"},timeoutMs = 10000,callback = object : TaskCallback<String> {override fun onSuccess(result: String) {updateUserInterface(result)}override fun onFailure(error: Throwable) {showErrorMessage("数据加载失败")}override fun onTimeout() {showErrorMessage("请求超时,请重试")}override fun onCancelled() {showErrorMessage("请求被取消")}})}private fun updateUserInterface(data: String) {println("更新UI: $data")}private fun showErrorMessage(message: String) {println("错误信息: $message")}/*** 场景2:文件操作*/fun processLargeFile(filePath: String) {CoroutineTaskUtil.getInstance().executeOnIoWithMainCallback(task = {readAndProcessFile(filePath)},callback = object : TaskCallback<String> {override fun onSuccess(result: String) {showProcessingResult(result)}override fun onFailure(error: Throwable) {showErrorMessage("文件处理失败: ${error.message}")}override fun onTimeout() {}override fun onCancelled() {}})}private suspend fun readAndProcessFile(filePath: String): String {delay(3000)return "处理完成的文件内容: $filePath"}private fun showProcessingResult(result: String) {println("处理结果: $result")}
}// 测试代码
fun main() = runBlocking {val examples = CoroutineTaskExamples()println("=== 示例1:基本用法 ===")examples.exampleBasicUsage()delay(1500)println("\n=== 示例2:带超时的任务 ===")examples.exampleWithTimeout()delay(4000)println("\n=== 示例3:带重试的任务 ===")examples.exampleWithRetry()delay(3000)println("\n=== 示例4:并行执行多个任务 ===")examples.exampleParallelTasks()delay(3000)println("\n=== 示例5:使用简化API ===")examples.exampleSimplifiedAPI()delay(3000)println("\n=== 实际应用场景 ===")val practicalExamples = PracticalExamples()practicalExamples.fetchUserData("123")delay(3000)practicalExamples.processLargeFile("/path/to/file.txt")delay(4000)
}

功能特性

  1. 核心功能

· 异步任务执行:使用协程实现真正的异步操作
· 超时控制:支持设置任务执行超时时间
· 错误处理:完善的异常处理机制
· 线程调度:支持指定执行线程和回调线程

  1. 高级特性

· 重试机制:支持自动重试失败的任务
· 并行执行:支持多个任务并行执行
· 任务取消:提供任务取消功能
· 资源管理:支持资源清理和释放

  1. API 设计

· 简洁易用:提供简单的函数调用接口
· 类型安全:完整的泛型支持
· 扩展性强:支持自定义配置和扩展

使用建议

  1. 网络请求:使用 executeWithTimeout 处理网络请求,设置合适的超时时间
  2. 文件操作:使用 executeOnIoWithMainCallback 在IO线程执行文件操作,在主线程更新UI
  3. 批量处理:使用 executeAll 并行执行多个独立任务
  4. 不稳定操作:使用 executeWithRetry 处理可能失败的操作

这个工具类提供了生产级别的异步任务处理能力,代码格式规范,注释完整,适合直接用于项目开发。

http://www.dtcms.com/a/361600.html

相关文章:

  • 构建共享新生态的智慧物流开源了
  • ClickHouse常见问题——ClickHouseKeeper配置listen_host后不生效
  • java设计模式一、单例模式
  • 查看LoRA 哪个适配器处于激活状态(67)
  • 【秋招笔试】2025.08.31小红书秋招笔试真题
  • 鸿蒙NEXT开发指南:Image、Video与Swiper组件全面解析
  • Mac idea 格式化代码快捷键
  • 用滑动窗口与线性回归将音频信号转换为“Token”序列:一种简单的音频特征编码方法
  • 若依vue自定义发布环境部署后所有菜单无法点击
  • Kubernetes一网络组件概述
  • 如何正确使用ChatGPT做数学建模比赛——数学建模AI使用技巧
  • Sqlsugar补充自定义模板
  • 环境搭建汇总
  • 在.NET标准库中进行数据验证的方法
  • 【qwen3vsglm4.5】JavaScript 与浏览器事件分类
  • 垃圾渗滤液中镍超标怎么处理
  • 亮数据MCP——专为信息爆炸时代打造的AI新闻利器。
  • 如何选择最佳车载交换机?车载交换机功能讲解
  • UCIE Specification详解(十二)
  • 【小白入】显示器核心参数对比度简介
  • Trae + MCP : 一键生成专业封面
  • (论文速读)3DTopia-XL:高质量3D资产生成技术
  • C语言:树的实现和剖析
  • 火狐退出中国后,Zen 浏览器会是「理想平替」吗?
  • MATLAB实现图像分割:Otsu阈值法
  • 辅助日志/备份文件自动化命名方案
  • 展会回顾 | 聚焦医疗前沿 , 礼达先导在广州医博会展示类器官自动化培养技术
  • 解析简历重难点与面试回答要点
  • Redis基础教程
  • 构建线上门户的核心三要素:域名、DNS与IP 全面解析