当前位置: 首页 > news >正文

基于自定义线程池手写一个异步任务管理器

我们在后端执行某些耗时逻辑操作时往往会导致长时间的线程阻塞,在这种情况之下,我们往往会引一条异步线程去处理这些异步任务,如果每次都创建新的线程来处理这些任务,不仅会增加代码冗余,还可能造成线程管理混乱,影响系统性能。在我们的Spring框架中是自带异步任务处理机制的,比如我们使用@Async 注解可以处理一些简单的异步任务,但这样确实无法精确去控制线程池资源,也无法灵活去管理任务调度,由此,我们可以去自行设计一个高效的自定义异步任务管理器去统一调度处理我们的自定义任务。

1.前置配置

自定义线程池,并将其注册到IOC容器中

 /**
 * 自定义线程池配置
 * @Author GuihaoLv
 **/
@Configuration
public class ThreadPoolConfig
{
    // 核心线程池大小
    private int corePoolSize = 50;

    // 最大可创建的线程数
    private int maxPoolSize = 200;

    // 队列最大长度
    private int queueCapacity = 1000;

    // 线程池维护线程所允许的空闲时间
    private int keepAliveSeconds = 300;


    /**
     * 通用任务线程池
     * @return
     */
    @Bean(name = "threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor()
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(maxPoolSize);
        executor.setCorePoolSize(corePoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // 线程池对拒绝任务(无线程可用)的处理策略
        //当线程池满了,新任务无法加入时,CallerRunsPolicy 让提交任务的线程(即调用方线程)直接执行该任务,
        // 而不是丢弃或抛出异常,从而保证任务不会丢失。
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

    /**
     * 执行周期性或定时任务
     */
    @Bean(name = "scheduledExecutorService")
    protected ScheduledExecutorService scheduledExecutorService()
    {
        //这里没有最大线程数的概念,所有线程都属于核心线程。
        return new ScheduledThreadPoolExecutor(corePoolSize,
                new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d") //设置线程名称,方便排查日志。
                        .daemon(true).build(), //daemon(true) 使线程池中的线程成为 守护线程,即 JVM 退出时不会阻止进程终止。
                new ThreadPoolExecutor.CallerRunsPolicy()) //使用 CallerRunsPolicy,避免任务丢失。
        {
            //任务执行完毕后,调用 Threads.printException(r, t),捕获并记录异常,确保线程池不会因为未捕获的异常而崩溃。
            @Override
            protected void afterExecute(Runnable r, Throwable t)
            {
                super.afterExecute(r, t);
                Threads.printException(r, t);
            }
        };
    }
}

2.异步任务管理器配置

 /**
 * 异步任务管理器
 * AsyncManager 是 整个异步任务调度的核心,它提供了 任务执行、调度和管理。
 * @Author GuihaoLv
 */
public class AsyncManager
{
    /**
     * 操作延迟10毫秒
     */
    private final int OPERATE_DELAY_TIME = 10;

    /**
     * 异步操作任务调度线程池
     * executor 采用 ScheduledExecutorService 线程池,可以 定时执行异步任务,提高并发能力
     */
    private ScheduledExecutorService executor = SpringUtils.getBean("scheduledExecutorService");

    /**
     * 单例模式
     * 采用 单例模式,确保全局只有一个 AsyncManager 实例,保证任务调度统一管理。
     创建单例对象
     */
    private AsyncManager(){}

    //创建异步任务管理器的静态对象
    private static AsyncManager me = new AsyncManager();

    public static AsyncManager me()
    {
        return me;
    }

    /**
     * 使用调度线程池执行任务
     * @param task 任务
     */
    //TimerTask是Java编程语言中的一个抽象类,通常用于安排将来某个时间执行的任务,或者以固定的速率重复执行的任务。
    // 它是与Timer类一起使用的,Timer负责管理和调度这些任务。
    public void execute(TimerTask task)
    {
        executor.schedule(task, OPERATE_DELAY_TIME, TimeUnit.MILLISECONDS);
    }

    /**
     * 停止任务线程池
     */
    public void shutdown()
    {
        //优雅关闭线程池
        Threads.shutdownAndAwaitTermination(executor);
    }
}
 /**
 * 确保应用退出时能关闭后台线程
 * @Author GuihaoLv
 */
@Component
public class ShutdownManager
{
    private static final Logger logger = LoggerFactory.getLogger("sys-user");

    @PreDestroy //它用于在 Spring Bean 被销毁前执行清理逻辑。
    public void destroy()
    {
        shutdownAsyncManager();
        HttpUtils.shutdown();
    }

     /**
     * 停止异步执行任务
     */
    private void shutdownAsyncManager()
    {
        try
        {
            logger.info("====关闭后台任务任务线程池====");
            AsyncManager.me().shutdown();
        }
        catch (Exception e)
        {
            logger.error(e.getMessage(), e);
        }
    }

}

3.异步工厂配置,将异步任务的启动逻辑都扔到异步工厂中处理

/**
 * 异步工厂(产生任务用)
 * AsyncFactory 主要用于创建异步任务,它相当于一个 "任务工厂",
 * 可以根据不同的需求创建不同的任务(如记录用户登录信息、记录操作日志)。
 *
 * @Author GuihaoLv
 */
public class AsyncFactory
{
    private static final Logger sys_user_logger = LoggerFactory.getLogger("sys-user");


    // 定义任务:计算热点文章
    public static TimerTask calculateHotArticlesTask() {
        return new TimerTask() {
            @Override
            public void run() {
                try {
                    // 获取 HotArticleAsycTask 实例
                    HotArticleAsycTask hotArticleAsycTask = SpringUtils.getBean(HotArticleAsycTask.class);
                    // 执行热点文章计算任务
                    hotArticleAsycTask.calculateHotArticles();
                } catch (Exception e) {
                    sys_user_logger.error("计算热点文章失败", e);
                }
            }
        };
    }

    // 定义任务:定时热搜清理
    public static TimerTask cleanupOldKeywords() {
        return new TimerTask() {
            @Override
            public void run() {
                try {
                    // 获取 HotArticleAsycTask 实例
                    HotSearchCleanupTask hotArticleAsycTask = SpringUtils.getBean(HotSearchCleanupTask.class);
                    // 执行热点文章计算任务
                    hotArticleAsycTask.cleanupOldKeywords();
                } catch (Exception e) {
                    sys_user_logger.error("清理热搜失败", e);
                }
            }
        };
    }
}

4.调用异步工厂中的逻辑

/**
 * 热点文章实时计算
 * @Author GuihaoLv
 */
@Component
public class HotArticleAsycTask {

    @Autowired
    private StringRedisTemplate redisTemplate;


    private static final String LIKES_KEY = "blog:likes";
    private static final String FAVORITES_KEY = "blog:favorites";
    private static final String HOT_ARTICLES_KEY = "blog:hot"; // 存热点文章

    public void calculateHotArticles() {
        Set<String> blogIds = redisTemplate.opsForZSet().range(LIKES_KEY, 0, -1);
        if (blogIds == null || blogIds.isEmpty()) return;

        for (String blogId : blogIds) {
            Double likes = redisTemplate.opsForZSet().score(LIKES_KEY, blogId);
            Double favorites = redisTemplate.opsForZSet().score(FAVORITES_KEY, blogId);

            // 计算热度
            double hotScore = (likes != null ? likes * 5.0 : 0) +
                    (favorites != null ? favorites * 8.0 : 0);

            // 热度超过阈值 500,加入热点文章
            if (hotScore >= 500) {
                redisTemplate.opsForZSet().add(HOT_ARTICLES_KEY, blogId, hotScore);
                System.out.println("🔥 文章 " + blogId + " 进入热点榜,热度:" + hotScore);
            }
        }
    }


     /**
      * 每 5 分钟执行一次的定时任务
      */
     @Scheduled(fixedRate = 100000) // 每 5 分钟执行一次
     public void scheduleHotArticlesCalculation() {
         // 将任务交给异步任务管理器执行
         AsyncManager.me().execute(AsyncFactory.calculateHotArticlesTask());
     }
}
http://www.dtcms.com/a/71202.html

相关文章:

  • 2025 linux系统资源使用率统计docker容器使用率统计docker监控软件Weave Scope安装weavescope
  • 快速导出MySQL数据表结构到Excel或Word的方法(Navicat和EasyDatabaseExport)
  • 十种宠物狗-图像分类数据集
  • 分享一个免费的CKA认证学习资料
  • 【eNSP实战】配置Easy IP
  • ClickHouse总体学习
  • uniapp报毒
  • 明基PD2700U显示器无法调节图像模式
  • 骨质健康护理笔记
  • bhSDR Matlab-通用软件无线电平台
  • VSTO(C#)Excel开发8:打包发布安装卸载
  • 设计模式之命令设计模式
  • 墨香阁-测试报告
  • 星越L_电动车窗使用及初始化讲解
  • Ai智能体四:互动式 AI 聊天助手:前端实现
  • 【PHP】新版本特性记录(持续更新)
  • el-table表格样式设置单元格样式方法 :cell-class-name
  • Vue3 Pinia $subscribe localStorage的用法 Store的组合式写法
  • Postman中Authorization和Headers的区别
  • 《图解设计模式》 学习笔记
  • 基于Grafana+Prometheus的IB网卡硬件计数器监控方案
  • 深入解析工厂模式及其C#实现
  • node.js-node.js作为服务器,前端使用WebSocket(单个TCP连接上进行全双工通讯的协议)
  • 70.HarmonyOS NEXT PicturePreview组件深度剖析:从架构设计到核心代码实现
  • 560.和为k的子数组
  • mysql索引讲解
  • 深入理解 Reactor Netty 线程配置及启动命令设置
  • 洛谷 P1068 [NOIP 2009 普及组] 分数线划定 python
  • rust学习笔记14-函数
  • SSO单点登录