当前位置: 首页 > wzjs >正文

一流的龙岗网站设计网站关键词优化软件

一流的龙岗网站设计,网站关键词优化软件,wordpress submenu,做ppt图片用的网站1.业务需求实时计算需求 :业务背景:支付宝智能助理上线后,用户只需从支付宝首页轻松下拉即可体验,因此得到了大量用户的使用和反馈。业务需求:现在业务方想统计每个用户在智能助理近30min、近1h、近6h的会话数和会话时长&#xff…

1.业务需求

实时计算需求 :

业务背景:支付宝智能助理上线后,用户只需从支付宝首页轻松下拉即可体验,因此得到了大量用户的使用和反馈。

业务需求:现在业务方想统计每个用户在智能助理近30min、近1h、近6h的会话数和会话时长,并随着时间推移,每1分钟更新一次。

额外诉求:如果用户在30min、1h、6h后没有行为数据,则将会话数和会话时长清零。

2.方案设计

  1. 数据流处理:使用KeyedProcessFunction按用户ID分组,维护用户状态。

  2. 状态管理:为每个用户维护三个窗口(30min、1h、6h)的统计信息(会话数、会话时长、最后一次事件时间)。

  3. 定时器触发:每分钟触发一次定时器,检查用户状态是否过期并清零统计值。

  4. Watermark机制:基于事件时间处理,确保时间窗口的准确性。

3.代码实现

3.1 定义数据类型

// 用户会话事件
public class SessionEvent {private String userId;private long eventTime; // 事件时间(毫秒)private long sessionDuration; // 会话时长(毫秒)// 构造函数、getter/setter
}// 用户统计结果
public class UserStats {private String userId;private long count30min, duration30min;private long count1h, duration1h;private long count6h, duration6h;// 构造函数、getter/setter
}

3.2 用户状态管理类

public class UserWindowState {private static final int[] WINDOW_DURATIONS = {30 * 60 * 1000, 60 * 60 * 1000, 6 * 60 * 60 * 1000};private long[] sessionCounts = new long[3];private long[] durations = new long[3];private long lastEventTime;// 更新指定窗口的统计值public void update(int windowIndex, long countDelta, long durationDelta) {sessionCounts[windowIndex] += countDelta;durations[windowIndex] += durationDelta;}// 清空指定窗口的统计值public void clear(int windowIndex) {sessionCounts[windowIndex] = 0;durations[windowIndex] = 0;}// 获取统计值public long getSessionCount(int index) { return sessionCounts[index]; }public long getDuration(int index) { return durations[index]; }// 设置/获取最后事件时间public void setLastEventTime(long time) { lastEventTime = time; }public long getLastEventTime() { return lastEventTime; }
}

3.3 KeyedProcessFunction

public class UserStatsFunction extends KeyedProcessFunction<String, SessionEvent, UserStats> {private transient ValueState<UserWindowState> state;private static final int[] WINDOW_DURATIONS = {30 * 60 * 1000, 60 * 60 * 1000, 6 * 60 * 60 * 1000};@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<UserWindowState> descriptor =new ValueStateDescriptor<>("userWindowState", UserWindowState.class);state = getRuntimeContext().getState(descriptor);}@Overridepublic void processElement(SessionEvent event, Context ctx, Collector<UserStats> out) throws Exception {UserWindowState currentState = state.value();if (currentState == null) currentState = new UserWindowState();// 更新所有窗口的统计值for (int i = 0; i < 3; i++) {currentState.update(i, 1, event.getSessionDuration());}currentState.setLastEventTime(event.getEventTime());// 保存状态state.update(currentState);// 注册定时器到下一个整分钟long nextTriggerTime = computeNextTriggerTime(event.getEventTime());ctx.timerService().registerEventTimeTimer(nextTriggerTime);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<UserStats> out) throws Exception {UserWindowState currentState = state.value();if (currentState == null) return;// 检查每个窗口是否过期for (int i = 0; i < 3; i++) {long windowDuration = WINDOW_DURATIONS[i];long validStart = timestamp - windowDuration;if (currentState.getLastEventTime() < validStart) {currentState.clear(i);}}// 保存更新后的状态state.update(currentState);// 输出统计结果UserStats stats = new UserStats(ctx.getCurrentKey(),currentState.getSessionCount(0), currentState.getDuration(0),currentState.getSessionCount(1), currentState.getDuration(1),currentState.getSessionCount(2), currentState.getDuration(2));out.collect(stats);// 注册下个定时器ctx.timerService().registerEventTimeTimer(timestamp + 60 * 1000);}// 计算下一个整分钟的触发时间private long computeNextTriggerTime(long eventTime) {long minutes = eventTime / 60000;return (minutes + 1) * 60000;}
}

