当前位置: 首页 > wzjs >正文

广州网站营销优化开发网站建设 项目经验

广州网站营销优化开发,网站建设 项目经验,wordpress汽车主题公园,深圳 三人 网站建设前言 在flink api中,聚合算子是非常常用的。所谓的聚合就是在分组的基础上做比较计算的操作。下面通过几个简单案例来说明聚合算子的用法和注意事项。 聚合算子案例 因为flink的api操作流程比较固定,从获取执行环境》获取数据源》执行数据转换操作》输…

前言

在flink api中,聚合算子是非常常用的。所谓的聚合就是在分组的基础上做比较计算的操作。下面通过几个简单案例来说明聚合算子的用法和注意事项。

聚合算子案例

因为flink的api操作流程比较固定,从获取执行环境==》获取数据源==》执行数据转换操作==》输出结果。为了复用代码,参考代码使用了一个模板设计模式。

先定义一个Stream的泛型接口

package com.tml.common;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public interface StreamService<T> {StreamExecutionEnvironment getEnv();DataStream<T>  getSource(StreamExecutionEnvironment env);
}

抽象一个模板

package com.tml.common;import com.tml.msg.CommonMsg;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public abstract class AbsStreamCommonService<T> implements StreamService<T> {public void processStream(Integer parallelism) throws Exception {StreamExecutionEnvironment env = getEnv();env.setParallelism(parallelism);DataStream<T> stream = getSource(env);handle(stream);env.execute();}public abstract void handle(DataStream<T> source);@Overridepublic StreamExecutionEnvironment getEnv() {return StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());}public DataStream<String> getSourceFromSocket(StreamExecutionEnvironment environment) {return environment.socketTextStream("43.139.114.233", 9999);}public DataStream<CommonMsg> getSourceFromCollection(StreamExecutionEnvironment environment) {DataStreamSource<CommonMsg> source = environment.fromElements(new CommonMsg("11", "hello world", 11L),new CommonMsg("11", "hello flink", 3L),new CommonMsg("12", "hello kitty", 13L),new CommonMsg("13", "hello world", 12L),new CommonMsg("11", "hello java", 23L));return source;}public DataStream<Long> getSourceFromDataGenerator(StreamExecutionEnvironment environment) {DataGeneratorSource<Long> dataGeneratorSource =new DataGeneratorSource<>((GeneratorFunction<Long, Long>) o -> o, 100000L,RateLimiterStrategy.perSecond(2), Types.LONG);return environment.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGeneratorSource", Types.LONG);}}

注:使用 

StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())可以在控制台看到flink的web-ui界面,默认是http://localhost:8081,方便看到flink job的执行参数,这种方式适用于本地调试和学习

 比如这样

 对应的pom文件依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.tml</groupId><artifactId>flink-demo</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>flink-demo</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.18.0</flink.version> <!-- 根据你的 Flink 版本进行调整 --><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><!-- Flink Streaming API --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><!-- Flink Table API and SQL --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version></dependency></dependencies>
</project>

keyBy

package com.tml.operator.aggregation;import com.tml.common.AbsStreamCommonService;
import com.tml.msg.CommonMsg;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class KeyByDemo extends AbsStreamCommonService<CommonMsg> {public static void main(String[] args) throws Exception {new KeyByDemo().processStream(4);}@Overridepublic void handle(DataStream<CommonMsg> stream) {/*** keyby算子返回的是一个keyedStream* 1.keyby不是一个转换算子,只对数据进行了重分区,另外还不能设置并行度* 2.keyby分组和分区的概念*  keyby是对数据进行分组,保证同一个分组的数据会落到同一个数据分区内*  分区:一个子任务可以理解为一个分区,一个分区可以包含有多个分组的数据*/KeyedStream<CommonMsg, String> keyBy = stream.keyBy((KeySelector<CommonMsg, String>) CommonMsg::getId, TypeInformation.of(String.class));keyBy.print();}@Overridepublic DataStream<CommonMsg> getSource(StreamExecutionEnvironment env) {return super.getSourceFromCollection(env);}
}

数据源是一个有界的数组,对应的数据是程序中自己new出来的,执行结果如下

 2> CommonMsg(id=11, msg=hello world, time=11)
2> CommonMsg(id=11, msg=hello flink, time=3)
2> CommonMsg(id=11, msg=hello java, time=23)
1> CommonMsg(id=12, msg=hello kitty, time=13)
3> CommonMsg(id=13, msg=hello world, time=12)

可以看到,通过keyBy的分组操作,相同的数据放在了同一个分区去执行。 

sum/min/minBy/max/maxBy

这几个是最基本的聚合算子。

