当前位置: 首页 > news >正文

【汽车标定数据】动态优先级线程池在异步多文件解析中的应用

目录

一、需求背景

项目背景:电控数据管理系统优化

优化方案:引入OLAP数据库和动态线程池

线程池性能急需解决的问题

资源过载与闲置的平衡:

优先级处理与公平性:

任务类型适配性:

二、线程池介绍

2.1、线程池的核心设计分析

1. 顶层接口:Executor(任务提交与执行解耦)

2. 扩展接口:ExecutorService(增强任务管理和控制能力)

3. 抽象类:AbstractExecutorService(串联任务执行流程)

4. 具体实现类:ThreadPoolExecutor(线程与任务协同管理)

2.2 线程池的执行流程

2.3 线程池中需要关注的几个点

2.3.1 线程池状态维护设计

1. ctl变量的作用

2. 位运算的原理

3. 关键计算方法

2.3.2 线程池中的参数可以动态修改吗

2.3.3 核心线程数可以单独进行设置吗

2.3.4  队列的数量怎么设置

2.3.4 线程池被创建后没有任务过来,里面是不会有线程的,如何预热。

2.3.4.1 全部启动

2.3.4.2 只启动一个

三、代码实现

3.1 监听naocs的配置中心,如果有变化更新参数

3.2 构建优先级枚举类及其注册分级线程池

3.2 构建线程池管理类和可调整队列

3.2 构建具体任务类 和 场景策略枚举调用


一、需求背景

项目背景:电控数据管理系统优化

电控数据管理系统主要用于处理汽车电子控制单元(ECU)的相关数据文件,包括解析、比对、合成和校验等功能。核心文件包括:

  • A2L文件:大小约10~20MB,每个文件包含约8000个变量(如传感器参数)。
  • Hex/S19文件:大小4~10MB,存储ECU所有变量的地址和值(如内存映射数据)。
  • DCM文件:大小约1MB,包含1~5000个变量(具体数量取决于主机厂的管理规则)。

系统需要执行以下关键操作:

  1. 文件解析:将文件内容解析为可处理的数据结构(如变量列表)。
  2. 参数比对:比较不同文件中的上万个参数,涉及单值、数组和MAP类型(例如,检查A2L和Hex文件中的变量是否一致)。
  3. 文件合成:合并多个文件(如基于比对结果生成新文件),涉及上万文件的处理。
  4. 文件校验:验证数据完整性(如检查文件是否损坏或篡改)。
  5. 数据入库:将解析结果存储到数据库,供后续查询和分析。
  6. 等其他耗时计算的操作

这些操作都是CPU密集型任务(需要大量计算),尤其在批量操作(如同时处理多个文件)或多用户并发访问时,负载急剧增加。例如:

  • 单个文件解析可能需要几秒到几分钟,取决于文件大小和复杂度。
  • 当10个用户同时上传文件时,系统可能面临数十个并发任务,导致响应延迟或资源争用。

优化方案:引入OLAP数据库和动态线程池

为解决性能瓶颈,引入两项技术:

  1. OLAP数据库(Doris):项目中用到了大量的分析功能,需要设计海量数据的处理分析。Doris支持列式存储和实时分析用于高效存储和查询海量数据,能加速数据入库和比对操作(例如,查询变量值时,响应时间从秒级降到毫秒级)。
  2. 动态优先级线程池:用于管理任务执行线程,根据系统负载自动调整资源。线程池的核心是动态分配线程数,优先处理高优先级任务(如用户实时请求),避免低优先级任务(如后台批量处理)阻塞系统。

线程池性能急需解决的问题

