maxwell
一、maxwell简介
它是一款轻量级工具,主要用于实现 MySQL 到 Kafka 的数据实时同步,尤其适合对实时性要求较高的场景。
1.核心功能
借助解析 MySQL 的 Binlog,能够实时捕获数据变更,并将这些变更数据发送至 Kafka。
2.缺点
仅支持 MySQL 数据源,适用范围较窄。
3.maxwell官网
官网默认下载的是最新版本
https://maxwells-daemon.io/
文档查询地址
http://maxwells-daemon.io/quickstart/
历史版本下载
https://github.com/zendesk/maxwell/releases
4.版本选择
4.1 JDK版本
v1.29.2及更早版本:仅支持 JDK 1.8,若使用更高版本 JDK 会导致运行失败。
v1.30.0 及以上版本:不再支持 JDK 1.8,需升级至 JDK 11+
4.2 maxwell版本
小规模场景或 JDK 1.8 用户:选择 v1.29.2,稳定性较好且轻量。
需高兼容性或新功能:尝试 v1.30.0,但需确保 JDK 环境符合要求
二、mysql环境
1.版本
mysql> select version();
+-----------+
| version() |
+-----------+
| 8.0.40 |
+-----------+
1 row in set (0.00 sec)
2.查看二进制日志位置
mysql8 默认二进制日志是开启的
mysql> show variables like "%log_bin%";
+---------------------------------+-------------------------------+
| Variable_name | Value |
+---------------------------------+-------------------------------+
| log_bin | ON |
| log_bin_basename | /data/mysql/data/binlog |
| log_bin_index | /data/mysql/data/binlog.index |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
| sql_log_bin | ON |
+---------------------------------+-------------------------------+
3.创建连接用户
1.创建用户
create user 'root'@'127.0.0.1' identified by '666666';
2.授权用户
grant all on *.* to 'root'@'127.0.0.1' with grant option;
3.刷新权限
flush privileges;
4.连接测试
[root@node-1 local]# mysql -u root -p -h 127.0.0.1
Enter password:
创建测试数据库
mysql> create database test_db;
三、搭建maxwell
1.版本
这里使用的是最新版本
tar xf maxwell-1.41.2.tar.gz
mv maxwell-1.41.2 /usr/local/maxwell
2.配置
[root@node-1 root]# cd /usr/local/maxwell/
[root@node-1 maxwell]# cp config.properties.example config.properties
[root@node-1 maxwell]# egrep -v "^$|#" config.properties
log_level=info
# 将结果输出到屏幕上
producer=stdout
# mysql连接地址,也就是要收集的数据源地址
host=localhost
# 连接的用户名
user=root
# 连接的密码
password=666666
#以下都是默认配置
kafka.compression.type=snappy
kafka.retries=0
kafka.acks=1
3.启动maxwell
[root@node-1 maxwell]# ./bin/maxwell --config ./config.properties
[root@node-1 maxwell]# ./bin/maxwell --config ./config.properties
2025-03-12 02:51:22 INFO Maxwell - Starting Maxwell. maxMemory: 247332864 bufferMemoryUsage: 0.25
2025-03-12 02:51:22 INFO SchemaStoreSchema - Creating maxwell database
2025-03-12 02:51:22 INFO Maxwell - Maxwell v1.41.2 is booting (StdoutProducer), starting at Position[BinlogPosition[binlog.000006:115920], lastHeartbeat=0]
2025-03-12 02:51:22 INFO AbstractSchemaStore - Maxwell is capturing initial schema
2025-03-12 02:51:22 INFO BinlogConnectorReplicator - Setting initial binlog pos to: binlog.000006:115920
2025-03-12 02:51:22 INFO BinaryLogClient - Connected to localhost:3306 at binlog.000006/115920 (sid:6379, cid:71)
2025-03-12 02:51:22 INFO BinlogConnectorReplicator - Binlog connected.
4.测试
4.1 创建测试表
mysql> use test_db
Database changed
mysql> create table t1(id int primary key auto_increment,name varchar(20));
Query OK, 0 rows affected (0.06 sec)
4.2 插入测试数据
mysql> insert into t1 values(NULL,'zhangsan');
Query OK, 1 row affected (0.00 sec)
4.3 maxwell控制台结果
{"database":"test_db","table":"t1","type":"insert","ts":1741763417,"xid":4915,"commit":true,"data":{"id":1,"name":"zhangsan"}}
这里说明已经能够实时记录用mysql数据的变化
5.数据匹配filter
maxwell默认是获取所有数据库中的所有表数据。在实际获取数据的时候,并不需要这么多表的数据,那么如何获取部分数据。在配置文件中配置如下
filter=exclude:*.*,include:test_db./t2|t3/
解释:
filter: 表示使用过滤规则
exclude: 拒绝获取所有数据,是的,没错,拒绝获取所有数据,然后后续在获取指定的表数据。
*.* 表示所有库的所有表
incdlue:获取指定的表数据
此处是获取test_db数据库中的 t2和t3表的数据
四、maxwell对接kafka
1.版本
这里的kafka使用的是单机版,3.7版本
2.创建topic
./kafka-topics.sh --bootstrap-server 192.168.1.12:9092 --create --topic MysqlBinLog
3.启用一个匿名消费者
/kafka-console-consumer.sh --bootstrap-server 192.168.1.12:9092 --topic MysqlBinLog
4.配置maxwell对接kafka
# tl;dr config
log_level=info
#启用kafka模式
producer=kafka
kafka.bootstrap.servers=192.168.1.12:9092
kafka_topic=MysqlBinLog
#下边的配置上边已经讲述过了
host=localhost
user=root
password=666666
filter=exclude:*.*,include:test_db./t2|t3/
client_id=1.10
binlog_connector=true
output_binlog_position=true
output_server_id=true
output_ddl=tru
5.启动maxwell
./bin/maxwell --config ./config.properties
6.修改mysql数据
mysql> insert into t3 values(NULL,'fy');
Query OK, 1 row affected (0.06 sec)
mysql> insert into t2 values(NULL,'fy');
Query OK, 1 row affected (0.01 sec)
mysql> insert into t1 values(NULL,'fy');
Query OK, 1 row affected (0.01 sec)
mysql> insert into t1 values(NULL,'fy');
Query OK, 1 row affected (0.01 sec)
mysql> insert into t2 values(NULL,'fy');
Query OK, 1 row affected (0.02 sec)
7.查看kafka消费者
这里可以看到,修改t3,t2表修改成功,修改t1表没有变化
[root@node3 bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.1.12:9092 --topic MysqlBinLog
{"database":"test_db","table":"t3","type":"insert","ts":1741834723,"xid":16428,"commit":true,"position":"binlog.000006:549601","server_id":1,"data":{"id":16,"name":"fy"}}
{"database":"test_db","table":"t2","type":"insert","ts":1741834736,"xid":16463,"commit":true,"position":"binlog.000006:550976","server_id":1,"data":{"id":10,"name":"fy"}}
{"database":"test_db","table":"t2","type":"insert","ts":1741834748,"xid":16498,"commit":true,"position":"binlog.000006:552931","server_id":1,"data":{"id":11,"name":"fy"}}