Flink Oracle CDC Connector 实战指南
1. Oracle CDC Connector 能做什么?
简单一句话:
它把 Oracle 表变成一张实时更新的流表——
首次启动读一遍历史快照,之后持续从 redo / 归档日志中消费变更。
从 Flink 端看,它就是一个 Source:
- 先读快照:把当前表全量数据读一遍;
- 再读变更:持续订阅 redo/archivelog 产生的 INSERT / UPDATE / DELETE;
- 对外是一个 动态表(Dynamic Table),可被 Flink SQL / Table API / DataStream 消费。
底层使用 Debezium Oracle Connector + Oracle LogMiner,Flink CDC 帮你处理好了:
- 状态与 Checkpoint;
- Exactly-Once 语义;
- 增量快照机制;
- 类型映射、元数据列暴露;
- Flink SQL DDL 这一层封装。
2. 依赖与基础环境
2.1 Maven 依赖
在工程里引入 Oracle CDC Connector:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-oracle-cdc</artifactId><version>3.5.0</version>
</dependency>
注意:
因为 Oracle 驱动采用 FUTC 协议,Flink CDC 的预构建包不会自带驱动,需要手动引入:
-
用于连接 Oracle:
<dependency><groupId>com.oracle.ojdbc</groupId><artifactId>ojdbc8</artifactId><version>19.3.0.0</version> </dependency> -
用于 XML 类型存储:
<dependency><groupId>com.oracle.database.xml</groupId><artifactId>xdb</artifactId><version>19.3.0.0</version> </dependency>
2.2 Flink SQL Client
如果用 Flink SQL CLI 或 Standalone 集群:
- 下载
flink-sql-connector-oracle-cdc-3.5.0.jar; - 放到
<FLINK_HOME>/lib/; - 同样把
ojdbc8.jar、xdb.jar放到lib/; - 重启集群后,就能在 SQL 中直接写:
'connector' = 'oracle-cdc'。
3. Oracle 侧准备:归档日志 + Supplemental Logging + 用户权限
要让 CDC 正常工作,Oracle 端是重头戏,主要做三件事:
- 打开归档日志(Archivelog);
- 对目标表 / 库开启 Supplemental Logging;
- 创建一个拥有 LogMiner 权限的 CDC 用户。
3.1 非 CDB 数据库
3.1.1 启用归档日志
以 SYSDBA 登录:
ORACLE_SID=SID
export ORACLE_SID
sqlplus /nologCONNECT sys/password AS SYSDBAalter system set db_recovery_file_dest_size = 10G;alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;shutdown immediate;startup mount;alter database archivelog;alter database open;-- 应显示 Archive Modearchive log list;
要点:
- 切换到归档模式需要重启数据库,务必选好时间窗口;
- 归档日志会占大量磁盘,必须有定期清理策略。
3.1.2 开启 Supplemental Logging
CDC 要拿到“变更前的值”(before image),需要补充日志:
-- 针对单表开启
ALTER TABLE inventory.customersADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;-- 针对整个数据库开启
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
3.1.3 创建 CDC 用户并授权
1)创建 LogMiner 表空间:
sqlplus sys/password@host:port/SID AS SYSDBA;CREATE TABLESPACE logminer_tbsDATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf'SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;
2)创建用户并授予权限:
sqlplus sys/password@host:port/SID AS SYSDBA;CREATE USER flinkuser IDENTIFIED BY flinkpwDEFAULT TABLESPACE LOGMINER_TBSQUOTA UNLIMITED ON LOGMINER_TBS;GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE TO flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT ANALYZE ANY TO flinkuser;GRANT CREATE TABLE TO flinkuser;
-- 使用旧快照机制(非增量快照)才需要
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
exit;
3.2 CDB + PDB 模式
CDB 架构下步骤类似,只是多了 PDB 层:
- 在 CDB 上启用归档日志;
- 在 CDB / PDB 下分别创建 logminer 表空间;
- 使用带
CONTAINER=ALL的授权:
CREATE USER flinkuser IDENTIFIED BY flinkpwDEFAULT TABLESPACE logminer_tbsQUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;
...
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;
...
Flink DDL 里需要额外指定要连接的 PDB 名:
'debezium.database.pdb.name' = 'ORCLPDB1'
4. 在 Flink SQL 中创建 Oracle CDC 表
最小可运行示例:
CREATE TABLE products (ID INT NOT NULL,NAME STRING,DESCRIPTION STRING,WEIGHT DECIMAL(10, 3),PRIMARY KEY (ID) NOT ENFORCED
) WITH ('connector' = 'oracle-cdc','hostname' = 'localhost','port' = '1521','username' = 'flinkuser','password' = 'flinkpw','database-name' = 'ORCLCDB','schema-name' = 'INVENTORY','table-name' = 'PRODUCTS'
);-- 读快照 + redo 日志
SELECT * FROM products;
4.1 一个容易踩的坑:大小写
- Oracle 未加引号的对象名,会被转成大写(
products→PRODUCTS); - Flink SQL 不会自动改大小写;
所以在 DDL 中:
-
schema-name、table-name建议写成 Oracle 实际的大写形式; -
如果 Oracle 表是
INVENTORY.PRODUCTS,就用:'schema-name' = 'INVENTORY', 'table-name' = 'PRODUCTS'
5. 关键 Connector 参数拆解
5.1 基础连接参数
connector:固定为oracle-cdchostname/port:Oracle 连接地址,端口默认 1521username/password:CDC 用户database-name:数据库 / CDB 名(如ORCLCDB)schema-name:Schema 名(如INVENTORY)table-name:表名(如PRODUCTS)
5.2 URL 参数
url 可选,默认 SID 形式:
jdbc:oracle:thin:@{hostname}:{port}:{database-name}
如果你的环境是 ServiceName,或者连接串比较特殊,可以直接自己写 url,并忽略 hostname/port/database-name。
5.3 启动模式:scan.startup.mode
initial(默认)
首次:做表快照 → 接上最新 redo 日志;latest-offset
不做快照,直接从当前时刻之后的变更开始。
注意:底层依赖 Debezium 的
snapshot.mode,不要同时配置scan.startup.mode和debezium.snapshot.mode,否则可能导致 Flink 的配置无效。
5.4 增量快照相关
-
scan.incremental.snapshot.enabled:是否启用增量快照,默认true; -
scan.incremental.snapshot.chunk.size:快照 chunk 行数,默认 8096; -
scan.snapshot.fetch.size:单次抓取行数,默认 1024; -
scan.incremental.snapshot.chunk.key-column:用于切块的字段,默认是ROWID;
也可以配置非主键列,但会有性能 & 一致性方面的风险(后面“无主键表”再聊)。 -
scan.incremental.snapshot.unbounded-chunk-first.enabled:默认true,优先处理大块,降低 OOM 风险; -
scan.incremental.snapshot.backfill.skip:默认false- 如果设为
true,快照期间产生的变更不合并进快照,而是在后续变更流中重放,只能保证 至少一次(at-least-once),需要下游做特殊处理。
- 如果设为
5.5 Debezium 参数透传
通过 debezium.* 把配置传到 Debezium,如:
'debezium.log.mining.strategy' = 'online_catalog',
'debezium.log.mining.continuous.mine'= 'true'
适合用来调整 LogMiner 策略等。但依旧那句话:不要和 scan.startup.mode 混用。
6. 特性与限制:Exactly Once 与快照阶段 checkpoint
6.1 Exactly Once 语义
Oracle CDC 同样支持 Exactly Once:
- 首先读取快照;
- 然后持续读取 redo 日志;
- Flink 用状态 + Checkpoint 记录消费位点,保证故障恢复时数据不丢、不多(在语义定义范围内)。
6.2 快照阶段 checkpoint 的限制
Oracle CDC 有一个需要特别注意的限制:
在扫描快照时,由于快照没有精确可恢复位点,无法真正完成可恢复的 checkpoint。
实际行为是:
- 源算子会让 checkpoint 一直等到超时;
- 超时的 checkpoint 被标记为失败;
- 默认 Flink 会因为 checkpoint 失败触发 failover,导致作业频繁重启。
官方建议在大表快照场景下调高容忍度:
execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
意思是:
- checkpoint 间隔拉长;
- 允许很多次失败的 checkpoint;
- 使用固定延迟重启策略,重启次数设得很大。
7. DataStream 模式:单线程 vs 增量快照并行
除了 Flink SQL,Oracle CDC 还提供了两种 DataStream 源:
- 基于增量快照的并行 Source(
JdbcIncrementalSource) - 传统 SourceFunction 单线程模式(
OracleSource)
7.1 增量快照并行 Source(实验性质)
适合需要并行读取快照的场景,例如大表首发加载:
Properties debeziumProperties = new Properties();
debeziumProperties.setProperty("log.mining.strategy", "online_catalog");JdbcIncrementalSource<String> oracleChangeEventSource =new OracleSourceBuilder().hostname("host").port(1521).databaseList("ORCLCDB").schemaList("DEBEZIUM").tableList("DEBEZIUM.PRODUCTS").username("username").password("password").deserializer(new JsonDebeziumDeserializationSchema()).includeSchemaChanges(true).startupOptions(StartupOptions.initial()).debeziumProperties(debeziumProperties).splitSize(2).build();StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(3000L);env.fromSource(oracleChangeEventSource,WatermarkStrategy.noWatermarks(),"OracleParallelSource").setParallelism(4).print().setParallelism(1);env.execute("Print Oracle Snapshot + RedoLog");
官方标记为 Experimental,生产使用前要充分压测。
7.2 SourceFunction 单线程模式
更简单的写法,只支持单并行:
SourceFunction<String> sourceFunction = OracleSource.<String>builder().url("jdbc:oracle:thin:@{hostname}:{port}:{database}").port(1521).database("ORCLCDB").schemaList("INVENTORY").tableList("INVENTORY.PRODUCTS").username("flinkuser").password("flinkpw").deserializer(new JsonDebeziumDeserializationSchema()).build();StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(sourceFunction).print().setParallelism(1);env.execute();
对于变更量不大的系统来说,这种模式反而简单稳定。
8. 无主键表 & chunk key 的一致性风险
从 3.4.0 起,Oracle CDC 支持无主键表,但必须配置:
'scan.incremental.snapshot.chunk.key-column' = 'some_column'
使用时需要注意两个点:
-
尽量选:
- 非空字段;
- 有索引的列;
- 尽量不会频繁更新的列。
-
语义:
- 如果该列不会被更新 → 可保持 Exactly Once;
- 如果该列会被更新 → 只能保证 At-Least-Once,下游需要幂等处理。
文档中的典型反例:
-
主键:
id -
chunk key:
pid(非主键) -
split 切分:
- Split 0:
1 < pid <= 3 - Split 1:
3 < pid <= 5
- Split 0:
-
两个分片并行读时,有一条记录
id=0的pid从2 → 4 -
结果:
- Split 0 看到的是
[id=0, pid=2] - Split 1 看到的是
[id=0, pid=4]
- Split 0 看到的是
-
最终哪个版本生效取决于处理顺序 → 存在数据不一致风险。
实践结论:
有主键就用主键;
无主键表用 chunk key 要非常慎重,能做到“业务上几乎不更新”最好。
9. 元数据列 & 指标:让 CDC 可观测
9.1 元数据列(VIRTUAL)
可以在 Flink 表里暴露一些只读虚拟列,帮助下游做审计和路由:
database_name:数据库名schema_name:schema 名table_name:表名op_ts:变更发生时间(如果来自快照则为 0)
示例:
CREATE TABLE products (db_name STRING METADATA FROM 'database_name' VIRTUAL,schema_name STRING METADATA FROM 'schema_name' VIRTUAL,table_name STRING METADATA FROM 'table_name' VIRTUAL,operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,ID INT NOT NULL,NAME STRING,DESCRIPTION STRING,WEIGHT DECIMAL(10, 3),PRIMARY KEY (ID) NOT ENFORCED
) WITH ('connector' = 'oracle-cdc',...
);
9.2 Source Metrics
Oracle CDC 在 database.schema.table 维度暴露了一批指标:
isSnapshotting:是否正在做快照isStreamReading:是否在流式读取 redo 日志numTablesSnapshotted/numTablesRemaining:快照进度numSnapshotSplitsProcessed/Remaining/Finished:分片进度snapshotStartTime/snapshotEndTime:快照时间
非常适合接入 Prometheus + Grafana 做一个 CDC 看板,用来监控:
- 首发同步的进度;
- Binlog / redo 延迟;
- 各表运行状态。
10. 总结与实践建议
Flink Oracle CDC Connector 提供了一条不侵入业务代码的路,把 Oracle 核心库的数据实时“搬运”到下游系统——Kafka、Doris、StarRocks、Iceberg、ES、明细库、缓存库等。
落地时,可以按照这个节奏来:
- 从一两张关键表试点(例如订单、账务),熟悉归档日志和权限配置;
- 在测试/预发环境用小表做增量快照验证,调整 chunk size 与 checkpoint 策略;
- 将 CDC 作为 Streaming ELT 流程的第一个 Source,后面接 Flink SQL 做清洗、聚合,再写入数仓或分析库;
- 利用元数据列和 Metrics 做好监控与审计,让 CDC 变成“可观测”的基础设施。
