Flink CDC + MaxCompute用 MaxCompute Connector 打通实时入湖通道
1. MaxCompute Connector 能做什么?
官方给出的能力非常清晰,主要就是“三件套”:
-
自动建表(Create table automatically if not exist)
当 MaxCompute 中没有对应表时,Connector 可以根据上游表结构自动创建表,并且:
- 如果源表有主键 → 自动建 Delta 表;
- 如果源表无主键 → 自动建 普通 Append 表。
-
Schema 变更同步(Schema change synchronization)
当上游表结构发生变化时,Connector 会在 MaxCompute 侧做对应的表结构变更(有一定限制,下文展开)。
-
数据同步(Data synchronization)
订阅上游 binlog,将增量变更按 at-least-once 语义写入 MaxCompute:
- Delta 表依赖主键特性,可以实现幂等写入;
- Append 表则是纯“追加”,删除被忽略、更新当作插入。
换句话说,MaxCompute Connector 就是帮你做了一套:
“自动建表 + 跟踪 schema + 实时入湖”的一站式方案。
2. 示例:MySQL → MaxCompute 的最小 Pipeline
先看官方给的完整示例,整体结构跟 StarRocks / Doris、Elasticsearch 的 Pipeline 很类似:
source:type: mysqlname: MySQL Sourcehostname: 127.0.0.1port: 3306username: adminpassword: passtables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*server-id: 5401-5404sink:type: maxcomputename: MaxCompute Sinkaccess-id: akaccess-key: skendpoint: endpointproject: flink_cdcbuckets-num: 8pipeline:name: MySQL to MaxCompute Pipelineparallelism: 2
简单拆一下。
2.1 Source:MySQL 增量捕获
source:type: mysql...tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*server-id: 5401-5404
-
tables:支持多库多表 + 正则匹配:adb.\.*→ adb 库全部表;bdb.user_table_[0-9]+→ user_table_0 / user_table_1 / … 分表;[app|web].order_\. *→ app / web 两个库下所有 order_ 前缀表。
-
server-id:使用 server-id 范围方便多并行度使用不同 server-id。
这部分和你已有的 MySQL Source 配置基本一致。
2.2 Sink:MaxCompute 连接配置
sink:type: maxcomputename: MaxCompute Sinkaccess-id: akaccess-key: skendpoint: endpointproject: flink_cdcbuckets-num: 8
关键字段:
-
type: maxcompute
指定使用 MaxCompute Connector。 -
access-id/access-key
阿里云账号或 RAM 用户的 AccessKey ID / Secret,
对应控制台的 AccessKey 管理页面。 -
endpoint
MaxCompute 服务的连接地址,不同地域和网络环境的 Endpoint 不同,需要按照项目所在 Region & 网络方式来配置。 -
project
MaxCompute 项目的名字(不是 Workspace 名),
在控制台 Workspace → Project Management 页面可以看到。 -
buckets-num
自动创建 Delta 表 时使用的 bucket 数量。
(是否理解为 Delta 表内部桶数,可参考 MaxCompute Delta 表文档)
2.3 pipeline:整体任务属性
pipeline:name: MySQL to MaxCompute Pipelineparallelism: 2
name:任务名字,日志、UI 中都会用到;parallelism:Pipeline 的整体并行度,决定 Source / Sink 的并发度。
3. Connector Options:按功能分组理解配置项
文档给了一张完整配置表,我们按功能拆开会更直观。
3.1 基础认证 & 连接参数
| 参数 | 必填 | 默认值 | 类型 | 说明 |
|---|---|---|---|---|
type | 是 | (none) | String | 固定为 maxcompute |
name | 否 | (none) | String | Sink 名字,便于区分 |
access-id | 是 | (none) | String | 阿里云账号或 RAM 用户的 AccessKey ID |
access-key | 是 | (none) | String | 对应 AccessKey Secret |
endpoint | 是 | (none) | String | MaxCompute 服务 Endpoint |
project | 是 | (none) | String | MaxCompute 项目名 |
tunnel.endpoint | 否 | (none) | String | MaxCompute Tunnel 服务 Endpoint,在特殊网络环境(代理等)才需要显式配置 |
quota.name | 否 | (none) | String | MaxCompute 数据传输专用资源组名称,不配置用共享资源组 |
sts-token | 否 | (none) | String | 使用 STS 临时凭证时需要配置 |
这里几个点简单提一下:
- 使用生产环境通常建议用 RAM 角色 + STS,安全性更好;
quota.name对应 MaxCompute 的“独享资源组”,大体可以理解为给数据导入导出“上专线”,避免和其他作业抢共享资源;tunnel.endpoint一般可自动路由,仅在代理或特殊网络里需要手动设置。
3.2 Delta 表 & 写入并发相关参数
| 参数 | 必填 | 默认值 | 类型 | 说明 |
|---|---|---|---|---|
buckets-num | 否 | 16 | Integer | 自动创建 Delta 表时使用的 bucket 数量 |
total.buffer-size | 否 | 64MB | String | 内存数据缓冲大小,按“分区级别”(无分区则按表级)统计,达到阈值就写入 |
bucket.buffer-size | 否 | 4MB | String | 按 bucket 级别的数据缓冲大小,仅 Delta 表有效 |
commit.thread-num | 否 | 16 | Integer | checkpoint 阶段可同时处理的分区 / 表数量 |
flush.concurrent-num | 否 | 4 | Integer | 同时向 MaxCompute 写入的 bucket 数(仅 Delta 表有效) |
可以简单理解为:
-
buckets-num:是 Delta 表在 MaxCompute 侧的“桶数”,影响并行写入与后续查询性能; -
total.buffer-size/bucket.buffer-size:- 决定了内存中能“攒”多少数据;
- 达到阈值就触发写入;
-
commit.thread-num/flush.concurrent-num:- 决定 checkpoint 阶段、写入阶段的并发度。
一般来说:
- 数据量较大 → 可以适度增大
buckets-num、flush.concurrent-num; - 内存紧张 → 需要把
total.buffer-size/bucket.buffer-size控制小一点。
3.3 压缩与写入性能
| 参数 | 必填 | 默认值 | 类型 | 说明 |
|---|---|---|---|---|
compress.algorithm | 否 | zlib | String | 写入 MaxCompute 的压缩算法:raw / zlib / lz4 / snappy |
压缩算法选择上:
raw:不压缩,CPU 压力最小,存储占用最高;zlib:压缩率高,CPU 较重,默认值,比较通用;lz4/snappy:压缩率略低于 zlib,但速度更快,更适合吞吐优先场景。
4. 使用说明:自动建表、Delta 表与一致性语义
官方 Usage Instructions 部分信息很关键,这里单独拎出来说。
4.1 自动建表策略
The connector supports automatic table creation, automatically mapping the location relationship and data types between MaxCompute tables and source tables…
要点:
- Connector 会根据源表的位置(库名 / 表名)和字段类型,在 MaxCompute 侧自动创建表;
- 如果源表有主键 → 创建 Delta 表;
- 如果源表没有主键 → 创建普通 Append 表。
Delta 表可以理解为:
- 支持基于主键的 upsert / delete;
- 可以通过主键实现幂等写入。
而 Append 表则是“纯追加”,不支持主键级别更新。
4.2 Delete / Update 在 Append 表上的行为
When writing to a regular MaxCompute table (Append table), the delete operation will be ignored, and the update operation will be treated as an insert operation.
非常重要的一条:
-
对 Append 表:
- 删除操作会被忽略;
- 更新会当作插入一行新数据。
也就是说,如果你真的需要“行级别”的准确变更结果,请尽量保证源表有主键,让 Connector 自动建成 Delta 表,或者手工建好 Delta 表再写入。
4.3 一致性语义:at-least-once + Delta 表幂等
Currently, only at-least-once is supported. Delta tables can achieve idempotent writes due to their primary key characteristics.
-
从 Flink 任务的角度:语义是 at-least-once,出现失败时会重试;
-
对 Delta 表而言,因为有主键:
- 重试时写入的“重复数据”会覆盖同主键行,从而实现幂等效果;
- 最终结果上接近逻辑的“exactly-once”。
对于 Append 表则没有幂等性保证,重试时就真的是重复行。
4.4 Schema 变更支持
For synchronization of table structure changes:
- A new column can only be added as the last column.
- Modifying a column type can only be changed to a compatible type.
这里的约束很类似 StarRocks / Doris:
-
新增列:只能追加到字段列表的最后;
-
修改列类型:必须是兼容类型(详见 MaxCompute 的
ALTER TABLE文档),例如:- 小范围整数升级为大范围整数;
- 某些字符串长度增加等。
5. 表位置映射:Project / Schema / Table 对应关系
MaxCompute Connector 在“自动建表”的时候,需要把 Flink CDC 中的 TableId 映射到 MaxCompute 的实际位置(project / schema / table)。官方给了一张映射表:
| 抽象概念(Flink CDC) | MaxCompute 位置 | MySQL 位置 |
|---|---|---|
project in config | project | (none) |
TableId.namespace | schema(仅在 MaxCompute 项目支持 Schema 模型时) | database |
TableId.tableName | table | table |
重点几点:
-
配置文件中的
project就是 MaxCompute 的 Project 名
与 MySQL 无直接对应。 -
TableId.namespace一般对应 MySQL 的 database:-
如果 MaxCompute 项目支持 Schema 模型:
- namespace → MaxCompute schema;
-
如果不支持 Schema 模型:
- 该信息会被忽略;
- 并且一个同步任务只能同步 一个 MySQL Database(同理适用于其它 DataSource)。
-
-
TableId.tableName直接对应 MaxCompute 表名。
⚠️ 注意:
如果你的 MaxCompute 项目不支持 Schema 模型,那么一个同步任务只能对接一个上游库。多库场景下,需要拆成多个 Pipeline。
6. 数据类型映射:Flink Type → MaxCompute Type
最后是非常重要的类型映射表,决定了自动建表时字段的类型:
| Flink Type | MaxCompute Type |
|---|---|
| CHAR / VARCHAR | STRING |
| BOOLEAN | BOOLEAN |
| BINARY / VARBINARY | BINARY |
| DECIMAL | DECIMAL |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INTEGER | INTEGER |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| TIME_WITHOUT_TIME_ZONE | STRING |
| DATE | DATE |
| TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_NTZ |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE | TIMESTAMP |
| TIMESTAMP_WITH_TIME_ZONE | TIMESTAMP |
| ARRAY | ARRAY |
| MAP | MAP |
| ROW | STRUCT |
几点小建议:
-
字符串统一映射为 STRING
- 上游的 CHAR / VARCHAR 都是 STRING,比较自然;
- 但如果你后续对维度列有分桶、分区等需求,请在建模阶段自行约束长度与内容格式。
-
时间类型要注意 TIMESTAMP_NTZ vs TIMESTAMP
TIMESTAMP_WITHOUT_TIME_ZONE→TIMESTAMP_NTZTIMESTAMP_WITH_LOCAL_TIME_ZONE/TIMESTAMP_WITH_TIME_ZONE→TIMESTAMP
这会影响你在 MaxCompute 侧对时间字段的处理方式,尤其是时区相关的 ETL / 分区。
-
ROW → STRUCT
- 上游复杂结构在 MaxCompute 中变为 STRUCT;
- 在 SQL 开发中,你可以通过
col.subfield的方式访问内部字段。
7. 实战经验与建议
结合上面这些配置和特性,最后给几个落地建议:
-
能用 Delta 表就用 Delta 表
- 有主键的业务表尽量走 Delta 表,享受幂等写入和更新支持;
- 对纯日志类、append-only 表(没有更新 / 删除需求)可适当用 Append 表。
-
先想清楚 Project / Schema 布局
- 确认 MaxCompute 项目是否支持 Schema 模型;
- 如果不支持,一个 Pipeline 对应一个 MySQL 库;
- 多库 / 大项目可以按业务线拆多个 Pipeline,对应多个 Project 或 Schema。
-
合理设置 buckets-num 和 buffer
- 小规模数据:
buckets-num可用默认 16,total.buffer-size64MB 足够; - 大规模高吞吐:可以增大 bucket 数量,适当调高
flush.concurrent-num,并评估内存占用。
- 小规模数据:
-
事先规划好表结构与 Schema 变更策略
- 尽量避免频繁的类型变更,遵循 MaxCompute 的兼容性约束;
- 新增列默认在尾部,在建模时考虑预留扩展空间。
-
写入性能监控
- 监控 Flink 任务的 checkpoint 时间、backpressure 情况;
- 监控 MaxCompute 的写入吞吐和资源组使用情况(quota.name 对应的独享资源组)。
