当前位置: 首页 > news >正文

【后端】Java封装一个多线程处理任务,可以设置任务优先级优先插队处理,并且提供根据任务ID取消任务

这个需求其实就是一个 带优先级调度、可取消的线程池任务管理器。封装一个 Java Demo,支持:

  1. 任务优先级:数值越大优先级越高,可以“插队”。

  2. 任务取消:可以根据任务 ID 取消还未执行的任务。

  3. 多线程并发执行:用线程池来跑任务。


✨ 基础功能

实现思路

  • 使用 PriorityBlockingQueue 管理任务,保证高优先级任务先执行。

  • 封装一个 Task 类,实现 RunnableComparable<Task>,用于排序。

  • 每个任务有 唯一 ID,方便取消。

  • 使用 ThreadPoolExecutor 配合优先级队列执行任务。


代码示例

import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;/*** 优先级任务*/
class PriorityTask implements Runnable, Comparable<PriorityTask> {private final String taskId;private final int priority;private final Runnable action;private final AtomicBoolean cancelled = new AtomicBoolean(false);public PriorityTask(String taskId, int priority, Runnable action) {this.taskId = taskId;this.priority = priority;this.action = action;}public String getTaskId() {return taskId;}public void cancel() {cancelled.set(true);}@Overridepublic void run() {if (!cancelled.get()) {action.run();} else {System.out.println("任务 " + taskId + " 已被取消,未执行");}}@Overridepublic int compareTo(PriorityTask other) {// 优先级高的排在前面return Integer.compare(other.priority, this.priority);}
}/*** 任务管理器*/
class TaskManager {private final ThreadPoolExecutor executor;private final ConcurrentHashMap<String, PriorityTask> taskMap = new ConcurrentHashMap<>();public TaskManager(int poolSize) {PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();this.executor = new ThreadPoolExecutor(poolSize,poolSize,60L,TimeUnit.SECONDS,queue);}/** 提交任务 */public String submitTask(int priority, Runnable action) {String taskId = UUID.randomUUID().toString();PriorityTask task = new PriorityTask(taskId, priority, action);taskMap.put(taskId, task);executor.execute(task);return taskId;}/** 取消任务 */public boolean cancelTask(String taskId) {PriorityTask task = taskMap.remove(taskId);if (task != null) {task.cancel();return true;}return false;}/** 关闭线程池 */public void shutdown() {executor.shutdown();}
}/*** 测试*/
public class PriorityTaskDemo {public static void main(String[] args) throws InterruptedException {TaskManager manager = new TaskManager(3);// 提交一些任务String id1 = manager.submitTask(1, () -> {System.out.println("执行任务1,优先级1");});String id2 = manager.submitTask(10, () -> {System.out.println("执行任务2,优先级10");});String id3 = manager.submitTask(5, () -> {System.out.println("执行任务3,优先级5");});// 取消一个任务boolean cancelled = manager.cancelTask(id3);System.out.println("取消任务3结果: " + cancelled);Thread.sleep(2000);manager.shutdown();}
}

执行结果示例

可能输出类似:

执行任务1,优先级1
执行任务2,优先级10
任务 c264a955-23e2-4192-8914-bd5e83d9f89d 已被取消,未执行
取消任务3结果: true

说明:

  • 任务 优先级10 插队最先执行。

  • 任务3 被取消,不会执行。

  • 任务1 继续执行。


进一步扩展一下,让任务支持 回调监听(成功/失败/取消),方便在业务里统一埋点


✨ 功能扩展

  1. 每个任务可以带一个 TaskListener 回调,监听任务状态:

    • onSuccess(String taskId)

    • onFailure(String taskId, Throwable error)

    • onCancelled(String taskId)

  2. PriorityTask 内部捕获异常,调用对应的回调方法。

