Fluent Bit Kafka Heartbeat Reconnection Mechanism Explained (Part 1)
Author: 程宏斌
Table of Contents
- Test Plan
- Test Log Summary
- Test Summary
- Code Walkthrough
Heartbeat Reconnection Verification Test
Test Plan
The test proceeds as follows:
- First start Fluent Bit and, once data is being collected into Kafka normally, record the contents of the Fluent Bit debug log.
- Stop the Kafka service, send logs, and observe how the Fluent Bit debug log changes. In particular, watch whether Fluent Bit still attempts to contact the Kafka service while collecting logs.
- Start the Kafka service again and keep observing the Fluent Bit debug log. Check whether the previously buffered logs are re-sent, and confirm that Fluent Bit reconnects to Kafka automatically.
This test verifies how Fluent Bit behaves, and how it recovers automatically, when Kafka availability changes.
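The article does not include the configuration used for the test. A minimal setup along the following lines could reproduce it; the broker address and topic name are taken from the logs below, while the input path is an assumption for illustration:

```ini
[SERVICE]
    # Debug level is needed to see produce_message / enqueued / delivered lines
    Log_Level    debug

[INPUT]
    Name         tail
    Path         /var/log/test/*.log

[OUTPUT]
    Name         kafka
    Match        *
    Brokers      192.168.123.50:9092
    Topics       testlog
```

With this in place, appending lines to the tailed files drives the produce path exercised in the test.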
Test Log Summary
- Below is the log summary for the Kafka disconnect/reconnect test. Annotations start with #; please read them carefully.
# The two coro_id entries below show the normal log collection flow
[2024/06/13 11:31:58] [debug] in produce_message
[2024/06/13 11:31:58] [debug] [output:kafka:kafka.0] enqueued message (466 bytes) for topic 'testlog'
[2024/06/13 11:31:58] [debug] [out flush] cb_destroy coro_id=0
[2024/06/13 11:32:20] [debug] in produce_message
[2024/06/13 11:32:20] [debug] [output:kafka:kafka.0] enqueued message (466 bytes) for topic 'testlog'
[2024/06/13 11:32:20] [debug] [output:kafka:kafka.0] message delivered (466 bytes, partition 0)
[2024/06/13 11:32:20] [debug] [out flush] cb_destroy coro_id=1
# The Kafka service is stopped here to simulate a Kafka failure; note the obvious "Disconnected" keyword. At this point Fluent Bit's self-check logic for Kafka starts running.
[2024/06/13 11:32:47] [info] 192.168.123.50:9092/bootstrap: Disconnected (after 50404ms in state UP)
[2024/06/13 11:32:47] [info] 961ddde63cfb:9092/0: Disconnected (after 49401ms in state UP)
[2024/06/13 11:32:47] [error] 2/2 brokers are down
[2024/06/13 11:32:47] [error] Connect to ipv4#192.168.128.2:9092 failed: Connection refused
[2024/06/13 11:32:49] [error] Failed to resolve '961ddde63cfb:9092': Name or service not known
[2024/06/13 11:32:49] [warn] Metadata request failed: broker down: Local: Host resolution failure
[2024/06/13 11:32:49] [error] Connect to ipv4#192.168.123.50:9092 failed: Connection refused
# While the Kafka service is down, Fluent Bit automatically retries the connection to Kafka after collecting logs.
[2024/06/13 11:33:18] [error] Connect to 192.168.123.50:9092 failed: Connection refused
[2024/06/13 11:33:24] [error] Failed to resolve '961ddde63cfb:9092'
[2024/06/13 11:34:02] [error] Failed to resolve '961ddde63cfb:9092'
[2024/06/13 11:34:18] [error] Connect to 192.168.123.50:9092 failed: Connection refused
[2024/06/13 11:34:21] [error] Connect to ipv4#192.168.128.2:9092 failed: Connection refused
# After Kafka is started again, the previously undelivered messages are sent to Kafka together with the next batch of log data and show up as "message delivered".
[2024/06/13 11:34:33] [debug] in produce_message
[2024/06/13 11:34:33] [debug] [output:kafka:kafka.0] enqueued message (466 bytes) for topic 'testlog'
[2024/06/13 11:34:33] [debug] [output:kafka:kafka.0] message delivered (466 bytes, partition 0)
[2024/06/13 11:34:33] [debug] [output:kafka:kafka.0] message delivered (466 bytes, partition 0)
[2024/06/13 11:34:33] [debug] [output:kafka:kafka.0] message delivered (466 bytes, partition 0)
[2024/06/13 11:34:33] [debug] [output:kafka:kafka.0] message delivered (466 bytes, partition 0)
[2024/06/13 11:34:45] [debug] [output:kafka:kafka.0] message delivered (466 bytes, partition 0)
Test Summary
A summary of the log results above:
After the Kafka service is stopped to simulate a failure, an obvious "Disconnected" keyword appears in the log, and fluent-bit's self-check logic for Kafka starts running.
While the Kafka service is down, Fluent Bit automatically retries the connection to Kafka after collecting logs. In the Fluent Bit debug log, coro_id=6 marks the sending of the seventh message (counting from 0). After the first two messages are delivered successfully, each of the third through seventh log entries triggers a Kafka request.
After Kafka is started again, the previously undelivered messages are sent to Kafka together with the next batch of log data and show up as "message delivered".
Code Walkthrough
fluent-bit Kafka reconnect logic:
1. Code location: fluent-bit/lib/librdkafka-2.3.0/src
librdkafka is essentially a state machine: state + op together drive the eventloop cycle.
terminate
// ./lib/librdkafka-2.3.0/src/rdkafka_int.h
/**
 * @returns true if \p rk handle is terminating.
 * Checks whether the rdkafka handle is shutting down; if it is,
 * the eventloop stops iterating.
 */
#define rd_kafka_terminating(rk)                                               \
        (rd_atomic32_get(&(rk)->rk_terminate) & RD_KAFKA_DESTROY_F_TERMINATE)
// ./lib/librdkafka-2.3.0/src/rdkafka.c
// When the producer/consumer shuts down, set the terminate flag
static void rd_kafka_destroy_app(rd_kafka_t *rk, int flags) {
        ...
        rd_atomic32_set(&rk->rk_terminate,
                        flags | RD_KAFKA_DESTROY_F_TERMINATE);
        ...
}
connect
// ./lib/librdkafka-2.3.0/src/rdkafka_broker.c
// The thread that manages broker connections:
// when state == RD_KAFKA_BROKER_STATE_TRY_CONNECT, it attempts a connection
static int rd_kafka_broker_thread_main(void *arg) {
        ...
        while (!rd_kafka_broker_terminating(rkb)) {
                ...
                switch (rkb->rkb_state) {
                ...
                case RD_KAFKA_BROKER_STATE_TRY_CONNECT:
                        /* Initiate asynchronous connection attempt.
                         * Only the host lookup is blocking here. */
                        r = rd_kafka_broker_connect(rkb);
        ...
// ./lib/librdkafka-2.3.0/src/rdkafka_transport.c
/**
 * Initiate asynchronous connection attempt.
 * Sets up an asynchronous connection.
 * Locality: broker thread
 */
rd_kafka_transport_t *rd_kafka_transport_connect(rd_kafka_broker_t *rkb,
                                                 const rd_sockaddr_inx_t *sinx,
                                                 char *errstr,
                                                 size_t errstr_size) {
        ...
        // Create the socket; by default this is the standard Linux socket() call
        s = rkb->rkb_rk->rk_conf.socket_cb(sinx->in.sin_family, SOCK_STREAM,
                                           IPPROTO_TCP,
                                           rkb->rkb_rk->rk_conf.opaque);
        ...
        /* Connect to broker */
        if (rkb->rkb_rk->rk_conf.connect_cb) {
                // Use the user-supplied connect callback
                rd_kafka_broker_lock(rkb); /* for rkb_nodename */
                r = rkb->rkb_rk->rk_conf.connect_cb(
                    s, (struct sockaddr *)sinx, RD_SOCKADDR_INX_LEN(sinx),
                    rkb->rkb_nodename, rkb->rkb_rk->rk_conf.opaque);
                rd_kafka_broker_unlock(rkb);
        } else {
                // Use the standard connect(2) API
                if (connect(s, (struct sockaddr *)sinx,
                            RD_SOCKADDR_INX_LEN(sinx)) == RD_SOCKET_ERROR &&
                    (rd_socket_errno != EINPROGRESS
#ifdef _WIN32
                     && rd_socket_errno != WSAEWOULDBLOCK
#endif
                     ))
                        r = rd_socket_errno;
                else
                        r = 0;
        }
        ...
// ./lib/librdkafka-2.3.0/src/rdposix.h
/** @brief Last socket error */
#define rd_socket_errno errno
// ./lib/librdkafka-2.3.0/src/rdkafka_broker.c
/**
 * @brief Initiate asynchronous connection attempt to the next address
 *        in the broker's address list.
 *        While the connect is asynchronous and its IO served in the
 *        CONNECT state, the initial name resolve is blocking.
 *
 * @returns -1 on error, 0 if broker does not have a hostname, or 1
 *          if the connection is now in progress.
 */
static int rd_kafka_broker_connect(rd_kafka_broker_t *rkb) {
        ...
        if (*nodename)
                // Nodename present: pre-set the state to CONNECT
                rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_CONNECT);
        ...
        // Set/update the reconnect backoff
        rd_kafka_broker_update_reconnect_backoff(rkb, &rkb->rkb_rk->rk_conf,
                                                 rd_clock());
        ...
        // Attempt the connection
        if (!(rkb->rkb_transport = rd_kafka_transport_connect(
                  rkb, sinx, errstr, sizeof(errstr)))) {
                rd_kafka_broker_fail(rkb, LOG_ERR, RD_KAFKA_RESP_ERR__TRANSPORT,
                                     "%s", errstr);
                return -1;
        }

        rkb->rkb_ts_connect = rd_clock();
        return 1;
}