Kafka Connect实战:从环境搭建到全流程操作
引言
在深入了解Kafka Connect的架构与原理后,是时候将理论转化为实践。本篇博客将以实际操作场景为导向,带你完成Kafka Connect从环境搭建、内置连接器配置到任务管理与监控的全流程,助力你快速上手并落地数据集成项目。
一、环境搭建与基础配置
1.1 软件安装与版本选择
- Kafka安装:从Apache Kafka官网下载最新稳定版本(如
kafka_2.13-3.5.0
),解压后进入安装目录。确保系统已安装Java 8或更高版本,通过java -version
命令检查。
wget https://downloads.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz
tar -xzf kafka_2.13-3.5.0.tgz
cd kafka_2.13-3.5.0
- Kafka Connect配置:Kafka Connect包含在Kafka安装包中,无需额外下载。主要配置文件位于
config
目录下,核心配置文件包括connect-standalone.properties
(单机模式)和connect-distributed.properties
(分布式模式)。生产环境建议使用分布式模式,开发测试可选择单机模式快速验证。
1.2 配置文件详解
单机模式配置(connect-standalone.properties
)
# Kafka集群地址
bootstrap.servers=localhost:9092
# 配置文件路径,可指定多个,逗号分隔
plugin.path=/path/to/kafka/plugins
# 数据转换格式,默认JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 偏移量存储方式,单机模式存于本地文件
offset.storage.file.filename=/tmp/connect.offsets
# 定期刷新偏移量存储的时间间隔
offset.flush.interval.ms=10000
分布式模式配置(connect-distributed.properties
)
# Kafka集群地址
bootstrap.servers=localhost:9092
# 配置文件路径
plugin.path=/path/to/kafka/plugins
# 数据转换格式
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 组ID,用于标识Connect集群
group.id=connect-cluster-1
# 偏移量存储主题
offset.storage.topic=__connect_offsets
# 配置存储主题
config.storage.topic=__connect_configs
# 状态存储主题
status.storage.topic=__connect_status
1.3 启动Kafka与Connect服务
- 启动Zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
- 启动Kafka Broker:
bin/kafka-server-start.sh config/server.properties
- 单机模式启动Connect:
bin/connect-standalone.sh config/connect-standalone.properties config/source-connector-config.json config/sink-connector-config.json
- 分布式模式启动Connect:在每个Worker节点执行以下命令,确保
connect-distributed.properties
配置一致。
bin/connect-distributed.sh config/connect-distributed.properties
二、内置连接器实战应用
2.1 File Connector:文件数据同步
File Source Connector
将本地文件数据读取并写入Kafka主题。
- 创建配置文件
file-source-config.json
:
{"name": "file-source-connector","config": {"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max": "1","file.path": "/path/to/input.txt","file.reader.class": "org.apache.kafka.connect.file.reader.SimpleLineReader","topic": "file-data-topic","key.converter": "org.apache.kafka.connect.storage.StringConverter","value.converter": "org.apache.kafka.connect.storage.StringConverter"}
}
- 启动连接器(单机模式):
bin/connect-standalone.sh config/connect-standalone.properties file-source-config.json
- 验证:向
input.txt
写入数据,使用Kafka消费者查看file-data-topic
主题数据。
bin/kafka-console-consumer.sh --topic file-data-topic --bootstrap-server localhost:9092 --from-beginning
File Sink Connector
将Kafka主题数据写入本地文件。
- 创建配置文件
file-sink-config.json
:
{"name": "file-sink-connector","config": {"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector","tasks.max": "1","file.path": "/path/to/output.txt","topics": "file-data-topic","key.converter": "org.apache.kafka.connect.storage.StringConverter","value.converter": "org.apache.kafka.connect.storage.StringConverter"}
}
- 启动连接器,数据将从
file-data-topic
主题写入output.txt
文件。
2.2 JDBC Connector:数据库数据同步
以MySQL数据库为例,实现数据增量同步。
- 准备工作:
- 下载MySQL JDBC驱动(如
mysql-connector-java-8.0.26.jar
),放置在config/plugin.path
指定目录下。 - 创建测试表:
- 下载MySQL JDBC驱动(如
CREATE TABLE users (id INT PRIMARY KEY AUTO_INCREMENT,name VARCHAR(100),age INT,email VARCHAR(200)
);
JDBC Source Connector
- 创建配置文件
jdbc-source-config.json
:
{"name": "jdbc-source-connector","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector","tasks.max": "1","connection.url": "jdbc:mysql://localhost:3306/mydb?user=root&password=123456","table.whitelist": "users","mode": "incrementing","incrementing.column.name": "id","topic.prefix": "jdbc-","key.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter": "org.apache.kafka.connect.json.JsonConverter"}
}
- 启动连接器,新插入的
users
表数据将同步到以jdbc-
为前缀的Kafka主题。
JDBC Sink Connector
将Kafka主题数据写入MySQL表。
- 创建配置文件
jdbc-sink-config.json
:
{"name": "jdbc-sink-connector","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max": "1","connection.url": "jdbc:mysql://localhost:3306/mydb?user=root&password=123456","topics": "jdbc-users","auto.create": "true","insert.mode": "upsert","pk.mode": "record_value","pk.fields": "id","key.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter": "org.apache.kafka.connect.json.JsonConverter"}
}
- 启动连接器,
jdbc-users
主题数据将写入users
表。
2.3 REST Connector:API数据交互
通过REST API实现数据与Kafka的双向传输。
- 创建REST Source Connector:
{"name": "rest-source-connector","config": {"connector.class": "io.confluent.connect.rest.RestSourceConnector","tasks.max": "1","connect.rest.source.uri": "https://api.example.com/data","connect.rest.method.name": "GET","topic.prefix": "rest-","key.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter": "org.apache.kafka.connect.json.JsonConverter"}
}
- 创建REST Sink Connector:
{"name": "rest-sink-connector","config": {"connector.class": "io.confluent.connect.rest.RestSinkConnector","tasks.max": "1","topics": "rest-data-topic","connect.rest.sink.url": "https://api.example.com/submit","key.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter": "org.apache.kafka.connect.json.JsonConverter"}
}
三、任务管理与监控
3.1 任务生命周期管理
- 查看任务状态:使用REST API获取连接器和任务状态。
curl -X GET http://localhost:8083/connectors
curl -X GET http://localhost:8083/connectors/jdbc-source-connector/status
- 暂停与重启任务:
# 暂停连接器
curl -X PUT -H "Content-Type: application/json" --data '{"pause": true}' http://localhost:8083/connectors/jdbc-source-connector/pause
# 重启连接器
curl -X PUT -H "Content-Type: application/json" --data '{"resume": true}' http://localhost:8083/connectors/jdbc-source-connector/resume
- 删除任务:
curl -X DELETE http://localhost:8083/connectors/jdbc-source-connector
3.2 监控指标与日志分析
- JMX监控:通过JMX获取Connect运行指标,如
kafka.connect:type=WorkerSourceTaskManager,name=task-0
。可使用jconsole
或Prometheus + Grafana搭建可视化监控面板。 - 日志分析:Kafka Connect日志位于
logs
目录下,通过分析connect.log
排查任务故障,例如连接器配置错误、数据转换异常等问题。