当前位置: 首页 > news >正文

Redis缓存与数据库 数据一致性保障

为什么要保证数据一致性

只要使用redis做缓存,就必然存在缓存和DB数据一致性问题。若数据不一致,则业务应用从缓存读取的数据就不是最新数据,可能导致严重错误。比如将商品的库存缓存在Redis,若库存数量不对,则下单时就可能出错,这是不能接受的。

旁路缓存

在使用 Redis 作为缓存时,最常见的缓存模式是 Cache-Aside(旁路缓存)模式因其灵活性、容错性以及与 Redis 的高效配合,适用于大多数业务场景。
Cache-Aside(旁路缓存) 模式,又叫 Lazy-Loading(懒加载)模式,在这种模式下,缓存的读取和写入由应用程序直接管理。

  • 读策略:先查询缓存,若缓存未命中,则从数据库中加载数据并将其存储在缓存中;
  • 写策略:应用程序直接更新数据库,并更新或删除缓存中的相关数据。

旁路缓存因其按需触发、避免冗余操作特点所以被称为懒加载。
旁路缓存模式,存在缓存一致性问题,需要小心处理缓存的清除和更新。

更新数据库并更新缓存

那么如何保证redis缓存和数据库的数据一致性呢?
直观的思维想到的是 更新数据库并更新缓存。

这种直观的做法会有连续两次更新,数据不一致问题。并且这种情况经常出现,所以一般不会采用。
在这里插入图片描述

对于缓存命中率要求很高的场景,我们可以使用更新数据库+更新缓存的方案,因为只要缓存不过期,都是可以命中的。

解决方案:

  • 加分布式锁,丢失一部分性能。
  • 加一个比较短的过期时间,哪怕缓存数据不一致也很快过期。

先删除缓存再更新数据库

请求A想要将数据库的数据修改为21,先去redis中删除了缓存,这个时候请求B来访问查询该数据,发现redis中没有该数据,就去数据库中查询了旧数据并写回到了缓存。这个时候请求A才更新数据库。
此时缓存中的数据为20,而数据库中存储的为21产生了数据不一致。

在这里插入图片描述

可以看到先删除缓存,再更新数据库会在【读+写】并发的时候,会出现数据不一致的问题。

延时双删

那么针对于先删除缓存再更新数据库的方案我们有没有解决方案呢?
就是先删除缓存再更新数据库,然后睡一段时间再删除一次缓存。

延迟一段时间删除的目的是什么?

是让线程B有足够的时间去将(旧值)写回到缓存。确保延迟一段时间后能够将线程B的脏数据删除。也是保证了最终一致性。

但是具体延迟多久,很难评估出来,所以这种方案也只能尽可能保证数据一致。但是在极端情况下,还是会有可能出现数据不一致。

先更新数据库再删除缓存

还是在【读+写】并发的场景下分析,请求A查询数据,发现缓存没有命中,查询数据库为20。此时请求B更新数据库并删除了缓存。最后请求A写回了缓存。请求A写回的是旧数据也就是20。
此时数据库中为21(新值),缓存中是20(旧值)。出现了数据不一致。
在这里插入图片描述

可以看到先更新数据库再删除缓存是有可能发生数据库的,但是这个在实际当中发生的概率很小。

因为缓存的写入通常要远远快于数据库的写入,所以实际当中,很少会出现请求B将数据库更新并且删除缓存后,请求A才写回缓存的情况。

所以先更新数据库再删除缓存的方式是可以实现数据一致性的。但是为了保险起见,给缓存数据加上过期时间。也就是即使出现了以上这种小概率时间,还是可以通过过期时间,来达到一个最终一致性的目的。

删除操作失败

但是在实际开发当中还是会出现问题,如果删除缓存操作失败,那么缓存中就会一直存放脏数据,直到key过期。
在这里插入图片描述

也就是我们要保证更新数据库之后缓存能被成功删除?

对应的就有两种解决方案:

  • 消息队列重试机制
  • 订阅 MySQL binlog日志,更新缓存。

