当前位置: 首页 > news >正文

【Java】xxl-job

https://www.xuxueli.com/xxl-job/
https://www.bilibili.com/video/BV1nW421R7qJ
https://blog.csdn.net/qq_37128049/article/details/93889170

定时任务调度部分感觉是目前看来一半中,最难理解的部分,要反复看

定时任务调度框架Spring Task:Spring3.0 以后自带的 task,配置简单功能较多,如果系统使用单机的话可以优先考虑spring定时器。

xxl-job入门案例

下载源码https://github.com/xuxueli/xxl-job

将项目中 /xxl-job/doc/db/ 目录下的 tables_xxl_job.sql 的数据库表导入数据库

源码结构如下:

xxl-job-admin:调度中心
xxl-job-core:公共依赖
xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用,也可以参考其并将现有项目改造成执行器)
    :xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式;
    :xxl-job-executor-sample-frameless:无框架版本;

修改xxl-job-admin项目里面配置文件application.properties
在这里插入图片描述
启动调度中心xxl-job-admin

调度中心访问地址:http://localhost:8080/xxl-job-admin
默认登录账号 “admin/123456”
在这里插入图片描述

部署执行器项目

作用:负责接收“调度中心”的调度并执行;可直接部署执行器,也可以将执行器集成到现有业务项目中。

要做的具体业务(事情) xxl-job-executor-sample-springboot

maven依赖
确认pom文件中引入了 “xxl-job-core” 的maven依赖;
在这里插入图片描述
修改执行器配置文件,把执行器项目在调度中心进行注册
在这里插入图片描述
创建配置类,获取任务调用过程中需要使用相关参数
xxl-job/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/core/config/XxlJobConfig.java

@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.admin.accessToken}")
    private String accessToken;

    @Value("${xxl.job.admin.timeout}")
    private int timeout;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;


    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setTimeout(timeout);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }
}

启动执行器项目
在这里插入图片描述
开发执行器项目job方法
在这里插入图片描述

    /**
     * 1、简单任务示例(Bean模式)
     */
    @XxlJob("demoJobHandler")
    public void demoJobHandler() throws Exception {
        XxlJobHelper.log("XXL-JOB, Hello World.");

        for (int i = 0; i < 5; i++) {
            XxlJobHelper.log("beat at:" + i);
            System.out.println("happycoder" + i);
            TimeUnit.SECONDS.sleep(2);
        }
        // default success
    }

创建任务并启动任务

通过图形化界面方式进行操作(后面我们都是通过java代码来操作的,这里使用图形化界面演示),新增任务
在这里插入图片描述
启动任务(这里让任务只执行一次)
在这里插入图片描述
调度成功
在这里插入图片描述
在这里插入图片描述

继承xxl-job

引入依赖

<dependencies>
    <dependency>
        <groupId>com.xuxueli</groupId>
        <artifactId>xxl-job-core</artifactId>
    </dependency>
</dependencies>

执行器配置
这里我们在项目nacos配置中心里面配置进行修改

xxl:
  job:
    admin:
      # 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册""任务结果回调";为空则关闭自动注册
      addresses: http://localhost:8080/xxl-job-admin
      # 执行器通讯TOKEN [选填]:非空时启用
    accessToken: default_token

    executor:
      # 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
      appname: xxl-job-executor-sample
      # 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
      address:
      # 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册""调度中心请求并触发任务";
      ip:
      # 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
      port: 9999
      # 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
      logpath: /data/applogs/xxl-job/jobhandler
      # 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则,-1, 关闭自动清理功能;
      logretentiondays: 30
    client:
      jobGroupId: 1
      addUrl: ${xxl.job.admin.addresses}/jobinfo/addJob
      removeUrl: ${xxl.job.admin.addresses}/jobinfo/removeJob
      startJobUrl: ${xxl.job.admin.addresses}/jobinfo/startJob
      stopJobUrl: ${xxl.job.admin.addresses}/jobinfo/stopJob
      addAndStartUrl: ${xxl.job.admin.addresses}/jobinfo/addAndStartJob
