当前位置: 首页 > news >正文

一套GoldenGate → Kafka → Flink → MySQL 的端到端增量同步方案

GoldenGate → Kafka → Flink → MySQL 的端到端增量同步方案,包含:架构、GoldenGate 配置样例、Kafka 主题规划、Flink(含字段映射)示例代码,以及删除/更新的一致性处理要点。你可以直接按步骤落地。

目录

架构一览

一、GoldenGate 配置(到 Kafka)

1. 源端数据库准备(DBA)

2. OGG 参数(简化示例)

二、Flink 侧(Kafka → 字段映射 → MySQL)

1) Maven 关键依赖(你已基本具备)

2) POJO 与解析

3) Kafka Source + 解析 + 字段映射

4) 分流:UPSERT 与 DELETE 分开写入

5) MySQL Sink(幂等等价)

6) 并行 & 多表

三、字段映射(集中配置思路)

四、删除(D)、更新(U)、插入(I)处理要点

五、运维与监控

六、如果你更想用 Flink SQL

你接下来可以做什么


架构一览

  1. Oracle (源库)
    → 2) Oracle GoldenGate Big Data(Kafka Handler 输出 JSON 到 Kafka)
    → 3) Kafka(主题按表划分)
    → 4) Flink 作业(消费 Kafka,做字段映射/清洗/幂等写入)
    → 5) MySQL(目标库,使用 UPSERT/DELETE)

说明:GoldenGate 不走 LogMiner,因此没有“名称>30字符不可捕获”的限制。

一、GoldenGate 配置(到 Kafka)

假设你使用 OGG for Big Data(Microservices 或 Classic 安装均可),通过 Kafka Handler 输出 JSON。

1. 源端数据库准备(DBA)

开启补充日志(最小主键 + 更新前镜像):

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER DATABASE FORCE LOGGING;-- 关键表打开最小列级补充日志(至少包含主键/唯一键)
ALTER TABLE SCHEMA.SHOP_PLU ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE SCHEMA.SHOP_STK ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

2. OGG 参数(简化示例)

GLOBALS(可选)

GGSCHEMA ggadmin

Extract(集成抽取,读 redo)

EXTRACT ext1
USERIDALIAS ogg_alias
TRANLOGOPTIONS INTEGRATEDPARAMS (MAX_SGA_SIZE 1024, PARALLELISM 4)
EXTTRAIL ./dirdat/aa
TABLE SCHEMA.SHOP_PLU;
TABLE SCHEMA.SHOP_STK;

Replicat(Big Data 到 Kafka Handler)

使用 Big Data 适配器将 trail 转 Kafka。你也可以用 Pump + Replicat 分离,但下面直接用一个 replicat 讲解。

REPLICAT rkaf
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 30 SECONDS, RATE
MAP SCHEMA.SHOP_PLU, TARGET SCHEMA.SHOP_PLU;
MAP SCHEMA.SHOP_STK, TARGET SCHEMA.SHOP_STK;
kafka.props(Handler 配置)
放在 $OGG_HOME/dirprm/kafka.props
gg.handlerlist=kafkah
gg.handler.kafkah.type=kafka
gg.handler.kafkah.format=json            # 输出 JSON,便于 Flink 解析
gg.handler.kafkah.mode=op                # 按操作类型产出(I/U/D)
gg.handler.kafkah.topicMappingTemplate=${schemaName}.${tableName}
gg.handler.kafkah.includeTokens=true     # 包含 op_type/op_ts 等元信息
gg.handler.kafkah.format.pretty=false
gg.handler.kafkah.jsonAsString=false# 如果要把 before/after 都打出来(推荐,便于 UPDATE 前后值比较)
gg.handler.kafkah.beforeAfter=true# 生产者配置文件
gg.handler.kafkah.kafkaProducerConfigFile=dirprm/producer.properties# 过滤/投影(可选)
# gg.handler.kafkah.keyMappingTemplate=${primaryKeys}

producer.properties(Kafka Producer)

bootstrap.servers=broker1:9092,broker2:9092
acks=all
retries=5
linger.ms=20
batch.size=65536
max.in.flight.requests.per.connection=1
compression.type=snappy

产出示例(典型 OGG JSON):

{"table": "SCHEMA.SHOP_PLU","op_type": "U",                               // I/U/D"op_ts": "2025-07-31 06:50:10.000000","current_ts": "2025-07-31 06:50:10.123456","pos": "00000000000000000001","before": { "PLU_ID": 1001, "NAME": "old", "PRICE": 12.00 },"after":  { "PLU_ID": 1001, "NAME": "new", "PRICE": 12.50 }
}

主题命名:${schema}.${table},例如 CMPINT.SHOP_PLUCMPINT.SHOP_STK
分区数建议:与目标表写入并行度一致或更高(如 4/8/12),以提升吞吐。

二、Flink 侧(Kafka → 字段映射 → MySQL)

你可以用 DataStream(灵活处理 delete/upsert)Flink SQL(结构更简洁)
由于 OGG 输出的 JSON 不一定是 Debezium 格式,建议使用 DataStream 明确处理 op_type

