Flink CDC + StarRocks用 StarRocks Connector 打通实时明细与分析
1. StarRocks Connector 能做什么?
StarRocks Connector 在 Flink CDC 里的角色是 数据下游的 Sink,它主要负责三件事:
-
自动建表(Create table automatically if not exist)
当 StarRocks 中还没有对应表时,可以根据上游的表结构,自动创建一张主键表,省去手工 DDL 的步骤。
-
Schema 变更同步(Schema change synchronization)
当上游表结构发生变化(目前支持加列 / 删列)时,Connector 会在 StarRocks 侧做对应的 schema change,保持上下游结构一致。
-
数据同步(Data synchronization)
对于来自 MySQL binlog 的变更事件,使用 StarRocks Sink Connector 做 Stream Load 写入,实现主键表上的实时 upsert。
简单一句话:StarRocks Connector = 自动建表 + 自动跟踪字段变更 + 实时写入主键表。
2. 一个最小可用的 MySQL → StarRocks Pipeline 示例
先看一眼完整的 YAML 配置长什么样(官方示例):
source:type: mysqlname: MySQL Sourcehostname: 127.0.0.1port: 3306username: adminpassword: passtables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*server-id: 5401-5404sink:type: starrocksname: StarRocks Sinkjdbc-url: jdbc:mysql://127.0.0.1:9030load-url: 127.0.0.1:8030username: rootpassword: passpipeline:name: MySQL to StarRocks Pipelineparallelism: 2
下面按块拆解一下。
2.1 Source:MySQL 变更捕获
source:type: mysqlname: MySQL Sourcehostname: 127.0.0.1port: 3306username: adminpassword: passtables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*server-id: 5401-5404
这里就是一个典型的 Flink CDC MySQL Source:
-
tables支持多个库、正则匹配表名,比如:adb.\.*:adb 下的所有表bdb.user_table_[0-9]+:user_table_0 / user_table_1 / …[app|web].order_\.*:app.order_xxx 和 web.order_xxx 两个库里的明细表
-
server-id用一个区间表示,可以让多个并行 subtasks 使用不同 server-id。
2.2 Sink:StarRocks Connector 的最小配置
sink:type: starrocksname: StarRocks Sinkjdbc-url: jdbc:mysql://127.0.0.1:9030load-url: 127.0.0.1:8030username: rootpassword: pass
几个关键点:
-
type: starrocks
告诉 Flink CDC 使用 StarRocks Connector。 -
jdbc-url
通过 MySQL 协议连接 FE 的 查询端口,用于:- 查询 / 校验表信息
- 自动创建表
- 发起 schema change 请求等
可以写多个 FE 地址:
jdbc:mysql://fe1:9030,fe2:9030,fe3:9030 -
load-url
连接 FE 的 HTTP 端口,用于 Stream Load 写入数据。
多个 FE 用分号分隔:
fe1:8030;fe2:8030;fe3:8030 -
username/password
StarRocks 用户与密码,需要具备对应库表的建表、写入权限。
2.3 Pipeline:全局并行度等
pipeline:name: MySQL to StarRocks Pipelineparallelism: 2
name:在 Flink CDC CLI 或监控界面中看到的 pipeline 名称。parallelism:整个 pipeline 的并行度。
对于 StarRocks Sink 来说,这个值也影响并发 Stream Load 的吞吐能力。
3. Connector Options:按场景理解配置项
文档中的配置表比较长,我们可以换一种更“工程师友好”的方式来记住——按功能分组来看。
3.1 基础必填项
| 参数 | 是否必填 | 说明 |
|---|---|---|
type | required | 固定为 starrocks |
name | optional | sink 的名字,方便标识 |
jdbc-url | required | 连接 FE MySQL 服务的地址,支持多地址逗号分隔 |
load-url | required | 连接 FE HTTP 服务的地址,支持多地址分号分隔 |
username | required | StarRocks 用户名 |
password | required | StarRocks 密码 |
这些不配就直接跑不起来,是 最小集合。
3.2 写入性能与缓冲策略
下列参数决定了“写多快、攒多久再写、占多少内存”:
| 参数 | 默认值 | 类型 | 说明 |
|---|---|---|---|
sink.buffer-flush.max-bytes | 157286400 (150MB) | Long | 所有表共享的缓冲区最大大小,到了就触发 flush,范围 64MB–10GB |
sink.buffer-flush.interval-ms | 300000 (5min) | Long | 单表的最大间隔 flush 时间 |
sink.scan-frequency.ms | 50 | Long | Sink 定时扫描缓冲是否需要 flush 的频率 |
sink.io.thread-count | 2 | Integer | 并发执行 Stream Load 的线程数 |
简单理解:
- 数据量大 → 提高
max-bytes,适当增加io.thread-count,保证吞吐。 - 实时性要求高 → 降低
interval-ms,不要等 5 分钟才 flush 一次。 scan-frequency.ms一般没必要动,默认 50ms 就够频繁了。
3.3 网络与超时配置
| 参数 | 默认值 | 类型 | 说明 |
|---|---|---|---|
sink.connect.timeout-ms | 30000 | String | 建立 HTTP 连接超时 |
sink.wait-for-continue.timeout-ms | 30000 | String | 等待 100-continue 响应的超时 |
sink.socket.timeout-ms | -1 | Integer | HTTP 读超时,-1 表示不限制 |
在网络条件一般、StarRocks FE 负载不高的情况下可以用默认值。如果出现 “写入经常超时” 的情况,再结合 FE 负载与网络状况调优。
3.4 事务 & Stream Load 行为
| 参数 | 默认值 | 类型 | 说明 |
|---|---|---|---|
sink.at-least-once.use-transaction-stream-load | true | Boolean | 是否使用事务 Stream Load 实现 at-least-once |
sink.label-prefix | (none) | String | Stream Load 的 label 前缀 |
sink.properties.* | (none) | String | 下发给 Stream Load 的参数,如 sink.properties.timeout |
例如:
sink:...sink.label-prefix: cdc_starrocks_sink.properties.timeout: 600sink.properties.max_filter_ratio: 0.1
这些参数最终都会透传给 StarRocks 的 Stream Load,具体支持内容可以参考官方 Stream Load 文档。
3.5 自动建表 & Schema 变更控制
| 参数 | 默认值 | 类型 | 说明 |
|---|---|---|---|
table.create.num-buckets | (none) | Integer | 自动建表时使用的 bucket 数量(2.5+ 可以不设) |
table.create.properties.* | (none) | String | 自动建表时附加的表属性 |
table.schema-change.timeout | 30min | Duration | StarRocks 侧 schema change 的超时时间 |
几个关键点:
-
在 StarRocks 2.5+:
可以不填table.create.num-buckets,StarRocks 会自动决定 bucket 数量。 -
在 2.5 之前版本:
一定要配置一个合适的 bucket 数(例如 16 / 32 / 64 等 2 的倍数)。 -
table.create.properties.*非常适合开启新版本的能力,比如 3.2+ 可以打开快速 schema 演进:table.create.properties.fast_schema_evolution: "true" -
table.schema-change.timeout设成一个合理值(比如 30min),避免 schema 变更时间过长被自动取消,导致 Sink 任务失败。
4. 使用注意事项(一定要看!)
官方文档里有几条“红字注意事项”,实际落地时非常重要。
4.1 只支持主键表(Primary Key Table)
Only support StarRocks primary key table, so the source table must have primary keys.
StarRocks Connector 目前 只支持写入 StarRocks 主键表。
因此:
- 上游 CDC 表必须有主键
- 自动建表的时候,会直接创建主键模型表
否则没法实现 upsert 语义,也没办法保证 at-least-once 语义下的幂等写入。
4.2 一致性语义:At-least-once + 主键幂等等价于“逻辑精确一次”
Not support exactly-once. The connector uses at-least-once + primary key table for idempotent writing.
简单翻译:
- 从 Flink 角度看,是 at-least-once(可能重放)
- 但写入的是 主键表,同一主键的重复事件会被覆盖,因此结果上接近 逻辑上的 exactly-once
只要你能接受 “极小概率重复写,但不影响最终结果” 这个前提,那么这个语义在大多数实时数仓场景下是足够的。
4.3 自动建表规则
For creating table automatically…
- the distribution keys are the same as the primary keys
- there is no partition key
- the number of buckets is controlled by
table.create.num-buckets
也就是说,如果你完全依赖自动建表:
- 分桶键 = 主键
- 没有分区列
- bucket 数由
table.create.num-buckets控制(2.5+ 可自动)
如果你对分区策略、存储策略有更细致的设计诉求(比如按照日期分区),一般建议手工建好表,再让 Connector 直接写入。
4.4 Schema 变更限制
For schema change synchronization…
- only supports add/drop columns
- the new column will always be added to the last position
- 可以通过
fast_schema_evolution加速(3.2+)
换句话说:
-
不支持改类型 / 改列名,这类操作要自己在 StarRocks 上控制。
-
新增列时会自动追加到表尾,不会插到中间。
-
如果版本满足条件(3.2+)且自动建表,可以通过:
table.create.properties.fast_schema_evolution: "true"来加速 schema 变更。
5. 数据类型映射:尤其要注意 CHAR / VARCHAR
StarRocks Connector 已经帮我们做好了 Flink CDC → StarRocks 的类型映射,大部分类型都是一一对应,例如:
| Flink CDC type | StarRocks type |
|---|---|
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL(p, s) | DECIMAL(p, s) |
| BOOLEAN | BOOLEAN |
| DATE | DATE |
| TIMESTAMP | DATETIME |
| TIMESTAMP_LTZ | DATETIME |
真正的坑在 CHAR / VARCHAR 上:
- CDC 里的长度是 “字符数”(character)
- StarRocks 里的长度是 “字节数”(byte)
- UTF-8 编码下:一个中文 = 3 字节
所以映射规则是:
-
CHAR(n)且n <= 85→ StarRocks
CHAR(n * 3)例如:
CHAR(20)→CHAR(60),刚好能覆盖 20 个中文。 -
CHAR(n)且n > 85→ StarRocks
VARCHAR(n * 3)由于 StarRocks 的
CHAR最大长度是 255,所以超过 85 字符的 CDC CHAR 会被映射成 VARCHAR。 -
VARCHAR(n)→ StarRocks
VARCHAR(n * 3)同样按 3 倍放大容量。
实战建议:
- 上游尽量统一使用
VARCHAR,避免过多CHAR(n),这样迁移时更灵活。 - 如果上游已经有很多
CHAR(n)字段,要注意 StarRocks 长度是乘以 3 之后的值,避免出现实际数据被截断的情况。
6. 实战上的一些小建议
最后,结合上面的配置和限制,总结几个实践建议:
-
小流量 / Demo 环境
parallelism: 1-2sink.buffer-flush.max-bytes: 64MB–128MBsink.buffer-flush.interval-ms: 10s–30s- 先确保链路稳定,再考虑调优。
-
中等流量 / 线上业务库同步
- 视 CPU 和网络情况适度提高
io.thread-count(2–4) - 将
buffer-flush.max-bytes设置在 256MB–512MB - 将
buffer-flush.interval-ms调到 3s–10s,换取更好的实时性。
- 视 CPU 和网络情况适度提高
-
表设计方面
-
主键设计要兼顾:
- 唯一性(CDC upsert 语义)
- 分桶均匀性(避免数据倾斜)
-
对于需要复杂分区策略的表,推荐手动建表,然后在 Connector 中关闭自动建表、直接写入。
-
-
监控与告警
- 关注 StarRocks FE / BE 的 Stream Load 指标(QPS、失败率、响应时间)。
- 在 Flink CDC 侧为 Sink 任务设置失败重启策略,配合 checkpoint 实现更稳健的 at-least-once 语义。
