当前位置: 首页 > news >正文

Flink 通过 Chunjun Oracle LogMiner 实时读取 Oracle 变更日志并写入 Doris 的方案

文章目录

  • 一、 技术背景
  • 二、 关键技术
    • 1、 Oracle LogMiner
    • 2、 Chunjun 的 LogMiner 关键流程
    • 3、修复 Chunjun Oracle LogMiner 问题

一、 技术背景

在大数据实时同步场景中,需要将 Oracle 数据库的变更数据(CDC) 采集并写入 Apache Doris,以支持 数据分析、BI 报表、实时数据仓库 等应用。

本方案基于 Flink + Chunjun,通过 Oracle LogMiner 解析 Redo Log,实现 低延迟 写入Doris。

 

二、 关键技术

1、 Oracle LogMiner

LogMiner 是 Oracle 提供的 redo log 解析工具,用于跟踪 INSERTUPDATEDELETE 操作。

使用LogMiner需要现在Oracle中开启,具体开启操作见:Oracle配置LogMiner

 

2、 Chunjun 的 LogMiner 关键流程

Chunjun(原 FlinkX)是 Flink 生态的数据同步框架,支持多种数据源连接器(如 Oracle、MySQL、PostgreSQL、Doris)。
其中 Chunjun Oracle LogMiner Source 用于解析 Oracle Redo Log 并转换为 Flink 数据流

如下整个流程架构:

在这里插入图片描述

Flink任务启动后

  1. 通过Chunjun的oracle logMiner连接器, 建立 Oracle 连接,启动 LogMiner 解析 Redo Log。
  2. 实时监听 V$LOGMNR_CONTENTS,解析变更数据并转换为 Flink 事件流。具体地会将Oracle不同的操作日志解析为如下数据类型即重放数据操作,
  3. Flink 任务处理数据,完成转换、清洗等操作。
  4. Flink Sink 组件(Chunjun Doris Sink)将数据写入 Doris
操作类型before(旧数据)after(新数据)Flink 处理逻辑
INSERT{新数据}直接插入
UPDATE{旧数据}{新数据}先删除旧数据,再插入新数据
DELETE{旧数据}删除数据

最后如下示例flink sql:


CREATE TABLE source  
(  
    ID             int,  
    NAME          string  
) WITH (  
      'connector' = 'oraclelogminer-x'  
      ,'url' = 'jdbc:oracle:thin:@//xxx:1521/ORCL'  
      ,'username' = 'system'  
      ,'password' = 'xxx'  
      ,'cat' = 'insert,delete,update'  
      ,'table' = 'TEST.TEST_USER'  
      ,'timestamp-format.standard' = 'SQL'  
      );  
  
  
CREATE TABLE sink  
(  
     k4             int,  
     k3          string  
) WITH (  
'connector' = 'doris-x',  
'schema'='demo',  
      'password' = 'xxx',  
      'table-name' = 'mytable',  
      'url' = 'jdbc:mysql://xxx:9030',  
      'username' = 'root',  
      'sink.parallelism' = '1',  
      'lookup.error-limit' = '100',  
      'lookup.cache-type' = 'LRU',  
      'lookup.parallelism' = '1',  
      'lookup.cache.ttl' = '60000',  
      'lookup.cache.max-rows' = '10000',  
      'writeMode'='UPSERT'  
      );  
  
  
insert into sink  
select ID as k4, NAME as k3  
from source;  
  

 

3、修复 Chunjun Oracle LogMiner 问题

在实际使用中,Chunjun Oracle LogMiner 会遇到以下问题:

  1. 关于全量增量读数据的问题
//LogMinerConfig,没有全量同步的外部配置,默认是增量读取数据
private boolean enableFetchAll = true;

  1. 无法获取监听的表
//LogMinerListener 中的LogMinerConfig没有set table的地方,
//即无法获取被监听的表,改成直接获取
logMinerConfig.getListenerTables(); 

  1. PavingData和Split 不能同时开启,默认都开启,将PavingData关闭

 

相关文章:

  • DAY36贪心算法Ⅴ
  • Linux常用指令(3)
  • SQL授予用户查询某个模式或者具体某个表
  • 分布式事务解决方案简介
  • AI大模型:(二)1.1 deepseek+ollama本地快速部署
  • 关于Flask框架30道面试题及解析
  • CUDAOpenCV 基于Hessian矩阵计算特征值
  • 蓝桥杯 之 数论
  • C++学习之QT中HTTP正则表达式
  • 基于 ABAP RESTful 应用程序编程模型开发 OData V4 服务
  • 面试复习-基础网络+运维知识
  • 指针与引用的深度解析 (408数据结构入门)
  • 深入解析数据结构中的表:从数组到哈希表
  • 新能源市场科技变革:用Python解码产业趋势与技术创新
  • C 语 言 --- 操 作 符 2
  • 开源新星YT-Navigator:重新定义你的视频探索之旅!
  • Embedding类与word2vec模型
  • SQL Server——表数据的插入、修改和删除
  • 信息学奥赛一本通 1610:玩具装箱 | 洛谷 P3195 [HNOI2008] 玩具装箱
  • 银联无感支付实现
  • 伊朗最高领袖顾问:伊朗愿承诺永不制造核武,换取美解除制裁
  • 上海国际电影节纪录片单元,还世界真实色彩
  • 袁思达已任中国科学院办公厅主任
  • 央行:中国政府债务扩张仍有可持续性
  • “浦东时刻”在京展出:沉浸式体验海派风情
  • 高进华“控股”后首份年报出炉,史丹利账上可动资金大幅缩水