下面给你 DataStream Java 示例(Flink 1.17,KafkaSource + JDBC Sink),同时演示字段映射与删除处理。

1) Maven 关键依赖(你已基本具备)

  • flink-streaming-java

  • flink-connector-kafka(注意与你的 Flink 版本匹配)

  • flink-connector-jdbc

  • jackson-databind(解析 OGG JSON)

2) POJO 与解析

// OggChange.java
public class OggChange {public String table;public String op_type; // I/U/Dpublic Map<String, Object> before;public Map<String, Object> after;public String op_ts;public String pos;
}

3) Kafka Source + 解析 + 字段映射

SHOP_PLU 为例(演示映射:Oracle.PLN_ID → MySQL.id, NAME→full_name, PRICE→price):

KafkaSource<String> sourcePlu = KafkaSource.<String>builder().setBootstrapServers("broker1:9092,broker2:9092").setGroupId("o2m-sync-plu").setTopics("CMPINT.SHOP_PLU").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).setProperty("isolation.level", "read_committed").build();DataStream<String> rawPlu = env.fromSource(sourcePlu, WatermarkStrategy.noWatermarks(), "OGG Kafka Source SHOP_PLU");ObjectMapper mapper = new ObjectMapper();// 解析 OGG JSON
SingleOutputStreamOperator<OggChange> changePlu = rawPlu.map(v -> mapper.readValue(v, OggChange.class)).name("Parse OGG JSON");// 字段映射后的业务对象
public static class PluEvent {public String op;     // UPSERT or DELETEpublic Long id;       // 映射: PLU_ID -> idpublic String fullName; // 映射: NAME -> full_namepublic BigDecimal price; // 映射: PRICE -> price
}SingleOutputStreamOperator<PluEvent> mappedPlu = changePlu.map(c -> {PluEvent e = new PluEvent();if ("D".equalsIgnoreCase(c.op_type)) {e.op = "DELETE";Map<String, Object> b = c.before;e.id = b == null ? null : ((Number) b.get("PLU_ID")).longValue();} else {e.op = "UPSERT";Map<String, Object> a = c.after;e.id = a == null ? null : ((Number) a.get("PLU_ID")).longValue();e.fullName = a == null ? null : String.valueOf(a.get("NAME"));Object p = a == null ? null : a.get("PRICE");e.price = p == null ? null : new BigDecimal(String.valueOf(p));}return e;
}).name("Field Mapping: SHOP_PLU");

同理再来一个 SHOP_STK 的流,或把多表汇入一个流并在下游按表路由(更灵活)。

4) 分流:UPSERT 与 DELETE 分开写入

SplitStream<PluEvent> split = mappedPlu.split(e -> Collections.singletonList("DELETE".equals(e.op) ? "del" : "upsert"));DataStream<PluEvent> upserts = split.select("upsert");
DataStream<PluEvent> deletes = split.select("del");

5) MySQL Sink(幂等等价)

  • UPSERT 使用:INSERT ... ON DUPLICATE KEY UPDATE ...

  • DELETE 使用:DELETE FROM ... WHERE id=?

    // UPSERT Sink
    upserts.addSink(JdbcSink.sink("INSERT INTO shop_plu (id, full_name, price) VALUES (?, ?, ?)"+ " ON DUPLICATE KEY UPDATE full_name=VALUES(full_name), price=VALUES(price)",(ps, e) -> {ps.setLong(1, e.id);ps.setString(2, e.fullName);if (e.price == null) ps.setNull(3, Types.DECIMAL);else ps.setBigDecimal(3, e.price);},new JdbcExecutionOptions.Builder().withBatchSize(500).withBatchIntervalMs(1000).withMaxRetries(3).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://mysqlhost:3306/targetdb?characterEncoding=utf8&useSSL=false").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("user").withPassword("pwd").build()
    )).name("MySQL UPSERT: SHOP_PLU");// DELETE Sink
    deletes.addSink(JdbcSink.sink("DELETE FROM shop_plu WHERE id = ?",(ps, e) -> ps.setLong(1, e.id),new JdbcExecutionOptions.Builder().withBatchSize(500).withBatchIntervalMs(1000).withMaxRetries(3).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://mysqlhost:3306/targetdb?characterEncoding=utf8&useSSL=false").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("user").withPassword("pwd").build()
    )).name("MySQL DELETE: SHOP_PLU");
    

一致性

  • Kafka Source 可设 setProperty("isolation.level","read_committed") 并开启 Flink checkpointing,拿到至少一次/近似恰一次的语义。

  • JDBC 写 MySQL暂不具备严格两阶段提交的 exactly-once,但通过 幂等 UPSERT + DELETE 按主键,可以达到工程上可接受的一致性。

  • 开启 checkpoint:env.enableCheckpointing(30000);

6) 并行 & 多表

  • Kafka 主题为 CMPINT.SHOP_PLUCMPINT.SHOP_STK,各自 KafkaSource 并行消费,下游映射与写入可设置 setParallelism(4/8)

  • 也可以合并成一个 KafkaSource 订阅通配 CMPINT.*,在解析后按 table 字段路由到不同的 sink。

  • 若量级较大,建议 分表分主题 + 分 sink,便于扩展与隔离。


