当前位置: 首页 > news >正文

dede网站模版桂林注册公司

dede网站模版,桂林注册公司,辅助教学网站开发技术讨论,页面开发系列文章目录 文章目录系列文章目录一、作用和不足作用不足二、zk全部宕机后,kafka集群能正常收发消息吗?三、controller功能和如何选举四、kafka发送数据到client一、作用和不足 作用 2.8版本以前,kafka使用zk管理并存储kafka的关键元数据…

系列文章目录

文章目录

  • 系列文章目录
  • 一、作用和不足
    • 作用
    • 不足
  • 二、zk全部宕机后,kafka集群能正常收发消息吗?
  • 三、controller功能和如何选举
  • 四、kafka发送数据到client


一、作用和不足

作用

2.8版本以前,kafka使用zk管理并存储kafka的关键元数据,每个kafka集群都有broker作为控制器,是zk watcher 选举产生的。controller不仅像其他普通节点一样存储主体分区日志和处理消费生成数据请求,而且还维护着kafka集群元数据,比如broker id,主题分区,leader, isr信息。它将这些信息持久化到zk中。

0.9版本以前,controller通过单线程循环的方式,将更新后的元数据同步给其他broker节点,生产者和消费者都需要直接连接到zk集群进行发送和消费数据,现在这种方式已经被通过 bootstrap.servers连接的方式取代

不足

在一个健康的zk集群中,每时每刻只有一个健康的leader节点,zk必须保证半数以上的节点可用,才能保证zk集群可用。浪费资源。

zk和kafka是两个不同的系统,运维工作加倍。

controller节点存在单点故障。

controller宕机的话,新controller 需要从zk拉取所有分区的元数据信息,向每个broker节点发送leaderAndISR和updateMetaData请求,时间复杂度是O(num.partitions)。

broker宕机的话, 受控关机时间长,尤其是某个broker节点上分布的分区leader非常多时,需要重选leader,重新拉取元数据信息,分区越多重启时间越长,时间复杂度是O(partitions.leader)。

二、zk全部宕机后,kafka集群能正常收发消息吗?

能够正常收发消息。但是在kafka的server log里面,会报错:xxxx拒绝连接
原因是,kafka0.9以后,zk的功能被弱化,用来帮助kafka集群选举controller节点等基本操作,zk节点宕机后,kafka集群只是不能再发现新的broker节点加入了,但是不影响既有的节点收发日志。

三、controller功能和如何选举

  1. 分区 Leader 选举
    当某个 Broker 宕机或重新加入时,Controller 负责重新选举该 Broker 上的分区 Leader。
    确保每个分区都有一个 Leader 和多个 Follower(副本)。
  2. 分区分配与重平衡
    在集群扩容、Broker 故障恢复等情况下,Controller 会重新分配分区到不同的 Broker 上,以实现负载均衡。
    控制 Rebalance(再平衡)过程,确保所有副本同步。
  3. 元数据管理
    维护并更新集群的元数据信息,如:
    Topic 的分区数量
    分区的 Leader 和副本信息
    Broker 的状态(在线/离线)
    Controller 自己的状态(主/备)
  4. 监控 Broker 状态
    检测 Broker 是否正常运行。
    如果某个 Broker 失联,Controller 会触发相应的处理逻辑(如重新分配分区)。
  5. 控制副本同步
    确保副本之间的数据同步,防止数据丢失

集群的启动中,如何选出controller: 抢占机制

集群运行中,一些broker节点压力过大,导致controller反应慢,由于没有超时,zk无法判断是否下线,但是导致某些分区选主超时,可以删除controller临时节点,zk触发controller重新选主。健康的broker节点抢先注册成为controller, epoch任期号是2.
path: /controller path:/controller_epoch

value:{broker: 1} value:2

四、kafka发送数据到client

向broker节点传输数据是通过networkclient完成的。kafka客户端的networkclient类使用java nio 实现与broker节点通信,源代码可以看Selector类,

producer获取元数据信息,判断需要发送的topic分区位于哪个broker节点,selector和具体的broker节点创建一个socket 连接。this.nioSelector = java.nio.channels.Selector.open();

具体是如何连接的呢?看源码