动态线程池能显著提升系统性能,但必须解决以下关键问题,否则在高负载下可能导致系统崩溃或效率低下:

  1. 资源过载与闲置的平衡

    • 问题:在用户量激增时(如多用户批量上传文件),线程数不足会导致任务排队,响应时间变慢(例如,解析延迟从1秒增至10秒)。反之,线程过多会耗尽CPU,引发系统卡顿。
    • 急需解决:线程池必须根据解析时间(任务耗时)和用户量(并发请求数)动态调整线程数。例如,当用户量增加时,自动增加线程;当任务耗时较长时,减少线程以避免争用。
  2. 优先级处理与公平性

    • 问题:不同任务优先级不同(如用户交互操作优先于后台合成),但静态线程池可能让低优先级任务“饿死”高优先级任务(例如,批量文件合成占用所有线程,导致用户无法及时查看结果)。
    • 急需解决:线程池需支持动态优先级队列。高优先级任务(如文件校验)应优先获取线程;低优先级任务(如批量入库)在系统空闲时执行。这需要算法实时计算任务权重(如基于用户等待时间或任务类型)。
  3. 任务类型适配性

    • 问题:文件解析任务差异大(如A2L文件解析比DCM更耗时),线程池若“一刀切”分配线程,会造成资源浪费(例如,简单任务占用线程但快速完成,复杂任务等待)。
    • 急需解决:线程池应识别任务类型(如通过预估解析时间),并动态分配线程资源。例如,复杂任务(耗时>5秒)分配更多线程,简单任务(耗时<1秒)合并处理。

二、线程池介绍

2.1、线程池的核心设计分析

(参考了美团的线程池文章)

ThreadPoolExecutor从继承关系入手,整体采用分层设计:从顶层接口到抽象类,再到具体实现。以下按逻辑顺序解析核心组件。

1. 顶层接口:Executor(任务提交与执行解耦)
  • 核心思想:将任务提交和任务执行分离,用户无需关心线程创建或调度细节。
  • 工作方式:用户提供Runnable对象(定义任务逻辑),Executor负责线程调配和任务执行。
  • 优势:简化并发编程,提升代码可维护性。
2. 扩展接口:ExecutorService(增强任务管理和控制能力)
  • 功能扩展:在Executor基础上,添加两类关键能力:
    • 任务执行增强:支持异步任务处理,如生成Future对象(用于跟踪一批任务结果)。
    • 线程池管控:提供停止线程池等方法(如shutdown()),实现更精细的资源控制。
3. 抽象类:AbstractExecutorService(串联任务执行流程)
  • 承上启下作用:作为ExecutorService的实现骨架,将任务执行流程标准化。
  • 设计目的:抽象通用逻辑(如任务提交、结果处理),确保子类仅需关注核心执行方法。
  • 简化开发:降低底层实现复杂度,便于扩展新线程池类型。
4. 具体实现类:ThreadPoolExecutor(线程与任务协同管理)
  • 核心职责:作为最终实现,同时处理两方面:
    • 生命周期维护:管理线程池状态(如运行、关闭)。
    • 资源协调:高效调度线程与任务,实现并行执行(例如,通过工作队列和线程复用机制)。
  • 核心参数说明(标红部分是最核心的参数部分,也是需要关注配置的地方):
    • corePoolSize(核心线程数大小):不管它们创建以后是不是空闲的。线程池需要保持 corePoolSize 数量的线程,除非设置了 allowCoreThreadTimeOut。

    • maximumPoolSize(最大线程数):线程池中最多允许创建 maximumPoolSize 个线程。

    • keepAliveTime:(存活时间):如果经过 keepAliveTime 时间后,超过核心线程数的线程还没有接受到新的任务,那就回收。

    • unit(keepAliveTime 的时间单位。)

    • workQueue(存放待执行任务的队列):当提交的任务数超过核心线程数大小后,再提交的任务就存放在这里。它仅仅用来存放被 execute 方法提交的 Runnable 任务。

    • threadFactory(线程工程):用来创建线程工厂。比如这里面可以自定义线程名称,看着名字就知道这个线程是哪里来的。

    • handler (拒绝策略):当队列里面放满了任务、最大线程数的线程都在工作时,这时继续提交的任务线程池就处理不了,应该执行怎么样的拒绝策略。

2.2 线程池的执行流程

运行逻辑可分为两个核心部分:

  1. 任务管理(生产者角色)
    当任务提交时,线程池根据当前状态决定任务的流转:

    • 如果线程可用,直接分配线程执行任务。
    • 如果线程繁忙,将任务缓冲到队列中等待。
    • 如果系统过载,拒绝该任务。
  2. 线程管理(消费者角色)
    线程池维护一组线程,它们:

    • 主动获取并执行任务。
    • 任务完成后,继续从队列中获取新任务。
    • 当长时间无任务可获取时,线程被回收以节省资源。

