当前位置: 首页 > news >正文

分布式定时任务:Elastic-Job-Lite

Elastic-Job-Lite 是一款由 Apache 开源的轻量级分布式任务调度框架,属于 ShardingSphere 生态体系的一部分。它专注于分布式任务调度,支持弹性伸缩、分片处理、高可用等特性,且不依赖中心化架构。

一、基础

(一)核心特性
  1. 分布式协调
    通过 ZooKeeper 实现作业的分布式调度和协调,确保任务在集群环境中不重复、不遗漏地执行。

  2. 分片机制
    支持将任务拆分为多个分片(Sharding)并行执行,提升处理效率。例如:

    // 根据分片参数处理不同数据
    int shardIndex = context.getShardingItem();  // 分片索引(0,1,2...)
    String shardParam = context.getShardingParameter();  // 分片参数
    
  3. 弹性伸缩
    动态感知集群节点变化,自动重新分配分片。新增节点时,分片会被均匀分配到新节点;节点下线时,其分片会被其他节点接管。

  4. 多种作业类型

    • SimpleJob:简单任务,实现 SimpleJob 接口即可。
    • DataflowJob:数据流任务,支持数据抓取(fetch)和处理(process)。
    • ScriptJob:脚本任务,支持 Shell、Python 等脚本语言。
  5. 失效转移
    当作业节点崩溃时,正在执行的分片会被转移到其他节点继续执行。

  6. 幂等性保障
    通过 ZooKeeper 实现分布式锁,确保同一分片在同一时间只被一个节点执行。

(二)架构设计

Elastic-Job-Lite 采用去中心化架构:

  • 作业节点:直接部署在应用中,既是执行节点也是调度节点。
  • 注册中心:依赖 ZooKeeper 存储作业元数据和运行状态。
  • 无中心化调度器:每个节点通过注册中心协调,无需单独的调度中心。
(三)核心概念
  1. 作业(Job)
    任务的抽象,支持 Simple、Dataflow、Script 三种类型。

  2. 分片(Sharding)
    将任务拆分为多个独立的子任务,每个分片由不同的节点执行。例如:

    elasticjob:jobs:myJob:sharding-total-count: 3  # 总分片数sharding-item-parameters: "0=北京,1=上海,2=广州"  # 分片参数
    
  3. 注册中心(Registry Center)
    ZooKeeper 作为协调服务,存储作业配置、运行状态和分片信息。

  4. 作业实例(Job Instance)
    每个作业节点启动时会向注册中心注册自己,成为一个作业实例。

二、在springboot中使用Elastic-Job-Lite

(一)添加依赖

pom.xml 中添加 Elastic-Job-Lite 和 ZooKeeper 客户端依赖:

<!-- Elastic-Job-Lite Spring Boot Starter -->
<dependency><groupId>org.apache.shardingsphere.elasticjob</groupId><artifactId>elasticjob-lite-spring-boot-starter</artifactId><version>3.0.3</version> <!-- 最新稳定版本 -->
</dependency><!-- ZooKeeper 客户端 -->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.3.0</version>
</dependency>
(二) 配置 ZooKeeper 注册中心

你提到的没错!Elastic-Job-Lite 配置 ZooKeeper 确实有三种主要方式,我之前的回答集中在 Java 代码配置 上。现在我补充完整另外两种方式:

1.YAML 配置(Spring Boot 自动配置)

最简洁的方式,通过 application.yml 配置:

elasticjob:reg-center:server-lists: localhost:2181  # ZooKeeper 地址namespace: elastic-job        # 命名空间base-sleep-time-milliseconds: 1000  # 初始重试等待时间max-sleep-time-milliseconds: 3000  # 最大重试等待时间max-retries: 3                # 最大重试次数digest: ""                    # 认证信息(可选)jobs:mySimpleJob:type: SIMPLEclass: com.example.job.MySimpleJob  # 作业类路径cron: "0/10 * * * * ?"              # Cron 表达式sharding-total-count: 3             # 分片总数sharding-item-parameters: "0=A,1=B,2=C"  # 分片参数overwrite: true                     # 覆盖注册中心配置

关键点

  • elasticjob.reg-center 配置 ZooKeeper 连接信息。
  • elasticjob.jobs 下定义具体作业,支持 SIMPLEDATAFLOWSCRIPT 等类型。
2.Java 代码配置(手动构建 Bean)

前面示例中使用的方式,适合需要灵活控制配置的场景:

@Configuration
public class JobConfig {@Bean(initMethod = "init")public ZookeeperRegistryCenter regCenter() {ZookeeperConfiguration zkConfig = new ZookeeperConfiguration("localhost:2181", "elastic-job");return new ZookeeperRegistryCenter(zkConfig);}@Bean(initMethod = "init")public SpringJobScheduler simpleJobScheduler(MySimpleJob mySimpleJob, ZookeeperRegistryCenter regCenter) {JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("mySimpleJob", "0/10 * * * * ?", 3).shardingItemParameters("0=A,1=B,2=C").build();SimpleJobConfiguration jobConfig = new SimpleJobConfiguration(coreConfig, MySimpleJob.class.getCanonicalName());return new SpringJobScheduler(mySimpleJob, regCenter, LiteJobConfiguration.newBuilder(jobConfig).overwrite(true).build());}
}