public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {//1.检查是否和Node重复建立连接,确保每个channel当下只能有一个Node连接ensureNotRegistered(id);//2.创建SocketChannel对象SocketChannel socketChannel = SocketChannel.open();SelectionKey key = null;try {/*** 3.配置SocketChannel对象* 将SocketChannel设置为非阻塞模式,keepAlive设置为true,TCP_NODELAY设置为true* 指定发送和接收缓冲区的大小(如果不是默认值)* */configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);/*** 4.确认连接是否真正建立* 因为是非阻塞模式,scoketChannel.connect()只是尝试发起一个连接请求,并不保证连接一定建立成功* connect()返回true表示连接已经建立成功,返回false表示连接请求已经发出但还没有建立成功* 因此boolean connected会立即返回连接状态,但是并一定是true* */boolean connected = doConnect(socketChannel, address);/*** 5.注册SocketChannel对象到Selector* 注册感兴趣的事件为OP_CONNECT,表示当连接建立成功时,Selector会收到通知* */key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);if (connected) {// OP_CONNECT won't trigger for immediately connected channelslog.debug("Immediately connected to node {}", id);//6.1 将key加入immediatelyConnectedKeys集合,表示该连接已经建立成功immediatelyConnectedKeys.add(key);//6.2 将key的感兴趣事件清空,因为连接已经建立成功,不需要再监听OP_CONNECT事件key.interestOps(0);}} catch (IOException | RuntimeException e) {if (key != null)immediatelyConnectedKeys.remove(key);channels.remove(id);socketChannel.close();throw e;}}

Selector类中还有一个poll()方法,获取准备就绪的SelectionKey,代表的是当前SocketChannel上已经就绪的时间,然后就可以 遍历这些SelectionKey来执行在interestSets中的兴趣操作
在KafkaProducer和KafkaConsumer中,负责与Broker通信的线程会重复调用NetworkClient的poll(), 在poll()中,会执行selector的select()方法,并以IO多路复用的方式处理与Broker之间的通信

 if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {//获取所有准备就绪的SelectionKeySet<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

sender子线程将封装好的发送请求缓存在InFlightRequests中,然后由NIO请求将消息发送到broker
在这里插入图片描述

InFlightRequests中的一个Deque中可以缓存多个sender创建好的ProduceRequest。“飞行中的请求”指的是请求发出去,尚未收到broker端的响应
其中Deque大小由参数max.in.flight.requests.per.connection决定,默认是5.如果将幂等性参数设置为true,broker会按照请求的先后顺序处理produceRequest;反之如果设置为false,只能将max.in.flight.requests.per.connection设置为1,才能保证顺序发送消息

http://www.dtcms.com/a/480912.html

相关文章:

  • 蒲江网站建设山西网络推广
  • 网站建设哪里好 厦门wordpress 支持手机6
  • 建设网站教程视频视频下载商业网站的后缀一般为
  • 大淘客怎么做网站网站未备案wordpress链接
  • 建设部的网站首页手机网店开店网站
  • 知网网站开发推广seo优化公司
  • 南京网站开发荐南京乐识网站后台模板免费
  • 网站建设专家评审意见wamp网站开发
  • 网站深圳优化建设wordpress调用文章描述
  • 网站受攻击高安做网站
  • 网站制作中动态展示怎么做百度高级检索入口
  • 网站管理的主要内容网站开发的硬件设备
  • 马鞍山制作网站中山品牌网站建设报价
  • 一个网站两个域名展位设计
  • 网站空间到期影响产品推广软文300字
  • 网站建设费用明细广东做淘宝的都在哪里网站
  • 大龄网站开发人员南京网站建设小程序
  • 工程建设网站怎么提交设计制作实践活动
  • 湖州网站建设策划wordpress评论怎么去掉网址
  • 合肥大型网站设计公司赣州做网站设计找哪家
  • wordpress小工具没有关键词优化排名软件推荐
  • 网站管理和建设工作职责景观设计网站大全
  • 网站怎么样排名最权威的公文写作网站
  • 网站开发招标文件找建筑网官网
  • 专业建站团队夜夜做新郎网站在线视频
  • 万网制作淘宝客网站wordpress文件夹权限设置方法
  • 网站建设服务采购方案模板合肥网站建设市场
  • 做网站运营的股票莱芜市城乡建设局网站首页
  • 企业电子商务网站建设规划报告培训机构哪家最好
  • 防城港网站设计公司加油站网站大全