当前位置: 首页 > news >正文

flink广播算子Broadcast

文章目录

  • 一、Broadcast
  • 二、代码示例
  • 三.或者第二种(只读取一个csv文件到广播内存中)


提示:以下是本篇文章正文内容,下面案例可供参考

一、Broadcast

为了关联一个非广播流(keyed 或者 non-keyed)与一个广播流(BroadcastStream),我们可以调用非广播流的方法 connect(),并将 BroadcastStream 当做参数传入。 这个方法的返回参数是 BroadcastConnectedStream,具有类型方法 process(),传入一个特殊的 CoProcessFunction 来书写我们的模式识别逻辑。 具体传入 process() 的是哪个类型取决于非广播流的类型:

  • 如果流是一个 keyed 流,那就是 KeyedBroadcastProcessFunction 类型;
  • 如果流是一个 non-keyed 流,那就是 BroadcastProcessFunction 类型。

1).例如非keyby的要实现两个方法

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
   
    //主流 
    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
   //广播操作
    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}

2).keyby的

public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {
   
    //主流
    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
    //广播
    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
    //只有keyby的可以onTimer。此方法可以不重写
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}

在处理广播流元素这端,是具有读写权限的,而对于处理非广播流元素这端是只读的。 这样做的原因是,Flink 中是不存在跨 task 通讯的。所以为了保证 broadcast state 在所有的并发实例中是一致的,我们在处理广播流元素的时候给予写权限,在所有的 task 中均可以看到这些元素,并且要求对这些元素处理是一致的, 那么最终所有 task 得到的 broadcast state 是一致的。
广播算子是不使用 RocksDB state backend: broadcast state 在运行时保存在内存中,需要保证内存充足。这一特性同样适用于所有其他 Operator State。

二、代码示例

此处将本地csv文件加载到内存广播中
CSV文件的内容是:
1.user_details.csv

1,Alice,30
2,Bob,25

2.user_details03.csv

3,Charlie,35
5,name,5

下面是代码(下面是将两个本地CSV文件放到广播内存中案例)

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;
public <

相关文章:

  • Docker一键部署OpenObserve打造低成本的云原生观测平台操作详解
  • 操作系统知识点35
  • 基于MLA的人类语音情感分类
  • centos 7 部署FTP 服务用shell 脚本搭建
  • Git的安装
  • 代理IP与AI的碰撞:网络安全新防线解码
  • 【Java全栈进阶架构师实战:从设计模式到SpringCloudAlibaba,打造高可用系统】
  • 爬虫逆向解决debugger问题
  • 【QA】QT事件处理流程是怎么样的?
  • 如何理解前端工程化
  • 蓝桥杯备考:差分算法之 语文成绩
  • 信号的产生和保存
  • Netty源码—5.Pipeline和Handler一
  • Vue3+vite项目 使用require 解决 ReferenceError: require is not defined 报错问题
  • CodeBrick笔记
  • Jenkins集成Trivy安全漏洞检查指南
  • venv 和 conda 哪个更适合管理python虚拟环境
  • 【C#】将数字转换为中文,如123转换为一百二十三
  • 卷积神经网络 - 参数学习
  • NVIDIA V100显卡支持Tensor Core技术,而Granite-3.1-8B模型在适当的条件下可以利用Tensor Core来加速数据处理
  • 国台办:提醒相关人员不要假借去第三地名义绕道赴台
  • 中国人民解放军南部战区位南海海域进行例行巡航
  • 烟花、美食和购物优惠都安排上了,上海多区开启热闹模式
  • 自称“最美”通缉犯出狱当主播?央广网:三观怎能跟着“五官”跑
  • 玉渊谭天丨中方减少美国农产品进口后,舟山港陆续出现巴西大豆船
  • 商超展销延长、专区专柜亮相……上海“外贸拓内销”商品与市民见面