2.3 线程池中需要关注的几个点

2.3.1 线程池状态维护设计

线程池内部使用一个名为ctl的原子整数变量来高效维护运行状态和线程数量,避免不一致性问题并提升性能。

1. ctl变量的作用

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  • ctl是一个整数变量(具体是AtomicInteger类型),它同时存储两个关键信息:
    • 运行状态(runState):表示线程池的生命周期状态(如运行中、关闭等),存储在ctl的高3位。
    • 线程数量(workerCount):表示当前活跃线程的数量,存储在ctl的低29位。
  • 用一个变量存储两个值,好处是:在决策时(如添加或移除线程),无需额外锁来保证状态和数量的一致性,提高了并发效率。
2. 位运算的原理
  • ctl的高3位和低29位互不干扰,通过位运算快速提取或组合值:
  • 这种设计基于二进制表示,例如:
    • 如果ctl的值是二进制xxx yyyyy...x代表高3位状态,y代表低29位数量),则:
      • 提取状态时,只保留高3位。
      • 提取数量时,只保留低29位。
3. 关键计算方法

线程池提供了三个方法来操作ctl

  • 获取运行状态runStateOf(c) 
    • 解释:通过按位与操作,屏蔽掉低29位,只保留高3位的状态值。
    • private static int runStateOf(int c){ return c & ~CAPACITY; } //计算当前运行状态
  • 获取线程数量workerCountOf(c)
    • 解释:通过按位与操作,屏蔽掉高3位,只保留低29位的数量值。
    • private static int workerCountOf(int c){ return c & CAPACITY; }//计算当前线程数量

  • 组合状态和数量ctlOf(rs, wc)
    • 解释:通过按位或操作,将高3位的状态和低29位的数量合并成一个新值。
    • private static int ctlOf(int rs, int wc) { return rs | wc; } //通过状态和线程数生成ctl

2.3.2 线程池中的参数可以动态修改吗

当线程池在运行期调用 setCorePoolSize 方法设置新的 corePoolSize 值时,线程池会立即覆盖原有的 corePoolSize 值。随后,基于新值与原始值的比较结果,采取以下处理策略:

  • 如果新值小于当前工作线程数
    表明存在多余的 worker 线程。线程池会向当前空闲(idle)的 worker 线程发送中断请求以启动回收过程;多余的 worker 线程在下次空闲时也会被回收。

  • 如果新值大于原始值且任务队列中有待执行任务
    线程池会创建新的 worker 线程,以执行队列中的任务。

此设计确保了线程池资源能动态调整,以适应配置变化。

源码中也写了可以动态进行修改设置。

设置最大线程数量同时可以进行设置

2.3.3 核心线程数可以单独进行设置吗

此处需要注意的是最大线程数和核心线程数要一起进行设置。如果当前核心是3 最大线程是5,我们在单独设置核心线程数为7,这个时候核心线程数会大于最大线程数。活跃线程数是不会变的,所以会出现设置出现问题,所以建议设置参数的时候一起进行设置

ava.util.concurrent.ThreadPoolExecutor#getTask

2.3.4  队列的数量怎么设置

队列的 capacity 是被 final 修饰了,所以没办法进行设置,需要我们进行重写一个类,把final去掉就可以设置线程池队列的大小了。

2.3.4 线程池被创建后没有任务过来,里面是不会有线程的,如何预热。

2.3.4.1 全部启动

2.3.4.2 只启动一个

三、代码实现

ok,下面进行代码的展示,如何写一个支持基于nacos动态刷新的分级线程池

3.1 监听naocs的配置中心,如果有变化更新参数


