当前位置: 首页 > news >正文

flink的多种部署模式

## 部署模式和运行模式
### 部署模式
- 本地local
    - 单机无需分布式资源管理
- 集群
    - 独立集群standalone
        - 需要flink自身的任务管理工具
            - jobmanager接收和调度任务
            - taskmanager执行
    - on其他资源管理工具yarn/k8s
        - yarn
            - 注意区分flink的和yarn的taskmanager
### 运行模式
- session
    - 先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业。 集群启动时所有资源就都已经确定,所有提交的作业会竞争集群中的资源。
    - 适合任务规模小,执行时间短的大量作业。
    - 作业执行环境会一直保留在集群上,直到会话被人为终止。
- per-job
    -  每次提交的job都会创建一个独立的作业执行环境,该作业执行环境仅用于通过客户端提交上来的特定的那个作业。
    - 适合执行时间长的少量作业
    - 当作业完成后,作业执行环境会被自动释放,集群关闭,资源释放
- application-mode
    - 为解决客户端执行代码,客户端提交任务给jobmanager占用带宽而生
        
### 不需要外部资源管理
- per job-一个任务(作业)起一个集群 (提交作业后建立集群)
- standalone-一个集群跑多个任务,手动分配集群整个的资源,启动起来就已经固定了。任务从集群中取得的资源是否能调整?
### 需要外部资源管理
- application-一个任务(作业/应用(同时提交、有依赖关系的多个作业))起一个集群 (提交作业后建立集群)
- session-一个集群跑多个任务,yarn根据配置生成集群?并且将集群里各个任务对资源的使用情况来分配资源?

## 分层API
- sql 与tableAPI类似,以sql查询表达式的形式表现程序
- table API 以表为中心(处理结构化数据)
    - 遵循关系模型(像关系型数据库中的表)
    - 提供可比较操作(select\project\join\group by\aggregate)
    - 通常以方法链的形式调用(提供了一个声明式的接口来处理批处理和流处理任务),语法上更接近于编程语言
- sparkstreaming API 封装处理函数,提供通用模块:
    - 转换 transformations(map\flatmap)
    - 连接 joins
    - 聚合 aggregations
    - 窗口 windows
- 有状态的流处理 处理函数

## 运行架构(standalone会话模式)

