当前位置: 首页 > news >正文

flink keyby使用与总结 基础片段梳理

文章目录

  • 一、KeyBy的源码分析


提示:以下是本篇文章正文内容,下面案例可供参考

一、KeyBy的源码分析

总结:
保证key相同的一定进入到一个分区内,但是一个分区内可以有多key的数据;
是对数据进行实时的分区,不是上游发送给下游,而是将数据写入到对应的channel的缓存中,下游到上游实时拉取;
keyBy底层是new KeyedStream,然后将父DataStream包起来,并且传入keyBy的条件(keySelector);
最终会调用KeyGroupStreamPartitioner的selectChannel方法,将keyBy的条件的返回值传入到该方法中;
流程:
1.先计算key的HashCode值(有可能会是负的)
2将key的HashCode值进行特殊的hash处理,MathUtils.murmurHash(keyHash),一定返回正数,避免返回的数字为负
3.将返回特特殊的hash值模除以默认最大并行的,默认是128,得到keyGroupId
4.keyGroupId * parallelism(程序的并行度) / maxParallelism(默认最大并行),返回分区编号
注意:1.如果将自定义POJO当成key,必须重新hashcode方法,2.不能将数组当成keyBy的key

public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {Preconditions.checkNotNull(key);return new KeyedStream<>(this, clean(key));}public KeyedStream(DataStream<T> dataStream,KeySelector<T, KEY> keySelector,TypeInformation<KEY> keyType) {this(dataStream,new PartitionTransformation<>(dataStream.getTransformation(),new KeyGrouppublic KeyGroupStreamPartitioner(KeySelector<T, K> keySelector, int maxParallelism) {Preconditions.checkArgument(maxParallelism > 0, "Number of key-groups must be > 0!");this.keySelector = Preconditions.checkNotNull(keySelector);this.maxParallelism = maxParallelism;}public int getMaxParallelism() {return maxParallelism;}@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {K key;try {key = keySelector.getKey(record.getInstance().getValue());} catch (Exception e) {throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);}return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);}StreamPartitioner<>(keySelector,StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),keySelector,keyType);}public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {Preconditions.checkNotNull(key, "Assigned key must not be null!");return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));}public static int assignToKeyGroup(Object key, int maxParallelism) {Preconditions.checkNotNull(key, "Assigned key must not be null!");return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);}// 将key的HashCode值进行特殊的hash处理,MathUtils.murmurHash(keyHash),一定返回正数,避免返回的数字为负
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {return MathUtils.murmurHash(keyHash) % maxParallelism;}
// keyGroupId * parallelism(程序的并行度) / maxParallelism(默认最大并行),返回分区编号
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {return MathUtils.murmurHash(keyHash) % maxParallelism;}
http://www.dtcms.com/a/456582.html

相关文章:

  • flink UTDF函数
  • 乐陵网站开发贾汪区建设局网站
  • VS安装EEPlus库及解决[弃用的]CS0618问题
  • 《算法闯关指南:优选算法--滑动窗口》--15.串联所有单词的子串,16.最小覆盖子串
  • 行驶证识别技术通过OCR和AI实现信息自动化采集与处理,涵盖图像预处理、文字识别及结构化校验,提升效率与准确性
  • 第十七篇:数组与链表:结构特性、操作与经典题目
  • 营销型网站的优点深圳推广系统
  • 攻防世界-Web-easyupload
  • 符号主义对自然语言处理深层语义分析的影响与启示
  • 高要区住房和城乡建设局网站西安建设市场信息平台
  • 新手可以做网站营运吗成都手机模板建站
  • 成都市做网站的公司网站开发客户端
  • 农业机械网站模板网站建设图片像素是多大的
  • 素材网站php程序源码公司简介介绍
  • 网站规划的一般步骤建设企业网站包含什么
  • 奉贤做网站建设wordpress访问很慢吗?
  • 大学生网站建设开题报告免费交流网站建设
  • 邢台做网站优化哪儿好怎样建网站
  • 企业网站 asp php修改wordpress邮件
  • 宠物网站开发背景怎样自己制作效果图
  • 适合企业做外贸的几个网站商会网站模板
  • 遂溪网站开发公司建设网站需要哪些材料
  • 手机 网站开发软件ui培训班有用吗
  • php网站开发需求分析海南乐秀同城群软件下载
  • 佛山顺德网站设计公司关键词排名优化报价
  • 自己做网站怎么做的浙江网站建设公司名单
  • 网络推广什么做seo1短视频网页入口营销
  • 东莞公司网站搭建多少钱深圳品牌床垫有哪些
  • 网站广告动态图怎么做本机做网站如何访问
  • 万网买的网站备案淘宝刷单网站开发