车辆轨迹数据实时同步方案:从 “定时轮询” 到 “消息驱动” 的升级实践
引言
- 文章字数:约 12800 字;
- 大致阅读时间:35 分钟。
在车辆监控、物流追踪等场景中,轨迹数据的实时性直接决定了业务价值 —— 比如网约车的实时定位、货运车辆的路径监控,若依赖定时任务轮询第三方接口,轻则出现 “车辆位置滞后”,重则导致调度决策失误。本文将系统分析定时任务的痛点,论证消息队列(MQ)在该场景下的适用性,并提供从 “方案设计→技术选型→代码落地→运维保障” 的全流程可执行方案,帮你彻底解决轨迹数据实时同步问题。
一、先明确核心矛盾:定时任务为何不适合实时轨迹同步?
在讨论替代方案前,我们必须先理清 “定时任务轮询” 的本质缺陷 —— 它的设计逻辑与 “轨迹数据实时性” 需求存在根本冲突,具体可归纳为 3 个核心痛点:
1.1 延迟不可避免:“轮询间隔” 是天然的瓶颈
定时任务的核心逻辑是 “每隔 N 秒调用一次第三方接口”,无论 N 设置为 1 秒、5 秒还是 10 秒,都会存在 “数据产生→接口被调用” 的时间差。例如:
- 若 N=5 秒:车辆在第 2 秒产生新轨迹,需等到第 5 秒才会被同步,至少 3 秒延迟;
- 若 N=1 秒:看似延迟降低,但会导致接口调用频率暴增(1 台车辆 1 秒 1 次,1000 台车辆就是 1000 次 / 秒),极易触发第三方接口的限流阈值,反而导致同步失败。
这种 “延迟” 并非技术优化能解决,而是 “轮询模式” 的固有属性 —— 它是一种 “被动等待 + 批量拉取” 的模式,无法实时响应数据变化。
1.2 资源浪费严重:空轮询与重复拉取
第三方接口返回的轨迹数据,并非每次调用都有更新(比如车辆临时停车时,10 次轮询可能只有 1 次有新数据)。此时:
- 空轮询:9 次无数据的调用,浪费了本地服务器的线程资源、网络带宽,也占用了第三方接口的调用配额;
- 重复拉取:若第三方接口未做 “增量返回”(即每次都返回全量轨迹),还需本地处理 “去重” 逻辑 —— 既要对比历史数据,又要避免重复入库,增加了业务复杂度和数据库压力。
1.3 扩展性差:车辆规模增长后的 “雪崩风险”
当监控车辆从 100 台增长到 1000 台、10000 台时,定时任务的压力会呈线性上升:
- 单任务处理:若用 1 个定时任务处理所有车辆,会出现 “任务执行时间超过轮询间隔”(比如处理 1000 台车辆需 15 秒,但轮询间隔仅 10 秒),导致任务堆积;
- 多任务拆分:若按车辆分组拆分多个定时任务,又会面临 “任务管理混乱”“部分任务失败导致数据丢失” 等问题,且无法动态适应车辆数量变化。
综上,定时任务仅适合 “实时性要求低、数据量小、接口调用成本低” 的场景,完全无法满足车辆轨迹的实时同步需求。
二、消息队列(MQ)为何是更优解?核心价值拆解
消息队列(如 RabbitMQ、Kafka、RocketMQ)的 “异步通信 + 解耦 + 削峰” 特性,恰好能解决定时任务的所有痛点。我们先从 “轨迹同步场景” 的需求出发,拆解 MQ 的核心价值:
2.1 实时性:从 “被动轮询” 到 “主动推送”
MQ 的本质是 “事件驱动”—— 第三方系统只要在 “车辆产生新轨迹” 时,主动将数据发送到 MQ 的指定队列,我们的系统就能实时消费(延迟通常在毫秒级),彻底消除 “轮询间隔” 带来的延迟。
对比两种模式的流程差异,更易理解:
- 定时任务模式:本地系统→每隔 N 秒调用第三方接口→拉取轨迹数据→入库;
- MQ 模式:第三方系统→车辆产生新轨迹→发送数据到 MQ→本地系统消费 MQ 消息→入库。
前者是 “我问你要”,后者是 “你有了就给我”,实时性差距一目了然。
2.2 解耦与弹性:隔离第三方依赖,应对流量波动
- 解耦第三方系统:若第三方接口临时故障,定时任务会持续失败;而 MQ 模式下,第三方只需把数据发送到 MQ(MQ 通常有高可用保障),即使本地系统暂时下线,消息也会在 MQ 中暂存,恢复后可继续消费,避免数据丢失。
- 应对轨迹流量波动:车辆在高速行驶时,轨迹数据产生频率高(比如 1 秒 1 条);停车时频率低(比如 1 分钟 1 条)。MQ 能自动 “削峰填谷”—— 高峰时消息暂存队列,消费端按自身能力(如多线程)平稳处理,避免本地系统被瞬时高流量压垮。
2.3 可追溯与可靠性:避免数据丢失或重复
成熟的 MQ(如 RocketMQ、Kafka)都具备 “消息持久化”“消费确认(ACK)”“重试机制” 等特性,完美解决轨迹数据的 “可靠性” 问题:
- 消息持久化:即使 MQ 重启,未消费的轨迹消息也不会丢失;
- 消费确认(ACK):本地系统只有成功将轨迹数据入库后,才向 MQ 发送 “消费成功” 确认,若入库失败,MQ 会重新投递消息;
- 重复消费处理:通过 “消息唯一 ID + 数据库唯一索引”,可轻松解决 MQ 重试导致的重复消费问题(后文代码会详细实现)。
三、方案落地前提:明确第三方接口的 “MQ 适配方式”
要使用 MQ 方案,核心前提是 “第三方系统能将轨迹数据推送到 MQ”。但实际场景中,第三方可能无法直接对接我们的 MQ(比如对方技术栈限制、安全策略等),此时需分两种情况处理,确保方案可落地:
场景 | 对接方式 | 优势 | 注意事项 |
---|---|---|---|
第三方支持 MQ 推送 | 第三方直接将轨迹数据发送到我们提供的 MQ 队列(需约定消息格式、队列名称) | 实时性最强,无中间环节 | 需与第三方约定消息结构、ACK 机制、异常重试策略 |
第三方仅提供 HTTP 接口 | 我们搭建 “HTTP 转 MQ 适配器”:定时调用第三方接口拉取数据,再发送到 MQ 队列 | 兼容性强,无需第三方改造 | 适配器需做 “增量拉取”“限流控制”,避免空轮询 |
重点说明 “HTTP 转 MQ 适配器”:若第三方仅支持 HTTP 接口,我们并非回到 “定时任务” 的老路,而是将 “定时轮询” 限制在 “适配器层”,后续的轨迹处理(入库、业务逻辑)仍通过 MQ 异步化。这样做的好处是:
- 适配器仅负责 “拉取数据→推 MQ”,逻辑简单,即使轮询有延迟,后续消费可并行处理,整体延迟降低;
- 业务层(入库、轨迹分析)与 “第三方接口调用” 解耦,若第三方接口调整,只需修改适配器,无需改动业务代码;
- 适配器可按 “车辆 ID 分片” 或 “时间范围” 实现增量拉取,避免重复拉取(比如每次拉取 “上一次拉取时间至今” 的轨迹数据)。
下文将以 “第三方支持 MQ 推送” 为核心场景(实时性最优),同时提供 “HTTP 转 MQ 适配器” 的实现方案,覆盖绝大多数实际需求。
四、技术选型:MQ 及周边组件的版本与理由(2024 最新稳定版)
技术选型的核心原则是 “稳定优先、生态适配、性能匹配”,结合 SpringBoot(主流开发框架)和轨迹数据场景,具体选型如下:
组件 | 版本 | 选型理由 |
---|---|---|
SpringBoot | 3.2.5(2024 稳定版) | 兼容 JDK17,生态完善,对 MQ、MyBatis-Plus 等组件有成熟集成方案 |
消息队列(MQ) | RocketMQ 5.2.0 | 1. 支持事务消息、延迟消息,适合轨迹数据的可靠性需求;2. 单机吞吐量可达 10 万 +/ 秒,满足大规模车辆场景;3. 与 SpringBoot 集成友好(Spring Cloud Stream RocketMQ) |
数据库 | MySQL 8.0.36 | 主流关系型数据库,支持索引优化(车辆 ID + 时间戳),适合轨迹数据的查询需求 |
ORM 框架 | MyBatis-Plus 3.5.5 | 简化 CRUD 操作,支持批量插入(轨迹数据通常批量产生)、逻辑删除等功能 |
工具类 | commons-lang3 3.14.0、spring-core 6.1.5 | 提供字符串判空、集合判空等工具,符合阿里巴巴开发规范 |
接口文档 | Swagger3(springdoc-openapi 2.3.0) | 自动生成 API 文档,方便调试轨迹相关接口 |
日志框架 | SLF4J+Logback | 符合阿里巴巴规范,通过 @Slf4j 注解简化日志打印 |
五、全流程落地:从 MQ 配置到轨迹数据入库(含完整代码)
我们以 “第三方推送轨迹数据到 RocketMQ→本地系统消费消息→轨迹数据入库→提供查询接口” 为核心流程,分步骤实现,所有代码基于 JDK17、SpringBoot 3.2.5 编写,符合阿里巴巴规范。
5.1 第一步:环境准备(POM 依赖配置)
首先在pom.xml
中引入核心依赖,包括 SpringBoot、RocketMQ、MyBatis-Plus、Swagger3 等:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.5</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.vehicle.trace</groupId><artifactId>vehicle-trace-sync</artifactId><version>0.0.1-SNAPSHOT</version><name>vehicle-trace-sync</name><description>车辆轨迹数据实时同步系统(MQ方案)</description><properties><java.version>17</java.version><rocketmq.version>5.2.0</rocketmq.version><mybatis-plus.version>3.5.5</mybatis-plus.version><commons-lang3.version>3.14.0</commons-lang3.version><springdoc.version>2.3.0</springdoc.version></properties><dependencies><!-- SpringBoot核心依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId></dependency><!-- RocketMQ集成依赖(Spring Cloud Stream) --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>${rocketmq.version}</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>${rocketmq.version}</version></dependency><!-- MyBatis-Plus --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>${mybatis-plus.version}</version></dependency><!-- MySQL驱动 --><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><scope>runtime</scope></dependency><!-- 工具类 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>${commons-lang3.version}</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-core</artifactId><version>6.1.5</version></dependency><!-- Lombok(@Slf4j、@Data等) --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version><scope>provided</scope></dependency><!-- Swagger3(接口文档) --><dependency><groupId>org.springdoc</groupId><artifactId>springdoc-openapi-starter-webmvc-ui</artifactId><version>${springdoc.version}</version></dependency><!-- 测试依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build>
</project>
5.2 第二步:核心配置(application.yml)
配置 RocketMQ、MySQL、MyBatis-Plus 等核心参数,注意 RocketMQ 的 NameServer 地址需与实际部署环境一致:
# 服务端口
server:port: 8080# Spring配置
spring:# MySQL数据源datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/vehicle_trace_db?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8username: rootpassword: root123456# RocketMQ配置rocketmq:# NameServer地址(集群环境用逗号分隔)name-server: 127.0.0.1:9876# 消费者配置consumer:# 消费者组(必须唯一,同一组内的消费者负载均衡消费)group: vehicle_trace_consumer_group# 消息模式:CLUSTERING(集群模式)、BROADCASTING(广播模式),轨迹同步用集群模式message-model: CLUSTERING# 消费线程数(根据车辆规模调整,默认20)consume-thread-max: 50# 批量消费最大条数(轨迹数据可能批量推送,一次消费多条)consume-batch-max-size: 100# 生产者配置(若需要“HTTP转MQ适配器”,需配置生产者)producer:group: vehicle_trace_producer_group# 发送超时时间send-timeout: 3000# 重试次数retry-times-when-send-failed: 2# MyBatis-Plus配置
mybatis-plus:# mapper.xml文件路径mapper-locations: classpath:mapper/**/*.xml# 实体类包路径type-aliases-package: com.vehicle.trace.entityconfiguration:# 开启驼峰命名转换(数据库字段下划线→实体类驼峰)map-underscore-to-camel-case: true# 日志打印(开发环境开启,生产环境关闭)log-impl: org.apache.ibatis.logging.stdout.StdOutImpl# 全局配置global-config:db-config:# 逻辑删除字段(若需要)logic-delete-field: isDeletedlogic-delete-value: 1logic-not-delete-value: 0# Swagger3配置(springdoc-openapi)
springdoc:api-docs:path: /api-docsswagger-ui:path: /swagger-ui.htmloperationsSorter: methodpackages-to-scan: com.vehicle.trace.controller# 自定义配置:轨迹相关
vehicle:trace:# MQ队列名称(与第三方约定)mq-topic: vehicle_trace_topicmq-tag: trace_data# 增量拉取配置(仅HTTP适配器用)pull-interval: 1000 # 拉取间隔(毫秒)third-party-url: https://api.third-party.com/vehicle/trace # 第三方HTTP接口地址
5.3 第三步:数据库设计(轨迹表)
创建vehicle_trace
表,存储车辆轨迹数据,核心字段包括 “车辆 ID、轨迹时间戳、经纬度、速度” 等,并建立索引优化查询(车辆 ID + 轨迹时间戳,支持按车辆和时间范围查询):
-- 创建数据库(若不存在)
CREATE DATABASE IF NOT EXISTS vehicle_trace_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;-- 使用数据库
USE vehicle_trace_db;-- 创建车辆轨迹表
CREATE TABLE IF NOT EXISTS vehicle_trace (id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键ID',trace_id VARCHAR(64) NOT NULL COMMENT '轨迹唯一ID(第三方提供,用于去重)',vehicle_id VARCHAR(32) NOT NULL COMMENT '车辆ID(如车牌、设备编号)',longitude DECIMAL(10,6) NOT NULL COMMENT '经度(如116.404267)',latitude DECIMAL(10,6) NOT NULL COMMENT '纬度(如39.915341)',speed DECIMAL(5,2) DEFAULT 0.00 COMMENT '车辆速度(km/h)',direction INT DEFAULT 0 COMMENT '行驶方向(角度,0-360)',trace_time DATETIME NOT NULL COMMENT '轨迹产生时间(第三方数据中的时间戳)',create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '数据入库时间',update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '数据更新时间',is_deleted TINYINT NOT NULL DEFAULT 0 COMMENT '逻辑删除(0-未删除,1-已删除)',PRIMARY KEY (id),-- 唯一索引:防止重复插入(trace_id唯一)UNIQUE KEY uk_trace_id (trace_id),-- 联合索引:支持按车辆ID+轨迹时间范围查询(核心查询场景)KEY idx_vehicle_time (vehicle_id, trace_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='车辆轨迹数据表';
5.4 第四步:实体类与 Mapper(MyBatis-Plus)
5.4.1 轨迹实体类(VehicleTrace.java)
使用 Lombok 的@Data
简化 get/set 方法,字段与数据库表对应,并添加 Swagger3 注解说明:
package com.vehicle.trace.entity;import com.baomidou.mybatisplus.annotation.*;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDateTime;/*** 车辆轨迹实体类* 对应数据库表:vehicle_trace*/
@Data
@TableName("vehicle_trace")
@Schema(description = "车辆轨迹数据实体")
public class VehicleTrace {/*** 主键ID*/@TableId(type = IdType.AUTO)@Schema(description = "主键ID")private Long id;/*** 轨迹唯一ID(第三方提供,用于去重)*/@Schema(description = "轨迹唯一ID(第三方提供,用于去重)")private String traceId;/*** 车辆ID(如车牌、设备编号)*/@Schema(description = "车辆ID(如车牌、设备编号)")private String vehicleId;/*** 经度*/@Schema(description = "经度(如116.404267)")private BigDecimal longitude;/*** 纬度*/@Schema(description = "纬度(如39.915341)")private BigDecimal latitude;/*** 车辆速度(km/h)*/@Schema(description = "车辆速度(km/h)")private BigDecimal speed;/*** 行驶方向(角度,0-360)*/@Schema(description = "行驶方向(角度,0-360)")private Integer direction;/*** 轨迹产生时间(第三方数据中的时间戳)*/@Schema(description = "轨迹产生时间(第三方数据中的时间戳)")private LocalDateTime traceTime;/*** 数据入库时间(自动填充)*/@TableField(fill = FieldFill.INSERT)@Schema(description = "数据入库时间")private LocalDateTime createTime;/*** 数据更新时间(自动填充)*/@TableField(fill = FieldFill.INSERT_UPDATE)@Schema(description = "数据更新时间")private LocalDateTime updateTime;/*** 逻辑删除(0-未删除,1-已删除)*/@TableLogic@Schema(description = "逻辑删除(0-未删除,1-已删除)")private Integer isDeleted;
}
5.4.2 MyBatis-Plus 自动填充配置(MetaObjectHandler.java)
实现MetaObjectHandler
接口,自动填充createTime
和updateTime
字段,避免手动设置:
package com.vehicle.trace.config;import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.reflection.MetaObject;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;/*** MyBatis-Plus字段自动填充配置* 处理createTime、updateTime等字段的自动填充*/
@Component
@Slf4j
public class MyMetaObjectHandler implements MetaObjectHandler {/*** 插入操作时自动填充*/@Overridepublic void insertFill(MetaObject metaObject) {log.info("开始执行插入操作的字段自动填充");// 填充createTimestrictInsertFill(metaObject, "createTime", LocalDateTime.class, LocalDateTime.now());// 填充updateTime(插入时与createTime一致)strictInsertFill(metaObject, "updateTime", LocalDateTime.class, LocalDateTime.now());}/*** 更新操作时自动填充*/@Overridepublic void updateFill(MetaObject metaObject) {log.info("开始执行更新操作的字段自动填充");// 填充updateTimestrictUpdateFill(metaObject, "updateTime", LocalDateTime.class, LocalDateTime.now());}
}
5.4.3 Mapper 接口(VehicleTraceMapper.java)
继承 MyBatis-Plus 的BaseMapper
,无需手动编写 SQL(简单 CRUD):
package com.vehicle.trace.mapper;import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.vehicle.trace.entity.VehicleTrace;
import org.apache.ibatis.annotations.Mapper;/*** 车辆轨迹Mapper接口* 继承BaseMapper,提供基础CRUD操作*/
@Mapper
public interface VehicleTraceMapper extends BaseMapper<VehicleTrace> {
}
5.5 第五步:Service 层(轨迹数据业务处理)
Service 层负责 “轨迹数据校验→去重→入库” 的核心业务逻辑,同时处理异常重试:
5.5.1 Service 接口(VehicleTraceService.java)
package com.vehicle.trace.service;import com.baomidou.mybatisplus.extension.service.IService;
import com.vehicle.trace.entity.VehicleTrace;
import com.vehicle.trace.vo.request.VehicleTraceBatchReq;
import com.vehicle.trace.vo.request.VehicleTraceQueryReq;
import com.vehicle.trace.vo.response.PageResult;
import java.util.List;/*** 车辆轨迹服务接口*/
public interface VehicleTraceService extends IService<VehicleTrace> {/*** 单条轨迹数据入库* @param trace 轨迹数据实体* @return 入库结果(true-成功,false-失败)*/boolean saveTrace(VehicleTrace trace);/*** 批量轨迹数据入库* @param traceBatchReq 批量轨迹请求参数* @return 入库成功的条数*/int batchSaveTrace(VehicleTraceBatchReq traceBatchReq);/*** 按条件分页查询轨迹数据* @param queryReq 查询条件(车辆ID、时间范围、页码、页大小)* @return 分页查询结果(轨迹列表+总条数)*/PageResult<VehicleTrace> queryTraceByPage(VehicleTraceQueryReq queryReq);
}
5.5.2 Service 实现类(VehicleTraceServiceImpl.java)
核心逻辑:
- 数据校验:判空、字段合法性(如经纬度范围);
- 去重处理:依赖数据库
uk_trace_id
唯一索引,捕获DuplicateKeyException
避免重复入库; - 批量入库:使用 MyBatis-Plus 的
saveBatch
提升效率(适合批量轨迹数据);
package com.vehicle.trace.service.impl;import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.vehicle.trace.entity.VehicleTrace;
import com.vehicle.trace.mapper.VehicleTraceMapper;
import com.vehicle.trace.service.VehicleTraceService;
import com.vehicle.trace.vo.request.VehicleTraceBatchReq;
import com.vehicle.trace.vo.request.VehicleTraceQueryReq;
import com.vehicle.trace.vo.response.PageResult;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;/*** 车辆轨迹服务实现类*/
@Service
@Slf4j
public class VehicleTraceServiceImpl extends ServiceImpl<VehicleTraceMapper, VehicleTrace> implements VehicleTraceService {/*** 经度范围校验(东经-180~180,北纬-90~90)*/private static final BigDecimal MIN_LONGITUDE = new BigDecimal("-180.000000");private static final BigDecimal MAX_LONGITUDE = new BigDecimal("180.000000");private static final BigDecimal MIN_LATITUDE = new BigDecimal("-90.000000");private static final BigDecimal MAX_LATITUDE = new BigDecimal("90.000000");/*** 单条轨迹数据入库* @param trace 轨迹数据实体* @return 入库结果(true-成功,false-失败)*/@Overridepublic boolean saveTrace(VehicleTrace trace) {// 1. 数据校验if (!validateTraceData(trace)) {log.error("轨迹数据校验失败,数据:{}", trace);return false;}try {// 2. 入库(依赖uk_trace_id唯一索引去重)boolean saveResult = save(trace);if (saveResult) {log.info("单条轨迹入库成功,traceId:{},vehicleId:{}", trace.getTraceId(), trace.getVehicleId());return true;} else {log.error("单条轨迹入库失败,traceId:{},vehicleId:{}", trace.getTraceId(), trace.getVehicleId());return false;}} catch (DuplicateKeyException e) {// 3. 捕获唯一索引冲突异常(重复数据)log.warn("轨迹数据已存在,无需重复入库,traceId:{}", trace.getTraceId(), e);return true; // 重复数据视为“处理成功”,避免MQ重复重试} catch (Exception e) {log.error("单条轨迹入库异常,traceId:{},vehicleId:{}", trace.getTraceId(), trace.getVehicleId(), e);return false;}}/*** 批量轨迹数据入库* @param traceBatchReq 批量轨迹请求参数* @return 入库成功的条数*/@Overridepublic int batchSaveTrace(VehicleTraceBatchReq traceBatchReq) {// 1. 请求参数校验if (Objects.isNull(traceBatchReq) || CollectionUtils.isEmpty(traceBatchReq.getTraceList())) {log.error("批量轨迹请求参数为空");return 0;}List<VehicleTrace> traceList = traceBatchReq.getTraceList();log.info("开始处理批量轨迹入库,总条数:{},vehicleId:{}", traceList.size(), traceBatchReq.getVehicleId());// 2. 过滤无效数据(校验不通过的轨迹)List<VehicleTrace> validTraceList = traceList.stream().filter(this::validateTraceData).toList();if (CollectionUtils.isEmpty(validTraceList)) {log.warn("批量轨迹中无有效数据,vehicleId:{}", traceBatchReq.getVehicleId());return 0;}try {// 3. 批量入库(MyBatis-Plus默认批次提交,可通过配置调整batchSize)boolean saveResult = saveBatch(validTraceList);if (saveResult) {log.info("批量轨迹入库成功,成功条数:{},vehicleId:{}", validTraceList.size(), traceBatchReq.getVehicleId());return validTraceList.size();} else {log.error("批量轨迹入库失败,总有效条数:{},vehicleId:{}", validTraceList.size(), traceBatchReq.getVehicleId());return 0;}} catch (DuplicateKeyException e) {// 4. 若批量中存在重复数据,尝试单条处理(避免全量失败)log.warn("批量轨迹中存在重复数据,开始单条处理,vehicleId:{}", traceBatchReq.getVehicleId(), e);int successCount = 0;for (VehicleTrace trace : validTraceList) {if (saveTrace(trace)) {successCount++;}}log.info("批量轨迹单条处理完成,成功条数:{},总有效条数:{},vehicleId:{}",successCount, validTraceList.size(), traceBatchReq.getVehicleId());return successCount;} catch (Exception e) {log.error("批量轨迹入库异常,总有效条数:{},vehicleId:{}", validTraceList.size(), traceBatchReq.getVehicleId(), e);return 0;}}/*** 按条件分页查询轨迹数据* @param queryReq 查询条件(车辆ID、时间范围、页码、页大小)* @return 分页查询结果(轨迹列表+总条数)*/@Overridepublic PageResult<VehicleTrace> queryTraceByPage(VehicleTraceQueryReq queryReq) {// 1. 参数校验Objects.requireNonNull(queryReq, "轨迹查询参数不能为空");String vehicleId = queryReq.getVehicleId();StringUtils.hasText(vehicleId, "车辆ID不能为空");// 2. 构建分页条件(页码从1开始,页大小默认10)int pageNum = Objects.isNull(queryReq.getPageNum()) ? 1 : queryReq.getPageNum();int pageSize = Objects.isNull(queryReq.getPageSize()) ? 10 : queryReq.getPageSize();Page<VehicleTrace> page = new Page<>(pageNum, pageSize);// 3. 构建查询条件(车辆ID+时间范围)LambdaQueryWrapper<VehicleTrace> queryWrapper = new LambdaQueryWrapper<>();queryWrapper.eq(VehicleTrace::getVehicleId, vehicleId).ge(Objects.nonNull(queryReq.getStartTime()), VehicleTrace::getTraceTime, queryReq.getStartTime()).le(Objects.nonNull(queryReq.getEndTime()), VehicleTrace::getTraceTime, queryReq.getEndTime()).orderByDesc(VehicleTrace::getTraceTime); // 按轨迹时间倒序(最新的在前)// 4. 分页查询IPage<VehicleTrace> tracePage = page(page, queryWrapper);// 5. 封装结果PageResult<VehicleTrace> pageResult = new PageResult<>();pageResult.setList(tracePage.getRecords());pageResult.setTotal(tracePage.getTotal());pageResult.setPageNum(pageNum);pageResult.setPageSize(pageSize);log.info("轨迹分页查询完成,vehicleId:{},页码:{},页大小:{},总条数:{}",vehicleId, pageNum, pageSize, tracePage.getTotal());return pageResult;}/*** 轨迹数据合法性校验* @param trace 轨迹数据实体* @return 校验结果(true-合法,false-非法)*/private boolean validateTraceData(VehicleTrace trace) {// 1. 非空校验if (Objects.isNull(trace)) {log.error("轨迹数据为空");return false;}if (StringUtils.isBlank(trace.getTraceId())) {log.error("轨迹唯一ID(traceId)为空");return false;}if (StringUtils.isBlank(trace.getVehicleId())) {log.error("车辆ID(vehicleId)为空,traceId:{}", trace.getTraceId());return false;}if (Objects.isNull(trace.getLongitude()) || Objects.isNull(trace.getLatitude())) {log.error("经纬度为空,traceId:{},vehicleId:{}", trace.getTraceId(), trace.getVehicleId());return false;}if (Objects.isNull(trace.getTraceTime())) {log.error("轨迹时间(traceTime)为空,traceId:{},vehicleId:{}", trace.getTraceId(), trace.getVehicleId());return false;}// 2. 经纬度范围校验(防止无效数据)BigDecimal longitude = trace.getLongitude();BigDecimal latitude = trace.getLatitude();if (longitude.compareTo(MIN_LONGITUDE) < 0 || longitude.compareTo(MAX_LONGITUDE) > 0) {log.error("经度超出范围(-180~180),traceId:{},longitude:{}", trace.getTraceId(), longitude);return false;}if (latitude.compareTo(MIN_LATITUDE) < 0 || latitude.compareTo(MAX_LATITUDE) > 0) {log.error("纬度超出范围(-90~90),traceId:{},latitude:{}", trace.getTraceId(), latitude);return false;}// 3. 轨迹时间校验(不能超过当前时间,避免未来数据)if (trace.getTraceTime().isAfter(LocalDateTime.now())) {log.error("轨迹时间为未来时间,traceId:{},traceTime:{}", trace.getTraceId(), trace.getTraceTime());return false;}return true;}
}
5.6 第六步:MQ 消费端(实时处理轨迹消息)
使用 RocketMQ 的@RocketMQMessageListener
注解,实现消息消费逻辑,核心是 “解析 MQ 消息→调用 Service 入库”:
package com.vehicle.trace.mq.consumer;import com.alibaba.fastjson.JSON;
import com.vehicle.trace.entity.VehicleTrace;
import com.vehicle.trace.service.VehicleTraceService;
import com.vehicle.trace.vo.request.VehicleTraceBatchReq;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Objects;/*** 车辆轨迹MQ消息消费者* 监听第三方推送的轨迹消息,实时入库*/
@Component
@Slf4j
@RocketMQMessageListener(topic = "${vehicle.trace.mq-topic}", // MQ主题(从配置文件读取)selectorExpression = "${vehicle.trace.mq-tag}", // 消息标签(过滤消息)consumerGroup = "${spring.rocketmq.consumer.group}" // 消费者组
)
public class VehicleTraceConsumer implements RocketMQListener<String> {@Autowiredprivate VehicleTraceService vehicleTraceService;/*** 消费MQ消息(处理轨迹数据)* @param message MQ消息内容(JSON格式,可能是单条或批量轨迹)*/@Overridepublic void onMessage(String message) {// 1. 消息非空校验if (StringUtils.isBlank(message)) {log.error("MQ消费的轨迹消息为空");return;}log.info("开始消费轨迹MQ消息,消息内容:{}", message);try {// 2. 判断消息类型:批量轨迹(VehicleTraceBatchReq)或单条轨迹(VehicleTrace)// (注:需与第三方约定消息格式,此处假设两种格式都支持)if (message.contains("\"traceList\"")) {// 2.1 批量轨迹消息:解析为VehicleTraceBatchReqVehicleTraceBatchReq batchReq = JSON.parseObject(message, VehicleTraceBatchReq.class);int successCount = vehicleTraceService.batchSaveTrace(batchReq);log.info("批量轨迹消息消费完成,成功入库条数:{}", successCount);} else {// 2.2 单条轨迹消息:解析为VehicleTraceVehicleTrace trace = JSON.parseObject(message, VehicleTrace.class);boolean saveResult = vehicleTraceService.saveTrace(trace);log.info("单条轨迹消息消费完成,入库结果:{},traceId:{}", saveResult, trace.getTraceId());}} catch (Exception e) {// 3. 消费异常:打印日志,MQ会自动重试(重试次数由配置决定)log.error("轨迹MQ消息消费异常,消息内容:{}", message, e);// 若需要“死信队列”处理(重试多次失败后),可在此处手动抛异常,触发RocketMQ死信机制throw new RuntimeException("轨迹消息消费失败,触发MQ重试", e);}}
}
5.7 第七步:HTTP 转 MQ 适配器(应对第三方仅支持 HTTP 接口的场景)
若第三方无法直接推送 MQ 消息,需搭建 “HTTP 转 MQ 适配器”—— 通过定时任务调用第三方 HTTP 接口拉取数据,再发送到 MQ 队列。核心是 “增量拉取 + 限流控制”:
5.7.1 适配器配置(定时任务 + MQ 生产者)
package com.vehicle.trace.mq.adapter;import com.alibaba.fastjson.JSON;
import com.vehicle.trace.entity.VehicleTrace;
import com.vehicle.trace.vo.request.ThirdPartyTraceReq;
import com.vehicle.trace.vo.response.ThirdPartyTraceResp;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.web.client.RestTemplate;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;/*** HTTP转MQ适配器* 定时调用第三方HTTP接口拉取轨迹数据,发送到MQ队列*/
@Component
@Slf4j
public class HttpToMqAdapter {@Autowiredprivate RestTemplate restTemplate;@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 第三方HTTP接口地址*/@Value("${vehicle.trace.third-party-url}")private String thirdPartyUrl;/*** MQ主题+标签(格式:topic:tag)*/@Value("${vehicle.trace.mq-topic}:${vehicle.trace.mq-tag}")private String mqTopicTag;/*** 上次拉取时间(原子引用,保证线程安全)* 初始值:当前时间前1分钟(避免首次拉取过多历史数据)*/private final AtomicReference<LocalDateTime> lastPullTime = new AtomicReference<>(LocalDateTime.now().minusMinutes(1));/*** 定时拉取第三方轨迹数据(间隔从配置文件读取,单位:毫秒)* 注:@Scheduled支持fixedRate(固定间隔)、cron表达式(灵活时间),此处用fixedRate*/@Scheduled(fixedRateString = "${vehicle.trace.pull-interval}")public void pullTraceFromThirdParty() {log.info("开始执行HTTP拉取轨迹数据任务,上次拉取时间:{}", lastPullTime.get());// 1. 构建拉取参数(增量拉取:上次拉取时间至今)ThirdPartyTraceReq pullReq = new ThirdPartyTraceReq();pullReq.setStartTime(lastPullTime.get());pullReq.setEndTime(LocalDateTime.now());// (可选)按车辆分组拉取,避免单次拉取数据量过大// pullReq.setVehicleId("粤A12345");// 2. 调用第三方HTTP接口ThirdPartyTraceResp pullResp = callThirdPartyApi(pullReq);if (Objects.isNull(pullResp) || CollectionUtils.isEmpty(pullResp.getTraceList())) {log.warn("第三方HTTP接口无轨迹数据返回,拉取时间范围:{}~{}", pullReq.getStartTime(), pullReq.getEndTime());// 更新上次拉取时间(避免下次重复拉取相同时间范围)lastPullTime.set(LocalDateTime.now());return;}// 3. 提取轨迹数据,发送到MQList<VehicleTrace> traceList = pullResp.getTraceList();log.info("第三方HTTP接口拉取到轨迹数据,条数:{},开始发送到MQ", traceList.size());try {// 3.1 批量发送到MQ(也可单条发送,根据数据量调整)String mqMessage = JSON.toJSONString(traceList);rocketMQTemplate.convertAndSend(mqTopicTag, mqMessage);log.info("轨迹数据从HTTP转MQ完成,发送条数:{}", traceList.size());// 3.2 更新上次拉取时间(仅在发送成功后更新,避免数据丢失)lastPullTime.set(LocalDateTime.now());} catch (Exception e) {log.error("轨迹数据从HTTP转MQ失败,拉取条数:{}", traceList.size(), e);// 发送失败不更新上次拉取时间,下次重试拉取}}/*** 调用第三方HTTP接口* @param pullReq 拉取参数(时间范围、车辆ID等)* @return 第三方返回的轨迹数据*/private ThirdPartyTraceResp callThirdPartyApi(ThirdPartyTraceReq pullReq) {try {// 1. 构建HTTP请求头(根据第三方要求调整,如添加认证token)HttpHeaders headers = new HttpHeaders();headers.setContentType(MediaType.APPLICATION_JSON);// (示例)添加认证tokenheaders.set("Authorization", "Bearer your-third-party-token");// 2. 构建HTTP请求实体HttpEntity<ThirdPartyTraceReq> requestEntity = new HttpEntity<>(pullReq, headers);// 3. 发送POST请求(根据第三方接口类型调整为GET/POST)return restTemplate.exchange(thirdPartyUrl,HttpMethod.POST,requestEntity,ThirdPartyTraceResp.class).getBody();} catch (Exception e) {log.error("调用第三方HTTP接口异常,拉取参数:{}", JSON.toJSONString(pullReq), e);return null;}}
}
5.7.2 RestTemplate 配置(用于 HTTP 请求)
package com.vehicle.trace.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;/*** RestTemplate配置类* 用于调用第三方HTTP接口*/
@Configuration
public class RestTemplateConfig {@Beanpublic RestTemplate restTemplate() {// 可在此处配置超时时间、拦截器(如日志拦截、重试拦截)等RestTemplate restTemplate = new RestTemplate();// (示例)配置超时时间(需配合ClientHttpRequestFactory)// SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();// factory.setConnectTimeout(3000); // 连接超时3秒// factory.setReadTimeout(5000); // 读取超时5秒// restTemplate.setRequestFactory(factory);return restTemplate;}
}
5.8 第八步:Controller 层(提供轨迹查询接口)
通过 Swagger3 暴露轨迹查询接口,供前端或其他系统调用:
package com.vehicle.trace.controller;import com.vehicle.trace.entity.VehicleTrace;
import com.vehicle.trace.service.VehicleTraceService;
import com.vehicle.trace.vo.request.VehicleTraceQueryReq;
import com.vehicle.trace.vo.response.PageResult;
import com.vehicle.trace.vo.response.Result;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Objects;/*** 车辆轨迹Controller* 提供轨迹查询接口(Swagger3文档)*/
@RestController
@RequestMapping("/api/vehicle/trace")
@Slf4j
@Tag(name = "车辆轨迹管理", description = "车辆轨迹数据查询接口")
public class VehicleTraceController {@Autowiredprivate VehicleTraceService vehicleTraceService;/*** 分页查询车辆轨迹数据* @param queryReq 查询条件(车辆ID、时间范围、页码、页大小)* @return 分页查询结果(成功/失败+数据)*/@PostMapping("/queryPage")@Operation(summary = "分页查询轨迹",description = "按车辆ID和时间范围分页查询轨迹数据,支持页码和页大小调整",responses = {@ApiResponse(responseCode = "200", description = "查询成功",content = @Content(schema = @Schema(implementation = PageResult.class))),@ApiResponse(responseCode = "400", description = "参数错误",content = @Content(schema = @Schema(implementation = Result.class)))})public Result<PageResult<VehicleTrace>> queryTraceByPage(@Parameter(description = "轨迹查询条件", required = true)@RequestBody VehicleTraceQueryReq queryReq) {try {// 参数校验(非空)Objects.requireNonNull(queryReq, "查询参数不能为空");PageResult<VehicleTrace> pageResult = vehicleTraceService.queryTraceByPage(queryReq);return Result.success(pageResult, "轨迹查询成功");} catch (IllegalArgumentException e) {log.error("轨迹查询参数错误:{}", e.getMessage());return Result.fail(400, e.getMessage());} catch (Exception e) {log.error("轨迹查询异常", e);return Result.fail(500, "轨迹查询失败,请联系管理员");}}
}
5.9 第九步:通用 VO 与工具类(确保代码完整性)
5.9.1 分页查询请求 VO(VehicleTraceQueryReq.java)
package com.vehicle.trace.vo.request;import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
import java.util.Objects;/*** 车辆轨迹分页查询请求VO*/
@Data
@Schema(description = "车辆轨迹分页查询请求参数")
public class VehicleTraceQueryReq {/*** 车辆ID(必填)*/@Schema(description = "车辆ID(如车牌、设备编号)", required = true)private String vehicleId;/*** 轨迹开始时间(可选)*/@Schema(description = "轨迹开始时间(如2024-05-01T00:00:00)")private LocalDateTime startTime;/*** 轨迹结束时间(可选)*/@Schema(description = "轨迹结束时间(如2024-05-01T23:59:59)")private LocalDateTime endTime;/*** 页码(默认1)*/@Schema(description = "页码(默认1)")private Integer pageNum;/*** 页大小(默认10)*/@Schema(description = "页大小(默认10)")private Integer pageSize;/*** 重写pageNum的getter,默认值1*/public Integer getPageNum() {return Objects.isNull(pageNum) ? 1 : pageNum;}/*** 重写pageSize的getter,默认值10*/public Integer getPageSize() {return Objects.isNull(pageSize) ? 10 : pageSize;}
}
5.9.2 批量轨迹请求 VO(VehicleTraceBatchReq.java)
package com.vehicle.trace.vo.request;import com.vehicle.trace.entity.VehicleTrace;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Objects;/*** 批量轨迹入库请求VO*/
@Data
@Schema(description = "批量轨迹入库请求参数")
public class VehicleTraceBatchReq {/*** 车辆ID(批量数据所属车辆,可选)*/@Schema(description = "车辆ID(批量数据所属车辆,可选)")private String vehicleId;/*** 轨迹列表(必填)*/@Schema(description = "轨迹数据列表", required = true)private List<VehicleTrace> traceList;/*** 校验请求参数合法性* @return 校验结果(true-合法,false-非法)*/public boolean validate() {if (CollectionUtils.isEmpty(traceList)) {return false;}// 若指定了vehicleId,需确保所有轨迹的vehicleId一致if (Objects.nonNull(vehicleId) && !traceList.stream().allMatch(trace -> vehicleId.equals(trace.getVehicleId()))) {return false;}return true;}
}
5.9.3 第三方接口请求 / 响应 VO(ThirdPartyTraceReq.java、ThirdPartyTraceResp.java)
package com.vehicle.trace.vo.request;import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;/*** 调用第三方轨迹接口的请求VO*/
@Data
@Schema(description = "第三方轨迹拉取请求参数")
public class ThirdPartyTraceReq {/*** 拉取开始时间(必填)*/@Schema(description = "拉取开始时间", required = true)private LocalDateTime startTime;/*** 拉取结束时间(必填)*/@Schema(description = "拉取结束时间", required = true)private LocalDateTime endTime;/*** 车辆ID(可选,为空则拉取所有车辆)*/@Schema(description = "车辆ID(可选,为空则拉取所有车辆)")private String vehicleId;
}
package com.vehicle.trace.vo.response;import com.vehicle.trace.entity.VehicleTrace;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.util.List;/*** 第三方轨迹接口的响应VO*/
@Data
@Schema(description = "第三方轨迹拉取响应结果")
public class ThirdPartyTraceResp {/*** 响应码(0-成功,其他-失败)*/@Schema(description = "响应码(0-成功,其他-失败)")private Integer code;/*** 响应信息*/@Schema(description = "响应信息")private String message;/*** 轨迹数据列表*/@Schema(description = "轨迹数据列表")private List<VehicleTrace> traceList;/*** 判断响应是否成功* @return 成功标识(true-成功,false-失败)*/public boolean isSuccess() {return code != null && code == 0;}
}
5.9.4 通用响应结果 VO(Result.java、PageResult.java)
package com.vehicle.trace.vo.response;import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;/*** 通用API响应结果VO* @param <T> 响应数据类型*/
@Data
@Schema(description = "通用API响应结果")
public class Result<T> {/*** 响应码(200-成功,其他-失败)*/@Schema(description = "响应码(200-成功,其他-失败)")private Integer code;/*** 响应信息*/@Schema(description = "响应信息")private String message;/*** 响应数据*/@Schema(description = "响应数据")private T data;/*** 成功响应(无数据)* @return Result*/public static Result<Void> success() {Result<Void> result = new Result<>();result.setCode(200);result.setMessage("操作成功");return result;}/*** 成功响应(有数据)* @param data 响应数据* @param <T> 数据类型* @return Result<T>*/public static <T> Result<T> success(T data) {Result<T> result = new Result<>();result.setCode(200);result.setMessage("操作成功");result.setData(data);return result;}/*** 成功响应(有数据+自定义消息)* @param data 响应数据* @param message 自定义消息* @param <T> 数据类型* @return Result<T>*/public static <T> Result<T> success(T data, String message) {Result<T> result = new Result<>();result.setCode(200);result.setMessage(message);result.setData(data);return result;}/*** 失败响应(自定义码+消息)* @param code 响应码* @param message 失败消息* @return Result<Void>*/public static Result<Void> fail(Integer code, String message) {Result<Void> result = new Result<>();result.setCode(code);result.setMessage(message);return result;}/*** 失败响应(默认码400+消息)* @param message 失败消息* @return Result<Void>*/public static Result<Void> fail(String message) {Result<Void> result = new Result<>();result.setCode(400);result.setMessage(message);return result;}
}
package com.vehicle.trace.vo.response;import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.util.List;/*** 分页查询结果VO* @param <T> 数据类型*/
@Data
@Schema(description = "分页查询结果")
public class PageResult<T> {/*** 数据列表*/@Schema(description = "数据列表")private List<T> list;/*** 总条数*/@Schema(description = "总条数")private Long total;/*** 当前页码*/@Schema(description = "当前页码")private Integer pageNum;/*** 页大小*/@Schema(description = "页大小")private Integer pageSize;/*** 总页数* @return 总页数*/@Schema(description = "总页数")public Integer getTotalPage() {if (total == null || total == 0) {return 0;}return (total.intValue() + pageSize - 1) / pageSize;}
}
六、关键问题解决方案:重复消费、消息丢失、延迟优化
即使方案落地,仍需解决 MQ 场景下的 3 个核心问题:重复消费、消息丢失、延迟优化,确保系统稳定运行。
6.1 重复消费:如何避免 “一条轨迹多次入库”?
问题原因:MQ 重试机制(消费失败后重新投递)、网络波动导致的重复推送,都会导致 “同一条轨迹消息被消费多次”。
解决方案:基于 “唯一 ID + 幂等性处理”,具体
- 唯一 ID:第三方为每条轨迹生成唯一的
traceId
(如 UUID),作为去重标识; - 数据库唯一索引:在
vehicle_trace
表上创建uk_trace_id
唯一索引,确保重复的traceId
无法插入; - 代码层捕获异常:在 Service 层捕获
DuplicateKeyException
,视为 “处理成功”,避免 MQ 继续重试。
代码验证:参考VehicleTraceServiceImpl.saveTrace()
方法中的DuplicateKeyException
捕获逻辑。
6.2 消息丢失:如何确保 “轨迹数据不丢失”?
问题场景:
- MQ 服务宕机,未消费的消息丢失;
- 消费端处理失败,消息未重试;
- 生产者发送消息失败,未重试。
解决方案:从 “生产者→MQ→消费者” 全链路保障:
- 生产者端:
- 开启 RocketMQ 生产者重试(
spring.rocketmq.producer.retry-times-when-send-failed=2
); - 关键消息使用 “同步发送 + 确认机制”,确保消息成功发送到 MQ。
- 开启 RocketMQ 生产者重试(
- MQ 端:
- 开启消息持久化(RocketMQ 默认开启,消息存储在磁盘);
- 配置 RocketMQ 集群(主从架构),避免单点故障;
- 启用 “死信队列”:重试多次(如 16 次)失败的消息,进入死信队列,后续人工处理。
- 消费者端:
- 消费确认(ACK):RocketMQ 默认 “消费成功后自动 ACK”,若消费异常,不 ACK,MQ 会重试;
- 业务逻辑幂等:即使消息重复,也不会导致数据错误(依赖
traceId
去重)。
配置验证:参考application.yml
中 RocketMQ 的retry-times-when-send-failed
、message-model
等配置。
6.3 延迟优化:如何进一步降低 “轨迹同步延迟”?
优化方向:
- 消费端多线程:增加 RocketMQ 消费者线程数(
spring.rocketmq.consumer.consume-thread-max=50
),提高消费速度; - 批量处理:
- 第三方尽量批量推送轨迹数据(减少 MQ 消息数量);
- 消费端使用
batchSaveTrace
批量入库,减少数据库 IO 次数;
- 索引优化:在
vehicle_trace
表上建立idx_vehicle_time
联合索引,加快查询速度; - 避免消费阻塞:消费端业务逻辑尽量简单(仅入库 + 必要校验),复杂逻辑(如轨迹分析)通过 “二次投递 MQ” 异步处理。
七、方案架构图与流程图
7.1 整体架构图(MQ 模式)
7.2 HTTP 转 MQ 适配器架构图
7.3 轨迹消费流程图
八、参考
- RocketMQ 可靠性保障:参考《Apache RocketMQ 官方文档》中 “消息持久化”“重试机制”“死信队列” 章节(官方文档链接);
- 定时任务的局限性:参考 Spring 官方文档中
@Scheduled
注解的适用场景说明,明确其不适合实时性要求高的场景(Spring 官方文档链接); - 数据库唯一索引去重:参考《MySQL 8.0 官方文档》中 “唯一约束” 的使用场景,用于防止重复数据插入(MySQL 官方文档链接);
- 批量入库性能优化:参考 MyBatis-Plus 官方文档中
saveBatch
方法的实现原理,通过批量 SQL 提升入库效率(MyBatis-Plus 官方文档链接)。
九、总结
车辆轨迹数据的实时同步,核心是 “从被动轮询转向主动消息驱动”。本文提供的 MQ 方案,通过 RocketMQ 的 “实时推送 + 可靠性保障”,结合 MySQL 的 “唯一索引去重” 和 MyBatis-Plus 的 “批量处理”,彻底解决了定时任务的延迟、资源浪费、扩展性差等问题。同时,针对 “第三方仅支持 HTTP 接口” 的场景,提供了 “HTTP 转 MQ 适配器” 的兼容方案,确保方案的落地性。
在实际项目中,可根据车辆规模(如 1000 台 vs10 万台)调整 MQ 消费者线程数、数据库连接池大小、批量处理条数等参数,进一步优化性能。最终实现 “轨迹数据实时同步、不丢失、不重复” 的业务目标。