zk源码—7.ZAB协议和数据存储一
大纲
1.两阶段提交Two-Phase Commit(2PC)
2.三阶段提交Three-Phase Commit(3PC)
3.ZAB协议算法
4.ZAB协议与Paxos算法
5.zk的数据存储原理之内存数据
6.zk的数据存储原理之事务日志
7.zk的数据存储原理之数据快照
8.zk的数据存储原理之数据初始化和数据同步流程
1.两阶段提交Two-Phase Commit(2PC)
(1)数据库事务通过undo和redo保证数据强一致性
(2)分布式事务通过2PC保证数据强一致性
(3)2PC的优点和缺点
(1)数据库事务通过undo和redo保证数据强一致性
2PC即二阶段提交算法,是强一致性算法。它是数据库领域内,为了使基于分布式系统架构下的所有节点,在进行事务处理过程中能够保持原子性和一致性而设计的算法。所以很适合用作数据库的分布式事务,其实数据库经常用到的TCC本身就是一种2PC。
在InnoDB存储引擎中,对数据库的事务修改都会写undo日志和redo日志。其实不只是数据库,很多需要事务支持的都会用到undo和redo思路。
一.对一条数据的修改操作首先写undo日志,记录数据原来的样子
二.然后执行事务修改操作,把数据写到redo日志里
三.万一事务失败了,可以从undo日志恢复数据
数据库通过undo和redo能保证数据强一致性。
(2)分布式事务通过2PC保证数据强一致性
解决分布式事务的前提就是节点是支持事务的。在这个前提下,2PC把整个分布式事务分两个阶段:投票阶段(Prepare)和执行阶段(Commit)。
阶段一:投票阶段
在阶段一中,各参与者投票表明是否要继续执行接下来的事务提交操作。
步骤一:协调者向参与者发起事务询问。协调者向所有的参与者发送事务内容,询问是否可以执行事务操作,并开始等待各参与者的响应。
步骤二:参与者收到协调者的询问后执行事务。各参与者节点执行事务操作,并将undo和redo信息记入事务日志中。
步骤三:参与者向协调者反馈事务询问的响应。如果参与者成功执行事务操作,就反馈协调者Yes响应,表示事务可执行。如果参与者没成功执行事务,就反馈协调者No响应,表示事务不可执行。
阶段二:执行阶段
在阶段二中,协调者会根据各参与者的反馈来决定是否可以进行事务提交。有两种提交可能:执行事务提交和中断事务。
可能一:执行事务提交
假如协调者从所有参与者获得的反馈都是Yes响应,那么就会执行事务提交。
步骤一:协调者向参与者发送提交请求,协调者向所有参与者节点发出commit请求。
步骤二:参与者收到协调者的commit请求后进行事务提交。参与者接收到commit请求后,会正式执行事务提交操作,并在完成提交之后释放在整个事务执行期间占用的事务资源。
步骤三:参与者向协调者反馈事务提交结果(Ack消息)。参与者在完成事务提交之后,向协调者发送Ack消息。
步骤四:协调者收到所有参与者反馈(Ack)后完成事务。协调者接收到所有参与者反馈的Ack消息后,完成事务。
可能二:中断事务
假如任何一个参与者向协调者反馈了No响应或者等待超时,那么协调者无法接收到所有参与者的反馈响应,就会中断事务。
步骤一:协调者向参与者发送回滚请求,协调者向所有参与者节点发出rollback请求。
步骤二:参与者收到协调者的rollback请求后进行事务回滚。参与者接收到rollback请求后,会利用undo信息来执行事务回滚操作,并在完成回滚之后释放占用的事务资源。
步骤三:参与者向协调者反馈事务回滚结果(Ack消息)。参与者在完成事务回滚之后,向协调者发送Ack消息。
步骤四:协调者收到所有参与者反馈(Ack)后中断事务。协调者接收到所有参与者反馈的Ack消息后,完成事务中断。
(3)2PC的优点和缺点
一.2PC优点
优点一:原理简单
优点二:实现方便
二.2PC缺点
总结来说有四个缺点:同步阻塞、单点故障、数据不一致、容错机制不完善。
缺点一:同步阻塞
在二阶段提交过程中,所有节点都在等其他节点响应,无法进行其他操作,这种同步阻塞极大的限制了分布式系统的性能。
缺点二:单点问题
协调者在整个二阶段提交过程中很重要。如果协调者在提交阶段出现问题,那么整个流程将无法运转。而且其他参与者会处于一直锁定事务资源的状态中,无法完成事务操作。
缺点三:数据不一致
假设当协调者向所有参与者发送commit请求后,发生了局部网络异常。或者是协调者在尚未发送完所有commit请求之前自身发生了崩溃,导致最终只有部分参与者收到了commit请求,这将导致数据不一致问题。
缺点四:容错性不好
二阶段提交协议没有较为完善的容错机制,任意一个参与者或协调者故障都会导致整个事务的失败。
2.三阶段提交Three-Phase Commit(3PC)
(1)第一阶段canCommit
(2)第二阶段preCommit
(3)第三阶段doCommit
(4)3PC的优缺点
(5)3PC与2PC区别
(1)第一阶段canCommit
步骤一:协调者向参与者发起事务询问。协调者向所有参与者发送一个包含事务内容的canCommit请求。询问是否可以执行事务提交操作,并开始等待各参与者响应。
步骤二:参与者收到协调者的询问后反馈响应。参与者在接收到协调者的canCommit请求后,如果认为可以顺利执行事务,会反馈Yes响应并进入预备状态,否则反馈No响应。
这一阶段其实就是确认所有的资源是否都是健康、在线的。因为有了这一阶段,大大的减少了2PC提交的阻塞时间。
因为这一阶段优化了以下这种情况:2PC提交时,如果有两个参与者1和2而恰好参与者2出现问题,参与者1执行了耗时的事务操作,最后却发现参与者2连接不上。
(2)第二阶段preCommit
包含两种可能:执行事务预提交和中断事务。
可能一:执行事务预提交
假如协调者从所有参与者获得的反馈都是Yes响应,则执行事务预提交。
步骤一:协调者向参与者发送预提交请求。协调者向所有参与者发出preCommit请求,然后协调者会进入预提交状态。
步骤二:参与者收到协调者的preCommit请求后执行事务。参与者接收到协调者发出的preCommit请求后,会执行事务操作,并将undo和redo信息记录到事务日志中。
步骤三:参与者向协调者反馈事务执行的响应(Ack)。如果参与者成功执行了事务操作,那么就会反馈给协调者Ack响应。
可能二:中断事务
假如任何一个参与者向协调者反馈了No响应,或者等待超时。协调者无法接收到所有参与者的反馈响应,那么就会中断事务。
步骤一:协调者向参与者发送中断请求,协调者向所有参与者节点发出abort请求。
步骤二:参与者收到协调者abort请求则中断事务。无论是收到来自协调者的abort请求,或者在等待协调者请求过程中出现超时,参与者都会中断事务。
(3)第三阶段doCommit
包含两种可能:执行提交和中断事务。
可能一:执行提交
接收到来自所有参与者的Ack响应。
步骤一:协调者向参与者发送提交请求。协调者向所有参与者发出doCommit请求,由预提交状态进入提交状态。
步骤二:参与者收到协调者的doCommit请求后提交事务。参与者接收到doCommit请求后,会正式执行事务提交操作,并在完成提交之后释放整个事务执行期间占用的事务资源。
步骤三:参与者向协调者反馈事务提交结果。参与者在完成事务提交之后,向协调者发送Ack消息。
步骤四:协调者收到所有参与者反馈(Ack)完成事务。协调者接收到所有参与者反馈的Ack消息后,完成事务。
可能二:中断事务
假如任何一个参与者向协调者反馈了No响应,或者等待超时。协调者无法接收到所有参与者的反馈响应,那么就会中断事务。
步骤一:协调者向参与者发送回滚请求,协调者向所有参与者节点发出abort请求。
步骤二:参与者收到协调者的abort请求后进行事务回滚。参与者接收到协调者的abort请求后,会利用undo信息执行事务回滚操作,并在完成回滚之后释放占用的事务资源。
步骤三:参与者向协调者反馈事务回滚结果(Ack消息)。参与者在完成事务回滚之后,会向协调者发送Ack消息。
步骤四:协调者收到所有参与者反馈(Ack)后中断事务。协调者接收到所有参与者反馈的Ack消息后,完成事务中断。
注意:一旦进入阶段三doCommit,无论出现哪一种故障:协调者出现了问题、协调者和参与者之间网络故障,最终都会导致参与者无法及时接收来自协调者的doCommit或abort请求。参与者都会在等待超时后,继续进行事务提交。
(4)3PC的优缺点
三阶段提交协议的优点
优点一:改善同步阻塞
与2PC相比,降低了参与者的阻塞范围。
优点二:改善单点故障
与2PC相比,出现单点故障后能继续达成一致。
三阶段提交协议的缺点
缺点一:同步阻塞
相比2PC虽然降低阻塞范围,但依然存在阻塞。
缺点二:单点故障
虽然单点故障后能继续提交,但单点故障依然存在。
缺点三:数据不一致
正因出现单点故障后能继续提交,所以数据不一致。
缺点四:容错机制不完善
参与者或协调者节点失败会导致事务失败,所以数据库的分布式事务一般都是采用2PC,而3PC更多是被借鉴扩散成其他的算法。
(5)3PC与2PC区别
区别一:3PC第二阶段才写undo和redo事务日志。
区别二:3PC第三阶段协调者出现异常或网络超时参与者也会Commit。
3.ZAB协议算法
(1)ZAB协议介绍
(2)ZAB协议的消息广播模式
(3)ZAB协议的崩溃恢复模式
(4)崩溃恢复模式中完成Leader选举后的数据同步
(1)ZAB协议介绍
一.ZAB协议的一主多从模式
二.ZAB协议的三个保证
三.Leader和非Leader对事务请求的处理
四.ZAB协议的由三个主要步骤
五.ZAB协议的两种基本模式
ZAB协议是为分布式协调服务zk设计的一种支持崩溃恢复的原子广播协议。它在设计之初并没有要求具有很好的扩展性,最初只是为了应对高吞吐量、低延迟、健壮、简单的分布式场景。ZAB协议并不像Paxos算法那样是一种通用的分布式一致性算法,它是特别为zk设计的支持崩溃恢复的原子广播算法。
一.ZAB协议的一主多从模式
zk主要依赖ZAB协议来实现分布式数据一致性。ZAB协议采用主从模式的系统架构来保证集群间各副本数据的一致性。具体如下:
ZAB协议会使用一个单一的主进程来接收并处理客户端的所有事务请求,然后该主进程会将请求的处理结果以Proposal形式广播到所有副本进程中。
二.ZAB协议的三个保证
保证同一时刻集群中只有一个主进程来广播数据变更,保证一个全局的变更序列被顺序应用,保证当主进程崩溃或重启时集群能自动恢复正常工作。
三.Leader和非Leader对事务请求的处理
ZAB协议只允许唯一一个Leader来处理事务请求。Leader接收到客户端事务请求后,会生成对应的Proposal提议并发起广播。非Leader接收到客户端事务请求后,会将事务请求转发给Leader。
四.ZAB协议的由三个主要步骤
步骤一:Leader发送Proposal提议给集群中的所有节点,包括它自己。
步骤二:节点收到Proposal提议后把Proposal提议落盘,然后发送一个ACK给Leader。
步骤三:Leader收到过半节点的ACK后,发送Commit给集群中的所有节点。
五.ZAB协议的两种基本模式
分别是崩溃恢复模式和消息广播模式。ZAB协议中的状态同步是指,集群中过半机器和Leader的数据状态一致。当集群启动或Leader出现崩溃推出等异常时,就会进入恢复模式进行选举。当集群已有过半Follower完成和Leader的状态同步,就会进入广播模式。如果集群已存在Leader进行消息广播,新加入的服务器就会进入恢复模式。
ZAB协议进入崩溃恢复模式后,只要集群中存在过半服务器能相互通信,那么就可以产生一个新的Leader并再次进入消息广播模式。比如一个由3台机器组成的ZAB服务 = 1个Leader + 2个Follower。当其中一个Follower挂掉时,整个ZAB集群是不会中断服务的,因为Leader服务器依然能够获得过半机器(包括自己)的支持。
(2)ZAB协议的消息广播模式
一.ZAB协议的消息广播过程
二.ZAB协议的消息广播过程与2PC的区别
三.ZAB协议的消息广播过程能保证消息接收和发送的顺序性
一.ZAB协议的消息广播过程
ZAB协议的消息广播过程使用的是一个原子广播协议,类似于2PC过程。首先,Leader会为客户端的事务请求生成对应的事务Proposal并进行广播。然后,Leader会收集Follower返回的ACK是否已过半。若已过半,Leader则会发送Commit消息到所有Follower去提交事务。
二.ZAB协议的消息广播过程与2PC的区别
区别一:移除了中断逻辑
在ZAB协议的消息广播(二阶段提交)过程中,移除了中断逻辑。所有的Follower要么正常反馈Leader提出的事务Proposal,要么不反馈。
区别二:无需等所有Follower都反馈响应
移除了中断逻辑,意味着可以在过半Follower反馈ACK后就可以提交事务,不需要等待集群中的所有Follower都反馈ACK响应。
区别三:添加崩溃恢复模式解决Leader崩溃退出导致的数据不一致问题
由于移除了中断逻辑 + 过半Follower反馈就可以提交事务,所以这时是无法处理Leader崩溃退出而带来的数据不一致问题的,因此ZAB协议添加了崩溃恢复模式来解决这个问题。
三.ZAB协议的消息广播过程能保证消息接收和发送的顺序性
另外,整个消息广播过程是基于FIFO特性的TCP协议来进行网络通信的,所以能够很容易保证消息广播过程中消息接收和发送的顺序性。广播时是由一个主进程Leader去通过FIFO的TCP协议进行发送,所以某Follower接收的多个Proposal和Commit请求都能按顺序入队和响应。
(3)ZAB协议的崩溃恢复模式
一.崩溃恢复过程可能出现数据不一致的两种情况
二.ZAB需要设计这样一个Leader选举算法
ZAB协议的这个基于原子广播协议的消息广播过程,正常情况下运行良好。但一旦Leader出现崩溃,或者由于网络Leader失去与过半Follower的联系。那么就会进入崩溃恢复模式,选举出一个新的Leader。
一.崩溃恢复过程可能出现数据不一致的两种情况
情况一:假设一个事务在Leader上被提交了,并且已有过半Follower反馈ACK响应。但是在Leader将Commit消息发送给所有Follower前Leader挂了。所以ZAB协议需要确保已在Leader提交的Proposal提议也能被所有Follower提交。
情况二:如果在崩溃恢复过程中出现一个需要被丢弃的Proposal提议,那么在崩溃恢复结束后需要跳过该Proposal提议,所以ZAB协议需要确保丢弃那些只在Leader上处理过的事务Proposal提议。
二.ZAB需要设计这样一个Leader选举算法
算法要求:能够确保提交已经被Leader提交的事务Proposal提议,同时丢弃已经被跳过的事务Proposal提议。
算法设计:让新选举出来的Leader拥有集群中ZXID最大的事务Proposal提议,那么就可以保证新Leader一定拥有所有已经提交的Proposal提议,同时也省去Leader去检查Proposal提议的提交和丢弃操作。
对于出现不一致的情况一:如果Leader将Commit消息发送给所有Follower前,Leader崩溃了。那么Leader崩溃后,就会选举一个拥有最大ZXID的Follower作为Leader。这个Leader会检查事务日志:如果发现自己事务日志里有一个还没进行提交的Proposal提议,那么就说明旧Leader没来得及发送Commit消息就崩溃了,此时它作为新Leader会为这个Proposal提议向Follower发送Commit消息,从而保证旧Leader提交的事务最终可以被提交到所有Follower中。
对于不一致的情况二:如果一个Leader刚把一个Proposal提议写入本地磁盘日志,还没来得及广播Proposal提议给全部Follower就崩溃了。那么当新Leader选举出来后,事务的epoch会自增长一位。然后当旧Leader重启后重新加入集群成为Follower时,会发现自己比新Leader多出一条Proposal提议,但该Proposal提议的epoch比新Leader的epoch低,所以会丢弃这条数据。
(4)崩溃恢复模式中完成Leader选举后的数据同步
一.ZAB协议在正常情况下的数据同步逻辑
二.ZAB协议如何处理需要丢弃的事务Proposal
一.ZAB协议在正常情况下的数据同步逻辑
完成Leader选举后,Leader会首先确认Follower是否完成数据同步,也就是Leader事务日志中的Proposal提议是否已被过半Follower提交。
Leader需要确保所有Follower能接收到每一条事务Proposal,并且能正确将所有已提交了的事务Proposal应用到内存数据库中。
Leader会为每个Follower都准备一个队列,并将那些没有被Follower同步的事务以Proposal提议的形式逐个发送给Follower,并在发送完每个Proposal提议消息后紧接着再发送一个Commit消息表示事务已提交。
等Follower将所有未同步的事务Proposal提议都同步并应用到内存数据库后,Leader就会将该Follower加入到真正的可用Follower列表中。
二.ZAB协议如何处理需要丢弃的事务Proposal
ZXID是一个64位的数字。低32位是一个递增计数器,Leader产生一个新Proposal都会对其+1。高32位是选举轮次的编号,每当选举产生一个新Leader都会对其+1。
4.ZAB协议与Paxos算法
(1)Paxos算法介绍
(2)联系和区别
(1)Paxos算法介绍
一.Paxos的三种角色
二.Paxos算法类似2PC的执行流程
三.Paxos的Learner如何学习被选定的value
四.Paxos算法如何保证活性
一.Paxos的三种角色
角色一:提案者(Proposer)
提出提案(Proposal),Proposal信息包括提案编号(ID)和提案的值(Value)。
角色二:决策者(Acceptor)
参与决策,回应Proposer的提案,收到提案后可以接受提案。若提案获得超半数Acceptor的接受,则称该提案被选定(批准)。
角色三:学习者(Learner)
不参与决策,从Proposer和Acceptor学习被选定的提案的值(Value)。
二.Paxos算法类似2PC的执行流程
下面是Paxos算法的概述:
与ZAB协议不同,Paxos算法处理来自客户端的事务请求时:
步骤一:首先会触发一个或多个服务器进程,向其他服务器发起提案
步骤二:然后其他服务器会向发起提案的服务器反馈提案的执行情况
步骤三:接着发起提案的服务器会对接收到的反馈信息进行统计
步骤四:当过半服务器批准该事务请求操作后,则可在本地执行提交
可见,Paxos算法对事务请求的投票过程与ZAB协议十分相似。但ZAB协议中发起投票的机器,是集群中运行的一台Leader服务器,而Paxos算法则是采用多副本的处理方式。也就是存在多个副本,每个副本分别包含提案者、决策者以及学习者。
下面是详细的Paxos两阶段提交算法:
阶段一.Prepare请求
步骤一:Proposer选择一个提案编号M,然后向半数以上的Acceptor发送编号为M的Prepare请求。
步骤二:如果一个Acceptor收到一个编号为M的Prepare请求,且M大于该Acceptor已经响应过的所有Prepare请求的编号,那么它就会将它已经批准过的编号最大的提案作为响应反馈给Proposer,同时该Acceptor承诺不再批准任何编号小于M的提案。
阶段二.Accept请求
步骤一:如果Proposer收到半数以上Acceptor,对其发出的编号为M的Prepare请求的响应,那么它就会发送一个针对[M, V]提案的Accept请求给半数以上的Acceptor。注意:V就是收到的响应中编号最大的提案的value。如果响应中不包含任何提案,那么V就由Proposer自己决定。
步骤二:如果Acceptor收到一个针对[M, V]提案的Accept请求,只要该Acceptor没有对编号大于M的Prepare请求做出过响应,它就批准该提案。
三.Paxos的Learner如何学习被选定的value
Learner获取提案,有三种方案:
四.Paxos算法如何保证活性
一个极端的活锁场景:
(2)Paxos与ZAB的联系和区别
相同之处:
一.两者都存在一个类似于Leader进程的角色,由Leader角色负责协调多个Follower进程的运行。
二.Leader进程都会等待超过半数的Follower做出正确的反馈,之后才会将一个提案进行提交。
三.都存在一个标识表示当前的Leader周期,比如ZAB是epoch、Paxos是Ballot。
不同之处:
一.ZAB协议中发起投票的机器是集群中运行的一台Leader服务器,Paxos算法则是采用多副本的处理方式。即存在多个副本,每个副本分别包含提案者、决策者以及学习者。
二.两者的设计目标不一样。ZAB协议主要用于构建一个高可用的分布式数据主从系统,Paxos算法主要用于构建一个分布式的一致性状态机系统。
5.zk的数据存储原理之内存数据
(1)DataNode
(2)DataTree和nodes
(3)ZKDatabase
从数据存储位置角度看,zk产生的数据可以分为内存数据和磁盘数据。从数据的种类和作用看,又可以分为事务日志数据和全量数据快照。
zk的数据模型是一棵树。zk存储了整棵树的内容,包括所有的节点路径、节点数据及ACL信息等。zk会定时将整棵树的数据存储到磁盘上。
(1)DataNode
DataNode是数据存储的最小单元,DataNode会保存:节点的数据内容、ACL列表、节点状态和子节点列表。
public class DataNode implements Record {
byte data[];//节点的数据内容
Long acl;//ACL列表
public StatPersisted stat;//节点状态
private Set<String> children = null;//子节点列表
...
}
(2)DataTree和nodes
DataTree是zk内存数据存储的核心,代表了内存中的一份完整数据,它不包含任何与网络、客户端连接以及请求处理相关的逻辑。DataTree用于存储所有zk节点的路径、内容及其ACL信息,它的核心存储结构是一个ConcurrentHashMap类型的nodes。
在DataTree.nodes这个Map中,存放了zk上所有的数据节点。对zk数据的所有操作,都是对DataTree.nodes这个Map进行操作的。DataTree.nodes的key是节点路径path,value是节点内容DataNode。DataTree.ephemerals专门存储了zk的临时节点,以便实时访问和及时清理。
public class DataTree {
//在DataTree.nodes这个Map中,存放了zk上所有的数据节点,包括节点路径、内容及ACL信息
private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();
//存储监听节点的WatchManager
private final WatchManager dataWatches = new WatchManager();
//存储监听子节点的WatchManager
private final WatchManager childWatches = new WatchManager();
//存储了zk的临时节点
private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>();
...
public void createNode(final String path, byte data[], List<ACL> acl, long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat) {
...
String parentName = path.substring(0, lastSlash);
...
DataNode parent = nodes.get(parentName);
...
synchronized (parent) {
...
nodes.put(path, child);
...
HashSet<String> list = ephemerals.get(ephemeralOwner);
if (list == null) {
list = new HashSet<String>();
ephemerals.put(ephemeralOwner, list);
}
synchronized (list) {
list.add(path);
}
}
...
dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged);
}
public Set<String> getEphemerals(long sessionId) {
HashSet<String> retv = ephemerals.get(sessionId);
if (retv == null) {
return new HashSet<String>();
}
HashSet<String> cloned = null;
synchronized (retv) {
cloned = (HashSet<String>) retv.clone();
}
return cloned;
}
...
}
(3)ZKDatabase
ZKDatabase是zk的内存数据库,它负责管理zk的所有会话、DataTree存储和事务日志,它会定时向磁盘dump数据快照。在zk服务器启动时会通过磁盘上的事务日志和快照文件恢复ZKDatabase。
public class ZKDatabase {
protected DataTree dataTree;//zk的内存数据
protected FileTxnSnapLog snapLog;//数据文件管理器
...
public ZKDatabase(FileTxnSnapLog snapLog) {
dataTree = createDataTree();
sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
this.snapLog = snapLog;
...
}
...
}
6.zk的数据存储原理之事务日志
(1)事务日志的存储
(2)事务日志的写入之FileTxnLog的创建
(3)事务日志的写入之FileTxnLog写入日志的步骤
(4)事务日志截断
(1)事务日志的存储
部署zk集群时需要默认配置一个目录dataDir,用于存储事务日志文件。zk中也可以为事务日志单独分配一个文件存储目录dataLogDir。
zk的事务日志文件都具有两个特点:
一.文件大小都是64M
二.文件名后缀都是一个十六进制的、写入文件的第一条事务记录的ZXID
使用ZXID作为后缀,可迅速定位某一个事务操作所在的事务日志文件。由于ZXID的高32位当表当前Leader周期,低32位代表事务操作的计数器。所以将ZXID作为文件后缀,可以清楚看出当前运行的zk的Leader周期。
(2)事务日志的写入之FileTxnLog的创建
zk会通过FileTxnLog类来实现事务日志的写入操作。zk服务端启动时会先创建数据管理器FileTxnSnapLog,在FileTxnSnapLog的构造方法中便会创建FileTxnLog实例。
public class QuorumPeerMain {
protected QuorumPeer quorumPeer;
...
//1.启动程序入口
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
try {
//启动程序
main.initializeAndRun(args);
} catch (IllegalArgumentException e) {
...
}
LOG.info("Exiting normally");
System.exit(0);
}
protected void initializeAndRun(String[] args) {
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
//2.解析配置文件
config.parse(args[0]);
}
//3.创建和启动历史文件清理器
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
//4.根据配置判断是集群模式还是单机模式
if (args.length == 1 && config.isDistributed()) {
//集群模式
runFromConfig(config);
} else {
//单机模式
ZooKeeperServerMain.main(args);
}
}
public void runFromConfig(QuorumPeerConfig config) {
...
ServerCnxnFactory cnxnFactory = null;
if (config.getClientPortAddress() != null) {
//1.创建网络连接工厂实例ServerCnxnFactory
cnxnFactory = ServerCnxnFactory.createFactory();
//2.初始化网络连接工厂实例ServerCnxnFactory
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);
}
//接下来就是初始化集群版服务器实例QuorumPeer
//3.创建集群版服务器实例QuorumPeer
quorumPeer = getQuorumPeer();
//4.创建zk数据管理器FileTxnSnapLog
quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
...
//5.创建并初始化内存数据库ZKDatabase
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
...
quorumPeer.initialize();
//6.初始化集群版服务器实例QuorumPeer
quorumPeer.start();
//join方法会将当前线程挂起,等待QuorumPeer线程结束后再执行当前线程
quorumPeer.join();
}
protected QuorumPeer getQuorumPeer() throws SaslException {
return new QuorumPeer();
}
}
public class FileTxnSnapLog {
private final File dataDir;//事务日志文件
private final File snapDir;//数据快照文件
private TxnLog txnLog;//事务日志实例
private SnapShot snapLog;//快照日志实例
...
public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {
...
this.dataDir = new File(dataDir, version + VERSION);
this.snapDir = new File(snapDir, version + VERSION);
...
//创建FileTxnLog事务日志管理器的实例
txnLog = new FileTxnLog(this.dataDir);
//创建FileSnap数据快照管理器的实例
snapLog = new FileSnap(this.snapDir);
}
...
}
(3)事务日志的写入之FileTxnLog写入日志的步骤
一.确定是否有事务日志文件可写,如果没有就创建一个事务日志文件
二.确定事务日志文件是否需要扩容--即预分配
三.对事务头和事务体进行序列化生成一个字节数组
四.生成Checksum来保证事务日志文件的完整性和数据的准确性
五.将序列化后的事务头、事务体和Checksum写入到文件流中
六.SyncRequestProcessor处理器会触发将事务日志刷入磁盘
FileTxnLog中进行事务日志的写入操作是由append()方法来负责的。
一.确定是否有事务日志文件可写,如果没有就创建一个事务日志文件
当zk服务器启动完需要进行第一次事务日志写入或上一个事务日志写满时,zk服务器都不会和任意一个事务日志文件进行关联。所以在进行事务日志写入前,FileTxnLog会先判断logStream是否为空,来判断FileTxnLog实例是否已经关联上一个可写的事务日志文件。
如果为空则根据该事务操作相关的ZXID作为后缀来创建一个事务日志文件,然后构建事务日志文件头信息:魔数 + 版本号 + dbid,接着将事务日志文件头信息写入到事务日志文件流中,最后将事务日志文件流fos添加到streamsToFlush。
二.确定事务日志文件是否需要扩容--即预分配
当检测到当前事务日志文件剩余空间不足4096字节时,就会开始进行扩容。扩容过程就是在现有文件大小的基础上,将文件增加64M+使用0进行填充。由于事务日志的写入过程可以看成是一个磁盘IO过程,所以文件的写入操作会触发磁盘IO为文件开辟新的磁盘块,即磁盘Seek。为了避免磁盘Seek频繁出现,zk在创建文件初就预分配一个64M磁盘块。一旦已分配的文件空间不足4K时,那么将会再次预分配,从而避免每次事务日志写入时由于文件大小的增长而带来的Seek开销。
三.对事务头和事务体进行序列化生成一个字节数组
事务序列化包括对事务头TxnHeader和事务体Record的序列化。
四.生成Checksum来保证事务日志文件的完整性和数据的准确性
为了保证事务日志文件的完整性和数据的准确性,在写入事务日志到文件前,会根据序列化的字节数组来计算Checksum。
五.将序列化后的事务头、事务体和Checksum写入到文件流中
由于zk使用BufferedOutputStream,此时写入的数据并非写入到文件。
六.SyncRequestProcessor处理器会触发将事务日志刷入磁盘
前面的步骤已经将事务操作日志写入了文件流中,但由于缓存的原因,这些事务操作日志还无法实时地写入磁盘文件中。因此zk会通过SyncRequestProcessor处理器发起事务日志刷盘操作,最终会调用到FileTxnLog的commit()方法来将事务日志刷入磁盘。也就是当需要刷盘的事务请求达到1000个时,才发起强制刷盘操作。FileTxnLog的commit()方法会从streamsToFlush中提取出文件流,然后调用FileChannel的force()方法强制将数据刷入磁盘文件中,而FileChannel的force()方法会调用到底层的fsync接口。
public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
private final ZooKeeperServer zks;
private final LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
private final LinkedList<Request> toFlush = new LinkedList<Request>();
private final Random r = new Random();
private static int snapCount = ZooKeeperServer.getSnapCount();
...
@Override
public void run() {
int logCount = 0;
int randRoll = r.nextInt(snapCount/2);
while (true) {
Request si = null;
if (toFlush.isEmpty()) {
si = queuedRequests.take();
} else {
si = queuedRequests.poll();
if (si == null) {
flush(toFlush);
continue;
}
}
...
if (si != null) {
//将事务请求写入到事务日志文件
if (zks.getZKDatabase().append(si)) {
logCount++;
if (logCount > (snapCount / 2 + randRoll)) {
randRoll = r.nextInt(snapCount/2);
//切换事务日志文件
zks.getZKDatabase().rollLog();
//每切换一个事务日志文件就尝试启动一个线程进行数据快照
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
} else {
snapInProcess = new ZooKeeperThread("Snapshot Thread") {
public void run() {
zks.takeSnapshot();
}
};
snapInProcess.start();
}
logCount = 0;
}
}
...
toFlush.add(si);
//当需要强制刷盘的请求达到1000个时,就发起批量刷盘操作
if (toFlush.size() > 1000) {
flush(toFlush);
}
}
}
}
private void flush(LinkedList<Request> toFlush) {
//将事务日志刷入磁盘
zks.getZKDatabase().commit();
while (!toFlush.isEmpty()) {
Request i = toFlush.remove();
if (nextProcessor != null) {
nextProcessor.processRequest(i);
}
}
if (nextProcessor != null && nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
...
}
public class ZKDatabase {
protected FileTxnSnapLog snapLog;
...
//将事务请求写入到事务日志中
public boolean append(Request si) throws IOException {
return this.snapLog.append(si);
}
//切换事务日志文件
public void rollLog() throws IOException {
this.snapLog.rollLog();
}
//将事务日志刷入磁盘
public void commit() throws IOException {
this.snapLog.commit();
}
...
}
public class FileTxnSnapLog {
private TxnLog txnLog;
...
public boolean append(Request si) throws IOException {
return txnLog.append(si.getHdr(), si.getTxn());
}
public void commit() throws IOException {
txnLog.commit();
}
public void rollLog() throws IOException {
txnLog.rollLog();
}
...
}
public class FileTxnLog implements TxnLog, Closeable {
//和一个事务日志文件相关联的输出流
volatile BufferedOutputStream logStream = null;
//用来记录当前需要强制进行数据落盘的文件流
private LinkedList<FileOutputStream> streamsToFlush = new LinkedList<FileOutputStream>();
...
//传入的hdr是事务头,传入的txn是事务体
public synchronized boolean append(TxnHeader hdr, Record txn) throws IOException {
if (hdr == null) {
return false;
}
if (hdr.getZxid() <= lastZxidSeen) {
LOG.warn("Current zxid " + hdr.getZxid() + " is <= " + lastZxidSeen + " for " + hdr.getType());
} else {
lastZxidSeen = hdr.getZxid();
}
//1.确定是否有事务日志文件可写,如果没有就创建一个事务日志文件
if (logStream == null) {
if (LOG.isInfoEnabled()) {
LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
}
//使用ZXID作为后缀创建一个事务日志文件
logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
fos = new FileOutputStream(logFileWrite);
//将创建的事务日志文件和logStream进行关联
logStream = new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
//构建事务日志文件头:魔数 + 版本号 + dbid
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId);
//将事务日志文件头写入到事务日志文件流中
fhdr.serialize(oa, "fileheader");
// Make sure that the magic number is written before padding.
logStream.flush();
//设置当前文件写入大小
filePadding.setCurrentSize(fos.getChannel().position());
//将事务日志文件流fos放入streamsToFlush集合,以便后续进行强制数据落盘
streamsToFlush.add(fos);
}
//2.确定事务日志文件是否需要扩容--即预分配
filePadding.padFile(fos.getChannel());
//3.对事务头hdr和事务体txn进行序列化生成一个字节数组
byte[] buf = Util.marshallTxnEntry(hdr, txn);
if (buf == null || buf.length == 0) {
throw new IOException("Faulty serialization for header " + "and txn");
}
//4.生成Checksum来保证事务日志文件的完整性和数据的准确性
Checksum crc = makeChecksumAlgorithm();
crc.update(buf, 0, buf.length);
//5.写入事务日志文件流
oa.writeLong(crc.getValue(), "txnEntryCRC");
Util.writeTxnBytes(oa, buf);
return true;
}
//切换事务日志文件
public synchronized void rollLog() throws IOException {
if (logStream != null) {
this.logStream.flush();
this.logStream = null;
oa = null;
}
}
//将事务日志刷入磁盘
public synchronized void commit() throws IOException {
if (logStream != null) {
logStream.flush();
}
for (FileOutputStream log : streamsToFlush) {
log.flush();
if (forceSync) {
long startSyncNS = System.nanoTime();
FileChannel channel = log.getChannel();
channel.force(false);
syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
if (syncElapsedMS > fsyncWarningThresholdMS) {
if(serverStats != null) {
serverStats.incrementFsyncThresholdExceedCount();
}
}
}
}
while (streamsToFlush.size() > 1) {
streamsToFlush.removeFirst().close();
}
}
...
}
public class FilePadding {
private static long preAllocSize = 65536 * 1024;
private long currentSize;//当前文件写入大小
private static final ByteBuffer fill = ByteBuffer.allocateDirect(1);
...
long padFile(FileChannel fileChannel) throws IOException {
long newFileSize = calculateFileSizeWithPadding(fileChannel.position(), currentSize, preAllocSize);
if (currentSize != newFileSize) {
fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining());
currentSize = newFileSize;
}
return currentSize;
}
public static long calculateFileSizeWithPadding(long position, long fileSize, long preAllocSize) {
// If preAllocSize is positive and we are within 4KB of the known end of the file calculate a new file size
//如果当前事务日志文件的剩余空间不足4096字节
if (preAllocSize > 0 && position + 4096 >= fileSize) {
// If we have written more than we have previously preallocated we need to make sure the new file size is larger than what we already have
if (position > fileSize) {
fileSize = position + preAllocSize;
fileSize -= fileSize % preAllocSize;
} else {
fileSize += preAllocSize;
}
}
return fileSize;
}
...
}
(4)事务日志截断
为了避免某Learner机器上的事务ID比Leader的还要大(peerLastZxid),只要集群中存在Leader,所有机器都必须与该Leader数据保持同步。
因此,只要发现一台Learner机器出现这样的情况,Leader就会发送TRUNC命令给该Learner,要求进行事务日志截断。该Learner收到命令后,就会删除所有大于peerLastZxid的事务日志文件。