消息队列重试机制

引入消息队列,将第二步操作(删除缓存) 要操作的数据放到消息队列中,由消费者来删除缓存。

  • 如果应用删除缓存失败,可以通过消费者重试机制。设置一个最大的重试次数。如果超过最大重试次数还是失败,那就要向服务层发送报错信息。
  • 如果删除缓存成功后返回ack。避免重复消费。

这种方案的缺点就是对代码侵入性比较强,需要修改原本的业务代码。

消费者:交换机和队列都持久化防止消息丢失。

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = MqConstant.REDIS_MQ_QUEUE, durable = "true", arguments = @Argument(name = "x-queue-mode", value = "lazy")),
        exchange = @Exchange(name = MqConstant.REDIS_DELETE_EXCHANGE_NAME, type = ExchangeTypes.DIRECT),
        key = MqConstant.REDIS_MQ_QUEUE_ROUTING_KEY
))
public void listenRedisDeleteMqMessage( String msg) {
    log.info("RedisDeleteMqListener:redis删除数据:" + msg);
    Set keys = redisTemplate.keys(msg);
    redisTemplate.delete(keys);
}

配置好相关的生产者消费者确认机制,以及消费者失败重试机制,确保消息正确路由和重试。

spring:      
  rabbitmq:
    host: 192.168.215.140 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /bigevents # 虚拟主机
    username: big-events # 用户名
    password: 123321 # 密码
    publisher-returns: true # 开启publisher return机制
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初始的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
        acknowledge-mode: auto  #消费者自动ack 异常nack

Canal+MQ

先更新数据库,对数据库的写操作都会被记录在binlog日志中。所以我们更新数据库,也就会产生一条对应的变更日志在binlog中。

于是我们可以通过订阅binlog日志,拿到具体要操作的数据,然后再执行删除缓存,阿里巴巴开源的Canal中间件就是基于这个实现的。

Canal模拟MySQL主从复制的交互协议,把自己伪装成一个MySQL的从节点,向MySQL的主节点发送dump命令请求,MySQL收到请求后,就会推送Binlog给Canal,Canal解析binlog字节流后,转换为便于读取的结构化数据。供下游服务订阅使用。
在这里插入图片描述
前面我们讲了MQ的解决方案对原有的业务代码有很强的侵入性。但是这种方案就不会,直接监听binlog,因此我们可以用binlog+MQ的解决方案。既保证了数据的一致性又不会对原有业务代码有侵入。

具体实现

mysql添加用户和权限

CREATE USER canal IDENTIFIED BY 'canal'; 

# 添加从节点权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;

修改canal相关配置:

vi canal-server/conf/example/instance.properties

在这里插入图片描述

通过spring定时任务监听canal消息,解析canal的消息,并投递给mq

/**
 * 每2秒执行一次
 *
 * @throws Exception
 */
@Scheduled(fixedRate = 2 * 1000)
public void run() throws Exception {

    //进行连接
    canalConnector.connect();
    //进行订阅
    canalConnector.subscribe();

    int batchSize = 5 * 1024;


    //获取Message对象
    Message message = canalConnector.getWithoutAck(batchSize);
    long id = message.getId();
    int size = message.getEntries().size();

    System.out.println("当前监控到的binLog消息数量是:" + size);

    //判断是否有数据

    //如果有数据,进行数据解析
    List<CanalEntry.Entry> entries = message.getEntries();

    //遍历获取到的Entry集合
    for (CanalEntry.Entry entry : entries) {
        System.out.println("----------------------------------------");
        System.out.println("当前的二进制日志的条目(entry)类型是:" + entry.getEntryType());

        //如果属于原始数据ROWDATA,进行打印内容
        if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
            try {
                //获取存储的内容
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());

                //打印事件的类型,增删改查哪种 eventType
                System.out.println("事件类型是:" + rowChange.getEventType());
                ArrayList<Long> idList = new ArrayList<>();
                //打印改变的内容(增量数据)
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    System.out.println("改变后的数据:" + rowData.getAfterColumnsList());
                    List<Column> after;
                    if (rowChange.getEventType().equals(CanalEntry.EventType.INSERT)) {
                        after = rowData.getAfterColumnsList();
                    } else if (rowChange.getEventType().equals(CanalEntry.EventType.DELETE)) {
                        after = rowData.getBeforeColumnsList();
                    } else {
                        //update
                        after = rowData.getAfterColumnsList();
                    }
                    for (Column column : after) {
                        if (column.getName().equals("id")) {
                            sendMessageToMq(column.getValue());
                        }
                    }
                }

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    //消息确认已经处理了
    canalConnector.ack(id);
}

