
Flink Table API: Counting Data Volume by Minute

1. Environment Versions

Component   Version
Flink       1.17.0
Kafka       2.12
MySQL       5.7.33

2. MySQL Table Creation Script

create table user_log
(
    id      int auto_increment comment 'primary key' primary key,
    uid     int    not null comment 'user id',
    event   int    not null comment 'user action',
    logtime bigint null comment 'log time'
) comment 'user log table, used as the verification data source';

3. User Log Class

Create a new Maven project.

This class defines the schema shared by Kafka and MySQL.

/**
 * User log class
 */
@Data
public class UserLog {
    // user uid
    private int uid;
    // user action
    private int event;
    // log time
    private Date logtime;
}

4. User Data Generator

/**
 * User data generator
 */
public class UserLogGenerator {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Custom data generator source
        DataGeneratorSource<UserLog> dataGeneratorSource = new DataGeneratorSource<>(
                // Specify the GeneratorFunction implementation
                new GeneratorFunction<Long, UserLog>() {
                    // Random data generator
                    public RandomDataGenerator generator;

                    @Override
                    public void open(SourceReaderContext readerContext) throws Exception {
                        generator = new RandomDataGenerator();
                    }

                    @Override
                    public UserLog map(Long aLong) throws Exception {
                        UserLog userLog = new UserLog();
                        // Random user uid
                        userLog.setUid(generator.nextInt(1, 100000));
                        // Random user action
                        userLog.setEvent(generator.nextInt(1, 2));
                        // Random log time
                        userLog.setLogtime(DateUtil.offset(new DateTime(), DateField.MILLISECOND, generator.nextInt(-2000, 2000)));
                        return userLog;
                    }
                },
                // Total number of records to emit
                60 * 60 * 10,
                // Records emitted per second
                RateLimiterStrategy.perSecond(10),
                // Return type: wrap the Java UserLog class into a TypeInformation
                TypeInformation.of(UserLog.class));

        DataStreamSource<UserLog> dataGeneratorSourceStream =
                env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGeneratorSource");
        // Print the generated data
//        dataGeneratorSourceStream.print();

        // Write to Kafka
        KafkaSink<UserLog> kafkaSink = KafkaSink.<UserLog>builder()
                .setBootstrapServers("hadoop01:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.<UserLog>builder()
                        .setTopic("userLog")
                        .setValueSerializationSchema(
                                (SerializationSchema<UserLog>) userLog -> JSONUtil.toJsonStr(userLog).getBytes())
                        .build())
                .build();
        dataGeneratorSourceStream.sinkTo(kafkaSink);

        // Write to MySQL for data verification
        SinkFunction<UserLog> jdbcSink = JdbcSink.sink(
                "insert into user_log (uid, event, logtime) values (?, ?, ?)",
                new JdbcStatementBuilder<UserLog>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, UserLog userLog) throws SQLException {
                        preparedStatement.setInt(1, userLog.getUid());
                        preparedStatement.setInt(2, userLog.getEvent());
                        preparedStatement.setLong(3, userLog.getLogtime().getTime());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/demo")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("your username")
                        .withPassword("your password")
                        .build());
        dataGeneratorSourceStream.addSink(jdbcSink);

        env.execute();
    }
}
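
A note on serialization: the Kafka value serializer above uses Hutool's JSONUtil. With the default JSONConfig (no date format configured), the Date field is written as epoch milliseconds; this is an assumption about Hutool's default behavior worth verifying in your environment, and it is what lets the Kafka table in the next section read logtime as BIGINT. A minimal check:

// Minimal sketch (assumes Hutool's default JSONConfig writes Date as epoch millis),
// expected output shaped like {"uid":123,"event":1,"logtime":1754896370123}
UserLog sample = new UserLog();
sample.setUid(123);
sample.setEvent(1);
sample.setLogtime(new Date());
System.out.println(JSONUtil.toJsonStr(sample));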

5. Table API: User Visits in 10-Second Windows

/**
 * User visit count within 10 seconds
 */
public class UserLogCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        // 1. Define the table schema
        final Schema schema = Schema.newBuilder()
                .column("uid", DataTypes.INT())
                .column("event", DataTypes.INT())
                .column("logtime", DataTypes.BIGINT())
                // Convert logtime into Flink's timestamp format
                .columnByExpression("rowtime", "TO_TIMESTAMP_LTZ(logtime, 3)")
                // Define the watermark
                .watermark("rowtime", "rowtime - INTERVAL '5' SECOND")
                .build();

        // 2. Create the Kafka source table
        tableEnv.createTable("user_log", TableDescriptor.forConnector("kafka")
                .schema(schema)
                .format("json")
//                .option("json.timestamp-format.standard", "ISO-8601")
                .option("json.ignore-parse-errors", "true")
                .option("topic", "userLog")
                .option("properties.bootstrap.servers", "hadoop01:9092")
                .option("scan.startup.mode", "latest-offset")
                .build());

        // 3. Create a tumbling window table
        Table pvTable = tableEnv.from("user_log")
                // Define a 10-second tumbling window
                .window(Tumble.over(lit(10).seconds()).on($("rowtime")).as("w"))
                .groupBy($("w"))
                .select($("w").start().as("w_start"),
                        $("w").end().as("w_end"),
                        //$("uid").count().distinct().as("uv"),
                        $("uid").count().as("pv"));

        pvTable.execute().print();
    }
}
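
For comparison, the same 10-second tumbling-window count can be expressed with Flink SQL's TUMBLE windowing table-valued function. This is only a sketch, assuming the user_log table and rowtime attribute registered above; it could replace step 3 in the main method:

// Sketch: equivalent aggregation via Flink SQL (assumes the user_log table defined above)
Table pvSqlTable = tableEnv.sqlQuery(
        "SELECT window_start, window_end, COUNT(uid) AS pv " +
        "FROM TABLE(TUMBLE(TABLE user_log, DESCRIPTOR(rowtime), INTERVAL '10' SECOND)) " +
        "GROUP BY window_start, window_end");
pvSqlTable.execute().print();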

6. Data Verification

  1. Start UserLogGenerator
  2. Start UserLogCount
+----+-------------------------+-------------------------+----------------------+
| op |                 w_start |                   w_end |                   pv |
+----+-------------------------+-------------------------+----------------------+
| +I | 2025-08-11 15:11:50.000 | 2025-08-11 15:12:00.000 |                   10 |
| +I | 2025-08-11 15:12:00.000 | 2025-08-11 15:12:10.000 |                   95 |
| +I | 2025-08-11 15:12:10.000 | 2025-08-11 15:12:20.000 |                  104 |
| +I | 2025-08-11 15:12:20.000 | 2025-08-11 15:12:30.000 |                  104 |
| +I | 2025-08-11 15:12:30.000 | 2025-08-11 15:12:40.000 |                   94 |
| +I | 2025-08-11 15:12:40.000 | 2025-08-11 15:12:50.000 |                  104 |
| +I | 2025-08-11 15:12:50.000 | 2025-08-11 15:13:00.000 |                   96 |
| +I | 2025-08-11 15:13:00.000 | 2025-08-11 15:13:10.000 |                  100 |
  3. Run a verification query in MySQL

Pick one window of output:

+----+-------------------------+-------------------------+----------------------+
| op |                 w_start |                   w_end |                   pv |
+----+-------------------------+-------------------------+----------------------+
| +I | 2025-08-11 15:12:50.000 | 2025-08-11 15:13:00.000 |                   96 |

Convert the window timestamps:

Timestamp   Before conversion          After conversion
w_start     2025-08-11 15:12:50.000    1754896370000
w_end       2025-08-11 15:13:00.000    1754896380000
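
The conversion can be reproduced with java.time. A minimal sketch, assuming the window times printed by Flink are in the Asia/Shanghai (UTC+8) time zone:

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class WindowTimestampConverter {
    public static void main(String[] args) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        ZoneId zone = ZoneId.of("Asia/Shanghai"); // assumed session time zone
        // Prints 1754896370000 and 1754896380000
        System.out.println(LocalDateTime.parse("2025-08-11 15:12:50.000", fmt).atZone(zone).toInstant().toEpochMilli());
        System.out.println(LocalDateTime.parse("2025-08-11 15:13:00.000", fmt).atZone(zone).toInstant().toEpochMilli());
    }
}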

Query in MySQL:

# Returns 96, consistent with the Flink result
select count(*) 
from user_log 
where logtime>= 1754896370000 and logtime < 1754896380000;

7. POM File

<project>
  <groupId>dblab</groupId>
  <artifactId>demo</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name> </name>
  <packaging>jar</packaging>
  <version>1.0</version>

  <repositories>
    <repository>
      <id>central-repos</id>
      <name>Central Repository</name>
      <url>http://repo.maven.apache.org/maven2</url>
    </repository>
    <repository>
      <id>alimaven</id>
      <name>aliyun maven</name>
      <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
  </repositories>

  <properties>
    <flink.version>1.17.0</flink.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
    </dependency>
<!--    <dependency>-->
<!--      <groupId>org.apache.flink</groupId>-->
<!--      <artifactId>flink-connector-files</artifactId>-->
<!--      <version>${flink.version}</version>-->
<!--    </dependency>-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-datagen</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-loader</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
    </dependency>
<!--    <dependency>-->
<!--      <groupId>org.apache.flink</groupId>-->
<!--      <artifactId>flink-csv</artifactId>-->
<!--      <version>${flink.version}</version>-->
<!--    </dependency>-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc</artifactId>
      <version>3.1.1-1.17</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.33</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.26</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>cn.hutool</groupId>
      <artifactId>hutool-all</artifactId>
      <version>5.8.39</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

8. Common Issues

8.1 Watermark Not Defined

Exception in thread "main" org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
	at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:327)
	at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:307)
	at org.apache.flink.table.operations.utils.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:300)
	at org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:265)
	at org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:262)
	at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:641)
	at UserLogCount.main(UserLogCount.java:42)

If no watermark is defined in the Table API schema, Flink cannot recognize a time attribute for the window, and the job fails with the exception above. Define the watermark on the rowtime column to fix it:

// Define the watermark
.watermark("rowtime", "rowtime - INTERVAL '5' SECOND")
