
Flink 1.20 - Materialized Tables

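Feature Overview

Flink 1.20 introduces Materialized Tables. They aim to simplify both batch and streaming data pipelines and give them a consistent development experience. A materialized table is defined by a query plus a data freshness specification. The engine derives the table schema from the query, then creates and maintains the refresh pipeline that keeps the results within the specified freshness.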

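Details

What Is a Materialized Table

A materialized table is:

  • a table defined by a query
  • a table whose query results Flink maintains automatically
  • refreshed either continuously or periodically
  • declared with a data freshness target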

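Key Features

  1. Automatic maintenance: Flink executes the query and keeps the results up to date
  2. Data freshness: you declare how fresh the data must be, and Flink derives the refresh strategy from it
  3. Unified streaming and batch: the same table can be refreshed by a continuous streaming job or by periodically scheduled batch jobs
  4. Simpler development: there is no need to build and manage the data pipeline by hand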
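The "unified streaming and batch" point can be made concrete with a toy model. The plain-Java sketch below uses no Flink classes, and its class and method names are hypothetical. It computes the per-user COUNT(*) and SUM(amount) from Example 1 in two ways. The first recomputes the result from all rows, as a periodic full refresh would. The second folds rows in one at a time, as a continuous refresh does incrementally. Both paths end with the same result, which is why one table definition can be served by either refresh mode.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model, no Flink classes involved; all names are hypothetical.
final class RefreshEquivalenceSketch {

    // One source row: (user_id, amount).
    static final class Order {
        final int userId;
        final BigDecimal amount;

        Order(int userId, String amount) {
            this.userId = userId;
            this.amount = new BigDecimal(amount);
        }
    }

    // Per-user aggregate: COUNT(*) and SUM(amount).
    static final class Stats {
        long count;
        BigDecimal total = BigDecimal.ZERO;

        void add(BigDecimal amount) {
            count++;
            total = total.add(amount);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Stats)) return false;
            Stats s = (Stats) o;
            return count == s.count && total.compareTo(s.total) == 0;
        }

        @Override
        public int hashCode() {
            return 31 * Long.hashCode(count) + total.stripTrailingZeros().hashCode();
        }

        @Override
        public String toString() {
            return "count=" + count + ", total=" + total;
        }
    }

    // FULL-style refresh: rebuild the whole result from every row seen so far.
    static Map<Integer, Stats> fullRefresh(List<Order> allRows) {
        Map<Integer, Stats> result = new HashMap<>();
        allRows.forEach(o -> result.computeIfAbsent(o.userId, k -> new Stats()).add(o.amount));
        return result;
    }

    // CONTINUOUS-style refresh: fold one new row into the existing result.
    static void applyIncrement(Map<Integer, Stats> state, Order o) {
        state.computeIfAbsent(o.userId, k -> new Stats()).add(o.amount);
    }

    public static void main(String[] args) {
        List<Order> rows = Arrays.asList(
                new Order(1, "10.00"), new Order(2, "5.50"), new Order(1, "2.25"));

        Map<Integer, Stats> incremental = new HashMap<>();
        for (Order o : rows) {
            applyIncrement(incremental, o); // state after each arriving row
        }

        System.out.println(fullRefresh(rows).equals(incremental)); // true
        System.out.println(incremental.get(1));                    // count=2, total=12.25
    }
}
```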

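Syntax

CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
[ ([ <table_constraint> ]) ]
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
[WITH (key1=val1, key2=val2, ...)]
FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
[REFRESH_MODE = { CONTINUOUS | FULL }]
AS <select_statement>

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

In Flink 1.20, FRESHNESS is mandatory and is the only way to declare how stale the data may become. There is no REFRESH INTERVAL clause and no 'refresh' table option. REFRESH_MODE is optional, and the table schema is derived from the query.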
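When REFRESH_MODE is omitted, the Flink 1.20 documentation describes deriving the mode from FRESHNESS. A freshness below the `materialized-table.refresh-mode.freshness-threshold` option (30 minutes by default) selects CONTINUOUS; otherwise FULL is selected. The plain-Java sketch below restates that rule so each example's effective mode is easy to check. It is not Flink's implementation, and the class and method names are made up.

```java
import java.time.Duration;

// Restates the documented default rule in plain Java; not Flink code, names are hypothetical.
final class RefreshModeSketch {

    enum Mode { CONTINUOUS, FULL }

    // Default value of materialized-table.refresh-mode.freshness-threshold.
    static final Duration DEFAULT_THRESHOLD = Duration.ofMinutes(30);

    // explicitMode == null models a CREATE statement without a REFRESH_MODE clause.
    static Mode resolve(Duration freshness, Mode explicitMode, Duration threshold) {
        if (explicitMode != null) {
            return explicitMode; // an explicit REFRESH_MODE is used as-is
        }
        return freshness.compareTo(threshold) < 0 ? Mode.CONTINUOUS : Mode.FULL;
    }

    public static void main(String[] args) {
        System.out.println(resolve(Duration.ofMinutes(3), null, DEFAULT_THRESHOLD)); // CONTINUOUS
        System.out.println(resolve(Duration.ofHours(1), null, DEFAULT_THRESHOLD));   // FULL
        System.out.println(resolve(Duration.ofHours(1), Mode.CONTINUOUS, DEFAULT_THRESHOLD)); // CONTINUOUS
    }
}
```

One consequence is worth checking: a freshness of one hour or one day with no REFRESH_MODE selects FULL. FULL runs batch jobs, so an unbounded source such as a Kafka table needs an explicit REFRESH_MODE = CONTINUOUS.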

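Example Code

Example 1: Creating a basic materialized table

-- Source table. It must be a regular (non-temporary) table: in Flink 1.20 the
-- query of a materialized table may not reference temporary tables, views or functions.
CREATE TABLE orders_source (
  order_id INT,
  user_id INT,
  product_id INT,
  amount DECIMAL(10, 2),
  order_time TIMESTAMP(3),
  -- event-time attribute, needed by the window aggregation in Example 2
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

-- Materialized table, at most about 3 minutes stale. Storage comes from the current
-- catalog (no connector); the grouping key is the primary key for upserts.
CREATE MATERIALIZED TABLE user_order_stats (
  PRIMARY KEY (user_id) NOT ENFORCED
)
FRESHNESS = INTERVAL '3' MINUTE
AS
SELECT
  user_id,
  COUNT(*) AS order_count,
  SUM(amount) AS total_amount,
  AVG(amount) AS avg_amount,
  MAX(order_time) AS last_order_time
FROM orders_source
GROUP BY user_id;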

Example 2: Materialized table with a window aggregation

-- Hourly tumbling windows; orders_source must declare a WATERMARK on order_time.
-- CONTINUOUS is pinned: a 1-hour freshness would default to FULL (batch jobs),
-- which cannot read the unbounded Kafka source.
CREATE MATERIALIZED TABLE hourly_order_stats
FRESHNESS = INTERVAL '1' HOUR
REFRESH_MODE = CONTINUOUS
AS
SELECT
  window_start,
  window_end,
  product_id,
  COUNT(*) AS order_count,
  SUM(amount) AS total_amount
FROM TABLE(
  TUMBLE(TABLE orders_source, DESCRIPTOR(order_time), INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end, product_id;
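TUMBLE assigns each row to exactly one fixed-size window, and the output's window_start and window_end are that window's bounds. The plain-Java sketch below reproduces the usual bucketing arithmetic for epoch-millisecond timestamps with no window offset: window_start = ts - (ts mod size), using a floor modulo so negative timestamps also land correctly. The names are hypothetical, and time zones are deliberately ignored.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative only: how a row's timestamp maps to a tumbling window (no offset).
final class TumbleSketch {

    // Start of the tumbling window that contains tsMillis.
    static long windowStart(long tsMillis, long sizeMillis) {
        return tsMillis - Math.floorMod(tsMillis, sizeMillis);
    }

    // Windows are half-open intervals [start, start + size).
    static long windowEnd(long tsMillis, long sizeMillis) {
        return windowStart(tsMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long hour = Duration.ofHours(1).toMillis();
        long ts = Instant.parse("2024-08-01T10:42:17Z").toEpochMilli();
        System.out.println(Instant.ofEpochMilli(windowStart(ts, hour))); // 2024-08-01T10:00:00Z
        System.out.println(Instant.ofEpochMilli(windowEnd(ts, hour)));   // 2024-08-01T11:00:00Z
    }
}
```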

Example 3: Materialized table with a JOIN

-- Users dimension table (JDBC)
CREATE TABLE users (
  user_id INT,
  user_name STRING,
  city STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/test',
  'table-name' = 'users',
  'username' = 'root',
  'password' = 'password'
);

-- Materialized table joining each user with their orders
CREATE MATERIALIZED TABLE user_order_details (
  PRIMARY KEY (user_id) NOT ENFORCED
)
FRESHNESS = INTERVAL '5' MINUTE
AS
SELECT
  u.user_id,
  u.user_name,
  u.city,
  COUNT(o.order_id) AS order_count,
  SUM(o.amount) AS total_amount
FROM users u
LEFT JOIN orders_source o ON u.user_id = o.user_id
GROUP BY u.user_id, u.user_name, u.city;
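Example 3 counts `o.order_id` rather than `*` for a reason. After a LEFT JOIN, a user with no orders still produces one row whose order columns are NULL. `COUNT(column)` skips NULLs, so that user gets 0 instead of 1, and `SUM(o.amount)` is NULL rather than 0. The plain-Java sketch below mimics the two counting rules on in-memory data. It is not Flink code, and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Mimics SQL LEFT JOIN + COUNT(col) vs COUNT(*) semantics; not Flink code.
final class LeftJoinCountSketch {

    // Joined row: userId always present, orderId null when the user has no orders.
    static final class JoinedRow {
        final int userId;
        final Integer orderId;

        JoinedRow(int userId, Integer orderId) {
            this.userId = userId;
            this.orderId = orderId;
        }
    }

    // LEFT JOIN users to orders; each order is an {orderId, userId} pair.
    static List<JoinedRow> leftJoin(List<Integer> userIds, List<int[]> orders) {
        List<JoinedRow> out = new ArrayList<>();
        for (int user : userIds) {
            boolean matched = false;
            for (int[] order : orders) {
                if (order[1] == user) {
                    out.add(new JoinedRow(user, order[0]));
                    matched = true;
                }
            }
            if (!matched) {
                out.add(new JoinedRow(user, null)); // NULL-padded row for an unmatched user
            }
        }
        return out;
    }

    // COUNT(*): counts rows, including NULL-padded ones.
    static long countStar(List<JoinedRow> rows, int user) {
        return rows.stream().filter(r -> r.userId == user).count();
    }

    // COUNT(o.order_id): counts only non-NULL values.
    static long countOrderId(List<JoinedRow> rows, int user) {
        return rows.stream().filter(r -> r.userId == user && r.orderId != null).count();
    }

    public static void main(String[] args) {
        List<JoinedRow> rows = leftJoin(
                Arrays.asList(1, 2),
                Arrays.asList(new int[] {100, 1}, new int[] {101, 1}));
        System.out.println(countStar(rows, 2) + " vs " + countOrderId(rows, 2)); // 1 vs 0
        System.out.println(countOrderId(rows, 1)); // 2
    }
}
```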

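Example 4: Daily summary stored as a date-partitioned table

-- Results are stored by the catalog (e.g. Paimon on a file system), so no connector/path.
-- FULL mode refreshes via scheduled batch jobs, so orders_source here must be a bounded
-- table; with the unbounded Kafka table from Example 1, use REFRESH_MODE = CONTINUOUS.
-- The date-formatter option hints which date partition a FULL refresh should overwrite.
CREATE MATERIALIZED TABLE daily_order_summary
PARTITIONED BY (order_date)
WITH (
  'partition.fields.order_date.date-formatter' = 'yyyy-MM-dd'
)
FRESHNESS = INTERVAL '1' DAY
REFRESH_MODE = FULL
AS
SELECT
  DATE_FORMAT(order_time, 'yyyy-MM-dd') AS order_date,
  product_id,
  COUNT(*) AS order_count,
  SUM(amount) AS total_amount,
  AVG(amount) AS avg_amount
FROM orders_source
GROUP BY DATE_FORMAT(order_time, 'yyyy-MM-dd'), product_id;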
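Example 4 groups the daily summary by DATE_FORMAT(order_time, 'yyyy-MM-dd'). Flink documents DATE_FORMAT patterns as compatible with Java's SimpleDateFormat. In those patterns, lowercase `yyyy` is the calendar year, but uppercase `YYYY` is the week-based year, a classic source of wrong dates around New Year. The sketch below shows the difference using java.time's DateTimeFormatter, where these letters have the same meaning. Locale.US is fixed because week-based years depend on the locale; the class name is hypothetical.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Shows why a daily grouping/partition pattern should use 'yyyy', not 'YYYY'.
final class DatePatternSketch {

    static String format(LocalDateTime ts, String pattern) {
        return DateTimeFormatter.ofPattern(pattern, Locale.US).format(ts);
    }

    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2024, 12, 30, 9, 15);
        System.out.println(format(ts, "yyyy-MM-dd")); // 2024-12-30 (calendar year)
        System.out.println(format(ts, "YYYY-MM-dd")); // 2025-12-30 (week-based year)
    }
}
```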

Example 5: Materialized table with a filter

-- Keep only high-value orders; the result is append-only
CREATE MATERIALIZED TABLE high_value_orders
FRESHNESS = INTERVAL '1' MINUTE
AS
SELECT order_id, user_id, amount, order_time
FROM orders_source
WHERE amount > 1000;

Example 6: Materialized table with complex aggregations

-- Per-product statistics; STDDEV_SAMP is the sample standard deviation
CREATE MATERIALIZED TABLE product_statistics (
  PRIMARY KEY (product_id) NOT ENFORCED
)
FRESHNESS = INTERVAL '10' MINUTE
AS
SELECT
  product_id,
  COUNT(*) AS order_count,
  COUNT(DISTINCT user_id) AS unique_customers,
  SUM(amount) AS total_revenue,
  AVG(amount) AS avg_order_value,
  MIN(amount) AS min_order_value,
  MAX(amount) AS max_order_value,
  STDDEV_SAMP(amount) AS stddev_order_value
FROM orders_source
GROUP BY product_id;
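Example 6 reports a standard deviation, and Flink SQL provides two variants. STDDEV_POP divides the sum of squared deviations by n, while STDDEV_SAMP divides by n - 1. The two differ noticeably for products with few orders, and in standard SQL the sample variant is NULL for a single row. The plain-Java sketch below computes both so a query's output can be checked against hand-computed values. It is an illustration, not Flink's implementation, and the names are hypothetical.

```java
import java.util.OptionalDouble;

// Population vs sample standard deviation; illustration only.
final class StddevSketch {

    static double mean(double[] xs) {
        double sum = 0;
        for (double x : xs) sum += x;
        return sum / xs.length;
    }

    // Sum of squared deviations from the mean.
    static double squaredDeviations(double[] xs) {
        double m = mean(xs);
        double ss = 0;
        for (double x : xs) ss += (x - m) * (x - m);
        return ss;
    }

    // STDDEV_POP: divide by n; empty (SQL NULL) for no rows.
    static OptionalDouble stddevPop(double[] xs) {
        if (xs.length == 0) return OptionalDouble.empty();
        return OptionalDouble.of(Math.sqrt(squaredDeviations(xs) / xs.length));
    }

    // STDDEV_SAMP: divide by n - 1; empty (SQL NULL) for fewer than two rows.
    static OptionalDouble stddevSamp(double[] xs) {
        if (xs.length < 2) return OptionalDouble.empty();
        return OptionalDouble.of(Math.sqrt(squaredDeviations(xs) / (xs.length - 1)));
    }

    public static void main(String[] args) {
        double[] amounts = {2, 4, 4, 4, 5, 5, 7, 9};
        System.out.println(stddevPop(amounts).getAsDouble());           // 2.0
        System.out.println(stddevSamp(amounts).getAsDouble());          // about 2.138
        System.out.println(stddevSamp(new double[] {42}).isPresent());  // false
    }
}
```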

Example 7: Materialized table with time functions

-- Orders by hour of day and day of week (DOW: 1 = Sunday ... 7 = Saturday).
-- CONTINUOUS is pinned: a 1-hour freshness would default to FULL (batch jobs),
-- which cannot read the unbounded Kafka source.
CREATE MATERIALIZED TABLE time_based_stats (
  PRIMARY KEY (hour_of_day, day_of_week, product_id) NOT ENFORCED
)
FRESHNESS = INTERVAL '1' HOUR
REFRESH_MODE = CONTINUOUS
AS
SELECT
  EXTRACT(HOUR FROM order_time) AS hour_of_day,
  EXTRACT(DOW FROM order_time) AS day_of_week,
  product_id,
  COUNT(*) AS order_count,
  SUM(amount) AS total_amount
FROM orders_source
GROUP BY EXTRACT(HOUR FROM order_time), EXTRACT(DOW FROM order_time), product_id;
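Flink documents DAYOFWEEK(date) as equivalent to EXTRACT(DOW FROM date). It numbers days from 1 (Sunday) to 7 (Saturday), so DAYOFWEEK(DATE '1994-09-27') returns 3 for that Tuesday. Java's DayOfWeek uses Monday = 1 through Sunday = 7, so Java code that post-processes a day-of-week column needs a conversion. The hypothetical helpers below provide it in both directions.

```java
import java.time.DayOfWeek;
import java.time.LocalDate;

// Converts between java.time numbering (Mon=1..Sun=7) and SQL DOW numbering (Sun=1..Sat=7).
final class DowSketch {

    static int sqlDayOfWeek(LocalDate date) {
        int iso = date.getDayOfWeek().getValue(); // Monday=1 ... Sunday=7
        return iso % 7 + 1;                       // Sunday -> 1, Monday -> 2, ..., Saturday -> 7
    }

    // Inverse mapping, e.g. for labelling values of a day_of_week column.
    static DayOfWeek fromSqlDayOfWeek(int dow) {
        return DayOfWeek.of((dow + 5) % 7 + 1);   // 1 -> SUNDAY, 2 -> MONDAY, ..., 7 -> SATURDAY
    }

    public static void main(String[] args) {
        System.out.println(sqlDayOfWeek(LocalDate.of(1994, 9, 27))); // 3
        System.out.println(fromSqlDayOfWeek(1));                     // SUNDAY
    }
}
```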

Example 8: Materialized table with a subquery

-- Top 100 customers whose total spend exceeds 10000.
-- CONTINUOUS is pinned: a 1-hour freshness would default to FULL (batch jobs),
-- which cannot read the unbounded Kafka source.
CREATE MATERIALIZED TABLE top_customers (
  PRIMARY KEY (user_id) NOT ENFORCED
)
FRESHNESS = INTERVAL '1' HOUR
REFRESH_MODE = CONTINUOUS
AS
SELECT user_id, total_amount, order_count
FROM (
  SELECT
    user_id,
    SUM(amount) AS total_amount,
    COUNT(*) AS order_count
  FROM orders_source
  GROUP BY user_id
) t
WHERE total_amount > 10000
ORDER BY total_amount DESC
LIMIT 100;

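Example 9: Managing a materialized table

In Flink 1.20, ALTER MATERIALIZED TABLE cannot change the query or the freshness of an existing materialized table. To change either one, drop and recreate the table. The supported ALTER operations control the refresh pipeline instead:

-- Suspend refreshing (stops the continuous job, or pauses the scheduled workflow in FULL mode)
ALTER MATERIALIZED TABLE user_order_stats SUSPEND;

-- Resume refreshing; dynamic options can optionally be supplied with WITH (...)
ALTER MATERIALIZED TABLE user_order_stats RESUME;

-- Manually refresh one partition of a partitioned materialized table
ALTER MATERIALIZED TABLE daily_order_summary REFRESH PARTITION (order_date = '2024-08-01');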
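SUSPEND and RESUME toggle whether a materialized table's refresh pipeline runs. The hypothetical Java model below tracks this as a two-state status. Its strict rejection of redundant transitions (suspending a suspended table, resuming an active one) is an assumption made for illustration, not documented Flink behavior.

```java
// Hypothetical model of the SUSPEND / RESUME lifecycle; not Flink code.
final class LifecycleSketch {

    enum Status { ACTIVATED, SUSPENDED }

    private Status status = Status.ACTIVATED; // assumption: a new table starts refreshing

    // Models ALTER MATERIALIZED TABLE ... SUSPEND
    void suspend() {
        if (status != Status.ACTIVATED) {
            throw new IllegalStateException("only an activated table can be suspended");
        }
        status = Status.SUSPENDED;
    }

    // Models ALTER MATERIALIZED TABLE ... RESUME
    void resume() {
        if (status != Status.SUSPENDED) {
            throw new IllegalStateException("only a suspended table can be resumed");
        }
        status = Status.ACTIVATED;
    }

    Status status() {
        return status;
    }

    public static void main(String[] args) {
        LifecycleSketch table = new LifecycleSketch();
        table.suspend();
        System.out.println(table.status()); // SUSPENDED
        table.resume();
        System.out.println(table.status()); // ACTIVATED
    }
}
```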

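Example 10: Dropping a materialized table

-- Drop the materialized table; its background refresh pipeline is removed as well
DROP MATERIALIZED TABLE user_order_stats;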

Java API Example

Example 1: Submitting the DDL with TableEnvironment.executeSql

In Flink 1.20, materialized table statements are handled by the SQL Gateway; the official quickstart submits them from the SQL Client in gateway mode. A plain TableEnvironment may therefore reject the CREATE MATERIALIZED TABLE statement below, so treat this program as a sketch of the statement text.

import org.apache.flink.table.api.*;

public class MaterializedTableExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Create the source table
        tEnv.executeSql(
                "CREATE TABLE orders_source (" +
                "  order_id INT," +
                "  user_id INT," +
                "  amount DECIMAL(10, 2)," +
                "  order_time TIMESTAMP(3)" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'orders'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'format' = 'json'" +
                ")");

        // Create the materialized table: freshness is declared with FRESHNESS,
        // and storage comes from the current catalog rather than a connector
        tEnv.executeSql(
                "CREATE MATERIALIZED TABLE user_order_stats (" +
                "  PRIMARY KEY (user_id) NOT ENFORCED" +
                ") " +
                "FRESHNESS = INTERVAL '3' MINUTE " +
                "AS SELECT " +
                "  user_id, " +
                "  COUNT(*) AS order_count, " +
                "  SUM(amount) AS total_amount " +
                "FROM orders_source " +
                "GROUP BY user_id");
    }
}

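Test Cases

Test class 1: Basic materialized table test

This test only checks that the statements are accepted without an exception; it makes no assertions on data. It uses the test-only 'values' connector from the flink-table-planner test-jar, and 'data-id' must refer to rows registered beforehand through TestValuesTableFactory. Because Flink 1.20 handles materialized table statements in the SQL Gateway, a plain TableEnvironment may reject the CREATE MATERIALIZED TABLE statement.

import org.apache.flink.table.api.*;
import org.junit.Test;

public class MaterializedTableTest {

    @Test
    public void testCreateMaterializedTable() {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Source table on the test 'values' connector; `value` is a reserved keyword, so it is quoted
        tEnv.executeSql(
                "CREATE TABLE test_source (" +
                "  id INT," +
                "  `value` DECIMAL(10, 2)" +
                ") WITH (" +
                "  'connector' = 'values'," +
                "  'data-id' = '1'" +
                ")");

        // Materialized table: storage comes from the catalog, so no 'print' connector is given
        tEnv.executeSql(
                "CREATE MATERIALIZED TABLE test_materialized " +
                "FRESHNESS = INTERVAL '1' MINUTE " +
                "AS SELECT id, SUM(`value`) AS total " +
                "FROM test_source " +
                "GROUP BY id");
    }
}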

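Notes

  1. Freshness

    • Choose the freshness according to business requirements
    • A very short freshness means more frequent checkpoints (CONTINUOUS) or refresh jobs (FULL), which costs resources
    • A very long freshness may leave the data too stale for its consumers
  2. Data consistency

    • A materialized table holds the query result as of its most recent refresh
    • During a refresh the table may briefly be inconsistent with its sources
    • It suits scenarios that tolerate eventual consistency
  3. Storage cost

    • A materialized table stores its query results
    • Plan for the storage cost and a data retention policy
  4. Query complexity

    • Complex queries can slow down every refresh
    • Optimize the query before materializing it
  5. Catalog support

    • A materialized table is stored and managed by its catalog, not by a connector named in WITH, so the catalog must support materialized tables (the Flink 1.20 documentation uses Apache Paimon)
    • Statements are executed through the SQL Gateway; FULL mode also relies on a workflow scheduler to trigger the periodic batch jobs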

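Best Practices

  1. Choose a suitable freshness: balance data freshness against resource cost according to business needs
  2. Optimize query performance: make sure the materialized table's query runs efficiently
  3. Monitor materialized tables: watch the refresh status and data quality
  4. Use them where they fit: aggregation and summary tables are a natural match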

Related FLIP

  • FLIP-435: Introduce a New Materialized Table for Simplifying Data Pipelines

References

  • Apache Flink 1.20 Release Notes
  • Flink SQL Materialized Tables
