当前位置: 首页 > wzjs >正文

山东大学青岛校区建设指挥部网站seo软件定制

山东大学青岛校区建设指挥部网站,seo软件定制,重庆忠县网站建设公司哪家好,个人网站如何备案前面的文章中介绍过Operator State,这里介绍一下Keyed State. 在使用Operator State时必须要实现CheckpointFunction接口,而Keyed State则不需要,在使用keyBy(...)分组分组后,调用的函数必须是实现RichFuntion接口的函数才可以使用Keyed State.同样使用Keyed State也必须开启Ch…

前面的文章中介绍过Operator State,这里介绍一下Keyed State.
在使用Operator State时必须要实现CheckpointFunction接口,而Keyed State则不需要,在使用keyBy(...)分组分组后,调用的函数必须是实现RichFuntion接口的函数才可以使用Keyed State.同样使用Keyed State也必须开启Checkpoint.

  • 需求
    将接收到的Socket数据源中的字符串进行拼接
    在命令行开启socket命令:
    nc -lk 8888
    
  • 业务代码
    public class FlinkKeyedState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度为1,便于观察env.setParallelism(1);// 开启Checkpoint, 8秒一个周期并开启一次性语义env.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE);// 指定checkpoint持久化路径env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");// 开启Task级别故障自动failover,通过fixedDelayRestart设置Task重启上限和重启间隔,这里设置的重启次数为2次,一旦Task重启次数超过这个次数,整个job也会停止env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(5)));// 获取Socket数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 将数据进行分组,将分组key给一个常量值SingleOutputStreamOperator<String> map = socketSource.keyBy(s -> "1")// 使用Keyed State的算子必须实现RichFunction接口,如RichMapFunction,ProcessFunction等.map(new RichMapFunction<String, String>() {ListState<String> listState;// open方法可以理解为和Operator State中的initializeState方法一样,需要在这个方法中构造和获取状态存储器@Overridepublic void open(Configuration parameters) throws Exception {// 获取上下文RuntimeContext ctx = getRuntimeContext();// 获取ListState,不同于Operator State的是在这里有更多的选择,如ListState,MapState等listState = ctx.getListState(new ListStateDescriptor<>("demo", String.class));}// 在map方法中正常编写业务逻辑@Overridepublic String map(String s) throws Exception {// 模拟Task失败if (s.equals("k") && RandomUtils.nextInt(0, 5) == 3) {throw new Exception("Task 异常");}// 将数据添加到状态存储器中listState.add(s);Iterable<String> strings = listState.get();StringBuilder builder = new StringBuilder();for (String string : strings) {builder.append(string);}return builder.toString();}});map.print();env.execute("Keyed State");}
    }
    
    API的使用大概就这些内容,不过在使用Keyed Sate时首先要对keyBy的特性有所了解,才能得到最终想要的结果数据,如使用keyBy时上下游之间的数据分发模式、所设置的默认并行度上下游算子的并行度是否一致等问题,这些都是需要注意的,然后根据实际业务需求开发对应的逻辑就可以了.
http://www.dtcms.com/wzjs/471951.html

相关文章:

  • 西安东郊做网站推广竞价托管费用
  • 哪个网站可以接任务做兼职百度投诉中心入口
  • 建造免费网站新手如何自己做网站
  • 绍兴网站制作套餐网络推广内容
  • 宝安附近公司做网站建设哪家效益快优化网站标题和描述的方法
  • 网站开发使用软件环境关键词指数
  • 做网站大概需要几步网络推广营销公司
  • 网站报价表怎么做搜狗站长工具平台
  • 宁乡小程序开发游戏优化是什么意思
  • 软件开发网站能做seo吗市场营销策划公司
  • web前端开发的意义seo优化seo外包
  • 易语言可以做网站管理系统吗百度搜索引擎的功能
  • 公司网站关键词搜索公司建网站多少钱
  • 如何建设简单网站深圳搜索排名优化
  • 网站导航html手机金融界网站
  • 电子商务网站建设相关职位谷歌浏览器 官网下载
  • 2008系统如何做网站seo优化教程自学网
  • 学前端要逛那些网站市场营销图片高清
  • 个人网站制作模板图片网络营销有什么岗位
  • 牡丹江网站推广免费引流推广怎么做
  • 公司网站怎么做才能吸引人深圳seo优化服务
  • 网站的整体风格seo网络优化是做什么的
  • 如何做有亮点的网站建设方案怎样做推广是免费的
  • 直播的网站开发站长查询工具
  • 网站推广外链怎么做宁波seo公司推荐
  • 网页设计制作网站模板草图网络推广属于什么行业
  • 做pc端网站价位自己怎么做网页
  • 零陵旅游建设投资公司网站百度网盘免费下载
  • 如何提升网站打开速度十大嵌入式培训机构
  • 小型私人会所装修设计sem和seo是什么职业