当前位置: 首页 > news >正文

maxwell

一、maxwell简介

它是一款轻量级工具,主要用于实现 MySQL 到 Kafka 的数据实时同步,尤其适合对实时性要求较高的场景。

1.核心功能

借助解析 MySQL 的 Binlog,能够实时捕获数据变更,并将这些变更数据发送至 Kafka。

2.缺点

仅支持 MySQL 数据源,适用范围较窄。

3.maxwell官网

官网默认下载的是最新版本

https://maxwells-daemon.io/

文档查询地址

http://maxwells-daemon.io/quickstart/

历史版本下载

https://github.com/zendesk/maxwell/releases

4.版本选择

4.1 JDK版本

v1.29.2及更早版本:仅支持 JDK 1.8,若使用更高版本 JDK 会导致运行失败。
v1.30.0 及以上版本:不再支持 JDK 1.8,需升级至 JDK 11+

4.2 maxwell版本

小规模场景或 JDK 1.8 用户:选择 v1.29.2,稳定性较好且轻量。
需高兼容性或新功能:尝试 v1.30.0,但需确保 JDK 环境符合要求

二、mysql环境

1.版本

mysql> select version();
+-----------+
| version() |
+-----------+
| 8.0.40    |
+-----------+
1 row in set (0.00 sec)

2.查看二进制日志位置

mysql8 默认二进制日志是开启的

mysql> show variables like "%log_bin%";
+---------------------------------+-------------------------------+
| Variable_name                   | Value                         |
+---------------------------------+-------------------------------+
| log_bin                         | ON                            |
| log_bin_basename                | /data/mysql/data/binlog       |
| log_bin_index                   | /data/mysql/data/binlog.index |
| log_bin_trust_function_creators | OFF                           |
| log_bin_use_v1_row_events       | OFF                           |
| sql_log_bin                     | ON                            |
+---------------------------------+-------------------------------+

3.创建连接用户

1.创建用户
create user 'root'@'127.0.0.1' identified by '666666';

2.授权用户
grant all on *.* to 'root'@'127.0.0.1' with grant option;

3.刷新权限
flush privileges;

4.连接测试

[root@node-1 local]# mysql -u root -p -h 127.0.0.1
Enter password: 

创建测试数据库

mysql> create database test_db;

三、搭建maxwell

1.版本

这里使用的是最新版本

tar xf maxwell-1.41.2.tar.gz
mv maxwell-1.41.2 /usr/local/maxwell

2.配置

[root@node-1 root]# cd /usr/local/maxwell/
[root@node-1 maxwell]# cp config.properties.example config.properties
[root@node-1 maxwell]# egrep -v "^$|#" config.properties
log_level=info
# 将结果输出到屏幕上
producer=stdout
# mysql连接地址,也就是要收集的数据源地址
host=localhost
# 连接的用户名
user=root
# 连接的密码
password=666666

#以下都是默认配置
kafka.compression.type=snappy
kafka.retries=0
kafka.acks=1

3.启动maxwell

[root@node-1 maxwell]# ./bin/maxwell --config ./config.properties
[root@node-1 maxwell]# ./bin/maxwell --config ./config.properties
2025-03-12 02:51:22 INFO  Maxwell - Starting Maxwell. maxMemory: 247332864 bufferMemoryUsage: 0.25
2025-03-12 02:51:22 INFO  SchemaStoreSchema - Creating maxwell database
2025-03-12 02:51:22 INFO  Maxwell - Maxwell v1.41.2 is booting (StdoutProducer), starting at Position[BinlogPosition[binlog.000006:115920], lastHeartbeat=0]
2025-03-12 02:51:22 INFO  AbstractSchemaStore - Maxwell is capturing initial schema
2025-03-12 02:51:22 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: binlog.000006:115920
2025-03-12 02:51:22 INFO  BinaryLogClient - Connected to localhost:3306 at binlog.000006/115920 (sid:6379, cid:71)
2025-03-12 02:51:22 INFO  BinlogConnectorReplicator - Binlog connected.

4.测试

4.1 创建测试表

mysql> use test_db
Database changed

mysql> create table t1(id int primary key auto_increment,name varchar(20));
Query OK, 0 rows affected (0.06 sec)

4.2 插入测试数据

mysql> insert into t1 values(NULL,'zhangsan');
Query OK, 1 row affected (0.00 sec)

4.3 maxwell控制台结果

{"database":"test_db","table":"t1","type":"insert","ts":1741763417,"xid":4915,"commit":true,"data":{"id":1,"name":"zhangsan"}}

这里说明已经能够实时记录用mysql数据的变化

5.数据匹配filter

