当前位置: 首页 > news >正文

消费 Kafka 一个TOPIC数据,插入到另一个KAFKA的TOPIC

从 Kafka 消费 CDC 数据(变更捕获,需 Upsert 语义)
用 kafka 连接器 + 主键 + 处理函数 模拟 Upsert,示例:
CREATE TABLE `KAFKA_TEST_0002` (
`LGL_PERN_CODE` VARCHAR COMMENT 'LGL_PERN_CODE',
`LBLTY_ACCT_NUM` VARCHAR COMMENT 'LBLTY_ACCT_NUM',
`ACCT_NM` VARCHAR COMMENT 'ACCT_NM',
`CUST_NUM` VARCHAR COMMENT 'CUST_NUM',
`NAT_CODE` VARCHAR COMMENT 'NAT_CODE',
-- 声明主键(用于 Upsert 去重)
PRIMARY KEY (`LBLTY_ACCT_NUM`) NOT ENFORCED 
) WITH (
'connector' = 'kafka',  -- 恢复为 kafka 连接器
'topic' = 'KAFKA_TEST_0002',
'properties.bootstrap.servers' = '10.57.48.38:21007,10.57.48.37:21007,10.57.48.36:21007',
'properties.group.id' = '7a074dd07bfb4d4da39eb0f5773b952b',
'scan.startup.mode' = 'earliest-offset',
'format' = 'debezium-json',  -- 适配 CDC 格式
'debezium-json.ignore-parse-errors' = 'true',
'debezium-json.schema-include' = 'true',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.kerberos.domain.name' = 'hadoop.124dba82_3b54_0125_81e4_110652049a41.com',
'properties.sasl.kerberos.service.name' = 'kafka'
);

-- 如需 Upsert 输出,再通过 Sink 写入 upsert-kafka
CREATE TABLE KafkaUpsertSink (
`LBLTY_ACCT_NUM` VARCHAR,
`LGL_PERN_CODE` VARCHAR,
`ACCT_NM` VARCHAR,
PRIMARY KEY (`LBLTY_ACCT_NUM`) NOT ENFORCED 
) WITH (
'connector' = 'upsert-kafka',  -- Sink 侧使用 upsert-kafka
'topic' = 'sink_topic',
'properties.bootstrap.servers' = '...',
'key.format' = 'json',
'value.format' = 'json'
);

-- 业务逻辑:从 Kafka 读 CDC 数据,处理后 Upsert 写入
INSERT INTO KafkaUpsertSink
SELECT LBLTY_ACCT_NUM, LGL_PERN_CODE, ACCT_NM
FROM `KAFKA_TEST_0002`;

http://www.dtcms.com/a/279334.html

相关文章:

  • c#如何将不同类型的数据存储到一起
  • 项目进度依赖纸面计划,如何提升计划动态调整能力
  • 基于FinRL深度强化学习框架的股票预测和回测交易
  • 迁移学习:知识复用的智能迁移引擎 | 从理论到实践的跨域赋能范式
  • 什么是神经网络,常用的神经网络,如何训练一个神经网络
  • python 循环遍历取出偶数
  • 「日拱一码」027 深度学习库——PyTorch Geometric(PyG)
  • MCP基础知识二(实战通信方式之Streamable HTTP)
  • 【CTF学习】PWN基础工具的使用(binwalk、foremost、Wireshark、WinHex)
  • ewdyfdfytty
  • LangChain教程——文本嵌入模型
  • 20250714让荣品RD-RK3588开发板在Android13下长按关机
  • Debezium日常分享系列之:提升Debezium性能
  • 制造业实战:数字化集采如何保障千种备件“不断供、不积压”?
  • 16.避免使用裸 except
  • MFC扩展库BCGControlBar Pro v36.2新版亮点:可视化设计器升级
  • 计算机毕业设计Java轩辕购物商城管理系统 基于 SpringBoot 的轩辕电商商城管理系统 Java 轩辕购物平台管理系统设计与实现
  • 面向对象的设计模式
  • 【数据结构】树(堆)·上
  • js的局部变量和全局变量
  • 测试驱动开发(TDD)实战:在 Spring 框架实现中践行 “红 - 绿 - 重构“ 循环
  • Bash vs PowerShell | 从 CMD 到跨平台工具:Bash 与 PowerShell 的全方位对比
  • vue3 服务端渲染时请求接口没有等到数据,但是客户端渲染是请求接口又可以得到数据
  • 7.14 map | 内存 | 二维dp | 二维前缀和
  • python+Request提取cookie
  • 电脑升级Experience
  • python transformers笔记(Trainer类)
  • 代码随想录算法训练营第三十五天|416. 分割等和子集
  • LLM表征工程还有哪些值得做的地方
  • 内部文件审计:企业文件服务器审计对网络安全提升有哪些帮助?