zk Source Code 7: The ZAB Protocol and Data Storage, Part 2
Outline
1. Two-Phase Commit (2PC)
2. Three-Phase Commit (3PC)
3. The ZAB protocol algorithm
4. The ZAB protocol vs. the Paxos algorithm
5. zk data storage: in-memory data
6. zk data storage: transaction logs
7. zk data storage: data snapshots
8. zk data storage: data initialization and the data synchronization flow
7. zk Data Storage: Data Snapshots
(1) File storage
(2) The data snapshot process
(1) File storage
I. Snapshot files follow the same naming rule as transaction log files
II. Snapshot files do not use the preallocation mechanism of transaction log files
I. Snapshot files follow the same naming rule as transaction log files
A snapshot file also uses the hexadecimal form of a ZXID as its file name suffix, and that suffix records the server's latest ZXID at the moment the snapshot began. During data recovery, zk uses this ZXID to determine where recovery should start.
II. Snapshot files do not use the preallocation mechanism of transaction log files
Unlike transaction log files, a snapshot file therefore does not contain long runs of zero bytes. Because everything in a snapshot file is valid data, the file size gives a rough indication of how large zk's full in-memory data set currently is.
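As a concrete illustration of the naming rule, here is a minimal, self-contained sketch of how such a file name can be built from a ZXID. The class and helper names are made up for this example; ZooKeeper's own Util.makeSnapshotName performs essentially the same hex-suffix formatting.
public class SnapshotNameDemo {
    // Hypothetical helper mirroring the naming rule: the file name ends with
    // the hexadecimal form of the ZXID taken when the snapshot starts
    static String makeSnapshotName(long zxid) {
        return "snapshot." + Long.toHexString(zxid);
    }
    public static void main(String[] args) {
        long zxid = (3L << 32) | 5L;                 // epoch 3, counter 5 => 0x300000005
        System.out.println(makeSnapshotName(zxid));  // prints: snapshot.300000005
    }
}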
(2) The data snapshot process
I. Determine whether a data snapshot is needed
II. Roll over the transaction log file
III. Create an asynchronous snapshot thread
IV. Obtain the full data and the session information
V. Generate the snapshot file name
VI. Call FileSnap.serialize to serialize the data
zk records every client transaction in the transaction log. After a number of transaction log records, zk dumps its full in-memory data to a file; this process is the data snapshot. The snapCount parameter configures how many transactions are allowed between two snapshots, i.e. zk takes a snapshot after snapCount transaction log records.
FileSnap maintains the external interface to snapshot files, covering both snapshot writing and reading. The snapshot process works as follows:
I. Determine whether a data snapshot is needed
After each transaction log record, zk checks whether it should take a snapshot now. In theory a snapshot starts after snapCount transactions, but since a snapshot affects the overall performance of the machine it runs on, zk tries to avoid having every machine in the cluster snapshot at the same moment. The implementation therefore uses a "more than half plus random" strategy:
logCount > (snapCount / 2 + randRoll)
Here logCount is the number of transaction log records written so far and randRoll is a random number in [0, snapCount/2). So with snapCount configured as 10000, zk takes a snapshot after somewhere between 5000 and 10000 transaction log records. A minimal standalone simulation of this trigger condition follows.
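The sketch below simulates the check with a hard-coded snapCount; the value 10000 and the class name are assumptions made only for this illustration.
import java.util.Random;

public class SnapTriggerDemo {
    public static void main(String[] args) {
        int snapCount = 10000;                    // assume snapCount is configured as 10000
        Random r = new Random();
        int randRoll = r.nextInt(snapCount / 2);  // random value in [0, 5000)
        int logCount = 0;                         // transaction log records written so far
        // Simulate writing transaction log records until the snapshot condition fires
        while (true) {
            logCount++;
            if (logCount > (snapCount / 2 + randRoll)) {
                System.out.println("take snapshot after " + logCount + " txn log records");
                break;
            }
        }
    }
}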
II. Roll over the transaction log file
At this point the current transaction log file is regarded as full (snapCount transaction log records have been written to it), so a new transaction log file needs to be created.
III. Create an asynchronous snapshot thread
To keep the snapshot from affecting zk's main processing flow, an asynchronous thread is created to perform it.
IV. Obtain the full data and the session information
A data snapshot essentially saves all in-memory node information and session information to disk, so zk first obtains the DataTree through ZKDatabase's getDataTree() method, and then obtains the session information through ZKDatabase's getSessionWithTimeOuts() method.
V. Generate the snapshot file name
zk generates the snapshot file name from the largest ZXID committed so far.
VI. Call FileSnap.serialize to serialize the data
It first serializes the file header (magic number + version + dbid), then serializes the DataTree and the session information, then computes a checksum, and writes all of them into the snapshot file.
public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
private final ZooKeeperServer zks;
private final LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
private final LinkedList<Request> toFlush = new LinkedList<Request>();
private final Random r = new Random();
private static int snapCount = ZooKeeperServer.getSnapCount();
...
@Override
public void run() {
int logCount = 0;
int randRoll = r.nextInt(snapCount/2);
while (true) {
Request si = null;
if (toFlush.isEmpty()) {
si = queuedRequests.take();
} else {
si = queuedRequests.poll();
if (si == null) {
flush(toFlush);
continue;
}
}
...
if (si != null) {
//Append the transaction request to the transaction log file
if (zks.getZKDatabase().append(si)) {
logCount++; //logCount is the number of transaction log records written so far
//1. Decide whether to take a data snapshot, using the "more than half plus random" strategy
if (logCount > (snapCount / 2 + randRoll)) {
randRoll = r.nextInt(snapCount/2); //randRoll is a random number in [0, snapCount/2)
//2. Roll over to a new transaction log file
zks.getZKDatabase().rollLog();
//Each time the transaction log rolls over, try to start a thread to take a snapshot
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
} else {
//3. Create the asynchronous snapshot thread
snapInProcess = new ZooKeeperThread("Snapshot Thread") {
public void run() {
//Take the data snapshot
zks.takeSnapshot();
}
};
snapInProcess.start();
}
logCount = 0;
}
}
...
toFlush.add(si);
//When 1000 requests are waiting to be flushed, issue a batch flush to disk
if (toFlush.size() > 1000) {
flush(toFlush);
}
}
}
}
...
}
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
private FileTxnSnapLog txnLogFactory = null;
private ZKDatabase zkDb;
...
public void takeSnapshot() {
//Take a data snapshot
takeSnapshot(false);
}
//Take a data snapshot
public void takeSnapshot(boolean syncSnap){
//4. Obtain the full data and the session information
txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
}
...
}
public class FileTxnSnapLog {
private SnapShot snapLog;
...
public void save(DataTree dataTree, ConcurrentHashMap<Long, Integer> sessionsWithTimeouts, boolean syncSnap) throws IOException {
//Get the last ZXID processed by the in-memory database
long lastZxid = dataTree.lastProcessedZxid;
//5. Generate the snapshot file name
File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapshotFile);
//6. Start serializing the data
snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
}
...
}
public class FileSnap implements SnapShot {
...
//serialize the datatree and session into the file snapshot
public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot, boolean fsync) throws IOException {
if (!close) {
try (CheckedOutputStream crcOut = new CheckedOutputStream(
new BufferedOutputStream(
fsync ? new AtomicFileOutputStream(snapShot) : new FileOutputStream(snapShot)),
new Adler32())
) {
OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
//Build the file header object
FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
//Serialize the file header + DataTree + session information
serialize(dt, sessions, oa, header);
//Compute a checksum
long val = crcOut.getChecksum().getValue();
//Write it into the snapshot file
oa.writeLong(val, "val");
oa.writeString("/", "path");
crcOut.flush();
lastSnapshotInfo = new SnapshotInfo(Util.getZxidFromName(snapShot.getName(), SNAPSHOT_FILE_PREFIX), snapShot.lastModified() / 1000);
}
} else {
throw new IOException("FileSnap has already been closed");
}
}
protected void serialize(DataTree dt, Map<Long, Integer> sessions, OutputArchive oa, FileHeader header) throws IOException {
if (header == null)
throw new IllegalStateException("Snapshot's not open for writing: uninitialized header");
//Serialize the file header: magic number + version + dbid
header.serialize(oa, "fileheader");
//Serialize the DataTree and the session information
SerializeUtils.serializeSnapshot(dt, oa, sessions);
}
...
}
8. zk Data Storage: Data Initialization and the Data Synchronization Flow
(1) zk's data initialization flow
(2) zk's data synchronization flow
(1) zk's data initialization flow
I. Initialize FileTxnSnapLog
II. Initialize ZKDatabase
III. Create the PlayBackListener
IV. Start processing the snapshot files
V. Get the latest 100 snapshot files
VI. Parse the snapshot files one by one
VII. Get the latest ZXID from the snapshot file name
VIII. Start processing the transaction log files
IX. Get all transactions committed after the latest ZXID
X. Apply the transactions
XI. Get the latest ZXID
XII. Validate the epoch
zk's data initialization is really the process of loading data from disk, and it consists of two parts: loading data from a snapshot file, and then correcting that data against the transaction log files.
I. Initialize FileTxnSnapLog
FileTxnSnapLog is zk's access layer for transaction logs and data snapshots; it connects the upper-level business logic with the underlying data storage, which consists of transaction logs and data snapshots. Initializing FileTxnSnapLog therefore means initializing FileTxnLog and FileSnap, i.e. the transaction log manager and the snapshot manager.
II. Initialize ZKDatabase
Once FileTxnSnapLog is initialized, the zk server is connected to its underlying data storage. Next, ZKDatabase is initialized: it first creates an empty DataTree, then creates a recorder for client session timeouts, and then hands the initialized FileTxnSnapLog over to ZKDatabase.
III. Create the PlayBackListener
The PlayBackListener receives callbacks while transactions are being applied. In the later phase of zk's data recovery there is a transaction-correction step, and during that step the PlayBackListener is called back to help correct the data.
IV. Start processing the snapshot files
Once the in-memory database ZKDatabase has been initialized, data can be recovered from disk.
V. Get the latest 100 snapshot files
zk cannot take only the single newest snapshot file, because that file might be unusable.
VI. Parse the snapshot files one by one
Having obtained at most 100 of the latest snapshot files, zk parses them one by one. For each file it first deserializes the binary data and then verifies the file's checksum to confirm the snapshot is intact; if the check passes, parsing is complete. In other words, the next file is parsed only when the newer snapshot file turns out to be unusable. If none of these 100 files yields a complete DataTree and session set, zk considers the data unloadable from disk and the server fails to start.
VII. Get the latest ZXID from the snapshot file name
At this point a complete DataTree instance and session set have been rebuilt from a snapshot file, so zk next parses the latest ZXID out of that snapshot file's name. The server's in-memory database now holds a near-complete copy of the data, and the process of loading data from the snapshot file is finished.
VIII. Start processing the transaction log files
Because the in-memory database only holds a near-complete copy of the data at this point, the next step is to correct the data using the transaction log files.
IX. Get all transactions committed after the latest ZXID
Using the latest ZXID of the recovered in-memory database, zk reads from the transaction log files all transactions committed after that ZXID.
X. Apply the transactions
The retrieved transactions are applied to the DataTree and sessions recovered from the snapshot file. Each time a transaction is applied to the in-memory database, the PlayBackListener is called back so that the transaction can be converted into a Proposal and saved into ZKDatabase.committedLog, which allows Learners to synchronize quickly.
XI. Get the latest ZXID
XII. Validate the epoch
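Steps XI and XII rely on the layout of a ZXID: the high 32 bits hold the epoch and the low 32 bits hold the per-epoch counter, so the epoch of the last applied transaction can be extracted from lastProcessedZxid and compared with the value stored in the currentEpoch file. Below is a minimal sketch of that extraction; the demo class is hypothetical, while the bit operations mirror what ZxidUtils does.
public class EpochFromZxidDemo {
    // The epoch occupies the high 32 bits of a ZXID
    static long getEpochFromZxid(long zxid) {
        return zxid >> 32L;
    }
    // The per-epoch transaction counter occupies the low 32 bits
    static long getCounterFromZxid(long zxid) {
        return zxid & 0xffffffffL;
    }
    public static void main(String[] args) {
        long lastProcessedZxid = 0x300000005L;  // epoch 3, counter 5
        System.out.println("epoch=" + getEpochFromZxid(lastProcessedZxid)
                + ", counter=" + getCounterFromZxid(lastProcessedZxid));
    }
}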
public class QuorumPeerMain {
protected QuorumPeer quorumPeer;
...
public void runFromConfig(QuorumPeerConfig config) {
...
quorumPeer = getQuorumPeer();
//1. Create the zk data manager FileTxnSnapLog
quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
...
//2. Create and initialize the in-memory database ZKDatabase
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
...
//Initialize the cluster-mode server instance QuorumPeer
quorumPeer.start();
...
}
}
//1. Initialize FileTxnSnapLog
public class FileTxnSnapLog {
private TxnLog txnLog; //the transaction log instance
private SnapShot snapLog; //the snapshot instance
public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {
...
//Create the FileTxnLog transaction log manager instance
txnLog = new FileTxnLog(this.dataDir);
//Create the FileSnap snapshot manager instance
snapLog = new FileSnap(this.snapDir);
}
...
}
//2. Initialize ZKDatabase
public class ZKDatabase {
//The DataTree holding all of zk's nodes
protected DataTree dataTree;
//Session timeout recorder
protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
//The data storage manager
protected FileTxnSnapLog snapLog;
...
//3. Create the PlayBackListener
private final PlayBackListener commitProposalPlaybackListener = new PlayBackListener() {
public void onTxnLoaded(TxnHeader hdr, Record txn){
addCommittedProposal(hdr, txn);
}
};
...
public ZKDatabase(FileTxnSnapLog snapLog) {
//First create an empty, initialized DataTree
dataTree = createDataTree();
//Create a recorder for all client session timeouts
sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
//Hand the initialized FileTxnSnapLog over to ZKDatabase
this.snapLog = snapLog;
...
}
...
}
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
private ZKDatabase zkDb;
...
@Override
public synchronized void start() {
...
//4. Load data from disk into memory
loadDataBase();
...
}
private void loadDataBase() {
//Load data from disk into memory
zkDb.loadDataBase();
...
}
}
public class ZKDatabase {
protected FileTxnSnapLog snapLog;
public long loadDataBase() throws IOException {
//Recover data from disk
long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
initialized = true;
return zxid;
}
...
}
public class FileTxnSnapLog {
...
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
//4. Process the snapshot files
long deserializeResult = snapLog.deserialize(dt, sessions);
FileTxnLog txnLog = new FileTxnLog(dataDir);
RestoreFinalizer finalizer = () -> {
//8. Process the transaction logs and correct the data against them
long highestZxid = fastForwardFromEdits(dt, sessions, listener);
return highestZxid;
};
if (-1L == deserializeResult) {
if (txnLog.getLastLoggedZxid() != -1) {
if (!trustEmptySnapshot) {
throw new IOException(EMPTY_SNAPSHOT_WARNING + "Something is broken!");
} else {
return finalizer.run();
}
}
save(dt, (ConcurrentHashMap<Long, Integer>)sessions, false);
return 0;
}
return finalizer.run();
}
...
}
public class FileSnap implements SnapShot {
...
public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
//5. Get the latest 100 snapshot files
List<File> snapList = findNValidSnapshots(100);
...
File snap = null;
boolean foundValid = false;
for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
//6. Parse the snapshot files one by one until a usable one is found
snap = snapList.get(i);
try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) {
InputArchive ia = BinaryInputArchive.getArchive(crcIn);
//Deserialize into the DataTree object and the sessionsWithTimeouts map
deserialize(dt, sessions, ia);
//Verify the checksum to confirm the snapshot file is intact
long checkSum = crcIn.getChecksum().getValue();
long val = ia.readLong("val");
if (val != checkSum) {
throw new IOException("CRC corruption in snapshot : " + snap);
}
foundValid = true;
break;
} catch (IOException e) {
LOG.warn("problem reading snap file " + snap, e);
}
}
...
//7. Get the latest ZXID from the snapshot file name
//The server's memory now holds a near-complete copy of the data; loading data from the snapshot file is done
dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
lastSnapshotInfo = new SnapshotInfo(dt.lastProcessedZxid, snap.lastModified() / 1000);
return dt.lastProcessedZxid;
}
...
}
public class FileTxnSnapLog {
...
//8. Process the transaction logs: next, correct the data against them
public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
//9. Using the recovered database's latest ZXID, read from the transaction logs all transactions committed after that ZXID
TxnIterator itr = txnLog.read(dt.lastProcessedZxid + 1);
long highestZxid = dt.lastProcessedZxid;
TxnHeader hdr;
while (true) {
hdr = itr.getHeader();
if (hdr == null) {
return dt.lastProcessedZxid;
}
if (hdr.getZxid() < highestZxid && highestZxid != 0) {
LOG.error("{}(highestZxid) > {}(next log) for type {}", highestZxid, hdr.getZxid(), hdr.getType());
} else {
highestZxid = hdr.getZxid();
}
//10. Apply the transaction
processTransaction(hdr,dt,sessions, itr.getTxn());
//10. Call back the PlayBackListener
listener.onTxnLoaded(hdr, itr.getTxn());
if (!itr.next()) break;
}
return highestZxid;
}
...
}
public class FileTxnLog implements TxnLog {
...
public TxnIterator read(long zxid) throws IOException {
return read(zxid, true);
}
public TxnIterator read(long zxid, boolean fastForward) throws IOException {
return new FileTxnIterator(logDir, zxid, fastForward);
}
...
}
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
...
private void loadDataBase() {
//Load data from disk into memory
zkDb.loadDataBase();
...
//11. Get the latest ZXID
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
...
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
...
//12. Validate the epoch
if (epochOfZxid > currentEpoch) {
File currentTmp = new File(getTxnFactory().getSnapDir(), CURRENT_EPOCH_FILENAME + AtomicFileOutputStream.TMP_EXTENSION);
if (currentTmp.exists()) {
long epochOfTmp = readLongFromFile(currentTmp.getName());
setCurrentEpoch(epochOfTmp);
} else {
throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
}
}
acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
...
}
...
}
(2) zk's data synchronization flow
I. The Leader obtains the Learner's state
II. The Leader initializes data synchronization
III. After the Learner finishes registering, the Leader parses the Learner's last processed ZXID
IV. The Leader calls LearnerHandler.syncFollower to decide which synchronization mode to use
After the cluster completes Leader election, each Learner registers with the Leader; once that registration completes, the data synchronization phase begins. Data synchronization means the Leader sends the Learner all transaction requests that have not yet been committed on that Learner.
I. The Leader obtains the Learner's state
In the final stage of the Learner's registration, the Learner sends the Leader an ACKEPOCH message, from which the Leader parses the Learner's currentEpoch and lastZxid.
II. The Leader initializes data synchronization
Before synchronization starts, the Leader performs a synchronization initialization step: it first takes the proposal cache queue of transaction requests from zk's in-memory database, and then initializes the following three ZXID values:
peerLastZxid: the last ZXID processed by the Learner
minCommittedLog: the smallest ZXID in the Leader's proposal cache queue committedLog
maxCommittedLog: the largest ZXID in the Leader's proposal cache queue committedLog
Note: the PlayBackListener callback made during the data initialization flow is what triggers this part of the synchronization flow's initialization, namely adding requests to the proposal cache queue.
III. After the Learner finishes registering, the Leader parses the Learner's last processed ZXID
IV. The Leader calls LearnerHandler.syncFollower to decide which synchronization mode to use. The four modes are listed below, followed by a simplified decision sketch.
Mode 1: direct differential sync (DIFF), when peerLastZxid lies between minCommittedLog and maxCommittedLog.
Mode 2: rollback followed by differential sync (TRUNC + DIFF), when peerLastZxid lies between minCommittedLog and maxCommittedLog, but the Learner, back when it was the Leader, wrote a transaction to its local transaction log and then crashed before it could start the Proposal flow for it.
Mode 3: rollback-only sync (TRUNC), when peerLastZxid is greater than maxCommittedLog.
Mode 4: full sync (SNAP), when peerLastZxid is less than minCommittedLog, or when the Leader has no proposal cache queue and peerLastZxid is not equal to the Leader's lastProcessedZxid.
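The four modes can be compressed into the hypothetical decision sketch below. The class, method, and parameter names are made up for this illustration; in the real code the choice is made by LearnerHandler.syncFollower together with queueCommittedProposals (both shown further down), where the TRUNC + DIFF case is detected while iterating over the proposal cache.
public class SyncModeDemo {
    enum SyncMode { DIFF, TRUNC_PLUS_DIFF, TRUNC, SNAP }
    // Simplified choice of the sync mode for one Learner.
    // peerHasOrphanTxn models the TRUNC + DIFF case: the Learner (previously
    // the Leader) logged a transaction locally but crashed before proposing it.
    static SyncMode chooseSyncMode(long peerLastZxid, long minCommittedLog, long maxCommittedLog,
                                   boolean peerHasOrphanTxn) {
        if (peerLastZxid > maxCommittedLog) {
            return SyncMode.TRUNC;              // Mode 3: the Learner is ahead of the Leader, roll it back
        }
        if (peerLastZxid >= minCommittedLog) {
            return peerHasOrphanTxn
                    ? SyncMode.TRUNC_PLUS_DIFF  // Mode 2: roll back the orphan txn, then send the diff
                    : SyncMode.DIFF;            // Mode 1: plain differential sync
        }
        return SyncMode.SNAP;                   // Mode 4: too far behind, send the full snapshot
    }
    public static void main(String[] args) {
        long min = 0x500000001L, max = 0x500000064L;
        System.out.println(chooseSyncMode(0x500000032L, min, max, false));  // DIFF
        System.out.println(chooseSyncMode(0x500000065L, min, max, false));  // TRUNC
        System.out.println(chooseSyncMode(0x400000001L, min, max, false));  // SNAP
    }
}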
public class ZKDatabase {
//minCommittedLog is the smallest ZXID in the Leader's proposal cache queue committedLog
//maxCommittedLog is the largest ZXID in the Leader's proposal cache queue committedLog
protected long minCommittedLog, maxCommittedLog;
//The Leader's proposal cache queue committedLog
protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();
...
private final PlayBackListener commitProposalPlaybackListener = new PlayBackListener() {
public void onTxnLoaded(TxnHeader hdr, Record txn){
addCommittedProposal(hdr, txn);
}
};
private void addCommittedProposal(TxnHeader hdr, Record txn) {
Request r = new Request(0, hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
addCommittedProposal(r);
}
public void addCommittedProposal(Request request) {
WriteLock wl = logLock.writeLock();
try {
wl.lock();
if (committedLog.size() > commitLogCount) {
committedLog.removeFirst();
minCommittedLog = committedLog.getFirst().packet.getZxid();
}
if (committedLog.isEmpty()) {
minCommittedLog = request.zxid;
maxCommittedLog = request.zxid;
}
byte[] data = SerializeUtils.serializeRequest(request);
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
Proposal p = new Proposal();
p.packet = pp;
p.request = request;
//Add the transaction request to the proposal cache queue
committedLog.add(p);
maxCommittedLog = p.packet.getZxid();
} finally {
wl.unlock();
}
}
...
}
public class LearnerHandler extends ZooKeeperThread {
final Leader leader;
//ZooKeeper server identifier of this learner
protected long sid = 0;
protected final Socket sock;
private BinaryInputArchive ia;
private BinaryOutputArchive oa;
//The packets to be sent to the learner
final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<QuorumPacket>();
...
@Override
public void run() {
leader.addLearnerHandler(this);
tickOfNextAckDeadline = leader.self.tick.get() + leader.self.initLimit + leader.self.syncLimit;
//Bind ia and oa to the socket,
//so that after the Leader sends the LeaderInfo message to the Learner via oa, it can read the Learner's ACKEPOCH response via ia
ia = BinaryInputArchive.getArchive(bufferedInput);
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput);
QuorumPacket qp = new QuorumPacket();
ia.readRecord(qp, "packet");
byte learnerInfoData[] = qp.getData();
...
//Parse the Learner's SID from the LearnerInfo data
ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
if (learnerInfoData.length >= 8) {
this.sid = bbsid.getLong();
}
...
//Derive the Learner's epoch from the Learner's ZXID
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
long zxid = qp.getZxid();
//Compare the Learner's epoch with the Leader's epoch
//If the Learner's epoch is larger, update the Leader's epoch to the Learner's epoch + 1
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
...
//Send a LeaderInfo message containing the new epoch to the Learner served by this LearnerHandler
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
oa.writeRecord(newEpochPacket, "packet");
bufferedOutput.flush();
QuorumPacket ackEpochPacket = new QuorumPacket();
//After sending the LeaderInfo message containing the epoch, wait for the Learner's response
//Read the ACKEPOCH response returned by the Learner
ia.readRecord(ackEpochPacket, "packet");
...
//Wait for a quorum of Learners to respond
leader.waitForEpochAck(this.getSid(), ss);
...
//Parse the last ZXID processed by the Learner, then perform data synchronization with the Learner
peerLastZxid = ss.getLastZxid();
boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
if (needSnap) {
long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
bufferedOutput.flush();
// Dump data to peer
leader.zk.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
bufferedOutput.flush();
}
LOG.debug("Sending NEWLEADER message to " + sid);
if (getVersion() < 0x10000) {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null);
oa.writeRecord(newLeaderQP, "packet");
} else {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, leader.self.getLastSeenQuorumVerifier().toString().getBytes(), null);
queuedPackets.add(newLeaderQP);
}
bufferedOutput.flush();
//Start thread that blast packets in the queue to learner
startSendingPackets();
//Have to wait for the first ACK, wait until the leader is ready, and only then we can start processing messages.
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
//Block until a quorum of Learners have finished data synchronization; after that the QuorumPeer server instance can be started
leader.waitForNewLeaderAck(getSid(), qp.getZxid());
...
while (true) {
//Heartbeat handling between the Leader and this Learner happens here
}
}
public boolean syncFollower(long peerLastZxid, ZKDatabase db, Leader leader) {
boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
//Keep track of the latest zxid which already queued
long currentZxid = peerLastZxid;
boolean needSnap = true;
boolean txnLogSyncEnabled = db.isTxnLogSyncEnabled();
ReentrantReadWriteLock lock = db.getLogLock();
ReadLock rl = lock.readLock();
try {
rl.lock();
long maxCommittedLog = db.getmaxCommittedLog();
long minCommittedLog = db.getminCommittedLog();
long lastProcessedZxid = db.getDataTreeLastProcessedZxid();
if (db.getCommittedLog().isEmpty()) {
minCommittedLog = lastProcessedZxid;
maxCommittedLog = lastProcessedZxid;
}
if (forceSnapSync) {
//Force leader to use snapshot to sync with follower
LOG.warn("Forcing snapshot sync - should not see this in production");
} else if (lastProcessedZxid == peerLastZxid) {
//Follower is already sync with us, send empty diff
LOG.info("Sending DIFF zxid=0x" + Long.toHexString(peerLastZxid) + " for peer sid: " + getSid());
queueOpPacket(Leader.DIFF, peerLastZxid);
needOpPacket = false;
needSnap = false;
} else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
//Newer than committedLog, send trunc and done
//Mode 3: rollback-only sync (TRUNC), peerLastZxid is greater than maxCommittedLog
LOG.debug("Sending TRUNC to follower zxidToSend=0x" + Long.toHexString(maxCommittedLog) + " for peer sid:" + getSid());
queueOpPacket(Leader.TRUNC, maxCommittedLog);
currentZxid = maxCommittedLog;
needOpPacket = false;
needSnap = false;
} else if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) {
//Follower is within commitLog range
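//Modes 1 and 2: peerLastZxid falls within [minCommittedLog, maxCommittedLog];
//whether a plain DIFF or a TRUNC + DIFF is sent is decided inside queueCommittedProposals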
LOG.info("Using committedLog for peer sid: " + getSid());
Iterator<Proposal> itr = db.getCommittedLog().iterator();
currentZxid = queueCommittedProposals(itr, peerLastZxid, null, maxCommittedLog);
needSnap = false;
} else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
//Mode 4: full sync (SNAP), peerLastZxid is less than minCommittedLog
//Use txnlog and committedLog to sync
//Calculate sizeLimit that we allow to retrieve txnlog from disk
long sizeLimit = db.calculateTxnLogSizeLimit();
//This method can return empty iterator if the requested zxid is older than on-disk txnlog
Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(peerLastZxid, sizeLimit);
if (txnLogItr.hasNext()) {
LOG.info("Use txnlog and committedLog for peer sid: " + getSid());
currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid, minCommittedLog, maxCommittedLog);
LOG.debug("Queueing committedLog 0x" + Long.toHexString(currentZxid));
Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
currentZxid = queueCommittedProposals(committedLogItr, currentZxid, null, maxCommittedLog);
needSnap = false;
}
//closing the resources
if (txnLogItr instanceof TxnLogProposalIterator) {
TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr;
txnProposalItr.close();
}
} else {
LOG.warn("Unhandled scenario for peer sid: " + getSid());
}
LOG.debug("Start forwarding 0x" + Long.toHexString(currentZxid) + " for peer sid: " + getSid());
leaderLastZxid = leader.startForwarding(this, currentZxid);
} finally {
rl.unlock();
}
if (needOpPacket && !needSnap) {
// This should never happen, but we should fall back to sending
// snapshot just in case.
LOG.error("Unhandled scenario for peer sid: " + getSid() + " fall back to use snapshot");
needSnap = true;
}
return needSnap;
}
private void queueOpPacket(int type, long zxid) {
QuorumPacket packet = new QuorumPacket(type, zxid, null, null);
queuePacket(packet);
}
void queuePacket(QuorumPacket p) {
queuedPackets.add(p);
}
protected long queueCommittedProposals(Iterator<Proposal> itr, long peerLastZxid, Long maxZxid, Long lastCommittedZxid) {
boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
long queuedZxid = peerLastZxid;
//as we look through proposals, this variable keeps track of previous proposal Id.
long prevProposalZxid = -1;
while (itr.hasNext()) {
Proposal propose = itr.next();
long packetZxid = propose.packet.getZxid();
//abort if we hit the limit
if ((maxZxid != null) && (packetZxid > maxZxid)) {
break;
}
//skip the proposals the peer already has
if (packetZxid < peerLastZxid) {
prevProposalZxid = packetZxid;
continue;
}
//If we are sending the first packet, figure out whether to trunc or diff
if (needOpPacket) {
//Send diff when we see the follower's zxid in our history
if (packetZxid == peerLastZxid) {
LOG.info("Sending DIFF zxid=0x" + Long.toHexString(lastCommittedZxid) + " for peer sid: " + getSid());
queueOpPacket(Leader.DIFF, lastCommittedZxid);
needOpPacket = false;
continue;
}
if (isPeerNewEpochZxid) {
//Send diff and fall through if zxid is of a new-epoch
LOG.info("Sending DIFF zxid=0x" + Long.toHexString(lastCommittedZxid) + " for peer sid: " + getSid());
queueOpPacket(Leader.DIFF, lastCommittedZxid);
needOpPacket = false;
} else if (packetZxid > peerLastZxid ) {
//Peer have some proposals that the leader hasn't seen yet
//it may used to be a leader
if (ZxidUtils.getEpochFromZxid(packetZxid) != ZxidUtils.getEpochFromZxid(peerLastZxid)) {
//We cannot send TRUNC that cross epoch boundary.
//The learner will crash if it is asked to do so.
//We will send snapshot in those cases.
LOG.warn("Cannot send TRUNC to peer sid: " + getSid() + " peer zxid is from different epoch" );
return queuedZxid;
}
LOG.info("Sending TRUNC zxid=0x" + Long.toHexString(prevProposalZxid) + " for peer sid: " + getSid());
queueOpPacket(Leader.TRUNC, prevProposalZxid);
needOpPacket = false;
}
}
if (packetZxid <= queuedZxid) {
// We can get here, if we don't have op packet to queue
// or there is a duplicate txn in a given iterator
continue;
}
//Since this is already a committed proposal, we need to follow it by a commit packet
queuePacket(propose.packet);
queueOpPacket(Leader.COMMIT, packetZxid);
queuedZxid = packetZxid;
}
if (needOpPacket && isPeerNewEpochZxid) {
//We will send DIFF for this kind of zxid in any case.
//This if-block is the catch when our history older than learner and there is no new txn since then.
//So we need an empty diff
LOG.info("Sending DIFF zxid=0x" + Long.toHexString(lastCommittedZxid) + " for peer sid: " + getSid());
queueOpPacket(Leader.DIFF, lastCommittedZxid);
needOpPacket = false;
}
return queuedZxid;
}
...
}