当前位置: 首页 > news >正文

Kafka集成Debezium监听postgresql变更

在这里插入图片描述

下载postgres的插件:https://debezium.io/documentation/reference/2.7/install.html

2.7版本支持postgresql12数据库。

debezium-connector-postgres-2.7.4.Final-plugin.tar.gz

上传插件并解压

mkdir /usr/local/kafka/kafka_2.12-2.2.1/connector
cd /usr/local/kafka/kafka_2.12-2.2.1/connector
tar -zxvf debezium-connector-postgres-2.7.4.Final-plugin.tar.gz

进入kafka配置目录配置连接器(单机kafka)

cd /usr/local/kafka/kafka_2.12-2.2.1/config
#修改配置
vi connect-standalone.properties
#配置kafka地址
bootstrap.servers=192.168.159.100:9092

#采集的数据存储到kafka的数据格式
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

#是否存储schemas,存储则格式会复杂些
key.converter.schemas.enable=false
value.converter.schemas.enable=false

#插件的路径
plugin.path=/usr/local/kafka/kafka_2.12-2.2.1/connector

使用命令启动 Kafka Connect 服务(单机kafka)

/usr/local/kafka/kafka_2.12-2.2.1/bin/connect-standalone.sh -daemon /usr/local/kafka/kafka_2.12-2.2.1/config/connect-standalone.properties /usr/local/kafka/kafka_2.12-2.2.1/config/connect-file-source.properties /usr/local/kafka/kafka_2.12-2.2.1/config/connect-file-sink.properties

配置Debezium Connector:

pgsql修改配置文件:postgresql.conf

listen_addresses = '*'
#开启逻辑复制
wal_level = logical
# 允许多少个流复制协议连接过来,一个流复制协议会产生一个walsender进程,该数不能低于master的配置数量
max_wal_senders = 2	
max_replication_slots = 1

修改pg_hba.conf:

#添加
host    all             all             0.0.0.0/0                 trust
host    replication     all             0.0.0.0.0                 trust

注册连接器:

{
    "name": "my-postgresql-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "192.168.159.103",
        "database.port": "15432",
        "database.dbname": "db_test",
        "database.user": "postgres",
        "database.password": "123456",
        "plugin.name": "pgoutput",
        "database.server.name": "server5",
        "topic.prefix": "linging"
    }
}

执行:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.159.100:8083/connectors/ -d '{"name":"my-postgresql-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","database.hostname":"192.168.159.103","database.port":"15432","database.dbname":"db_test","database.user":"postgres","database.password":"123456","plugin.name":"pgoutput","database.server.name":"server5","topic.prefix":"linging"}}'

修改数据库表,查看kafka消息:
在这里插入图片描述

http://www.dtcms.com/a/84238.html

相关文章:

  • 快速入手-Django项目模版和静态文件(二)
  • 2025年03月10日人慧前端面试(外包滴滴)
  • 随笔(1)
  • 操作系统复习(第五章 输入与输出管理)
  • 重复的子字符串
  • linux常用符号
  • dcat-admin已完成项目部署注意事项
  • 软件工程面试题(三)
  • redis集群的原理是什么?
  • 【C语言】深入理解指针(一):从基础到高级应用
  • 新手村:逻辑回归-理解02:逻辑回归中的伯努利分布
  • 项目生命周期 和 项目管理生命周期的差异
  • 【002安卓开发方案调研】之Kotlin+Jetpack开发方案
  • 动态规划入门详解
  • 知识图谱中NLP新技术
  • HTML CSS 使div中的子元素横向排列,并均匀分布
  • Android集成Facebook登录与分享的常见问题及解决方案
  • VSCode 抽风之 两个conda环境同时在被激活
  • 用AI在云平台上用自然语言生成定制化SQL查询复杂数据库
  • Spring框架入门指南:从Hello World到IOC容器
  • TPCTF 2025 web 复现
  • 【项目设计】网页版五子棋
  • 2025知识图谱峰会(脱敏)PPT合集(18份).zip
  • css基础-选择器
  • SRS-GB28181 实现浅析之二--基本逻辑与结构
  • AI + 医疗 Qwq大模型离线本地应用
  • 【贝叶斯定理(Bayesian Theorem)】
  • 深入剖析Java虚拟机(JVM):从零开始掌握Java核心引擎
  • flutter doctor提示cmdline-tools component is missing错误的解决
  • 【006安卓开发方案调研】之大厂APP混合开发方案