/*** Nacos配置监听,动态调整线程池参数*/
@RefreshScope
@Component
public class NacosThreadPoolConfig implements InitializingBean {private static final Logger logger = LoggerFactory.getLogger(DynamicStrategyTask.class);@Resourceprivate ThreadPoolManager threadPoolManager;@Autowiredprivate NacosConfigManager nacosConfigManager;@Autowiredprivate NacosConfigProperties nacosConfigProperties;/*** 监听Nacos配置变化,动态调整线程池*/public void onConfigChange(String config) {try {// 解析为实体类ThreadPoolProperties TPconfig = YamlParser.parseToObject(config);// 处理高优先级线程池配置handleThreadPoolConfig(TPconfig, HIGH);// 处理中优先级线程池配置handleThreadPoolConfig(TPconfig, MEDIUM);// 处理低优先级线程池配置handleThreadPoolConfig(TPconfig, LOW);} catch (Exception e) {// 处理配置解析异常e.printStackTrace();}}private void handleThreadPoolConfig(ThreadPoolProperties configJson, PriorityEnum priority) {// 解析为Mapif(checkNotNull(configJson)){int core = configJson.getCore().getSize();int max = configJson.getMax().getSize();int queue = configJson.getQueue().getSize();switch (priority){case HIGH:core = core * 2;max  = max  * 2;break;case MEDIUM:queue = queue * 2;break;case LOW:core = core / 2;max = max / 2;queue = queue * 3;break;default:break;}threadPoolManager.adjustThreadPool(priority, core, max, queue);}}private boolean checkNotNull(ThreadPoolProperties configJson) {if (configJson == null || configJson.getCore()== null|| configJson.getMax() == null || configJson.getQueue() == null){return false;}return true;}@Overridepublic void afterPropertiesSet() throws Exception {//nacos配置变更监听nacosConfigManager.getConfigService().addListener("thread-pool-config", nacosConfigProperties.getGroup(),new Listener() {@Overridepublic Executor getExecutor() {return null;}@Overridepublic void receiveConfigInfo(String configInfo) {//配置变更,修改线程池配置logger.info("线程池参数有变动请留意:【%s】",configInfo);onConfigChange(configInfo);}});}

3.2 构建优先级枚举类及其注册分级线程池

/*** 任务优先级枚举*/
public enum PriorityEnum {HIGH(3),MEDIUM(2),LOW(1);private final int level;PriorityEnum(int level) {this.level = level;}public int getLevel() {return level;}public static PriorityEnum fromLevel(int level) {for (PriorityEnum priority : values()) {if (priority.level == level) {return priority;}}return MEDIUM;}
}

/*** 线程池配置类*/
@Configuration
@ConfigurationProperties(prefix = "threadpool")
public class ThreadPoolConfig {@Value("${threadpool.core-pool-size}")private Integer corePoolSize;@Value("${threadpool.maximum-pool-size}")private Integer maximumPoolSize;@Value("${threadpool.queue-capacity}")private Integer queueCapacity;/*** 高优先级线程池*/@Beanpublic DynamicThreadPool highPriorityThreadPool() {DynamicThreadPool threadPool = new DynamicThreadPool(PriorityEnum.HIGH);// 可以根据需要设置不同优先级线程池的默认参数threadPool.setCorePoolSize(corePoolSize * 2);threadPool.setMaxPoolSize(maximumPoolSize * 2);threadPool.setQueueCapacity(queueCapacity);threadPool.setThreadNamePrefix("High-Priority-Thread-");return threadPool;}/*** 中优先级线程池*/@Beanpublic DynamicThreadPool mediumPriorityThreadPool() {DynamicThreadPool threadPool = new DynamicThreadPool(PriorityEnum.MEDIUM);threadPool.setCorePoolSize(corePoolSize);threadPool.setMaxPoolSize(maximumPoolSize);threadPool.setQueueCapacity(queueCapacity * 2);threadPool.setThreadNamePrefix("Medium-Priority-Thread-");return threadPool;}/*** 低优先级线程池*/@Beanpublic DynamicThreadPool lowPriorityThreadPool() {DynamicThreadPool threadPool = new DynamicThreadPool(PriorityEnum.LOW);threadPool.setCorePoolSize(corePoolSize / 2);threadPool.setMaxPoolSize(maximumPoolSize / 2);threadPool.setQueueCapacity(queueCapacity * 3);threadPool.setThreadNamePrefix("Low-Priority-Thread-");return threadPool;}
}
/*** 动态线程池,支持动态调整参数*/
public class DynamicThreadPool extends ThreadPoolTaskExecutor {@Getterprivate final PriorityEnum priority;public DynamicThreadPool(PriorityEnum priority) {this.priority = priority;}/*** 动态调整线程池参数*/public void adjustParameters(int corePoolSize, int maximumPoolSize, int queueCapacity) {// 调整核心线程数setCorePoolSize(corePoolSize);// 调整最大线程数setMaxPoolSize(maximumPoolSize);setQueueCapacity(queueCapacity);}@Overrideprotected BlockingQueue<Runnable> createQueue(int queueCapacity) {if (queueCapacity > 0) {return new ResizeCapacityLinkedBlockingQueue<>(queueCapacity);}else {return new SynchronousQueue<>();}}
}

3.2 构建线程池管理类和可调整队列

/*** 线程池管理器,管理不同优先级的线程池*/
@Component
public class ThreadPoolManager {// 用枚举作为键存储线程池private final Map<PriorityEnum, DynamicThreadPool> threadPools;// 自动注入所有DynamicThreadPool类型的Bean(key为Bean名称,value为Bean实例)@Autowiredpublic ThreadPoolManager(List<DynamicThreadPool> dynamicThreadPools) {this.threadPools = new EnumMap<>(PriorityEnum.class);// 将Bean按名称匹配到对应的枚举for (DynamicThreadPool dtp : dynamicThreadPools) {try {// 假设Bean名称与枚举名一致(如"HIGH"对应PriorityEnum.HIGH)threadPools.put(dtp.getPriority(), dtp);} catch (IllegalArgumentException e) {// 处理不匹配的Bean名称(可选:日志告警或忽略)System.out.println("未匹配到枚举的线程池Bean:" + dtp.getPriority());}}}// 根据优先级获取线程池public DynamicThreadPool getThreadPool(PriorityEnum priority) {return threadPools.get(priority);}/*** 提交任务到合适的线程池*/public <T> CompletableFuture<T> submit(PriorityTask<T> task) {PriorityEnum priority = task.getPriority();DynamicThreadPool threadPool = threadPools.get(priority);if (threadPool == null) {throw new IllegalArgumentException("No thread pool found for priority: " + priority);}// 可以根据任务预估时长做更精细的路由调整return CompletableFuture.supplyAsync(() -> {try {return task.call();} catch (Exception e) {throw new RuntimeException(e);}}, threadPool);}/*** 动态调整线程池参数*/public void adjustThreadPool(PriorityEnum priority, int corePoolSize, int maxPoolSize, int queueCapacity) {DynamicThreadPool threadPool = threadPools.get(priority);if (threadPool != null) {threadPool.adjustParameters(corePoolSize, maxPoolSize, queueCapacity);}}/*** 获取线程池状态信息*/public Map<String, Object> getThreadPoolStatus() {Map<String, Object> statusMap = new HashMap<>();threadPools.forEach((priority, pool) -> {Map<String, Object> poolStatus = new java.util.HashMap<>();poolStatus.put("corePoolSize", pool.getCorePoolSize());poolStatus.put("maxPoolSize", pool.getMaxPoolSize());poolStatus.put("queueCapacity", pool.getQueueCapacity());poolStatus.put("activeCount", pool.getActiveCount());poolStatus.put("queueSize", pool.getThreadPoolExecutor().getQueue().size());poolStatus.put("completedTaskCount", pool.getThreadPoolExecutor().getCompletedTaskCount());statusMap.put(priority.name(), poolStatus);});return statusMap;}
ublic class ResizeCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {//其他省略private int capacity;
}

3.2 构建具体任务类 和 场景策略枚举调用

public class DynamicStrategyTask implements PriorityTask<String> {private static final Logger logger = LoggerFactory.getLogger(DynamicStrategyTask.class);private final String taskName;private final PriorityEnum priority;private final EvenStrategyInterface strategy;private final BaseDomainEvent event;public DynamicStrategyTask(String taskName, PriorityEnum priority, EvenStrategyInterface strategy,BaseDomainEvent event) {this.taskName = taskName;this.priority = priority;this.strategy = strategy;this.event = event;}@Overridepublic PriorityEnum getPriority() {return priority;}@Overridepublic long estimateExecutionTime() {return 0;}@Overridepublic String call() throws Exception {// 模拟任务执行Date startTime = new Date();logger.info("任务【%s】开始执行,执行线程为【%s】,开始时间【%s】",taskName,Thread.currentThread().getName(), startTime);strategy.execute(event);Date endTime = new Date();long diff    = DateUtils.diff(endTime,startTime);logger.info("任务【%s】执行结束,执行线程为【%s】,结束时间【%s】,计时【%s】",taskName,Thread.currentThread().getName(), endTime,diff);return String.format("任务 [%s] 执行完成,优先级: %s,耗时: %dms",taskName, priority,diff);}
}
@Component
public class EvenStrategyContext {private static final Logger logger = LoggerFactory.getLogger(EvenStrategyContext.class);private EvenStrategyInterface strategy;@Autowiredprivate ThreadPoolManager threadPoolManager;public EvenStrategyContext getEvenStrategyContext(String cycleTypeCode) {if (EvenTypeEnum.getByDomainEvent(cycleTypeCode) == null || EvenTypeEnum.getByDomainEvent(cycleTypeCode).getEvenStrategyInterface() == null){this.strategy = null;}else{this.strategy = EvenTypeEnum.getByDomainEvent(cycleTypeCode).getEvenStrategyInterface();}return this;}public void execute(BaseDomainEvent event) {if (strategy == null){logger.info("没有对应的监听事件,以及对应的处理方法,请检查传入时间【%s】",event.getEventType());}else{threadPoolManager.submit(new DynamicStrategyTask(event.getEventType().getValue(), PriorityEnum.fromLevel(event.getEventType().getPriority()),strategy,event));}}
}

最后在说一下,关于线程池如何调整,美团给了一个公式,但是我目前还是根据经验调整,监控动态线程池的执行情况适量进行调整,毕竟一般的公司服务器资源不是很充足,一个服务器可能部署很多的服务会占用线程资源,所以还得根据 实际情况,综合进行考虑。

http://www.dtcms.com/a/328658.html

相关文章:

