当前位置: 首页 > news >正文

Streaming ELT with Flink CDC · OceanBase Sink

1. OceanBase Connector 能做什么?

官方能力可以总结为“三件套”:

  1. 自动建表(Create table automatically if not exist)

    • 当 OceanBase 中不存在目标表时,可根据源表结构自动创建;
    • 字段名与字段类型会依据映射规则自动生成。
  2. Schema 变更同步(Schema change synchronization)

    当前支持的变更类型:

    • 新增列(add column),新列总是追加到字段列表最后;
    • 重命名列(rename column)。
  3. 数据同步(Data synchronization)

    • 订阅上游 MySQL 的增量变更(Flink CDC Source);
    • 使用 OceanBase Sink Connector 将变更写入 OceanBase;
    • 整体保证 at-least-once 语义(不支持 exactly-once)。

用一句话概括:

你只要配好 Source + Sink + Pipeline,OceanBase Connector 就能帮你把「自动建表 + 跟踪结构变化 + 持续写入」全部接好。

2. MySQL → OceanBase Pipeline 示例

先看一个完整的官方示例,感受一下结构:

source:type: mysqlhostname: mysqlport: 3306username: mysqluserpassword: mysqlpwtables: mysql_2_oceanbase_test_17l13vc.\.*server-id: 5400-5404server-time-zone: UTCsink:type: oceanbaseurl: jdbc:mysql://oceanbase:2881/testusername: root@testpassword: passwordpipeline:name: MySQL to OceanBase Pipelineparallelism: 1

2.1 Source:MySQL 变更捕获

source:type: mysqlhostname: mysqlport: 3306username: mysqluserpassword: mysqlpwtables: mysql_2_oceanbase_test_17l13vc.\.*server-id: 5400-5404server-time-zone: UTC

这里就是标准的 Flink CDC MySQL Source:

  • tables:支持正则匹配某个库下的所有表;
  • server-id:指定一段 server-id 范围,方便并行任务使用;
  • server-time-zone:指定 MySQL 服务器时区,保证时间字段解析正确。

你可以直接替换成自己项目的库名、账号即可。

2.2 Sink:OceanBase Connector 配置

sink:type: oceanbaseurl: jdbc:mysql://oceanbase:2881/testusername: root@testpassword: password

关键点:

  • type: oceanbase:使用 OceanBase 作为 Sink;
  • url:标准 MySQL JDBC URL,指向 OceanBase 的 MySQL 租户,例如:
    jdbc:mysql://ob-host:2881/database
  • username:类似 root@test@test 是租户名;
  • password:对应密码。

2.3 pipeline:全局任务设置

pipeline:name: MySQL to OceanBase Pipelineparallelism: 1
  • name:任务名,在 Flink CDC UI 和日志中会显示;
  • parallelism:并行度。
    注意:若后续开启 Direct Load 模式,Sink 并行度必须为 1,这里要一起考虑。

3. 连接器配置项:按功能分组理解

OceanBase Connector 提供了一堆可选参数,我们分块看会更清晰。

3.1 基础连接 & 驱动相关

参数是否必填默认值说明
type(none)固定为 oceanbase
url(none)JDBC URL
username(none)OceanBase MySQL 租户账号
password(none)密码
schema-name(none)schema / database 名,可选
table-name(none)表名,可选(通常自动推断)
driver-class-namecom.mysql.cj.jdbc.DriverJDBC 驱动类,默认 MySQL 驱动
druid-properties(none)Druid 连接池配置,key=value; 分隔

两点要注意:

  1. 连接器不自带驱动:运行环境中必须引入 MySQL JDBC 驱动;

  2. druid-properties:可以配置连接池参数,比如:

    druid-properties: maxActive=20;minIdle=5;maxWait=60000
    

3.2 缓冲写入 & 重试策略

参数默认值类型说明
sync-writefalseBoolean是否同步写入,true 时不使用缓冲
buffer-flush.interval1sDuration缓冲 flush 间隔,设为 0 则不做周期 flush
buffer-flush.buffer-size1000Integer缓冲区大小(条数)
max-retries3Integer写入失败时最大重试次数

