Capturing data changes with Debezium via Kafka Connect
1. Run Kafka and Kafka Connect
services:
  kafka:
    image: kafka:4.0.0
    container_name: kafka
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      KAFKA_CLUSTER_ID: ""
      KAFKA_INITAL_CONTROLLERS: CONTROLLER
      KAFKA_CFG_NODE_ID: 1
      KAFKA_CFG_PROCESS_ROLES: 'broker,controller'
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka:9094
      KAFKA_CFG_LISTENERS: INTERNAL://:9092,EXTERNAL://0.0.0.0:9093,CONTROLLER://:9094
      KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://127.0.0.1:9093
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
      KAFKA_CFG_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL: PLAIN
      KAFKA_INTER_BROKER_USER: ******
      KAFKA_INTER_BROKER_PASSWORD: ******
      KAFKA_CLIENT_USERS: adminClient
      KAFKA_CLIENT_PASSWORDS: ******
      # note: SimpleAclAuthorizer was removed from newer Kafka releases; KRaft setups normally use org.apache.kafka.metadata.authorizer.StandardAuthorizer
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
      KAFKA_SUPER_USERS: User:adminClient
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    volumes:
      - /etc/kafka/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf:ro
      - /root/****/kafka_data:/bitnami/kafka
    networks:
      net:
        ipv4_address: ******(ip)
  kafka-connect:
    image: kafka-connect:3.2
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      GROUP_ID: kafka_connect_group
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
      # Kafka IP:Port
      BOOTSTRAP_SERVERS: kafka:9092
      # MySQL IP:Port
      MYSQL_HOST: ******:3306
      CONNECT_SECURITY_PROTOCOL: SASL_PLAINTEXT
      CONNECT_SASL_MECHANISM: PLAIN
      CONNECT_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="adminBroker" password="*******";'
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_PLAINTEXT
      CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="*******" password="******";'
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_PLAINTEXT
      CONNECT_CONSUMER_SASL_MECHANISM: PLAIN
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="*******" password="******";'
      CONFIG_STORAGE_REPLICATION_FACTOR: 1
      OFFSET_STORAGE_REPLICATION_FACTOR: 1
      STATUS_STORAGE_REPLICATION_FACTOR: 1
    depends_on:
      - kafka
    networks:
      net:
        ipv4_address: *******
networks:
  net:
    external: true
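With the compose file in place (and the JAAS file from the next step mounted), the stack can be brought up and sanity-checked with standard Docker Compose commands, e.g.:

# Start both containers in the background
docker compose up -d
# Confirm kafka and kafka-connect are both running (kafka-connect starts after kafka via depends_on)
docker compose ps
# If a container keeps restarting, JAAS/SASL mistakes usually show up in the broker logs
docker logs -f kafka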
2. Add the Kafka server and client JAAS configuration file
/etc/kafka/kafka_server_jaas.conf:

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="****"
  password="*****"
  user_adminClient="******"
  user_adminBroker="*****";
};

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="*****"
  password="*****";
};
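Before registering any connectors, it helps to confirm the broker actually accepts SASL/PLAIN logins. A minimal sketch: point a Kafka CLI tool at the EXTERNAL listener with a small client properties file (assumes the Kafka CLI scripts are on the PATH, and <password> stands in for the adminClient password set above):

# client.properties: client-side SASL settings matching the broker's listeners
cat > client.properties <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="adminClient" password="<password>";
EOF

# Listing topics succeeds only if authentication works end to end
kafka-topics.sh --bootstrap-server 127.0.0.1:9093 --command-config client.properties --list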
3. Register a connector, which effectively subscribes to specific tables (run this directly once the stack is up)
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" ip:8083/connectors/ -d '{
  "name": "connector",
  "config": {
    "decimal.handling.mode": "string",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql8",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "000000",
    "database.server.id": "<must be unique per connector>",
    "topic.prefix": "cdc_",
    "database.include.list": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "history_inventory",
    "table.include.list": "<tables to capture, comma-separated>",
    "transforms": "convertTimezone",
    "transforms.convertTimezone.type": "io.debezium.transforms.TimezoneConverter",
    "transforms.convertTimezone.converted.timezone": "Asia/Shanghai",
    "schema.history.internal.producer.security.protocol": "SASL_PLAINTEXT",
    "schema.history.internal.producer.sasl.mechanism": "PLAIN",
    "schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"adminClient\" password=\"000000\";",
    "schema.history.internal.consumer.security.protocol": "SASL_PLAINTEXT",
    "schema.history.internal.consumer.sasl.mechanism": "PLAIN",
    "schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"adminClient\" password=\"000000\";"
  }
}'
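Once the POST returns, it is worth checking that the connector and its task are RUNNING and that change events actually arrive. A sketch, reusing client.properties from step 2 and assuming a hypothetical captured table inventory.customers (Debezium names topics <topic.prefix>.<database>.<table>):

# Both the connector and its task should report state RUNNING
curl -s http://localhost:8083/connectors/connector/status

# Watch change events for the table; with the config above the topic is cdc_.inventory.customers
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9093 \
  --consumer.config client.properties \
  --topic cdc_.inventory.customers --from-beginning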
4. Connector operations
1. List all connectors
curl http://localhost:8083/connectors
2. Delete a connector
curl -X DELETE http://localhost:8083/connectors/<connector-name>
3. Run a full snapshot once at startup
1. Add the relevant setting to the connector config: "snapshot.mode": "when_needed" (the connector snapshots only when it decides one is necessary, e.g. no stored offsets or a binlog position that no longer exists).
2. Force a snapshot with "snapshot.mode": "initial" (a full snapshot runs only when the connector has no offsets stored in Kafka; if old offsets remain, the snapshot is skipped).
3. If neither of the above takes effect, change the connector's name: a connector registered under a new name starts with fresh offsets, so this requires no clearing of the offset topic and has no impact on other connectors (see the sketch at the end of this section).
4. Another approach (did not take effect in my own testing): add "signal.data.collection": "inventory.signals" to the connector config (the value is the fully-qualified name of a signal table; per the Debezium docs the signal table must also appear in table.include.list), then trigger snapshots through that table. Create the table first:
CREATE TABLE inventory.signals (
  id VARCHAR(64) PRIMARY KEY,
  type VARCHAR(64) NOT NULL,
  data VARCHAR(2048)
);
Insert a record to trigger a snapshot:
INSERT INTO inventory.signals (id, type, data)
VALUES ('1', 'execute-snapshot', '{"data-collections": ["<database>.<table>", "<database>.<table>"], "type": "incremental"}');
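Whichever trigger is used, the result can be verified from the Connect side. A sketch combining the rename approach from item 3 with a check that a signal was picked up (the connector name is the example from step 3; the new name and server id are hypothetical):

# Re-trigger a full snapshot without clearing offsets: delete, then re-register under a new name
curl -X DELETE http://localhost:8083/connectors/connector
# ...then re-run the POST from step 3 with "name": "connector_v2" and a new database.server.id

# Confirm the new connector came up
curl -s http://localhost:8083/connectors/connector_v2/status

# For the signal-table route, signal handling is logged by Debezium; grep the Connect logs
docker logs kafka-connect 2>&1 | grep -i signal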