当前位置: 首页 > news >正文

Fluent Bit针对kafka心跳重连机制详解(下)

#作者:程宏斌

文章目录

    • disconnect
    • reconnect

接上篇:https://blog.csdn.net/qq_40477248/article/details/150957571?spm=1001.2014.3001.5501

disconnect

断开连接主要有两种情况:
1. 连接或数据传输过程中发生错误;
2. 超时, 例如连接空闲时间超过上限.

/**
* Close and destroy a transport handle
*/
void rd_kafka_transport_close(rd_kafka_transport_t *rktrans) {
    ...
    // 清除接收缓冲区
    if (rktrans->rktrans_recv_buf)
        rd_kafka_buf_destroy(rktrans->rktrans_recv_buf);
    ...
    if (rktrans->rktrans_s != -1) // 自定义close或者socket.close()
        rd_kafka_transport_close0(rktrans->rktrans_rkb->rkb_rk, rktrans->rktrans_s);

    rd_free(rktrans);
}
/**
* @brief Failure propagation to application.
*
* Will tear down connection to broker and trigger a reconnect.
*
* \p level is the log level, <=LOG_INFO will be logged while =LOG_DEBUG will
* be debug-logged.
*
* @locality broker thread
*/
void rd_kafka_broker_fail(rd_kafka_broker_t *rkb,int level,rd_kafka_resp_err_t err,const char *fmt,...) {
    ...
    if (rkb->rkb_transport) {
        // close socket
        rd_kafka_transport_close(rkb->rkb_transport);
        rkb->rkb_transport = NULL;
        ...
    }
    ...
    // 设置状态
    rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_DOWN);
    ...
}
​
/**
* @brief Check if connections.max.idle.ms has been exceeded and if so
*        close the connection.
* 空闲时间探查
* @remark Must only be called if connections.max.idle.ms > 0 and
*         the current broker state is UP (or UPDATE).
*
* @locality broker thread
*/
static RD_INLINE void rd_kafka_broker_idle_check(rd_kafka_broker_t *rkb) {
    …
    // 连接空闲时间是否超过客户端配置的 connections.max.idle.ms
    // (Kafka 服务端同名配置默认为10分钟)
    if (likely(idle_ms < rkb->rkb_rk->rk_conf.connections_max_idle_ms))
        return;
    // 超过后服务端可能会主动断开连接; client保险起见, 强制关闭连接
    rd_kafka_broker_fail(rkb, LOG_DEBUG, RD_KAFKA_RESP_ERR__TRANSPORT,
                         "Connection max idle time exceeded "
                         "(%dms since last activity)",
                         idle_ms);
}

reconnect

连接失败时, 系统会自动发起重连, 并持续重试直到连接成功或者系统退出. 重试间隔按指数退避(每次翻倍, 上限为reconnect.backoff.max.ms), 并加入随机抖动, 避免大量客户端同时重连.
nodename更改时, 会断开当前连接并尝试重连.