典型模式:

  • 低延迟

    • sync-write: true,或者
    • buffer-flush.interval 调小(如 100ms ~ 500ms);
  • 高吞吐

    • sync-write: false(默认);
    • 增大 buffer-flush.buffer-sizebuffer-flush.interval(比如 2s);
  • max-retries 通常 3~5 即可,再高会让问题“拖太久”。

3.3 memstore 使用率检查

参数默认值类型说明
memstore-check.enabledtrueBoolean是否开启 memstore 检查
memstore-check.threshold0.9Doublememstore 使用率阈值(相对限制值的比例)
memstore-check.interval30sDuration检查间隔

OceanBase 写入会占用 memstore,如果使用率太高还在猛写,很容易出问题。
这个机制相当于是一个“安全阀”:

  • 当开启检查且 memstore 使用率超过阈值时,会放缓写入节奏;
  • 默认 0.9 比较保守,通常不用动;
  • 如果经常打到 0.9,可以一方面降并发/减缓冲,一方面从 OceanBase 侧扩资源。

3.4 按分区写入(partition.enabled)

参数默认值类型说明
partition.enabledfalseBoolean是否按分区计算并按分区 flush,仅在 sync-write=falsedirect-load.enabled=false 时生效

简单说:

  • 默认是“整体缓冲、整体 flush”;
  • 开启 partition.enabled 后,会根据某种分区逻辑(通常是表分区键)对记录做分组、按分区写;
  • 对分区表场景会友好一些,写入更有 locality。

3.5 Direct Load 模式

Direct Load 更偏向于批量装载场景,相关配置如下:

参数默认值类型说明
direct-load.enabledfalseBoolean是否开启 Direct Load
direct-load.host(none)StringDirect Load 使用的主机名
direct-load.port2882StringDirect Load 使用的 RPC 端口
direct-load.parallel8IntegerDirect Load 内部并行度
direct-load.max-error-rows0Long允许的最大错误行数
direct-load.dup-actionREPLACEString遇到重复记录时的行为(默认 REPLACE)
direct-load.timeout7dDurationDirect Load 任务超时时间
direct-load.heartbeat-timeout30sDuration客户端心跳超时时间

注意:开启 direct-load.enabled 时,Sink 并行度必须为 1
适合大批量导入 + 少量实时增量的混合场景,比如:

  • 先用 Direct Load 做全量导入;
  • 再改成普通模式做增量同步。

4. 使用注意事项(Usage Notes)

文档里的几条 Usage Notes 非常关键。

4.1 只支持 MySQL 租户

Currently only supports MySQL tenants of OceanBase

这个 Connector 目前只支持 OceanBase 的 MySQL 租户,不支持 Oracle 租户。
也就是说你的目标库必须是 MySQL 模式的租户。

4.2 一致性语义:at-least-once

Provides at-least-once semantics, exactly-once is not supported yet

  • 整条链路是 at-least-once
  • 出问题时会重试写入,可能导致重复写;
  • 可以通过主键 / 唯一键 + REPLACE 等策略,在业务侧实现“幂等”。

4.3 自动建表:不带分区键

For creating table automatically
there is no partition key

自动建表时不会自动加分区键,表是“无显式分区策略”的。
如果你想用更合理的分区(按日期、按 hash),建议手动建表 后再用 Connector 写入。

4.4 Schema 变更:只支持新增列与重命名列

For schema change synchronization

  • Currently, only adding new columns and renaming columns are supported
  • the new column will always be added to the last position
  • 支持:新增列 + 重命名列;
  • 不支持:直接改类型、删列等激进变更;
  • 新增列会自动追加到字段列表最后。

因此,在设计上游表变更时,最好遵循“向后兼容”的策略,多用新增列来扩展。

5. 数据类型映射:CDC → OceanBase(MySQL Tenant)

最后是类型映射,这会影响自动建表的字段类型。