- jobManager 作业管理器控制执行应用(管理应用里的任务执行和调度)
    - jobmaster 负责处理单独的作业(job),等同于早期flink版本中的jobmanager
        - 接收要执行的应用
        - jobGraph->excutionGraph(包含了所有可并发执行的任务
        - 向resourceManager发送请求,申请执行所需资源
        - 获取足够资源后,分发执行图到运行它们的taskmanager上
        -  运行过程中,负责所有需要中央协调的操作(checkpoints的协调)
    - resourceManager 资源管理器
        - 资源(taskManager的任务槽task slots)的分配和管理
        - 任务槽-资源调配的最小单元,包含了机器用来执行计算的一组CPU和内存
        - 每个任务都要分配到一个slot上执行
        - 注意区分flink和yarn的resourceManager
    - dispatcher 分发器
        - 提供一个reset接口,用来提交应用
        - 为每一个新提交的作业启动一个新的jobMaster组件
        - 启动Web UI
        - 非必需架构,有些部署模式下会被省掉(本地模式,一个JVM,不涉及集群部署,所以不需要分发器;kurbernets有自己的controller控制器管理Pod的生命周期,所以替代分发器的部分功能)。
- taskManager任务管理器
    - 是flink的工作进程
        - 对数据流做具体的计算
        - 集群中至少有一个
        - 每个包含了一定量的任务槽
        - 任务槽的数量限制taskManager处理任务的并行数
    - 启动后,先向资源管理器注册它的slots
        - 收到资源管理器的反馈指令后,将至少一个槽位提供给jobMaster调用
        - jobMaster来分配任务
        - taskManager可以缓冲数据,并和运行同一应用的taskManager交换数据


## 并行度和任务槽
- 基本概念
    - 算子 是Flink数据处理的基本单元,每个算子负责执行特定的任务。通过组合不同的算子,可以构建复杂的数据处理逻辑。常见的算子包括 Map、Filter、FlatMap、KeyBy、Reduce、Window、Join、Union 和 Sink 、source等。
        - source \ sink 类似于datax从哪个地址获取数据输送给哪个地址
        - map 一对一 类型转换、四则运算 对数据流中的每个元素应用一个函数,生成一个新的元素
        - flatMap 一对多
        - keyby 分组
        - reduce 聚合
    -  并行子任务 
        - 一个算子任务被拆分成多个并行子任务,再分发到不同节点,实现并行计算。
        - 如果处理的数据量大,把一个算子操作复制到多个节点,数据来了后可以到任意一个节点执行。
            - 并行是将一个大任务分给两个人同时做
                - 比如一个平台上的不同大屏,两个人同时做。这两个人都做过这个项目下的大屏,再来了新的数据新的大屏任务,两个人都能做。(两个人间可能有信息不同步问题,但是算子操作是复制粘贴的,一定同步,所以给哪个节点做都行)
            - 并发是一个人同时做好几件事
    - 并行度 
        - 一个特定算子的子任务的个数
        - 一段流处理程序的并行度== 所有算子中最大的并行度
        - 设置
            - #算子层面并行度设置  考虑到动态扩容,通常使用单个算子设置并行度stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);
            - 全局层面设置 env.setParallelism(2);
            - 配置文件设置 
                - 集群 flink-conf.yaml 默认1
                - 开发环境无配置文件  默认为当前机器CPU核数
    - 并行数据流 
        - 包含并行子任务的数据流 
        - 需要多个分区来分配并行任务
## 算子链

- 算子间的数据传输
    - 一对一
    - 充分区 类似于shuffle
- 合并算子链 
    - 一对一+算子并行度相同

## 任务槽
- 每个takManager是一个JVM进程,可以启动多个独立线程,并行执行多个子任务
- 计算资源有限,并行任务越多,每个线程资源越少。

```

public class RealTimeWordCountFromPGToMySQL {

    public static void main(String[] args) throws Exception {
        // 创建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置并行度
        env.setParallelism(2); // 设置全局并行度为2

        // 定义 PostgreSQL CDC 源
        Properties props = new Properties();
        props.setProperty("plugin.name", "pgoutput"); // PostgreSQL 的逻辑解码插件
        props.setProperty("database.hostname", "localhost");
        props.setProperty("database.port", "5432");
        props.setProperty("database.username", "yourusername");
        props.setProperty("database.password", "yourpassword");
        props.setProperty("database.server.id", "1888");
        props.setProperty("database.server.name", "dbserver1");
        props.setProperty("table.whitelist", "public.your_table");

        DataStream<String> source = PostgreSQLSource.<String>builder()
            .hostname(props.getProperty("database.hostname"))
            .port(Integer.parseInt(props.getProperty("database.port")))
            .database(props.getProperty("database.server.name"))
            .tableList(props.getProperty("table.whitelist"))
            .username(props.getProperty("database.username"))
            .password(props.getProperty("database.password"))
            .deserializer(new DebeziumDeserializationSchema<String>() {
                @Override
                public String deserialize(ChangeRecord changeRecord) {
                    if (changeRecord instanceof DataChangeRecord) {
                        DataChangeRecord dataChangeRecord = (DataChangeRecord) changeRecord;
                        return dataChangeRecord.after().getField(0).toString();
                    }
                    return null;
                }

                @Override
                public boolean isEndOfStream(String nextElement) {
                    return false;
                }

                @Override
                public TypeInformation<String> getProducedType() {
                    return Types.STRING();
                }
            })
            .build()
            .addSource();

        // 使用 flatMap 算子将文本拆分为单词
        DataStream<Tuple2<String, Integer>> words = source
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                    for (String word : line.split(" ")) {
                        out.collect(new Tuple2<>(word, 1));
                    }
                }
            });

        // 使用 keyBy 算子按单词分组
        DataStream<Tuple2<String, Integer>> wordCounts = words
            .keyBy(value -> value.f0)
            .sum(1);

        // 定义 MySQL 数据目标
        JdbcStatementBuilder<Tuple2<String, Integer>> statementBuilder = (ps, t) -> {
            ps.setString(1, t.f0);
            ps.setInt(2, t.f1);
        };

        JdbcConnectionOptions jdbcOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withUrl("jdbc:mysql://localhost:3306/yourdb")
            .withDriverName("com.mysql.cj.jdbc.Driver")
            .withUsername("yourusername")
            .withPassword("yourpassword")
            .build();

        wordCounts.addSink(JdbcSink.sink(
            "INSERT INTO word_count (word, count) VALUES (?, ?)",
            statementBuilder,
            jdbcOptions
        ));

        // 执行任务
        env.execute("Real-Time Word Count from PG to MySQL Example");
    }
}

```
 

相关文章:

  • Kaggle-Plant Seedlings Classification-(多分类+CNN+图形处理)
  • 解决“在EFI系统上,Windows只能安装到GPT磁盘“错误
  • DINO-R1:激励推理能力的视觉基础模型
  • 最简单的方式突破远程桌面封锁
  • 算法导论第十九章 并行算法:解锁计算新维度
  • Matplotlib 绘图库使用技巧介绍
  • Python 数据分析与可视化 Day 1 - Pandas 数据分析基础入门
  • day037-openssh服务与http协议
  • 视频相似度检测算法(带课设报告)
  • GNSS介绍
  • 【AI编程】第3期,针对AI生成的改枪码列表创建对应的数据库表
  • 50-Oracle awr报告生成-实操
  • Promise静态方法 race
  • LangGraph--结构化输出(.with_structured_output() 方法)
  • 【论文笔记】【强化微调】AgentThink:思维链推理 + 工具调用
  • 高度雾实时渲染~轻松营造GIS场景真实感
  • 2025虚幻引擎文件与文件夹命名规律
  • ssh 服务和 rsync 数据同步
  • MFC中使用CRichEditCtrl控件让文本框中的内容部分加粗
  • 面试第三期
  • 企业营销网站建设/批量查询神马关键词排名
  • WordPress电影网站源码/推广策略可以分为哪三种
  • 如何法院网站建设/百度推广登陆
  • 运营公开网是什么网站/t和p在一起怎么做网站
  • 网站如何做的看起来高大上/广西壮族自治区免费百度推广
  • 技校软件开发专业/seo免费优化