seata:
  tx-service-group: daijia_tx_group
  enable-auto-data-source-proxy: false

创建XXL-JOB配置类


import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;


    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

}

编写任务job方法

@Component
public class DispatchJobHandler {
    
    @XxlJob("firstJobHandler")
    public void testJobHandler() {
        System.out.println("xxl-job项目集成测试");
    }
}

测试

第一步 启动相关服务

* 启动调度中心  (仍然使用官方提供的源码中的调度中心,运行一下即可  xxl-job-admin)
* 启动执行器项目服务  (我们现在继承到项目的模块,启动一下)

第二步 在调度中心创建任务

在这里插入图片描述
先点一下 手动录入,再点一下 自动注册,再保存。达到一个更新的效果
在这里插入图片描述
运行一次

失败了
在这里插入图片描述

感觉是执行器没有注册好的原因,给执行器换个端口试一下

成功了

封装XXL-JOB客户端

因为任务调度是灵活过程,需要封装相关方法,通过方法实现添加并启动任务

xxl-job-admin调度中心中创建 关于任务的方法
在这里插入图片描述

//自定义任务操作的方法
//添加任务
@RequestMapping("/addJob")
@ResponseBody
@PermissionLimit(limit = false)
public ReturnT<String> addJobInfo(@RequestBody XxlJobInfo jobInfo) {
   return xxlJobService.add(jobInfo);
}

//删除任务
@RequestMapping("/removeJob")
@ResponseBody
@PermissionLimit(limit = false)
public ReturnT<String> removeJob(@RequestBody XxlJobInfo jobInfo) {
   return xxlJobService.remove(jobInfo.getId());
}

//修改任务
@RequestMapping("/updateJob")
@ResponseBody
@PermissionLimit(limit = false)
public ReturnT<String> updateJob(@RequestBody XxlJobInfo jobInfo) {
   return xxlJobService.update(jobInfo);
}

//停止任务
@RequestMapping("/stopJob")
@ResponseBody
@PermissionLimit(limit = false)
public ReturnT<String> pauseJob(@RequestBody XxlJobInfo jobInfo) {
   return xxlJobService.stop(jobInfo.getId());
}

//启动任务
@RequestMapping("/startJob")
@ResponseBody
@PermissionLimit(limit = false)
public ReturnT<String> startJob(@RequestBody XxlJobInfo jobInfo) {
   return xxlJobService.start(jobInfo.getId());
}

//添加并启动任务
@RequestMapping("/addAndStartJob")
@ResponseBody
@PermissionLimit(limit = false)
public ReturnT<String> addAndStartJob(@RequestBody XxlJobInfo jobInfo) {
   ReturnT<String> result = xxlJobService.add(jobInfo);

   String content = result.getContent();
   int id = Integer.parseInt(content);
   xxlJobService.start(id);

   //立即执行一次
   JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, jobInfo.getExecutorParam(), "");
   return result;
}

service-dispatch执行器配置文件中 添加任务方法

xxl:
  job:
    admin:
      # 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册""任务结果回调";为空则关闭自动注册
      addresses: http://localhost:8080/xxl-job-admin
      # 执行器通讯TOKEN [选填]:非空时启用
    accessToken: default_token

    executor:
      # 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
      appname: xxl-job-executor-sample
      # 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
      address:
      # 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册""调度中心请求并触发任务";
      ip:
      # 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
      port: 9999
      # 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
      logpath: /data/applogs/xxl-job/jobhandler
      # 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则,-1, 关闭自动清理功能;
      logretentiondays: 30
    client:
      jobGroupId: 1
      addUrl: ${xxl.job.admin.addresses}/jobinfo/addJob
      removeUrl: ${xxl.job.admin.addresses}/jobinfo/removeJob
      startJobUrl: ${xxl.job.admin.addresses}/jobinfo/startJob
      stopJobUrl: ${xxl.job.admin.addresses}/jobinfo/stopJob
      addAndStartUrl: ${xxl.job.admin.addresses}/jobinfo/addAndStartJob
