从 WAL 到 Fluss->Flink CDC Postgres Connector 端到端同步实战
一、为什么选择 Postgres CDC?
- 一次配置,持续增量:先做一次快照(Snapshot),之后通过 WAL(逻辑日志)连续订阅变更。
- 端到端一致性:配合 checkpoint、LSN 提交与心跳机制,帮助下游做到“至少一次”或“幂等写”。
- 无侵入:对线上业务影响小(合理调优快照并发与 chunk 即可)。
限制:由于 Postgres WAL 不包含完整 DDL 语义,当前不支持结构变更同步。请将 DDL 统一由 DBA/CI 变更并在上下游保持一致。
二、快速上手:一份可运行的 Pipeline YAML
下面是一个从 PostgreSQL 同步到 Fluss 的最小可用示例(含安全配置):
source:type: postgresname: Postgres Sourcehostname: 127.0.0.1port: 5432username: adminpassword: pass# 同一条连接仅支持一个 database,正则可跨 schema/table:tables: adb.\.*\..*decoding.plugin.name: pgoutputslot.name: pgtestsink:type: flussname: Fluss Sinkbootstrap.servers: localhost:9123# Fluss 客户端安全配置properties.client.security.protocol: saslproperties.client.security.sasl.mechanism: PLAINproperties.client.security.sasl.username: developerproperties.client.security.sasl.password: developer-passpipeline:name: Postgres to Fluss Pipelineparallelism: 4
三个易踩点:
tables的点号.是库/模式/表名的分隔符;想匹配任意字符要转义\.。- 所有表必须属于同一个 database(Postgres CDC 当前仅支持单库连接)。
slot.name必须符合 PG 规则:小写字母/数字/下划线。
三、上游数据库准备清单(一次性)
-
启用逻辑复制(postgresql.conf):
wal_level = logical max_wal_senders = 10 max_replication_slots = 10 -
权限与网络:为采集用户授予 REPLICATION/SELECT(实际按你的安全基线收敛),开放 5432 到采集机。
-
解码插件:Postgres 10+ 自带
pgoutput(推荐)。若用其他插件(如 decoderbufs),需另行安装。 -
Publication(可选):很多 Debezium/Flink CDC 会自动处理,无需手工建;若你做了白名单发布,可:
CREATE PUBLICATION mypub FOR ALL TABLES; -
校验:确认
SELECT * FROM pg_replication_slots;能看到你的slot.name(首次运行由 CDC 创建亦可)。
四、核心配置与调优思路
下表挑重点做“参数到效果”的工程化解读(完整参数见文末“参数对照速查”)。
1)连接与扫描
-
hostname/port/username/password:常规连接参数。 -
tables(必填):db.schema.table列表或正则;.作为分隔符,若要作为“任意字符”,请写\.。
例:bdb.user_schema_[0-9].user_table_[0-9]+, bdb.schema_\.*.order_\.* -
scan.startup.mode:initial | latest-offset | committed-offset | snapshot- initial(默认):先全量快照,再接入增量 WAL;第一次上线首选。
- latest-offset:跳过快照,从当前 WAL 末尾开始;只关心上线后的新增/变更。
- committed-offset:从确认提交的 LSN 开始;用于“断点续跑/灾备切换”。
- snapshot:只做一次全量,做完退出(用于“冷启动全量导入”)。
-
server-time-zone:强烈建议显式设置(如"Asia/Shanghai"),避免默认时区导致时间错乱。
2)快照并发与内存
-
scan.incremental.snapshot.chunk.size(默认 8096):快照分片行数。表很大时适当减小以降低单批内存。 -
scan.snapshot.fetch.size(默认 1024):JDBC 每批抓取行数,影响单次 I/O 压力与内存。 -
scan.incremental.close-idle-reader.enabled:true 时在快照结束关闭空闲 reader。- Flink ≥ 1.15 默认已开启 checkpoints-after-tasks-finish,无需额外配置。
3)WAL 提交与心跳
scan.lsn-commit.checkpoints-num-delay(默认 3):延迟 N 个 checkpoint 再提交 LSN,有助于避免回退。heartbeat.interval(默认 30s):周期性发心跳,确保“长时间无数据”也能推进位点。
4)连接健壮性
connect.timeout(默认 30s)/connect.max-retries(默认 3)/connection.pool.size(默认 20):结合网络质量与库容量调优。jdbc.properties.*:透传 JDBC 参数(如jdbc.properties.ssl=true)。debezium.*:透传 Debezium 参数(如debezium.snapshot.mode=never)。
五、运行期可观测性:指标(Metrics)
维度:namespace.schema.table
关键 Gauge:
isSnapshotting/isStreamReading:是否处于快照/增量阶段。numTablesSnapshotted/numTablesRemaining:整体进度。numSnapshotSplitsProcessed/Remaining/Finished:分片粒度进度。snapshotStartTime/snapshotEndTime:快照窗口。
这些指标可接入 Prometheus + Grafana 做“迁移热力图”,直观看快照耗时、卡在哪些大表。
六、数据类型映射(工程师最常用对照)
简版速查,覆盖主流类型;完整细节放到附录。
-
布尔/位:
BOOLEAN/BIT(1)→BOOLEAN;BIT(>1)→BYTES -
整数族:
SMALLINT/SERIAL2→SMALLINT;INTEGER/SERIAL→INT;BIGINT/BIGSERIAL/OID→BIGINT -
浮点/定点:
REAL/FLOAT4→FLOAT;DOUBLE PRECISION/FLOAT8→DOUBLE;NUMERIC/DECIMAL→ 见“十进制映射” -
字符串:
CHAR/VARCHAR/BPCHAR/CHARACTER VARYING→STRING -
时间:
TIMESTAMPTZ→ZonedTimestampType(保留时区)- 其他时间类型受
debezium.time.precision.mode影响(见下一节)
-
二进制:
BYTEA→BYTES(或按debezium.binary.handling.mode输出为 base64/base64-url-safe/hex 字符串) -
JSON/XML/UUID/地理/范围/ENUM → 通常映射为
STRING -
数组:仅支持一维数组(多维数组当前不支持)
时间精度策略(务必选对)
debezium.time.precision.mode=adaptive(含..._microseconds变体):
TIME固定TIME(3);TIMESTAMP精度至多 6(微秒)。...=connect:TIME/TIMESTAMP统一(...,3)。
CDC 限制:
TIME精度固定为 3,无论你怎么配置,都会转成TIME(3)。
十进制/货币类型
debezium.decimal.handling.mode:
precise(默认):保留精度 →DECIMAL[(M,D)];MONEY→DECIMAL(38, digits)double:全部转DOUBLE(有精度丢失风险)string:全部转字符串(包含NaN等特殊值可安全表达)
HSTORE
debezium.hstore.handling.mode=json(默认):转为 JSON 字符串...=map:转为MAP类型
网络地址类型
INET/CIDR/MACADDR/MACADDR8→STRING(更通用,便于下游存储)
PostGIS(空间类型)
-
典型映射为 JSON 字符串,示例:
GEOMETRY(POINT, 3187)→{"coordinates":"[[174.9479, -36.7208]]","type":"Point","srid":3187}GEOGRAPHY(MULTILINESTRING)→{"coordinates":"[[169.1321, -44.7032],[167.8974, -44.6414]]","type":"MultiLineString","srid":4326}
七、生产化建议与常见坑
1)Schema 变更与 DDL
- CDC 不同步 DDL:请将 DDL 纳入变更流程,先改上游再改下游,保持结构一致。
- 给下游(如 Fluss → Doris/ES)配一层 Schema 管控(版本号/兼容字段策略)。
2)位点与复制槽
- 复制槽泄漏:下线或变更
slot.name之前,确认消费端已停并手工清理旧槽(否则 WAL 膨胀)。 - LSN 提交延迟:
scan.lsn-commit.checkpoints-num-delay默认 3,若恢复时间过长可适度降低。
3)时间与时区
- 固定
server-time-zone,避免运维切换 / 容器默认时区导致时间型错位。 - 对
TIMESTAMPTZ全链路保持原样,不要盲目做本地化。
4)性能与内存
- 大表快照小 chunk + 小 fetch 保守起步,根据 CPU/IO 再放大。
- 并发靠
pipeline.parallelism+ 源端分片数量决定;过高会抢占库资源。
5)故障排查速记
slot already exists:换slot.name或清理旧槽。publication not found:检查是否手工建了 publication 且表在不在发布列表。permission denied:核对REPLICATION/SELECT与库/模式 ACL。unknown debezium.*:拼写错误或不被当前版本识别。
八、把数据写进 Fluss 的落地要点
- 生产上务必开启SASL 或其他鉴权,防止“数据总线裸奔”。
- 分区策略:可使用
schema.table+ 主键做 Key,下游按 Key 有序消费更容易做到幂等。 - 端到端一致性:Sink 侧实现幂等写(按主键 Upsert、按事件时间去重等)。
九、校验与回归:上线 checklist
- 小表试跑:5~10 张小表先走一遍 Snapshot → Stream,观察 Metrics。
- 全量对账:
COUNT(*) / SUM(主键哈希)粗对,抽样做字段级对账。 - 断点续跑:模拟任务重启,确认 LSN 能平滑推进且不丢不重。
- 空闲心跳:停业务写入,确认
heartbeat.interval能推进位点。 - WAL 膨胀监控:接入
pg_stat_replication/pg_replication_slots,设置告警。
十、参数对照速查(节选高频)
| 选项 | 必填 | 默认 | 类型 | 说明 |
|---|---|---|---|---|
hostname | ✅ | — | String | Postgres 主机 |
port | 5432 | Integer | 端口 | |
username / password | ✅ | — | String | 连接凭据 |
tables | ✅ | — | String | 捕获表(支持正则,. 是分隔符,要匹配任意字符用 \.) |
slot.name | ✅ | — | String | 逻辑复制槽名 |
decoding.plugin.name | — | String | 解码插件:pgoutput / decoderbufs | |
server-time-zone | — | String | 会话时区,如 "Asia/Shanghai" | |
scan.startup.mode | initial | String | initial/latest-offset/committed-offset/snapshot | |
scan.incremental.snapshot.chunk.size | 8096 | Integer | 快照分片行数 | |
scan.snapshot.fetch.size | 1024 | Integer | 每批抓取行数 | |
scan.incremental.close-idle-reader.enabled | false | Boolean | 快照结束关闭 idle readers(Flink ≥1.15 默认可闭环) | |
scan.lsn-commit.checkpoints-num-delay | 3 | Integer | LSN 提交延迟的 checkpoint 数 | |
connect.timeout | 30s | Duration | 连接超时 | |
connect.max-retries | 3 | Integer | 连接重试 | |
connection.pool.size | 20 | Integer | 连接池 | |
jdbc.properties.* | — | String | 透传 JDBC 参数 | |
heartbeat.interval | 30s | Duration | 心跳周期 | |
debezium.* | — | String | 透传 Debezium 参数(例如 debezium.snapshot.mode=never) | |
metadata.list | false | String | 可读元数据,如 op_ts |
十一、附录:时间与十进制映射一图懂
-
时间
TIMESTAMPTZ⇒ZonedTimestampTypedebezium.time.precision.mode=adaptive(_time_microseconds):TIME(3),TIMESTAMP最多 6 位小数...=connect:TIME/TIMESTAMP统一(...,3)
-
十进制
precise:DECIMAL(M,D)(默认,金融/金额推荐)double:DOUBLE(有精度丢失)string:字符串(包含NaN也能表示)
