当前位置: 首页 > news >正文

浅谈canal实例 在docker里面安装canal镜像 Canal监听MySQL数据库变更并同步更新Redis和Elasticsearch 示例

目录

1. 环境准备

1.1 MySQL配置

1.2 部署Canal Server

2. Spring Boot项目配置

2.1 添加依赖

2.2 配置参数

3. 实现Canal监听与同步

3.1 Canal客户端监听

3.2 同步到Redis

3.3 同步到Elasticsearch

4. 注意事项


在Spring Boot中通过Canal监听MySQL数据库变更并同步更新Redis和Elasticsearch,可按照以下步骤实现:


1. 环境准备

1.1 MySQL配置
  • 开启Binlog并设置为ROW模式:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1
  • 创建Canal用户并授权:
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
1.2 部署Canal Server
  1. 下载Canal Server:Canal Releases
  2. 修改配置 conf/example/instance.properties
canal.instance.master.address=127.0.0.1:3306 
canal.instance.dbUsername=canal 
canal.instance.dbPassword=canal 
canal.instance.filter.regex=.*\\..*  # 监听所有库表,或指定如test.user

2. Spring Boot项目配置

2.1 添加依赖
<!-- Canal客户端 -->
<dependency>
    <groupId>com.alibaba.otter</groupId> 
    <artifactId>canal.client</artifactId> 
    <version>1.1.6</version>
</dependency>
<!-- Redis -->
<dependency>
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Elasticsearch -->
<dependency>
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
2.2 配置参数

application.yml

canal:
  server: 127.0.0.1:11111
  destination: example
  username: canal
  password: canal

spring:
  redis:
    host: localhost
    port: 6379
  data:
    elasticsearch:
      cluster-nodes: localhost:9200

3. 实现Canal监听与同步

3.1 Canal客户端监听
@Component
public class CanalListener {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private ElasticsearchRestTemplate esTemplate;

    @PostConstruct
    public void init() {
        CanalConnector connector = CanalConnectors.newSingleConnector( 
                new InetSocketAddress("127.0.0.1", 11111), "example", "canal", "canal");
        
        Thread thread = new Thread(() -> {
            connector.connect(); 
            connector.subscribe(".*\\..*"); 
            while (true) {
                Message message = connector.getWithoutAck(100); 
                long batchId = message.getId(); 
                if (batchId != -1) {
                    processEntry(message.getEntries()); 
                    connector.ack(batchId); 
                }
            }
        });
        thread.start(); 
    }

    private void processEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            if (entry.getEntryType()  == EntryType.ROWDATA) {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); 
                for (RowData rowData : rowChange.getRowDatasList())  {
                    String tableName = entry.getHeader().getTableName(); 
                    EventType eventType = rowChange.getEventType(); 
                    
                    // 解析变更前后的数据
                    Map<String, String> before = parseColumns(rowData.getBeforeColumnsList()); 
                    Map<String, String> after = parseColumns(rowData.getAfterColumnsList()); 
                    
                    // 根据事件类型同步数据
                    switch (eventType) {
                        case INSERT:
                        case UPDATE:
                            syncToRedis(tableName, after);
                            syncToElasticsearch(tableName, after);
                            break;
                        case DELETE:
                            deleteFromRedis(tableName, before);
                            deleteFromElasticsearch(tableName, before);
                            break;
                    }
                }
            }
        }
    }

    private Map<String, String> parseColumns(List<Column> columns) {
        return columns.stream() 
                .collect(Collectors.toMap(Column::getName,  Column::getValue));
    }
}
3.2 同步到Redis
private void syncToRedis(String tableName, Map<String, String> data) {
    String key = tableName + ":" + data.get("id");  // 假设主键为id
    redisTemplate.opsForValue().set(key,  data);
}

private void deleteFromRedis(String tableName, Map<String, String> data) {
    String key = tableName + ":" + data.get("id"); 
    redisTemplate.delete(key); 
}
3.3 同步到Elasticsearch
private void syncToElasticsearch(String tableName, Map<String, String> data) {
    IndexQuery indexQuery = new IndexQueryBuilder()
            .withId(data.get("id")) 
            .withObject(data)
            .build();
    esTemplate.index(indexQuery,  IndexCoordinates.of(tableName)); 
}

private void deleteFromElasticsearch(String tableName, Map<String, String> data) {
    esTemplate.delete(data.get("id"),  IndexCoordinates.of(tableName)); 
}

4. 注意事项

  1. 异常处理:增加重试机制或记录错误日志,确保网络波动时的数据一致性。
  2. 性能优化:批量处理Canal消息,减少Redis/ES的频繁写入。
  3. 数据结构:确保Elasticsearch的索引Mapping与MySQL表结构兼容。
  4. 事务管理:如需强一致性,可结合本地事务表或消息队列(如RocketMQ)做可靠投递。

通过以上步骤,Spring Boot应用能够实时监听MySQL变更,并自动同步到Redis和Elasticsearch,保障数据一致性。

相关文章:

  • zabbix原生linux命令部署和docker部署
  • docker安装milvus向量数据库Attu可视化界面
  • 母婴电商企业案例:日事清驱动项目管理执行与OKR目标管理的流程自动化实践
  • 为什么labelme框选图片后闪退
  • 红宝书第七讲:this绑定与强制类型转换详解(小白指南)
  • 51单片机程序变量作用域问题
  • 【Oracle资源损坏类故障】:详细了解坏块
  • PyTorch分布式训练中各节点如何通信
  • Redis 持久化机制
  • 汇编代码中嵌入回调函数的优化说明
  • Centos7快速在线安装MySQL8.0最新版本教程
  • MySQL WHERE 子句详解
  • 蓝桥杯嵌入式赛道复习笔记5(捕获信号发生器的PWM的波形)
  • word报告篇:python生成《蔬菜店销售数据分析报告》案例
  • Spring Boot整合Apache BookKeeper教程
  • 网络安全之前端学习(HTML篇)
  • Leetcode322-零钱兑换
  • 如何在 WordPress 中重新生成永久链接?
  • HarmonyOS next性能优化:多维度策略与实战案例
  • Linux C/C++编程——线程
  • 锚定“双一流”战略坐标,福建农林大学向全球英才“伸出橄榄枝”
  • 魔都眼丨人形机器人“华山论剑”:拳击赛缺席,足球赛抢镜
  • 大卫·第艾维瑞谈历史学与社会理论③丨尼古拉斯·卢曼与历史研究
  • 看正背面月壤、听火星上的声音,记者探营“中国航天日”科普展
  • 李公明|“小时光”与大时代中的地铁阅读者
  • 马上评丨全面取消 “仅退款”,反内卷的必然