Fluent Bit针对kafka心跳重连机制详解(下)
#作者:程宏斌
文章目录
- disconnect
- reconnect
接上篇:https://blog.csdn.net/qq_40477248/article/details/150957571?spm=1001.2014.3001.5501
disconnect
断开连接的情况主要有两种:
连接或传输过程中有错误发生
超时, 比如空闲时间超时
/**
 * Close and destroy a transport handle.
 *
 * Releases, in order: any pending receive buffer, the underlying
 * socket, and finally the transport handle itself.
 */
void rd_kafka_transport_close(rd_kafka_transport_t *rktrans) {
        ...
        /* Destroy any partially received buffer */
        if (rktrans->rktrans_recv_buf)
                rd_kafka_buf_destroy(rktrans->rktrans_recv_buf);
        ...
        /* Close the socket: either a user-provided close callback or a
         * plain socket close, depending on configuration */
        if (rktrans->rktrans_s != -1)
                rd_kafka_transport_close0(rktrans->rktrans_rkb->rkb_rk,
                                          rktrans->rktrans_s);
        rd_free(rktrans);
}
/**
 * @brief Failure propagation to application.
 *
 * Will tear down connection to broker and trigger a reconnect.
 *
 * \p level is the log level, <=LOG_INFO will be logged while =LOG_DEBUG will
 * be debug-logged.
 *
 * @locality broker thread
 */
void rd_kafka_broker_fail(rd_kafka_broker_t *rkb,
                          int level,
                          rd_kafka_resp_err_t err,
                          const char *fmt,
                          ...) {
        ...
        if (rkb->rkb_transport) {
                /* Close socket and drop the transport handle */
                rd_kafka_transport_close(rkb->rkb_transport);
                rkb->rkb_transport = NULL;
        ...
        /* Set broker state to DOWN; the broker thread's state machine
         * later drives the reconnect from this state */
        rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_DOWN);
}
/**
 * @brief Check if connections.max.idle.ms has been exceeded and if so
 *        close the connection.
 *
 * Idle-time probe for the connection.
 *
 * @remark Must only be called if connections.max.idle.ms > 0 and
 *         the current broker state is UP (or UPDATE).
 *
 * @locality broker thread
 */
