当前位置: 首页 > news >正文

Flink Oracle CDC Connector 实战指南

1. Oracle CDC Connector 能做什么?

简单一句话:

它把 Oracle 表变成一张实时更新的流表——
首次启动读一遍历史快照,之后持续从 redo / 归档日志中消费变更。

从 Flink 端看,它就是一个 Source:

  • 先读快照:把当前表全量数据读一遍;
  • 再读变更:持续订阅 redo/archivelog 产生的 INSERT / UPDATE / DELETE;
  • 对外是一个 动态表(Dynamic Table),可被 Flink SQL / Table API / DataStream 消费。

底层使用 Debezium Oracle Connector + Oracle LogMiner,Flink CDC 帮你处理好了:

  • 状态与 Checkpoint;
  • Exactly-Once 语义;
  • 增量快照机制;
  • 类型映射、元数据列暴露;
  • Flink SQL DDL 这一层封装。

2. 依赖与基础环境

2.1 Maven 依赖

在工程里引入 Oracle CDC Connector:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-oracle-cdc</artifactId><version>3.5.0</version>
</dependency>

注意:
因为 Oracle 驱动采用 FUTC 协议,Flink CDC 的预构建包不会自带驱动,需要手动引入

  • 用于连接 Oracle:

    <dependency><groupId>com.oracle.ojdbc</groupId><artifactId>ojdbc8</artifactId><version>19.3.0.0</version>
    </dependency>
    
  • 用于 XML 类型存储:

    <dependency><groupId>com.oracle.database.xml</groupId><artifactId>xdb</artifactId><version>19.3.0.0</version>
    </dependency>
    

2.2 Flink SQL Client

如果用 Flink SQL CLI 或 Standalone 集群:

  1. 下载 flink-sql-connector-oracle-cdc-3.5.0.jar
  2. 放到 <FLINK_HOME>/lib/
  3. 同样把 ojdbc8.jarxdb.jar 放到 lib/
  4. 重启集群后,就能在 SQL 中直接写:'connector' = 'oracle-cdc'

3. Oracle 侧准备:归档日志 + Supplemental Logging + 用户权限

要让 CDC 正常工作,Oracle 端是重头戏,主要做三件事:

  1. 打开归档日志(Archivelog);
  2. 对目标表 / 库开启 Supplemental Logging;
  3. 创建一个拥有 LogMiner 权限的 CDC 用户。

3.1 非 CDB 数据库

3.1.1 启用归档日志

以 SYSDBA 登录:

ORACLE_SID=SID
export ORACLE_SID
sqlplus /nologCONNECT sys/password AS SYSDBAalter system set db_recovery_file_dest_size = 10G;alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;shutdown immediate;startup mount;alter database archivelog;alter database open;-- 应显示 Archive Modearchive log list;

要点:

  • 切换到归档模式需要重启数据库,务必选好时间窗口;
  • 归档日志会占大量磁盘,必须有定期清理策略。
3.1.2 开启 Supplemental Logging

CDC 要拿到“变更前的值”(before image),需要补充日志:

-- 针对单表开启
ALTER TABLE inventory.customersADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;-- 针对整个数据库开启
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
3.1.3 创建 CDC 用户并授权

1)创建 LogMiner 表空间:

sqlplus sys/password@host:port/SID AS SYSDBA;CREATE TABLESPACE logminer_tbsDATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf'SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;

2)创建用户并授予权限:

sqlplus sys/password@host:port/SID AS SYSDBA;CREATE USER flinkuser IDENTIFIED BY flinkpwDEFAULT TABLESPACE LOGMINER_TBSQUOTA UNLIMITED ON LOGMINER_TBS;GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE TO flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT ANALYZE ANY TO flinkuser;GRANT CREATE TABLE TO flinkuser;
-- 使用旧快照机制(非增量快照)才需要
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;GRANT EXECUTE ON DBMS_LOGMNR   TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;GRANT SELECT ON V_$LOG                 TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY         TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS         TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS     TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS   TO flinkuser;
GRANT SELECT ON V_$LOGFILE             TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG        TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
exit;