/**
* @brief Update the reconnect backoff.
*        Should be called when a connection is made, or all addresses
*        a broker resolves to has been exhausted without successful connect.
* 设置更新重试时间
* @locality broker thread
* @locks none
*/
static void
rd_kafka_broker_update_reconnect_backoff(rd_kafka_broker_t *rkb,const rd_kafka_conf_t *conf,rd_ts_t now) {
    …
    /* 若上次设定的重连时间点距今已超过reconnect.backoff.max.ms,
     * 则将退避间隔重置为初始值reconnect.backoff.ms. */
    if (rkb->rkb_ts_reconnect + (conf->reconnect_backoff_max_ms * 1000) < now)
        rkb->rkb_reconnect_backoff_ms = conf->reconnect_backoff_ms;

    /* 在当前退避间隔的[-25%, +50%]区间内随机取一个重试时间 */
    backoff = rd_jitter((int)((float)rkb->rkb_reconnect_backoff_ms * 0.75),
                        (int)((float)rkb->rkb_reconnect_backoff_ms * 1.5));

    /* 不能超过reconnect.backoff.max.ms. */
    backoff = RD_MIN(backoff, conf->reconnect_backoff_max_ms);

    /* Set time of next reconnect */
    rkb->rkb_ts_reconnect         = now + (backoff * 1000);
    rkb->rkb_reconnect_backoff_ms = RD_MIN(rkb->rkb_reconnect_backoff_ms * 2, conf->reconnect_backoff_max_ms);
}
​
/**
* @brief Calculate time until next reconnect attempt.
*
* @returns the number of milliseconds to the next connection attempt, or 0
*          if immediate.
* @locality broker thread
* @locks none
*/
// 计算距离下次重试的时间间隔
static RD_INLINE int
rd_kafka_broker_reconnect_backoff(const rd_kafka_broker_t *rkb, rd_ts_t now) {
…remains = rkb->rkb_ts_reconnect - now;
…
}
​
static int rd_kafka_broker_thread_main(void *arg) {
    ...
    switch (rkb->rkb_state) {
    ...
    case RD_KAFKA_BROKER_STATE_TRY_CONNECT:
        ...
        /* Throttle & jitter reconnects to avoid
         * thundering horde of reconnecting clients after
         * a broker / network outage. Issue #403 */
        backoff = rd_kafka_broker_reconnect_backoff(rkb, rd_clock());
        if (backoff > 0) {
            rd_rkb_dbg(rkb, BROKER, "RECONNECT",
                       "Delaying next reconnect by %dms", backoff);
            rd_kafka_broker_serve(rkb, (int)backoff);
            continue;
        }
        ...
    case RD_KAFKA_BROKER_STATE_CONNECT:
    case RD_KAFKA_BROKER_STATE_SSL_HANDSHAKE:
    case RD_KAFKA_BROKER_STATE_AUTH_LEGACY:
    case RD_KAFKA_BROKER_STATE_AUTH_REQ:
    case RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE:
    case RD_KAFKA_BROKER_STATE_APIVERSION_QUERY:
        /* Asynchronous connect in progress. */
        rd_kafka_broker_serve(rkb, rd_kafka_max_block_ms);

        /* Connect failure.
         * Try the next resolve result until we've
         * tried them all, in which case we back off the next
         * connection attempt to avoid busy looping. */
        if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_DOWN &&
            rd_kafka_broker_addresses_exhausted(rkb))
            rd_kafka_broker_update_reconnect_backoff(rkb, &rkb->rkb_rk->rk_conf, rd_clock());
        /* If we haven't made progress from the last state, and
         * if we have exceeded
         * socket_connection_setup_timeout_ms, then error out.
         * Don't error out in case this is a reauth, for which
         * socket_connection_setup_timeout_ms is not
         * applicable. */
        else if (rkb->rkb_state == orig_state &&
                 !rkb->rkb_reauth_in_progress &&
                 rd_clock() >=
                     (rkb->rkb_ts_connect +
                      (rd_ts_t)rk->rk_conf.socket_connection_setup_timeout_ms * 1000))
            rd_kafka_broker_fail(rkb, LOG_WARNING,
                                 RD_KAFKA_RESP_ERR__TRANSPORT,
                                 "Connection setup timed out in state %s",
                                 rd_kafka_broker_state_names[rkb->rkb_state]);

        break;
…
}
​
/**
* @brief Update the nodename (address) of broker \p rkb
*        with the nodename from broker \p from_rkb (may be NULL).
*
*        If \p rkb is connected, the connection will be torn down.
*        A new connection may be attempted to the new address
*        if a persistent connection is needed (standard connection rules).
*
*        The broker's logname is also updated to include \p from_rkb's
*        broker id.
*
* @param from_rkb Use the nodename from this broker. If NULL, clear
*                 the \p rkb nodename.
*
* @remark Must only be called for logical brokers.
*
* @locks none
*/
void rd_kafka_broker_set_nodename(rd_kafka_broker_t *rkb,rd_kafka_broker_t *from_rkb) {
    …
    // nodename已更改过, 需要触发断线和重连
    /* Trigger a disconnect & reconnect */
    rd_kafka_broker_schedule_connection(rkb);
}
http://www.dtcms.com/a/354709.html

相关文章:

  • KubeBlocks For MySQL 云原生设计分享
  • Logstash数据迁移之mysql-to-kafka.conf详细配置
  • 卷积神经网络(CNN)搭建详解
  • 区块链+隐私计算护航“东数西算”数据安全报告
  • AppScan扫描电脑上的客户端,C/S架构客户端等
  • 深度学习----卷积神经网络实现数字识别
  • RAW API 的 TCP 总结2
  • 数据结构8---排序
  • 鸿蒙OS与Rust整合开发流程
  • 【边缘计算】RK3576算力评估
  • 排序(Sort)方法详解(冒泡、插入、希尔、选择、堆、快速、归并)
  • 详细介绍Linux 内存管理 struct page数据结构中有一个锁,请问trylock_page()和lock_page()有什么区别?
  • 开源工具新玩法:cpolar提升Penpot协作流畅度
  • 8.28日QT
  • 分布式锁过期危机:4大续命方案拯救超时任务
  • 2025年机械工程与机器人国际研讨会(CMER2025)
  • PAT 1086 Tree Traversals Again
  • React 动画库
  • 2025.8.28总结
  • Docker Swarm vs Kubernetes vs Nomad:容器编排方案对比与选型建议
  • GitHub宕机自救指南技术文章大纲
  • 图论基础篇
  • Oracle 数据库权限管理的艺术:从入门到精通
  • 【第四章】BS 架构测试全解析:从功能验证到问题定位​
  • @HAProxy 介绍部署使用
  • DM LSN 与 Oracle SCN 对比
  • UNIX网络编程笔记:共享内存区和远程过程调用
  • 机器学习基本概述
  • 小白入门:支持深度学习的视觉数据库管理系统
  • 神经网络为何能 “学习”?从神经元到深度学习模型的层级结构解析