三、字段映射(集中配置思路)

把 Oracle→MySQL 的映射关系做成一张表或一个配置 Map,便于后续扩展:

// UPSERT Sink
upserts.addSink(JdbcSink.sink("INSERT INTO shop_plu (id, full_name, price) VALUES (?, ?, ?)"+ " ON DUPLICATE KEY UPDATE full_name=VALUES(full_name), price=VALUES(price)",(ps, e) -> {ps.setLong(1, e.id);ps.setString(2, e.fullName);if (e.price == null) ps.setNull(3, Types.DECIMAL);else ps.setBigDecimal(3, e.price);},new JdbcExecutionOptions.Builder().withBatchSize(500).withBatchIntervalMs(1000).withMaxRetries(3).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://mysqlhost:3306/targetdb?characterEncoding=utf8&useSSL=false").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("user").withPassword("pwd").build()
)).name("MySQL UPSERT: SHOP_PLU");// DELETE Sink
deletes.addSink(JdbcSink.sink("DELETE FROM shop_plu WHERE id = ?",(ps, e) -> ps.setLong(1, e.id),new JdbcExecutionOptions.Builder().withBatchSize(500).withBatchIntervalMs(1000).withMaxRetries(3).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://mysqlhost:3306/targetdb?characterEncoding=utf8&useSSL=false").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("user").withPassword("pwd").build()
)).name("MySQL DELETE: SHOP_PLU");

你可以写一个通用映射器:把 after Map 根据 mapping 产出目标 POJO;对 DELETE 则用 before 取主键。


四、删除(D)、更新(U)、插入(I)处理要点

  • I/U:统一当成 UPSERT

  • D:单独走 DELETE sink。

  • 空值:注意 null 的处理,尤其是 Decimal/时间类型;MySQL 列允许为 NULL 时注意 setNull

  • 主键:目标表上必须有主键/唯一键以支持 UPSERT。

  • 时间:如果 OGG JSON 给的是字符串时间,自行解析或直接落到 DATETIME 字段。


五、运维与监控

  • Kafka Topic:按表拆分、合理分区数;启用压缩(snappy/lz4)。

  • Flink:开启 checkpoint;注意重启策略(fixed-delay / failure-rate)。

  • MySQL:分库分表或二级索引优化写入;批量大小调优。

  • 告警:消费延迟(Kafka lag)、失败重试次数、sink 异常等。


六、如果你更想用 Flink SQL

如果你能把 OGG 的 JSON 统一成“扁平结构”,或通过 UDF 解析为行级变更(包含 op),也可以用 Flink SQL:

  • Kafka Source: 'format'='json',字段含 op_type/各列;

  • 通过 WHERE op_type IN ('I','U') 写入 UPSERT Sink;

  • 通过另一个 INSERT + JDBC Sink 执行 DELETE(使用 sql("EXECUTE STATEMENT ...") 的方式或写自定义 DynamicTableSink)。

但 DataStream 在处理 DELETE 时会更直接稳妥。

http://www.dtcms.com/a/336928.html

相关文章:

  • 云计算学习100天-第17天
  • Linux学习-(进程间,线程间通信)
  • nuScence数据集
  • 计算机视觉 图像处理 在两张二值图中检测线条交集点的高效方法 适合工程图纸比对、生物神经元网络分析和文档特征提取等场景 ,
  • 20. 云计算-Service MeshServerless
  • 谷粒商城项目-P3简介-分布式基础概念
  • CloudBase AI ToolKit + VSCode Copilot:打造高效智能云端开发新体验
  • 【运维进阶】LNMP + WordPress 自动化部署实验
  • CMakeLists.txt 学习笔记
  • MariaDB/MySQL 客户端工具与服务端配置精要指南
  • C++---有符号和无符号整数的位移操作
  • 云原生俱乐部-mysql知识点归纳(1)
  • 《亚矩阵云手机重构出租接单:KVM 虚拟化与边缘计算驱动的设备替代技术路径》
  • 8.18决策树
  • 性能测试(Jemter)
  • grep命令要点、详解和示例
  • 基于nvm安装管理多个node.js版本切换使用(附上详细安装使用图文教程+nvm命令大全)
  • QT第九讲- 控件委托
  • Git智能合并机制深度解析
  • ChatGPT-5 对教育行业的影响与案例研究
  • Qt笔试题
  • 录像视频删除如何恢复?手机电脑的录像恢复技巧
  • 给linux的root磁盘扩容
  • 手游搬砖对云手机的需求有哪些?
  • 机器学习实例应用
  • 获粤港澳大湾区碳足迹认证:遨游智能三防手机赋能绿色通信
  • VLN视觉语言导航(3)——神经网络的构建和优化 2.3
  • 二十八、案例-部门管理-查询
  • Android中flavor的使用
  • 项目实战——矿物识别系统(利用机器学习从化学元素数据中识别矿物,从数据到分类模型)