当前位置: 首页 > news >正文

Flink SQL 将kafka topic的数据写到另外一个topic里面

-- 创建源表,使用 RAW 格式接收原始 JSON 数据
CREATE TABLE source_kafka (
id STRING,
`data` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'source_kafka-topic',
'properties.bootstrap.servers' = 'master01:9092',
'properties.group.id' = 'flink-kafka-group',
'scan.startup.mode' = 'latest-offset',
'key.format' = 'csv',
'key.fields' = 'id',
'value.format' = 'raw',
'value.fields-include' = 'EXCEPT_KEY'
);-- 创建目标表,同样使用 RAW 格式保持数据原样
CREATE TABLE sink_kafka (
id STRING,
`data` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'sink-kafka-topic',
'properties.bootstrap.servers' = 'master02:9092',
'key.format' = 'csv',
'key.fields' = 'id',
'value.format' = 'raw',
'value.fields-include' = 'EXCEPT_KEY'
);-- 执行复制操作
INSERT INTO sink_kafka
SELECT * FROM source_kafka;
  • value.fields-include

定义消息体(Value)格式如何处理消息键(Key)字段的策略。 默认情况下,表结构中 ‘ALL’ 即所有的字段都会包含在消息体格式中,即消息键字段在消息键和消息体格式中都会出现。

  • 参考文档:
    https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/kafka/#%E6%B6%88%E6%81%AF%E9%94%AEkey%E4%B8%8E%E6%B6%88%E6%81%AF%E4%BD%93value%E7%9A%84%E6%A0%BC%E5%BC%8F

https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/formats/overview/

http://www.dtcms.com/a/189878.html

相关文章:

  • PyQt5基本窗口控件(QComboBox(下拉列表框))
  • Webpack其他插件
  • 【计算机网络】TLS中的对称加密和非对称加密的应用,应对第三方抓包的双向https认证
  • 数据库系统概论|第七章:数据库设计—课程笔记
  • 计算机网络-MPLS VPN基础概念
  • 如何设置FFmpeg实现对高分辨率视频进行转码
  • 高速数字测试利器,新款是德科技UXR0504B示波器
  • 多模态和多智能体系统与理性的结合综述研究
  • 2天长沙旅游规划
  • MFC 调用海康相机进行软触发
  • 【ROS】将Qt的Pro工程转换到ROS2的colcon
  • 【springcloud学习(dalston.sr1)】使用Feign实现接口调用(八)
  • spark小任务
  • AI产品上市前的“安全通行证“
  • 高防ip支持哪些网络协议
  • HDD 安全擦除:何时以及如何在 Windows PC 上安全擦除硬盘
  • vue3:十三、分类管理-表格--slot插槽详细说明---表格内拼接字段、tag标签
  • 微信小程序学习之搜索框
  • 【工具变量】各省市场化指数-杨兴权版共三个方法(1997-2023年)
  • C++类和对象之相关特性
  • bfs-最小步数问题
  • Leetcode数组day1
  • SpringAI
  • HandlerInterceptor介绍-笔记
  • NC65开发环境(eclipse启动)在企业报表中的报表数据中心里计算某张报表时,一直计算不出数据的解决办法。
  • C++类和对象练习:Date类实现日期的差,比较日期的大小,日期的前置后置++,--,输入输出Date类,对默认函数的练习。
  • uniapp使用全局组件,
  • Django + Celery 打造企业级大模型异步任务管理平台 —— 从需求到完整实践(含全模板源码)
  • VCS X-PROP建模以及在方针中的应用
  • 【MySQL】变更缓冲区:作用、主要配置以及如何查看