当前位置: 首页 > wzjs >正文

南通市建设局网站马建明上海做网站哪家公司

南通市建设局网站马建明,上海做网站哪家公司,南京广告宣传公司seo,科技广告公司网站建设&&大数据学习&& 🔥系列专栏: 👑哲学语录: 承认自己的无知,乃是开启智慧的大门 💖如果觉得博主的文章还不错的话,请点赞👍收藏⭐️留言📝支持一下博主哦&#x1f91…

&&大数据学习&&

🔥系列专栏: 👑哲学语录: 承认自己的无知,乃是开启智慧的大门
💖如果觉得博主的文章还不错的话,请点赞👍+收藏⭐️+留言📝支持一下博主哦🤞


之前提到,只有在KeyedStream中才支持使用TimerService设置定时器的操作。所以一般情况下,我们都是先做了keyBy分区之后,再去定义处理操作;代码中更加常见的处理函数是KeyedProcessFunction。

1 定时器(Timer)和定时服务(TimerService

在.onTimer()方法中可以实现定时处理的逻辑,而它能触发的前提,就是之前曾经注册过定时器、并且现在已经到了触发时间。注册定时器的功能,是通过上下文中提供的“定时服务”来实现的。

定时服务与当前运行的环境有关。前面已经介绍过,ProcessFunction的上下文(Context)中提供了.timerService()方法,可以直接返回一个TimerService对象。TimerService是Flink关于时间和定时器的基础服务接口,包含以下六个方法:

// 获取当前的处理时间long currentProcessingTime();// 获取当前的水位线(事件时间)long currentWatermark();// 注册处理时间定时器,当处理时间超过time时触发void registerProcessingTimeTimer(long time);// 注册事件时间定时器,当水位线超过time时触发void registerEventTimeTimer(long time);// 删除触发时间为time的处理时间定时器void deleteProcessingTimeTimer(long time);// 删除触发时间为time的处理时间定时器void deleteEventTimeTimer(long time);

六个方法可以分成两大类:基于处理时间和基于事件时间。而对应的操作主要有三个:获取当前时间,注册定时器,以及删除定时器。需要注意,尽管处理函数中都可以直接访问TimerService,不过只有基于KeyedStream的处理函数,才能去调用注册和删除定时器的方法;未作按键分区的DataStream不支持定时器操作,只能获取当前时间。

TimerService会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个key和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次

2 KeyedProcessFunction案例

基于keyBy之后的KeyedStream,直接调用.process()方法,这时需要传入的参数就是KeyedProcessFunction的实现类。

stream.keyBy( t -> t.f0 ).process(new MyKeyedProcessFunction())

类似地,KeyedProcessFunction也是继承自AbstractRichFunction的一个抽象类,与ProcessFunction的定义几乎完全一样,区别只是在于类型参数多了一个K,这是当前按键分区的key的类型。同样地,我们必须实现一个.processElement()抽象方法,用来处理流中的每一个数据;另外还有一个非抽象方法.onTimer(),用来定义定时器触发时的回调操作。

代码如下:

public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// TODO Process:keyedSingleOutputStreamOperator<String> process = sensorKS.process(new KeyedProcessFunction<String, WaterSensor, String>() {/*** 来一条数据调用一次* @param value* @param ctx* @param out* @throws Exception*/@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {//获取当前数据的keyString currentKey = ctx.getCurrentKey();// TODO 1.定时器注册TimerService timerService = ctx.timerService();// 1、事件时间的案例Long currentEventTime = ctx.timestamp(); // 数据中提取出来的事件时间timerService.registerEventTimeTimer(5000L);System.out.println("当前key=" + currentKey + ",当前时间=" + currentEventTime + ",注册了一个5s的定时器");// 2、处理时间的案例//                        long currentTs = timerService.currentProcessingTime();//                        timerService.registerProcessingTimeTimer(currentTs + 5000L);//                        System.out.println("当前key=" + currentKey + ",当前时间=" + currentTs + ",注册了一个5s后的定时器");// 3、获取 process的 当前watermark//                        long currentWatermark = timerService.currentWatermark();//                        System.out.println("当前数据=" + value + ",当前watermark=" + currentWatermark);// 注册定时器: 处理时间、事件时间//                        timerService.registerProcessingTimeTimer();//                        timerService.registerEventTimeTimer();// 删除定时器: 处理时间、事件时间//                        timerService.deleteEventTimeTimer();//                        timerService.deleteProcessingTimeTimer();// 获取当前时间进展: 处理时间-当前系统时间,  事件时间-当前watermark//                        long currentTs = timerService.currentProcessingTime();//                        long wm = timerService.currentWatermark();}/*** TODO 2.时间进展到定时器注册的时间,调用该方法* @param timestamp 当前时间进展,就是定时器被触发时的时间* @param ctx       上下文* @param out       采集器* @throws Exception*/@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);String currentKey = ctx.getCurrentKey();System.out.println("key=" + currentKey + "现在时间是" + timestamp + "定时器触发");}});process.print();env.execute();}}

http://www.dtcms.com/wzjs/840783.html

相关文章:

  • 引领网站温州做网站公司哪家好
  • 做个简单网站大概多少钱生成短链接
  • 英文网站建设服务合同模板下载厦门专业做网站的公司
  • 网站建设十一要点宿迁seo
  • 漳州做网站的公司噼里啪啦动漫在线观看免费
  • 个体工商户是否能够做网站即墨网站建设
  • 广州顺德网站设计网站备案 固定电话
  • 亲子网 网站正在建设中wordpress生成海报图片
  • 网站推广有哪些方案广州app开发定制公司
  • 网站主题和风格龙网网络推广软件
  • 襄阳网站建设开发请多记几个本站域名防止失联
  • 访问国外的网站服务器无法访问线上推广员是做什么的
  • wordpress多站点demo怎样让百度快速收录网站
  • 福建省建设干部培训中心网站首页正规seo需要多少钱
  • 网站建设摊销方法深圳一医疗公司给员工放假10个月
  • 自己做网站 怎么解决安全问题湖南省建设银行网站
  • 佛山的网站建设适合seo优化的网站
  • 在线设计工具的网站怎么做网上书城网站建设功能定位
  • 网站建设 猴王网络彩视音乐相册制作下载安装
  • 体育类网站 设计页面模板下载
  • 网页设计与制作精品课程网站wordpress微现场
  • 绍兴做企业网站的公司网站建设与规划方向
  • 洞头建设局网站公司注册地址的要求
  • 沈阳网站制作哪家好郑州app开发价格
  • 网站转发网络运营商包括哪些
  • 监控直播网站开发wordpress手机客户端源码
  • 网站建设素材电子商务网站设计小结
  • 网站搭建公司案例网址兰州市城关区建设局网站
  • 信誉好的集团网站建设简书wordpress
  • 互联网app网站建设方案模板下载宣传网站建设意义