seata:
  tx-service-group: daijia_tx_group
  enable-auto-data-source-proxy: false

关键部分

 client:
      jobGroupId: 1
      addUrl: ${xxl.job.admin.addresses}/jobinfo/addJob
      removeUrl: ${xxl.job.admin.addresses}/jobinfo/removeJob
      startJobUrl: ${xxl.job.admin.addresses}/jobinfo/startJob
      stopJobUrl: ${xxl.job.admin.addresses}/jobinfo/stopJob
      addAndStartUrl: ${xxl.job.admin.addresses}/jobinfo/addAndStartJob

service-dispatch添加相关配置类。创建配置类,读取配置文件里面调用的调度中心任务操作的方法

@Data
@Component
@ConfigurationProperties(prefix = "xxl.job.client")
public class XxlJobClientConfig {

    private Integer jobGroupId;
    private String addUrl;
    private String removeUrl;
    private String startJobUrl;
    private String stopJobUrl;
    private String addAndStartUrl;
}

service-dispatch创建客户端类,编写调用调度中心里面的方法

import com.alibaba.fastjson.JSONObject;
import com.atguigu.daijia.common.execption.GuiguException;
import com.atguigu.daijia.common.result.ResultCodeEnum;
import com.atguigu.daijia.dispatch.xxl.config.XxlJobClientConfig;
import com.atguigu.daijia.model.entity.dispatch.XxlJobInfo;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

@Slf4j
@Component
public class XxlJobClient {

    @Autowired
    private XxlJobClientConfig xxlJobClientConfig;

    //客户端调用服务端里面的方法
    @Autowired
    private RestTemplate restTemplate;

    @SneakyThrows
    public Long addJob(String executorHandler, String param, String corn, String desc){
        XxlJobInfo xxlJobInfo = new XxlJobInfo();
        xxlJobInfo.setJobGroup(xxlJobClientConfig.getJobGroupId());
        xxlJobInfo.setJobDesc(desc);
        xxlJobInfo.setAuthor("qy");
        xxlJobInfo.setScheduleType("CRON");
        xxlJobInfo.setScheduleConf(corn);
        xxlJobInfo.setGlueType("BEAN");
        xxlJobInfo.setExecutorHandler(executorHandler);
        xxlJobInfo.setExecutorParam(param);
        xxlJobInfo.setExecutorRouteStrategy("FIRST");
        xxlJobInfo.setExecutorBlockStrategy("SERIAL_EXECUTION");
        xxlJobInfo.setMisfireStrategy("FIRE_ONCE_NOW");
        xxlJobInfo.setExecutorTimeout(0);
        xxlJobInfo.setExecutorFailRetryCount(0);

        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        HttpEntity<XxlJobInfo> request = new HttpEntity<>(xxlJobInfo, headers);

        String url = xxlJobClientConfig.getAddUrl();
        ResponseEntity<JSONObject> response =
                restTemplate.postForEntity(url, request, JSONObject.class);

        if(response.getStatusCode().value() == 200 && response.getBody().getIntValue("code") == 200) {
            log.info("增加xxl执行任务成功,返回信息:{}", response.getBody().toJSONString());
            //content为任务id
            return response.getBody().getLong("content");
        }
        log.info("调用xxl增加执行任务失败:{}", response.getBody().toJSONString());
        throw new GuiguException(ResultCodeEnum.DATA_ERROR);
    }

    public Boolean startJob(Long jobId) {
        XxlJobInfo xxlJobInfo = new XxlJobInfo();
        xxlJobInfo.setId(jobId.intValue());

        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        HttpEntity<XxlJobInfo> request = new HttpEntity<>(xxlJobInfo, headers);

        String url = xxlJobClientConfig.getStartJobUrl();
        ResponseEntity<JSONObject> response = restTemplate.postForEntity(url, request, JSONObject.class);
        if(response.getStatusCode().value() == 200 && response.getBody().getIntValue("code") == 200) {
            log.info("启动xxl执行任务成功:{},返回信息:{}", jobId, response.getBody().toJSONString());
            return true;
        }
        log.info("启动xxl执行任务失败:{},返回信息:{}", jobId, response.getBody().toJSONString());
        throw new GuiguException(ResultCodeEnum.DATA_ERROR);
    }