3.4 Flink主程序

public class FlinkJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 输入数据流DataStream<SessionEvent> input = env.addSource(new FlinkKafkaConsumer<>(...));// 设置事件时间及Watermarkinput = input.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SessionEvent>(Time.seconds(5)) {@Overridepublic long extractTimestamp(SessionEvent element) {return element.getEventTime();}});// 处理逻辑DataStream<UserStats> result = input.keyBy(SessionEvent::getUserId).process(new UserStatsFunction());// 输出结果result.print();env.execute("User Session Stats");}
}

4.核心关键点说明

  1. 定时器机制:

    • 每个用户每分钟触发一次定时器,检查窗口是否过期。

    • 若用户最后一次事件时间超过窗口阈值,则清零统计值。

  2. 状态管理:

    • 使用ValueState维护用户状态,包含三个窗口的会话数、时长和最后一次事件时间。

  3. 事件时间处理:

    • 通过assignTimestampsAndWatermarks设置事件时间,确保窗口计算的准确性。

  4. 输出更新:

    • 每分钟触发定时器时,输出当前用户的统计结果,即使统计值为零。

5.代码答疑

1.为什么需要两处注册?

首次触发的启动问题:

  • 如果只有 onTimer 中的注册(即无 processElement 的注册),则定时器链可能无法启动。

  • 假设用户长时间没有事件(如 2 小时未活跃),则 processElement 永远不会被调用,导致无法注册第一个定时器,定时器链无法启动。

processElement 的注册确保了当首次事件到达时,定时器链被启动。

持续触发的维持问题:

如果只有 processElement 的注册(即无 onTimer 的注册),则定时器链可能中断。

  • 假设用户最后一次事件是在 14:23:45,则会注册 14:24:00 的定时器。

  • 如果后续 没有新事件,则 processElement 不会被调用,无法注册 14:25:00 的定时器,导致定时器链在 14:24:00 后中断。

  • onTimer 的注册确保了即使没有新事件,定时器也能持续触发。

总结:

  • processElement 的注册是启动定时器链的必要条件(处理冷启动场景)。

  • onTimer 的注册是维持定时器链持续运行的核心逻辑(覆盖无新事件的情况)。

  • Flink 的内部机制会自动去重同一时间的定时器,因此 不会产生额外资源浪费。

http://www.dtcms.com/wzjs/24230.html

相关文章:

  • 哈尔滨建设工程招投标办公室湘潭seo公司
  • 杭州建设网站哪家好网站生成app
  • 便民平台推广怎么做中国网络优化公司排名
  • 社科联网站建设山东公司网站推广优化
  • 做动漫主题的网站上海关键词优化的技巧
  • 宿迁公司做网站百度搜索如何去广告
  • 做酱菜网站爱站工具包怎么使用
  • 南乐县住房和城乡建设局网站成都seo整站
  • 网站分页需要前端做还是后端友情链接作用
  • 做图文链接网站seo和sem
  • 企业网站建设要注意网站整合营销推广
  • html网页导航栏代码郑州seo服务技术
  • 网站的安全检查怎么做腾讯新闻发布平台
  • 赣州网站建设精英百度搜索大数据查询
  • 网站建设 意见征集优化网站标题是什么意思
  • 岑溪网站开发工作室免费s站推广网站
  • 深圳公司免费网站建设怎么样旅游seo整站优化
  • 龙岩人自己的网站百度手机助手app免费下载
  • 咸阳市城乡建设规划局网站网站的建设流程
  • 如何去除wordpress首页功能宁波seo公司排名榜
  • 佛山网站建设哪里有网站开发建站
  • php网站开发 招聘温州网站建设
  • 东莞建设工程检测中心网站湖南网站推广公司
  • 如何利用源代码做网站seo关键词排名优化专业公司
  • 一级a做爰片i免费网站网店推广实训系统
  • 电子商务网站cms湖南正规关键词优化报价
  • 中山建设信息网站免费b站推广网站入口2020
  • 广州有哪些软件开发公司西安seo排名
  • 整站快速排名免费网站站长查询
  • 工业园区网站建设seo是什么意思电商