当前位置: 首页 > news >正文

kafka connect 大概了解

kafka connect

Introduction

Kafka Connect is the component of Kafka that provides data integration between databases, key-value stores, search indexes, file systems, and Kafka brokers.

kafka connect 是一个框架,用来帮助集成其他系统的数据到kafka,或者将kafka 的数据吐到其他系统,例如数据库,Elasticsearch 之类的外部系统。

在kafka connect 里,有 kafka woker, kafka connectorkafka connect plugin的概念。

一个kafka worker 就是一个instance,类似一个pod 这种独立的实例。

kafka connector 就是用来搬运数据的连接器。有 source connector 和 sink connector.

Source 就是数据来源的connector,sink 就是吐出数据的 connector.

例如

Connector TypeNamePurpose
SourceJdbcSourceConnectorPull data from relational DBs
SinkElasticsearchSinkConnectorPush Kafka data to Elasticsearch
SourceFileStreamSourceConnectorRead lines from file into Kafka

connector 简略配置

{"name": "my-jdbc-source","connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector","tasks.max": "1","connection.url": "jdbc:mysql://localhost:3306/mydb","topic.prefix": "mysql-"
}

A Kafka Connect Plugin is the packaged code (JAR files) that implements one or more connectors.

It’s usually installed by placing the plugin into Kafka Connect’s plugin.path directory.

A plugin might include:

  • The connector logic

  • Converters (e.g., JSON, Avro)

  • Transformations (optional logic to modify data)

Think of a connector as a configuration, and a plugin as the actual implementation that makes it work.

实现结构

从部署的角度来看,kafka connect 是一个独立的service cluster。
下面的docker-compose.yml 配置可以看出 cp-kafka-connect 这个image 就可可以load 在 CONNECT_PLUGIN_PATH 配置目录下的 connect plugin 来实现不同的 connector 功能。

confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.1 这个命令就是部署mysql 相关的source connector 到 CONNECT_PLUGIN_PATH 配置的目录usr/share/confluent-hub-components 下面。

kafka-connect:image: confluentinc/cp-kafka-connect:7.1.0-1-ubi8environment:CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-componentscommand:- bash- -c- |confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.1/etc/confluent/docker/run

然后再通过kafka connect 的REST API 就可以enable 这个connector 了。

相关文章:

  • idea挂掉,会导致进程不结束,切换profile环境,导致token认证不通过
  • Linux Bash | Capture Output / Recall
  • Android Studio Meerkat与Gradle构建工具升级实战指南
  • 同设备访问php的多个接口会有先后等待问题
  • 电机的导程和脉冲之间的关系
  • Rust入门之高级Trait
  • 嵌入式学习笔记DAY20(链表,gdb调试)
  • 前端~三维地图(cesium)动态材质飞线
  • 香橙派/树莓派读取GY39数据
  • QMK键盘编码器(Encoder)(理论部分)
  • 实战案例:采集 51job 企业招聘信息
  • 安卓system/文件夹下的哪些文件夹可以修改为别的设备的
  • SRM电子采购管理系统:Java+Vue,集成供应商管理,实现采购流程数字化与协同优化
  • 麒麟环境下Selenium的使用
  • 如何高效集成MySQL数据到金蝶云星空
  • Spring的 @Validate注解详细分析
  • 力扣-108.将有序数组转换为二叉搜索树
  • idea 启动Springboot项目在编译阶段报错:java: OutOfMemoryError: insufficient memory
  • VS Code 新旧版本 Remote-SSH 内网离线连接服务器方法(版本 ≤ 1.78.x 及 ≥ 1.79.0)
  • 实验五:以太网UDP全协议栈的实现(通过远程实验系统)
  • 因港而兴,“长江黄金水道”上的宜宾故事
  • 端午假期购票日历发布,今日可购买5月29日火车票
  • 白玉兰奖征片综述丨动画的IP生命力
  • “异常”只停留在医院里,用艺术为“泡泡宝贝”加油
  • 人民日报评外卖平台被约谈:摒弃恶性竞争,实现行业健康发展
  • 沈阳一超市疑借领养名义烹食流浪狗,当地市监局:已收到多起投诉