
Flink SQL and Kafka Integration: A Detailed Tutorial


📚 A comprehensive guide to using Apache Flink SQL with Kafka, including complete hands-on examples

Table of Contents

  • 1. Integration Overview
  • 2. Environment Setup and Dependencies
  • 3. Kafka Connector Basics
  • 4. Supported Data Formats
  • 5. Complete Hands-On Examples
  • 6. Advanced Features
  • 7. Performance Tuning
  • 8. FAQ

1. Integration Overview

1.1 Architecture

Data sources → Kafka topic → Flink SQL processing → Kafka topic → downstream applications

Key advantages:

  • Real-time processing: millisecond-level latency
  • SQL syntax: lowers the development barrier
  • Unified stream and batch processing with one engine
  • High reliability: exactly-once semantics

1.2 Typical Use Cases

  • Real-time statistics and aggregation
  • Streaming ETL
  • Real-time data cleansing and filtering
  • Dual-stream JOIN analysis
  • Real-time monitoring and alerting

2. Environment Setup and Dependencies

2.1 Version Requirements

Component    Version
Flink        1.17+
Kafka        2.8+ / 3.x
Java         11 / 17

2.2 Maven Dependencies

<properties>
    <flink.version>1.17.1</flink.version>
</properties>

<dependencies>
    <!-- Flink Table API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink Table Planner -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Kafka connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- JSON format -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

2.3 Quick Start: Kafka with Docker

# docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Start it with:

docker-compose up -d

3. Kafka Connector Basics

3.1 Creating a Kafka Source Table

CREATE TABLE order_source (
    order_id STRING,
    user_id BIGINT,
    product_id STRING,
    amount DECIMAL(10, 2),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'flink-consumer-group',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

3.2 Core Connector Options

Option                           Description               Example
connector                        Connector type            kafka
topic                            Topic name(s)             orders / topic1;topic2
properties.bootstrap.servers     Kafka broker addresses    localhost:9092
properties.group.id              Consumer group ID         my-group
scan.startup.mode                Startup mode              earliest-offset / latest-offset
format                           Message format            json / csv / avro

3.3 Startup Modes in Detail

earliest-offset

Consume from the earliest offset in the topic

'scan.startup.mode' = 'earliest-offset'
latest-offset

Consume only newly arriving messages

'scan.startup.mode' = 'latest-offset'
group-offsets

Resume from the committed consumer-group offsets

'scan.startup.mode' = 'group-offsets'
timestamp

Start from a given timestamp

'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '1640966400000'
specific-offsets

Start from explicit per-partition offsets

'scan.startup.mode' = 'specific-offsets',
'scan.startup.specific-offsets' = 'partition:0,offset:100;partition:1,offset:200'

3.4 Creating a Kafka Sink Table

CREATE TABLE order_result (
    user_id BIGINT,
    total_amount DECIMAL(10, 2),
    order_count BIGINT,
    window_start TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'order-statistics',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

4. Supported Data Formats

4.1 JSON (Recommended)

Sample Kafka message:

{"order_id": "ORD001","user_id": 10001,"product_id": "PROD-123","amount": 299.99,"order_time": "2024-01-15 10:30:00","user_info": {"name": "张三","city": "北京"}
}

DDL:

CREATE TABLE orders_json (
    order_id STRING,
    user_id BIGINT,
    product_id STRING,
    amount DECIMAL(10, 2),
    order_time STRING,
    user_info ROW<name STRING, city STRING>,  -- nested object
    event_time AS TO_TIMESTAMP(order_time, 'yyyy-MM-dd HH:mm:ss'),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);
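
Once the table is defined, the nested ROW fields can be read with dot notation; a small illustrative query (the aliases are just example names):

SELECT
    order_id,
    amount,
    user_info.name AS user_name,   -- access nested JSON fields via the ROW type
    user_info.city AS user_city
FROM orders_json;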

4.2 CSV

Sample Kafka messages:

ORD001,10001,PROD-123,299.99,2024-01-15 10:30:00
ORD002,10002,PROD-456,199.99,2024-01-15 10:31:00

DDL:

CREATE TABLE orders_csv (
    order_id STRING,
    user_id BIGINT,
    product_id STRING,
    amount DECIMAL(10, 2),
    order_time STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders-csv',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'csv',
    'csv.field-delimiter' = ',',
    'csv.ignore-parse-errors' = 'true'
);

4.3 Avro

CREATE TABLE orders_avro (
    order_id STRING,
    user_id BIGINT,
    amount DECIMAL(10, 2)
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders-avro',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'avro'
);
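
If the Avro records are managed by a Confluent Schema Registry, the avro-confluent format can be used instead of plain avro. A minimal sketch, assuming the flink-avro-confluent-registry dependency is on the classpath; the registry URL is illustrative, and option names can vary slightly between Flink versions:

CREATE TABLE orders_avro_sr (
    order_id STRING,
    user_id BIGINT,
    amount DECIMAL(10, 2)
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders-avro',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'avro-confluent',
    'avro-confluent.url' = 'http://localhost:8081'  -- Schema Registry URL (assumed)
);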

5. Complete Hands-On Examples

Example 1: Real-Time Order Statistics

Requirement

Compute, in real time, each user's total order amount and order count over 5-minute windows.

Step 1: Create the Kafka topics
# Create the input topic
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic order-input \
  --partitions 3 \
  --replication-factor 1

# Create the output topic
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic order-statistics \
  --partitions 3 \
  --replication-factor 1
Step 2: Write the data producer
# kafka_producer.py
from kafka import KafkaProducer
import json
import random
import time
from datetime import datetime

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

user_ids = [1001, 1002, 1003, 1004, 1005]
products = ['Phone', 'Laptop', 'Headphones', 'Keyboard', 'Mouse']

print("Producing order data...")
try:
    for i in range(100):
        order = {
            'order_id': f'ORDER-{i+1:04d}',
            'user_id': random.choice(user_ids),
            'product_name': random.choice(products),
            'amount': round(random.uniform(50.0, 3000.0), 2),
            'order_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'status': 'paid'
        }
        producer.send('order-input', value=order)
        print(f"✓ Sent order {order['order_id']}: user {order['user_id']}, amount {order['amount']}")
        time.sleep(0.5)  # one record every 0.5 seconds
except KeyboardInterrupt:
    print("\nStopped producing data")
finally:
    producer.flush()
    producer.close()
    print("Producer closed")

Run the producer:

python kafka_producer.py
Step 3: The Flink SQL job
// OrderStatisticsJob.java
package com.example.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class OrderStatisticsJob {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Create the Kafka source table
        tableEnv.executeSql(
            "CREATE TABLE order_source (" +
            "  order_id STRING," +
            "  user_id BIGINT," +
            "  product_name STRING," +
            "  amount DECIMAL(10, 2)," +
            "  order_time STRING," +
            "  status STRING," +
            "  event_time AS TO_TIMESTAMP(order_time, 'yyyy-MM-dd HH:mm:ss')," +
            "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'order-input'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'properties.group.id' = 'flink-order-consumer'," +
            "  'scan.startup.mode' = 'latest-offset'," +
            "  'format' = 'json'" +
            ")");

        // 3. Create the Kafka sink table
        tableEnv.executeSql(
            "CREATE TABLE order_statistics (" +
            "  user_id BIGINT," +
            "  total_amount DECIMAL(10, 2)," +
            "  order_count BIGINT," +
            "  avg_amount DECIMAL(10, 2)," +
            "  max_amount DECIMAL(10, 2)," +
            "  window_start TIMESTAMP(3)," +
            "  window_end TIMESTAMP(3)" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'order-statistics'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'format' = 'json'" +
            ")");

        // 4. Run the windowed aggregation and write to the sink
        tableEnv.executeSql(
            "INSERT INTO order_statistics " +
            "SELECT " +
            "  user_id," +
            "  SUM(amount) AS total_amount," +
            "  COUNT(*) AS order_count," +
            "  AVG(amount) AS avg_amount," +
            "  MAX(amount) AS max_amount," +
            "  TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start," +
            "  TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end " +
            "FROM order_source " +
            "WHERE status = 'paid' " +
            "GROUP BY user_id, TUMBLE(event_time, INTERVAL '5' MINUTE)");

        System.out.println("✓ Flink SQL job submitted");
    }
}

Build and run:

mvn clean package
flink run target/flink-kafka-demo-1.0.jar
Step 4: Consume the statistics
# kafka_consumer.py
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'order-statistics',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

print("=" * 80)
print("Consuming order statistics...")
print("=" * 80)

for message in consumer:
    data = message.value
    print(f"\n[Statistics for user {data['user_id']}]")
    print(f"  Total amount: ¥{data['total_amount']:.2f}")
    print(f"  Order count:  {data['order_count']}")
    print(f"  Avg amount:   ¥{data['avg_amount']:.2f}")
    print(f"  Max order:    ¥{data['max_amount']:.2f}")
    print(f"  Window:       {data['window_start']} ~ {data['window_end']}")
    print("-" * 80)

Run the consumer:

python kafka_consumer.py

Expected output:

================================================================================
Consuming order statistics...
================================================================================

[Statistics for user 1001]
  Total amount: ¥2849.50
  Order count:  4
  Avg amount:   ¥712.38
  Max order:    ¥1299.99
  Window:       2024-01-15 10:00:00 ~ 2024-01-15 10:05:00
--------------------------------------------------------------------------------

[Statistics for user 1002]
  Total amount: ¥1699.80
  Order count:  3
  Avg amount:   ¥566.60
  Max order:    ¥899.00
  Window:       2024-01-15 10:00:00 ~ 2024-01-15 10:05:00
--------------------------------------------------------------------------------

Example 2: Dual-Stream JOIN

Requirement

Join the order stream with the payment stream to find orders that have been paid.

SQL implementation
-- Order stream
CREATE TABLE orders (
    order_id STRING,
    user_id BIGINT,
    product_id STRING,
    amount DECIMAL(10, 2),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- Payment stream
CREATE TABLE payments (
    payment_id STRING,
    order_id STRING,
    pay_amount DECIMAL(10, 2),
    payment_time TIMESTAMP(3),
    payment_method STRING,
    WATERMARK FOR payment_time AS payment_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'payments',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- Join result table
CREATE TABLE paid_orders (
    order_id STRING,
    user_id BIGINT,
    product_id STRING,
    order_amount DECIMAL(10, 2),
    pay_amount DECIMAL(10, 2),
    payment_method STRING,
    order_time TIMESTAMP(3),
    payment_time TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'paid-orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- Interval join: payment within 15 minutes after the order
INSERT INTO paid_orders
SELECT
    o.order_id,
    o.user_id,
    o.product_id,
    o.amount AS order_amount,
    p.pay_amount,
    p.payment_method,
    o.order_time,
    p.payment_time
FROM orders o
INNER JOIN payments p
    ON o.order_id = p.order_id
    AND p.payment_time BETWEEN o.order_time AND o.order_time + INTERVAL '15' MINUTE;
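
As a variation on the same interval join (not part of the original requirement), a LEFT JOIN can surface orders that were never paid within the 15-minute window; unmatched orders are emitted with NULL payment columns once the interval has passed:

-- Orders with no matching payment within 15 minutes
SELECT
    o.order_id,
    o.user_id,
    o.amount,
    o.order_time
FROM orders o
LEFT JOIN payments p
    ON o.order_id = p.order_id
    AND p.payment_time BETWEEN o.order_time AND o.order_time + INTERVAL '15' MINUTE
WHERE p.payment_id IS NULL;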

Example 3: Real-Time Top-N

Requirement

Find the Top 5 products by hourly sales.

-- Order table
CREATE TABLE product_orders (
    order_id STRING,
    product_id STRING,
    product_name STRING,
    amount DECIMAL(10, 2),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- Top 5 result table
-- Note: the Top-N query below produces an updating stream, so the sink must be
-- upsert-kafka (a plain kafka sink only accepts append-only results).
CREATE TABLE top5_products (
    window_start TIMESTAMP(3),
    product_id STRING,
    product_name STRING,
    total_sales DECIMAL(10, 2),
    order_count BIGINT,
    ranking BIGINT,
    PRIMARY KEY (window_start, ranking) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'top5-products',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

-- Top-N query
INSERT INTO top5_products
SELECT
    window_start,
    product_id,
    product_name,
    total_sales,
    order_count,
    ranking
FROM (
    SELECT
        window_start,
        product_id,
        product_name,
        total_sales,
        order_count,
        ROW_NUMBER() OVER (PARTITION BY window_start ORDER BY total_sales DESC) AS ranking
    FROM (
        SELECT
            TUMBLE_START(order_time, INTERVAL '1' HOUR) AS window_start,
            product_id,
            ANY_VALUE(product_name) AS product_name,
            SUM(amount) AS total_sales,
            COUNT(*) AS order_count
        FROM product_orders
        GROUP BY
            product_id,
            TUMBLE(order_time, INTERVAL '1' HOUR)
    )
)
WHERE ranking <= 5;

6. Advanced Features

6.1 Time Windows

Tumbling window (TUMBLE)

Fixed-size, non-overlapping windows

TUMBLE(event_time, INTERVAL '5' MINUTE)

Sliding window (HOP)

Overlapping windows

HOP(event_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)
-- 10-minute window that slides every 5 minutes

Session window (SESSION)

Dynamic windows based on gaps of inactivity

SESSION(event_time, INTERVAL '30' MINUTE)
-- the session closes after 30 minutes of inactivity
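
Since Flink 1.13 the same windows can also be expressed with windowing table-valued functions (TVFs), which the newer documentation favors over the TUMBLE/HOP/SESSION group-window syntax above. A tumbling-window sketch over the order_source table from section 3.1:

SELECT
    window_start,
    window_end,
    user_id,
    SUM(amount) AS total_amount
FROM TABLE(
    TUMBLE(TABLE order_source, DESCRIPTOR(order_time), INTERVAL '5' MINUTE)
)
GROUP BY window_start, window_end, user_id;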

6.2 Watermark Strategies

-- Fixed delay
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

-- Strictly ascending timestamps (no out-of-order data)
WATERMARK FOR event_time AS event_time

6.3 Upsert Kafka (Supports Updates)

CREATE TABLE user_latest_status (
    user_id BIGINT,
    status STRING,
    update_time TIMESTAMP(3),
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'user-status',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);
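
Unlike the plain kafka connector, an upsert-kafka sink accepts updating (changelog) results such as a non-windowed aggregation: each new result for a primary key overwrites the previous record. A sketch, assuming a hypothetical user_events source table with user_id, status and event_time columns:

-- Keep only the latest status per user
INSERT INTO user_latest_status
SELECT
    user_id,
    LAST_VALUE(status) AS status,    -- built-in aggregate returning the latest value
    MAX(event_time)    AS update_time
FROM user_events                     -- hypothetical source table
GROUP BY user_id;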

7. Performance Tuning

7.1 Parallelism

// Global parallelism
env.setParallelism(4);

// Table API / SQL configuration
tableEnv.getConfig().getConfiguration()
    .setInteger("table.exec.resource.default-parallelism", 4);

7.2 MiniBatch Aggregation

Configuration config = tableEnv.getConfig().getConfiguration();
config.setString("table.exec.mini-batch.enabled", "true");
config.setString("table.exec.mini-batch.allow-latency", "5s");
config.setString("table.exec.mini-batch.size", "5000");

7.3 State TTL

config.setString("table.exec.state.ttl", "1 h");

7.4 Kafka Consumer Properties

WITH (
    'properties.fetch.min.bytes' = '1048576',           -- 1 MB
    'properties.fetch.max.wait.ms' = '500',             -- 500 ms
    'properties.max.partition.fetch.bytes' = '2097152'  -- 2 MB
)

8. FAQ

Q1: What if end-to-end latency is too high?

Possible remedies (a SQL-client sketch follows this list):

  • Increase parallelism
  • Tune the watermark delay
  • Adjust the number of Kafka partitions
  • Enable MiniBatch aggregation
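
If you submit jobs from the SQL client, the knobs from section 7 can be applied per session; a sketch with illustrative values:

SET 'table.exec.resource.default-parallelism' = '4';
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5s';
SET 'table.exec.mini-batch.size' = '5000';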

Q2: How do I handle out-of-order data?

-- Set a watermark delay that matches the observed disorder
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND

Q3: How do I guarantee exactly-once?

// Enable checkpointing (required for exactly-once delivery)
env.enableCheckpointing(60000); // every 60 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

On the Kafka side, transactional writes are configured on the sink table itself: 'sink.delivery-guarantee' and 'sink.transactional-id-prefix' are connector options that belong in the sink table's WITH clause, not in the job configuration.

Q4: How do I monitor Flink jobs?

Open the Flink Web UI at http://localhost:8081 to:

  • Check job status
  • Monitor throughput and latency
  • Inspect checkpoint details
  • Analyze backpressure

Summary

This tutorial covered the integration of Flink SQL with Kafka:

Basics: connector configuration, data formats, startup modes
Hands-on examples: order statistics, dual-stream JOIN, Top-N analysis
Advanced features: time windows, watermarks, upsert mode
Performance tuning: parallelism, MiniBatch, state management

With these examples you can quickly get started building streaming pipelines on Flink SQL and Kafka!


References

  • Apache Flink official documentation
  • Flink Kafka Connector
  • Flink SQL function reference
