商品中心—17.缓存与DB一致性的技术文档
大纲
1.缓存与数据库一致性服务的设计
2.缓存与数据库一致性服务的注解
3.缓存与数据库一致性服务的处理入口
4.缓存与数据库一致性服务的消费缓存消息
5.缓存与数据库一致性服务的消费检查
6.缓存与数据库一致性服务的实现总结
一.缓存 + DB双写的注解与AOP切面实现
二.先执行DB写入再基于AOP异步写缓存
三.缓存数据双写之缓存消息写MQ + 缓存数据写内存队列再延迟写DB
四.缓存消息基于内存双队列异步批量写DB
五.内存双队列定时交换与Batch切分
六.基于双内存队列实现定时批量写入DB
七.基于缓存key的Hash值实现内存队列分发
八.消息基于内存队列分发给线程后写入缓存
九.基于定时任务查询DB中缓存消息实现补偿
1.缓存与数据库一致性服务的设计
(1)缓存消息DB记录表
(2)整体流程图
(1)缓存消息DB记录表
CREATE TABLE `data_refresh_detail` (`id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',`cache_key` varchar(128) NOT NULL DEFAULT '' COMMENT '缓存key',`operation_type` tinyint(1) NOT NULL DEFAULT '1' COMMENT '1新增/修改,2删除',`cache_json` text NOT NULL COMMENT '缓存内容',`cache_type` tinyint(1) DEFAULT NULL COMMENT '缓存类型,1Redis,2Tair',`cache_status` tinyint(1) DEFAULT '0' COMMENT '缓存的处理状态,默认为0未处理,1为已处理',`version` varchar(32) NOT NULL COMMENT '消息版本号',`message_type` tinyint(1) NOT NULL COMMENT '是否热点消息,0普通,1热点',`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8 COMMENT='缓存消息DB记录表';
(2)整体流程
一.为什么获取缓存数据后不直接更新缓存而发送消息到MQ
最简单的实现方案,应该就是AOP切面获取到缓存数据后直接更新缓存。由于MQ可以提升性能、削峰、解耦,以及随着业务的迭代,更新缓存的环节可能会越来越复杂。所以AOP切面获取到缓存数据后,应该先将缓存数据发送到MQ,通过消费MQ的缓存消息来更新缓存,尽量不影响添加了注解的方法。
二.为什么要往DB写入缓存数据的记录
但是将缓存数据以消息的形式发送到MQ后,消费消息时可能会出现故障。所以为了保证缓存数据消息能够最终被消费到并更新缓存,需要当AOP切面获取到缓存数据后,就将缓存数据写入DB。这时为了不影响添加了注解的方法的性能,可以使用异步线程去写入DB。所以DB中的缓存数据主要用来检查消费是否异常,因此允许部分丢失。
三.为什么没有使用最简单的线程池,而添加了多个内存队列
检查消费是否异常时,内存队列可以方便对异常的数据添加处理,消费缓存消息时通过多个内存队列 + 多线程的方式来提升处理速度。在应用启动时会创建一个线程池 + 多个内存队列 + 多个任务线程,每个任务线程都会负责处理其中一个内存队列中的缓存消息,这些任务线程都会被添加到这个线程池中执行。在消费缓存消息时,缓存消息就会不断被添加到对应的内存队列中,这样就实现了多线程处理消费到的缓存消息。
四.为了避免消费缓存消息出现问题启会动定时任务检查消费是否异常
定时任务会每分钟执行一次,检查缓存消息的消费是否出现问题。如果出现问题,则从DB中获取具体的缓存数据来更新缓存。
2.缓存与数据库一致性服务的注解
(1)注解的定义
(2)注解的使用
(1)注解的定义
需要实现缓存与DB一致性的方法在使用该注解时需要注意:
一.指定具体的缓存名称
对应于注解中的cacheKey字段。
二.指定第几个入参参数作为缓存的内容
对应于注解中的index字段。
三.指定操作类型
对应于注解中的operationType字段,1是新增或修改,2是删除。
四.指定发送哪种缓存数据的消息
对应于注解中的messageType,0是普通缓存的消息,1是热点缓存的消息。用来保证DB和缓存的一致性时,消息类型为普通缓存的消息。用来保证本地缓存和分布式缓存的一致性时,消息类型为热点缓存的消息。
五.指定使用的缓存组件类型
对应于注解中的cacheType字段,1是Redis,2是Tair。
六.指定发送的缓存数据消息所属的MQ分组
对应于注解中的mqCacheKey字段,指定了消息分组key。同⼀分组的消息会路由到同⼀Patition,保证消息不会被多个消费者消费。
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface CacheRefresh {//缓存消息keyString mqCacheKey();//缓存的keyString cacheKey();//需要缓存的值在方法参数中的坐标偏移量,默认不传取第一个参数String index() default "0";//是否热点缓存消息:0普通缓存消息,1热点缓存消息String messageType() default "0";//缓存的操作类型:1新增/修改,2删除String operationType() default "1";//缓存的组件类别:1是Redis,2是TairString cacheType() default "1";
}
(2)注解的使用
一.使用示例
@Component
@Data
public class InventoryBucketCache {...//本地存储关于分桶信息@CacheRefresh(cacheKey = "bucketKey", mqCacheKey = CacheConstant.INVENTORY_SKU_KEY, index = "1", messageType = CacheConstant.MESSAGE_TYPE_HOT, cacheType = CacheConstant.TAIR_CACHE_TYPE)public void setBucketLocalCache(String bucketKey, BucketLocalCache bucketLocalCache) {...}...
}
二.使用场景
场景一:对于需要操作DB和缓存的⽅法,为了保证数据⼀致性,可以通过注解实现DB和缓存的数据⼀致性。
场景二:对于需要操作本地缓存和分布式缓存的方法,为了保证数据一致性,可以通过注解实现本地缓存和分布式缓存的数据一致性。
3.缓存与数据库一致性服务的处理入口
(1)通过自定义的注解 + AOP切面来处理缓存数据
(2)对读写队列中的缓存数据进⾏持久化
(1)通过自定义的注解 + AOP切面来处理缓存数据
在执行完被注解修饰的方法后,例如该方法向数据库更新了数据。那么AOP切面就会先将缓存数据写入到读写队列,然后发一条缓存数据消息到MQ由消息系统消费进行缓存更新处理。
其中,读写队列中的缓存数据会被定时每秒批量写入到DB,而进行定时每秒批量写入是因为直接单条写入DB,可能会对DB造成压力,以及如果将缓存数据同步写入DB会影响添加了注解的方法的性能。而DB中的缓存数据主要用来进行兜底检查,所以允许部分丢失。
//刷新缓存的自定义注解
@Aspect
@Component
public class CacheRefreshAspect {@Autowiredprivate DataRefreshProducer producer;@Autowiredprivate CacheRefreshConverter cacheRefreshConverter;@Autowiredprivate CacheQueue cacheQueue;//切入点,@CacheRefresh注解标注的@Pointcut("@annotation(com.demo.eshop.cache.annotation.CacheRefresh)")public void pointcut() {}//环绕通知,在方法执行前后//@param point 切入点//@return 结果@Around("pointcut() && @annotation(cacheRefresh)")public Object around(ProceedingJoinPoint point, CacheRefresh cacheRefresh) throws Throwable {//签名信息Signature signature = point.getSignature();//强转为方法信息MethodSignature methodSignature = (MethodSignature) signature;//参数名称String[] parameterNames = methodSignature.getParameterNames();//参数值Object[] parameterValues = point.getArgs();Object response;try {//先执行本地方法再执行异步的操作response = point.proceed();} catch (Throwable throwable) {log.error("执行方法: {}失败,异常信息: {}", methodSignature.getMethod().getName(), throwable);throw throwable;}try {MessageCache messageCache = new MessageCache();for (int i = 0; i < parameterValues.length; i++) {if (parameterNames[i].equals(cacheRefresh.cacheKey())) {messageCache.setCacheKey(String.valueOf(parameterValues[i]));}if (Integer.valueOf(cacheRefresh.index()) == i) {messageCache.setCacheJSON(JSONObject.toJSONString(parameterValues[i]));}}messageCache.setOperationType(Integer.valueOf(cacheRefresh.operationType()));//给定一个有序的版本号(默认统一的工作ID和数据中心ID)messageCache.setVersion(SnowflakeIdWorker.getVersion());messageCache.setMessageType(Integer.valueOf(cacheRefresh.messageType()));messageCache.setCacheType(Integer.valueOf(cacheRefresh.cacheType()));messageCache.setCreateDate(new Date());//将缓存数据写入读写队列//缓存数据写入读写队列后,会定时每秒批量写入数据库(缓存数据写入DB只用于兜底,所以偶尔出现丢失并不影响)DataRefreshDetailDO dataRefreshDetailDO = cacheRefreshConverter.converter(messageCache);cacheQueue.submit(dataRefreshDetailDO);//发送MQ消息去处理缓存数据,比如将缓存数据更新到缓存上//一般来说,热点缓存会比普通缓存少很多,所以普通缓存的更新会比较多,热点缓存的更新会比较少//此外,热点缓存的更新会对时效性要求比较高,通过消息去异步处理本来就已存在一定的延迟//所以这里将普通缓存和热点缓存的更新进行分开处理,减少时效性高的消息的处理延迟if (CacheConstant.MESSAGE_TYPE_HOT.equals(cacheRefresh.messageType())) {producer.sendMessage(RocketMqConstant.DATA_HOT_RADIO_TOPIC, JSONObject.toJSONString(messageCache), "热点缓存消息发送");} else {producer.sendMessage(RocketMqConstant.DATA_MESSAGE_CACHE_TOPIC, JSONObject.toJSONString(messageCache), cacheRefresh.mqCacheKey(), "通用缓存消息发送");}} catch (Exception e) {log.error("处理缓存同步:{}失败,异常信息:{}", methodSignature.getMethod().getName(), e);}return response;}
}//消息缓存处理对象
@Data
public class MessageCache implements Serializable {//缓存的keyprivate String cacheKey;//缓存的操作类型private Integer operationType;//使用的缓存类型,1为Redis,2为Tairprivate Integer cacheType;//缓存的消息内容private String cacheJSON;//消息的版本号(默认用时间戳来标记先后顺序)private String version;//缓存的数据状态是否还有效(0默认有效,1无效)private Integer cacheStatus = 0;//消息的创建时间private Date createDate;//是否热点消息,0普通消息,1热点消息private Integer messageType = 0;
}//外部消息处理对象
@Data
@TableName("data_refresh_detail")
public class DataRefreshDetailDO extends BaseEntity {//缓存的keyprivate String cacheKey;//缓存的操作类型private Integer operationType = 1;//使用的缓存类型,1为Redis,2为Tairprivate Integer cacheType;//缓存的消息内容private String cacheJSON;//消息的版本号(默认用时间戳来标记先后顺序)private String version;//是否热点消息,0普通消息,1热点消息private Integer messageType = 0;
}//缓存数据的读写队列
@Component
public class CacheQueue {//提供锁的实例对象private final PutDataLock lock = new PutDataLock();//缓存数据的写队列private volatile List<DataRefreshDetailDO> writeQueue = new LinkedList<>();//缓存数据的读队列private volatile List<DataRefreshDetailDO> readQueue = new LinkedList<>();//是否正在写入数据private volatile boolean isWrite = false;...//缓存数据写入写队列//@param dataRefreshDetailDO db存储对象public void submit(DataRefreshDetailDO dataRefreshDetailDO) {lock.lock();try {writeQueue.add(dataRefreshDetailDO);} finally {lock.unlock();}}...
}//锁竞争类对象
//由于使用这个自旋锁是用于处理内存操作的,所以会很快处理完,可以忽略CPU的消耗
public class PutDataLock {private final AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);//上锁public void lock() {boolean flag;do {flag = this.putMessageSpinLock.compareAndSet(true, false);}while (!flag);}//解锁public void unlock() {this.putMessageSpinLock.compareAndSet(false, true);}
}
(2)对读写队列中的缓存数据进⾏持久化
项⽬启动在初始化数据源时,会同时启动⼀个定时调度任务,这个定时任务就会负责每隔1秒把读写队列中的缓存数据批量写⼊DB。DB中的这些数据主要用来兜底,允许部分丢失。
//数据源配置
@Component
public class DataSourceConfig extends AbstractDataSourceConfig {private SqlSessionTemplate sqlSessionTemplate;//存储数据源对象@Autowiredprivate CacheQueue cacheQueue;//初始化加载目前需要进行数据源的相关配置@PostConstructpublic void initMigrateDateSource() throws Exception {//加载数据源DataSource dataSource = buildDataSource();SqlSessionFactoryBean sqlSessionFactory = new SqlSessionFactoryBean();sqlSessionFactory.setDataSource(dataSource);sqlSessionTemplate = new SqlSessionTemplate(sqlSessionFactory.getObject());//启动一个定时任务触发写入缓存数据到DB,每隔1秒触发一次,避免每次有缓存请求都执行DB操作ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());executorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {//判断当前是否正在将读写队列中的缓存数据写入DBif (!cacheQueue.getIsWrite()) {//提交写队列中的缓存数据,然后写入DBcacheQueue.doCommit();}}}, 1000, 1000, TimeUnit.MILLISECONDS);}//获取数据源public SqlSession getSqlSession() {try {return SqlSessionUtils.getSqlSession(sqlSessionTemplate.getSqlSessionFactory(), sqlSessionTemplate.getExecutorType(), sqlSessionTemplate.getPersistenceExceptionTranslator());} catch (Exception e) {log.error("加载数据源对应连接池失败", e);}return null;}//关闭sqlSessionpublic void closeSqlSession(SqlSession session) {SqlSessionUtils.closeSqlSession(session, sqlSessionTemplate.getSqlSessionFactory());}
}//数据源配置
public abstract class AbstractDataSourceConfig {//构建数据源public DruidDataSource buildDataSource() throws IOException {Properties prop = new Properties();InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream("cache.properties");prop.load(inputStream);inputStream.close();DruidDataSource druidDataSource = DataSourceBuilder.create().type(DruidDataSource.class).driverClassName(prop.getProperty("datasource.driver-class-name")).url(prop.getProperty("datasource.url")).username(prop.getProperty("datasource.username")).password(prop.getProperty("datasource.password")).build();druidDataSource.setTestOnBorrow(true);druidDataSource.setTestWhileIdle(true);return druidDataSource;}
}//缓存数据的读写队列
@Component
public class CacheQueue {//提供锁的实例对象private final PutDataLock lock = new PutDataLock();//缓存数据的写队列private volatile List<DataRefreshDetailDO> writeQueue = new LinkedList<>();//缓存数据的读队列private volatile List<DataRefreshDetailDO> readQueue = new LinkedList<>();//是否正在写入数据private volatile boolean isWrite = false;@Autowiredprivate DataRefreshService dataRefreshService;...//交换读写队列private void swapRequests() {lock.lock();try {List<DataRefreshDetailDO> tmp = writeQueue;writeQueue = readQueue;readQueue = tmp;} finally {lock.unlock();}}//提交写队列中的缓存数据,然后写入DBpublic void doCommit() {this.isWrite = true;//交互读写队列后,再将读队列中的缓存数据写入DBswapRequests();if (!readQueue.isEmpty()) {//先进行数据切割,每次写入DB的记录为500条List<List<DataRefreshDetailDO>> dataRefreshDetailList = DataCuttingUtil.dataCuttingString(readQueue, CollectionSize.WRITE_SIZE);for (List<DataRefreshDetailDO> dataRefreshDetailDOS : dataRefreshDetailList) {dataRefreshService.saveDataRefreshDetailList(dataRefreshDetailDOS);}}readQueue.clear();this.isWrite = false;}//每隔1秒执行的定时任务会调用这个方法,判断当前是否正在将读写队列中的缓存数据写入DB//@return 是否正在读取public Boolean getIsWrite() {return this.isWrite;}...
}//将缓存数据写入DB
@Service
public class DataRefreshServiceImpl implements DataRefreshService {@Resourceprivate DataSourceConfig dataSourceConfig;@Overridepublic void saveDataRefreshDetailList(List<DataRefreshDetailDO> dataRefreshDetailDOList) {SqlSession session = null;PreparedStatement pst = null;try {StringBuffer sql = new StringBuffer();sql.append("INSERT INTO data_refresh_detail(cache_key, operation_type, cache_json, version, message_type, cache_type, create_time, update_time) values (?,?,?,?,?,?,now(),now())");session = dataSourceConfig.getSqlSession();pst = session.getConnection().prepareStatement(sql.toString());for (DataRefreshDetailDO dataRefreshDetailDO : dataRefreshDetailDOList) {pst.setString(1, dataRefreshDetailDO.getCacheKey());pst.setInt(2, dataRefreshDetailDO.getOperationType());pst.setString(3, dataRefreshDetailDO.getCacheJSON());pst.setString(4, dataRefreshDetailDO.getVersion());pst.setInt(5, dataRefreshDetailDO.getMessageType());pst.setInt(6, dataRefreshDetailDO.getCacheType());pst.addBatch();}pst.executeBatch();} catch (Exception e) {log.error("sql执行失败:{}", e);} finally {closeSqlSession(session, pst);}}//关闭连接private void closeSqlSession(SqlSession session, PreparedStatement pst) {if (pst != null) {try {pst.close();} catch (SQLException e) {e.printStackTrace();}}dataSourceConfig.closeSqlSession(session);}
}
4.缓存与数据库一致性服务的消费缓存消息
(1)消费缓存消息时会将缓存消息添加到内存队列
(2)应用启动时会初始化多个内存队列并创建对应的任务线程处理每个内存队列
(3)当缓存的操作类型为新增或者修改时的处理逻辑
(4)当缓存的操作类型为删除时的处理逻辑
(1)消费缓存消息时会将缓存消息添加到内存队列
通过消费缓存消息来执行缓存的处理逻辑。首先根据缓存消息里的缓存key,通过Hash定位获取对应的内存队列。然后将消息添加到该内存队列中,从而实现多线程处理消费到的缓存消息。
为了对缓存消息进行兜底处理,每秒会记录⼀次缓存消息的最新消费时间。如果最新消费时间,⽐写入DB的最新数据的创建时间晚了超过1分钟,则会认为⽬前的消息消费出现问题,此时会有另外⼀个定时任务进⾏处理。
//缓存消息处理
@Component
public class DataMessageCacheListener implements MessageListenerConcurrently {@Autowiredprivate CacheQueue cacheQueue;@Autowiredprivate RedisCache redisCache;//上次记录消费的时间private long lastTimestamp = -1L;@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : list) {try {String messageData = new String(messageExt.getBody());log.info("DataMessageCacheListener缓存数据变更刷新,消息内容:{}", messageData);MessageCache messageCache = JsonUtil.json2Object(messageData, MessageCache.class);//根据消息的缓存key,获取到对应的内存队列,分散队列提高处理效率,并保证单key的执行不会并发BlockingQueue blockingQueue = cacheQueue.getBlockingQueue(messageCache.getCacheKey());//将缓存消息添加到对应的内存队列中blockingQueue.offer(messageCache);//记录最新的消费数据时间setCacheRefreshTime();} catch (Exception e) {log.error("consume error, 缓存消息写入队列失败", e);//本次消费失败,下次重新消费return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}//记录最新的消费时间,为避免无效的set操作,这里控制每秒最多执行一次setprivate synchronized void setCacheRefreshTime() {//获取到当前的时间,精确到秒long timestamp = System.currentTimeMillis() / 1000;//同一个时间则默认不处理if (lastTimestamp == timestamp) {return;}//标记最新MQ消息的接收时间//如果写入DB的最新数据的创建时间,⽐消息的最新消费时间早1分钟以上//则认为⽬前的消息消费出现问题,此时会有另外⼀个线程去查询出DB的数据来刷新缓存(兜底处理)redisCache.set(CacheConstant.CACHE_ROCKET_TIME_KEY, DateFormatUtils.format(new Date(), DateConstant.DATE_TIME_FORMAT_PATTERN), 0);lastTimestamp = timestamp;}
}
(2)应用启动时会初始化多个内存队列并创建对应的任务线程处理每个内存队列
每个内存队列会对应线程池中的一个任务线程,该任务线程会处理添加到其负责处理的内存队列中的缓存消息。缓存消息按操作类型可分为2类:⼀是新增或修改,⼀是删除。
线程池在应用启动时就会开始运行,并创建多个任务线程,每个任务线程都会处理其负责的内存队列中的缓存消息。而在消费缓存消息时,这些缓存消息会不断被添加到对应的内存队列中,这样就实现了多线程处理消费到的缓存消息。
//处理缓存消息时使用的内存队列
@Component
public class CacheQueue {//处理缓存消息的内存队列private final List<BlockingQueue> messageQueue = new ArrayList<>();//配置的消息内存队列数量@Value("${message.queue-num}")private Integer messageQueueNum;@PostConstructpublic void init() {ExecutorService executors = Executors.newFixedThreadPool(messageQueueNum);for (int i = 0; i < messageQueueNum; i++) {//设置一个队列最大容纳数量BlockingQueue blockingQueue = new ArrayBlockingQueue(150000);messageQueue.add(blockingQueue);//每个内存队列都对应线程池executors里的一个任务线程任务,该任务线程会处理添加到该内存队列中的缓存消息executors.execute(new CacheConsistencyRunner(blockingQueue));}}//对消息的key进行hash处理,从而定位到具体的内存队列上public BlockingQueue getBlockingQueue(String key) {//先获取到传入的消息key对应的hash值long hash = HashUtil.murMurHash(key.getBytes());//计算出对应的内存队列int c = (int) (hash %= messageQueue.size());BlockingQueue blockingQueue = messageQueue.get(c);return blockingQueue;}
}//处理内存队列中缓存消息的线程
public class CacheConsistencyRunner implements Runnable {//缓存消息的失效时间,超过这个失效时间的历史消息不处理新增修改private final static Integer failureTime = 60;//存放缓存消息的内存队列private BlockingQueue blockingQueue;private RedisCache redisCache;private TairCache tairCache;private DefaultProducer defaultProducer;public CacheConsistencyRunner(BlockingQueue blockingQueue) {this.blockingQueue = blockingQueue;this.redisCache = ApplicationContextUtil.getBean(RedisCache.class);this.tairCache = ApplicationContextUtil.getBean(TairCache.class);this.defaultProducer = ApplicationContextUtil.getBean(DefaultProducer.class);}//处理一个内存队列中的缓存消息的线程@Overridepublic void run() {try {//TODO 对blockingQueue使用wait+notify实现内存队列为空时线程挂起while (true) {MessageCache cache = (MessageCache) blockingQueue.take();//先判断是不是删除类型的缓存操作if (MessageOperationEnum.DELETE.getCode().equals(cache.getOperationType())) {deleteCache(cache);} else {//其它类型操作都是修改或者新增refreshCache(cache);}}} catch (Exception e) {e.printStackTrace();log.error("处理缓存消息异常", e);}}...
}
(3)当缓存的操作类型为新增或者修改时的处理逻辑
⾸先判断这个消息是否为僵⼫消息,⽐如1⼩时前的消息就没必要处理了。然后根据缓存消息中的缓存组件类型 + 缓存key,从缓存⾥获取缓存数据。如果缓存不存在则直接覆盖,如果缓存存在就需要判断⼀下各⾃的版本号,以版本号最新的为准。
同时要注意:第一.消息缓存组件是什么,根据缓存组件去使用对应的缓存⼯具类。第二.消息是否是热点缓存,热点数据还需要进⾏⼀次⼴播,从而让其它对这个本地缓存有需要的服务也刷新其本地缓存。
//处理内存队列中缓存消息的线程
public class CacheConsistencyRunner implements Runnable {//缓存消息的失效时间,超过这个失效时间的历史消息不处理新增修改private final static Integer failureTime = 60;//存放缓存消息的内存队列private BlockingQueue blockingQueue;private RedisCache redisCache;private TairCache tairCache;private DefaultProducer defaultProducer;public CacheConsistencyRunner(BlockingQueue blockingQueue) {this.blockingQueue = blockingQueue;this.redisCache = ApplicationContextUtil.getBean(RedisCache.class);this.tairCache = ApplicationContextUtil.getBean(TairCache.class);this.defaultProducer = ApplicationContextUtil.getBean(DefaultProducer.class);}...//刷新缓存,或者新增缓存private void refreshCache(MessageCache cache) {log.info("开始处理新增或者修改缓存{}", JSONObject.toJSONString(cache));//处理缓存之前,先看看这个消息的时间是多少,避免复活一些僵尸数据Boolean isCache = DateFormatUtil.compareTo(cache.getCreateDate(), failureTime);//如果这条消息的创建时间已经超过的有效期,那么视为无效消息不处理if (!isCache) {return;}//获取缓存String cacheStr = getCache(cache.getCacheType(), cache.getCacheKey());//当前没有缓存记录,直接覆盖一条if (StringUtils.isEmpty(cacheStr)) {setCache(cache.getCacheType(), cache.getCacheKey(), JSONObject.toJSONString(cache), 0);send(cache);return;}MessageCache messageCache = JsonUtil.json2Object(cacheStr, MessageCache.class);//判断版本和缓存版本的记录谁更新,记录以最新的为准,避免低版本覆盖高版本if (cache.getVersion().compareTo(messageCache.getVersion()) > 0) {setCache(cache.getCacheType(), cache.getCacheKey(), JSONObject.toJSONString(cache), 0);send(cache);}}//获取缓存keyprivate String getCache(Integer cacheType, String cacheKey) {if (CaCheTypeEnum.REDIS.getCode().equals(cacheType)) {return redisCache.get(cacheKey);}return tairCache.get(cacheKey);}//设置缓存private void setCache(Integer cacheType, String cacheKey, String cacheValue, Integer seconds) {if (CaCheTypeEnum.REDIS.getCode().equals(cacheType)) {redisCache.set(cacheKey, cacheValue, seconds);} else {tairCache.set(cacheKey, cacheValue, seconds);}}//发送消息public void send(MessageCache cache) {//热点数据才处理发送广播消息if (cache.getMessageType().equals(1)) {defaultProducer.sendMessage(RocketMqConstant.DATA_HOT_RADIO_TOPIC, JSONObject.toJSONString(cache), 0, "广播消息发送");}}...
}
(4)当缓存的操作类型为删除时的处理逻辑
进行缓存删除时需要注意的是缓存穿透,也就是说删除缓存操作其实只是将这个记录标记为已删除的状态。如果本来就没有这个缓存则需要设置它的默认版本号为-1,后续对比的时候默认将⼤于这个版本号的删除缓存标记为已删除,也和正常的增改缓存操作对应的版本号对比起来,从而避免错误的顺序影响实际的缓存结果。
//处理内存队列中缓存消息的线程
public class CacheConsistencyRunner implements Runnable {//缓存消息的失效时间,超过这个失效时间的历史消息不处理新增修改private final static Integer failureTime = 60;//存放缓存消息的内存队列private BlockingQueue blockingQueue;private RedisCache redisCache;private TairCache tairCache;private DefaultProducer defaultProducer;public CacheConsistencyRunner(BlockingQueue blockingQueue) {this.blockingQueue = blockingQueue;this.redisCache = ApplicationContextUtil.getBean(RedisCache.class);this.tairCache = ApplicationContextUtil.getBean(TairCache.class);this.defaultProducer = ApplicationContextUtil.getBean(DefaultProducer.class);}...//删除缓存private void deleteCache(MessageCache cache) {//获取缓存String cacheStr = getCache(cache.getCacheType(), cache.getCacheKey());MessageCache messageCache = null;if (StringUtils.isEmpty(cacheStr)) {messageCache = new MessageCache();messageCache.setVersion("-1");} else {messageCache = JsonUtil.json2Object(cacheStr, MessageCache.class);}//判断一下新的请求版本是否超过或者等于缓存的版本//如果是则标记为已删除,同时缓存60分钟,避免缓存穿透以及可能的新增无效请求无版本可比对if (cache.getVersion().compareTo(messageCache.getVersion()) >= 0) {cache.setCacheStatus(1);setCache(cache.getCacheType(), cache.getCacheKey(), JSONObject.toJSONString(messageCache), failureTime * 60);send(cache);}}//设置缓存private void setCache(Integer cacheType, String cacheKey, String cacheValue, Integer seconds) {if (CaCheTypeEnum.REDIS.getCode().equals(cacheType)) {redisCache.set(cacheKey, cacheValue, seconds);} else {tairCache.set(cacheKey, cacheValue, seconds);}}//发送消息public void send(MessageCache cache) {//热点数据才处理发送广播消息if (cache.getMessageType().equals(1)) {defaultProducer.sendMessage(RocketMqConstant.DATA_HOT_RADIO_TOPIC, JSONObject.toJSONString(cache), 0, "广播消息发送");}}...
}//消息缓存处理对象
@Data
public class MessageCache implements Serializable {//缓存的keyprivate String cacheKey;//缓存的操作类型private Integer operationType;//使用的缓存类型,1为Redis,2为Tairprivate Integer cacheType;//缓存的消息内容private String cacheJSON;//消息的版本号(默认用时间戳来标记先后顺序)private String version;//缓存的数据状态是否还有效(0默认有效,1无效)private Integer cacheStatus = 0;//消息的创建时间private Date createDate;//是否热点消息,0普通消息,1热点消息private Integer messageType = 0;
}
5.缓存与数据库一致性服务的消费检查
(1)定时任务检查消费是否发生异常
(2)从缓存中获取MQ缓存消息的最新消费时间
(3)按最新消费时间来分⻚查询DB的缓存数据
(1)定时任务检查消费是否发生异常
为了避免从DB查询出的缓存数据量过⼤,导致处理时间超过1分钟,从而出现同时有多个定时检查任务在执行,所以需要加⼀个分布式锁。
//负责定时检查消费是否发生异常而需要从DB查询缓存数据并刷新缓存
@Component
public class DataRefreshTask {@Autowiredprivate RedisLock redisLock;@Autowiredprivate MessageService messageService;//每分钟验证下是否触发缓存DB兜底@Scheduled(fixedDelay = 60000)void DataRefreshTask() {boolean lock = redisLock.lock(CacheConstant.CACHE_LOCK_KEY);try {if (lock) {messageService.outDataCacheRefresh();}} finally {redisLock.unlock(CacheConstant.CACHE_LOCK_KEY);}}
}
(2)从缓存中获取MQ缓存消息的最新消费时间
如果没有最新消费时间,则默认取前1个⼩时的时间。如果有最新消费时间,则减去⼀分钟,避免和消息处理出现重复。
(3)按最新消费时间来分⻚查询DB的缓存数据
将查询到的数据按缓存key定位hash写⼊到具体的内存队列中,复⽤消息消费缓存的定时线程任务处理对应的缓存逻辑。
@Service
public class MessageServiceImpl implements MessageService {@Autowiredprivate RedisCache redisCache;@Autowiredprivate DataRefreshRepository dataRefreshRepository;@Autowiredprivate CacheQueue cacheQueue;...//从DB查询缓存数据并刷新缓存@Overridepublic void outDataCacheRefresh() {//1.先获取缓存中最新的消息消费时间,这里先减去1分钟,避免和MQ的处理直接重复String createDate = redisCache.get(CacheConstant.CACHE_ROCKET_TIME_KEY);if (StringUtils.isEmpty(createDate)) {//如果缓存都不存在最新消费时间,默认处理1个小时内的DB数据,再超过前的缓存没有处理必要createDate = DateFormatUtil.getHoursDate(-1);} else {//缓存时间存在,对时间减去一分钟进行处理createDate = DateFormatUtil.getMinuteDate(createDate, -1);}//每次处理DB兜底的查询之前,先删除掉查询时间范围外的数据,避免数据一直写入导致数据量过大影响性能dataRefreshRepository.deleteDataRefresh(DateFormatUtil.getHoursDate(-1));//2.获取是否有超过的最新消费时间的数据落入DB(默认查询超过1分钟还未消费的数据)int pageNum = 1;//设置每次查询的数据量,最大为500int pageSize = CollectionSize.WRITE_SIZE;Page<DataRefreshDetailDO> page = new Page<>(pageNum, pageSize);Page<DataRefreshDetailDO> pageResult = dataRefreshRepository.queryDataRefreshDetailDOList(page, createDate);List<DataRefreshDetailDO> dataRefreshDetailList = pageResult.getRecords();//将缓存数据写入内存队列进行处理dataRefreshQueue(dataRefreshDetailList);try {while (pageNum <= pageResult.getTotal()) {pageNum += 1;page.setCurrent(pageNum);pageResult = dataRefreshRepository.queryDataRefreshDetailDOList(page, createDate);//将缓存数据写入内存队列进行处理dataRefreshQueue(pageResult.getRecords());//每次循环获取数据后,休眠20ms,避免对数据库造成太大压力Thread.sleep(20);}} catch (InterruptedException e) {throw new BaseBizException(ProductExceptionCode.PRODUCT_SQL);}}//将数据写入队列进行处理private void dataRefreshQueue(List<DataRefreshDetailDO> dataRefreshDetailList) {if (!CollectionUtils.isEmpty(dataRefreshDetailList)) {for (DataRefreshDetailDO dataRefreshDetailDO : dataRefreshDetailList) {MessageCache messageCache = dataMessageConverter.converter(dataRefreshDetailDO);BlockingQueue blockingQueue = cacheQueue.getBlockingQueue(messageCache.getCacheKey());blockingQueue.offer(messageCache);}}}...
}@Repository
public class DataRefreshRepository {@Resourceprivate DataRefreshMapper dataRefreshMapper;//获取得到消息最近消费时间的记录(默认查询大于这个时间点大于1分钟的数据)public Page<DataRefreshDetailDO> queryDataRefreshDetailDOList(Page<DataRefreshDetailDO> page, String createDate) {LambdaQueryWrapper<DataRefreshDetailDO> queryWrapper = Wrappers.lambdaQuery();//查询创建时间大于这个版本号且未处理的数据queryWrapper.gt(DataRefreshDetailDO::getVersion, createDate);queryWrapper.eq(DataRefreshDetailDO::getCacheStatus, 0);return dataRefreshMapper.selectPage(page, queryWrapper);}//删除超过一定时间区间的缓存DB数据public void deleteDataRefresh(String createDate) {LambdaUpdateWrapper<DataRefreshDetailDO> updateWrapper = Wrappers.lambdaUpdate();updateWrapper.lt(DataRefreshDetailDO::getVersion, createDate);dataRefreshMapper.delete(updateWrapper);}
}
6.缓存与数据库一致性服务的实现总结
一.缓存 + DB双写的注解与AOP切面实现
二.先执行DB写入再基于AOP异步写缓存
三.缓存数据双写之缓存消息写MQ + 缓存数据写内存队列再延迟写DB
四.缓存消息基于内存双队列异步批量写DB
五.内存双队列定时交换与Batch切分
六.基于双内存队列实现定时批量写入DB
七.基于缓存key的Hash值实现内存队列分发
八.消息基于内存队列分发给线程后写入缓存
九.基于定时任务查询DB中缓存消息实现补偿
一.缓存 + DB双写的注解与AOP切面实现
二.先执行DB写入再基于AOP异步写缓存
三.缓存数据双写之缓存消息写MQ + 缓存数据写内存队列再延迟写DB
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface CacheRefresh {//缓存消息keyString mqCacheKey();//缓存的keyString cacheKey();//需要缓存的值在方法参数中的坐标偏移量,默认不传取第一个参数String index() default "0";//是否热点缓存消息:0普通缓存消息,1热点缓存消息String messageType() default "0";//缓存的操作类型:1新增/修改,2删除String operationType() default "1";//缓存的组件类别:1是Redis,2是TairString cacheType() default "1";
}//刷新缓存的自定义注解
@Aspect
@Component
public class CacheRefreshAspect {@Autowiredprivate DataRefreshProducer producer;@Autowiredprivate CacheRefreshConverter cacheRefreshConverter;@Autowiredprivate CacheQueue cacheQueue;//切入点,@CacheRefresh注解标注的@Pointcut("@annotation(com.demo.eshop.cache.annotation.CacheRefresh)")public void pointcut() {}//环绕通知,在方法执行前后//@param point 切入点//@return 结果@Around("pointcut() && @annotation(cacheRefresh)")public Object around(ProceedingJoinPoint point, CacheRefresh cacheRefresh) throws Throwable {//签名信息Signature signature = point.getSignature();//强转为方法信息MethodSignature methodSignature = (MethodSignature) signature;//参数名称String[] parameterNames = methodSignature.getParameterNames();//参数值Object[] parameterValues = point.getArgs();Object response;try {//先执行本地方法再执行异步的操作response = point.proceed();} catch (Throwable throwable) {log.error("执行方法: {}失败,异常信息: {}", methodSignature.getMethod().getName(), throwable);throw throwable;}try {MessageCache messageCache = new MessageCache();for (int i = 0; i < parameterValues.length; i++) {if (parameterNames[i].equals(cacheRefresh.cacheKey())) {messageCache.setCacheKey(String.valueOf(parameterValues[i]));}if (Integer.valueOf(cacheRefresh.index()) == i) {messageCache.setCacheJSON(JSONObject.toJSONString(parameterValues[i]));}}messageCache.setOperationType(Integer.valueOf(cacheRefresh.operationType()));//给定一个有序的版本号(默认统一的工作ID和数据中心ID)messageCache.setVersion(SnowflakeIdWorker.getVersion());messageCache.setMessageType(Integer.valueOf(cacheRefresh.messageType()));messageCache.setCacheType(Integer.valueOf(cacheRefresh.cacheType()));messageCache.setCreateDate(new Date());//将缓存数据写入读写队列//缓存数据写入读写队列后,会定时每秒批量写入数据库(缓存数据写入DB只用于兜底,所以偶尔出现丢失并不影响)DataRefreshDetailDO dataRefreshDetailDO = cacheRefreshConverter.converter(messageCache);cacheQueue.submit(dataRefreshDetailDO);//发送MQ消息去处理缓存数据,比如将缓存数据更新到缓存上//一般来说,热点缓存会比普通缓存少很多,所以普通缓存的更新会比较多,热点缓存的更新会比较少//此外,热点缓存的更新会对时效性要求比较高,通过消息去异步处理本来就已存在一定的延迟//所以这里将普通缓存和热点缓存的更新进行分开处理,减少时效性高的消息的处理延迟if (CacheConstant.MESSAGE_TYPE_HOT.equals(cacheRefresh.messageType())) {producer.sendMessage(RocketMqConstant.DATA_HOT_RADIO_TOPIC, JSONObject.toJSONString(messageCache), "热点缓存消息发送");} else {producer.sendMessage(RocketMqConstant.DATA_MESSAGE_CACHE_TOPIC, JSONObject.toJSONString(messageCache), cacheRefresh.mqCacheKey(), "通用缓存消息发送");}} catch (Exception e) {log.error("处理缓存同步:{}失败,异常信息:{}", methodSignature.getMethod().getName(), e);}return response;}
}
四.缓存消息基于内存双队列异步批量写DB
五.内存双队列定时交换与Batch切分
六.基于双内存队列实现定时批量写入DB
//缓存数据的读写队列
@Component
public class CacheQueue {//提供锁的实例对象private final PutDataLock lock = new PutDataLock();//缓存数据的写队列private volatile List<DataRefreshDetailDO> writeQueue = new LinkedList<>();//缓存数据的读队列private volatile List<DataRefreshDetailDO> readQueue = new LinkedList<>();//是否正在写入数据private volatile boolean isWrite = false;@Autowiredprivate DataRefreshService dataRefreshService;//缓存数据写入写队列//@param dataRefreshDetailDO db存储对象public void submit(DataRefreshDetailDO dataRefreshDetailDO) {lock.lock();try {writeQueue.add(dataRefreshDetailDO);} finally {lock.unlock();}}//交换读写队列private void swapRequests() {lock.lock();try {List<DataRefreshDetailDO> tmp = writeQueue;writeQueue = readQueue;readQueue = tmp;} finally {lock.unlock();}}//提交写队列中的缓存数据,然后写入DBpublic void doCommit() {this.isWrite = true;//交互读写队列后,再将读队列中的缓存数据写入DBswapRequests();if (!readQueue.isEmpty()) {//先进行数据切割,每次写入DB的记录为500条List<List<DataRefreshDetailDO>> dataRefreshDetailList = DataCuttingUtil.dataCuttingString(readQueue, CollectionSize.WRITE_SIZE);for (List<DataRefreshDetailDO> dataRefreshDetailDOS : dataRefreshDetailList) {dataRefreshService.saveDataRefreshDetailList(dataRefreshDetailDOS);}}readQueue.clear();this.isWrite = false;}//每隔1秒执行的定时任务会调用这个方法,判断当前是否正在将读写队列中的缓存数据写入DB//@return 是否正在读取public Boolean getIsWrite() {return this.isWrite;}
}//数据源配置
@Component
public class DataSourceConfig extends AbstractDataSourceConfig {private SqlSessionTemplate sqlSessionTemplate;//存储数据源对象@Autowiredprivate CacheQueue cacheQueue;//初始化加载目前需要进行数据源的相关配置@PostConstructpublic void initMigrateDateSource() throws Exception {//加载数据源DataSource dataSource = buildDataSource();SqlSessionFactoryBean sqlSessionFactory = new SqlSessionFactoryBean();sqlSessionFactory.setDataSource(dataSource);sqlSessionTemplate = new SqlSessionTemplate(sqlSessionFactory.getObject());//启动一个定时任务触发写入缓存数据到DB,每隔1秒触发一次,避免每次有缓存请求都执行DB操作ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());executorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {//判断当前是否正在将读写队列中的缓存数据写入DBif (!cacheQueue.getIsWrite()) {//提交写队列中的缓存数据,然后写入DBcacheQueue.doCommit();}}}, 1000, 1000, TimeUnit.MILLISECONDS);}...
}
七.基于缓存key的Hash值实现内存队列分发
八.消息基于内存队列分发给线程后写入缓存
九.基于定时任务查询DB中缓存消息实现补偿
//缓存消息处理
@Component
public class DataMessageCacheListener implements MessageListenerConcurrently {@Autowiredprivate CacheQueue cacheQueue;@Autowiredprivate RedisCache redisCache;//上次记录消费的时间private long lastTimestamp = -1L;@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : list) {try {String messageData = new String(messageExt.getBody());log.info("DataMessageCacheListener缓存数据变更刷新,消息内容:{}", messageData);MessageCache messageCache = JsonUtil.json2Object(messageData, MessageCache.class);//根据消息的缓存key,获取到对应的内存队列,分散队列提高处理效率,并保证单key的执行不会并发BlockingQueue blockingQueue = cacheQueue.getBlockingQueue(messageCache.getCacheKey());//将缓存消息添加到对应的内存队列中blockingQueue.offer(messageCache);//记录最新的消费数据时间setCacheRefreshTime();} catch (Exception e) {log.error("consume error, 缓存消息写入队列失败", e);//本次消费失败,下次重新消费return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}...
}//处理缓存消息时使用的内存队列
@Component
public class CacheQueue {//处理缓存消息的内存队列private final List<BlockingQueue> messageQueue = new ArrayList<>();//配置的消息内存队列数量@Value("${message.queue-num}")private Integer messageQueueNum;@PostConstructpublic void init() {ExecutorService executors = Executors.newFixedThreadPool(messageQueueNum);for (int i = 0; i < messageQueueNum; i++) {//设置一个队列最大容纳数量BlockingQueue blockingQueue = new ArrayBlockingQueue(150000);messageQueue.add(blockingQueue);//每个内存队列都对应线程池executors里的一个任务线程任务,该任务线程会处理添加到该内存队列中的缓存消息executors.execute(new CacheConsistencyRunner(blockingQueue));}}//对消息的key进行hash处理,从而定位到具体的内存队列上public BlockingQueue getBlockingQueue(String key) {//先获取到传入的消息key对应的hash值long hash = HashUtil.murMurHash(key.getBytes());//计算出对应的内存队列int c = (int) (hash %= messageQueue.size());BlockingQueue blockingQueue = messageQueue.get(c);return blockingQueue;}
}//处理内存队列中缓存消息的线程
public class CacheConsistencyRunner implements Runnable {//缓存消息的失效时间,超过这个失效时间的历史消息不处理新增修改private final static Integer failureTime = 60;//存放缓存消息的内存队列private BlockingQueue blockingQueue;private RedisCache redisCache;private TairCache tairCache;private DefaultProducer defaultProducer;public CacheConsistencyRunner(BlockingQueue blockingQueue) {this.blockingQueue = blockingQueue;this.redisCache = ApplicationContextUtil.getBean(RedisCache.class);this.tairCache = ApplicationContextUtil.getBean(TairCache.class);this.defaultProducer = ApplicationContextUtil.getBean(DefaultProducer.class);}//处理一个内存队列中的缓存消息的线程@Overridepublic void run() {try {//TODO 对blockingQueue使用wait+notify实现内存队列为空时线程挂起while (true) {MessageCache cache = (MessageCache) blockingQueue.take();//先判断是不是删除类型的缓存操作if (MessageOperationEnum.DELETE.getCode().equals(cache.getOperationType())) {deleteCache(cache);} else {//其它类型操作都是修改或者新增refreshCache(cache);}}} catch (Exception e) {e.printStackTrace();log.error("处理缓存消息异常", e);}}//删除缓存private void deleteCache(MessageCache cache) {//获取缓存String cacheStr = getCache(cache.getCacheType(), cache.getCacheKey());MessageCache messageCache = null;if (StringUtils.isEmpty(cacheStr)) {messageCache = new MessageCache();messageCache.setVersion("-1");} else {messageCache = JsonUtil.json2Object(cacheStr, MessageCache.class);}//判断一下新的请求版本是否超过或者等于缓存的版本//如果是则标记为已删除,同时缓存60分钟,避免缓存穿透以及可能的新增无效请求无版本可比对if (cache.getVersion().compareTo(messageCache.getVersion()) >= 0) {cache.setCacheStatus(1);setCache(cache.getCacheType(), cache.getCacheKey(), JSONObject.toJSONString(messageCache), failureTime * 60);send(cache);}}//刷新缓存,或者新增缓存private void refreshCache(MessageCache cache) {log.info("开始处理新增或者修改缓存{}", JSONObject.toJSONString(cache));//处理缓存之前,先看看这个消息的时间是多少,避免复活一些僵尸数据Boolean isCache = DateFormatUtil.compareTo(cache.getCreateDate(), failureTime);//如果这条消息的创建时间已经超过的有效期,那么视为无效消息不处理if (!isCache) {return;}//获取缓存String cacheStr = getCache(cache.getCacheType(), cache.getCacheKey());//当前没有缓存记录,直接覆盖一条if (StringUtils.isEmpty(cacheStr)) {setCache(cache.getCacheType(), cache.getCacheKey(), JSONObject.toJSONString(cache), 0);send(cache);return;}MessageCache messageCache = JsonUtil.json2Object(cacheStr, MessageCache.class);//判断版本和缓存版本的记录谁更新,记录以最新的为准,避免低版本覆盖高版本if (cache.getVersion().compareTo(messageCache.getVersion()) > 0) {setCache(cache.getCacheType(), cache.getCacheKey(), JSONObject.toJSONString(cache), 0);send(cache);}}...
}