  3. TaskManager 提交任务时可以选择性传入监听器。


完整代码

import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;/*** 任务监听器*/
interface TaskListener {void onSuccess(String taskId);void onFailure(String taskId, Throwable error);void onCancelled(String taskId);
}/*** 优先级任务*/
class PriorityTask implements Runnable, Comparable<PriorityTask> {private final String taskId;private final int priority;private final Runnable action;private final TaskListener listener;private final AtomicBoolean cancelled = new AtomicBoolean(false);public PriorityTask(String taskId, int priority, Runnable action, TaskListener listener) {this.taskId = taskId;this.priority = priority;this.action = action;this.listener = listener;}public String getTaskId() {return taskId;}public void cancel() {cancelled.set(true);}@Overridepublic void run() {if (cancelled.get()) {if (listener != null) listener.onCancelled(taskId);System.out.println("任务 " + taskId + " 已被取消,未执行");return;}try {action.run();if (listener != null) listener.onSuccess(taskId);} catch (Throwable t) {if (listener != null) listener.onFailure(taskId, t);}}@Overridepublic int compareTo(PriorityTask other) {// 优先级高的排在前面return Integer.compare(other.priority, this.priority);}
}/*** 任务管理器*/
class TaskManager {private final ThreadPoolExecutor executor;private final ConcurrentHashMap<String, PriorityTask> taskMap = new ConcurrentHashMap<>();public TaskManager(int poolSize) {PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();this.executor = new ThreadPoolExecutor(poolSize,poolSize,60L,TimeUnit.SECONDS,queue);}/** 提交任务(带监听器) */public String submitTask(int priority, Runnable action, TaskListener listener) {String taskId = UUID.randomUUID().toString();PriorityTask task = new PriorityTask(taskId, priority, action, listener);taskMap.put(taskId, task);executor.execute(task);return taskId;}/** 简化方法(无监听器) */public String submitTask(int priority, Runnable action) {return submitTask(priority, action, null);}/** 取消任务 */public boolean cancelTask(String taskId) {PriorityTask task = taskMap.remove(taskId);if (task != null) {task.cancel();return true;}return false;}/** 关闭线程池 */public void shutdown() {executor.shutdown();}
}/*** 测试*/
public class PriorityTaskDemo {public static void main(String[] args) throws InterruptedException {TaskManager manager = new TaskManager(3);// 定义一个统一的监听器TaskListener listener = new TaskListener() {@Overridepublic void onSuccess(String taskId) {System.out.println("任务 " + taskId + " 执行成功 ✅");}@Overridepublic void onFailure(String taskId, Throwable error) {System.out.println("任务 " + taskId + " 执行失败 ❌: " + error.getMessage());}@Overridepublic void onCancelled(String taskId) {System.out.println("任务 " + taskId + " 被取消 ⏹️");}};// 提交任务String id1 = manager.submitTask(1, () -> {System.out.println("执行任务1(优先级1)");}, listener);String id2 = manager.submitTask(10, () -> {System.out.println("执行任务2(优先级10)");throw new RuntimeException("模拟异常");}, listener);String id3 = manager.submitTask(5, () -> {System.out.println("执行任务3(优先级5)");}, listener);// 取消一个任务boolean cancelled = manager.cancelTask(id3);System.out.println("取消任务3结果: " + cancelled);Thread.sleep(2000);manager.shutdown();}
}

运行结果示例

执行任务1(优先级1)
执行任务2(优先级10)
任务 cc143e3d-3808-4b98-93a8-09ba90e281f2 被取消 ⏹️
任务 cc143e3d-3808-4b98-93a8-09ba90e281f2 已被取消,未执行
任务 ede52aab-e4a3-4896-88a3-e664b1c1a667 执行成功 ✅
取消任务3结果: true
任务 36230882-e049-44a9-8406-a90d34acad47 执行失败 ❌: 模拟异常

这样就实现了:

  • 优先级插队(10 → 5 → 1)

