当前位置: 首页 > news >正文

使用 Flink CDC Elasticsearch Pipeline Connector 打通 MySQL 与 Elasticsearch 的实时链路

1. 场景与整体 Pipeline 结构

我们先看一下官方提供的 最小可用 Pipeline,感受下整体长什么样:

source:type: mysqlname: MySQL Sourcehostname: 127.0.0.1port: 3306username: adminpassword: passtables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*server-id: 5401-5404sink:type: elasticsearchname: Elasticsearch Sinkhosts: http://127.0.0.1:9092,http://127.0.0.1:9093route:- source-table: adb.\.*sink-table: default_indexdescription: sync adb.\.* table to default_indexpipeline:name: MySQL to Elasticsearch Pipelineparallelism: 2

可以看到,Flink CDC Pipeline 的结构非常直观:

  • source:定义上游 MySQL 数据源及要捕获的表;
  • sink:定义 Elasticsearch 集群地址及写入行为;
  • route:定义从“源表”到“索引名”的映射规则;
  • pipeline:定义整个任务的一些全局属性,如名称、并行度等。

下面我们从 Source 开始,一步步往下看。

2. MySQL → Elasticsearch 的 Pipeline 示例解析

2.1 MySQL Source:多库多表变更捕获

source:type: mysqlname: MySQL Sourcehostname: 127.0.0.1port: 3306username: adminpassword: passtables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*server-id: 5401-5404

关键配置说明:

  • type: mysql
    使用的是 Flink CDC 的 MySQL Source。

  • tables 支持多库 + 正则匹配:

    • adb.\.*:同步 adb 库下全部表;
    • bdb.user_table_[0-9]+:同步 user_table_0 / user_table_1 / ... 等分表;
    • [app|web].order_\.*:同步 appweb 两个库中所有以 order_ 开头的表。
  • server-id: 5401-5404
    使用 server-id 范围方便在并行执行时为每个 subtask 分配不同的 server-id。

这部分基本延续了 Flink CDC MySQL Source 的常规使用方式,不多展开。

2.2 Elasticsearch Sink:连接 ES 集群

sink:type: elasticsearchname: Elasticsearch Sinkhosts: http://127.0.0.1:9092,http://127.0.0.1:9093
  • type: elasticsearch
    告诉 Pipeline 使用 Elasticsearch Connector 作为 Data Sink。

  • hosts
    配置一个或多个 ES 节点的 HTTP 地址,逗号分隔。例如:

    hosts: http://es-node1:9200,http://es-node2:9200
    

在生产环境中,你通常会:

  • 同时指定多台 ES 节点,增强高可用与负载均衡;
  • 如果集群启用了认证,还需要补充 username / password(后文有表格说明)。

2.3 route:源表 → 索引名的映射

route:- source-table: adb.\.*sink-table: default_indexdescription: sync adb.\.* table to default_index

这是 Pipeline 模式里非常实用的一块配置:

  • source-table:使用正则选择一批源表;
  • sink-table:指定它们写入的目标索引名;
  • description:纯说明文字,方便人类阅读。

上面的配置含义是:

所有匹配 adb.\.* 的表,统一写入 Elasticsearch 索引 default_index

如果你不配置 route,那么默认的索引名会使用 namespace.schemaName.tableName 的形式(即 TableId 字符串),这在多源多表场景下可能会有很多小索引,不一定符合你的设计;通过 route 可以:

  • 多表合并写入一个索引;
  • 或者重命名索引名称,使用更符合业务语义的名字。

2.4 pipeline:任务全局属性

pipeline:name: MySQL to Elasticsearch Pipelineparallelism: 2
  • name:任务名称,主要用于日志和监控展示;
  • parallelism:Flink CDC Pipeline 的整体并行度,对 Source 和 Sink 的吞吐都有直接影响。

3. Elasticsearch Pipeline Connector 配置项详解

官方文档给了一张「Pipeline Connector Options」表,这里不逐行翻译,而是按功能分组讲一下。

3.1 基础连接与版本

配置项是否必填默认值类型说明
typerequired(none)String固定为 elasticsearch
nameoptional(none)StringSink 名字,主要用于标识
hostsrequired(none)String一个或多个 ES 地址,逗号分隔,如 http://es1:9200,http://es2:9200
versionoptional7IntegerES 版本:6 / 7 / 8
usernameoptional(none)StringElasticsearch 用户名
passwordoptional(none)StringElasticsearch 密码

示例:

sink:type: elasticsearchname: ES Sinkhosts: http://es1:9200,http://es2:9200version: 8username: elasticpassword: my-secret

注意version 要和真实 ES 集群版本匹配,否则某些底层 API 行为可能不一致。