关键点

  • 手动创建 ZookeeperRegistryCenterSpringJobScheduler Bean。
  • 通过 JobCoreConfigurationSimpleJobConfiguration 构建作业配置。
(三)创建简单作业类

实现 SimpleJob 接口,定义作业逻辑:

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import org.springframework.stereotype.Component;@Component
public class MySimpleJob implements SimpleJob {@Overridepublic void execute(ShardingContext shardingContext) {// 获取分片信息int shardIndex = shardingContext.getShardingItem();String shardParam = shardingContext.getShardingParameter();// 作业逻辑(根据分片参数处理不同数据)System.out.printf("分片项: %d, 参数: %s, 时间: %s%n", shardIndex, shardParam, System.currentTimeMillis());// 示例:根据分片处理不同的数据// if (shardIndex == 0) { processGroupA(); }// else if (shardIndex == 1) { processGroupB(); }}
}
(四)配置作业

使用 @ElasticSimpleJob 注解配置作业:

import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.apache.shardingsphere.elasticjob.api.ElasticSimpleJob;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class JobConfig {@Autowiredprivate ZookeeperRegistryCenter regCenter;@Autowiredprivate MySimpleJob mySimpleJob;@Bean(initMethod = "init")public SpringJobScheduler simpleJobScheduler() {// 定义作业核心配置JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("mySimpleJob",      // 作业名称"0/10 * * * * ?",   // Cron 表达式3                   // 分片总数).shardingItemParameters("0=A,1=B,2=C")  // 分片参数.build();// 定义 Simple 作业配置SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, MySimpleJob.class.getCanonicalName());// 定义 Lite 作业配置LiteJobConfiguration jobConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true)  // 允许覆盖注册中心配置.build();// 创建作业调度器return new SpringJobScheduler(mySimpleJob, regCenter, jobConfig);}
}
(五)配置说明
参数说明
reg-center.server-listsZooKeeper 服务器地址,多个地址用逗号分隔(如 host1:2181,host2:2181
reg-center.namespace命名空间,用于隔离不同项目的作业配置
coreConfig.cronCron 表达式,定义作业执行时间规则
coreConfig.shardingTotalCount分片总数,决定作业拆分为多少个并行执行单元
coreConfig.shardingItemParameters分片参数,格式为 0=A,1=B,2=C,为每个分片指定参数
http://www.dtcms.com/a/264760.html

相关文章:

  • P3842 [TJOI2007] 线段(动态规划)
  • RAC (ReactiveCocoa) 的实现机制与消息传递策略
  • XILINX Kintex 7系列FPGA的架构
  • ubentu服务器版本安装Dify
  • 【leetcode算法300】:哈希板块
  • 多项式带余除法——线性代数题目为例
  • 【.NET Framework 窗体应用程序项目结构介绍】
  • WHAT - React Native 中 Light and Dark mode 深色模式(黑暗模式)机制
  • 如何在Excel中每隔几行取一行
  • 【PMP】项目管理入门:从基础到环境的体系化拆解
  • 分布式定时任务:xxl-job
  • 苍穹外卖day12--Apache POI导出Excel报表
  • [MIA 2025]CLIP in medical imaging: A survey
  • 多云密钥统一管理实战:CKMS对接阿里云/华为云密钥服务
  • .npmrc和.yarnrc配置文件介绍:分别用于 Node.js 中的 npm(Node Package Manager)和 Yarn 包管理工具
  • oracle集合三嵌套表(Nested Table)学习
  • 【第三章:神经网络原理详解与Pytorch入门】01.神经网络算法理论详解与实践-(1)神经网络预备知识(线性代数、微积分、概率等)
  • 微控制器中的EXTI0(External Interrupt 0)中断是什么?
  • uniapp socket 封装 (可拿去直接用)
  • 可编辑33页PPT | 某材料制造企业工业互联网平台解决方案
  • 云原生环境下部署大语言模型服务:以 DeepSeek 为例的实战教程
  • 6种iOS开发中常用的设计模式
  • Qt designer坑-布局内子控件的顺序错乱
  • 量化交易学习之自动化交易策略 [freqtrade 框架学习] ,常见问题避坑指南!!!!
  • <u>#12288;#8203;</u> HTML5全角空格,自动换行,半角用#32;#8203;
  • Spring AI Advisor RAG使用指南
  • Android Auto即将带来变革
  • AI大模型:从编码助手到流程重构者——软件开发新范式解析
  • 【前端】1 小时实现 React 简历项目
  • 多种方法实现golang中实现对http的响应内容生成图片