package com.tml.operator.aggregation;import com.tml.common.AbsStreamCommonService;
import com.tml.msg.CommonMsg;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SimpleAggregateDemo extends AbsStreamCommonService<CommonMsg> {public static void main(String[] args) throws Exception {new SimpleAggregateDemo().processStream(1);}@Overridepublic void handle(DataStream<CommonMsg> stream) {KeyedStream<CommonMsg, String> keyStream = stream.keyBy((KeySelector<CommonMsg, String>) CommonMsg::getId, TypeInformation.of(String.class));//使用sum聚合//SingleOutputStreamOperator<CommonMsg> time = stream.sum("time");//SingleOutputStreamOperator<CommonMsg> min = stream.min("time");/*** max、maxyBy的区别在于* max不会对非比较字段重新赋值,而maxBy会更新非比较字段的值*/SingleOutputStreamOperator<CommonMsg> minBy = keyStream.minBy("time");//min.print();minBy.print();}@Overridepublic DataStream<CommonMsg> getSource(StreamExecutionEnvironment env) {return super.getSourceFromCollection(env);}
}

先看一下minBy这个算子结果输出

CommonMsg(id=11, msg=hello world, time=11)
CommonMsg(id=11, msg=hello flink, time=3)
CommonMsg(id=12, msg=hello kitty, time=13)
CommonMsg(id=13, msg=hello world, time=12)
CommonMsg(id=11, msg=hello flink, time=3)

将聚合操作的api换成min(),对比一下程序的输出

CommonMsg(id=11, msg=hello world, time=11)
CommonMsg(id=11, msg=hello world, time=3)
CommonMsg(id=12, msg=hello kitty, time=13)
CommonMsg(id=13, msg=hello world, time=12)
CommonMsg(id=11, msg=hello world, time=3)

两个对比输出可以发现,min、minBy的区别在于

min不会对非比较字段重新赋值,而minBy会更新非比较字段的值

当然max、maxBy也是一样

reduce

package com.tml.operator.aggregation;import com.tml.common.AbsStreamCommonService;
import com.tml.msg.CommonMsg;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class ReduceDemo extends AbsStreamCommonService<CommonMsg> {public static void main(String[] args) throws Exception {new ReduceDemo().processStream(1);}@Overridepublic void handle(DataStream<CommonMsg> source) {KeyedStream<CommonMsg, String> stream = source.keyBy((KeySelector<CommonMsg, String>) CommonMsg::getId, TypeInformation.of(String.class));/*** reduce函数是非常灵活的,可以根据业务需求,非常灵活的进行聚合计算* 当每个分组中只有一条数据的时候,是不会进行reduce的,因为只有一条数据,没有比较的数据,进行reduce没有必要*/SingleOutputStreamOperator<CommonMsg> reduce = stream.reduce((t1, t2) -> {System.out.println("t1==>" + t1);System.out.println("t2==>" + t2);CommonMsg commonMsg = new CommonMsg(t1.getId(), t2.getMsg(), t1.getTime() + t2.getTime());return commonMsg;});reduce.print();}@Overridepublic DataStream<CommonMsg> getSource(StreamExecutionEnvironment env) {return super.getSourceFromCollection(env);}
}

看一下运行结果

CommonMsg(id=11, msg=hello world, time=11)
t1==>CommonMsg(id=11, msg=hello world, time=11)
t2==>CommonMsg(id=11, msg=hello flink, time=3)
CommonMsg(id=11, msg=hello flink, time=14)
CommonMsg(id=12, msg=hello kitty, time=13)
CommonMsg(id=13, msg=hello world, time=12)
t1==>CommonMsg(id=11, msg=hello flink, time=14)
t2==>CommonMsg(id=11, msg=hello java, time=23)
CommonMsg(id=11, msg=hello java, time=37) 

通过运行结果可以看到,reduce算子是非常灵活的,可以在两个数据之间做非常灵活的操作,当然,如果对应的分组中只有一条数据,自然是 不会触发reduce函数的执行了。 

richFunction

package com.tml.operator.aggregation;import com.tml.common.AbsStreamCommonService;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** richfunction添加了一些额外的功能* 提供了一些生命周期的管理方法,比如open()\close()* open() 在每个子任务启动的时候调用一次* close() 在每个任务结束的时候调用一次,如果是flink程序挂掉,不会调用这个close方法,在控制台上点击cancel任务,这个close方法也是可以额正常调用的** 另外多了一些运行时上下文,可以通过getRuntimeContext() 来获取上下文中的一些关键信息* 在close方法中可以做一些释放资源的操作,回调通知操作等一些hook函数*/
public class RichFunctionDemo extends AbsStreamCommonService<String> {public static void main(String[] args) throws Exception {new RichFunctionDemo().processStream(1);}@Overridepublic void handle(DataStream<String> stream) {SingleOutputStreamOperator<String> map = stream.map(new RichMapFunction<String, String>() {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);RuntimeContext context = getRuntimeContext();String taskName = context.getTaskName();int subtasks = context.getNumberOfParallelSubtasks();System.out.println("taskName: " + taskName + ", subtasks: " + subtasks + " call open()");}@Overridepublic void close() throws Exception {super.close();RuntimeContext context = getRuntimeContext();String taskName = context.getTaskName();int subtasks = context.getNumberOfParallelSubtasks();System.out.println("taskName: " + taskName + ", subtasks: " + subtasks + " call close()");}@Overridepublic String map(String value) throws Exception {return "(" + value + ")";}}, TypeInformation.of(String.class));map.print();}@Overridepublic DataStream<String> getSource(StreamExecutionEnvironment env) {return super.getSourceFromSocket(env);}
}