3.2 批量写入 & 缓冲策略

Elasticsearch 写入性能高度依赖 bulk 请求的配置。Connector 提供了一组参数,控制“攒多少再写、写多大的包、在飞多少请求”等:

配置项默认值类型说明
batch.size.max500Integer每个 bulk 请求最多包含多少 action;设为 0 表示不按条数限制
inflight.requests.max5Integer允许同时在飞(还没返回)的请求数量
buffered.requests.max1000Integer内存中最多缓存多少个待发送请求
batch.size.max.bytes5242880Long单个 bulk 请求最大字节数(约 5MB)
buffer.time.max.ms5000Long批次最长等待时间,超过就强制 flush(毫秒)
record.size.max.bytes10485760Long单条记录允许的最大字节数(约 10MB)

粗略的调优思路:

  • 吞吐优先

    • 增大 batch.size.max(如 1000–2000);
    • 适度提高 batch.size.max.bytes(如 10–20MB);
    • 保持 buffer.time.max.ms 在 1–3 秒之间。
  • 延迟优先

    • 降低 batch.size.max(200–500);
    • buffer.time.max.ms 调小(500–1000ms),保证数据尽快写入。
  • 内存安全

    • 留意 buffered.requests.maxrecord.size.max.bytes,防止超大文档把内存打爆;
    • 如果上游有大 JSON 或大文本字段,建议严格控制字段大小或做预处理。

3.3 分片后缀:sharding.suffix.key & sharding.suffix.separator