    public Boolean stopJob(Long jobId) {
        XxlJobInfo xxlJobInfo = new XxlJobInfo();
        xxlJobInfo.setId(jobId.intValue());

        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        HttpEntity<XxlJobInfo> request = new HttpEntity<>(xxlJobInfo, headers);

        String url = xxlJobClientConfig.getStopJobUrl();
        ResponseEntity<JSONObject> response = restTemplate.postForEntity(url, request, JSONObject.class);
        if(response.getStatusCode().value() == 200 && response.getBody().getIntValue("code") == 200) {
            log.info("停止xxl执行任务成功:{},返回信息:{}", jobId, response.getBody().toJSONString());
            return true;
        }
        log.info("停止xxl执行任务失败:{},返回信息:{}", jobId, response.getBody().toJSONString());
        throw new GuiguException(ResultCodeEnum.DATA_ERROR);
    }

    public Boolean removeJob(Long jobId) {
        XxlJobInfo xxlJobInfo = new XxlJobInfo();
        xxlJobInfo.setId(jobId.intValue());

        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        HttpEntity<XxlJobInfo> request = new HttpEntity<>(xxlJobInfo, headers);

        String url = xxlJobClientConfig.getRemoveUrl();
        ResponseEntity<JSONObject> response = restTemplate.postForEntity(url, request, JSONObject.class);
        if(response.getStatusCode().value() == 200 && response.getBody().getIntValue("code") == 200) {
            log.info("删除xxl执行任务成功:{},返回信息:{}", jobId, response.getBody().toJSONString());
            return true;
        }
        log.info("删除xxl执行任务失败:{},返回信息:{}", jobId, response.getBody().toJSONString());
        throw new GuiguException(ResultCodeEnum.DATA_ERROR);
    }

    //添加并启动任务
    public Long addAndStart(String executorHandler, String param, String corn, String desc) {
        XxlJobInfo xxlJobInfo = new XxlJobInfo();
        xxlJobInfo.setJobGroup(xxlJobClientConfig.getJobGroupId());
        xxlJobInfo.setJobDesc(desc);
        xxlJobInfo.setAuthor("qy");
        xxlJobInfo.setScheduleType("CRON");
        xxlJobInfo.setScheduleConf(corn);
        xxlJobInfo.setGlueType("BEAN");
        xxlJobInfo.setExecutorHandler(executorHandler);
        xxlJobInfo.setExecutorParam(param);
        xxlJobInfo.setExecutorRouteStrategy("FIRST");
        xxlJobInfo.setExecutorBlockStrategy("SERIAL_EXECUTION");
        xxlJobInfo.setMisfireStrategy("FIRE_ONCE_NOW");
        xxlJobInfo.setExecutorTimeout(0);
        xxlJobInfo.setExecutorFailRetryCount(0);

        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        HttpEntity<XxlJobInfo> request = new HttpEntity<>(xxlJobInfo, headers);

        //获取调度中心请求路径
        String url = xxlJobClientConfig.getAddAndStartUrl();

        //restTemplate
        ResponseEntity<JSONObject> response = restTemplate.postForEntity(url, request, JSONObject.class);
        if(response.getStatusCode().value() == 200 && response.getBody().getIntValue("code") == 200) {
            log.info("增加并开始执行xxl任务成功,返回信息:{}", response.getBody().toJSONString());
            //content为任务id
            return response.getBody().getLong("content");
        }
        log.info("增加并开始执行xxl任务失败:{}", response.getBody().toJSONString());
        throw new GuiguException(ResultCodeEnum.DATA_ERROR);
    }
}

启动类配置RestTemplate

@Bean
public RestTemplate restTemplate() {
    return new RestTemplate();
}

创建并启动任务接口

在service-dispatch操作

controller
@Tag(name = "司机新订单接口管理")
@RestController
@RequestMapping("/dispatch/newOrder")
@SuppressWarnings({"unchecked", "rawtypes"})
public class NewOrderController {

