【后端】Java封装一个多线程处理任务,可以设置任务优先级优先插队处理,并且提供根据任务ID取消任务
这个需求其实就是一个 带优先级调度、可取消的线程池任务管理器。封装一个 Java Demo,支持:
任务优先级:数值越大优先级越高,可以“插队”。
任务取消:可以根据任务 ID 取消还未执行的任务。
多线程并发执行:用线程池来跑任务。
✨ 基础功能
实现思路
使用
PriorityBlockingQueue
管理任务,保证高优先级任务先执行。封装一个
Task
类,实现Runnable
和Comparable<Task>
,用于排序。每个任务有 唯一 ID,方便取消。
使用
ThreadPoolExecutor
配合优先级队列执行任务。
代码示例
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;/*** 优先级任务*/
class PriorityTask implements Runnable, Comparable<PriorityTask> {private final String taskId;private final int priority;private final Runnable action;private final AtomicBoolean cancelled = new AtomicBoolean(false);public PriorityTask(String taskId, int priority, Runnable action) {this.taskId = taskId;this.priority = priority;this.action = action;}public String getTaskId() {return taskId;}public void cancel() {cancelled.set(true);}@Overridepublic void run() {if (!cancelled.get()) {action.run();} else {System.out.println("任务 " + taskId + " 已被取消,未执行");}}@Overridepublic int compareTo(PriorityTask other) {// 优先级高的排在前面return Integer.compare(other.priority, this.priority);}
}/*** 任务管理器*/
class TaskManager {private final ThreadPoolExecutor executor;private final ConcurrentHashMap<String, PriorityTask> taskMap = new ConcurrentHashMap<>();public TaskManager(int poolSize) {PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();this.executor = new ThreadPoolExecutor(poolSize,poolSize,60L,TimeUnit.SECONDS,queue);}/** 提交任务 */public String submitTask(int priority, Runnable action) {String taskId = UUID.randomUUID().toString();PriorityTask task = new PriorityTask(taskId, priority, action);taskMap.put(taskId, task);executor.execute(task);return taskId;}/** 取消任务 */public boolean cancelTask(String taskId) {PriorityTask task = taskMap.remove(taskId);if (task != null) {task.cancel();return true;}return false;}/** 关闭线程池 */public void shutdown() {executor.shutdown();}
}/*** 测试*/
public class PriorityTaskDemo {public static void main(String[] args) throws InterruptedException {TaskManager manager = new TaskManager(3);// 提交一些任务String id1 = manager.submitTask(1, () -> {System.out.println("执行任务1,优先级1");});String id2 = manager.submitTask(10, () -> {System.out.println("执行任务2,优先级10");});String id3 = manager.submitTask(5, () -> {System.out.println("执行任务3,优先级5");});// 取消一个任务boolean cancelled = manager.cancelTask(id3);System.out.println("取消任务3结果: " + cancelled);Thread.sleep(2000);manager.shutdown();}
}
执行结果示例
可能输出类似:
执行任务1,优先级1
执行任务2,优先级10
任务 c264a955-23e2-4192-8914-bd5e83d9f89d 已被取消,未执行
取消任务3结果: true
说明:
任务 优先级10 插队最先执行。
任务3 被取消,不会执行。
任务1 继续执行。
进一步扩展一下,让任务支持 回调监听(成功/失败/取消),方便在业务里统一埋点
✨ 功能扩展
每个任务可以带一个
TaskListener
回调,监听任务状态:onSuccess(String taskId)
onFailure(String taskId, Throwable error)
onCancelled(String taskId)
PriorityTask
内部捕获异常,调用对应的回调方法。TaskManager
提交任务时可以选择性传入监听器。
完整代码
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;/*** 任务监听器*/
interface TaskListener {void onSuccess(String taskId);void onFailure(String taskId, Throwable error);void onCancelled(String taskId);
}/*** 优先级任务*/
class PriorityTask implements Runnable, Comparable<PriorityTask> {private final String taskId;private final int priority;private final Runnable action;private final TaskListener listener;private final AtomicBoolean cancelled = new AtomicBoolean(false);public PriorityTask(String taskId, int priority, Runnable action, TaskListener listener) {this.taskId = taskId;this.priority = priority;this.action = action;this.listener = listener;}public String getTaskId() {return taskId;}public void cancel() {cancelled.set(true);}@Overridepublic void run() {if (cancelled.get()) {if (listener != null) listener.onCancelled(taskId);System.out.println("任务 " + taskId + " 已被取消,未执行");return;}try {action.run();if (listener != null) listener.onSuccess(taskId);} catch (Throwable t) {if (listener != null) listener.onFailure(taskId, t);}}@Overridepublic int compareTo(PriorityTask other) {// 优先级高的排在前面return Integer.compare(other.priority, this.priority);}
}/*** 任务管理器*/
class TaskManager {private final ThreadPoolExecutor executor;private final ConcurrentHashMap<String, PriorityTask> taskMap = new ConcurrentHashMap<>();public TaskManager(int poolSize) {PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();this.executor = new ThreadPoolExecutor(poolSize,poolSize,60L,TimeUnit.SECONDS,queue);}/** 提交任务(带监听器) */public String submitTask(int priority, Runnable action, TaskListener listener) {String taskId = UUID.randomUUID().toString();PriorityTask task = new PriorityTask(taskId, priority, action, listener);taskMap.put(taskId, task);executor.execute(task);return taskId;}/** 简化方法(无监听器) */public String submitTask(int priority, Runnable action) {return submitTask(priority, action, null);}/** 取消任务 */public boolean cancelTask(String taskId) {PriorityTask task = taskMap.remove(taskId);if (task != null) {task.cancel();return true;}return false;}/** 关闭线程池 */public void shutdown() {executor.shutdown();}
}/*** 测试*/
public class PriorityTaskDemo {public static void main(String[] args) throws InterruptedException {TaskManager manager = new TaskManager(3);// 定义一个统一的监听器TaskListener listener = new TaskListener() {@Overridepublic void onSuccess(String taskId) {System.out.println("任务 " + taskId + " 执行成功 ✅");}@Overridepublic void onFailure(String taskId, Throwable error) {System.out.println("任务 " + taskId + " 执行失败 ❌: " + error.getMessage());}@Overridepublic void onCancelled(String taskId) {System.out.println("任务 " + taskId + " 被取消 ⏹️");}};// 提交任务String id1 = manager.submitTask(1, () -> {System.out.println("执行任务1(优先级1)");}, listener);String id2 = manager.submitTask(10, () -> {System.out.println("执行任务2(优先级10)");throw new RuntimeException("模拟异常");}, listener);String id3 = manager.submitTask(5, () -> {System.out.println("执行任务3(优先级5)");}, listener);// 取消一个任务boolean cancelled = manager.cancelTask(id3);System.out.println("取消任务3结果: " + cancelled);Thread.sleep(2000);manager.shutdown();}
}
运行结果示例
执行任务1(优先级1)
执行任务2(优先级10)
任务 cc143e3d-3808-4b98-93a8-09ba90e281f2 被取消 ⏹️
任务 cc143e3d-3808-4b98-93a8-09ba90e281f2 已被取消,未执行
任务 ede52aab-e4a3-4896-88a3-e664b1c1a667 执行成功 ✅
取消任务3结果: true
任务 36230882-e049-44a9-8406-a90d34acad47 执行失败 ❌: 模拟异常
这样就实现了:
优先级插队(10 → 5 → 1)
取消任务(取消后不会执行,并回调
onCancelled
)失败捕获(异常不会导致线程池崩溃,回调
onFailure
)成功回调(回调
onSuccess
)
再加一个 全局监听器(所有任务统一回调,方便在一个地方打点统计),希望在 一个地方统一打点统计,包括:
任务ID
开始时间
结束时间
耗时
执行状态(成功 / 失败 / 取消)
这可以通过在 TaskManager
里引入一个 全局监听器 GlobalTaskListener 来实现。
✨ 改进方案
新增
GlobalTaskListener
接口,统一接收所有任务的生命周期事件。PriorityTask
内部在执行时打点(记录开始、结束时间),并回调到GlobalTaskListener
。TaskManager
可以配置一个全局监听器(所有任务都会走这里)。
🔧 完整代码
import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;/*** 全局任务监听器(统一打点)*/
interface GlobalTaskListener {void onTaskStart(String taskId, int priority, Instant startTime);void onTaskSuccess(String taskId, int priority, Instant startTime, Instant endTime, Duration duration);void onTaskFailure(String taskId, int priority, Instant startTime, Instant endTime, Duration duration,Throwable error);void onTaskCancelled(String taskId, int priority, Instant startTime);
}/*** 优先级任务*/
class PriorityTask implements Runnable, Comparable<PriorityTask> {private final String taskId;private final int priority;private final Runnable action;private final AtomicBoolean cancelled = new AtomicBoolean(false);private final GlobalTaskListener globalListener;public PriorityTask(String taskId, int priority, Runnable action, GlobalTaskListener globalListener) {this.taskId = taskId;this.priority = priority;this.action = action;this.globalListener = globalListener;}public String getTaskId() {return taskId;}public void cancel() {cancelled.set(true);}@Overridepublic void run() {Instant startTime = Instant.now();if (cancelled.get()) {if (globalListener != null) {globalListener.onTaskCancelled(taskId, priority, startTime);}System.out.println("任务 " + taskId + " 已被取消,未执行");return;}if (globalListener != null) {globalListener.onTaskStart(taskId, priority, startTime);}try {action.run();Instant endTime = Instant.now();if (globalListener != null) {globalListener.onTaskSuccess(taskId, priority, startTime, endTime,Duration.between(startTime, endTime));}} catch (Throwable t) {Instant endTime = Instant.now();if (globalListener != null) {globalListener.onTaskFailure(taskId, priority, startTime, endTime, Duration.between(startTime, endTime),t);}}}@Overridepublic int compareTo(PriorityTask other) {// 优先级高的排在前面return Integer.compare(other.priority, this.priority);}
}/*** 任务管理器*/
class TaskManager {private final ThreadPoolExecutor executor;private final ConcurrentHashMap<String, PriorityTask> taskMap = new ConcurrentHashMap<>();private final GlobalTaskListener globalListener;public TaskManager(int poolSize, GlobalTaskListener globalListener) {PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();this.executor = new ThreadPoolExecutor(poolSize,poolSize,60L,TimeUnit.SECONDS,queue);this.globalListener = globalListener;}/** 提交任务 */public String submitTask(int priority, Runnable action) {String taskId = UUID.randomUUID().toString();PriorityTask task = new PriorityTask(taskId, priority, action, globalListener);taskMap.put(taskId, task);executor.execute(task);return taskId;}/** 取消任务 */public boolean cancelTask(String taskId) {PriorityTask task = taskMap.remove(taskId);if (task != null) {task.cancel();return true;}return false;}/** 关闭线程池 */public void shutdown() {executor.shutdown();}
}/*** 测试*/
public class PriorityTaskDemo {public static void main(String[] args) throws InterruptedException {// 定义全局监听器GlobalTaskListener listener = new GlobalTaskListener() {@Overridepublic void onTaskStart(String taskId, int priority, Instant startTime) {System.out.println("任务 " + taskId + "(优先级 " + priority + ")开始执行,时间:" + startTime);}@Overridepublic void onTaskSuccess(String taskId, int priority, Instant startTime, Instant endTime,Duration duration) {System.out.println("任务 " + taskId + " 成功 ✅,耗时:" + duration.toMillis() + " ms");}@Overridepublic void onTaskFailure(String taskId, int priority, Instant startTime, Instant endTime,Duration duration, Throwable error) {System.out.println("任务 " + taskId + " 失败 ❌,耗时:" + duration.toMillis() + " ms,错误:" + error.getMessage());}@Overridepublic void onTaskCancelled(String taskId, int priority, Instant startTime) {System.out.println("任务 " + taskId + " 被取消 ⏹️,取消时间:" + startTime);}};TaskManager manager = new TaskManager(3, listener);// 提交几个任务String id1 = manager.submitTask(1, () -> {try {Thread.sleep(500);} catch (InterruptedException ignored) {}System.out.println("执行任务1");});String id2 = manager.submitTask(10, () -> {try {Thread.sleep(200);} catch (InterruptedException ignored) {}System.out.println("执行任务2");throw new RuntimeException("模拟异常");});String id3 = manager.submitTask(5, () -> {try {Thread.sleep(300);} catch (InterruptedException ignored) {}System.out.println("执行任务3");});// 取消任务3boolean cancelled = manager.cancelTask(id3);System.out.println("取消任务3结果: " + cancelled);Thread.sleep(2000);manager.shutdown();}
}
🖥️ 输出示例
取消任务3结果: true
任务 4504f655-738c-4cdc-b7ba-85db8ccd8c35 被取消 ⏹️,取消时间:2025-09-12T15:49:57.720205Z
任务 6a9d761e-97e8-4e47-a461-cc68217bf19a(优先级 1)开始执行,时间:2025-09-12T15:49:57.719738Z
任务 d489c530-70d7-4ee7-87b8-627eba04c797(优先级 10)开始执行,时间:2025-09-12T15:49:57.719892Z
任务 4504f655-738c-4cdc-b7ba-85db8ccd8c35 已被取消,未执行
执行任务2
任务 d489c530-70d7-4ee7-87b8-627eba04c797 失败 ❌,耗时:218 ms,错误:模拟异常
执行任务1
任务 6a9d761e-97e8-4e47-a461-cc68217bf19a 成功 ✅,耗时:518 ms
这样就可以在一个地方(GlobalTaskListener
)统一打点,清楚知道:
任务什么时候开始
是否执行成功 / 失败 / 被取消
每个任务的耗时