真人做爰网站东莞网站建设平台
LJF-Framework 第13章 LjfAsyncManager异步任务管理
一、LjfAsyncService接口
提供两个接口,执行无返回值的异步任务和有返回值的异步任务
package com.ljf.framework.Async;import java.util.concurrent.ThreadPoolExecutor;/*** 说明:** @Auther: lijinfeng* @Date: 2024/8/1*/
public interface LjfAsyncService {/*** 异步执行且有返回值** @param ljfAsyncEvent 异步事件* @param threadPoolExecutor 执行线程池* @param <T> 返回值范型* @return*/<T> LjfAsyncResult<T> supplyAsync(LjfAsyncEvent<T> ljfAsyncEvent, ThreadPoolExecutor threadPoolExecutor);/*** 异步执行无返回值** @param ljfAsyncEvent 异步事件* @param threadPoolExecutor 执行线程池*/void runAsync(LjfAsyncEvent ljfAsyncEvent, ThreadPoolExecutor threadPoolExecutor);}
二、LjfAsyncResult 封装异步直接结果
package com.ljf.framework.Async;import java.util.concurrent.CompletableFuture;/*** 说明:异步执行结果,扩展需要添加对应的构造方法** @Auther: lijinfeng* @Date: 2024/8/2*/
public class LjfAsyncResult<T>{private CompletableFuture<T> completableFuture;public LjfAsyncResult(CompletableFuture<T> completableFuture) {this.completableFuture = completableFuture;}public T join(){return completableFuture.join();}
}
三、LjfAsyncAbstractService抽象类
内部实现一些框架需要的中间处理逻辑,客制化实现需要实现该方法而不是LjfAsyncService
package com.ljf.framework.Async;import com.ljf.framework.context.LjfContextManager;import java.util.concurrent.ThreadPoolExecutor;/*** 说明:** @Auther: lijinfeng* @Date: 2024/8/2*/
public abstract class LjfAsyncAbstractService implements LjfAsyncService {abstract <T> LjfAsyncResult<T> supplyAsyncCus(LjfAsyncEvent<T> ljfAsyncEvent, ThreadPoolExecutor threadPoolExecutor);abstract void runAsyncCus(LjfAsyncEvent ljfAsyncEvent, ThreadPoolExecutor threadPoolExecutor);@Overridepublic <T> LjfAsyncResult<T> supplyAsync(LjfAsyncEvent<T> ljfAsyncEvent, ThreadPoolExecutor threadPoolExecutor) {ljfAsyncEvent.setLjfContext(LjfContextManager.getContext());if (null == threadPoolExecutor){threadPoolExecutor = ThreadPoolConfiguration.getSystemPoolExecutorService();}return this.supplyAsyncCus(ljfAsyncEvent,threadPoolExecutor);}@Overridepublic void runAsync(LjfAsyncEvent ljfAsyncEvent, ThreadPoolExecutor threadPoolExecutor) {ljfAsyncEvent.setLjfContext(LjfContextManager.getContext());if (null == threadPoolExecutor) {threadPoolExecutor = ThreadPoolConfiguration.getSystemPoolExecutorService();}this.runAsyncCus(ljfAsyncEvent,threadPoolExecutor);}
}
四、LjfAsyncEvent封装需要执行的任务
自定义异步任务时,需要继承该抽象类,run()中编写具体的任务逻辑,这里我们封装了上下文的处理,以便在子线程中获取主线程中上下文参数。
package com.ljf.framework.Async;import com.ljf.framework.LjfManager;
import com.ljf.framework.context.LjfContext;import java.util.concurrent.ConcurrentHashMap;/*** 说明:异步任务事件** @Auther: lijinfeng* @Date: 2024/8/1*/
public abstract class LjfAsyncEvent<T> {private ConcurrentHashMap map = new ConcurrentHashMap();public abstract T run();public T runPrepare(){// 设置上下文LjfManager.getLjfContext().syncContext(map.get("ljfContextObject"));return run();};public void setLjfContext(LjfContext context){if (context == null)return;map.put("ljfContextObject",context.getSyncObject());};
}
五、LjfAsyncServiceDefaultImpl默认实现,使用CompletableFuture
package com.ljf.framework.Async;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;/*** 说明:** @Auther: lijinfeng* @Date: 2024/8/1*/
public class LjfAsyncServiceDefaultImpl extends LjfAsyncAbstractService {@Overridepublic <T> LjfAsyncResult<T> supplyAsyncCus(LjfAsyncEvent<T> ljfAsyncEvent, ThreadPoolExecutor threadPoolExecutor){// 创建一个CompletableFuture实例CompletableFuture<T> futurePrice = CompletableFuture.supplyAsync(ljfAsyncEvent::runPrepare, threadPoolExecutor);// 当结果准备好后,获取它return new LjfAsyncResult<>(futurePrice);}@Overridepublic void runAsyncCus(LjfAsyncEvent ljfAsyncEvent, ThreadPoolExecutor threadPoolExecutor) {CompletableFuture.runAsync(ljfAsyncEvent::runPrepare,threadPoolExecutor);}
}
六、配置个线程池供异步任务使用
要使用的话记得调整参数,这里只是简单测试配置的。
package com.ljf.framework.Async;import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** 说明:自定义线程池** @Auther: lijinfeng* @Date: 2024/8/1*/
public class ThreadPoolConfiguration {private static ThreadPoolExecutor systemPoolExecutorService;private static ThreadPoolExecutor dbPoolExecutorService;static {setSystemPoolExecutorService();setDbPoolExecutorService();}/*** 系统异步任务线程池** @return*/public static void setSystemPoolExecutorService() {ThreadPoolConfiguration.systemPoolExecutorService = new ThreadPoolExecutor(3,10,60,TimeUnit.SECONDS,new LinkedBlockingQueue<>(100),Executors.defaultThreadFactory(),(r, executor) -> System.out.println("system pool is full! "));}/*** 数据库异步任务线程池** @return*/public static void setDbPoolExecutorService() {ThreadPoolConfiguration.dbPoolExecutorService = new ThreadPoolExecutor(3,10,60,TimeUnit.SECONDS,new LinkedBlockingQueue<>(100),Executors.defaultThreadFactory(),(r, executor) -> System.out.println("system pool is full! "));}public static ThreadPoolExecutor getSystemPoolExecutorService() {return systemPoolExecutorService;}public static ThreadPoolExecutor getDbPoolExecutorService() {return dbPoolExecutorService;}
}
七、小管家LjfAsyncManager
异步任务的执行和管理在这里统一管理
package com.ljf.framework.Async;import com.ljf.framework.LjfManager;
import com.ljf.framework.exception.LjfAsyncException;
import com.ljf.framework.exception.LjfExceptionEnum;import java.util.concurrent.ThreadPoolExecutor;/*** 说明:** @Auther: lijinfeng* @Date: 2024/8/1*/
public class LjfAsyncManager {private static boolean isRunning = true;/*** 有返回值的异步任务,使用默认线程池** @param ljfAsyncEvent 异步事件* @param <T> 返回值类型* @return AsyncResult*/public static <T> LjfAsyncResult<T> supplyAsync(LjfAsyncEvent<T> ljfAsyncEvent) {check();return LjfManager.getAsyncService().supplyAsync(ljfAsyncEvent, null);}/*** 无返回值的异步任务,使用默认线程池** @param ljfAsyncEvent 异步事件*/public static void runAsync(LjfAsyncEvent ljfAsyncEvent) {check();LjfManager.getAsyncService().runAsync(ljfAsyncEvent, null);}/*** 有返回值的异步任务,使用指定线程池** @param ljfAsyncEvent 异步事件* @param threadPoolExecutor 指定线程池* @param <T> 返回值类型* @return AsyncResult*/public static <T> LjfAsyncResult<T> supplyAsync(LjfAsyncEvent<T> ljfAsyncEvent, ThreadPoolExecutor threadPoolExecutor) {check();return LjfManager.getAsyncService().supplyAsync(ljfAsyncEvent, threadPoolExecutor);}/*** 无返回值的异步任务,使用指定线程池** @param ljfAsyncEvent 异步事件* @param threadPoolExecutor 指定线程池*/public static void runAsync(LjfAsyncEvent ljfAsyncEvent, ThreadPoolExecutor threadPoolExecutor) {check();LjfManager.getAsyncService().runAsync(ljfAsyncEvent, threadPoolExecutor);}/*** 关闭线程池*/public static void shutdown() {isRunning = false;ThreadPoolConfiguration.getDbPoolExecutorService().shutdown();ThreadPoolConfiguration.getSystemPoolExecutorService().shutdown();System.out.println("ljf-framework ThreadPoolExecutor shutdown");}public static void check() {if (!isRunning) {throw new LjfAsyncException(LjfExceptionEnum.ASYNC_STOP,"异步任务管理器已停止,不能执行异步任务");}}
}
八、测试一下
package com.ljf.test;import com.ljf.framework.Async.LjfAsyncEvent;
import com.ljf.framework.Async.LjfAsyncManager;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** 描述 :* <p>* 版本 作者 时间 内容* 1.0 lijinfeng 2025-04-03 09:33 create*/
@SpringBootApplication
public class LjfAsyncTest {public static void main(String[] args) throws InterruptedException {SpringApplication.run(LjfAsyncTest.class, args);LjfAsyncManager.runAsync(new LjfAsyncEvent() {@Overridepublic Object run() {for (int i = 0; i < 8; i++) {System.out.println("异步:"+i);try {Thread.sleep(3 * 1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}return null;}});for (int i = 0; i < 8; i++) {System.out.println(i);if(i==4){LjfAsyncManager.shutdown();}LjfAsyncManager.runAsync(new LjfAsyncEvent() {@Overridepublic Object run() {System.out.println("新的异步异步");return null;}});Thread.sleep(3 * 1000);}}
}
测试结果:
我这里有点随便,大家可以再认真测试一下,有bug麻烦留言,我会持续改善它