Redis Cluster Failover: A Source Code Walkthrough
Where It All Starts
Open server.c in the Redis 5.0.5 source tree and find the following snippet. It runs a periodic task once every 100 milliseconds.
/* Run the Redis Cluster cron. */
/*
 * Runs once every 100 milliseconds.
 * Requires cluster mode to be enabled.
 */
run_with_period(100) {
if (server.cluster_enabled) clusterCron();
}
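run_with_period is a macro from server.h: it uses server.hz (how many times per second serverCron() fires, 10 by default) together with the server.cronloops counter to decide whether this iteration is due. For reference, its definition in Redis 5.x is essentially:

#define run_with_period(_ms_) \
    if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))

With the default hz of 10, run_with_period(100) therefore fires on every serverCron() iteration.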
What the clusterCron Task Does
Open cluster.c and locate the clusterCron() function; a fully annotated copy of it is included below. While reading it, pay particular attention to the following points:
- The task runs roughly every 100 milliseconds, i.e. 10 times per second.
- It iterates over all cluster nodes; if a node has no valid connection (link) to this node, it tries to re-establish one, and sends a PING as soon as the connection is up.
- Every 10 runs (i.e. about once per second), it PINGs one "random" node. Random here means: sample 5 random nodes and pick the one whose most recently received PONG is the oldest.
- On every run it performs subjective-failure detection: if we have been waiting for a node's PONG for more than half of cluster-node-timeout, the link is torn down and reconnected (and a PING is sent immediately once the new link is up). If a node's PONG delay exceeds the full timeout, the node is flagged as subjectively down (PFAIL).
- If the current node is a replica, it checks whether the conditions for a manual failover are met, via clusterHandleManualFailover(); when they are, that function sets the mf_can_start flag to 1, meaning the manual failover logic may proceed.
- If a manual failover has timed out, it is aborted. The manual failover timeout defaults to 5 seconds.
- If the current node is a replica and its master has the largest number of working replicas in the cluster, one replica may be migrated to an orphaned master (a master left without working replicas).
- On every run it also executes the automatic failover (automatic master/replica switch) logic, including electing a new master and updating the cluster state, via clusterHandleSlaveFailover(). We will study that function in detail later; just take note of it for now.
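The listing below leans on a handful of flag-testing macros from cluster.h. For reference, in Redis 5.x they are defined as:

#define nodeIsMaster(n) ((n)->flags & CLUSTER_NODE_MASTER)
#define nodeIsSlave(n) ((n)->flags & CLUSTER_NODE_SLAVE)
#define nodeInHandshake(n) ((n)->flags & CLUSTER_NODE_HANDSHAKE)
#define nodeHasAddr(n) (!((n)->flags & CLUSTER_NODE_NOADDR))
#define nodeTimedOut(n) ((n)->flags & CLUSTER_NODE_PFAIL)
#define nodeFailed(n) ((n)->flags & CLUSTER_NODE_FAIL)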
/* -----------------------------------------------------------------------------
* CLUSTER cron job
* -------------------------------------------------------------------------- */
/* This is executed 10 times every second */
// The cluster cron job: runs every 0.1 seconds and handles the cluster's
// heartbeats, failure detection, state propagation, failover, and so on.
void clusterCron(void) {
dictIterator *di; // dict iterator used to walk all cluster nodes
dictEntry *de; // entry returned by the iterator
int update_state = 0; // whether the cluster state needs a refresh
// An orphaned master is a master left without any working slave.
int orphaned_masters; /* How many masters there are without ok slaves. */
int max_slaves; /* Max number of ok slaves for a single master. */
int this_slaves; /* Number of ok slaves for our master (if we are slave). */
mstime_t min_pong = 0, now = mstime(); // oldest PONG timestamp seen, current time
clusterNode *min_pong_node = NULL; // node with the oldest received PONG
static unsigned long long iteration = 0; // times this function has run, roughly every 100 ms
mstime_t handshake_timeout; // handshake timeout
iteration++; /* Number of times this function was called so far. */
/* We want to take myself->ip in sync with the cluster-announce-ip option.
* The option can be set at runtime via CONFIG SET, so we periodically check
* if the option changed to reflect this into myself->ip. */
/*
 * cluster-announce-ip tells a Redis node which IP address to advertise to the
 * rest of the cluster. It exists to work around connectivity problems in
 * complex network environments (Docker, cloud servers, NAT, ...): a node
 * inside a container may only see an internal address such as 172.17.0.2 that
 * other nodes cannot reach, so cluster-announce-ip can be set to the host's
 * address instead. Since it can be changed at runtime via CONFIG SET, it has
 * to be re-checked periodically.
 */
// Check whether the announce IP changed; if so, update myself->ip.
{
static char *prev_ip = NULL; // the announce ip seen on the previous run
char *curr_ip = server.cluster_announce_ip;
int changed = 0;
// Has the announce ip changed since the last check?
if (prev_ip == NULL && curr_ip != NULL) changed = 1;
else if (prev_ip != NULL && curr_ip == NULL) changed = 1;
else if (prev_ip && curr_ip && strcmp(prev_ip,curr_ip)) changed = 1;
// If it changed, update myself->ip accordingly.
if (changed) {
if (prev_ip) zfree(prev_ip);
prev_ip = curr_ip;
if (curr_ip) {
/* We always take a copy of the previous IP address, by
* duplicating the string. This way later we can check if
* the address really changed. */
prev_ip = zstrdup(prev_ip); // keep our own copy of the announce ip
strncpy(myself->ip,server.cluster_announce_ip,NET_IP_STR_LEN);
myself->ip[NET_IP_STR_LEN-1] = '\0';
} else {
myself->ip[0] = '\0'; /* Force autodetection. */
}
}
}
/* The handshake timeout is the time after which a handshake node that was
* not turned into a normal node is removed from the nodes. Usually it is
* just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use
* the value of 1 second. */
// Handshake timeout: fall back to 1 second if the node timeout is smaller.
handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000;
/* Update myself flags. */
// Refresh myself's flags. flags is a bitfield recording the node's role
// (master or slave), its state (up or down), whether failover is allowed
// for it, whether it has a known address, and so on.
clusterUpdateMyselfFlags();
/* Check if we have disconnected nodes and re-establish the connection.
* Also update a few stats while we are here, that can be used to make
* better decisions in other part of the code.
 */
/* Get a safe dict iterator to walk all the nodes in the cluster. */
di = dictGetSafeIterator(server.cluster->nodes);
/* Initialize stats: number of nodes flagged PFAIL (subjectively down). */
server.cluster->stats_pfail_nodes = 0;
/* Walk all cluster nodes; the main job here is (re)establishing links to them. */
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
/* Not interested in reconnecting the link with myself or nodes
* for which we have no address. */
if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;
/* Count nodes flagged PFAIL (subjectively down). */
if (node->flags & CLUSTER_NODE_PFAIL)
server.cluster->stats_pfail_nodes++;
/* A Node in HANDSHAKE state has a limited lifespan equal to the
* configured node timeout. */
/* Nodes stuck in HANDSHAKE state beyond the handshake timeout are removed. */
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
clusterDelNode(node);
continue;
}
/* If there is no valid link to this node, try to establish one. */
if (node->link == NULL) {
int fd; // file descriptor of the new connection
mstime_t old_ping_sent; // time the last PING was sent to this node
clusterLink *link; // clusterLink structure for the new connection
/* Try a non-blocking connect to the node's IP and cluster port. */
fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
node->cport, NET_FIRST_BIND_ADDR);
/* If the connect failed, log the error and skip this node. */
if (fd == -1) {
/* We got a synchronous error from connect before
* clusterSendPing() had a chance to be called.
* If node->ping_sent is zero, failure detection can't work,
* so we claim we actually sent a ping now (that will
* be really sent as soon as the link is obtained). */
// Pretend a PING was sent right now (it really will be, as soon as a
// link is obtained), so that failure detection keeps working.
if (node->ping_sent == 0) node->ping_sent = mstime();
serverLog(LL_DEBUG, "Unable to connect to "
"Cluster Node [%s]:%d -> %s", node->ip,
node->cport, server.neterr);
continue;
}
/* Create a new clusterLink structure and attach it to the node. */
link = createClusterLink(node);
link->fd = fd;
node->link = link;
// Register the fd with the event loop, watching for readable events.
aeCreateFileEvent(server.el,link->fd,AE_READABLE,
clusterReadHandler,link);
/* Queue a PING in the new connection ASAP: this is crucial
* to avoid false positives in failure detection.
*
* If the node is flagged as MEET, we send a MEET message instead
* of a PING one, to force the receiver to add us in its node
* table. */
/* A note on ping_sent: zero means no PING is outstanding (it is reset to
zero once the PONG arrives); a non-zero value is the time the last, still
unanswered, PING was sent. */
old_ping_sent = node->ping_sent;
// clusterSendPing() sends the PING and overwrites ping_sent.
clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
if (old_ping_sent) {
/* If there was an active ping before the link was
* disconnected, we want to restore the ping time, otherwise
* replaced by the clusterSendPing() call. */
node->ping_sent = old_ping_sent;
}
/* We can clear the flag after the first packet is sent.
* If we'll never receive a PONG, we'll never send new packets
* to this node. Instead after the PONG is received and we
* are no longer in meet/handshake status, we want to send
* normal PING packets. */
node->flags &= ~CLUSTER_NODE_MEET;
serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
node->name, node->ip, node->cport);
}
}
// Release the dict iterator.
dictReleaseIterator(di);
/* Ping some random node 1 time every 10 iterations, so that we usually ping
* one random node every second. */
if (!(iteration % 10)) {
int j;
/* Check a few random nodes and ping the one with the oldest
 * pong_received time. */
for (j = 0; j < 5; j++) {
de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
/* Don't ping nodes disconnected or with a ping currently active. */
if (this->link == NULL || this->ping_sent != 0) continue;
// Skip ourselves and nodes still in handshake state.
if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
continue;
// Track the sampled node whose last received PONG is the oldest.
if (min_pong_node == NULL || min_pong > this->pong_received) {
min_pong_node = this;
min_pong = this->pong_received;
}
}
// min_pong_node is the sampled node we have heard from least recently.
if (min_pong_node) {
serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
}
}
/* Iterate nodes to check if we need to flag something as failing.
* This loop is also responsible to:
* 1) Check if there are orphaned masters (masters without non failing
* slaves).
* 2) Count the max number of non failing slaves for a single master.
* 3) Count the number of slaves for our master, if we are a slave.
 */
orphaned_masters = 0;
max_slaves = 0;
this_slaves = 0;
di = dictGetSafeIterator(server.cluster->nodes); // safe iterator over all nodes
while((de = dictNext(di)) != NULL) { // iterate over every node
clusterNode *node = dictGetVal(de);
now = mstime(); /* Use an updated time at every iteration. */
mstime_t delay;
// Skip ourselves, nodes without an address, and nodes still in handshake.
if (node->flags &
(CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
continue;
/* Orphaned master check, useful only if the current instance
* is a slave that may migrate to another master. */
if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
// Count this master's working (non-failing) slaves.
int okslaves = clusterCountNonFailingSlaves(node);
/* A master is orphaned if it is serving a non-zero number of
* slots, have no working slaves, but used to have at least one
* slave, or failed over a master that used to have slaves. */
if (okslaves == 0 && node->numslots > 0 &&
node->flags & CLUSTER_NODE_MIGRATE_TO)
{
orphaned_masters++;
}
if (okslaves > max_slaves) max_slaves = okslaves;
if (nodeIsSlave(myself) && myself->slaveof == node)
this_slaves = okslaves;
}
/* If we are waiting for the PONG more than half the cluster
* timeout, reconnect the link: maybe there is a connection
* issue even if the node is alive. */
if (node->link && /* is connected */
now - node->link->ctime >
server.cluster_node_timeout && /* was not already reconnected */
node->ping_sent && /* we already sent a ping */
node->pong_received < node->ping_sent && /* still waiting pong */
/* and we are waiting for the pong more than timeout/2 */
now - node->ping_sent > server.cluster_node_timeout/2)
{
/* Disconnect the link, it will be reconnected automatically. */
freeClusterLink(node->link);
}
/* If we have currently no active ping in this instance, and the
* received PONG is older than half the cluster timeout, send
* a new ping now, to ensure all the nodes are pinged without
* a too big delay. */
if (node->link &&
node->ping_sent == 0 &&
(now - node->pong_received) > server.cluster_node_timeout/2)
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}
/* If we are a master and one of the slaves requested a manual
* failover, ping it continuously. */
if (server.cluster->mf_end && /* a manual failover was requested */
nodeIsMaster(myself) && /* I am a master */
server.cluster->mf_slave == node && /* this is the requesting slave */
node->link) /* and its link is alive */
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}
/* Check only if we have an active ping for this instance. */
if (node->ping_sent == 0) continue;
/* Compute the delay of the PONG. Note that if we already received
* the PONG, then node->ping_sent is zero, so can't reach this
* code at all. */
delay = now - node->ping_sent;
// If the PONG delay exceeds cluster_node_timeout, flag the node as
// subjectively down (PFAIL).
if (delay > server.cluster_node_timeout) {
/* Timeout reached. Set the node as possibly failing if it is
* not already in this state. */
if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
node->name);
node->flags |= CLUSTER_NODE_PFAIL; // flag as subjectively down
update_state = 1; // the cluster state needs a refresh
}
}
}
dictReleaseIterator(di);
/* If we are a slave node but the replication is still turned off,
* enable it if we know the address of our master and it appears to
* be up. */
if (nodeIsSlave(myself) &&
server.masterhost == NULL &&
myself->slaveof &&
nodeHasAddr(myself->slaveof))
{
replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
}
/* Abort a manual failover if the timeout is reached
 * (the manual failover timeout defaults to 5 seconds). */
manualFailoverCheckTimeout();
// If the current node is a slave:
if (nodeIsSlave(myself)) {
// Check whether the preconditions for a manual failover are met; if they
// are, clusterHandleManualFailover() sets mf_can_start so the failover
// can proceed.
clusterHandleManualFailover();
// CLUSTER_MODULE_FLAG_NO_FAILOVER lets a module disable automatic failover.
if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
// Run the failover logic: elect a new master, update the cluster state.
clusterHandleSlaveFailover();
/* If there are orphaned slaves, and we are a slave among the masters
* with the max number of non-failing slaves, consider migrating to
* the orphaned masters. Note that it does not make sense to try
* a migration if there is no master with at least *two* working
* slaves. */
if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
// Consider migrating this slave to an orphaned master.
clusterHandleSlaveMigration(max_slaves);
}
// Refresh the cluster state if needed.
if (update_state || server.cluster->state == CLUSTER_FAIL)
clusterUpdateState();
}
Setting Up a Manual Failover
The manual failover state is armed as follows. When CLUSTER FAILOVER is executed on some replica, server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT is set, so the function below runs and eventually sets server.cluster->mf_can_start = 1, telling the code that follows that it may go ahead with the master/replica switch. Note that server.cluster->mf_end == 0 means no manual failover command has been received.
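For context, this is how the CLUSTER FAILOVER subcommand arms that state inside clusterCommand(). What follows is an abridged sketch of the Redis 5.x code, with the TAKEOVER branch and the sanity checks elided; force here stands for the parsed FORCE option:

/* CLUSTER FAILOVER [FORCE|TAKEOVER] (abridged sketch) */
resetManualFailover();
server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT; /* now + 5000 ms */
if (force) {
    /* FORCE: don't wait for the master's replication offset. */
    server.cluster->mf_can_start = 1;
} else {
    /* Normal path: ask our master to pause its clients and announce its
     * replication offset by sending it an MFSTART message. */
    clusterSendMFStart(myself->slaveof);
}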
/* This function is called from the cluster cron function in order to go
* forward with a manual failover state machine.
*/
void clusterHandleManualFailover(void) {
/* Return ASAP if no manual failover is in progress. */
// mf_end == 0 means no manual failover is in progress.
if (server.cluster->mf_end == 0) return;
/* If mf_can_start is non-zero, the failover was already triggered so the
* next steps are performed by clusterHandleSlaveFailover(). */
// If mf_can_start is already set, the failover was triggered; from here on
// clusterHandleSlaveFailover() takes over.
if (server.cluster->mf_can_start) return;
// Still waiting for the master to announce its replication offset.
if (server.cluster->mf_master_offset == 0) return; /* Wait for offset... */
// Check whether our replication offset has caught up with the master's.
if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
/* Our replication offset matches the master replication offset
* announced after clients were paused. We can start the failover. */
// Everything the master sent has been processed: the failover can start.
server.cluster->mf_can_start = 1;
serverLog(LL_WARNING,
"All master replication stream processed, "
"manual failover can start.");
}
}
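One piece is missing from the picture above: who fills in mf_master_offset? When the paused master sends us a packet with the CLUSTERMSG_FLAG0_PAUSED header flag set, clusterProcessPacket() records the master's replication offset. The following is an abridged sketch recalled from the Redis 5.x source; the surrounding packet-parsing code is omitted:

/* In clusterProcessPacket(): a slave in a manual failover captures the
 * paused master's replication offset from the packet header. */
if (server.cluster->mf_end &&
    nodeIsSlave(myself) &&
    myself->slaveof == sender &&
    hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
    server.cluster->mf_master_offset == 0)
{
    server.cluster->mf_master_offset = sender->repl_offset;
    serverLog(LL_WARNING,
        "Received replication offset for paused "
        "master manual failover: %lld",
        server.cluster->mf_master_offset);
}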
The Failover (Master/Replica Switch)
clusterHandleSlaveFailover() implements both automatic and manual failover. A manual failover asks the other masters to vote for this node right away, with no delay and no need to wait for the master to be marked objectively down; the vote itself is still required, however. There are many details here; the key points to keep in mind while reading the code are:
- The function is invoked every 100 milliseconds.
- The election timeout is twice cluster_node_timeout: if the election has not succeeded within two node timeouts it fails; if votes from a majority of masters arrive within that window, it succeeds.
- The retry interval after a failed election is four times cluster_node_timeout: a new round may only begin once that much time has passed since the previous election started.
- The failover logic below runs only on a replica whose master is down (or for which the manual failover flag has been set).
- The next election is scheduled at: now + 500 ms + random 0~500 ms + rank * 1000 ms (see the clusterGetSlaveRank() sketch after this list).
- When the code that actually starts the election is reached, the current epoch is incremented, failover_auth_sent is set to 1 (an election is in progress), the vote request is broadcast, and the function returns. On the next invocation, 100 ms later, it checks whether votes from more than half of the masters have been received; if so, it calls clusterFailoverReplaceYourMaster() to take the master's place, otherwise it logs that it is still waiting for votes.
- The vote-request broadcast itself is implemented in clusterRequestFailoverAuth(), shown further below.
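The rank in that delay formula orders a master's replicas by replication offset: the more up-to-date a replica is, the lower its rank and the sooner it may start an election. For reference, a lightly simplified version of clusterGetSlaveRank() as it appears in Redis 5.x:

/* Rank 0 goes to the slave with the highest replication offset among its
 * master's slaves that are allowed to fail over; each better-positioned
 * sibling pushes our rank (and thus our election delay) up by one. */
int clusterGetSlaveRank(void) {
    long long myoffset;
    int j, rank = 0;
    clusterNode *master = myself->slaveof;

    if (master == NULL) return 0; /* Should never happen. */
    myoffset = replicationGetSlaveOffset();
    for (j = 0; j < master->numslaves; j++)
        if (master->slaves[j] != myself &&
            !nodeCantFailover(master->slaves[j]) &&
            master->slaves[j]->repl_offset > myoffset) rank++;
    return rank;
}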
/* This function is called if we are a slave node and our master serving
 * a non-zero amount of hash slots is in FAIL state.
 *
 * The function has three goals:
 *
 * 1) To check if we are able to perform a failover: is our data updated?
 * 2) Try to get elected by masters.
 * 3) Perform the failover, informing all the other nodes.
 */
// Considered for execution every 100 milliseconds. When a slave's master is
// flagged FAIL, the slave attempts a failover and tries to get itself
// elected as the new master.
void clusterHandleSlaveFailover(void) {
mstime_t data_age; // how long (ms) this slave has been disconnected from its master
// Milliseconds elapsed since the previous election was scheduled.
mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
int needed_quorum = (server.cluster->size / 2) + 1; // votes needed to win
// This is a manual failover if both mf_end and mf_can_start are set.
int manual_failover = server.cluster->mf_end != 0 &&
server.cluster->mf_can_start;
// auth_timeout: election timeout, twice cluster_node_timeout.
// auth_retry_time: retry interval after a failed election, four times cluster_node_timeout.
mstime_t auth_timeout, auth_retry_time;
// Clear the CLUSTER_TODO_HANDLE_FAILOVER bit from todo_before_sleep.
server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;
/* Compute the failover timeout (the max time we have to send votes
* and wait for replies), and the failover retry time (the time to wait
* before trying to get voted again).
*
* Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds.
* Retry is two times the Timeout.
*/
auth_timeout = server.cluster_node_timeout*2; // with the 15 s default: 30 s
if (auth_timeout < 2000) auth_timeout = 2000; // never below 2 seconds
// The retry time is twice the election timeout (60 s with the defaults):
// a new election may only start that long after the previous one began.
auth_retry_time = auth_timeout*2;
/* Pre conditions to run the function, that must be met both in case
 * of an automatic or manual failover:
 * 1) We are a slave.
 * 2) Our master is flagged as FAIL, or this is a manual failover.
 * 3) The no-failover configuration (cluster-replica-no-failover) is not set,
 *    or this is a manual failover.
 * 4) Our master is serving slots. */
// Check that this slave is eligible to start a failover: all four
// conditions above must hold.
if (nodeIsMaster(myself) ||
myself->slaveof == NULL ||
(!nodeFailed(myself->slaveof) && !manual_failover) ||
(server.cluster_slave_no_failover && !manual_failover) ||
myself->slaveof->numslots == 0)
{
/* There are no reasons to failover, so we set the reason why we
* are returning without failing over to NONE. */
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
return;
}
/* Set data_age to the number of milliseconds we have been disconnected
 * from the master. */
// If currently connected, data_age = time since the last interaction with
// the master; otherwise, time since the replication link went down.
if (server.repl_state == REPL_STATE_CONNECTED) {
data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
* 1000;
} else {
data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
}
/* Remove the node timeout from the data age as it is fine that we are
* disconnected from our master at least for the time it was down to be
* flagged as FAIL, that's the baseline. */
// Don't count cluster-node-timeout as part of the disconnection time: being
// disconnected at least that long is expected, since that is how long it
// took for the master to be flagged as FAIL.
if (data_age > server.cluster_node_timeout)
data_age -= server.cluster_node_timeout;
/* Check if our data is recent enough according to the slave validity
* factor configured by the user.
*
* Check bypassed for manual failovers. */
// Check that this slave's data is recent enough: the disconnection time must
// not exceed repl-ping-replica-period + cluster-node-timeout *
// cluster-replica-validity-factor (with the default factor of 10, roughly
// ten node timeouts).
if (server.cluster_slave_validity_factor &&
data_age >
(((mstime_t)server.repl_ping_slave_period * 1000) +
(server.cluster_node_timeout * server.cluster_slave_validity_factor)))
{
if (!manual_failover) {
// Not a manual failover: the data is too old, give up this round.
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
return;
}
}
/* If the previous failover attempt timedout and the retry time has
* elapsed, we can setup a new one. */
// If the time since the last election began (auth_age) exceeds the retry
// interval (auth_retry_time), schedule the next election.
if (auth_age > auth_retry_time) {
// Next election time: now + a fixed 500 ms + a random 0~500 ms.
server.cluster->failover_auth_time = mstime() +
500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
random() % 500; /* Random delay between 0 and 500 milliseconds. */
server.cluster->failover_auth_count = 0; // reset the received-vote counter
server.cluster->failover_auth_sent = 0; // reset the "request sent" flag
server.cluster->failover_auth_rank = clusterGetSlaveRank(); // our current rank
/* We add another delay that is proportional to the slave rank.
* Specifically 1 second * rank. This way slaves that have a probably
* less updated replication offset, are penalized. */
// Slaves with a more up-to-date replication offset get a lower rank and
// therefore a shorter delay; lagging slaves are penalized.
server.cluster->failover_auth_time +=
server.cluster->failover_auth_rank * 1000;
/* However if this is a manual failover, no delay is needed. */
// A manual failover starts immediately: no delay, rank forced to 0.
if (server.cluster->mf_end) {
server.cluster->failover_auth_time = mstime();
server.cluster->failover_auth_rank = 0;
// Ask for the failover to be handled in beforeSleep as soon as possible.
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
}
// Log when the next election is scheduled to start.
serverLog(LL_WARNING,
"Start of election delayed for %lld milliseconds "
"(rank #%d, offset %lld).",
server.cluster->failover_auth_time - mstime(),
server.cluster->failover_auth_rank,
replicationGetSlaveOffset());
/* Now that we have a scheduled election, broadcast our offset
* to all the other slaves so that they'll updated their offsets
* if our offset is better. */
// Broadcast a PONG to our master's other slaves so they can update their
// view of our replication offset.
clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
return;
}
/* It is possible that we received more updated offsets from other
* slaves for the same master since we computed our election delay.
* Update the delay if our rank changed.
*
* Not performed if this is a manual failover. */
// Automatic failover only: refresh the delay if our rank has changed.
if (server.cluster->failover_auth_sent == 0 && // vote request not sent yet
server.cluster->mf_end == 0) // and this is not a manual failover
{
int newrank = clusterGetSlaveRank(); // recompute our rank
if (newrank > server.cluster->failover_auth_rank) { // our rank got worse
long long added_delay = // extra delay to add
(newrank - server.cluster->failover_auth_rank) * 1000;
server.cluster->failover_auth_time += added_delay;
server.cluster->failover_auth_rank = newrank;
// Log how much extra delay was added before the election.
serverLog(LL_WARNING,
"Replica rank updated to #%d, added %lld milliseconds of delay.",
newrank, added_delay);
}
}
/* Return ASAP if we can't still start the election. */
// Too early: the scheduled election time (failover_auth_time) has not been
// reached yet.
if (mstime() < server.cluster->failover_auth_time) {
// Log how many milliseconds remain before the election can start.
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
return;
}
// The election began more than auth_timeout (2 * cluster_node_timeout) ago:
// this round has failed.
/* Return ASAP if the election is too old to be valid. */
if (auth_age > auth_timeout) {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);
return;
}
/* Ask for votes if needed. */
// Reaching this point means none of the early returns fired: the election
// can be started.
if (server.cluster->failover_auth_sent == 0) { // vote request not sent yet
server.cluster->currentEpoch++; // bump the current epoch
server.cluster->failover_auth_epoch = server.cluster->currentEpoch; // election epoch
serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
(unsigned long long) server.cluster->currentEpoch);
// Broadcast the vote request to every node in the cluster.
clusterRequestFailoverAuth();
server.cluster->failover_auth_sent = 1; // mark the election as in progress
// Schedule three tasks for the beforeSleep hook. beforeSleep is part of the
// Redis event loop: it runs before each iteration of the loop goes to sleep
// and handles work that must be completed before waiting for new events.
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| // save the cluster config
CLUSTER_TODO_UPDATE_STATE| // update the cluster state
CLUSTER_TODO_FSYNC_CONFIG); // fsync the config to disk
return; /* Wait for replies, for up to cluster-node-timeout*2, or until a
majority of the votes has arrived. */
}
/* Check if we reached the quorum. */
// With votes from at least a quorum of the masters, the failover succeeds.
if (server.cluster->failover_auth_count >= needed_quorum) {
/* We have the quorum, we can finally failover the master. */
serverLog(LL_WARNING,
"Failover election won: I'm the new master.");
/* Update my configEpoch to the epoch of the election. */
if (myself->configEpoch < server.cluster->failover_auth_epoch) {
myself->configEpoch = server.cluster->failover_auth_epoch;
serverLog(LL_WARNING,
"configEpoch set to %llu after successful failover",
(unsigned long long) myself->configEpoch);
}
/* Take responsibility for the cluster slots. */
clusterFailoverReplaceYourMaster();
} else {
// Not enough votes yet: log it and keep waiting.
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
}
}
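Once the election is won, clusterFailoverReplaceYourMaster() performs the actual takeover. Lightly abridged from the Redis 5.x source, it does the following:

void clusterFailoverReplaceYourMaster(void) {
    int j;
    clusterNode *oldmaster = myself->slaveof;

    if (nodeIsMaster(myself) || oldmaster == NULL) return;

    /* 1) Turn this node into a master and stop replicating. */
    clusterSetNodeAsMaster(myself);
    replicationUnsetMaster();

    /* 2) Claim all the slots assigned to our master. */
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (clusterNodeGetSlotBit(oldmaster,j)) {
            clusterDelSlot(j);
            clusterAddSlot(myself,j);
        }
    }

    /* 3) Update the cluster state and save the config. */
    clusterUpdateState();
    clusterSaveConfigOrDie(1);

    /* 4) Pong all the other nodes so they can update their state and
     *    detect that we switched to the master role. */
    clusterBroadcastPong(CLUSTER_BROADCAST_ALL);

    /* 5) If a manual failover was in progress, clear its state. */
    resetManualFailover();
}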
Broadcasting the Vote Request
void clusterRequestFailoverAuth(void) {
unsigned char buf[sizeof(clusterMsg)];
clusterMsg *hdr = (clusterMsg*) buf;
uint32_t totlen;
// Build the message header with type CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST,
// i.e. a failover authorization request.
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);
/* If this is a manual failover, set the CLUSTERMSG_FLAG0_FORCEACK bit
 * in the header to communicate to the nodes receiving the message that
 * they should authorize the failover even if the master is working. */
if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
hdr->totlen = htonl(totlen);
// Broadcast the message to every node in the cluster.
clusterBroadcastMessage(buf,totlen);
}
Processing Vote Requests and Incoming Votes
clusterProcessPacket() is where incoming cluster messages are handled; below are the branches that process vote requests, incoming votes, and manual failover requests. A few points worth noting:
- Only master nodes vote, and the request's current epoch must be greater than or equal to the voter's current epoch.
- A master that has already voted within the last cluster_node_timeout * 2 (30 seconds by default) will not vote again: consecutive votes must be at least two node timeouts apart.
- Handling a vote request is straightforward: the voter checks whether the requester satisfies the conditions (the three checks in the code below) and, if so, grants the vote; receiving a vote increments the vote counter and sets the failover-check flag so the quorum check runs as soon as possible.
- A manually triggered failover also needs votes. The manual failover timeout defaults to 5 seconds, and the master pauses its clients to prevent writes during the failover, for CLUSTER_MF_TIMEOUT * 2 (10 seconds by default) or until the failover completes.
// An incoming vote request.
else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
// Ignore the request if we don't know the sender.
if (!sender) return 1; /* We don't know that node. */
// Grant our vote if the conditions are met (explained in detail below).
clusterSendFailoverAuthIfNeeded(sender,hdr);
// An incoming vote: masters signal their support by sending a
// CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK message.
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
if (!sender) return 1; /* We don't know that node. */
/* We consider this vote only if the sender is a master serving
 * a non zero number of slots, and its currentEpoch is greater or
 * equal to epoch where this node started the election. */
// The sender must be a master (only masters may vote), it must serve at
// least one slot (i.e. it is a "real" master), and its currentEpoch must be
// >= the epoch at which we started the election, to discard stale votes.
if (nodeIsMaster(sender) && sender->numslots > 0 &&
senderCurrentEpoch >= server.cluster->failover_auth_epoch)
{
// Count the vote.
server.cluster->failover_auth_count++;
/* Maybe we reached a quorum here, set a flag to make sure
 * we check ASAP. */
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
}
// A slave asking its master to start a manual failover.
} else if (type == CLUSTERMSG_TYPE_MFSTART) {
/* This message is acceptable only if I'm a master and the sender
 * is one of my slaves. */
if (!sender || sender->slaveof != myself) return 1;
/* Manual failover requested from slaves. Initialize the state
 * accordingly. */
resetManualFailover();
// Set the manual failover deadline (now + 5 seconds by default).
server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
// Remember which slave requested the manual failover.
server.cluster->mf_slave = sender;
// Pause clients so nothing is written during the failover; the pause lasts
// CLUSTER_MF_TIMEOUT * 2 (10 seconds by default).
pauseClients(mstime()+(CLUSTER_MF_TIMEOUT*2));
// Log that a manual failover was requested by one of our replicas.
serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
sender->name);
}
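resetManualFailover(), called above before arming the new state, clears any previous manual failover state and lifts a client pause that may still be in effect. A sketch of its Redis 5.x implementation:

void resetManualFailover(void) {
    if (server.cluster->mf_end && clientsArePaused()) {
        server.clients_pause_end_time = 0;
        clientsArePaused(); /* Just use the side effect of the function. */
    }
    server.cluster->mf_end = 0; /* No manual failover in progress. */
    server.cluster->mf_can_start = 0;
    server.cluster->mf_slave = NULL;
    server.cluster->mf_master_offset = 0;
}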
The voting logic in clusterSendFailoverAuthIfNeeded() is as follows:
/* Vote for the node asking for our vote if there are the conditions.
 *
 * For a manual failover, the special handling comes down to the force_ack
 * flag: it bypasses the check on the master's state, so a voter may vote
 * even when the master is not in FAIL state. All the other conditions match
 * the automatic case (current-epoch check, duplicate-vote check, slot
 * configEpoch check), and the vote itself (recording the epoch and time,
 * then sending the ACK) is identical. */
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
// The master of the node requesting our vote.
clusterNode *master = node->slaveof;
// The requesting node's currentEpoch.
uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
// The configEpoch the requester wants to be voted on.
uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
// The slot bitmap claimed by the requester.
unsigned char *claimed_slots = request->myslots;
// force_ack is 1 when CLUSTERMSG_FLAG0_FORCEACK is set, i.e. this is a manual
// failover request. The only difference a manual failover makes here is that
// force_ack bypasses the check on the master's FAIL state.
int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
int j;
/* IF we are not a master serving at least 1 slot, we don't have the
* right to vote, as the cluster size in Redis Cluster is the number
* of masters serving at least one slot, and quorum is the cluster
* size + 1 */
if (nodeIsSlave(myself) || myself->numslots == 0) return;
/* Request epoch must be >= our currentEpoch.
* Note that it is impossible for it to actually be greater since
* our currentEpoch was updated as a side effect of receiving this
* request, if the request epoch was greater. */
// The request's current epoch must be >= our currentEpoch.
if (requestCurrentEpoch < server.cluster->currentEpoch) {
serverLog(LL_WARNING,
"Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
node->name,
(unsigned long long) requestCurrentEpoch,
(unsigned long long) server.cluster->currentEpoch);
return;
}
/* I already voted for this epoch? Return ASAP. */
if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
serverLog(LL_WARNING,
"Failover auth denied to %.40s: already voted for epoch %llu",
node->name,
(unsigned long long) server.cluster->currentEpoch);
return;
}
/* Node must be a slave and its master down.
* The master can be non failing if the request is flagged
* with CLUSTERMSG_FLAG0_FORCEACK (manual failover). */
// The requester must be a slave with a known master, and that master must be
// flagged as failed; force_ack (a manual failover) bypasses the
// master-must-be-down check, so the vote can be granted even if the master
// is still up.
if (nodeIsMaster(node) || master == NULL ||
(!nodeFailed(master) && !force_ack))
{
if (nodeIsMaster(node)) {
serverLog(LL_WARNING,
"Failover auth denied to %.40s: it is a master node",
node->name);
} else if (master == NULL) {
serverLog(LL_WARNING,
"Failover auth denied to %.40s: I don't know its master",
node->name);
} else if (!nodeFailed(master)) {
// nodeFailed(n) expands to ((n)->flags & CLUSTER_NODE_FAIL)
serverLog(LL_WARNING,
"Failover auth denied to %.40s: its master is up",
node->name);
}
return;
}
/* We did not voted for a slave about this master for two
* times the node timeout. This is not strictly needed for correctness
* of the algorithm but makes the base case more linear. */
// Refuse if we voted about this master recently: consecutive votes must be
// at least two node timeouts (cluster_node_timeout * 2) apart.
if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
{
serverLog(LL_WARNING,
"Failover auth denied to %.40s: "
"can't vote about this master before %lld milliseconds",
node->name,
(long long) ((server.cluster_node_timeout*2)-
(mstime() - node->slaveof->voted_time)));
return;
}
/* The slave requesting the vote must have a configEpoch for the claimed
* slots that is >= the one of the masters currently serving the same
* slots in the current configuration. */
for (j = 0; j < CLUSTER_SLOTS; j++) {
// Skip slots not claimed by the requester.
if (bitmapTestBit(claimed_slots, j) == 0) continue;
// Look for a claimed slot whose configEpoch, in our view, is greater than
// the one announced by the requester.
if (server.cluster->slots[j] == NULL ||
server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
{
continue;
}
/* If we reached this point we found a slot that in our current slots
* is served by a master with a greater configEpoch than the one claimed
* by the slave requesting our vote. Refuse to vote for this slave. */
// Such a slot exists, so the requester's epoch is stale: refuse to vote.
serverLog(LL_WARNING,
"Failover auth denied to %.40s: "
"slot %d epoch (%llu) > reqEpoch (%llu)",
node->name, j,
(unsigned long long) server.cluster->slots[j]->configEpoch,
(unsigned long long) requestConfigEpoch);
return;
}
/* We can vote for this slave. */
// Record the epoch and the time of this vote.
server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
node->slaveof->voted_time = mstime();
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
// Grant the vote.
clusterSendFailoverAuth(node);
serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
node->name, (unsigned long long) server.cluster->currentEpoch);
}
Throughout the code above we keep running into calls like clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER). All this function does is set flags on the cluster state (its argument is the flag to set). Then, before each iteration of the event loop goes to sleep (think of it as another kind of periodic task), clusterBeforeSleep() is invoked; it inspects whichever flags were set and performs the corresponding work. The code follows:
/* This function is called before the event handler returns to sleep for
* events. It is useful to perform operations that must be done ASAP in
* reaction to events fired but that are not safe to perform inside event
* handlers, or to perform potentially expansive tasks that we need to do
* a single time before replying to clients. */
void clusterBeforeSleep(void) {
/* Handle failover, this is needed when it is likely that there is already
 * the quorum from masters in order to react fast. */
if (server.cluster->todo_before_sleep & CLUSTER_TODO_HANDLE_FAILOVER)
clusterHandleSlaveFailover();
/* Update the cluster state. */
if (server.cluster->todo_before_sleep & CLUSTER_TODO_UPDATE_STATE)
clusterUpdateState();
/* Save the config, possibly using fsync. */
if (server.cluster->todo_before_sleep & CLUSTER_TODO_SAVE_CONFIG) {
int fsync = server.cluster->todo_before_sleep &
CLUSTER_TODO_FSYNC_CONFIG;
clusterSaveConfigOrDie(fsync);
}
/* Reset our flags (not strictly needed since every single function
* called for flags set should be able to clear its flag). */
server.cluster->todo_before_sleep = 0;
}
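The flag-setting side, for completeness, is a one-liner in cluster.c:

/* Schedule the given CLUSTER_TODO_* flags to be processed by
 * clusterBeforeSleep() before the next event-loop sleep. */
void clusterDoBeforeSleep(int flags) {
    server.cluster->todo_before_sleep |= flags;
}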
That covers the core failover source code. The equally important subjective/objective failure detection (PFAIL/FAIL) code and the PING message sending code will be analyzed in the next article!