    @Autowired
    private NewOrderService newOrderService;

    //创建并启动任务调度方法
    @Operation(summary = "添加并开始新订单任务调度")
    @PostMapping("/addAndStartTask")
    public Result<Long> addAndStartTask(@RequestBody NewOrderTaskVo newOrderTaskVo) {
        Long id = newOrderService.addAndStartTask(newOrderTaskVo);
        return Result.ok(id);
    }

}
service
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class NewOrderServiceImpl implements NewOrderService {

    @Autowired
    private OrderJobMapper orderJobMapper;

    @Autowired
    private XxlJobClient xxlJobClient;

    //创建并启动任务调度方法
    @Override
    public Long addAndStartTask(NewOrderTaskVo newOrderTaskVo) {
        //1 判断当前订单是否启动任务调度
        //根据订单id查询
        LambdaQueryWrapper<OrderJob> wrapper = new LambdaQueryWrapper<>();
        wrapper.eq(OrderJob::getOrderId,newOrderTaskVo.getOrderId());
        OrderJob orderJob = orderJobMapper.selectOne(wrapper);

        //2 没有启动,进行操作
        if(orderJob == null) {
            //创建并启动任务调度
            //String executorHandler 执行任务job方法
            // String param
            // String corn 执行cron表达式
            // String desc 描述信息
            Long jobId = xxlJobClient.addAndStart("newOrderTaskHandler", "",
                    "0 0/1 * * * ?",
                    "新创建订单任务调度:" + newOrderTaskVo.getOrderId());

            //记录任务调度信息
            orderJob = new OrderJob();
            orderJob.setOrderId(newOrderTaskVo.getOrderId());
            orderJob.setJobId(jobId);
            orderJob.setParameter(JSONObject.toJSONString(newOrderTaskVo));
            orderJobMapper.insert(orderJob);
        }
        return orderJob.getJobId();
    }
}
远程调用定义
@FeignClient(value = "service-dispatch")
public interface NewOrderFeignClient {

    /**
     * 添加新订单任务
     * @param newOrderDispatchVo
     * @return
     */
    @PostMapping("/dispatch/newOrder/addAndStartTask")
    Result<Long> addAndStartTask(@RequestBody NewOrderTaskVo newOrderDispatchVo);
}
JobHandler
@Component
public class JobHandler {

    @Autowired
    private XxlJobLogMapper xxlJobLogMapper;
    
    @Autowired
    private NewOrderService newOrderService;
    
