
Capturing Database Changes with Debezium via Kafka Connect

1. Run Kafka and Kafka Connect

```yaml
services:
  kafka:
    image: kafka:4.0.0
    container_name: kafka
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      KAFKA_CLUSTER_ID: ""
      KAFKA_INITAL_CONTROLLERS: CONTROLLER
      KAFKA_CFG_NODE_ID: 1
      KAFKA_CFG_PROCESS_ROLES: 'broker,controller'
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka:9094
      KAFKA_CFG_LISTENERS: INTERNAL://:9092,EXTERNAL://0.0.0.0:9093,CONTROLLER://:9094
      KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://127.0.0.1:9093
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
      KAFKA_CFG_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL: PLAIN
      KAFKA_INTER_BROKER_USER: ******
      KAFKA_INTER_BROKER_PASSWORD: ******
      KAFKA_CLIENT_USERS: adminClient
      KAFKA_CLIENT_PASSWORDS: ******
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
      KAFKA_SUPER_USERS: User:adminClient
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    volumes:
      - /etc/kafka/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf:ro
      - /root/****/kafka_data:/bitnami/kafka
    networks:
      net:
        ipv4_address: ******(ip)

  kafka-connect:
    image: kafka-connect:3.2
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      GROUP_ID: kafka_connect_group
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
      # Kafka IP:port
      BOOTSTRAP_SERVERS: kafka:9092
      # MySQL IP:port
      MYSQL_HOST: ******:3306
      CONNECT_SECURITY_PROTOCOL: SASL_PLAINTEXT
      CONNECT_SASL_MECHANISM: PLAIN
      CONNECT_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="adminBroker" password="*******";'
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_PLAINTEXT
      CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="*******" password="******";'
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_PLAINTEXT
      CONNECT_CONSUMER_SASL_MECHANISM: PLAIN
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="*******" password="******";'
      CONFIG_STORAGE_REPLICATION_FACTOR: 1
      OFFSET_STORAGE_REPLICATION_FACTOR: 1
      STATUS_STORAGE_REPLICATION_FACTOR: 1
    depends_on:
      - kafka
    networks:
      YW-net:
        ipv4_address: *******

networks:
  net:
    external: true
```

2. Add the Kafka server and client JAAS configuration file

`/etc/kafka/kafka_server_jaas.conf`:

```
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="****"
    password="*****"
    user_adminClient="******"
    user_adminBroker="*****";
};
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="*****"
    password="*****";
};
```
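With SASL/PLAIN enabled on every listener, command-line clients (`kafka-console-consumer.sh`, `kafka-topics.sh`, etc.) also need matching credentials. A sketch of a `client.properties` passed via `--command-config`, assuming the `adminClient` user defined above (the password is masked here, as in the JAAS file):

```properties
# client.properties — SASL/PLAIN settings for Kafka CLI tools (assumed values)
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="adminClient" \
  password="******";
```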

3. Register a connector, which amounts to listening to specific tables (run this once the services are up)

```shell
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  ip:8083/connectors/ -d '{
  "name": "connector",
  "config": {
    "decimal.handling.mode": "string",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql8",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "000000",
    "database.server.id": "<must be unique per connector>",
    "topic.prefix": "cdc_",
    "database.include.list": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "history_inventory",
    "table.include.list": "<tables to capture, comma-separated>",
    "transforms": "convertTimezone",
    "transforms.convertTimezone.type": "io.debezium.transforms.TimezoneConverter",
    "transforms.convertTimezone.converted.timezone": "Asia/Shanghai",
    "schema.history.internal.producer.security.protocol": "SASL_PLAINTEXT",
    "schema.history.internal.producer.sasl.mechanism": "PLAIN",
    "schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"adminClient\" password=\"000000\";",
    "schema.history.internal.consumer.security.protocol": "SASL_PLAINTEXT",
    "schema.history.internal.consumer.sasl.mechanism": "PLAIN",
    "schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"adminClient\" password=\"000000\";"
  }
}'
```
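Once the connector is running, each row change is published to a topic named `cdc_<database>.<table>` as a Debezium envelope whose `payload` carries `before`/`after` row images, an `op` code, and `source` metadata. A minimal sketch of unpacking one event; the payload below is hand-written for illustration, not captured output:

```python
import json

# An illustrative (hand-written) Debezium MySQL change-event value.
# Real events also include a "schema" section unless schemas are disabled.
event = json.loads("""
{
  "payload": {
    "before": null,
    "after": {"id": 1, "name": "widget", "quantity": 10},
    "source": {"db": "inventory", "table": "products"},
    "op": "c",
    "ts_ms": 1700000000000
  }
}
""")

# Debezium operation codes: c=insert, u=update, d=delete, r=snapshot read
OPS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}

def describe(evt):
    """Summarize a Debezium envelope: operation, table, and the relevant row image."""
    p = evt["payload"]
    return {
        "op": OPS.get(p["op"], p["op"]),
        "table": f'{p["source"]["db"]}.{p["source"]["table"]}',
        # for deletes "after" is null, so fall back to "before"
        "row": p["after"] if p["after"] is not None else p["before"],
    }

print(describe(event))
```

In a real consumer the same `describe` would be applied to each message value read from the `cdc_*` topics.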

4. Connector operations

1. List all connectors

```shell
curl http://localhost:8083/connectors
```


2. Delete a connector

```shell
curl -X DELETE http://localhost:8083/connectors/<connector-name>
```
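Beyond listing and deleting, the Connect REST API also exposes `GET /connectors/<name>/status` and `POST /connectors/<name>/restart`. A small helper for building these URLs from Python; the base address is an assumption matching the `8083:8083` port mapping above:

```python
from urllib.parse import quote

BASE = "http://localhost:8083"  # assumed Connect REST address

def connector_url(name=None, action=None):
    """Build a Kafka Connect REST URL for list/status/restart/delete calls."""
    url = f"{BASE}/connectors"
    if name:
        url += "/" + quote(name, safe="")  # connector names may need escaping
        if action:
            url += "/" + action
    return url

print(connector_url())                       # list endpoint
print(connector_url("connector", "status"))  # status endpoint for one connector
```

These URLs can then be fetched with `curl`, `urllib.request`, or any HTTP client.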


3. Running one full sync at startup

1. Add the relevant setting to the connector config: `"snapshot.mode": "when_needed"`

2. Force a full sync once with `"snapshot.mode": "initial"` (note that if the connector's Kafka offsets have not been cleared, it will not re-run the full snapshot)

3. If neither of the above works, change the connector's `name` to register it under a new name; this triggers a fresh snapshot without clearing the offset topic and without affecting other connectors

4. Another approach (did not take effect in my own testing): add `"snapshot.signal.data.collection": "<db>.signals"` to the connector config and trigger the snapshot through a signal table. (Note: in current Debezium versions the property is named `signal.data.collection`, which may be why this did not work.)

```sql
CREATE TABLE inventory.signals (
  id   VARCHAR(64) PRIMARY KEY,
  type VARCHAR(64) NOT NULL,
  data VARCHAR(2048)
);
```

Insert a row to trigger an ad-hoc full snapshot:

```sql
INSERT INTO inventory.signals (id, type, data)
VALUES ('1', 'execute-snapshot',
        '{"data-collections": ["<db>.<table>", "<db>.<table>"], "type": "incremental"}');
```
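The `data` column of the signal row carries JSON, so it is easy to get the quoting wrong by hand. A minimal Python sketch for generating the signal values programmatically (the helper name and the table list are mine, not from the tutorial):

```python
import json
import uuid

def snapshot_signal(tables, snapshot_type="incremental"):
    """Build (id, type, data) values for a Debezium execute-snapshot signal row."""
    return (
        str(uuid.uuid4()),  # any unique id works; Debezium only requires uniqueness
        "execute-snapshot",
        json.dumps({"data-collections": tables, "type": snapshot_type}),
    )

sig_id, sig_type, sig_data = snapshot_signal(["inventory.products"])
print(sig_type, sig_data)
```

The three returned values map directly onto the `id`, `type`, and `data` columns of the `INSERT` above.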