运行程序前需要先运行socket,这里使用了nc,详细可以参考Flink实时统计单词【入门】

看一下运行结果 

taskName: Source: Socket Stream -> Map -> Sink: Print to Std. Out, subtasks: 1 call open()
(hello kitty)
(hello flink)
taskName: Source: Socket Stream -> Map -> Sink: Print to Std. Out, subtasks: 1 call close()

总结

  • 这些聚合算子的基础实在keyBy之后,只有对数据进行了分组之后,才能执行后面的聚合操作。
  • min、minBy和max、maxBy之间有细微的区别,前者不会对非比较字段重新赋值,而后者会更新非比较字段的值
  • reduce算子是在两个数据之间进行操作的,可以非常灵活
  • richFunction不算聚合函数,这里写进来是富函数可以做非常多的额外功能,open()方法是对应的子任务启动时调用一下,close()方法是在对应的子任务结束的时候调用一次,通过这个可以做一些监控或者hook通知的操作

代码案例已经上传到了github,欢迎前来围观!


文章转载自:

http://PmIouLRt.grxsc.cn
http://jzGExpTr.grxsc.cn
http://SSr1TnEY.grxsc.cn
http://L0vB5l3k.grxsc.cn
http://ydLp9L26.grxsc.cn
http://Ad5QuHUr.grxsc.cn
http://eEiyVlW9.grxsc.cn
http://BLa4PX0q.grxsc.cn
http://rqL4XkHY.grxsc.cn
http://gL1xMG0H.grxsc.cn
http://1IgICq8b.grxsc.cn
http://mFVbIJ5k.grxsc.cn
http://kUEskuIz.grxsc.cn
http://CnPOI3zr.grxsc.cn
http://7Lst0Gb2.grxsc.cn
http://BXcRAzdr.grxsc.cn
http://25MHxTd6.grxsc.cn
http://n66mBOxI.grxsc.cn
http://qfzejJgs.grxsc.cn
http://FDBgOMro.grxsc.cn
http://8f5IQw2d.grxsc.cn
http://8zXiu2he.grxsc.cn
http://LqpB54IS.grxsc.cn
http://bkyOmwpF.grxsc.cn
http://lyTcek4D.grxsc.cn
http://pXEI5YRZ.grxsc.cn
http://xYYp3De0.grxsc.cn
http://btYCSmvM.grxsc.cn
http://LiYYJUrm.grxsc.cn
http://SmCzozfK.grxsc.cn
http://www.dtcms.com/wzjs/623813.html

相关文章:

  • 济南网站优化费用目前最新推广平台
  • 做简历好的网站wordpress数据库怎么连接
  • dw怎么做网站后台推广 网站的优秀文案
  • 怎么创网站久就建筑网
  • 教育网站建设需求文档网页建设企业
  • 嘉兴提高网站排名保险公司早会新闻资讯
  • 2018年企业网站优化如何做中国兼职设计师网
  • 做装饰画的行业网站怎样做建网站做淘客
  • 建设论坛网站自学郑州新闻发布会最新消息今天视频
  • 网站策划与网上营销网站设计是不是会要用代码做
  • 网站建设考评表企业网站建设上海
  • 谷歌seo 外贸建站北京黄村专业网站建设价钱
  • 商务网站规划与建设wordpress调用固定链接结构
  • 做个网站费用多少海南城乡建设网站
  • 网站整站源码下载网页字体尺寸设计
  • 帮您做网站做英语阅读的网站
  • 广西南宁市网站建设服务中心黑科技软件合集网站
  • 合肥市建设工程造价管理站网站wordpress专题修改
  • 如何把电脑改成服务器 做网站网站外链有多重要
  • win7 asp网站无法显示该页面自己建设企业网站
  • wordpress 页面设计seo网络优化是什么工作
  • 网页游戏排行榜前十知乎纵横seo
  • 福建省效能建设网站洛阳自助建站
  • 怎么推广自己的网站如何提高seo关键词排名
  • 文化网站建设江西智能网站建设哪里有
  • 企业建设网站的目的是网站开发能用到的ps知识
  • 建设银行信用卡网站查询申请自媒体账号入口
  • 提高整个网站权重新手做自己的网站
  • 房产网站系统源码wordpress 根据ua跳转
  • 网站制作公司网站超链接