    @XxlJob("newOrderTaskHandler")
    public void newOrderTaskHandler() {

        //记录任务调度日志
        XxlJobLog xxlJobLog = new XxlJobLog();
        xxlJobLog.setJobId(XxlJobHelper.getJobId());
        long startTime = System.currentTimeMillis();

        try {
            //执行任务:搜索附近代驾司机
            newOrderService.executeTask(XxlJobHelper.getJobId());

            //成功状态
            xxlJobLog.setStatus(1);
        } catch (Exception e) {
            //失败状态
            xxlJobLog.setStatus(0);
            xxlJobLog.setError(e.getMessage());
            e.printStackTrace();
        } finally {
            long times = System.currentTimeMillis()- startTime;
            //TODO 完善long
            xxlJobLog.setTimes((int)times);
            xxlJobLogMapper.insert(xxlJobLog);
        }
    }
}
service
//执行任务:搜索附近代驾司机
@Override
public void executeTask(long jobId) {
    //1 根据jobid查询数据库,当前任务是否已经创建
    //如果没有创建,不往下执行了
    LambdaQueryWrapper<OrderJob> wrapper = new LambdaQueryWrapper<>();
    wrapper.eq(OrderJob::getJobId,jobId);
    OrderJob orderJob = orderJobMapper.selectOne(wrapper);
    if(orderJob == null) {
        //不往下执行了
        return;
    }

    //2 查询订单状态,如果当前订单接单状态,继续执行。如果当前订单不是接单状态,停止任务调度
    //获取OrderJob里面对象
    String jsonString = orderJob.getParameter();
    NewOrderTaskVo newOrderTaskVo = JSONObject.parseObject(jsonString, NewOrderTaskVo.class);

    //获取orderId
    Long orderId = newOrderTaskVo.getOrderId();
    Integer status = orderInfoFeignClient.getOrderStatus(orderId).getData();
    if(status.intValue() != OrderStatus.WAITING_ACCEPT.getStatus().intValue()) {
        //停止任务调度
        xxlJobClient.stopJob(jobId);
        return;
    }

    //3 远程调用:搜索附近满足条件可以接单司机
    //4 远程调用之后,获取满足可以接单司机集合
    SearchNearByDriverForm searchNearByDriverForm = new SearchNearByDriverForm();
    searchNearByDriverForm.setLongitude(newOrderTaskVo.getStartPointLongitude());
    searchNearByDriverForm.setLatitude(newOrderTaskVo.getStartPointLatitude());
    searchNearByDriverForm.setMileageDistance(newOrderTaskVo.getExpectDistance());
    //远程调用
    List<NearByDriverVo> nearByDriverVoList =
            locationFeignClient.searchNearByDriver(searchNearByDriverForm).getData();

    //5 遍历司机集合,得到每个司机,为每个司机创建临时队列,存储新订单信息
    nearByDriverVoList.forEach(driver -> {
        //使用Redis的set类型
        //根据订单id生成key
        String repeatKey =
                RedisConstant.DRIVER_ORDER_REPEAT_LIST+newOrderTaskVo.getOrderId();
        //记录司机id,防止重复推送
        Boolean isMember = redisTemplate.opsForSet().isMember(repeatKey, driver.getDriverId());
        if(!isMember) {
            //把订单信息推送给满足条件多个司机
            redisTemplate.opsForSet().add(repeatKey,driver.getDriverId());
            //过期时间:15分钟,超过15分钟没有接单自动取消
            redisTemplate.expire(repeatKey,
                    RedisConstant.DRIVER_ORDER_REPEAT_LIST_EXPIRES_TIME,
                    TimeUnit.MINUTES);

            NewOrderDataVo newOrderDataVo = new NewOrderDataVo();
            newOrderDataVo.setOrderId(newOrderTaskVo.getOrderId());
            newOrderDataVo.setStartLocation(newOrderTaskVo.getStartLocation());
            newOrderDataVo.setEndLocation(newOrderTaskVo.getEndLocation());
            newOrderDataVo.setExpectAmount(newOrderTaskVo.getExpectAmount());
            newOrderDataVo.setExpectDistance(newOrderTaskVo.getExpectDistance());
            newOrderDataVo.setExpectTime(newOrderTaskVo.getExpectTime());
            newOrderDataVo.setFavourFee(newOrderTaskVo.getFavourFee());
            newOrderDataVo.setDistance(driver.getDistance());
            newOrderDataVo.setCreateTime(newOrderTaskVo.getCreateTime());
            //新订单保存司机的临时队列,Redis里面List集合
            String key = RedisConstant.DRIVER_ORDER_TEMP_LIST+driver.getDriverId();
            redisTemplate.opsForList().leftPush(key,JSONObject.toJSONString(newOrderDataVo));
            //过期时间:1分钟
            redisTemplate.expire(key,RedisConstant.DRIVER_ORDER_TEMP_LIST_EXPIRES_TIME, TimeUnit.MINUTES);
        }
    });
}

乘客下单添加任务调度

在这里插入图片描述

