当前位置: 首页 > news >正文

Flink SQL 将kafka topic的数据写到另外一个topic里面

-- 创建源表,使用 RAW 格式接收原始 JSON 数据
CREATE TABLE source_kafka (
id STRING,
`data` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'source_kafka-topic',
'properties.bootstrap.servers' = 'master01:9092',
'properties.group.id' = 'flink-kafka-group',
'scan.startup.mode' = 'latest-offset',
'key.format' = 'csv',
'key.fields' = 'id',
'value.format' = 'raw',
'value.fields-include' = 'EXCEPT_KEY'
);-- 创建目标表,同样使用 RAW 格式保持数据原样
CREATE TABLE sink_kafka (
id STRING,
`data` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'sink-kafka-topic',
'properties.bootstrap.servers' = 'master02:9092',
'key.format' = 'csv',
'key.fields' = 'id',
'value.format' = 'raw',
'value.fields-include' = 'EXCEPT_KEY'
);-- 执行复制操作
INSERT INTO sink_kafka
SELECT * FROM source_kafka;
  • value.fields-include

定义消息体(Value)格式如何处理消息键(Key)字段的策略。 默认情况下,表结构中 ‘ALL’ 即所有的字段都会包含在消息体格式中,即消息键字段在消息键和消息体格式中都会出现。

  • 参考文档:
    https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/kafka/#%E6%B6%88%E6%81%AF%E9%94%AEkey%E4%B8%8E%E6%B6%88%E6%81%AF%E4%BD%93value%E7%9A%84%E6%A0%BC%E5%BC%8F

https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/formats/overview/

相关文章:

  • PyQt5基本窗口控件(QComboBox(下拉列表框))
  • Webpack其他插件
  • 【计算机网络】TLS中的对称加密和非对称加密的应用,应对第三方抓包的双向https认证
  • 数据库系统概论|第七章:数据库设计—课程笔记
  • 计算机网络-MPLS VPN基础概念
  • 如何设置FFmpeg实现对高分辨率视频进行转码
  • 高速数字测试利器,新款是德科技UXR0504B示波器
  • 多模态和多智能体系统与理性的结合综述研究
  • 2天长沙旅游规划
  • MFC 调用海康相机进行软触发
  • 【ROS】将Qt的Pro工程转换到ROS2的colcon
  • 【springcloud学习(dalston.sr1)】使用Feign实现接口调用(八)
  • spark小任务
  • AI产品上市前的“安全通行证“
  • 高防ip支持哪些网络协议
  • HDD 安全擦除:何时以及如何在 Windows PC 上安全擦除硬盘
  • vue3:十三、分类管理-表格--slot插槽详细说明---表格内拼接字段、tag标签
  • 微信小程序学习之搜索框
  • 【工具变量】各省市场化指数-杨兴权版共三个方法(1997-2023年)
  • C++类和对象之相关特性
  • 从能源装备向应急装备蓝海拓展,川润股份发布智能综合防灾应急仓
  • 习近平会见哥伦比亚总统佩特罗
  • 男子退机票被收90%的手续费,律师:虽然合规,但显失公平
  • 季子文化与江南文化的根脉探寻与融合
  • 行知读书会|换一个角度看见社会
  • 为证明我爸是我爸,我将奶奶告上法庭