消费者监听mq,最后删除缓存。

@Slf4j
@Component
@AllArgsConstructor
public class CanalMqListener {

    private final RedisTemplate redisTemplate;

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = MqConstant.CANAL_MQ_QUEUE, durable = "true", arguments = @Argument(name = "x-queue-mode", value = "lazy")),
            exchange = @Exchange(name = MqConstant.CANAL_MQ_EXCHANGE_NAME, type = ExchangeTypes.DIRECT),
            key = MqConstant.CANAL_MQ_EXCHANGE_NAME
    ))
    public void listenCanalMqMessage( String msg) {
        log.info("listenCanalMqMessage:redis变更数据:" + msg);
        try {
            Set keys = redisTemplate.keys(msg);
            redisTemplate.delete(keys);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

总结

[图片]

解决方案

延时双删

优点:

  • 无需引入中间件
  • 实现简单,代码改动小

缺点:

  • 延迟时间不好控制
  • 极端情况会出现数据不一致

消息队列

优点:

  • 确保缓存被删除

缺点:

  • 需要引入消息队列,增加了系统复杂度。
  • 延迟稍高(对于能接受一定延迟的场景使用)
  • 对原有业务代码有侵入

Canal+MQ

优点:

  • 确保缓存被删除
  • 直接监听binlog,对原有代码无侵入

缺点:

  • 需要引入多个中间件对团队运维能力有较高要求
  • 延迟稍高

相关文章:

  • 2:认识数据库
  • 在 .NET 9.0 Web API 中实现 Scalar 接口文档及JWT集成
  • CIFAR10 数据集自定义处理方法
  • Spring Boot 整合 OpenFeign 教程
  • VitePress由 Vite 和 Vue 驱动的静态站点生成器
  • 自然语言处理(5)—— 中文分词
  • 高等数学-第七版-上册 选做记录 习题5-2
  • Linux——线程
  • 构音障碍(Dysarthria)研究全景总结(1996–2024)
  • 在Mac M1/M2芯片上完美安装DeepCTR库:避坑指南与实战验证
  • systemd-networkd 的 /etc/systemd/network/*.network 能不能一个文件配置多块网卡?不能
  • [01-04-02].第20节:PyQt5库初识及实现简易计算器
  • 网络安全基础
  • 文字变央视级语音转换工具
  • OpenRAND可重复的随机数生成库
  • 元宇宙时代下的 Facebook:机遇与挑战
  • IDEA修改默认作者名称
  • Android Compose 约束布局(ConstraintLayout、Modifier.constrainAs)源码深度剖析(十二)
  • #include <hello.h> 与 #include “hello.h“的区别
  • YOLO学习笔记 | YOLO系列算法研究进展及应用综述
  • 上海虹桥国际咖啡文化节周五开幕,来看Coffeewalk通关攻略
  • 工人日报:“鼠标手”被纳入职业病,劳动保障网越织越密
  • 城市轨道交通安全、内河港区布局规划、扎实做好防汛工作……今天的上海市政府常务会议研究了这些重要事项
  • 牟海松任国家信访局副局长
  • 《广州大典研究》集刊发展座谈会:“广州学”的传承与创新
  • 国家统计局今年将在全国开展两次人口固定样本跟访调查