// //乘客下单
@Override
public Long submitOrder(SubmitOrderForm submitOrderForm) {
    //1 重新计算驾驶线路
    CalculateDrivingLineForm calculateDrivingLineForm = new CalculateDrivingLineForm();
    BeanUtils.copyProperties(submitOrderForm,submitOrderForm);
    Result<DrivingLineVo> drivingLineVoResult = mapFeignClient.calculateDrivingLine(calculateDrivingLineForm);
    DrivingLineVo drivingLineVo = drivingLineVoResult.getData();

    //2 重新订单费用
    FeeRuleRequestForm calculateOrderFeeForm = new FeeRuleRequestForm();
    calculateOrderFeeForm.setDistance(drivingLineVo.getDistance());
    calculateOrderFeeForm.setStartTime(new Date());
    calculateOrderFeeForm.setWaitMinute(0);
    Result<FeeRuleResponseVo> feeRuleResponseVoResult = feeRuleFeignClient.calculateOrderFee(calculateOrderFeeForm);
    FeeRuleResponseVo feeRuleResponseVo = feeRuleResponseVoResult.getData();

    //封装数据
    OrderInfoForm orderInfoForm = new OrderInfoForm();
    BeanUtils.copyProperties(submitOrderForm,orderInfoForm);
    orderInfoForm.setExpectDistance(drivingLineVo.getDistance());
    orderInfoForm.setExpectAmount(feeRuleResponseVo.getTotalAmount());
    Result<Long> orderInfoResult = orderInfoFeignClient.saveOrderInfo(orderInfoForm);
    Long orderId = orderInfoResult.getData();

    //任务调度:查询附近可以接单司机
    NewOrderTaskVo newOrderDispatchVo = new NewOrderTaskVo();
    newOrderDispatchVo.setOrderId(orderId);
    newOrderDispatchVo.setStartLocation(orderInfoForm.getStartLocation());
    newOrderDispatchVo.setStartPointLongitude(orderInfoForm.getStartPointLongitude());
    newOrderDispatchVo.setStartPointLatitude(orderInfoForm.getStartPointLatitude());
    newOrderDispatchVo.setEndLocation(orderInfoForm.getEndLocation());
    newOrderDispatchVo.setEndPointLongitude(orderInfoForm.getEndPointLongitude());
    newOrderDispatchVo.setEndPointLatitude(orderInfoForm.getEndPointLatitude());
    newOrderDispatchVo.setExpectAmount(orderInfoForm.getExpectAmount());
    newOrderDispatchVo.setExpectDistance(orderInfoForm.getExpectDistance());
    newOrderDispatchVo.setExpectTime(drivingLineVo.getDuration());
    newOrderDispatchVo.setFavourFee(orderInfoForm.getFavourFee());
    newOrderDispatchVo.setCreateTime(new Date());
    //远程调用
    Long jobId = newOrderFeignClient.addAndStartTask(newOrderDispatchVo).getData();
    //返回订单id
    return orderId;
}

相关文章:

  • print(f“Random number below 100: {random_number}“)的其他写法
  • 【Linux】:网络协议
  • c++--变量内存分配
  • C语言进阶习题【3】5 枚举——找单身狗2
  • Pytest快速入门
  • 【MySQL】第五弹---数据类型全解析:从基础到高级应用
  • Linux 上安装 PostgreSQL
  • AI时代:架构师的困境与救赎
  • 计时器任务实现(保存视频和图像)
  • 牛客小白月赛110
  • GGUF格式的DeepSeek-R1-Distill-Qwen-1.5B模型的字段解析
  • 机器学习·最近邻方法(k-NN)
  • 第七天:数据提取-正则表达式
  • 已知自动驾驶的一个场景,如变道,如何做好预期功能安全
  • 空天技术赋能:毫米波基站+高速数字微波构筑应急通信新范式
  • 函数调用过程的详细解析
  • halcon激光三角测量(十七)calibrate_sheet_of_light_3d_calib_object
  • 容器、pod和缓存
  • 快速入门 Tailwind CSS:现代前端开发的利器
  • 【deepseek api 第三方平台使用参考】
  • 广东缉捕1名象牙走私潜逃非洲“红通”逃犯
  • 海外市场,押注中国无人驾驶龙头
  • 法律顾问被控配合他人诈骗酒店资产一审判8年,二审辩称无罪
  • 张巍任中共河南省委副书记
  • 马上评|文玩字画竞拍轻松赚差价?严防这类新型传销
  • 外企聊营商|武田制药:知识产权保护助创新药研发