当前位置: 首页 > news >正文

kafka connect 大概了解

kafka connect

Introduction

Kafka Connect is the component of Kafka that provides data integration between databases, key-value stores, search indexes, file systems, and Kafka brokers.

kafka connect 是一个框架,用来帮助集成其他系统的数据到kafka,或者将kafka 的数据吐到其他系统,例如数据库,Elasticsearch 之类的外部系统。

在kafka connect 里,有 kafka woker, kafka connectorkafka connect plugin的概念。

一个kafka worker 就是一个instance,类似一个pod 这种独立的实例。

kafka connector 就是用来搬运数据的连接器。有 source connector 和 sink connector.

Source 就是数据来源的connector,sink 就是吐出数据的 connector.

例如

Connector TypeNamePurpose
SourceJdbcSourceConnectorPull data from relational DBs
SinkElasticsearchSinkConnectorPush Kafka data to Elasticsearch
SourceFileStreamSourceConnectorRead lines from file into Kafka

connector 简略配置

{"name": "my-jdbc-source","connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector","tasks.max": "1","connection.url": "jdbc:mysql://localhost:3306/mydb","topic.prefix": "mysql-"
}

A Kafka Connect Plugin is the packaged code (JAR files) that implements one or more connectors.

It’s usually installed by placing the plugin into Kafka Connect’s plugin.path directory.

A plugin might include:

  • The connector logic

  • Converters (e.g., JSON, Avro)

  • Transformations (optional logic to modify data)

Think of a connector as a configuration, and a plugin as the actual implementation that makes it work.

实现结构

从部署的角度来看,kafka connect 是一个独立的service cluster。
下面的docker-compose.yml 配置可以看出 cp-kafka-connect 这个image 就可可以load 在 CONNECT_PLUGIN_PATH 配置目录下的 connect plugin 来实现不同的 connector 功能。

confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.1 这个命令就是部署mysql 相关的source connector 到 CONNECT_PLUGIN_PATH 配置的目录usr/share/confluent-hub-components 下面。

kafka-connect:image: confluentinc/cp-kafka-connect:7.1.0-1-ubi8environment:CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-componentscommand:- bash- -c- |confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.1/etc/confluent/docker/run

然后再通过kafka connect 的REST API 就可以enable 这个connector 了。

http://www.dtcms.com/a/191084.html

相关文章:

  • idea挂掉,会导致进程不结束,切换profile环境,导致token认证不通过
  • Linux Bash | Capture Output / Recall
  • Android Studio Meerkat与Gradle构建工具升级实战指南
  • 同设备访问php的多个接口会有先后等待问题
  • 电机的导程和脉冲之间的关系
  • Rust入门之高级Trait
  • 嵌入式学习笔记DAY20(链表,gdb调试)
  • 前端~三维地图(cesium)动态材质飞线
  • 香橙派/树莓派读取GY39数据
  • QMK键盘编码器(Encoder)(理论部分)
  • 实战案例:采集 51job 企业招聘信息
  • 安卓system/文件夹下的哪些文件夹可以修改为别的设备的
  • SRM电子采购管理系统:Java+Vue,集成供应商管理,实现采购流程数字化与协同优化
  • 麒麟环境下Selenium的使用
  • 如何高效集成MySQL数据到金蝶云星空
  • Spring的 @Validate注解详细分析
  • 力扣-108.将有序数组转换为二叉搜索树
  • idea 启动Springboot项目在编译阶段报错:java: OutOfMemoryError: insufficient memory
  • VS Code 新旧版本 Remote-SSH 内网离线连接服务器方法(版本 ≤ 1.78.x 及 ≥ 1.79.0)
  • 实验五:以太网UDP全协议栈的实现(通过远程实验系统)
  • 代码随想录算法训练营Day58
  • CSP信奥赛新增的算法-马拉车算法(Manacher‘s Algorithm)
  • 初识java
  • git切换分支后需要pull吗
  • cGAS-STING通路
  • 解决 TypeError: unsupported operand type(s) for -: ‘NoneType‘ and ‘float‘ 错误
  • 前端服务器部署分类总结
  • 带直流音量控制的立体声音频功率放大器—D2668
  • 第二十四天打卡
  • 服务器被打了怎么应对