文档中还有两个和“分片后缀”相关的配置:

  • sharding.suffix.key
  • sharding.suffix.separator(默认 _

它们的核心用途是:
允许你按某个字段对目标索引做“分片后缀”拼接,从而实现类似“按字段分表”的效果

配置说明里给了一个思路:

默认 sink 表名为 test_table${suffix_key}
默认分片列是第一个 partition 列。
多个规则用 ; 分隔,表名与列名用 : 分隔,例如:table1:col1;table2:col2

虽然描述是“table”,但在 ES 场景下也可以理解为对索引名进行后缀拼接。你可以根据业务的分区需求,自定义 suffix 和 separator 来构造:

  • index_2025-11-14
  • orders-202510

具体使用时需要结合实际 Connector 实现与 ES 索引命名约束来验证。

4. 使用注意事项:索引名、自动建表与路由

官方 Usage Notes 有两句非常关键的话。

4.1 默认索引名与 route 重写

The written index of Elasticsearch will be namespace.schemaName.tableName string of TableId, this can be changed using route function of pipeline.

意思是:

  • 如果你不配置 route,那每个表会被写入一个默认索引:namespace.schema.table
  • 想要自定义索引名(比如统一写到 order_index),就要通过 route 来映射。

例如:

route:- source-table: app\.order_detailsink-table: order_detail_indexdescription: 同步 app.order_detail -> ES 索引 order_detail_index

在多库多表场景下,一般建议显式写明 route,避免默认索引名太“技术味”,也更方便做索引合并与分流。

4.2 不支持自动创建 Elasticsearch 索引

No support for automatic Elasticsearch index creation.

这点和 StarRocks / Doris Sink 有明显不同:

  • StarRocks / Doris 可以根据上游 schema 自动建表;
  • Elasticsearch Connector 不会帮你建索引

这意味着你需要:

  1. 在 ES 中提前创建好索引;
  2. 设计好字段的 mapping(text / keyword / date / long / …);
  3. 规划好分片数、副本数、分析器等。

从工程角度看,这反而是一件好事 —— ES 的索引结构非常关键,自动生成往往不符合搜索需求,自己设计更可控。

5. 数据类型映射:CDC 类型 → JSON → Elasticsearch

Elasticsearch 底层存储的是 JSON 文档,因此这里的类型转换实际上是:

Flink CDC 类型 → JSON 类型
再由 ES mapping 去解析 JSON 字段。

官方给出的映射表如下:

CDC 类型JSON 类型说明
TINYINTNUMBER
SMALLINTNUMBER
INTNUMBER
BIGINTNUMBER
FLOATNUMBER
DOUBLENUMBER
DECIMAL(p, s)STRING用字符串表示小数,避免精度损失
BOOLEANBOOLEAN
DATESTRING格式:yyyy-MM-dd,例:2024-10-21
TIMESTAMPSTRING格式:yyyy-MM-dd HH:mm:ss.SSSSSS,UTC 时区
TIMESTAMP_LTZSTRING同上
CHAR(n)STRING
VARCHAR(n)STRING
ARRAYARRAY
MAPSTRING序列化成 JSON 字符串
ROWSTRING序列化成 JSON 字符串

几个要特别注意的地方:

5.1 DECIMAL → STRING

DECIMAL 会被转成字符串,以避免精度问题。
如果你在 ES 映射中硬把它映射成 doublescaled_float,要注意可能会有精度丢失。

实践中比较常见的做法:

  • 金额以“分”为单位,在上游就转成 BIGINT(long);
  • 或者在 ES 中仍按照字符串存储金额,只在查询端做转换。

5.2 时间类型全部是字符串

DATE / TIMESTAMP / TIMESTAMP_LTZ 最终都是字符串:

  • DATEyyyy-MM-dd
  • TIMESTAMPyyyy-MM-dd HH:mm:ss.SSSSSS(UTC)

你需要在索引 mapping 中指定对应的 date 类型和 format,例如:

"order_time": {"type": "date","format": "yyyy-MM-dd HH:mm:ss.SSSSSS||epoch_millis"
}

否则 ES 会把它当作普通字符串,时间排序、范围过滤、聚合都会出问题。

5.3 MAP / ROW → JSON 字符串

对于 MAP 和 ROW 这类复杂结构:

  • Connector 会把它们序列化为 JSON 字符串;
  • 如果你希望在 ES 里对内部字段做精细查询或聚合,建议在上游就“摊平”结构,或者自己写一层中间加工逻辑。

6. 实战调优建议

结合上面的配置和类型映射,简单给几点落地建议:

  1. 索引设计先行

    • 根据业务访问路径设计索引结构,而不是简单“表 → 一模一样的索引”;
    • 对于高频文本检索字段,用 text + keyword 双字段模式;
    • 时间字段一定要用 date 类型并配置好 format
  2. 吞吐 vs 延迟:根据业务需求设定批量参数

    • 报表统计 / 后台检索:可以偏吞吐,增大批次和 buffer;
    • 用户实时检索:偏延迟,将 buffer.time.max.ms 等参数调小。
  3. 利用 route 做索引合并或分流

    • 多个结构类似的小表可以合并到一个大索引,减少 index 数量;
    • 也可以按业务线、场景拆索引,例如:app_orders_indexweb_orders_index
  4. 提前做好监控与告警

    • Flink CDC 任务的失败率、重启次数;
    • Elasticsearch 集群的写入 QPS、Bulk 请求耗时、Rejected 数量;
    • 一旦 ES 集群出现写入压力,Connector 端的 bulk 请求就可能频繁重试或堆积。

7. 总结

Elasticsearch Pipeline Connector 帮我们把一件本来很“啰嗦”的事情,变成了几段 YAML 配置:

  • source 捕获 MySQL binlog
  • sink 连接 Elasticsearch 集群
  • route 决定源表与索引的映射关系
  • 一套批量写入、缓冲和并发策略,统一交给 Connector 管理
http://www.dtcms.com/a/617863.html

相关文章:

  • 基于视频识别的大模型项目实战心得
  • Firefly-Modeler 体积雕刻:AI 概念到 3D 基模
  • 提示词工程 - (2) 指南
  • 网络安全 | 深入理解SQL注入的原理和防范
  • python之循环导入
  • 强杀服务、重启系统及断电对 TDengine 影响
  • Odoo 19 制造与会计集成深度解析报告
  • 免费网站软件正能量医院网站建设方案计划书
  • 软件架构趋势:云原生与大模型的融合与重塑
  • 做网站会员登陆长春网站运做思路
  • 排序java
  • Substance 3D Stager:电商“虚拟摄影”工作流
  • 实验题辅导
  • 【Python TensorFlow】BiTCN-BiLSTM双向时间序列卷积双向长短期记忆神经网络时序预测算法(附代码)
  • 番禺制作网站平台邢台123信息网
  • 网页制作软件有那些石家庄seo网站排名
  • 高级边界扫描 --6-- Silicon Nail测试调试
  • Linux 序列化技术、自定义协议实现及守护进程
  • 【Javaweb学习|黑马笔记|Day5】Web后端基础|java操作数据库
  • ArcGIS地统计综合实战 | 洛杉矶臭氧浓度预测-pretict-pretictable-pretiction
  • 【Agent零基础入门课程】告别黑盒:HelloAgents架构深度解析
  • PyTorch 零基础入门:从张量到 GPU 加速完全指南
  • Gradient Accumulation (梯度累积) in PyTorch
  • C++ 哈希表 常用接口总结 力扣 1. 两数之和 每日一题 题解
  • 百度云可以做网站吗wordpress文学模版
  • 数据库高可用架构-分表分库
  • C# 1116 流程控制 常量
  • ASC学习笔记0022:在不打算修改属性集时访问生成的属性集
  • 国外简约企业网站大连做环评网站
  • 【实际项目3】C#把文件夹中的RGB图片变为Gray图片