当前位置: 首页 > news >正文

Flink SQL-Client Kafka connector

下载对应版本的 Kafka 连接器 JAR 文件(如 flink-sql-connector-kafka-.jar),并放置到 Flink 的 lib/ 目录下。

Source

CREATE TABLE kafka_source (
    user_id STRING,
    event_time TIMESTAMP(3),
    action STRING,
    -- 定义事件时间与 Watermark
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',                      -- 指定连接器类型
    'topic' = 'test',                    -- Kafka Topic 名称
    'properties.bootstrap.servers' = 'chb1:9092',  -- Kafka Broker 地址
    'properties.group.id' = 'flink-sql-group',  -- 消费者组 ID
    'scan.startup.mode' = 'earliest-offset',    -- 初始消费位点(可选:latest-offset)
    'format' = 'json'                           -- 数据格式(JSON、Avro 等)
);

Sink


drop table kafka_sink;
CREATE TABLE kafka_sink (
    user_id STRING,
    window_start TIMESTAMP(3),
    action_count BIGINT
) WITH (
    'connector' = 'kafka',                      -- 连接器类型
    'topic' = 'output_topic',                   -- 目标 Topic
	'properties.group.id' = 'ksink-01',
    'properties.bootstrap.servers' = 'chb1:9092',
    'properties.auto.offset.reset' = 'earliest',
    'format' = 'json',                          -- 数据格式
    'sink.partitioner' = 'round-robin',         -- 分区策略(round-robin、key-hash)
	-- 事务
	'properties.transaction.timeout.ms'='300000',
    'sink.transactional-id-prefix' = 'ksink-tid-',	
    'sink.delivery-guarantee' = 'exactly-once'  -- 投递语义(需开启 Checkpoint)
	
);

写入到Sink

-- 按窗口统计用户行为次数
INSERT INTO kafka_sink
SELECT 
    user_id,
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
    COUNT(action) AS action_count
FROM kafka_source
GROUP BY 
    user_id,
    TUMBLE(event_time, INTERVAL '1' MINUTE);

问题

1、 The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
在 Flink SQL 中配置 Kafka 的 transaction.timeout.ms 时,需通过 properties. 前缀传递 Kafka Producer 的专属参数。
sink 端添加'properties.transaction.timeout.ms'='300000',

2、查询kafka_sink报错: Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [output_topic-0]
Flink SQL可能未正确设置auto.offset.reset参数,导致无法找到偏移量。您需要在Flink SQL的WITH子句中明确指定auto.offset.reset的值

 'properties.auto.offset.reset' = 'earliest',
http://www.dtcms.com/a/109945.html

相关文章:

  • Uni-app 项目 PDF 批注插件库在线版 API 示例教程
  • Ceph异地数据同步之-RBD异地同步复制(上)
  • 每日一题(小白)ASCLL娱乐篇5
  • ARM架构+CODESYS:解锁嵌入式边缘计算的实时控制新范式
  • MIT6.828 Lab3-2 Print a page table (easy)
  • 大数据学习(98)-数据治理
  • 预测分析(二):基于机器学习的数值预测
  • 【大模型基础_毛玉仁】6.3 知识检索
  • API接口调用
  • 通信算法之256: 无人机Remote ID(远程识别)
  • adc推荐,单通道,双极性采集
  • 最近常用 python 记录
  • 环境数据综合分析系统
  • 贤小二c#版Yolov5 yolov8 yolov10 yolov11自动标注工具 + 免python环境 GPU一键训练包
  • 贴片加工SMT厂核心工艺解析
  • 码界奇缘 Java 觉醒 第二章 变量迷城
  • 计算机网络-TCP的重传机制
  • 清晰易懂的 Flutter 开发环境搭建教程
  • java短连接,长连接
  • Linux命令-uniq
  • RAGFlow部署与使用介绍-深度文档理解和检索增强生成
  • 本地部署 Firecrawl 爬虫让 AI 知识库更丰满
  • Java创建对象和spring创建对象的过程和区别
  • AI赋能数据库管理“最后一公里”,融合架构重塑数据库承载成本效能——zCloud 6.7与zData X 3.3正式发布
  • MonkeyDev 如何创建一个root级级别的app,并执行root命令获取iphone设备序列号serialNumber(ios15.8)
  • 航电系统之承重与避障技术
  • “二分查找 + (必要时)前缀和” -- 处理 ’有序数组‘ 的区间问题汇总
  • 信息学奥赛一本通 1524:旅游航道
  • 胶铁一体化产品介绍
  • 什么是 SAML身份验证