Flink SQL and Kafka Integration: A Detailed Tutorial
📚 A comprehensive guide to integrating Apache Flink SQL with Kafka, including complete hands-on examples
Table of Contents
- 1. Integration Overview
- 2. Environment Setup and Dependencies
- 3. Kafka Connector Basics
- 4. Supported Data Formats
- 5. Complete Hands-On Examples
- 6. Advanced Features
- 7. Performance Tuning
- 8. FAQ
1. Integration Overview
1.1 Architecture
Data sources → Kafka topic → Flink SQL processing → Kafka topic → Downstream applications
Key advantages:
- Real-time processing: millisecond-level latency
- SQL syntax: lowers the development barrier
- Unified stream and batch processing: one engine for both
- High reliability: exactly-once semantics
1.2 Typical Use Cases
- Real-time statistics and aggregation
- Streaming ETL
- Real-time data cleansing and filtering
- Dual-stream JOIN analysis
- Real-time monitoring and alerting
2. Environment Setup and Dependencies
2.1 Version Requirements
Component | Version |
---|---|
Flink | 1.17+ |
Kafka | 2.8+ / 3.x |
Java | 11 / 17 |
2.2 Maven Dependencies
<properties>
    <flink.version>1.17.1</flink.version>
</properties>

<dependencies>
    <!-- Flink Table API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink Table Planner -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Kafka connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- JSON format -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
2.3 Quick Start: Kafka with Docker
# docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Start it with:
docker-compose up -d
3. Kafka Connector Basics
3.1 Creating a Kafka Source Table
CREATE TABLE order_source (
    order_id STRING,
    user_id BIGINT,
    product_id STRING,
    amount DECIMAL(10, 2),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'flink-consumer-group',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);
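To verify that the table actually reads data, a quick ad-hoc query can be run from the Flink SQL client (a minimal sketch; the filter is only for illustration, and in streaming mode the query keeps running until cancelled):
-- preview the stream
SELECT order_id, user_id, amount, order_time
FROM order_source
WHERE amount > 100;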
3.2 Key Connector Options
Option | Description | Example |
---|---|---|
connector | Connector type | kafka |
topic | Topic name(s) | orders or topic1;topic2 |
properties.bootstrap.servers | Kafka broker addresses | localhost:9092 |
properties.group.id | Consumer group ID | my-group |
scan.startup.mode | Startup mode | earliest-offset / latest-offset |
format | Data format | json / csv / avro |
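Besides a fixed topic list, the source connector can also subscribe to topics by regular expression via the 'topic-pattern' option; a brief example with an illustrative pattern:
'topic-pattern' = 'order-.*'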
3.3 Startup Modes in Detail
earliest-offset
Consume from the earliest offset in the topic
'scan.startup.mode' = 'earliest-offset'
latest-offset
Consume only newly arriving messages
'scan.startup.mode' = 'latest-offset'
group-offsets
Resume from the consumer group's committed offsets
'scan.startup.mode' = 'group-offsets'
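If the group has no committed offset yet, the consumer falls back to the standard Kafka auto.offset.reset setting, which can be passed through like any other client property, for example:
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest'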
timestamp
Start from a specified timestamp
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '1640966400000'
specific-offsets
Start from specified per-partition offsets
'scan.startup.mode' = 'specific-offsets',
'scan.startup.specific-offsets' = 'partition:0,offset:100;partition:1,offset:200'
3.4 Creating a Kafka Sink Table
CREATE TABLE order_result (
    user_id BIGINT,
    total_amount DECIMAL(10, 2),
    order_count BIGINT,
    window_start TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'order-statistics',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);
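To feed this sink continuously, an INSERT INTO statement can be submitted against the order_source table from section 3.1 (a minimal sketch; the 1-minute tumbling window is an arbitrary choice for illustration):
INSERT INTO order_result
SELECT
    user_id,
    SUM(amount) AS total_amount,
    COUNT(*) AS order_count,
    TUMBLE_START(order_time, INTERVAL '1' MINUTE) AS window_start
FROM order_source
GROUP BY user_id, TUMBLE(order_time, INTERVAL '1' MINUTE);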
4. Supported Data Formats
4.1 JSON Format (Recommended)
Sample Kafka message:
{
  "order_id": "ORD001",
  "user_id": 10001,
  "product_id": "PROD-123",
  "amount": 299.99,
  "order_time": "2024-01-15 10:30:00",
  "user_info": {"name": "Zhang San", "city": "Beijing"}
}
Table definition:
CREATE TABLE orders_json (
    order_id STRING,
    user_id BIGINT,
    product_id STRING,
    amount DECIMAL(10, 2),
    order_time STRING,
    user_info ROW<name STRING, city STRING>,  -- nested object
    event_time AS TO_TIMESTAMP(order_time, 'yyyy-MM-dd HH:mm:ss'),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);
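Fields of the nested ROW column can then be read with dot notation in queries, for example:
SELECT
    order_id,
    amount,
    user_info.name AS user_name,
    user_info.city AS user_city
FROM orders_json;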
4.2 CSV Format
Sample Kafka messages:
ORD001,10001,PROD-123,299.99,2024-01-15 10:30:00
ORD002,10002,PROD-456,199.99,2024-01-15 10:31:00
Table definition:
CREATE TABLE orders_csv (
    order_id STRING,
    user_id BIGINT,
    product_id STRING,
    amount DECIMAL(10, 2),
    order_time STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders-csv',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'csv',
    'csv.field-delimiter' = ',',
    'csv.ignore-parse-errors' = 'true'
);
4.3 Avro Format
CREATE TABLE orders_avro (
    order_id STRING,
    user_id BIGINT,
    amount DECIMAL(10, 2)
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders-avro',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'avro'
);
5. Complete Hands-On Examples
Example 1: Real-Time Order Statistics
Requirements
Compute, in real time, each user's total order amount and order count over 5-minute windows.
Step 1: Create the Kafka topics
# Create the input topic
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic order-input \
  --partitions 3 \
  --replication-factor 1

# Create the output topic
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic order-statistics \
  --partitions 3 \
  --replication-factor 1
Step 2: Write a data producer
# kafka_producer.py
from kafka import KafkaProducer
import json
import random
import time
from datetime import datetime

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

user_ids = [1001, 1002, 1003, 1004, 1005]
products = ['Phone', 'Laptop', 'Headphones', 'Keyboard', 'Mouse']

print("Producing order data...")
try:
    for i in range(100):
        order = {
            'order_id': f'ORDER-{i+1:04d}',
            'user_id': random.choice(user_ids),
            'product_name': random.choice(products),
            'amount': round(random.uniform(50.0, 3000.0), 2),
            'order_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'status': 'paid'
        }
        producer.send('order-input', value=order)
        print(f"✓ Sent order {order['order_id']}: user {order['user_id']}, amount {order['amount']}")
        time.sleep(0.5)  # send one record every 0.5 seconds
except KeyboardInterrupt:
    print("\nStopped producing data")
finally:
    producer.flush()
    producer.close()
    print("Producer closed")
Run the producer:
python kafka_producer.py
Step 3: The Flink SQL job
// OrderStatisticsJob.java
package com.example.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class OrderStatisticsJob {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Create the Kafka source table
        tableEnv.executeSql(
            "CREATE TABLE order_source (" +
            "  order_id STRING," +
            "  user_id BIGINT," +
            "  product_name STRING," +
            "  amount DECIMAL(10, 2)," +
            "  order_time STRING," +
            "  status STRING," +
            "  event_time AS TO_TIMESTAMP(order_time, 'yyyy-MM-dd HH:mm:ss')," +
            "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'order-input'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'properties.group.id' = 'flink-order-consumer'," +
            "  'scan.startup.mode' = 'latest-offset'," +
            "  'format' = 'json'" +
            ")");

        // 3. Create the Kafka sink table
        tableEnv.executeSql(
            "CREATE TABLE order_statistics (" +
            "  user_id BIGINT," +
            "  total_amount DECIMAL(10, 2)," +
            "  order_count BIGINT," +
            "  avg_amount DECIMAL(10, 2)," +
            "  max_amount DECIMAL(10, 2)," +
            "  window_start TIMESTAMP(3)," +
            "  window_end TIMESTAMP(3)" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'order-statistics'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'format' = 'json'" +
            ")");

        // 4. Run the windowed aggregation
        tableEnv.executeSql(
            "INSERT INTO order_statistics " +
            "SELECT " +
            "  user_id," +
            "  SUM(amount) AS total_amount," +
            "  COUNT(*) AS order_count," +
            "  AVG(amount) AS avg_amount," +
            "  MAX(amount) AS max_amount," +
            "  TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start," +
            "  TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end " +
            "FROM order_source " +
            "WHERE status = 'paid' " +
            "GROUP BY user_id, TUMBLE(event_time, INTERVAL '5' MINUTE)");

        System.out.println("✓ Flink SQL job submitted");
    }
}
Build and run:
mvn clean package
flink run target/flink-kafka-demo-1.0.jar
Step 4: Consume the aggregated results
# kafka_consumer.py
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'order-statistics',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

print("=" * 80)
print("Consuming order statistics...")
print("=" * 80)

for message in consumer:
    data = message.value
    print(f"\n[Order statistics for user {data['user_id']}]")
    print(f"  Total amount : ¥{data['total_amount']:.2f}")
    print(f"  Order count  : {data['order_count']}")
    print(f"  Average      : ¥{data['avg_amount']:.2f}")
    print(f"  Largest order: ¥{data['max_amount']:.2f}")
    print(f"  Window       : {data['window_start']} ~ {data['window_end']}")
    print("-" * 80)
Run the consumer:
python kafka_consumer.py
Expected output:
================================================================================
Consuming order statistics...
================================================================================

[Order statistics for user 1001]
  Total amount : ¥2849.50
  Order count  : 4
  Average      : ¥712.38
  Largest order: ¥1299.99
  Window       : 2024-01-15 10:00:00 ~ 2024-01-15 10:05:00
--------------------------------------------------------------------------------

[Order statistics for user 1002]
  Total amount : ¥1699.80
  Order count  : 3
  Average      : ¥566.60
  Largest order: ¥899.00
  Window       : 2024-01-15 10:00:00 ~ 2024-01-15 10:05:00
--------------------------------------------------------------------------------
Example 2: Dual-Stream JOIN
Requirements
Join the order stream with the payment stream to find orders that have already been paid.
SQL implementation
-- Order stream
CREATE TABLE orders (
    order_id STRING,
    user_id BIGINT,
    product_id STRING,
    amount DECIMAL(10, 2),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- Payment stream
CREATE TABLE payments (
    payment_id STRING,
    order_id STRING,
    pay_amount DECIMAL(10, 2),
    payment_time TIMESTAMP(3),
    payment_method STRING,
    WATERMARK FOR payment_time AS payment_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'payments',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- Join result table
CREATE TABLE paid_orders (
    order_id STRING,
    user_id BIGINT,
    product_id STRING,
    order_amount DECIMAL(10, 2),
    pay_amount DECIMAL(10, 2),
    payment_method STRING,
    order_time TIMESTAMP(3),
    payment_time TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'paid-orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- Interval join (payment within 15 minutes of the order)
INSERT INTO paid_orders
SELECT
    o.order_id,
    o.user_id,
    o.product_id,
    o.amount AS order_amount,
    p.pay_amount,
    p.payment_method,
    o.order_time,
    p.payment_time
FROM orders o
INNER JOIN payments p
    ON o.order_id = p.order_id
    AND p.payment_time BETWEEN o.order_time AND o.order_time + INTERVAL '15' MINUTE;
Example 3: Real-Time Top N
Requirements
Compute the top 5 products by sales amount in each hour.
-- Order table
CREATE TABLE product_orders (
    order_id STRING,
    product_id STRING,
    product_name STRING,
    amount DECIMAL(10, 2),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- Top 5 result table
CREATE TABLE top5_products (
    window_start TIMESTAMP(3),
    product_id STRING,
    product_name STRING,
    total_sales DECIMAL(10, 2),
    order_count BIGINT,
    ranking BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'top5-products',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- Top N query
INSERT INTO top5_products
SELECT
    window_start,
    product_id,
    product_name,
    total_sales,
    order_count,
    ranking
FROM (
    SELECT
        window_start,
        product_id,
        product_name,
        total_sales,
        order_count,
        ROW_NUMBER() OVER (PARTITION BY window_start ORDER BY total_sales DESC) AS ranking
    FROM (
        SELECT
            TUMBLE_START(order_time, INTERVAL '1' HOUR) AS window_start,
            product_id,
            product_name,
            SUM(amount) AS total_sales,
            COUNT(*) AS order_count
        FROM product_orders
        GROUP BY
            product_id,
            product_name,
            TUMBLE(order_time, INTERVAL '1' HOUR)
    )
)
WHERE ranking <= 5;
6. Advanced Features
6.1 Time Windows
Tumbling window (Tumble)
Fixed-size, non-overlapping windows
TUMBLE(event_time, INTERVAL '5' MINUTE)
Sliding window (Hop)
Overlapping windows
HOP(event_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)
-- 10-minute window, sliding every 5 minutes
Session window (Session)
Dynamically sized windows based on gaps of inactivity
SESSION(event_time, INTERVAL '30' MINUTE)
-- the session closes after 30 minutes of inactivity
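For reference, a complete sliding-window aggregation might look like the following sketch, reusing the product_orders table from Example 3 (the window sizes are illustrative):
SELECT
    HOP_START(order_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) AS window_start,
    HOP_END(order_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) AS window_end,
    product_id,
    SUM(amount) AS total_sales
FROM product_orders
GROUP BY
    product_id,
    HOP(order_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE);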
6.2 Watermark Strategies
-- Fixed delay
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

-- Strictly ascending timestamps (no out-of-order data)
WATERMARK FOR event_time AS event_time
6.3 Upsert Kafka (Update Support)
CREATE TABLE user_latest_status (
    user_id BIGINT,
    status STRING,
    update_time TIMESTAMP(3),
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'user-status',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);
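Writing into an upsert-kafka table requires a query that produces a changelog, typically an unbounded (non-windowed) aggregation keyed by the table's primary key. As a sketch, assuming a hypothetical append-only source table user_events(user_id, status, event_time):
-- user_events is a hypothetical source table used only for illustration
INSERT INTO user_latest_status
SELECT
    user_id,
    LAST_VALUE(status) AS status,
    MAX(event_time) AS update_time
FROM user_events
GROUP BY user_id;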
7. Performance Tuning
7.1 Parallelism
// Global parallelism
env.setParallelism(4);

// SQL-level configuration
tableEnv.getConfig().getConfiguration()
    .setInteger("table.exec.resource.default-parallelism", 4);
7.2 MiniBatch Aggregation
Configuration config = tableEnv.getConfig().getConfiguration();
config.setString("table.exec.mini-batch.enabled", "true");
config.setString("table.exec.mini-batch.allow-latency", "5s");
config.setString("table.exec.mini-batch.size", "5000");
7.3 State TTL
config.setString("table.exec.state.ttl", "1 h");
7.4 Kafka Client Properties Tuning
WITH (
    'properties.fetch.min.bytes' = '1048576',           -- 1 MB
    'properties.fetch.max.wait.ms' = '500',             -- 500 ms
    'properties.max.partition.fetch.bytes' = '2097152'  -- 2 MB
)
8. FAQ
Q1: What can I do about high latency?
Possible remedies:
- Increase parallelism
- Tune the watermark delay
- Adjust the number of Kafka partitions
- Enable MiniBatch aggregation
Q2: How do I handle out-of-order data?
-- Use a watermark delay that matches the observed out-of-orderness
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
Q3: How do I get exactly-once guarantees?
// Enable checkpointing
env.enableCheckpointing(60000); // checkpoint every 60 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

-- Enable transactional writes by setting these connector options in the Kafka sink table's WITH clause
'sink.delivery-guarantee' = 'exactly-once',
'sink.transactional-id-prefix' = 'flink-kafka-'
Q4: How do I monitor a Flink job?
Open the Flink Web UI at http://localhost:8081 to:
- Check job status
- Monitor throughput and latency
- Inspect checkpoint details
- Analyze backpressure
Summary
This tutorial walked through integrating Flink SQL with Kafka end to end:
✅ Basics: connector configuration, data formats, startup modes
✅ Hands-on examples: order statistics, dual-stream JOIN, Top N analysis
✅ Advanced features: time windows, watermarks, upsert mode
✅ Performance tuning: parallelism, MiniBatch, state management
With these examples, you can quickly get started building streaming pipelines with Flink SQL and Kafka.
References
- Apache Flink official documentation
- Flink Kafka connector documentation
- Flink SQL functions reference