Mac系统,Docker的MySQL + 本地 Canal
MySQL 是 Docker 容器安装的8.0.29
,这里选择 Canal 1.1.6
版本。
先查看当前 MySQL 是否开启了 binlog 模式
这里使用 docker 客户端,直接就能进入到容器内,使用终端就得需要 image Id
# 进入到MySQL容器的工作目录下
docker exec -it 3a0532e74496 bash# 登录MySQL
mysql -uroot -proot # 开启binlog模式
SHOW VARIABLES LIKE '%log_bin%';
- 如果不确定,可以对配置文件进行配置
通过我的摸索基本可以确定,MySQL 的核心配置文件为/etc/my.cnf
,就算是没有配置任何的配置文件,它也不会是空的
查看该文件夹,可以看到!includedir /etc/mysql/conf.d/
,这代表配置还会去这个文件夹下去找
sh-4.4# cat my.cnf
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/8.0/en/server-configuration-defaults.html[mysqld]
#
# Remove leading # and set to the amount of RAM for the most important data
# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
# innodb_buffer_pool_size = 128M
#
# Remove leading # to turn on a very important data integrity option: logging
# changes to the binary log between backups.
# log_bin
#
# Remove leading # to set options mainly useful for reporting servers.
# The server defaults are faster for transactions and fast SELECTs.
# Adjust sizes as needed, experiment to find the optimal values.
# join_buffer_size = 128M
# sort_buffer_size = 2M
# read_rnd_buffer_size = 2M# Remove leading # to revert to previous value for default_authentication_plugin,
# this will increase compatibility with older clients. For background, see:
# https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_default_authentication_plugin
# default-authentication-plugin=mysql_native_password
skip-host-cache
skip-name-resolve
datadir=/var/lib/mysql
socket=/var/run/mysqld/mysqld.sock
secure-file-priv=/var/lib/mysql-files
user=mysqlpid-file=/var/run/mysqld/mysqld.pid
[client]
socket=/var/run/mysqld/mysqld.sock!includedir /etc/mysql/conf.d/
conf.d
目录的作用
该目录用于存放 MySQL 的分片配置文件,通常以 .cnf
结尾(如 mysql.cnf
或 mysqld.cnf
)。这些文件会被主配置文件(如 /etc/mysql/mysql.conf.d/mysqld.cnf
)通过 !includedir /etc/mysql/conf.d/
指令动态加载。
进入/etc/mysql/conf.d/
文件夹,看到自己在本机建立,然后映射到 docker 容器的文件
所以如果害怕没有害怕log_bin
就可以在这建立一个xxx.cnf
文件,并且进行配置,图中的my.cnf
就是我后来添加的
[mysqld]
log-bin=mysql-bin # 开启 binlog,`mysql-bin` 为日志前缀(可自定义路径,如 `log-bin=/var/lib/mysql/mysql-bin`)
binlog-format=ROW # 推荐 ROW 模式(保证主从复制一致性,减少误操作风险)
server-id=1 # 主从复制必需,需确保每台 MySQL 服务器 ID
【总结】mysql 的配置文件
- 核心是
/etc/my.cnf
- 其他会被加载的配置文件,
/etc/mysql/conf.d/*.cnf
【创建账号并授权】
- 创建
canal
用户,允许远程连接; - 授予 SELECT、REPLICATION SLAVE、REPLICATION CLIENT、SUPER 权限,使 Canal 能读取 binlog 并监控数据库变更;
- 修改认证方式(针对 MySQL 8.0+ 兼容性);
- 刷新权限,确保配置立即生效;
在刚才mysql
容器内,依次执行下面的命令
# 创建 canal 用户并设置密码
create user canal@'%' IDENTIFIED by 'canal';
# canal@'%':创建一个用户名为 canal,允许从 任意主机(% 表示所有IP) 连接 MySQL
# IDENTIFIED BY 'canal':设置该用户的密码为 canal# 授予 canal 用户必要的权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
# SELECT:允许查询数据(Canal 需要读取 binlog,因此需要查询权限)
# REPLICATION SLAVE:允许作为从库读取主库的 binlog(Canal 模拟 MySQL 从库拉取变更日志)
# REPLICATION CLIENT:允许查看主库/从库状态(Canal 需要检查复制状态)
# SUPER:允许执行某些管理命令(如 SET GLOBAL,某些 MySQL 版本 Canal 需要此权限)
# ON *.*:对所有数据库(*.*)生效。# 修改 canal 用户的认证方式(MySQL 8.0+ 需要)
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
# mysql_native_password:使用 旧版密码认证(MySQL 8.0+ 默认使用 caching_sha2_password,但部分客户端(如 Canal)可能不支持,需切换回旧方式)
# BY 'canal':保持密码不变。# 刷新权限,使更改立即生效
FLUSH PRIVILEGES;
MacOS 安装
这里 MySQL 使用 Docker 安装,但是 Canal 没有 苹果芯片架构的。
所以使用官网进行下载,这里是 1.1.6。
下载地址
下载完毕后,直接在本地打开到bin
目录下, 可以看到startup.sh
以后就要使用该脚本进行启动。但是这个文件是有问题的,它默认支持的是Java 8
。
所以如果本地是Java >= 17
的,那么就会报出参数错误,可以丢给AI
让它帮忙删除,人眼很难找到。
这里给一份修改后的:
#!/bin/bash current_path=`pwd`
case "`uname`" inLinux)bin_abs_path=$(readlink -f $(dirname $0));;*)bin_abs_path=`cd $(dirname $0); pwd`;;
esac
base=${bin_abs_path}/..
canal_conf=$base/conf/canal.properties
canal_local_conf=$base/conf/canal_local.properties
logback_configurationFile=$base/conf/logback.xml
export LANG=en_US.UTF-8
export BASE=$baseif [ -f $base/bin/canal.pid ] ; thenecho "found canal.pid , Please run stop.sh first ,then startup.sh" 2>&2exit 1
fiif [ ! -d $base/logs/canal ] ; then mkdir -p $base/logs/canal
fi## set java path
if [ -z "$JAVA" ] ; thenJAVA=$(which java)
fiALIBABA_JAVA="/usr/alibaba/java/bin/java"
TAOBAO_JAVA="/opt/taobao/java/bin/java"
if [ -z "$JAVA" ]; thenif [ -f $ALIBABA_JAVA ] ; thenJAVA=$ALIBABA_JAVAelif [ -f $TAOBAO_JAVA ] ; thenJAVA=$TAOBAO_JAVAelseecho "Cannot find a Java JDK. Please set either set JAVA or put java (>=1.5) in your PATH." 2>&2exit 1fi
ficase "$#"
in
0 ) ;;
1 ) var=$*if [ "$var" = "local" ]; thencanal_conf=$canal_local_confelseif [ -f $var ] ; then canal_conf=$varelseecho "THE PARAMETER IS NOT CORRECT.PLEASE CHECK AGAIN."exitfifi;;
2 ) var=$1if [ "$var" = "local" ]; thencanal_conf=$canal_local_confelseif [ -f $var ] ; thencanal_conf=$varelse if [ "$1" = "debug" ]; thenDEBUG_PORT=$2DEBUG_SUSPEND="n"JAVA_DEBUG_OPT="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=$DEBUG_PORT,server=y,suspend=$DEBUG_SUSPEND"fififi;;
* )echo "THE PARAMETERS MUST BE TWO OR LESS.PLEASE CHECK AGAIN."exit;;
esacJavaVersion=`$JAVA -version 2>&1 |awk 'NR==1{ gsub(/"/,""); print $3 }' | awk -F '.' '{print $1}'`
str=`file -L $JAVA | grep 64-bit`
JAVA_OPTS="$JAVA_OPTS -Xss256k -XX:-UseBiasedLocking -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$base/logs"if [ $JavaVersion -ge 11 ] ; then#JAVA_OPTS="$JAVA_OPTS -Xlog:gc*:$base_log/gc.log:time "JAVA_OPTS="$JAVA_OPTS"
else#JAVA_OPTS="$JAVA_OPTS -Xloggc:$base/logs/canal/gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime"JAVA_OPTS="$JAVA_OPTS -XX:+UseFastAccessorMethods -XX:+PrintAdaptiveSizePolicy -XX:+PrintTenuringDistribution"
fiif [ -n "$str" ]; then# JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC $JAVA_OPTS"# For G1JAVA_OPTS="-server -Xms2g -Xmx3g -XX:+UseG1GC -XX:MaxGCPauseMillis=250 -XX:+UseGCOverheadLimit -XX:+ExplicitGCInvokesConcurrent $JAVA_OPTS"
elseJAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m $JAVA_OPTS"
fiJAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8"
CANAL_OPTS="-DappName=otter-canal -Dlogback.configurationFile=$logback_configurationFile -Dcanal.conf=$canal_conf"if [ -e $canal_conf -a -e $logback_configurationFile ]
then for i in $base/lib/*;do CLASSPATH=$i:"$CLASSPATH";doneCLASSPATH="$base/conf:$CLASSPATH";echo "cd to $bin_abs_path for workaround relative path"cd $bin_abs_pathecho LOG CONFIGURATION : $logback_configurationFileecho canal conf : $canal_conf echo CLASSPATH :$CLASSPATH$JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $CANAL_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.deployer.CanalLauncher 1>>$base/logs/canal/canal_stdout.log 2>&1 &echo $! > $base/bin/canal.pid echo "cd to $current_path for continue"cd $current_path
else echo "canal conf("$canal_conf") OR log configration file($logback_configurationFile) is not exist,please create then first!"
fi
除此之外,canal.properties
文件也是很重要的,在/canal/conf
下。
我们除此使用的时候,很可能是监听 MySQL ,那么就需要配置canal.instance
,该文件中有两行,是因为它默认 canal.properties
为主配置文件,然后进行创建其他实例,在/conf/实例名/实例名.properties
中配置与对应的客户端进行通信(MQ、Kafka)。
但是对于小白,可能已经蒙了,所以附一份修改后的 canal.properties
,不需要关心其他实例了
需要修改属性,其他可以复制
canal.destinations = mysql-master # docker 容器中 MySQL 实例名
canal.instance.master.address = 127.0.0.1:3307 # 更改ip和端口
canal.instance.defaultDatabaseName = tingshu_album # 指定数据库
# MySQL 连接用户名和密码 这两个,在上面mysql 设置已经创建了该用户
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
# Canal Server 基本配置文件
# Canal Server 端口配置
canal.port = 11111
canal.metrics.pull.port = 11112# Canal Server 模式配置 (tcp, kafka, rocketMQ, rabbitMQ)
canal.serverMode = tcp# Canal destinations 配置,指定单个实例
canal.destinations = mysql-master# Canal 实例配置目录
canal.conf.dir = ../conf
# Canal 日志目录
canal.log.dir = ../logs# Canal 实例全局配置
# MySQL 主库地址
canal.instance.master.address = 127.0.0.1:3307
# MySQL 从库地址(可选)
canal.instance.standby.address = # MySQL 连接用户名和密码
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal# 字符集配置
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName = tingshu_album# binlog 解析配置
canal.instance.binlog.format = ROW
canal.instance.binlog.image = FULL# position 记录配置
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
canal.instance.master.gtid = # 时间戳数据库配置
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../data}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;# GTID 配置
canal.instance.gtidon = false# 表过滤配置 - 只监听指定表的变更
# 格式:数据库名.表名,支持正则表达式
canal.instance.filter.regex = tingshu_album.album_info# 黑名单配置(不监听的表)
canal.instance.filter.black.regex = # 心跳检查配置
canal.instance.detecting.enable = false
canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false# 内存存储配置
canal.instance.memory.buffer.size = 16384
canal.instance.memory.buffer.memunit = 1024
canal.instance.memory.raw.mode = true# binlog 解析并行度配置
canal.instance.parser.parallel = true
canal.instance.parser.parallelThreadSize = 60
canal.instance.parser.parallelBufferSize = 256# binlog 接收缓冲区配置
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30# 批量获取配置
canal.instance.get.ddl.isolation = false
canal.instance.parser.isTableError = true# 数据库连接池配置
canal.instance.connectionCharset = UTF-8
canal.instance.enableDruid = false# 监控配置
canal.admin.manager =
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
canal.admin.register.auto = true
canal.admin.register.cluster =
canal.admin.register.name = # 实例自动扫描配置 - 禁用自动扫描,只监听指定实例
canal.auto.scan = false
canal.auto.scan.interval = 5# HA 配置
canal.zkServers =
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false# TCP 配置
canal.tcp.no.delay = true
canal.tcp.keep.alive = true
canal.tcp.recv.buffer.size = 16384
canal.tcp.send.buffer.size = 16384# 文件数据存储配置
canal.file.data.dir = ../data
canal.file.flush.period = 1000# 实例配置重载
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address =
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
最后修改 SpringBoot中yaml
文件
# canal
canal:destination: mysql-master # 要监听的实例名称,这里是 mysql 实例名称server: 127.0.0.1:11111
【注意】本教程只适合,是监听 MySQL 数据变化,通过 JAVA API 接口监控MySQL数据变化的代码编写。
不是引入客户端,因为发现,我们从头到尾都没有在项目的pom.xml
中引入 canal 客户端依赖。
通过这样的方式监控到数据变化,后续做删除缓存操作的。
@Component
@CanalTable("album_info") // 监听变更表
@Slf4j
public class CdcEntityHandler implements EntryHandler<CdcEntity> {/*** 监听的表中有数据新增的时候,会回调该方法*/@Overridepublic void insert(CdcEntity cdcEntity) {log.info("Canal客户端监听到了album_info表中有数据的新增的id:{}", cdcEntity.getId());}/*** 监听的表中有数据变更的时候,会回调该方法* @param before 修改之前的老数据* @param after 修改后的新数据*/@Overridepublic void update(CdcEntity before, CdcEntity after) {log.info("Canal客户端监听到了album_info表中有数据的修改,修改之前的id:{}", before.getId());log.info("Canal客户端监听到了album_info表中有数据的修改,修改之后的id:{}", after.getId());// TODO 删除缓存}/*** 监听的表中有数据删除的时候,会回调该方法* @param cdcEntity 删除的对象*/@Overridepublic void delete(CdcEntity cdcEntity) {log.info("Canal客户端监听到了album_info表中有数据的删除,删除的数据的id:{}", cdcEntity.getId());}
}