如何查看网站cms系统媒体公关公司
前言
在flink api中,聚合算子是非常常用的。所谓的聚合就是在分组的基础上做比较计算的操作。下面通过几个简单案例来说明聚合算子的用法和注意事项。
聚合算子案例
因为flink的api操作流程比较固定,从获取执行环境==》获取数据源==》执行数据转换操作==》输出结果。为了复用代码,参考代码使用了一个模板设计模式。
先定义一个Stream的泛型接口
package com.tml.common;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public interface StreamService<T> {StreamExecutionEnvironment getEnv();DataStream<T> getSource(StreamExecutionEnvironment env);
}
抽象一个模板
package com.tml.common;import com.tml.msg.CommonMsg;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public abstract class AbsStreamCommonService<T> implements StreamService<T> {public void processStream(Integer parallelism) throws Exception {StreamExecutionEnvironment env = getEnv();env.setParallelism(parallelism);DataStream<T> stream = getSource(env);handle(stream);env.execute();}public abstract void handle(DataStream<T> source);@Overridepublic StreamExecutionEnvironment getEnv() {return StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());}public DataStream<String> getSourceFromSocket(StreamExecutionEnvironment environment) {return environment.socketTextStream("43.139.114.233", 9999);}public DataStream<CommonMsg> getSourceFromCollection(StreamExecutionEnvironment environment) {DataStreamSource<CommonMsg> source = environment.fromElements(new CommonMsg("11", "hello world", 11L),new CommonMsg("11", "hello flink", 3L),new CommonMsg("12", "hello kitty", 13L),new CommonMsg("13", "hello world", 12L),new CommonMsg("11", "hello java", 23L));return source;}public DataStream<Long> getSourceFromDataGenerator(StreamExecutionEnvironment environment) {DataGeneratorSource<Long> dataGeneratorSource =new DataGeneratorSource<>((GeneratorFunction<Long, Long>) o -> o, 100000L,RateLimiterStrategy.perSecond(2), Types.LONG);return environment.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGeneratorSource", Types.LONG);}}
注:使用
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())可以在控制台看到flink的web-ui界面,默认是http://localhost:8081,方便看到flink job的执行参数,这种方式适用于本地调试和学习
比如这样
对应的pom文件依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.tml</groupId><artifactId>flink-demo</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>flink-demo</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.18.0</flink.version> <!-- 根据你的 Flink 版本进行调整 --><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><!-- Flink Streaming API --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><!-- Flink Table API and SQL --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version></dependency></dependencies>
</project>
keyBy
package com.tml.operator.aggregation;import com.tml.common.AbsStreamCommonService;
import com.tml.msg.CommonMsg;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class KeyByDemo extends AbsStreamCommonService<CommonMsg> {public static void main(String[] args) throws Exception {new KeyByDemo().processStream(4);}@Overridepublic void handle(DataStream<CommonMsg> stream) {/*** keyby算子返回的是一个keyedStream* 1.keyby不是一个转换算子,只对数据进行了重分区,另外还不能设置并行度* 2.keyby分组和分区的概念* keyby是对数据进行分组,保证同一个分组的数据会落到同一个数据分区内* 分区:一个子任务可以理解为一个分区,一个分区可以包含有多个分组的数据*/KeyedStream<CommonMsg, String> keyBy = stream.keyBy((KeySelector<CommonMsg, String>) CommonMsg::getId, TypeInformation.of(String.class));keyBy.print();}@Overridepublic DataStream<CommonMsg> getSource(StreamExecutionEnvironment env) {return super.getSourceFromCollection(env);}
}
数据源是一个有界的数组,对应的数据是程序中自己new出来的,执行结果如下
2> CommonMsg(id=11, msg=hello world, time=11)
2> CommonMsg(id=11, msg=hello flink, time=3)
2> CommonMsg(id=11, msg=hello java, time=23)
1> CommonMsg(id=12, msg=hello kitty, time=13)
3> CommonMsg(id=13, msg=hello world, time=12)
可以看到,通过keyBy的分组操作,相同的数据放在了同一个分区去执行。
sum/min/minBy/max/maxBy
这几个是最基本的聚合算子。
package com.tml.operator.aggregation;import com.tml.common.AbsStreamCommonService;
import com.tml.msg.CommonMsg;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SimpleAggregateDemo extends AbsStreamCommonService<CommonMsg> {public static void main(String[] args) throws Exception {new SimpleAggregateDemo().processStream(1);}@Overridepublic void handle(DataStream<CommonMsg> stream) {KeyedStream<CommonMsg, String> keyStream = stream.keyBy((KeySelector<CommonMsg, String>) CommonMsg::getId, TypeInformation.of(String.class));//使用sum聚合//SingleOutputStreamOperator<CommonMsg> time = stream.sum("time");//SingleOutputStreamOperator<CommonMsg> min = stream.min("time");/*** max、maxyBy的区别在于* max不会对非比较字段重新赋值,而maxBy会更新非比较字段的值*/SingleOutputStreamOperator<CommonMsg> minBy = keyStream.minBy("time");//min.print();minBy.print();}@Overridepublic DataStream<CommonMsg> getSource(StreamExecutionEnvironment env) {return super.getSourceFromCollection(env);}
}
先看一下minBy这个算子结果输出
CommonMsg(id=11, msg=hello world, time=11)
CommonMsg(id=11, msg=hello flink, time=3)
CommonMsg(id=12, msg=hello kitty, time=13)
CommonMsg(id=13, msg=hello world, time=12)
CommonMsg(id=11, msg=hello flink, time=3)
将聚合操作的api换成min(),对比一下程序的输出
CommonMsg(id=11, msg=hello world, time=11)
CommonMsg(id=11, msg=hello world, time=3)
CommonMsg(id=12, msg=hello kitty, time=13)
CommonMsg(id=13, msg=hello world, time=12)
CommonMsg(id=11, msg=hello world, time=3)
两个对比输出可以发现,min、minBy的区别在于
min不会对非比较字段重新赋值,而minBy会更新非比较字段的值
当然max、maxBy也是一样
reduce
package com.tml.operator.aggregation;import com.tml.common.AbsStreamCommonService;
import com.tml.msg.CommonMsg;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class ReduceDemo extends AbsStreamCommonService<CommonMsg> {public static void main(String[] args) throws Exception {new ReduceDemo().processStream(1);}@Overridepublic void handle(DataStream<CommonMsg> source) {KeyedStream<CommonMsg, String> stream = source.keyBy((KeySelector<CommonMsg, String>) CommonMsg::getId, TypeInformation.of(String.class));/*** reduce函数是非常灵活的,可以根据业务需求,非常灵活的进行聚合计算* 当每个分组中只有一条数据的时候,是不会进行reduce的,因为只有一条数据,没有比较的数据,进行reduce没有必要*/SingleOutputStreamOperator<CommonMsg> reduce = stream.reduce((t1, t2) -> {System.out.println("t1==>" + t1);System.out.println("t2==>" + t2);CommonMsg commonMsg = new CommonMsg(t1.getId(), t2.getMsg(), t1.getTime() + t2.getTime());return commonMsg;});reduce.print();}@Overridepublic DataStream<CommonMsg> getSource(StreamExecutionEnvironment env) {return super.getSourceFromCollection(env);}
}
看一下运行结果
CommonMsg(id=11, msg=hello world, time=11)
t1==>CommonMsg(id=11, msg=hello world, time=11)
t2==>CommonMsg(id=11, msg=hello flink, time=3)
CommonMsg(id=11, msg=hello flink, time=14)
CommonMsg(id=12, msg=hello kitty, time=13)
CommonMsg(id=13, msg=hello world, time=12)
t1==>CommonMsg(id=11, msg=hello flink, time=14)
t2==>CommonMsg(id=11, msg=hello java, time=23)
CommonMsg(id=11, msg=hello java, time=37)
通过运行结果可以看到,reduce算子是非常灵活的,可以在两个数据之间做非常灵活的操作,当然,如果对应的分组中只有一条数据,自然是 不会触发reduce函数的执行了。
richFunction
package com.tml.operator.aggregation;import com.tml.common.AbsStreamCommonService;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** richfunction添加了一些额外的功能* 提供了一些生命周期的管理方法,比如open()\close()* open() 在每个子任务启动的时候调用一次* close() 在每个任务结束的时候调用一次,如果是flink程序挂掉,不会调用这个close方法,在控制台上点击cancel任务,这个close方法也是可以额正常调用的** 另外多了一些运行时上下文,可以通过getRuntimeContext() 来获取上下文中的一些关键信息* 在close方法中可以做一些释放资源的操作,回调通知操作等一些hook函数*/
public class RichFunctionDemo extends AbsStreamCommonService<String> {public static void main(String[] args) throws Exception {new RichFunctionDemo().processStream(1);}@Overridepublic void handle(DataStream<String> stream) {SingleOutputStreamOperator<String> map = stream.map(new RichMapFunction<String, String>() {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);RuntimeContext context = getRuntimeContext();String taskName = context.getTaskName();int subtasks = context.getNumberOfParallelSubtasks();System.out.println("taskName: " + taskName + ", subtasks: " + subtasks + " call open()");}@Overridepublic void close() throws Exception {super.close();RuntimeContext context = getRuntimeContext();String taskName = context.getTaskName();int subtasks = context.getNumberOfParallelSubtasks();System.out.println("taskName: " + taskName + ", subtasks: " + subtasks + " call close()");}@Overridepublic String map(String value) throws Exception {return "(" + value + ")";}}, TypeInformation.of(String.class));map.print();}@Overridepublic DataStream<String> getSource(StreamExecutionEnvironment env) {return super.getSourceFromSocket(env);}
}
运行程序前需要先运行socket,这里使用了nc,详细可以参考Flink实时统计单词【入门】
看一下运行结果
taskName: Source: Socket Stream -> Map -> Sink: Print to Std. Out, subtasks: 1 call open()
(hello kitty)
(hello flink)
taskName: Source: Socket Stream -> Map -> Sink: Print to Std. Out, subtasks: 1 call close()
总结
- 这些聚合算子的基础实在keyBy之后,只有对数据进行了分组之后,才能执行后面的聚合操作。
- min、minBy和max、maxBy之间有细微的区别,前者不会对非比较字段重新赋值,而后者会更新非比较字段的值
- reduce算子是在两个数据之间进行操作的,可以非常灵活
- richFunction不算聚合函数,这里写进来是富函数可以做非常多的额外功能,open()方法是对应的子任务启动时调用一下,close()方法是在对应的子任务结束的时候调用一次,通过这个可以做一些监控或者hook通知的操作
代码案例已经上传到了github,欢迎前来围观!