当前位置: 首页 > news >正文

技术聚焦:Debezium 如何将数据库数据精准注入 Kafka

#作者:任少近

文章目录

  • 第一章 Debezium抽取mysql数据给kafka原理
  • 第二章 Debezium 与kafka抽取方法及验证
    • 2.1 debezium2.0+kafka3.3.1+mysql8
    • 2.2 debezium2.0+kafka2.6.1+mysql8
    • 2.3 debezium2.0+kafka2.6.1+mysql5.7

第一章 Debezium抽取mysql数据给kafka原理

debezium的connector捕获到源数据库数据更新,发送到kafka集群中,利用kafka connect提供的sink connector,将数据同步到其他数据库,数据仓库等存储介质中。
在这里插入图片描述
上图中是采集mysql的binlog日志。真正主要的是SnapshotReader和BinlogReader,实现了对MySQL数据的全量读取和增量读取,采集到的数据默认一个表一个kafka topic,也可以指定topic名。将数据同步到其他系统、数据库、数据仓库等。

第二章 Debezium 与kafka抽取方法及验证

准备基于服务:
使用Debezium需要三个独立的服务:ZooKeeper、Kafka和Debezium connector服务

为了验证适配置版本问题,需要部署环境、安装部署:

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 -d quay.io/debezium/zookeeper:2.0
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper -d quay.io/debezium/kafka:2.0
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw -d quay.io/debezium/example-mysql:2.0
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link kafka:kafka --link mysql:mysql -d quay.io/debezium/connect:2.0

情况如下:
在这里插入图片描述

2.1 debezium2.0+kafka3.3.1+mysql8

  • 官网最新稳定版debezium的为2.0版本
  • Mysql为Mysql 8.0.31版本
  • Kafka为3.3.1版本,scala为2.13版本
  • Zookeeper为3.6.3版本

验证如下:
在这里插入图片描述
配置文件说明:

 { 
"name": "inventory-connector", 
"config": { 
//官方提供的类,已经写死,直接可以用
"connector.class": "io.debezium.connector.mysql.MySqlConnector", 

 "tasks.max": "1", 

"database.hostname": "mysql",
 "database.port": "3306", 
"database.user": "debezium", 
"database.password": "dbz", 
//这个连接器作为mysql一个slave来工作,ID可以随便写。
"database.server.id": "184054",
//作为topic的前缀 ,比如一个表生成一个topic可以这么命名
"topic.prefix": "dbserver1",  
// 选择某一个数据库
"database.include.list": "inventory",
//存一些历史记录,比如kafka的地址
 "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
//主要用来存储debezium的schema来用
 "schema.history.internal.kafka.topic": "schemahistory.inventory"
  }
 }'

插入数据验证:
在这里插入图片描述
查看下采集的binlog

docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:2.0 watch-topic -a -k dbserver1.inventory.customers

在这里插入图片描述

2.2 debezium2.0+kafka2.6.1+mysql8

  • 官网最新稳定版debezium的为2.0版本
  • Mysql为Mysql 8.0.31版本
  • Kafka为2.6.1,scala为2.12版本
  • Zookeeper为3.6.3版本
    在这里插入图片描述
    对数据库进行操作:
    在这里插入图片描述
    查看下采集的binlog
docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:1.4 watch-topic -a -k dbserver1.inventory.customers

在这里插入图片描述

2.3 debezium2.0+kafka2.6.1+mysql5.7

  • 官网最新稳定版debezium的为2.0版本
  • Mysql为5.7版本
  • Kafka为2.6.1
  • Zookeeper为3.6.3版本
    在这里插入图片描述
    对数据库进行操作:
    在这里插入图片描述
    查看下采集的binlog
docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:1.4 watch-topic -a -k dbserver1.inventory.customers

在这里插入图片描述

http://www.dtcms.com/a/61187.html

相关文章:

  • Ubuntu 22.04 升级到 Ubuntu 24.04 全流程指南
  • 群晖DS223 Docker搭建为知笔记
  • 【每日八股】Golang篇(三):关键字(下)
  • 【论文解读】《START: Self-taught Reasoner with Tools》
  • 2025年网络安全(黑客技术)120天自学手册
  • 面向高质量视频生成的扩散模型方法-算法、架构与实现【附核心代码】
  • SQLAlchemy系列教程:如何执行原生SQL
  • 【网络安全 | 漏洞挖掘】$15,000——通过持久token获取个人身份信息(PII)
  • 小智智能体语言大模型硬件软件开发
  • Linux 使用 docker 安装 Gogs 公司私有 Git 仓库
  • vue打包编译【自动删除node_modules下的.cache缓存文件夹】
  • Linux 系统负载过高的排查思路
  • 【10】单片机编程核心技巧:指令周期与晶振频率
  • SSTI注入笔记
  • QT系列教程(20) Qt 项目视图便捷类
  • Spring Boot基础使用详解
  • Oracle 数据库基础入门(七):触发器与事务的深度探究
  • JVM 类加载原理之双亲委派机制(JDK8版本)
  • spring boot3.4.3+MybatisPlus3.5.5+swagger-ui2.7.0
  • Qt 加载插件:实现可扩展应用的秘诀
  • halcon机器人视觉(二)固定相机抓取hand_eye_stationarycam_grasp_nut
  • 【Mastering Vim 2_12】(完结篇)第九章:以终为始 —— Vim 推荐编辑习惯与相关学习资源整理
  • mapbox高阶,结合threejs(threebox)添加管道
  • vscode(cursor)配置python环境,含远程调试
  • MATLAB并行计算加速,用 parfor 和 spmd 榨干多核CPU性能
  • Jeinkins配置sonarqube
  • 网络安全之数据加密(DES、AES、RSA、MD5)
  • PPO算法 - AI学习记录
  • bug-Ant中a-select的placeholder不生效(绑定默认值为undefined)
  • 代理IP与反爬技术的博弈