【Java】xxl-job
https://www.xuxueli.com/xxl-job/
https://www.bilibili.com/video/BV1nW421R7qJ
https://blog.csdn.net/qq_37128049/article/details/93889170
定时任务调度部分感觉是目前看来一半中,最难理解的部分,要反复看
定时任务调度框架Spring Task:Spring3.0 以后自带的 task,配置简单功能较多,如果系统使用单机的话可以优先考虑spring定时器。
xxl-job入门案例
下载源码https://github.com/xuxueli/xxl-job
将项目中 /xxl-job/doc/db/ 目录下的 tables_xxl_job.sql 的数据库表导入数据库
源码结构如下:
xxl-job-admin:调度中心
xxl-job-core:公共依赖
xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用,也可以参考其并将现有项目改造成执行器)
:xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式;
:xxl-job-executor-sample-frameless:无框架版本;
修改xxl-job-admin项目里面配置文件application.properties
启动调度中心xxl-job-admin
调度中心访问地址:http://localhost:8080/xxl-job-admin
默认登录账号 “admin/123456”
部署执行器项目
作用:负责接收“调度中心”的调度并执行;可直接部署执行器,也可以将执行器集成到现有业务项目中。
要做的具体业务(事情) xxl-job-executor-sample-springboot
maven依赖
确认pom文件中引入了 “xxl-job-core” 的maven依赖;
修改执行器配置文件,把执行器项目在调度中心进行注册
创建配置类,获取任务调用过程中需要使用相关参数
xxl-job/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/core/config/XxlJobConfig.java
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.admin.accessToken}")
private String accessToken;
@Value("${xxl.job.admin.timeout}")
private int timeout;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setTimeout(timeout);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
}
启动执行器项目
开发执行器项目job方法
/**
* 1、简单任务示例(Bean模式)
*/
@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {
XxlJobHelper.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) {
XxlJobHelper.log("beat at:" + i);
System.out.println("happycoder" + i);
TimeUnit.SECONDS.sleep(2);
}
// default success
}
创建任务并启动任务
通过图形化界面方式进行操作(后面我们都是通过java代码来操作的,这里使用图形化界面演示),新增任务
启动任务(这里让任务只执行一次)
调度成功
继承xxl-job
引入依赖
<dependencies>
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
</dependency>
</dependencies>
执行器配置
这里我们在项目nacos配置中心里面配置进行修改
xxl:
job:
admin:
# 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册
addresses: http://localhost:8080/xxl-job-admin
# 执行器通讯TOKEN [选填]:非空时启用
accessToken: default_token
executor:
# 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
appname: xxl-job-executor-sample
# 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
address:
# 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
ip:
# 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
port: 9999
# 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
logpath: /data/applogs/xxl-job/jobhandler
# 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
logretentiondays: 30
client:
jobGroupId: 1
addUrl: ${xxl.job.admin.addresses}/jobinfo/addJob
removeUrl: ${xxl.job.admin.addresses}/jobinfo/removeJob
startJobUrl: ${xxl.job.admin.addresses}/jobinfo/startJob
stopJobUrl: ${xxl.job.admin.addresses}/jobinfo/stopJob
addAndStartUrl: ${xxl.job.admin.addresses}/jobinfo/addAndStartJob
seata:
tx-service-group: daijia_tx_group
enable-auto-data-source-proxy: false
创建XXL-JOB配置类
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
}
编写任务job方法
@Component
public class DispatchJobHandler {
@XxlJob("firstJobHandler")
public void testJobHandler() {
System.out.println("xxl-job项目集成测试");
}
}
测试
第一步 启动相关服务
* 启动调度中心 (仍然使用官方提供的源码中的调度中心,运行一下即可 xxl-job-admin)
* 启动执行器项目服务 (我们现在继承到项目的模块,启动一下)
第二步 在调度中心创建任务
先点一下 手动录入,再点一下 自动注册,再保存。达到一个更新的效果
运行一次
失败了
感觉是执行器没有注册好的原因,给执行器换个端口试一下
成功了
封装XXL-JOB客户端
因为任务调度是灵活过程,需要封装相关方法,通过方法实现添加并启动任务
xxl-job-admin调度中心中创建 关于任务的方法
//自定义任务操作的方法
//添加任务
@RequestMapping("/addJob")
@ResponseBody
@PermissionLimit(limit = false)
public ReturnT<String> addJobInfo(@RequestBody XxlJobInfo jobInfo) {
return xxlJobService.add(jobInfo);
}
//删除任务
@RequestMapping("/removeJob")
@ResponseBody
@PermissionLimit(limit = false)
public ReturnT<String> removeJob(@RequestBody XxlJobInfo jobInfo) {
return xxlJobService.remove(jobInfo.getId());
}
//修改任务
@RequestMapping("/updateJob")
@ResponseBody
@PermissionLimit(limit = false)
public ReturnT<String> updateJob(@RequestBody XxlJobInfo jobInfo) {
return xxlJobService.update(jobInfo);
}
//停止任务
@RequestMapping("/stopJob")
@ResponseBody
@PermissionLimit(limit = false)
public ReturnT<String> pauseJob(@RequestBody XxlJobInfo jobInfo) {
return xxlJobService.stop(jobInfo.getId());
}
//启动任务
@RequestMapping("/startJob")
@ResponseBody
@PermissionLimit(limit = false)
public ReturnT<String> startJob(@RequestBody XxlJobInfo jobInfo) {
return xxlJobService.start(jobInfo.getId());
}
//添加并启动任务
@RequestMapping("/addAndStartJob")
@ResponseBody
@PermissionLimit(limit = false)
public ReturnT<String> addAndStartJob(@RequestBody XxlJobInfo jobInfo) {
ReturnT<String> result = xxlJobService.add(jobInfo);
String content = result.getContent();
int id = Integer.parseInt(content);
xxlJobService.start(id);
//立即执行一次
JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, jobInfo.getExecutorParam(), "");
return result;
}
service-dispatch执行器配置文件中 添加任务方法
xxl:
job:
admin:
# 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册
addresses: http://localhost:8080/xxl-job-admin
# 执行器通讯TOKEN [选填]:非空时启用
accessToken: default_token
executor:
# 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
appname: xxl-job-executor-sample
# 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
address:
# 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
ip:
# 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
port: 9999
# 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
logpath: /data/applogs/xxl-job/jobhandler
# 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
logretentiondays: 30
client:
jobGroupId: 1
addUrl: ${xxl.job.admin.addresses}/jobinfo/addJob
removeUrl: ${xxl.job.admin.addresses}/jobinfo/removeJob
startJobUrl: ${xxl.job.admin.addresses}/jobinfo/startJob
stopJobUrl: ${xxl.job.admin.addresses}/jobinfo/stopJob
addAndStartUrl: ${xxl.job.admin.addresses}/jobinfo/addAndStartJob
seata:
tx-service-group: daijia_tx_group
enable-auto-data-source-proxy: false
关键部分
client:
jobGroupId: 1
addUrl: ${xxl.job.admin.addresses}/jobinfo/addJob
removeUrl: ${xxl.job.admin.addresses}/jobinfo/removeJob
startJobUrl: ${xxl.job.admin.addresses}/jobinfo/startJob
stopJobUrl: ${xxl.job.admin.addresses}/jobinfo/stopJob
addAndStartUrl: ${xxl.job.admin.addresses}/jobinfo/addAndStartJob
service-dispatch添加相关配置类。创建配置类,读取配置文件里面调用的调度中心任务操作的方法
@Data
@Component
@ConfigurationProperties(prefix = "xxl.job.client")
public class XxlJobClientConfig {
private Integer jobGroupId;
private String addUrl;
private String removeUrl;
private String startJobUrl;
private String stopJobUrl;
private String addAndStartUrl;
}
service-dispatch创建客户端类,编写调用调度中心里面的方法
import com.alibaba.fastjson.JSONObject;
import com.atguigu.daijia.common.execption.GuiguException;
import com.atguigu.daijia.common.result.ResultCodeEnum;
import com.atguigu.daijia.dispatch.xxl.config.XxlJobClientConfig;
import com.atguigu.daijia.model.entity.dispatch.XxlJobInfo;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
@Slf4j
@Component
public class XxlJobClient {
@Autowired
private XxlJobClientConfig xxlJobClientConfig;
//客户端调用服务端里面的方法
@Autowired
private RestTemplate restTemplate;
@SneakyThrows
public Long addJob(String executorHandler, String param, String corn, String desc){
XxlJobInfo xxlJobInfo = new XxlJobInfo();
xxlJobInfo.setJobGroup(xxlJobClientConfig.getJobGroupId());
xxlJobInfo.setJobDesc(desc);
xxlJobInfo.setAuthor("qy");
xxlJobInfo.setScheduleType("CRON");
xxlJobInfo.setScheduleConf(corn);
xxlJobInfo.setGlueType("BEAN");
xxlJobInfo.setExecutorHandler(executorHandler);
xxlJobInfo.setExecutorParam(param);
xxlJobInfo.setExecutorRouteStrategy("FIRST");
xxlJobInfo.setExecutorBlockStrategy("SERIAL_EXECUTION");
xxlJobInfo.setMisfireStrategy("FIRE_ONCE_NOW");
xxlJobInfo.setExecutorTimeout(0);
xxlJobInfo.setExecutorFailRetryCount(0);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<XxlJobInfo> request = new HttpEntity<>(xxlJobInfo, headers);
String url = xxlJobClientConfig.getAddUrl();
ResponseEntity<JSONObject> response =
restTemplate.postForEntity(url, request, JSONObject.class);
if(response.getStatusCode().value() == 200 && response.getBody().getIntValue("code") == 200) {
log.info("增加xxl执行任务成功,返回信息:{}", response.getBody().toJSONString());
//content为任务id
return response.getBody().getLong("content");
}
log.info("调用xxl增加执行任务失败:{}", response.getBody().toJSONString());
throw new GuiguException(ResultCodeEnum.DATA_ERROR);
}
public Boolean startJob(Long jobId) {
XxlJobInfo xxlJobInfo = new XxlJobInfo();
xxlJobInfo.setId(jobId.intValue());
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<XxlJobInfo> request = new HttpEntity<>(xxlJobInfo, headers);
String url = xxlJobClientConfig.getStartJobUrl();
ResponseEntity<JSONObject> response = restTemplate.postForEntity(url, request, JSONObject.class);
if(response.getStatusCode().value() == 200 && response.getBody().getIntValue("code") == 200) {
log.info("启动xxl执行任务成功:{},返回信息:{}", jobId, response.getBody().toJSONString());
return true;
}
log.info("启动xxl执行任务失败:{},返回信息:{}", jobId, response.getBody().toJSONString());
throw new GuiguException(ResultCodeEnum.DATA_ERROR);
}
public Boolean stopJob(Long jobId) {
XxlJobInfo xxlJobInfo = new XxlJobInfo();
xxlJobInfo.setId(jobId.intValue());
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<XxlJobInfo> request = new HttpEntity<>(xxlJobInfo, headers);
String url = xxlJobClientConfig.getStopJobUrl();
ResponseEntity<JSONObject> response = restTemplate.postForEntity(url, request, JSONObject.class);
if(response.getStatusCode().value() == 200 && response.getBody().getIntValue("code") == 200) {
log.info("停止xxl执行任务成功:{},返回信息:{}", jobId, response.getBody().toJSONString());
return true;
}
log.info("停止xxl执行任务失败:{},返回信息:{}", jobId, response.getBody().toJSONString());
throw new GuiguException(ResultCodeEnum.DATA_ERROR);
}
public Boolean removeJob(Long jobId) {
XxlJobInfo xxlJobInfo = new XxlJobInfo();
xxlJobInfo.setId(jobId.intValue());
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<XxlJobInfo> request = new HttpEntity<>(xxlJobInfo, headers);
String url = xxlJobClientConfig.getRemoveUrl();
ResponseEntity<JSONObject> response = restTemplate.postForEntity(url, request, JSONObject.class);
if(response.getStatusCode().value() == 200 && response.getBody().getIntValue("code") == 200) {
log.info("删除xxl执行任务成功:{},返回信息:{}", jobId, response.getBody().toJSONString());
return true;
}
log.info("删除xxl执行任务失败:{},返回信息:{}", jobId, response.getBody().toJSONString());
throw new GuiguException(ResultCodeEnum.DATA_ERROR);
}
//添加并启动任务
public Long addAndStart(String executorHandler, String param, String corn, String desc) {
XxlJobInfo xxlJobInfo = new XxlJobInfo();
xxlJobInfo.setJobGroup(xxlJobClientConfig.getJobGroupId());
xxlJobInfo.setJobDesc(desc);
xxlJobInfo.setAuthor("qy");
xxlJobInfo.setScheduleType("CRON");
xxlJobInfo.setScheduleConf(corn);
xxlJobInfo.setGlueType("BEAN");
xxlJobInfo.setExecutorHandler(executorHandler);
xxlJobInfo.setExecutorParam(param);
xxlJobInfo.setExecutorRouteStrategy("FIRST");
xxlJobInfo.setExecutorBlockStrategy("SERIAL_EXECUTION");
xxlJobInfo.setMisfireStrategy("FIRE_ONCE_NOW");
xxlJobInfo.setExecutorTimeout(0);
xxlJobInfo.setExecutorFailRetryCount(0);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<XxlJobInfo> request = new HttpEntity<>(xxlJobInfo, headers);
//获取调度中心请求路径
String url = xxlJobClientConfig.getAddAndStartUrl();
//restTemplate
ResponseEntity<JSONObject> response = restTemplate.postForEntity(url, request, JSONObject.class);
if(response.getStatusCode().value() == 200 && response.getBody().getIntValue("code") == 200) {
log.info("增加并开始执行xxl任务成功,返回信息:{}", response.getBody().toJSONString());
//content为任务id
return response.getBody().getLong("content");
}
log.info("增加并开始执行xxl任务失败:{}", response.getBody().toJSONString());
throw new GuiguException(ResultCodeEnum.DATA_ERROR);
}
}
启动类配置RestTemplate
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
创建并启动任务接口
在service-dispatch操作
controller
@Tag(name = "司机新订单接口管理")
@RestController
@RequestMapping("/dispatch/newOrder")
@SuppressWarnings({"unchecked", "rawtypes"})
public class NewOrderController {
@Autowired
private NewOrderService newOrderService;
//创建并启动任务调度方法
@Operation(summary = "添加并开始新订单任务调度")
@PostMapping("/addAndStartTask")
public Result<Long> addAndStartTask(@RequestBody NewOrderTaskVo newOrderTaskVo) {
Long id = newOrderService.addAndStartTask(newOrderTaskVo);
return Result.ok(id);
}
}
service
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class NewOrderServiceImpl implements NewOrderService {
@Autowired
private OrderJobMapper orderJobMapper;
@Autowired
private XxlJobClient xxlJobClient;
//创建并启动任务调度方法
@Override
public Long addAndStartTask(NewOrderTaskVo newOrderTaskVo) {
//1 判断当前订单是否启动任务调度
//根据订单id查询
LambdaQueryWrapper<OrderJob> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(OrderJob::getOrderId,newOrderTaskVo.getOrderId());
OrderJob orderJob = orderJobMapper.selectOne(wrapper);
//2 没有启动,进行操作
if(orderJob == null) {
//创建并启动任务调度
//String executorHandler 执行任务job方法
// String param
// String corn 执行cron表达式
// String desc 描述信息
Long jobId = xxlJobClient.addAndStart("newOrderTaskHandler", "",
"0 0/1 * * * ?",
"新创建订单任务调度:" + newOrderTaskVo.getOrderId());
//记录任务调度信息
orderJob = new OrderJob();
orderJob.setOrderId(newOrderTaskVo.getOrderId());
orderJob.setJobId(jobId);
orderJob.setParameter(JSONObject.toJSONString(newOrderTaskVo));
orderJobMapper.insert(orderJob);
}
return orderJob.getJobId();
}
}
远程调用定义
@FeignClient(value = "service-dispatch")
public interface NewOrderFeignClient {
/**
* 添加新订单任务
* @param newOrderDispatchVo
* @return
*/
@PostMapping("/dispatch/newOrder/addAndStartTask")
Result<Long> addAndStartTask(@RequestBody NewOrderTaskVo newOrderDispatchVo);
}
JobHandler
@Component
public class JobHandler {
@Autowired
private XxlJobLogMapper xxlJobLogMapper;
@Autowired
private NewOrderService newOrderService;
@XxlJob("newOrderTaskHandler")
public void newOrderTaskHandler() {
//记录任务调度日志
XxlJobLog xxlJobLog = new XxlJobLog();
xxlJobLog.setJobId(XxlJobHelper.getJobId());
long startTime = System.currentTimeMillis();
try {
//执行任务:搜索附近代驾司机
newOrderService.executeTask(XxlJobHelper.getJobId());
//成功状态
xxlJobLog.setStatus(1);
} catch (Exception e) {
//失败状态
xxlJobLog.setStatus(0);
xxlJobLog.setError(e.getMessage());
e.printStackTrace();
} finally {
long times = System.currentTimeMillis()- startTime;
//TODO 完善long
xxlJobLog.setTimes((int)times);
xxlJobLogMapper.insert(xxlJobLog);
}
}
}
service
//执行任务:搜索附近代驾司机
@Override
public void executeTask(long jobId) {
//1 根据jobid查询数据库,当前任务是否已经创建
//如果没有创建,不往下执行了
LambdaQueryWrapper<OrderJob> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(OrderJob::getJobId,jobId);
OrderJob orderJob = orderJobMapper.selectOne(wrapper);
if(orderJob == null) {
//不往下执行了
return;
}
//2 查询订单状态,如果当前订单接单状态,继续执行。如果当前订单不是接单状态,停止任务调度
//获取OrderJob里面对象
String jsonString = orderJob.getParameter();
NewOrderTaskVo newOrderTaskVo = JSONObject.parseObject(jsonString, NewOrderTaskVo.class);
//获取orderId
Long orderId = newOrderTaskVo.getOrderId();
Integer status = orderInfoFeignClient.getOrderStatus(orderId).getData();
if(status.intValue() != OrderStatus.WAITING_ACCEPT.getStatus().intValue()) {
//停止任务调度
xxlJobClient.stopJob(jobId);
return;
}
//3 远程调用:搜索附近满足条件可以接单司机
//4 远程调用之后,获取满足可以接单司机集合
SearchNearByDriverForm searchNearByDriverForm = new SearchNearByDriverForm();
searchNearByDriverForm.setLongitude(newOrderTaskVo.getStartPointLongitude());
searchNearByDriverForm.setLatitude(newOrderTaskVo.getStartPointLatitude());
searchNearByDriverForm.setMileageDistance(newOrderTaskVo.getExpectDistance());
//远程调用
List<NearByDriverVo> nearByDriverVoList =
locationFeignClient.searchNearByDriver(searchNearByDriverForm).getData();
//5 遍历司机集合,得到每个司机,为每个司机创建临时队列,存储新订单信息
nearByDriverVoList.forEach(driver -> {
//使用Redis的set类型
//根据订单id生成key
String repeatKey =
RedisConstant.DRIVER_ORDER_REPEAT_LIST+newOrderTaskVo.getOrderId();
//记录司机id,防止重复推送
Boolean isMember = redisTemplate.opsForSet().isMember(repeatKey, driver.getDriverId());
if(!isMember) {
//把订单信息推送给满足条件多个司机
redisTemplate.opsForSet().add(repeatKey,driver.getDriverId());
//过期时间:15分钟,超过15分钟没有接单自动取消
redisTemplate.expire(repeatKey,
RedisConstant.DRIVER_ORDER_REPEAT_LIST_EXPIRES_TIME,
TimeUnit.MINUTES);
NewOrderDataVo newOrderDataVo = new NewOrderDataVo();
newOrderDataVo.setOrderId(newOrderTaskVo.getOrderId());
newOrderDataVo.setStartLocation(newOrderTaskVo.getStartLocation());
newOrderDataVo.setEndLocation(newOrderTaskVo.getEndLocation());
newOrderDataVo.setExpectAmount(newOrderTaskVo.getExpectAmount());
newOrderDataVo.setExpectDistance(newOrderTaskVo.getExpectDistance());
newOrderDataVo.setExpectTime(newOrderTaskVo.getExpectTime());
newOrderDataVo.setFavourFee(newOrderTaskVo.getFavourFee());
newOrderDataVo.setDistance(driver.getDistance());
newOrderDataVo.setCreateTime(newOrderTaskVo.getCreateTime());
//新订单保存司机的临时队列,Redis里面List集合
String key = RedisConstant.DRIVER_ORDER_TEMP_LIST+driver.getDriverId();
redisTemplate.opsForList().leftPush(key,JSONObject.toJSONString(newOrderDataVo));
//过期时间:1分钟
redisTemplate.expire(key,RedisConstant.DRIVER_ORDER_TEMP_LIST_EXPIRES_TIME, TimeUnit.MINUTES);
}
});
}
乘客下单添加任务调度
// //乘客下单
@Override
public Long submitOrder(SubmitOrderForm submitOrderForm) {
//1 重新计算驾驶线路
CalculateDrivingLineForm calculateDrivingLineForm = new CalculateDrivingLineForm();
BeanUtils.copyProperties(submitOrderForm,submitOrderForm);
Result<DrivingLineVo> drivingLineVoResult = mapFeignClient.calculateDrivingLine(calculateDrivingLineForm);
DrivingLineVo drivingLineVo = drivingLineVoResult.getData();
//2 重新订单费用
FeeRuleRequestForm calculateOrderFeeForm = new FeeRuleRequestForm();
calculateOrderFeeForm.setDistance(drivingLineVo.getDistance());
calculateOrderFeeForm.setStartTime(new Date());
calculateOrderFeeForm.setWaitMinute(0);
Result<FeeRuleResponseVo> feeRuleResponseVoResult = feeRuleFeignClient.calculateOrderFee(calculateOrderFeeForm);
FeeRuleResponseVo feeRuleResponseVo = feeRuleResponseVoResult.getData();
//封装数据
OrderInfoForm orderInfoForm = new OrderInfoForm();
BeanUtils.copyProperties(submitOrderForm,orderInfoForm);
orderInfoForm.setExpectDistance(drivingLineVo.getDistance());
orderInfoForm.setExpectAmount(feeRuleResponseVo.getTotalAmount());
Result<Long> orderInfoResult = orderInfoFeignClient.saveOrderInfo(orderInfoForm);
Long orderId = orderInfoResult.getData();
//任务调度:查询附近可以接单司机
NewOrderTaskVo newOrderDispatchVo = new NewOrderTaskVo();
newOrderDispatchVo.setOrderId(orderId);
newOrderDispatchVo.setStartLocation(orderInfoForm.getStartLocation());
newOrderDispatchVo.setStartPointLongitude(orderInfoForm.getStartPointLongitude());
newOrderDispatchVo.setStartPointLatitude(orderInfoForm.getStartPointLatitude());
newOrderDispatchVo.setEndLocation(orderInfoForm.getEndLocation());
newOrderDispatchVo.setEndPointLongitude(orderInfoForm.getEndPointLongitude());
newOrderDispatchVo.setEndPointLatitude(orderInfoForm.getEndPointLatitude());
newOrderDispatchVo.setExpectAmount(orderInfoForm.getExpectAmount());
newOrderDispatchVo.setExpectDistance(orderInfoForm.getExpectDistance());
newOrderDispatchVo.setExpectTime(drivingLineVo.getDuration());
newOrderDispatchVo.setFavourFee(orderInfoForm.getFavourFee());
newOrderDispatchVo.setCreateTime(new Date());
//远程调用
Long jobId = newOrderFeignClient.addAndStartTask(newOrderDispatchVo).getData();
//返回订单id
return orderId;
}