A Rough Overview of Kafka Connect
Kafka Connect
Introduction
Kafka Connect is the component of Kafka that provides data integration between databases, key-value stores, search indexes, file systems, and Kafka brokers.
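Kafka Connect is a framework for pulling data from other systems into Kafka and for pushing data from Kafka out to external systems such as databases or Elasticsearch.
Kafka Connect is built around three concepts: the worker, the connector, and the connector plugin.
A worker is a single running Kafka Connect instance, an independent process comparable to a pod.
A connector is what actually moves the data. There are two kinds: source connectors and sink connectors.
A source connector reads data from an external system into Kafka; a sink connector writes data from Kafka out to an external system.
For example: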
Connector Type | Name | Purpose |
---|---|---|
Source | JdbcSourceConnector | Pull data from relational DBs |
Sink | ElasticsearchSinkConnector | Push Kafka data to Elasticsearch |
Source | FileStreamSourceConnector | Read lines from file into Kafka |
A minimal connector configuration:
    {
      "name": "my-jdbc-source",
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "tasks.max": "1",
      "connection.url": "jdbc:mysql://localhost:3306/mydb",
      "topic.prefix": "mysql-"
    }
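The configuration above is written as one flat map. When a connector is created through the Kafka Connect REST API with POST /connectors, the request body nests the settings under a "config" key next to the "name". The following is a minimal Python sketch of that reshaping; the helper name `to_rest_payload` is made up for illustration, and a real JDBC source would likely need more settings than the simplified ones shown here.

```python
import json


def to_rest_payload(flat_config):
    """Reshape a flat connector config into the {"name", "config"} body for POST /connectors."""
    config = dict(flat_config)   # copy so the caller's dict is not mutated
    name = config.pop("name")    # raises KeyError if the name is missing
    return {"name": name, "config": config}


# Same values as the simplified configuration in the text.
flat = {
    "name": "my-jdbc-source",
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://localhost:3306/mydb",
    "topic.prefix": "mysql-",
}

payload = to_rest_payload(flat)
print(json.dumps(payload, indent=2))
```

Keeping the flat form in a file and reshaping it just before the request is only one option; the nested form can equally be stored directly.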
A Kafka Connect Plugin is the packaged code (JAR files) that implements one or more connectors.
It is usually installed by placing the plugin in one of the directories listed in the worker's plugin.path setting.
A plugin might include:
- The connector logic
- Converters (e.g., JSON, Avro)
- Transformations (optional logic to modify data)
Think of a connector as a configuration, and a plugin as the actual implementation that makes it work.
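To see which plugins a worker has actually loaded, the REST API offers GET /connector-plugins, which returns a JSON list of objects with "class", "type", and "version" fields. The sketch below only groups such a list by type; the sample response is hand-written for illustration (the "x.y.z" versions are placeholders, not real output), and `group_plugins_by_type` is a hypothetical helper name.

```python
from collections import defaultdict


def group_plugins_by_type(plugins):
    """Group connector class names by their plugin type ("source" or "sink")."""
    grouped = defaultdict(list)
    for plugin in plugins:
        grouped[plugin["type"]].append(plugin["class"])
    return dict(grouped)


# Hand-written stand-in for a GET /connector-plugins response body.
sample_response = [
    {"class": "io.confluent.connect.jdbc.JdbcSourceConnector", "type": "source", "version": "x.y.z"},
    {"class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "type": "sink", "version": "x.y.z"},
    {"class": "org.apache.kafka.connect.file.FileStreamSourceConnector", "type": "source", "version": "x.y.z"},
]

grouped = group_plugins_by_type(sample_response)
for plugin_type, classes in grouped.items():
    print(plugin_type, classes)
```

A connector configuration can only reference a connector.class that shows up in this list, which is the practical meaning of "the plugin is the implementation".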
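Implementation Structure
From a deployment point of view, Kafka Connect runs as a standalone service cluster.
The docker-compose.yml configuration below shows that the cp-kafka-connect image loads the connector plugins found in the directories listed in CONNECT_PLUGIN_PATH, which is how it provides different connector functionality.
The command confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.1 installs the MySQL source connector into /usr/share/confluent-hub-components, one of the directories configured in CONNECT_PLUGIN_PATH.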
    kafka-connect:
      image: confluentinc/cp-kafka-connect:7.1.0-1-ubi8
      environment:
        CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
      command:
        - bash
        - -c
        - |
          confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.1
          /etc/confluent/docker/run
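CONNECT_PLUGIN_PATH corresponds to the worker's plugin.path property, and its value is a comma-separated list of directories. The Python sketch below illustrates that relationship under a simplifying assumption: that the image turns a CONNECT_ environment variable into a property name by dropping the prefix, lowercasing, and replacing each underscore with a dot. The real image has extra rules for special characters that are not modelled here, and both function names are hypothetical.

```python
def env_to_worker_property(env_name, prefix="CONNECT_"):
    """Simplified mapping from a CONNECT_* environment variable to a worker property name."""
    if not env_name.startswith(prefix):
        raise ValueError(f"{env_name} does not start with {prefix}")
    return env_name[len(prefix):].lower().replace("_", ".")


def split_plugin_path(value):
    """Split a comma-separated plugin path into its individual directories."""
    return [part.strip() for part in value.split(",") if part.strip()]


property_name = env_to_worker_property("CONNECT_PLUGIN_PATH")
directories = split_plugin_path("/usr/share/java,/usr/share/confluent-hub-components")

print(property_name)
for directory in directories:
    print(directory)
```

Each directory in the list is scanned for plugins, so a connector installed into /usr/share/confluent-hub-components is picked up alongside whatever already lives in /usr/share/java.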
After that, the connector can be enabled through the Kafka Connect REST API.
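As a rough sketch of that last step, the Python snippet below builds the POST /connectors request using only the standard library. Several things here are assumptions rather than part of the setup above: the worker is taken to listen on http://localhost:8083 (the default REST port), the connector name my-mysql-source is made up, and the config is deliberately incomplete. A real Debezium MySQL connector also needs its database connection settings, which should be taken from the Debezium documentation for the installed version.

```python
import json
import urllib.request


def build_create_request(base_url, name, config):
    """Build (but do not send) a POST /connectors request for a Kafka Connect worker."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_create_request(
    "http://localhost:8083",   # assumed worker address
    "my-mysql-source",         # hypothetical connector name
    {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        # Database connection settings are intentionally omitted in this sketch.
    },
)

print(request.get_method(), request.full_url)

# To send it against a running worker, uncomment:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```

Building the request separately from sending it keeps the snippet runnable without a live cluster; in practice curl or any HTTP client does the same job.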