使用 Flink CDC Elasticsearch Pipeline Connector 打通 MySQL 与 Elasticsearch 的实时链路
1. 场景与整体 Pipeline 结构
我们先看一下官方提供的 最小可用 Pipeline,感受下整体长什么样:
source:type: mysqlname: MySQL Sourcehostname: 127.0.0.1port: 3306username: adminpassword: passtables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*server-id: 5401-5404sink:type: elasticsearchname: Elasticsearch Sinkhosts: http://127.0.0.1:9092,http://127.0.0.1:9093route:- source-table: adb.\.*sink-table: default_indexdescription: sync adb.\.* table to default_indexpipeline:name: MySQL to Elasticsearch Pipelineparallelism: 2
可以看到,Flink CDC Pipeline 的结构非常直观:
source:定义上游 MySQL 数据源及要捕获的表;sink:定义 Elasticsearch 集群地址及写入行为;route:定义从“源表”到“索引名”的映射规则;pipeline:定义整个任务的一些全局属性,如名称、并行度等。
下面我们从 Source 开始,一步步往下看。
2. MySQL → Elasticsearch 的 Pipeline 示例解析
2.1 MySQL Source:多库多表变更捕获
source:type: mysqlname: MySQL Sourcehostname: 127.0.0.1port: 3306username: adminpassword: passtables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*server-id: 5401-5404
关键配置说明:
-
type: mysql
使用的是 Flink CDC 的 MySQL Source。 -
tables支持多库 + 正则匹配:adb.\.*:同步adb库下全部表;bdb.user_table_[0-9]+:同步user_table_0 / user_table_1 / ...等分表;[app|web].order_\.*:同步app和web两个库中所有以order_开头的表。
-
server-id: 5401-5404
使用 server-id 范围方便在并行执行时为每个 subtask 分配不同的 server-id。
这部分基本延续了 Flink CDC MySQL Source 的常规使用方式,不多展开。
2.2 Elasticsearch Sink:连接 ES 集群
sink:type: elasticsearchname: Elasticsearch Sinkhosts: http://127.0.0.1:9092,http://127.0.0.1:9093
-
type: elasticsearch
告诉 Pipeline 使用 Elasticsearch Connector 作为 Data Sink。 -
hosts
配置一个或多个 ES 节点的 HTTP 地址,逗号分隔。例如:hosts: http://es-node1:9200,http://es-node2:9200
在生产环境中,你通常会:
- 同时指定多台 ES 节点,增强高可用与负载均衡;
- 如果集群启用了认证,还需要补充
username/password(后文有表格说明)。
2.3 route:源表 → 索引名的映射
route:- source-table: adb.\.*sink-table: default_indexdescription: sync adb.\.* table to default_index
这是 Pipeline 模式里非常实用的一块配置:
source-table:使用正则选择一批源表;sink-table:指定它们写入的目标索引名;description:纯说明文字,方便人类阅读。
上面的配置含义是:
所有匹配
adb.\.*的表,统一写入 Elasticsearch 索引default_index。
如果你不配置 route,那么默认的索引名会使用 namespace.schemaName.tableName 的形式(即 TableId 字符串),这在多源多表场景下可能会有很多小索引,不一定符合你的设计;通过 route 可以:
- 多表合并写入一个索引;
- 或者重命名索引名称,使用更符合业务语义的名字。
2.4 pipeline:任务全局属性
pipeline:name: MySQL to Elasticsearch Pipelineparallelism: 2
name:任务名称,主要用于日志和监控展示;parallelism:Flink CDC Pipeline 的整体并行度,对 Source 和 Sink 的吞吐都有直接影响。
3. Elasticsearch Pipeline Connector 配置项详解
官方文档给了一张「Pipeline Connector Options」表,这里不逐行翻译,而是按功能分组讲一下。
3.1 基础连接与版本
| 配置项 | 是否必填 | 默认值 | 类型 | 说明 |
|---|---|---|---|---|
type | required | (none) | String | 固定为 elasticsearch |
name | optional | (none) | String | Sink 名字,主要用于标识 |
hosts | required | (none) | String | 一个或多个 ES 地址,逗号分隔,如 http://es1:9200,http://es2:9200 |
version | optional | 7 | Integer | ES 版本:6 / 7 / 8 |
username | optional | (none) | String | Elasticsearch 用户名 |
password | optional | (none) | String | Elasticsearch 密码 |
示例:
sink:type: elasticsearchname: ES Sinkhosts: http://es1:9200,http://es2:9200version: 8username: elasticpassword: my-secret
注意:version 要和真实 ES 集群版本匹配,否则某些底层 API 行为可能不一致。
3.2 批量写入 & 缓冲策略
Elasticsearch 写入性能高度依赖 bulk 请求的配置。Connector 提供了一组参数,控制“攒多少再写、写多大的包、在飞多少请求”等:
| 配置项 | 默认值 | 类型 | 说明 |
|---|---|---|---|
batch.size.max | 500 | Integer | 每个 bulk 请求最多包含多少 action;设为 0 表示不按条数限制 |
inflight.requests.max | 5 | Integer | 允许同时在飞(还没返回)的请求数量 |
buffered.requests.max | 1000 | Integer | 内存中最多缓存多少个待发送请求 |
batch.size.max.bytes | 5242880 | Long | 单个 bulk 请求最大字节数(约 5MB) |
buffer.time.max.ms | 5000 | Long | 批次最长等待时间,超过就强制 flush(毫秒) |
record.size.max.bytes | 10485760 | Long | 单条记录允许的最大字节数(约 10MB) |
粗略的调优思路:
-
吞吐优先:
- 增大
batch.size.max(如 1000–2000); - 适度提高
batch.size.max.bytes(如 10–20MB); - 保持
buffer.time.max.ms在 1–3 秒之间。
- 增大
-
延迟优先:
- 降低
batch.size.max(200–500); - 将
buffer.time.max.ms调小(500–1000ms),保证数据尽快写入。
- 降低
-
内存安全:
- 留意
buffered.requests.max和record.size.max.bytes,防止超大文档把内存打爆; - 如果上游有大 JSON 或大文本字段,建议严格控制字段大小或做预处理。
- 留意
3.3 分片后缀:sharding.suffix.key & sharding.suffix.separator
文档中还有两个和“分片后缀”相关的配置:
sharding.suffix.keysharding.suffix.separator(默认_)
它们的核心用途是:
允许你按某个字段对目标索引做“分片后缀”拼接,从而实现类似“按字段分表”的效果。
配置说明里给了一个思路:
默认 sink 表名为
test_table${suffix_key}。
默认分片列是第一个 partition 列。
多个规则用;分隔,表名与列名用:分隔,例如:table1:col1;table2:col2。
虽然描述是“table”,但在 ES 场景下也可以理解为对索引名进行后缀拼接。你可以根据业务的分区需求,自定义 suffix 和 separator 来构造:
index_2025-11-14orders-202510- …
具体使用时需要结合实际 Connector 实现与 ES 索引命名约束来验证。
4. 使用注意事项:索引名、自动建表与路由
官方 Usage Notes 有两句非常关键的话。
4.1 默认索引名与 route 重写
The written index of Elasticsearch will be
namespace.schemaName.tableNamestring of TableId, this can be changed using route function of pipeline.
意思是:
- 如果你不配置
route,那每个表会被写入一个默认索引:namespace.schema.table; - 想要自定义索引名(比如统一写到
order_index),就要通过route来映射。
例如:
route:- source-table: app\.order_detailsink-table: order_detail_indexdescription: 同步 app.order_detail -> ES 索引 order_detail_index
在多库多表场景下,一般建议显式写明 route,避免默认索引名太“技术味”,也更方便做索引合并与分流。
4.2 不支持自动创建 Elasticsearch 索引
No support for automatic Elasticsearch index creation.
这点和 StarRocks / Doris Sink 有明显不同:
- StarRocks / Doris 可以根据上游 schema 自动建表;
- Elasticsearch Connector 不会帮你建索引。
这意味着你需要:
- 在 ES 中提前创建好索引;
- 设计好字段的
mapping(text / keyword / date / long / …); - 规划好分片数、副本数、分析器等。
从工程角度看,这反而是一件好事 —— ES 的索引结构非常关键,自动生成往往不符合搜索需求,自己设计更可控。
5. 数据类型映射:CDC 类型 → JSON → Elasticsearch
Elasticsearch 底层存储的是 JSON 文档,因此这里的类型转换实际上是:
Flink CDC 类型 → JSON 类型
再由 ESmapping去解析 JSON 字段。
官方给出的映射表如下:
| CDC 类型 | JSON 类型 | 说明 |
|---|---|---|
| TINYINT | NUMBER | |
| SMALLINT | NUMBER | |
| INT | NUMBER | |
| BIGINT | NUMBER | |
| FLOAT | NUMBER | |
| DOUBLE | NUMBER | |
| DECIMAL(p, s) | STRING | 用字符串表示小数,避免精度损失 |
| BOOLEAN | BOOLEAN | |
| DATE | STRING | 格式:yyyy-MM-dd,例:2024-10-21 |
| TIMESTAMP | STRING | 格式:yyyy-MM-dd HH:mm:ss.SSSSSS,UTC 时区 |
| TIMESTAMP_LTZ | STRING | 同上 |
| CHAR(n) | STRING | |
| VARCHAR(n) | STRING | |
| ARRAY | ARRAY | |
| MAP | STRING | 序列化成 JSON 字符串 |
| ROW | STRING | 序列化成 JSON 字符串 |
几个要特别注意的地方:
5.1 DECIMAL → STRING
DECIMAL 会被转成字符串,以避免精度问题。
如果你在 ES 映射中硬把它映射成 double 或 scaled_float,要注意可能会有精度丢失。
实践中比较常见的做法:
- 金额以“分”为单位,在上游就转成
BIGINT(long); - 或者在 ES 中仍按照字符串存储金额,只在查询端做转换。
5.2 时间类型全部是字符串
DATE / TIMESTAMP / TIMESTAMP_LTZ 最终都是字符串:
DATE:yyyy-MM-ddTIMESTAMP:yyyy-MM-dd HH:mm:ss.SSSSSS(UTC)
你需要在索引 mapping 中指定对应的 date 类型和 format,例如:
"order_time": {"type": "date","format": "yyyy-MM-dd HH:mm:ss.SSSSSS||epoch_millis"
}
否则 ES 会把它当作普通字符串,时间排序、范围过滤、聚合都会出问题。
5.3 MAP / ROW → JSON 字符串
对于 MAP 和 ROW 这类复杂结构:
- Connector 会把它们序列化为 JSON 字符串;
- 如果你希望在 ES 里对内部字段做精细查询或聚合,建议在上游就“摊平”结构,或者自己写一层中间加工逻辑。
6. 实战调优建议
结合上面的配置和类型映射,简单给几点落地建议:
-
索引设计先行
- 根据业务访问路径设计索引结构,而不是简单“表 → 一模一样的索引”;
- 对于高频文本检索字段,用
text + keyword双字段模式; - 时间字段一定要用
date类型并配置好format。
-
吞吐 vs 延迟:根据业务需求设定批量参数
- 报表统计 / 后台检索:可以偏吞吐,增大批次和 buffer;
- 用户实时检索:偏延迟,将
buffer.time.max.ms等参数调小。
-
利用 route 做索引合并或分流
- 多个结构类似的小表可以合并到一个大索引,减少 index 数量;
- 也可以按业务线、场景拆索引,例如:
app_orders_index、web_orders_index。
-
提前做好监控与告警
- Flink CDC 任务的失败率、重启次数;
- Elasticsearch 集群的写入 QPS、Bulk 请求耗时、Rejected 数量;
- 一旦 ES 集群出现写入压力,Connector 端的 bulk 请求就可能频繁重试或堆积。
7. 总结
Elasticsearch Pipeline Connector 帮我们把一件本来很“啰嗦”的事情,变成了几段 YAML 配置:
source捕获 MySQL binlogsink连接 Elasticsearch 集群route决定源表与索引的映射关系- 一套批量写入、缓冲和并发策略,统一交给 Connector 管理
