
Flink DataStream: Counting Data by Minute or by Date

1. Environment Versions

Component    Version
Flink        1.17.0
Kafka        2.12
MySQL        5.7.33

2. MySQL Table Creation Script

create table user_log
(
    id      int auto_increment comment 'primary key' primary key,
    uid     int    not null comment 'user id',
    event   int    not null comment 'user event',
    logtime bigint null comment 'log time, stored as epoch milliseconds'
) comment 'user log table, used as the verification data source';
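
Note that logtime holds epoch milliseconds: the generator in section 4 writes userLog.getLogtime().getTime() into this column, which is why the verification queries in section 6 compare against raw millisecond values. A minimal conversion sketch using Hutool's DateUtil (the class name and values are illustrative, not part of the project):

import cn.hutool.core.date.DateUtil;

import java.util.Date;

public class LogtimeDemo {
    public static void main(String[] args) {
        // a Date and its epoch-millisecond form, as stored in the logtime column
        long logtime = new Date().getTime();
        // back from millis to a readable time, handy for manual spot checks
        System.out.println(logtime + " -> " + DateUtil.format(DateUtil.date(logtime), "yyyy-MM-dd HH:mm:ss"));
    }
}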

3. User Log Class

Create a new Maven project.

The class below defines the schema shared by the Kafka messages and the MySQL table.

import cn.hutool.core.date.DateUtil;
import lombok.Data;

import java.util.Date;

/**
 * User log class
 */
@Data
public class UserLog {
    // user id
    private int uid;
    // user event
    private int event;
    // log time
    private Date logtime;

    // date key, used when aggregating per day
    public String getFormatDate() {
        return DateUtil.format(logtime, "yyyyMMdd");
    }

    // minute key, used when aggregating per minute
    public String getFormatTime() {
        return DateUtil.format(logtime, "yyyy-MM-dd HH:mm") + ":00";
    }
}
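
A quick sanity check of the two key formats (a sketch, not part of the original project; assumes Hutool and Lombok are on the classpath as in the POM below):

import java.util.Date;

public class UserLogDemo {
    public static void main(String[] args) {
        UserLog u = new UserLog();
        u.setUid(7);
        u.setEvent(1);
        u.setLogtime(new Date());
        // per-day key, e.g. 20250813
        System.out.println(u.getFormatDate());
        // per-minute key, e.g. 2025-08-13 10:36:00
        System.out.println(u.getFormatTime());
    }
}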

4. User Data Generator

/**
 * User data generator
 */
public class UserLogGenerator {
    public static void main(String[] args) throws Exception {
        // 1. get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. custom data generator source
        DataGeneratorSource<UserLog> dataGeneratorSource = new DataGeneratorSource<>(
                // GeneratorFunction implementation
                new GeneratorFunction<Long, UserLog>() {
                    // random data generator
                    public RandomDataGenerator generator;

                    @Override
                    public void open(SourceReaderContext readerContext) throws Exception {
                        generator = new RandomDataGenerator();
                    }

                    @Override
                    public UserLog map(Long aLong) throws Exception {
                        UserLog userLog = new UserLog();
                        // random user id
                        userLog.setUid(generator.nextInt(1, 50));
                        // random user event
                        userLog.setEvent(generator.nextInt(1, 2));
                        // random log time, jittered +/- 2 seconds around now
                        userLog.setLogtime(DateUtil.offset(new DateTime(), DateField.MILLISECOND,
                                generator.nextInt(-2000, 2000)));
                        return userLog;
                    }
                },
                // total number of records to emit
//                60 * 60 * 10,
                120,
                // records emitted per second
                RateLimiterStrategy.perSecond(10),
                // return type: wrap UserLog in a TypeInformation
                TypeInformation.of(UserLog.class)
        );
        DataStreamSource<UserLog> dataGeneratorSourceStream =
                env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGeneratorSource");
        // print the generated data
//        dataGeneratorSourceStream.print();

        // write to Kafka
        KafkaSink<UserLog> kafkaSink = KafkaSink.<UserLog>builder()
                .setBootstrapServers("hadoop01:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.<UserLog>builder()
                        .setTopic("userLog")
                        .setValueSerializationSchema(
                                (SerializationSchema<UserLog>) userLog -> JSONUtil.toJsonStr(userLog).getBytes())
                        .build())
                .build();
        dataGeneratorSourceStream.sinkTo(kafkaSink);

        // write to MySQL, for data verification
        SinkFunction<UserLog> jdbcSink = JdbcSink.sink(
                "insert into user_log (uid, event, logtime) values (?, ?, ?)",
                new JdbcStatementBuilder<UserLog>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, UserLog userLog) throws SQLException {
                        preparedStatement.setInt(1, userLog.getUid());
                        preparedStatement.setInt(2, userLog.getEvent());
                        preparedStatement.setLong(3, userLog.getLogtime().getTime());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://192.168.31.116:3306/demo")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("root")
                        .build()
        );
        dataGeneratorSourceStream.addSink(jdbcSink);

        env.execute();
    }
}
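
The generator serializes each UserLog with JSONUtil.toJsonStr before writing to Kafka, and the consumer in section 5 parses it back with JSONUtil.toBean. A minimal round-trip check of that contract (a sketch; by default Hutool writes the Date field as an epoch-millisecond number, which toBean accepts back):

import cn.hutool.json.JSONUtil;

import java.util.Date;

public class JsonRoundTripDemo {
    public static void main(String[] args) {
        UserLog in = new UserLog();
        in.setUid(1);
        in.setEvent(1);
        in.setLogtime(new Date());
        // same serialization the Kafka sink uses
        String json = JSONUtil.toJsonStr(in);
        // same deserialization the Kafka consumer uses
        UserLog out = JSONUtil.toBean(json, UserLog.class);
        System.out.println(json + " -> uid=" + out.getUid() + ", minute=" + out.getFormatTime());
    }
}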

5. Computing PV and UV per Minute or per Day with DataStream

/**
 * Compute PV and UV
 */
public class UserLogPVUVCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop01:9092")
                .setTopics("userLog")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.earliest())
                .build();
        DataStreamSource<String> kafkaSourceStream =
                env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource");
//        kafkaSourceStream.print();

        // deserialize the Kafka JSON payload
        SingleOutputStreamOperator<UserLog> userLogStream =
                kafkaSourceStream.map(s -> JSONUtil.toBean(s, UserLog.class));

        // Compute PV and UV. To aggregate per day instead, change getFormatTime to getFormatDate,
        // comment out the one-minute window and uncomment the one-day window (see the sketch after this class).
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> userPVUVStream = userLogStream
                .keyBy((KeySelector<UserLog, String>) UserLog::getFormatTime)
                // one-day window; the start is offset -8 hours so day boundaries align with UTC+8 local midnight
//                .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
                // one-minute window
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                // fire every 10 s to refresh the running result
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
                // evict already-processed elements after each firing so they are not re-counted
                .evictor(TimeEvictor.of(Time.seconds(0), true))
                // compute pv and uv
                .process(new MyProcessWindowFunction());

        userPVUVStream.print();
        env.execute();
    }
}
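
For reference, the per-day variant mentioned in the comment above swaps the keyBy and window lines as follows (a sketch assembled from the commented-out code; the -8 hour offset aligns the day window with UTC+8 local midnight):

        // per-day aggregation: key by date instead of minute
        userLogStream
                .keyBy((KeySelector<UserLog, String>) UserLog::getFormatDate)
                // one-day window shifted -8 hours for UTC+8
                .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
                .evictor(TimeEvictor.of(Time.seconds(0), true))
                .process(new MyProcessWindowFunction());
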
/**
 * Custom window function that computes PV and UV
 */
public class MyProcessWindowFunction extends ProcessWindowFunction<UserLog, Tuple3<String, String, Integer>, String, TimeWindow> {
    // UV: distinct user ids, stored as MapState keys
    private transient MapState<Integer, String> uvState;
    // PV: running count
    private transient ValueState<Integer> pvState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // TTL so that per-key state expires one minute after the last write
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        // enable TTL on both state descriptors
        ValueStateDescriptor<Integer> pvStateDescriptor = new ValueStateDescriptor<>("pv", Integer.class);
        MapStateDescriptor<Integer, String> uvStateDescriptor = new MapStateDescriptor<>("uv", Integer.class, String.class);
        pvStateDescriptor.enableTimeToLive(ttlConfig);
        uvStateDescriptor.enableTimeToLive(ttlConfig);
        pvState = this.getRuntimeContext().getState(pvStateDescriptor);
        uvState = this.getRuntimeContext().getMapState(uvStateDescriptor);
    }

    @Override
    public void process(String s, ProcessWindowFunction<UserLog, Tuple3<String, String, Integer>, String, TimeWindow>.Context context, Iterable<UserLog> iterable, Collector<Tuple3<String, String, Integer>> collector) throws Exception {
        // count this firing's new elements and record their uids
        Integer pv = 0;
        Iterator<UserLog> iterator = iterable.iterator();
        while (iterator.hasNext()) {
            pv = pv + 1;
            Integer userId = iterator.next().getUid();
            uvState.put(userId, null);
        }
        // add this firing's pv to the running total
        pvState.update((pvState.value() == null ? 0 : pvState.value()) + pv);

        // uv = number of distinct uids seen so far for this key
        int uv = 0;
        Iterator<Integer> uvIterator = uvState.keys().iterator();
        while (uvIterator.hasNext()) {
            uvIterator.next();
            uv = uv + 1;
        }
        collector.collect(Tuple3.of(s, "uv", uv));
        collector.collect(Tuple3.of(s, "pv", pvState.value()));
    }
}
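
The one-minute TTL above is what keeps per-minute keys from holding state forever. An alternative sketch (not in the original code) is to release the per-key state explicitly when Flink purges the window, by overriding ProcessWindowFunction#clear in the same class:

    // called when the window is purged at the end of its lifetime
    @Override
    public void clear(ProcessWindowFunction<UserLog, Tuple3<String, String, Integer>, String, TimeWindow>.Context context) throws Exception {
        pvState.clear();
        uvState.clear();
    }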

6. Data Verification

  1. Start UserLogGenerator.
  2. Start UserLogPVUVCount.
(2025-08-13 10:37:00,uv,45)
(2025-08-13 10:37:00,pv,118)
(2025-08-13 10:36:00,uv,2)
(2025-08-13 10:36:00,pv,2)
  3. Verify the counts with queries in MySQL.

Converting the window timestamps:

Boundary   Before conversion      After conversion
w_start    2025-08-13 10:36:00    1755052560000
w_end      2025-08-13 10:37:00    1755052620000
# consistent with the Flink output
# uv of the 2025-08-13 10:36:00 window
select count(distinct uid) from user_log where logtime >= 1755052560000 and logtime < 1755052620000;
# uv of the 2025-08-13 10:37:00 window
select count(distinct uid) from user_log where logtime >= 1755052620000;
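
The conversions in the table above can be reproduced with Hutool (a sketch; assumes the JVM default timezone is UTC+8, matching the generator):

import cn.hutool.core.date.DateUtil;

public class WindowBoundaryDemo {
    public static void main(String[] args) {
        // window boundaries as epoch milliseconds, matching the SQL predicates above
        long wStart = DateUtil.parse("2025-08-13 10:36:00").getTime(); // 1755052560000
        long wEnd = DateUtil.parse("2025-08-13 10:37:00").getTime();   // 1755052620000
        System.out.println(wStart + " " + wEnd);
    }
}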

7. POM File

<project>
    <modelVersion>4.0.0</modelVersion>
    <groupId>dblab</groupId>
    <artifactId>demo</artifactId>
    <version>1.0</version>
    <name> </name>
    <packaging>jar</packaging>

    <repositories>
        <repository>
            <id>central-repos</id>
            <name>Central Repository</name>
            <url>http://repo.maven.apache.org/maven2</url>
        </repository>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>

    <properties>
        <flink.version>1.17.0</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
<!--
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-datagen</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
<!--
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.1.1-1.17</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.39</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

8. References and Acknowledgments

轻松通关Flink第33讲:Flink 计算 PV、UV 代码实现

Flink计算pv和uv的通用方法

