
Introduction to Aggregation Operators in Flink

Preface

Aggregation operators are among the most commonly used parts of the Flink API. Aggregation here means performing comparison-based computations on top of grouped (keyed) data. The simple examples below show how these operators are used and what to watch out for.

Aggregation Operator Examples

Because the flow of a Flink API program is fairly fixed (get the execution environment, get the data source, apply the transformations, emit the results), the sample code uses the template method pattern so this boilerplate can be reused.
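For orientation, here is a minimal sketch of that fixed skeleton (a hypothetical standalone snippet, independent of the project code):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PipelineSketch {
    public static void main(String[] args) throws Exception {
        // 1. get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. get a data source, 3. transform it, 4. emit the result
        env.fromElements("hello", "flink")
           .map(String::toUpperCase)
           .print();
        env.execute();
    }
}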

First, define a generic stream interface:

package com.tml.common;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public interface StreamService<T> {

    /** Creates the execution environment for the job. */
    StreamExecutionEnvironment getEnv();

    /** Builds the data source the job will consume. */
    DataStream<T> getSource(StreamExecutionEnvironment env);
}

Then abstract the template:

package com.tml.common;

import com.tml.msg.CommonMsg;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public abstract class AbsStreamCommonService<T> implements StreamService<T> {


    /** Template method: the fixed pipeline of get environment, set parallelism, get source, transform, execute. */
    public void processStream(Integer parallelism) throws Exception {
        StreamExecutionEnvironment env = getEnv();
        env.setParallelism(parallelism);
        DataStream<T> stream = getSource(env);
        handle(stream);
        env.execute();
    }

    public abstract void handle(DataStream<T> source);

    @Override
    public StreamExecutionEnvironment getEnv() {

        return StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
    }

    /** Reads lines from a socket; the host must be running something like nc -lk 9999. */
    public DataStream<String> getSourceFromSocket(StreamExecutionEnvironment environment) {
        return environment.socketTextStream("43.139.114.233", 9999);
    }

    public DataStream<CommonMsg> getSourceFromCollection(StreamExecutionEnvironment environment) {
        DataStreamSource<CommonMsg> source = environment.fromElements(
                new CommonMsg("11", "hello world", 11L),
                new CommonMsg("11", "hello flink", 3L),
                new CommonMsg("12", "hello kitty", 13L),
                new CommonMsg("13", "hello world", 12L),
                new CommonMsg("11", "hello java", 23L));

        return source;
    }

    /** Generates the Longs 0..99999, rate-limited to 2 elements per second. */
    public DataStream<Long> getSourceFromDataGenerator(StreamExecutionEnvironment environment) {
        DataGeneratorSource<Long> dataGeneratorSource = new DataGeneratorSource<>(
                (GeneratorFunction<Long, Long>) o -> o, 100000L, RateLimiterStrategy.perSecond(2), Types.LONG);
        return environment.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGeneratorSource", Types.LONG);
    }


}

Note: using StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()) starts Flink's web UI locally (by default at http://localhost:8081), which makes it easy to inspect a job's execution parameters. This setup is intended for local debugging and learning.
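If port 8081 is already in use, the web UI port can be overridden via the Configuration that is passed in. A minimal sketch (the port value 8082 is an arbitrary choice, not from the original code):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WebUiPortSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // run the local web UI on a non-default port
        conf.set(RestOptions.PORT, 8082);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    }
}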


The corresponding pom.xml dependencies:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.tml</groupId>
  <artifactId>flink-demo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>flink-demo</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.18.0</flink.version> <!-- adjust to match your Flink version -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Flink Streaming API -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Flink client, required to execute jobs -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime-web</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-files</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-datagen</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.20</version>
    </dependency>
  </dependencies>
</project>

keyBy

package com.tml.operator.aggregation;

import com.tml.common.AbsStreamCommonService;
import com.tml.msg.CommonMsg;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class KeyByDemo extends AbsStreamCommonService<CommonMsg> {

    public static void main(String[] args) throws Exception {
        new KeyByDemo().processStream(4);
    }

    @Override
    public void handle(DataStream<CommonMsg> stream) {
        /**
         * keyBy returns a KeyedStream.
         * 1. keyBy is not a transformation operator: it only redistributes the data across
         *    partitions, and no parallelism can be set on it.
         * 2. Grouping vs. partitioning:
         *    keyBy groups the data by key and guarantees that all records of one group land
         *    in the same partition.
         *    Partition: each subtask can be seen as one partition, and one partition may hold
         *    the data of several groups.
         */
        KeyedStream<CommonMsg, String> keyBy = stream.keyBy((KeySelector<CommonMsg, String>) CommonMsg::getId, TypeInformation.of(String.class));
        keyBy.print();
    }


    @Override
    public DataStream<CommonMsg> getSource(StreamExecutionEnvironment env) {
        return super.getSourceFromCollection(env);
    }
}

The source is a bounded collection whose elements are created (new-ed) directly in the program. The output is as follows:

2> CommonMsg(id=11, msg=hello world, time=11)
2> CommonMsg(id=11, msg=hello flink, time=3)
2> CommonMsg(id=11, msg=hello java, time=23)
1> CommonMsg(id=12, msg=hello kitty, time=13)
3> CommonMsg(id=13, msg=hello world, time=12)

As the subtask prefixes show, keyBy routes all records with the same key to the same partition for processing.
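Under the hood, the target subtask is derived from the key's hash: the key is hashed into one of maxParallelism key groups, and each key group maps to one subtask. A hedged sketch using Flink's internal KeyGroupRangeAssignment class (an internal API, shown for illustration only; the indices it prints are 0-based, while print()'s prefixes are 1-based):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupSketch {
    public static void main(String[] args) {
        int parallelism = 4;       // KeyByDemo runs with parallelism 4
        int maxParallelism = 128;  // Flink's default lower bound for max parallelism
        for (String key : new String[]{"11", "12", "13"}) {
            int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
            int subtask = KeyGroupRangeAssignment
                    .computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
            System.out.println("key=" + key + " -> keyGroup=" + keyGroup + " -> subtask=" + subtask);
        }
    }
}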

sum/min/minBy/max/maxBy

These are the most basic built-in aggregation operators. They are defined on KeyedStream; for POJOs such as CommonMsg, the aggregated field is addressed by name.

package com.tml.operator.aggregation;

import com.tml.common.AbsStreamCommonService;
import com.tml.msg.CommonMsg;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class SimpleAggregateDemo extends AbsStreamCommonService<CommonMsg> {

    public static void main(String[] args) throws Exception {
        new SimpleAggregateDemo().processStream(1);
    }


    @Override
    public void handle(DataStream<CommonMsg> stream) {
        KeyedStream<CommonMsg, String> keyStream = stream.keyBy((KeySelector<CommonMsg, String>) CommonMsg::getId, TypeInformation.of(String.class));
        //sum aggregation (note: sum/min/etc. are defined on the KeyedStream, not the raw DataStream)
        //SingleOutputStreamOperator<CommonMsg> time = keyStream.sum("time");
        //SingleOutputStreamOperator<CommonMsg> min = keyStream.min("time");
        /**
         * The difference between max and maxBy:
         * max does not reassign the non-compared fields, while maxBy updates the
         * non-compared fields with the values of the record holding the extreme.
         */
        SingleOutputStreamOperator<CommonMsg> minBy = keyStream.minBy("time");
        //min.print();
        minBy.print();
    }

    @Override
    public DataStream<CommonMsg> getSource(StreamExecutionEnvironment env) {
        return super.getSourceFromCollection(env);
    }
}

First, the output of the minBy operator:

CommonMsg(id=11, msg=hello world, time=11)
CommonMsg(id=11, msg=hello flink, time=3)
CommonMsg(id=12, msg=hello kitty, time=13)
CommonMsg(id=13, msg=hello world, time=12)
CommonMsg(id=11, msg=hello flink, time=3)

Now swap the aggregation call to min() and compare the output:

CommonMsg(id=11, msg=hello world, time=11)
CommonMsg(id=11, msg=hello world, time=3)
CommonMsg(id=12, msg=hello kitty, time=13)
CommonMsg(id=13, msg=hello world, time=12)
CommonMsg(id=11, msg=hello world, time=3)

Comparing the two outputs shows the difference between min and minBy:

min does not reassign the non-compared fields (msg stays "hello world" for key 11), while minBy updates the whole record to the one holding the minimum.

The same holds for max and maxBy. For Tuple streams, these operators can also address the field by position instead of by name, as sketched below.
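A hedged sketch of the positional variant (a hypothetical standalone class, not part of the original repo; sum(1) aggregates the Long at tuple position 1):

package com.tml.operator.aggregation;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TupleAggregationSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        KeyedStream<Tuple2<String, Long>, String> keyed = env
                .fromElements(Tuple2.of("11", 11L), Tuple2.of("11", 3L), Tuple2.of("12", 13L))
                .keyBy((KeySelector<Tuple2<String, Long>, String>) t -> t.f0, Types.STRING);
        // running sum of position 1, per key; min(1)/minBy(1)/max(1)/maxBy(1) work the same way
        keyed.sum(1).print();
        env.execute();
    }
}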

reduce

package com.tml.operator.aggregation;

import com.tml.common.AbsStreamCommonService;
import com.tml.msg.CommonMsg;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReduceDemo extends AbsStreamCommonService<CommonMsg> {

    public static void main(String[] args) throws Exception {
        new ReduceDemo().processStream(1);

    }

    @Override
    public void handle(DataStream<CommonMsg> source) {
        KeyedStream<CommonMsg, String> stream = source.keyBy((KeySelector<CommonMsg, String>) CommonMsg::getId, TypeInformation.of(String.class));

        /**
         * reduce is very flexible: the combination logic between two records is entirely
         * up to the business requirements.
         * When a group holds only a single record, the reduce function is not invoked:
         * there is nothing to combine with yet, so the record is emitted as-is.
         */
        SingleOutputStreamOperator<CommonMsg> reduce = stream.reduce((t1, t2) -> {
            System.out.println("t1==>" + t1);
            System.out.println("t2==>" + t2);
            CommonMsg commonMsg = new CommonMsg(t1.getId(), t2.getMsg(), t1.getTime() + t2.getTime());

            return commonMsg;
        });

        reduce.print();
    }

    @Override
    public DataStream<CommonMsg> getSource(StreamExecutionEnvironment env) {
        return super.getSourceFromCollection(env);
    }
}

The output:

CommonMsg(id=11, msg=hello world, time=11)
t1==>CommonMsg(id=11, msg=hello world, time=11)
t2==>CommonMsg(id=11, msg=hello flink, time=3)
CommonMsg(id=11, msg=hello flink, time=14)
CommonMsg(id=12, msg=hello kitty, time=13)
CommonMsg(id=13, msg=hello world, time=12)
t1==>CommonMsg(id=11, msg=hello flink, time=14)
t2==>CommonMsg(id=11, msg=hello java, time=23)
CommonMsg(id=11, msg=hello java, time=37) 

The output confirms how flexible reduce is: arbitrary pairwise logic can run between the accumulated value (t1) and the incoming record (t2), and a group containing only one record is emitted without the reduce function firing. In fact, the built-in aggregations can themselves be re-expressed as reduce calls, as sketched below.
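A hedged sketch of that equivalence (these lines would drop into ReduceDemo's handle() in place of the existing reduce; stream is the keyed stream from above, and the behavior is assumed equivalent rather than taken from the source):

// minBy("time"): keep whichever whole record has the smaller time
stream.reduce((t1, t2) -> t2.getTime() < t1.getTime() ? t2 : t1).print();

// min("time"): keep the first record's non-compared fields, only track the minimum time
stream.reduce((t1, t2) ->
        new CommonMsg(t1.getId(), t1.getMsg(), Math.min(t1.getTime(), t2.getTime()))).print();

// beyond the built-ins: per key, keep the record whose msg is longest
stream.reduce((t1, t2) -> t2.getMsg().length() > t1.getMsg().length() ? t2 : t1).print();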

RichFunction

package com.tml.operator.aggregation;

import com.tml.common.AbsStreamCommonService;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Rich functions add extra capabilities on top of plain functions.
 * They provide lifecycle methods such as open() and close():
 * open() is called once when each subtask starts.
 * close() is called once when each subtask finishes. If the Flink program crashes, close()
 * is not invoked, but cancelling the job from the web UI does invoke close() normally.
 *
 * They also carry a runtime context: getRuntimeContext() exposes key information about the
 * running task.
 * close() is a good place to release resources, fire callback notifications, and run other
 * hook-style logic.
 */
public class RichFunctionDemo extends AbsStreamCommonService<String> {
    public static void main(String[] args) throws Exception {
        new RichFunctionDemo().processStream(1);
    }

    @Override
    public void handle(DataStream<String> stream) {
        SingleOutputStreamOperator<String> map = stream.map(new RichMapFunction<String, String>() {

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                RuntimeContext context = getRuntimeContext();
                String taskName = context.getTaskName();
                int subtasks = context.getNumberOfParallelSubtasks();
                System.out.println("taskName: " + taskName + ", subtasks: " + subtasks + " call open()");
            }

            @Override
            public void close() throws Exception {
                super.close();
                RuntimeContext context = getRuntimeContext();
                String taskName = context.getTaskName();
                int subtasks = context.getNumberOfParallelSubtasks();
                System.out.println("taskName: " + taskName + ", subtasks: " + subtasks + " call close()");
            }

            @Override
            public String map(String value) throws Exception {
                return "(" + value + ")";
            }
        }, TypeInformation.of(String.class));

        map.print();
    }

    @Override
    public DataStream<String> getSource(StreamExecutionEnvironment env) {
        return super.getSourceFromSocket(env);
    }
}

Before running the program, start the socket side first (nc is used here); for details see Flink实时统计单词【入门】.

The output:

taskName: Source: Socket Stream -> Map -> Sink: Print to Std. Out, subtasks: 1 call open()
(hello kitty)
(hello flink)
taskName: Source: Socket Stream -> Map -> Sink: Print to Std. Out, subtasks: 1 call close()
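Rich variants exist for most function interfaces, not just map. As a hedged sketch (illustrative wiring, not from the original repo), the reduce from ReduceDemo could gain the same lifecycle hooks via RichReduceFunction, assuming the same keyed stream variable:

import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.configuration.Configuration;

// drop-in replacement for the lambda in ReduceDemo.handle()
stream.reduce(new RichReduceFunction<CommonMsg>() {
    @Override
    public void open(Configuration parameters) throws Exception {
        // called once per subtask, e.g. to set up a connection pool
        System.out.println("subtask " + getRuntimeContext().getIndexOfThisSubtask() + " open()");
    }

    @Override
    public void close() throws Exception {
        // called once per subtask, e.g. to release resources
        System.out.println("subtask " + getRuntimeContext().getIndexOfThisSubtask() + " close()");
    }

    @Override
    public CommonMsg reduce(CommonMsg t1, CommonMsg t2) {
        return new CommonMsg(t1.getId(), t2.getMsg(), t1.getTime() + t2.getTime());
    }
}).print();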

Summary

  • These aggregation operators all build on keyBy: only after the data has been grouped can the aggregations that follow be applied.
  • min/minBy and max/maxBy differ subtly: the former do not reassign the non-compared fields, while the latter update the non-compared fields as well.
  • reduce operates pairwise between records and is extremely flexible.
  • RichFunction is not an aggregation function, but it is covered here because rich functions enable a lot of extra functionality: open() is called once when each subtask starts and close() once when it finishes, which is useful for monitoring or hook-style notifications.

The code examples have been uploaded to GitHub. Feel free to take a look!