maxwell默认是获取所有数据库中的所有表数据。在实际获取数据的时候,并不需要这么多表的数据,那么如何获取部分数据。在配置文件中配置如下

filter=exclude:*.*,include:test_db./t2|t3/

解释:
filter: 表示使用过滤规则
exclude: 拒绝获取所有数据,是的,没错,拒绝获取所有数据,然后后续在获取指定的表数据。
	*.* 表示所有库的所有表
incdlue:获取指定的表数据
	此处是获取test_db数据库中的 t2和t3表的数据

四、maxwell对接kafka

1.版本

这里的kafka使用的是单机版,3.7版本

2.创建topic

./kafka-topics.sh --bootstrap-server 192.168.1.12:9092 --create --topic MysqlBinLog

3.启用一个匿名消费者

/kafka-console-consumer.sh --bootstrap-server 192.168.1.12:9092 --topic MysqlBinLog

4.配置maxwell对接kafka

# tl;dr config
log_level=info

#启用kafka模式
producer=kafka
kafka.bootstrap.servers=192.168.1.12:9092
kafka_topic=MysqlBinLog

#下边的配置上边已经讲述过了
host=localhost
user=root
password=666666
filter=exclude:*.*,include:test_db./t2|t3/
client_id=1.10
binlog_connector=true
output_binlog_position=true
output_server_id=true
output_ddl=tru

5.启动maxwell

./bin/maxwell --config ./config.properties

6.修改mysql数据

mysql> insert into t3 values(NULL,'fy');
Query OK, 1 row affected (0.06 sec)

mysql> insert into t2 values(NULL,'fy');
Query OK, 1 row affected (0.01 sec)

mysql> insert into t1 values(NULL,'fy');
Query OK, 1 row affected (0.01 sec)

mysql> insert into t1 values(NULL,'fy');
Query OK, 1 row affected (0.01 sec)

mysql> insert into t2 values(NULL,'fy');
Query OK, 1 row affected (0.02 sec)

7.查看kafka消费者

这里可以看到,修改t3,t2表修改成功,修改t1表没有变化

[root@node3 bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.1.12:9092 --topic MysqlBinLog
{"database":"test_db","table":"t3","type":"insert","ts":1741834723,"xid":16428,"commit":true,"position":"binlog.000006:549601","server_id":1,"data":{"id":16,"name":"fy"}}
{"database":"test_db","table":"t2","type":"insert","ts":1741834736,"xid":16463,"commit":true,"position":"binlog.000006:550976","server_id":1,"data":{"id":10,"name":"fy"}}
{"database":"test_db","table":"t2","type":"insert","ts":1741834748,"xid":16498,"commit":true,"position":"binlog.000006:552931","server_id":1,"data":{"id":11,"name":"fy"}}

相关文章:

  • Ubuntu 24.04 Rootless Docker 安装指南
  • 动态参数二维码统计:构建APP渠道追踪体系
  • DeepSeek-R1 面试 -—— GRPO
  • 使用联核科技四向穿梭车能给企业带来哪些效益?
  • LeetCode 第4题:寻找两个正序数组的中位数
  • Linux的chmod命令,给文件设置权限
  • 【Agent实战】货物上架位置推荐助手(RAG方式+结构化prompt(CoT)+API工具结合ChatGPT4o能力Agent项目实践)
  • STC89C52单片机学习——第17节: [7-1]定时器
  • vue中常见面试题(会不断更新版)
  • 深度解读DeepSeek部署使用安全(48页PPT)(文末有下载方式)
  • 鸿蒙移动应用开发--UI组件及应用
  • Unity打包Android平台调用sherpa-onnx
  • 【VUE2】第五期——VueCli创建项目、Vuex多组件共享数据、json-server——模拟服务端api
  • MySQL隐式依赖引发的字段长度溢出:一次触发器事故的深度剖析
  • RK3588 openssl-3.4.1 编译安装
  • esProc SPL vs DuckDB:多源数据处理谁更胜一筹?
  • 编程自学指南:java程序设计开发,反射与注解,反射机制,注解
  • 【商城实战(31)】从0到1:商城项目部署全攻略
  • 提升模型准确性的关键技术与实践指南
  • Qt5中视口(ViewPort)与窗口(Window)
  • 长三角议事厅·周报|从模速空间看上海街区化AI孵化模式
  • 有关部门负责人就《新时代的中国国家安全》白皮书答记者问
  • 当我们提起拉动消费时,应该拉动什么消费?
  • 经济日报金观平:充分发挥超大规模市场优势
  • 当创业热土遇上年轻气息,上海南汇新城发展如何再发力?
  • 竞彩湃|德甲欧冠资格竞争白热化,伯恩茅斯主场迎恶战