java分布式定时任务
一、分布式锁的底层实现细节(以 Redis 为例)
分布式锁是解决任务重复执行的核心,需保证原子性、超时释放和可重入性。以下是生产级 Redis 锁实现:
public class RedisDistributedLock {private final RedisTemplate<String, String> redisTemplate;private final String lockKey;private final String lockValue; // 用于标识锁持有者(支持可重入)private final long expireMillis; // 锁过期时间(避免死锁)// 构造函数:初始化锁参数public RedisDistributedLock(RedisTemplate<String, String> redisTemplate, String lockKey, String requestId, long expireMillis) {this.redisTemplate = redisTemplate;this.lockKey = lockKey;this.lockValue = requestId; // 建议使用UUID+线程IDthis.expireMillis = expireMillis;}// 尝试获取锁(原子操作)public boolean tryLock() {// 使用Redis的SET命令实现:NX(不存在则设置)+ PX(毫秒过期)return redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, expireMillis, TimeUnit.MILLISECONDS);}// 释放锁(需校验持有者,避免误释放)public boolean unlock() {// 使用Lua脚本保证删除操作的原子性String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";Long result = (Long) redisTemplate.execute(new DefaultRedisScript<>(script, Long.class),Collections.singletonList(lockKey),lockValue);return result != null && result > 0;}// 带超时等待的获取锁(轮询重试)public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {long timeout = unit.toMillis(waitTime);long start = System.currentTimeMillis();while (true) {if (tryLock()) {return true;}// 等待重试(避免自旋过于频繁)long remaining = timeout - (System.currentTimeMillis() - start);if (remaining <= 0) {return false; // 超时未获取到锁}Thread.sleep(Math.min(remaining, 100)); // 最多等待100ms重试}}
}
关键设计点:
- 锁标识(lockValue):用 UUID + 线程 ID 区分持有者,避免释放其他节点的锁。
- 过期时间:需大于任务执行时间(如任务耗时 5s,锁过期设 10s),防止节点宕机导致锁永久持有。
- 续约机制:若任务执行时间可能超过锁过期时间,需启动后台线程定期续约(如每 3s 续期 10s)。
二、任务调度核心原理(以 XXL-Job 为例)
1. 调度中心与执行器通信流程
- 执行器注册:执行器启动时通过 HTTP 请求向调度中心注册(携带 appname、IP、端口)。
- 任务触发:调度中心根据 CRON 表达式计算下次执行时间,到达时间后通过线程池触发任务,向执行器发送 HTTP 请求(POST 方式)。
- 执行反馈:执行器执行完任务后,将结果(成功 / 失败、日志)同步回调度中心。
2. 路由策略与负载均衡
XXL-Job 支持多种路由策略,解决任务在集群节点的分配问题:
- 第一个节点:固定选择集群中第一个在线节点(适合单节点执行的任务)。
- 轮询:按顺序依次分配给在线节点(均衡负载)。
- 分片广播:所有在线节点同时执行,每个节点处理不同分片(适合大规模任务)。
分片示例:100 万条数据需批量处理,分为 5 个分片,集群 3 个节点:
@XxlJob("shardingTask")
public ReturnT<String> shardingHandler(String param) {// 获取分片参数(由调度中心分配)ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();int shardIndex = shardingVO.getIndex(); // 当前分片索引(0-4)int shardTotal = shardingVO.getTotal(); // 总分片数(5)// 按分片处理数据(如按ID取模:id % shardTotal == shardIndex)List<Data> dataList = dataService.queryBySharding(shardIndex, shardTotal);for (Data data : dataList) {processData(data);}return ReturnT.SUCCESS;
}
三、高可用设计(避免单点故障)
1. 调度中心集群化
- 部署方式:多实例部署(如 2 个节点),通过 Nginx 负载均衡对外提供服务。
- 数据一致性:依赖 MySQL 主从同步(调度中心数据存储在 MySQL),确保多实例数据一致。
2. 执行器故障转移
- 心跳检测:执行器定期向调度中心发送心跳(默认 30s 一次),超过 90s 未心跳则标记为离线。
- 任务转移:若执行器离线,调度中心会将其负责的任务分配给其他在线节点(需任务支持重执行)。
四、监控与告警体系
1. 核心监控指标
- 任务维度:执行次数、成功率、平均耗时、最大耗时。
- 节点维度:CPU 使用率、内存占用、任务并发数。
2. 集成 Prometheus 监控
// 自定义任务执行指标(使用Micrometer)
@Component
public class TaskMetrics {private final MeterRegistry meterRegistry;public TaskMetrics(MeterRegistry meterRegistry) {this.meterRegistry = meterRegistry;}// 记录任务执行耗时public void recordTaskDuration(String taskName, long durationMs) {Timer.builder("task.execution.duration").tag("task", taskName).register(meterRegistry).record(durationMs, TimeUnit.MILLISECONDS);}// 记录任务失败次数public void incrementFailCount(String taskName) {Counter.builder("task.execution.fail").tag("task", taskName).register(meterRegistry).increment();}
}
在任务执行中埋点:
@XxlJob("orderTimeoutTask")
public ReturnT<String> orderTimeoutHandler(String param) {long start = System.currentTimeMillis();try {// 任务逻辑...metrics.recordTaskDuration("orderTimeoutTask", System.currentTimeMillis() - start);return ReturnT.SUCCESS;} catch (Exception e) {metrics.incrementFailCount("orderTimeoutTask");return ReturnT.FAIL;}
}
3. 告警配置
通过 Grafana 设置告警规则(如任务失败率 > 5% 时触发告警),并集成钉钉 / 企业微信机器人:
// 钉钉告警示例
public class DingTalkAlarm {private final String webhook;public void sendAlarm(String message) {HttpHeaders headers = new HttpHeaders();headers.setContentType(MediaType.APPLICATION_JSON);Map<String, Object> body = new HashMap<>();body.put("msgtype", "text");body.put("text", Map.of("content", "定时任务告警:" + message));new RestTemplate().postForObject(webhook, new HttpEntity<>(body, headers), String.class);}
}
五、自定义轻量级方案(无框架依赖)
若场景简单(如无动态配置需求),可基于 Redis + 线程池实现极简方案:
@Component
public class RedisScheduledTask {@Autowiredprivate RedisTemplate<String, String> redisTemplate;@Autowiredprivate TaskService taskService;// 初始化定时任务(每分钟执行一次)@PostConstructpublic void init() {ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();executor.scheduleAtFixedRate(this::executeTask, 0, 1, TimeUnit.MINUTES);}// 执行任务(加分布式锁)private void executeTask() {String lockKey = "task:order:timeout";String requestId = UUID.randomUUID().toString();RedisDistributedLock lock = new RedisDistributedLock(redisTemplate, lockKey, requestId, 60000);try {if (lock.tryLock()) {// 执行核心逻辑taskService.processTimeoutOrders();} else {log.info("任务被其他节点执行,当前节点跳过");}} finally {lock.unlock(); // 释放锁}}
}
六、避坑指南
- 锁过期时间设置:需大于任务最大执行时间(可通过压测评估),避免任务未执行完锁已释放。
- 任务幂等性:即使加了锁,仍需保证任务可重复执行(如使用
UPDATE orders SET status=1 WHERE id=? AND status=0
)。 - 线程池隔离:核心任务与非核心任务使用独立线程池(如
Executors.newScheduledThreadPool(5)
),避免相互阻塞。 - 日志追踪:任务执行日志需包含唯一 ID(如订单号),便于问题排查。
通过以上细节设计,可构建既高效又可靠的分布式定时任务系统,兼顾性能、可用性和可运维性。实际项目中,建议优先选用 XXL-Job 等成熟框架,减少重复开发;特殊场景下再考虑自定义方案。