一套GoldenGate → Kafka → Flink → MySQL 的端到端增量同步方案
GoldenGate → Kafka → Flink → MySQL 的端到端增量同步方案,包含:架构、GoldenGate 配置样例、Kafka 主题规划、Flink(含字段映射)示例代码,以及删除/更新的一致性处理要点。你可以直接按步骤落地。
目录
架构一览
一、GoldenGate 配置(到 Kafka)
1. 源端数据库准备(DBA)
2. OGG 参数(简化示例)
二、Flink 侧(Kafka → 字段映射 → MySQL)
1) Maven 关键依赖(你已基本具备)
2) POJO 与解析
3) Kafka Source + 解析 + 字段映射
4) 分流:UPSERT 与 DELETE 分开写入
5) MySQL Sink(幂等等价)
6) 并行 & 多表
三、字段映射(集中配置思路)
四、删除(D)、更新(U)、插入(I)处理要点
五、运维与监控
六、如果你更想用 Flink SQL
你接下来可以做什么
架构一览
-
Oracle (源库)
→ 2) Oracle GoldenGate Big Data(Kafka Handler 输出 JSON 到 Kafka)
→ 3) Kafka(主题按表划分)
→ 4) Flink 作业(消费 Kafka,做字段映射/清洗/幂等写入)
→ 5) MySQL(目标库,使用 UPSERT/DELETE)
说明:GoldenGate 不走 LogMiner,因此没有“名称>30字符不可捕获”的限制。
一、GoldenGate 配置(到 Kafka)
假设你使用 OGG for Big Data(Microservices 或 Classic 安装均可),通过 Kafka Handler 输出 JSON。
1. 源端数据库准备(DBA)
开启补充日志(最小主键 + 更新前镜像):
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER DATABASE FORCE LOGGING;-- 关键表打开最小列级补充日志(至少包含主键/唯一键)
ALTER TABLE SCHEMA.SHOP_PLU ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE SCHEMA.SHOP_STK ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
2. OGG 参数(简化示例)
GLOBALS(可选)
GGSCHEMA ggadmin
Extract(集成抽取,读 redo)
EXTRACT ext1
USERIDALIAS ogg_alias
TRANLOGOPTIONS INTEGRATEDPARAMS (MAX_SGA_SIZE 1024, PARALLELISM 4)
EXTTRAIL ./dirdat/aa
TABLE SCHEMA.SHOP_PLU;
TABLE SCHEMA.SHOP_STK;
Replicat(Big Data 到 Kafka Handler)
使用 Big Data 适配器将 trail 转 Kafka。你也可以用 Pump + Replicat 分离,但下面直接用一个 replicat 讲解。
REPLICAT rkaf
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 30 SECONDS, RATE
MAP SCHEMA.SHOP_PLU, TARGET SCHEMA.SHOP_PLU;
MAP SCHEMA.SHOP_STK, TARGET SCHEMA.SHOP_STK;
kafka.props(Handler 配置)
放在 $OGG_HOME/dirprm/kafka.props
gg.handlerlist=kafkah
gg.handler.kafkah.type=kafka
gg.handler.kafkah.format=json # 输出 JSON,便于 Flink 解析
gg.handler.kafkah.mode=op # 按操作类型产出(I/U/D)
gg.handler.kafkah.topicMappingTemplate=${schemaName}.${tableName}
gg.handler.kafkah.includeTokens=true # 包含 op_type/op_ts 等元信息
gg.handler.kafkah.format.pretty=false
gg.handler.kafkah.jsonAsString=false# 如果要把 before/after 都打出来(推荐,便于 UPDATE 前后值比较)
gg.handler.kafkah.beforeAfter=true# 生产者配置文件
gg.handler.kafkah.kafkaProducerConfigFile=dirprm/producer.properties# 过滤/投影(可选)
# gg.handler.kafkah.keyMappingTemplate=${primaryKeys}
producer.properties(Kafka Producer)
bootstrap.servers=broker1:9092,broker2:9092
acks=all
retries=5
linger.ms=20
batch.size=65536
max.in.flight.requests.per.connection=1
compression.type=snappy
产出示例(典型 OGG JSON):
{"table": "SCHEMA.SHOP_PLU","op_type": "U", // I/U/D"op_ts": "2025-07-31 06:50:10.000000","current_ts": "2025-07-31 06:50:10.123456","pos": "00000000000000000001","before": { "PLU_ID": 1001, "NAME": "old", "PRICE": 12.00 },"after": { "PLU_ID": 1001, "NAME": "new", "PRICE": 12.50 }
}
主题命名:${schema}.${table}
,例如 CMPINT.SHOP_PLU
、CMPINT.SHOP_STK
。
分区数建议:与目标表写入并行度一致或更高(如 4/8/12),以提升吞吐。
二、Flink 侧(Kafka → 字段映射 → MySQL)
你可以用 DataStream(灵活处理 delete/upsert) 或 Flink SQL(结构更简洁)。
由于 OGG 输出的 JSON 不一定是 Debezium 格式,建议使用 DataStream 明确处理 op_type
。
下面给你 DataStream Java 示例(Flink 1.17,KafkaSource + JDBC Sink),同时演示字段映射与删除处理。
1) Maven 关键依赖(你已基本具备)
-
flink-streaming-java
-
flink-connector-kafka
(注意与你的 Flink 版本匹配) -
flink-connector-jdbc
-
jackson-databind
(解析 OGG JSON)
2) POJO 与解析
// OggChange.java
public class OggChange {public String table;public String op_type; // I/U/Dpublic Map<String, Object> before;public Map<String, Object> after;public String op_ts;public String pos;
}
3) Kafka Source + 解析 + 字段映射
以 SHOP_PLU
为例(演示映射:Oracle.PLN_ID → MySQL.id, NAME→full_name, PRICE→price):
KafkaSource<String> sourcePlu = KafkaSource.<String>builder().setBootstrapServers("broker1:9092,broker2:9092").setGroupId("o2m-sync-plu").setTopics("CMPINT.SHOP_PLU").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).setProperty("isolation.level", "read_committed").build();DataStream<String> rawPlu = env.fromSource(sourcePlu, WatermarkStrategy.noWatermarks(), "OGG Kafka Source SHOP_PLU");ObjectMapper mapper = new ObjectMapper();// 解析 OGG JSON
SingleOutputStreamOperator<OggChange> changePlu = rawPlu.map(v -> mapper.readValue(v, OggChange.class)).name("Parse OGG JSON");// 字段映射后的业务对象
public static class PluEvent {public String op; // UPSERT or DELETEpublic Long id; // 映射: PLU_ID -> idpublic String fullName; // 映射: NAME -> full_namepublic BigDecimal price; // 映射: PRICE -> price
}SingleOutputStreamOperator<PluEvent> mappedPlu = changePlu.map(c -> {PluEvent e = new PluEvent();if ("D".equalsIgnoreCase(c.op_type)) {e.op = "DELETE";Map<String, Object> b = c.before;e.id = b == null ? null : ((Number) b.get("PLU_ID")).longValue();} else {e.op = "UPSERT";Map<String, Object> a = c.after;e.id = a == null ? null : ((Number) a.get("PLU_ID")).longValue();e.fullName = a == null ? null : String.valueOf(a.get("NAME"));Object p = a == null ? null : a.get("PRICE");e.price = p == null ? null : new BigDecimal(String.valueOf(p));}return e;
}).name("Field Mapping: SHOP_PLU");
同理再来一个
SHOP_STK
的流,或把多表汇入一个流并在下游按表路由(更灵活)。
4) 分流:UPSERT 与 DELETE 分开写入
SplitStream<PluEvent> split = mappedPlu.split(e -> Collections.singletonList("DELETE".equals(e.op) ? "del" : "upsert"));DataStream<PluEvent> upserts = split.select("upsert");
DataStream<PluEvent> deletes = split.select("del");
5) MySQL Sink(幂等等价)
-
UPSERT 使用:
INSERT ... ON DUPLICATE KEY UPDATE ...
-
DELETE 使用:
DELETE FROM ... WHERE id=?
// UPSERT Sink upserts.addSink(JdbcSink.sink("INSERT INTO shop_plu (id, full_name, price) VALUES (?, ?, ?)"+ " ON DUPLICATE KEY UPDATE full_name=VALUES(full_name), price=VALUES(price)",(ps, e) -> {ps.setLong(1, e.id);ps.setString(2, e.fullName);if (e.price == null) ps.setNull(3, Types.DECIMAL);else ps.setBigDecimal(3, e.price);},new JdbcExecutionOptions.Builder().withBatchSize(500).withBatchIntervalMs(1000).withMaxRetries(3).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://mysqlhost:3306/targetdb?characterEncoding=utf8&useSSL=false").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("user").withPassword("pwd").build() )).name("MySQL UPSERT: SHOP_PLU");// DELETE Sink deletes.addSink(JdbcSink.sink("DELETE FROM shop_plu WHERE id = ?",(ps, e) -> ps.setLong(1, e.id),new JdbcExecutionOptions.Builder().withBatchSize(500).withBatchIntervalMs(1000).withMaxRetries(3).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://mysqlhost:3306/targetdb?characterEncoding=utf8&useSSL=false").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("user").withPassword("pwd").build() )).name("MySQL DELETE: SHOP_PLU");
一致性:
Kafka Source 可设
setProperty("isolation.level","read_committed")
并开启 Flink checkpointing,拿到至少一次/近似恰一次的语义。JDBC 写 MySQL暂不具备严格两阶段提交的 exactly-once,但通过 幂等 UPSERT + DELETE 按主键,可以达到工程上可接受的一致性。
开启 checkpoint:
env.enableCheckpointing(30000);
6) 并行 & 多表
-
Kafka 主题为
CMPINT.SHOP_PLU
、CMPINT.SHOP_STK
,各自KafkaSource
并行消费,下游映射与写入可设置setParallelism(4/8)
。 -
也可以合并成一个
KafkaSource
订阅通配CMPINT.*
,在解析后按table
字段路由到不同的 sink。 -
若量级较大,建议 分表分主题 + 分 sink,便于扩展与隔离。
三、字段映射(集中配置思路)
把 Oracle→MySQL 的映射关系做成一张表或一个配置 Map,便于后续扩展:
// UPSERT Sink
upserts.addSink(JdbcSink.sink("INSERT INTO shop_plu (id, full_name, price) VALUES (?, ?, ?)"+ " ON DUPLICATE KEY UPDATE full_name=VALUES(full_name), price=VALUES(price)",(ps, e) -> {ps.setLong(1, e.id);ps.setString(2, e.fullName);if (e.price == null) ps.setNull(3, Types.DECIMAL);else ps.setBigDecimal(3, e.price);},new JdbcExecutionOptions.Builder().withBatchSize(500).withBatchIntervalMs(1000).withMaxRetries(3).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://mysqlhost:3306/targetdb?characterEncoding=utf8&useSSL=false").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("user").withPassword("pwd").build()
)).name("MySQL UPSERT: SHOP_PLU");// DELETE Sink
deletes.addSink(JdbcSink.sink("DELETE FROM shop_plu WHERE id = ?",(ps, e) -> ps.setLong(1, e.id),new JdbcExecutionOptions.Builder().withBatchSize(500).withBatchIntervalMs(1000).withMaxRetries(3).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://mysqlhost:3306/targetdb?characterEncoding=utf8&useSSL=false").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("user").withPassword("pwd").build()
)).name("MySQL DELETE: SHOP_PLU");
你可以写一个通用映射器:把 after
Map 根据 mapping 产出目标 POJO;对 DELETE
则用 before
取主键。
四、删除(D)、更新(U)、插入(I)处理要点
-
I/U:统一当成 UPSERT。
-
D:单独走 DELETE sink。
-
空值:注意
null
的处理,尤其是 Decimal/时间类型;MySQL 列允许为 NULL 时注意setNull
。 -
主键:目标表上必须有主键/唯一键以支持 UPSERT。
-
时间:如果 OGG JSON 给的是字符串时间,自行解析或直接落到
DATETIME
字段。
五、运维与监控
-
Kafka Topic:按表拆分、合理分区数;启用压缩(snappy/lz4)。
-
Flink:开启 checkpoint;注意重启策略(fixed-delay / failure-rate)。
-
MySQL:分库分表或二级索引优化写入;批量大小调优。
-
告警:消费延迟(Kafka lag)、失败重试次数、sink 异常等。
六、如果你更想用 Flink SQL
如果你能把 OGG 的 JSON 统一成“扁平结构”,或通过 UDF 解析为行级变更(包含 op
),也可以用 Flink SQL:
-
Kafka Source:
'format'='json'
,字段含op_type
/各列; -
通过
WHERE op_type IN ('I','U')
写入 UPSERT Sink; -
通过另一个
INSERT
+ JDBC Sink 执行DELETE
(使用sql("EXECUTE STATEMENT ...")
的方式或写自定义 DynamicTableSink)。
但 DataStream 在处理 DELETE 时会更直接稳妥。