  • 2022 年全国硕士研究生招生考试真题笔记
  • 深度学习赋能汽车制造缺陷检测
  • “我店模式”:零售转型中的场景化突围
  • 美团搜索推荐统一Agent之交互协议与多Agent协同
  • 【计算机网络 | 第6篇】计算机体系结构与参考模型
  • go学习笔记-匿名函数
  • 算法题笔记
  • Java连接MySQL数据库
  • Socket 套接字常用方法
  • Java多源AI接口融合框架:动态模型切换与智能路由实战
  • pybind11绑定C++项目心得
  • Sentinel 和 Hystrix
  • MySQL 存储过程终止执行的方法
  • 力扣热题100------279.完全平方数
  • XGBoost 的适用场景以及与 CNN、LSTM 的区别
  • AQS的原理与ReentrantLock的关系
  • 基于Rocky Linux 9的容器化部署方案,使用Alpine镜像实现轻量化
  • 企业高性能web服务器(3)
  • Linux学习-应用软件编程(文件IO)
  • 【科研绘图系列】R语言绘制特定区域颜色标记散点图
  • Pytest项目_day13(usefixture方法、params、ids)
  • 【不动依赖】Kali Linux 2025.2 中安装mongosh
  • 【数据结构】二叉树详细解析
  • 安路Anlogic FPGA下载器的驱动安装与测试教程
  • C++联合体的定义
  • 数据结构 二叉树(2)堆
  • 带宽受限信道下的数据传输速率计算:有噪声与无噪声场景
  • C++方向知识汇总(四)
  • PyCATIA高级建模技术:等距平面、点云重命名与棱柱体创建的工业级实现
  • 基于Java与Vue搭建的供应商询报价管理系统,实现询价、报价、比价全流程管理,功能完备,提供完整可运行源码