3.2 CDB + PDB 模式

CDB 架构下步骤类似,只是多了 PDB 层:

  • 在 CDB 上启用归档日志;
  • 在 CDB / PDB 下分别创建 logminer 表空间;
  • 使用带 CONTAINER=ALL 的授权:
CREATE USER flinkuser IDENTIFIED BY flinkpwDEFAULT TABLESPACE logminer_tbsQUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;
...
GRANT EXECUTE ON DBMS_LOGMNR   TO flinkuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;
...

Flink DDL 里需要额外指定要连接的 PDB 名:

'debezium.database.pdb.name' = 'ORCLPDB1'

4. 在 Flink SQL 中创建 Oracle CDC 表

最小可运行示例:

CREATE TABLE products (ID          INT NOT NULL,NAME        STRING,DESCRIPTION STRING,WEIGHT      DECIMAL(10, 3),PRIMARY KEY (ID) NOT ENFORCED
) WITH ('connector'     = 'oracle-cdc','hostname'      = 'localhost','port'          = '1521','username'      = 'flinkuser','password'      = 'flinkpw','database-name' = 'ORCLCDB','schema-name'   = 'INVENTORY','table-name'    = 'PRODUCTS'
);-- 读快照 + redo 日志
SELECT * FROM products;

4.1 一个容易踩的坑:大小写

  • Oracle 未加引号的对象名,会被转成大写(productsPRODUCTS);
  • Flink SQL 不会自动改大小写;

所以在 DDL 中:

  • schema-nametable-name 建议写成 Oracle 实际的大写形式

  • 如果 Oracle 表是 INVENTORY.PRODUCTS,就用:

    'schema-name' = 'INVENTORY',
    'table-name'  = 'PRODUCTS'
    

5. 关键 Connector 参数拆解