CDC typeOceanBase type(MySQL tenant)说明
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
BINARYBINARY
VARBINARY(n) ≤ 1,048,576VARBINARY(n)
VARBINARY(n) > 1,048,576LONGBLOB大于 1MB 用 LONGBLOB
FLOATFLOAT
DOUBLEDOUBLE
DECIMAL(p, s)DECIMAL(p, s)
BOOLEANBOOLEAN
DATEDATE
TIMETIME
TIMESTAMPDATETIME
TIMESTAMP_TZTIMESTAMP带时区
TIMESTAMP_LTZTIMESTAMP本地时区
CHAR(n) ≤ 256CHAR(n)
CHAR(n) > 256VARCHAR(n)超长 CHAR 转 VARCHAR
VARCHAR(n) ≤ 262,144VARCHAR(n)
VARCHAR(n) > 262,144TEXT超长字符串转 TEXT

实战里可以记几个关键点:

  1. VARBINARY 超过 1MB 直接变 LONGBLOB
    这类字段存大文件没问题,但成本高,尽量只在确实需要时使用。

  2. 超大 VARCHAR 转 TEXT
    同样是大字段,通常不走索引,查询开销较大;
    如果只是偶尔长一点的备注类字段,可以事先限制长度。

  3. 超长 CHAR 会自动改为 VARCHAR
    很多场景不需要大 CHAR,统一用 VARCHAR 即可。

  4. 时间类型映射稍微有点差异

    • TIMESTAMPDATETIME
    • TIMESTAMP_TZ / TIMESTAMP_LTZTIMESTAMP

    在做时区敏感的计算时,注意区分。

6. 小结:什么时候用 OceanBase Connector?

简单总结一下:

  • 你已经在用 Flink CDC 做 MySQL 捕获
  • 下游希望落到 OceanBase(MySQL 租户)
  • 希望尽可能自动化:自动建表、自动跟踪新增字段、自动写入;
http://www.dtcms.com/a/618247.html

相关文章:

  • 环境变量与地址
  • C/C++爱心①
  • 7.4、Python-变量的作用域
  • 英文专业的网站建设网站设计建设流程
  • 【教程】用Python复刻经典小游戏(贪吃蛇、扫雷)
  • 在智联招聘网站做销售最新国际足球世界排名
  • 垃圾回收算法(GC Algorithm)基石:标记-清除、复制、标记-整理
  • 中保研汽车小偏置碰撞案例分析
  • 广西建设厅查询网站wordpress 批量导入评论
  • AI工具 Claude code 常用命令和标注汇总
  • 车联网GPS测试:GPS动态欺骗测试 || GPS信号干扰测试.
  • <script setup> 实战模式:大型组件怎么拆?
  • 关键词解释:迁移学习(Transfer Learning)
  • 网站建设方案书简单wordpress主页加音乐
  • 这样做网站标志设计公司有哪些
  • 【MacBook】自动隐藏和显示菜单栏
  • 在Mysql环境下对数据进行增删改查
  • C#类修饰符功能与范围详解
  • 一个网站怎么留住用户做的比较好的国外网站一级页面布局分析
  • 可信网站是否必须做南阳网站备案
  • 【Linux驱动开发】Linux 设备驱动中的阻塞与非阻塞 I/O:机制、源码与示例
  • HarmonyOS新闻卡片组件开发实战:自定义组件与List渲染深度解析
  • 解决:jenkins Exception java.lang.NoSuchFieldError: SNAKE_CASE
  • 如何实现Redis安装与使用的详细教程
  • tensorflow+yolo图片训练和图片识别系统
  • 唯品会 一家专门做特卖的网站现在前端开发用什么技术
  • 图神经网络分享系列-GraphSage(Inductive Representation Learning on Large Graphs) (一)
  • leetcode对称二叉树
  • 网站开发设计心得及体会河南建设工程造价管理协会网站
  • 深度学习实战:(2)用 TensorFlow 1.x 构建手语识别模型