  • 取消任务(取消后不会执行,并回调 onCancelled

  • 失败捕获(异常不会导致线程池崩溃,回调 onFailure

  • 成功回调(回调 onSuccess


再加一个 全局监听器(所有任务统一回调,方便在一个地方打点统计),希望在 一个地方统一打点统计,包括:

  • 任务ID

  • 开始时间

  • 结束时间

  • 耗时

  • 执行状态(成功 / 失败 / 取消)

这可以通过在 TaskManager 里引入一个 全局监听器 GlobalTaskListener 来实现。


✨ 改进方案

  1. 新增 GlobalTaskListener 接口,统一接收所有任务的生命周期事件。

  2. PriorityTask 内部在执行时打点(记录开始、结束时间),并回调到 GlobalTaskListener

  3. TaskManager 可以配置一个全局监听器(所有任务都会走这里)。


🔧 完整代码

import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;/*** 全局任务监听器(统一打点)*/
interface GlobalTaskListener {void onTaskStart(String taskId, int priority, Instant startTime);void onTaskSuccess(String taskId, int priority, Instant startTime, Instant endTime, Duration duration);void onTaskFailure(String taskId, int priority, Instant startTime, Instant endTime, Duration duration,Throwable error);void onTaskCancelled(String taskId, int priority, Instant startTime);
}/*** 优先级任务*/
class PriorityTask implements Runnable, Comparable<PriorityTask> {private final String taskId;private final int priority;private final Runnable action;private final AtomicBoolean cancelled = new AtomicBoolean(false);private final GlobalTaskListener globalListener;public PriorityTask(String taskId, int priority, Runnable action, GlobalTaskListener globalListener) {this.taskId = taskId;this.priority = priority;this.action = action;this.globalListener = globalListener;}public String getTaskId() {return taskId;}public void cancel() {cancelled.set(true);}@Overridepublic void run() {Instant startTime = Instant.now();if (cancelled.get()) {if (globalListener != null) {globalListener.onTaskCancelled(taskId, priority, startTime);}System.out.println("任务 " + taskId + " 已被取消,未执行");return;}if (globalListener != null) {globalListener.onTaskStart(taskId, priority, startTime);}try {action.run();Instant endTime = Instant.now();if (globalListener != null) {globalListener.onTaskSuccess(taskId, priority, startTime, endTime,Duration.between(startTime, endTime));}} catch (Throwable t) {Instant endTime = Instant.now();if (globalListener != null) {globalListener.onTaskFailure(taskId, priority, startTime, endTime, Duration.between(startTime, endTime),t);}}}@Overridepublic int compareTo(PriorityTask other) {// 优先级高的排在前面return Integer.compare(other.priority, this.priority);}
}/*** 任务管理器*/
class TaskManager {private final ThreadPoolExecutor executor;private final ConcurrentHashMap<String, PriorityTask> taskMap = new ConcurrentHashMap<>();private final GlobalTaskListener globalListener;public TaskManager(int poolSize, GlobalTaskListener globalListener) {PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();this.executor = new ThreadPoolExecutor(poolSize,poolSize,60L,TimeUnit.SECONDS,queue);this.globalListener = globalListener;}/** 提交任务 */public String submitTask(int priority, Runnable action) {String taskId = UUID.randomUUID().toString();PriorityTask task = new PriorityTask(taskId, priority, action, globalListener);taskMap.put(taskId, task);executor.execute(task);return taskId;}/** 取消任务 */public boolean cancelTask(String taskId) {PriorityTask task = taskMap.remove(taskId);if (task != null) {task.cancel();return true;}return false;}/** 关闭线程池 */public void shutdown() {executor.shutdown();}
}/*** 测试*/
public class PriorityTaskDemo {public static void main(String[] args) throws InterruptedException {// 定义全局监听器GlobalTaskListener listener = new GlobalTaskListener() {@Overridepublic void onTaskStart(String taskId, int priority, Instant startTime) {System.out.println("任务 " + taskId + "(优先级 " + priority + ")开始执行,时间:" + startTime);}@Overridepublic void onTaskSuccess(String taskId, int priority, Instant startTime, Instant endTime,Duration duration) {System.out.println("任务 " + taskId + " 成功 ✅,耗时:" + duration.toMillis() + " ms");}@Overridepublic void onTaskFailure(String taskId, int priority, Instant startTime, Instant endTime,Duration duration, Throwable error) {System.out.println("任务 " + taskId + " 失败 ❌,耗时:" + duration.toMillis() + " ms,错误:" + error.getMessage());}@Overridepublic void onTaskCancelled(String taskId, int priority, Instant startTime) {System.out.println("任务 " + taskId + " 被取消 ⏹️,取消时间:" + startTime);}};TaskManager manager = new TaskManager(3, listener);// 提交几个任务String id1 = manager.submitTask(1, () -> {try {Thread.sleep(500);} catch (InterruptedException ignored) {}System.out.println("执行任务1");});String id2 = manager.submitTask(10, () -> {try {Thread.sleep(200);} catch (InterruptedException ignored) {}System.out.println("执行任务2");throw new RuntimeException("模拟异常");});String id3 = manager.submitTask(5, () -> {try {Thread.sleep(300);} catch (InterruptedException ignored) {}System.out.println("执行任务3");});// 取消任务3boolean cancelled = manager.cancelTask(id3);System.out.println("取消任务3结果: " + cancelled);Thread.sleep(2000);manager.shutdown();}
}

🖥️ 输出示例

取消任务3结果: true
任务 4504f655-738c-4cdc-b7ba-85db8ccd8c35 被取消 ⏹️,取消时间:2025-09-12T15:49:57.720205Z
任务 6a9d761e-97e8-4e47-a461-cc68217bf19a(优先级 1)开始执行,时间:2025-09-12T15:49:57.719738Z
任务 d489c530-70d7-4ee7-87b8-627eba04c797(优先级 10)开始执行,时间:2025-09-12T15:49:57.719892Z
任务 4504f655-738c-4cdc-b7ba-85db8ccd8c35 已被取消,未执行
执行任务2
任务 d489c530-70d7-4ee7-87b8-627eba04c797 失败 ❌,耗时:218 ms,错误:模拟异常
执行任务1
任务 6a9d761e-97e8-4e47-a461-cc68217bf19a 成功 ✅,耗时:518 ms

这样就可以在一个地方(GlobalTaskListener)统一打点,清楚知道:

  • 任务什么时候开始

  • 是否执行成功 / 失败 / 被取消

  • 每个任务的耗时


文章转载自:

http://LBxTWazw.pbwcq.cn
http://UYkZy42b.pbwcq.cn
http://RwBPml94.pbwcq.cn
http://gHgHzhOQ.pbwcq.cn
http://NXwMdJ0t.pbwcq.cn
http://qBddP1Tt.pbwcq.cn
http://QzdAprIX.pbwcq.cn
http://SHGvVs7W.pbwcq.cn
http://hvFDsDM8.pbwcq.cn
http://XK8eGXii.pbwcq.cn
http://qhmGxUOO.pbwcq.cn
http://L9V1lmEK.pbwcq.cn
http://uevkgv1C.pbwcq.cn
http://2CbDFeFD.pbwcq.cn
http://LZfBtnXa.pbwcq.cn
http://eztEH2XP.pbwcq.cn
http://mNxExAmO.pbwcq.cn
http://x9xT3DOS.pbwcq.cn
http://XlPzSkGd.pbwcq.cn
http://rvQ2xlZi.pbwcq.cn
http://dYY77KEM.pbwcq.cn
http://jaTtDqtw.pbwcq.cn
http://vEmiLsNh.pbwcq.cn
http://Jk88R9Rw.pbwcq.cn
http://PfnigxQ1.pbwcq.cn
http://vBo8jFUv.pbwcq.cn
http://ZnO8XWTV.pbwcq.cn
http://TkfSmpvJ.pbwcq.cn
http://imkECTyZ.pbwcq.cn
http://uePWb5xt.pbwcq.cn
http://www.dtcms.com/a/380202.html

相关文章:

  • 数据通信学习
  • Coze源码分析-资源库-创建知识库-前端源码-核心组件
  • GEO 优化工具:让品牌被 AI 主动推荐的关键!
  • 调用京东商品详情API接口时,如何进行性能优化?
  • 鸿蒙审核问题——折叠屏展开态切换时,输入框内容丢失
  • JAiRouter GitHub Actions 自动打包发布镜像到 Docker Hub 技术揭秘
  • 破壁者指南:内网穿透技术的深度解构与实战方法
  • TOGAF——ArchiMate
  • 吃透 Vue 样式穿透:从 scoped 原理到组件库样式修改实战
  • Linux网络:初识网络
  • 【Docker-Nginx】通过Docker部署Nginx容器
  • 测试es向量检索
  • 统计与大数据分析专业核心工具指南
  • Qtday2作业
  • LazyForEach性能优化:解决长列表卡顿问题
  • 封装从url 拉取 HTML 并加载到 WebView 的完整流程
  • Python 批量处理:Markdown 与 HTML 格式相互转换
  • SOME/IP 协议深度解析
  • 变分自编码器详解与实现
  • 危险的PHP命令执行方法
  • 设计模式(C++)详解—抽象工厂模式 (Abstract Factory)(1)
  • 芯科科技FG23L无线SoC现已全面供货,为Sub-GHz物联网应用提供最佳性价比
  • 4步OpenCV-----扫秒身份证号
  • Qt的数据库模块介绍,Qt访问SQLite详细示例
  • 线性预热机制(Linear Warmup):深度学习训练稳定性的关键策略
  • 【Ansible】管理复杂的Play和Playbook知识点
  • 微软图引擎GraphEngine深度解析:分布式内存计算的技术革命
  • TBBT: FunWithFlags靶场渗透
  • Git .gitignore 文件不生效的原因及解决方法
  • Elasticsearch面试精讲 Day 16:索引性能优化策略