static RD_INLINE void rd_kafka_broker_idle_check(rd_kafka_broker_t *rkb) {
        ...
        /* Has the connection been idle longer than the configured maximum
         * idle time (default 10 minutes)? If not, nothing to do. */
        if (likely(idle_ms < rkb->rkb_rk->rk_conf.connections_max_idle_ms))
                return;
        /* Exceeded: the server side will drop the connection anyway, so to
         * be safe the client proactively closes it (rd_kafka_broker_fail
         * also triggers the reconnect). */
        rd_kafka_broker_fail(rkb, LOG_DEBUG, RD_KAFKA_RESP_ERR__TRANSPORT,
                             "Connection max idle time exceeded "
                             "(%dms since last activity)",
                             idle_ms);
reconnect
连接失败时, 系统自动发起重连. 重连不会终止, 直到连接成功或者系统退出.
nodename更改时, 会尝试断开重连
/**
 * @brief Update the reconnect backoff.
 *        Should be called when a connection is made, or all addresses
 *        a broker resolves to has been exhausted without successful connect.
 *
 * Sets/updates the next reconnect time and backoff interval.
 *
 * @locality broker thread
 * @locks none
 */
static void
rd_kafka_broker_update_reconnect_backoff(rd_kafka_broker_t *rkb,
                                         const rd_kafka_conf_t *conf,
                                         rd_ts_t now) {
        ...
        /* If the backoff interval has already exceeded the cap
         * (reconnect.backoff.max.ms), reset the next backoff back to the
         * base value reconnect.backoff.ms. */
        if (rkb->rkb_ts_reconnect + (conf->reconnect_backoff_max_ms * 1000) <
            now)
                rkb->rkb_reconnect_backoff_ms = conf->reconnect_backoff_ms;

        /* Apply jitter: pick a random backoff within [-25%, +50%] of the
         * current backoff interval. */
        backoff = rd_jitter((int)((float)rkb->rkb_reconnect_backoff_ms * 0.75),
                            (int)((float)rkb->rkb_reconnect_backoff_ms * 1.5));

        /* Cap at reconnect.backoff.max.ms. */
        backoff = RD_MIN(backoff, conf->reconnect_backoff_max_ms);

        /* Set time of next reconnect, and exponentially grow the backoff
         * for the attempt after that (doubled, capped at the max). */
        rkb->rkb_ts_reconnect = now + (backoff * 1000);
        rkb->rkb_reconnect_backoff_ms =
            RD_MIN(rkb->rkb_reconnect_backoff_ms * 2,
                   conf->reconnect_backoff_max_ms);
}
/**
 * @brief Calculate time until next reconnect attempt.
 *
 * @returns the number of milliseconds to the next connection attempt, or 0
 *          if immediate.
 * @locality broker thread
 * @locks none
 */
static RD_INLINE int
rd_kafka_broker_reconnect_backoff(const rd_kafka_broker_t *rkb, rd_ts_t now) {
        ...
        /* Time remaining until the reconnect timestamp scheduled by
         * rd_kafka_broker_update_reconnect_backoff() */
        remains = rkb->rkb_ts_reconnect - now;
        ...
}
/* Broker thread main loop (excerpt): drives the connection state
 * machine, including reconnect throttling and connect-timeout handling. */
static int rd_kafka_broker_thread_main(void *arg) {
        ...
        switch (rkb->rkb_state) {
        ...
        case RD_KAFKA_BROKER_STATE_TRY_CONNECT:
                ...
                /* Throttle & jitter reconnects to avoid
                 * thundering horde of reconnecting clients after
                 * a broker / network outage. Issue #403 */
                backoff = rd_kafka_broker_reconnect_backoff(rkb, rd_clock());
                if (backoff > 0) {
                        rd_rkb_dbg(rkb, BROKER, "RECONNECT",
                                   "Delaying next reconnect by %dms", backoff);
                        /* Keep serving ops while waiting out the backoff */
                        rd_kafka_broker_serve(rkb, (int)backoff);
                        continue;
                }
        ...
        case RD_KAFKA_BROKER_STATE_CONNECT:
        case RD_KAFKA_BROKER_STATE_SSL_HANDSHAKE:
        case RD_KAFKA_BROKER_STATE_AUTH_LEGACY:
        case RD_KAFKA_BROKER_STATE_AUTH_REQ:
        case RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE:
        case RD_KAFKA_BROKER_STATE_APIVERSION_QUERY:
                /* Asynchronous connect in progress. */
                rd_kafka_broker_serve(rkb, rd_kafka_max_block_ms);

                /* Connect failure.
                 * Try the next resolve result until we've
                 * tried them all, in which case we back off the next
                 * connection attempt to avoid busy looping. */
                if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_DOWN &&
                    rd_kafka_broker_addresses_exhausted(rkb))
                        rd_kafka_broker_update_reconnect_backoff(
                            rkb, &rkb->rkb_rk->rk_conf, rd_clock());
                /* If we haven't made progress from the last state, and
                 * if we have exceeded
                 * socket_connection_setup_timeout_ms, then error out.
                 * Don't error out in case this is a reauth, for which
                 * socket_connection_setup_timeout_ms is not
                 * applicable. */
                else if (rkb->rkb_state == orig_state &&
                         !rkb->rkb_reauth_in_progress &&
                         rd_clock() >=
                             (rkb->rkb_ts_connect +
                              (rd_ts_t)rk->rk_conf
                                      .socket_connection_setup_timeout_ms *
                                  1000))
                        rd_kafka_broker_fail(
                            rkb, LOG_WARNING, RD_KAFKA_RESP_ERR__TRANSPORT,
                            "Connection setup timed out in state %s",
                            rd_kafka_broker_state_names[rkb->rkb_state]);
                break;
        ...
}
/**
 * @brief Update the nodename (address) of broker \p rkb
 *        with the nodename from broker \p from_rkb (may be NULL).
 *
 * If \p rkb is connected, the connection will be torn down.
 * A new connection may be attempted to the new address
 * if a persistent connection is needed (standard connection rules).
 *
 * The broker's logname is also updated to include \p from_rkb's
 * broker id.
 *
 * @param from_rkb Use the nodename from this broker. If NULL, clear
 *                 the \p rkb nodename.
 *
 * @remark Must only be called for logical brokers.
 *
 * @locks none
 */
void rd_kafka_broker_set_nodename(rd_kafka_broker_t *rkb,
                                  rd_kafka_broker_t *from_rkb) {
        ...
        /* The nodename has changed: trigger a disconnect & reconnect so
         * the connection is re-established against the new address */
        rd_kafka_broker_schedule_connection(rkb);
}