Flink SQL 将kafka topic的数据写到另外一个topic里面
-- 创建源表,使用 RAW 格式接收原始 JSON 数据
CREATE TABLE source_kafka (
id STRING,
`data` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'source_kafka-topic',
'properties.bootstrap.servers' = 'master01:9092',
'properties.group.id' = 'flink-kafka-group',
'scan.startup.mode' = 'latest-offset',
'key.format' = 'csv',
'key.fields' = 'id',
'value.format' = 'raw',
'value.fields-include' = 'EXCEPT_KEY'
);-- 创建目标表,同样使用 RAW 格式保持数据原样
CREATE TABLE sink_kafka (
id STRING,
`data` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'sink-kafka-topic',
'properties.bootstrap.servers' = 'master02:9092',
'key.format' = 'csv',
'key.fields' = 'id',
'value.format' = 'raw',
'value.fields-include' = 'EXCEPT_KEY'
);-- 执行复制操作
INSERT INTO sink_kafka
SELECT * FROM source_kafka;
- value.fields-include
定义消息体(Value)格式如何处理消息键(Key)字段的策略。 默认情况下,表结构中 ‘ALL’ 即所有的字段都会包含在消息体格式中,即消息键字段在消息键和消息体格式中都会出现。
- 参考文档:
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/kafka/#%E6%B6%88%E6%81%AF%E9%94%AEkey%E4%B8%8E%E6%B6%88%E6%81%AF%E4%BD%93value%E7%9A%84%E6%A0%BC%E5%BC%8F
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/formats/overview/