5.1 基础连接参数

  • connector:固定为 oracle-cdc
  • hostname / port:Oracle 连接地址,端口默认 1521
  • username / password:CDC 用户
  • database-name:数据库 / CDB 名(如 ORCLCDB
  • schema-name:Schema 名(如 INVENTORY
  • table-name:表名(如 PRODUCTS

5.2 URL 参数

url 可选,默认 SID 形式:

jdbc:oracle:thin:@{hostname}:{port}:{database-name}

如果你的环境是 ServiceName,或者连接串比较特殊,可以直接自己写 url,并忽略 hostname/port/database-name

5.3 启动模式:scan.startup.mode

  • initial(默认)
    首次:做表快照 → 接上最新 redo 日志;
  • latest-offset
    不做快照,直接从当前时刻之后的变更开始。

注意:底层依赖 Debezium 的 snapshot.mode不要同时配置 scan.startup.modedebezium.snapshot.mode,否则可能导致 Flink 的配置无效。

5.4 增量快照相关

  • scan.incremental.snapshot.enabled:是否启用增量快照,默认 true

  • scan.incremental.snapshot.chunk.size:快照 chunk 行数,默认 8096;

  • scan.snapshot.fetch.size:单次抓取行数,默认 1024;

  • scan.incremental.snapshot.chunk.key-column:用于切块的字段,默认是 ROWID
    也可以配置非主键列,但会有性能 & 一致性方面的风险(后面“无主键表”再聊)。

  • scan.incremental.snapshot.unbounded-chunk-first.enabled:默认 true,优先处理大块,降低 OOM 风险;

  • scan.incremental.snapshot.backfill.skip:默认 false

    • 如果设为 true,快照期间产生的变更不合并进快照,而是在后续变更流中重放,只能保证 至少一次(at-least-once),需要下游做特殊处理。

5.5 Debezium 参数透传

通过 debezium.* 把配置传到 Debezium,如:

'debezium.log.mining.strategy'       = 'online_catalog',
'debezium.log.mining.continuous.mine'= 'true'

适合用来调整 LogMiner 策略等。但依旧那句话:不要和 scan.startup.mode 混用

6. 特性与限制:Exactly Once 与快照阶段 checkpoint

6.1 Exactly Once 语义

Oracle CDC 同样支持 Exactly Once:

  • 首先读取快照;
  • 然后持续读取 redo 日志;
  • Flink 用状态 + Checkpoint 记录消费位点,保证故障恢复时数据不丢、不多(在语义定义范围内)。

6.2 快照阶段 checkpoint 的限制

Oracle CDC 有一个需要特别注意的限制:

在扫描快照时,由于快照没有精确可恢复位点,无法真正完成可恢复的 checkpoint

实际行为是:

  • 源算子会让 checkpoint 一直等到超时;
  • 超时的 checkpoint 被标记为失败;
  • 默认 Flink 会因为 checkpoint 失败触发 failover,导致作业频繁重启。

官方建议在大表快照场景下调高容忍度:

execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647

意思是:

  • checkpoint 间隔拉长;
  • 允许很多次失败的 checkpoint;
  • 使用固定延迟重启策略,重启次数设得很大。

7. DataStream 模式:单线程 vs 增量快照并行

除了 Flink SQL,Oracle CDC 还提供了两种 DataStream 源:

  1. 基于增量快照的并行 Source(JdbcIncrementalSource
  2. 传统 SourceFunction 单线程模式(OracleSource

7.1 增量快照并行 Source(实验性质)

适合需要并行读取快照的场景,例如大表首发加载:

Properties debeziumProperties = new Properties();
debeziumProperties.setProperty("log.mining.strategy", "online_catalog");JdbcIncrementalSource<String> oracleChangeEventSource =new OracleSourceBuilder().hostname("host").port(1521).databaseList("ORCLCDB").schemaList("DEBEZIUM").tableList("DEBEZIUM.PRODUCTS").username("username").password("password").deserializer(new JsonDebeziumDeserializationSchema()).includeSchemaChanges(true).startupOptions(StartupOptions.initial()).debeziumProperties(debeziumProperties).splitSize(2).build();StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(3000L);env.fromSource(oracleChangeEventSource,WatermarkStrategy.noWatermarks(),"OracleParallelSource").setParallelism(4).print().setParallelism(1);env.execute("Print Oracle Snapshot + RedoLog");

官方标记为 Experimental,生产使用前要充分压测。

7.2 SourceFunction 单线程模式

更简单的写法,只支持单并行:

SourceFunction<String> sourceFunction = OracleSource.<String>builder().url("jdbc:oracle:thin:@{hostname}:{port}:{database}").port(1521).database("ORCLCDB").schemaList("INVENTORY").tableList("INVENTORY.PRODUCTS").username("flinkuser").password("flinkpw").deserializer(new JsonDebeziumDeserializationSchema()).build();StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(sourceFunction).print().setParallelism(1);env.execute();

对于变更量不大的系统来说,这种模式反而简单稳定。

8. 无主键表 & chunk key 的一致性风险

从 3.4.0 起,Oracle CDC 支持无主键表,但必须配置:

'scan.incremental.snapshot.chunk.key-column' = 'some_column'

使用时需要注意两个点:

  1. 尽量选:

    • 非空字段;
    • 有索引的列;
    • 尽量不会频繁更新的列。
  2. 语义:

    • 如果该列不会被更新 → 可保持 Exactly Once;
    • 如果该列会被更新 → 只能保证 At-Least-Once,下游需要幂等处理。

文档中的典型反例:

  • 主键:id

  • chunk key:pid(非主键)

  • split 切分:

    • Split 0:1 < pid <= 3
    • Split 1:3 < pid <= 5
  • 两个分片并行读时,有一条记录 id=0pid2 → 4

  • 结果:

    • Split 0 看到的是 [id=0, pid=2]
    • Split 1 看到的是 [id=0, pid=4]
  • 最终哪个版本生效取决于处理顺序 → 存在数据不一致风险。

实践结论:

有主键就用主键;
无主键表用 chunk key 要非常慎重,能做到“业务上几乎不更新”最好。

9. 元数据列 & 指标:让 CDC 可观测

9.1 元数据列(VIRTUAL)

可以在 Flink 表里暴露一些只读虚拟列,帮助下游做审计和路由:

  • database_name:数据库名
  • schema_name:schema 名
  • table_name:表名
  • op_ts:变更发生时间(如果来自快照则为 0)

示例:

CREATE TABLE products (db_name       STRING           METADATA FROM 'database_name' VIRTUAL,schema_name   STRING           METADATA FROM 'schema_name'  VIRTUAL,table_name    STRING           METADATA FROM 'table_name'   VIRTUAL,operation_ts  TIMESTAMP_LTZ(3) METADATA FROM 'op_ts'        VIRTUAL,ID            INT NOT NULL,NAME          STRING,DESCRIPTION   STRING,WEIGHT        DECIMAL(10, 3),PRIMARY KEY (ID) NOT ENFORCED
) WITH ('connector'   = 'oracle-cdc',...
);

9.2 Source Metrics

Oracle CDC 在 database.schema.table 维度暴露了一批指标:

  • isSnapshotting:是否正在做快照
  • isStreamReading:是否在流式读取 redo 日志
  • numTablesSnapshotted / numTablesRemaining:快照进度
  • numSnapshotSplitsProcessed / Remaining / Finished:分片进度
  • snapshotStartTime / snapshotEndTime:快照时间

非常适合接入 Prometheus + Grafana 做一个 CDC 看板,用来监控:

  • 首发同步的进度;
  • Binlog / redo 延迟;
  • 各表运行状态。

10. 总结与实践建议

Flink Oracle CDC Connector 提供了一条不侵入业务代码的路,把 Oracle 核心库的数据实时“搬运”到下游系统——Kafka、Doris、StarRocks、Iceberg、ES、明细库、缓存库等。

落地时,可以按照这个节奏来:

  1. 从一两张关键表试点(例如订单、账务),熟悉归档日志和权限配置;
  2. 在测试/预发环境用小表做增量快照验证,调整 chunk size 与 checkpoint 策略;
  3. 将 CDC 作为 Streaming ELT 流程的第一个 Source,后面接 Flink SQL 做清洗、聚合,再写入数仓或分析库;
  4. 利用元数据列和 Metrics 做好监控与审计,让 CDC 变成“可观测”的基础设施。
http://www.dtcms.com/a/618565.html

相关文章:

  • 深入浅出Rust编程:Vec 源码解析
  • 山西长治做网站公司有哪些设计软件有哪些手机版
  • java拼图小游戏
  • 【Linux驱动开发】Linux I2C 通信详解:从硬件到驱动再到应用
  • 《Unity优化指南:直击引擎本质的非典型技术路径》
  • 如何修改网站标题济南网站建设凡科
  • 【MySQL | 基础】多表查询
  • 网站建设评比自评情况网站没有收录从哪开始做优化
  • SATA协议深度剖析:从接口到指令集
  • 《嵌入式操作系统》_移植三星原版uboot20251114
  • 云南省和城乡建设厅网站环保空调东莞网站建设
  • 做鞋子网站的域名高端建站设计
  • Android内核进阶之周期更新PCM状态snd_pcm_period_elapsed:用法实例(九十二)
  • 做外汇关注的网站大学生创业做网站的筹资方式
  • HttpServletResponse 与 ResponseEntity 详解
  • 网络安全 | 深入了解OAuth 2.0原理
  • 人人商城网站开发wordpress图片放大镜
  • 2016年软件评测师,web测试案例分析解答
  • Vue 项目实战《尚医通》,获取挂号医生的信息展示,笔记43
  • wordpress创建多站点互联网公司排名伊对排第几
  • 什么网站容易做流量中小微企业名录查询系统
  • 杂志社网站模板wordpress网址导航模板
  • RAG 场景中常用的向量索引
  • 【论文阅读】Harnessing the Power of LLMs in Practice: A Survey on ChatGPT and Beyond
  • LC144 二叉树的前序遍历
  • Map的遍历方式
  • 有没有找人做标书的网站建网站用自己的主机做服务器
  • Day18:二叉树part8(669.修剪二叉搜索树、108.将有序数组转换为二叉搜索树、538.把二叉搜索树转换为累加树)
  • 常见的简单的营销软件宁波seo哪家最便宜
  • 电力系统设备故障因果推理与深度学习驱动的根因分析优化