zookeeper Watcher
目录
- Watcher的一些常识
- Watcher是什么
- Watcher怎么用
- Watcher特性
- 回顾回调&观察者模式&发布订阅模式
- Zookeeper 客户端/ 服务端 watcher机制
- 流程说明
- getChildren 并注册watcher的例子流程说明
Watcher的一些常识
Watcher是什么
ZK中引入Watcher机制来实现分布式的通知功能。
ZK允许客户端向服务端注册一个Watcher监听,当服务点的的指定事件触发监听时,那么服务端就会向客户端发送事件通知,以便客户端完成逻辑操作(即客户端向服务端注册监听,并将watcher对象存在客户端的Watchermanager中
服务端触发事件后,向客户端发送通知,客户端收到通知后从wacherManager中取出对象来执行回调逻辑)
Watcher怎么用
参考:https://blog.csdn.net/qq_26437925/article/details/145715160 中的一个例子代码
判断特定路径,添加了Watcher 对其中的结点删除事件做出对应处理。
zk.exists(preNodePath, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted) {
// 前一个节点删除了释放锁了,就唤醒本结点
synchronized (node) {
node.notify();
}
}
}
});
可以查看Watcher接口如下:
public class WatchedEvent {
private final KeeperState keeperState; //用于记录Event发生时的zk状态(通知状态
private final EventType eventType; // 记录Event的类型
private String path;
Watcher特性
- 一次性:一旦一个watcher被触发,ZK都会将其从相应的的存储中移除,所以watcher是需要每注册一次,才可触发一次。
- 客户端串行执行:客户端watcher回调过程是一个串行同步的过程
- 轻量:watcher数据结构中只包含:通知状态、事件类型和节点路径
之所以这么说,其实可以在源码实现中找到:
org.apache.zookeeper.server.WatchManager#triggerWatch
,此为真正出发watcher执行的逻辑, 后文将再次说明。
可以看到watcher执行一次就删除了,for循环逐一执行的。
Set<Watcher> triggerWatch(String path, EventType type) {
return triggerWatch(path, type, null);
}
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
// 主要做的就是从watchTable和watch2Paths中移除该路径的watcher,Watcher机制是一次性的
synchronized (this) {
watchers = watchTable.remove(path);
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"No watchers for " + path);
}
return null;
}
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
// 真正的回调和业务逻辑执行都在客户端org.apache.zookeeper.server.NIOServerCnxn#process
w.process(e);
}
return watchers;
}
回顾回调&观察者模式&发布订阅模式
- 回调的思想
- 类A的a()方法调用类B的b()方法
- 类B的b()方法执行完毕主动调用类A的callback()方法
- 观察者模式
- 发布订阅,对比 观察者模式
Zookeeper 客户端/ 服务端 watcher机制
流程说明
- 一般客户端调用exists/getData/getChildren注册监听,其中请求会封装为zookeeper内部的协议packet,添加到outgoingQueue队列中
- 然后客户端内部的SendThread这个线程会执行数据发送操作,主要是将outgoingQueue队列中的数据发送到服务端
- 客户端发给服务端了,等待服务端响应的packet结合放到了SendThread内部的pendingQueue队列中
- 客户端会将watcher存储到WatchManager的
watchTable
和watch2Paths
中,缓存一样; 服务端仅仅只是保存了当前连接的 ServerCnxn 对象(ServerCnxn是服务端与客户端进行网络交互的一个NIO接口,代表了客户端与服务端的连接)
/**
* Interface to a Server connection - represents a connection from a client
* to the server.
*/
public abstract class ServerCnxn implements Stats, Watcher {
- 这样当指定的节点发生相关的事件时,即要处理此请求了,会判断是否有watcher,注册到WatchManager, 最后会调用WatchManager的triggerWatch方法触发相关的事件。
- triggerWatch就是客户端从之前WatchManager缓存取出wacther执行的过程
举例org.apache.zookeeper.server.FinalRequestProcessor#handleGetDataRequest
private Record handleGetDataRequest(Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {
GetDataRequest getDataRequest = (GetDataRequest) request;
String path = getDataRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, authInfo, path, null);
Stat stat = new Stat();
byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
return new GetDataResponse(b, stat);
}
-
客户端有一个专门处理watcher时间的线程EventThread,其保持了一个待处理事件的队列。它根据传递的事件的类型和节点信息,从客户端的ZKWatcherManager中取出相关的watcher,将其添加到EventThread事件队列中,并在去run方法中不断取出watcher事件进行处理。
-
通过将节点信息和事件类型进行封装成为watchedevent,并查找到到对应节点的注册的watcher,然后调用watcher的回调方法process进行处理。
getChildren 并注册watcher的例子流程说明
/**
* Return the list of the children of the node of the given path.
* <p>
* If the watch is non-null and the call is successful (no exception is thrown),
* a watch will be left on the node with the given path. The watch willbe
* triggered by a successful operation that deletes the node of the given
* path or creates/delete a child under the node.
* <p>
* The list of children returned is not sorted and no guarantee is provided
* as to its natural or lexical order.
* <p>
* A KeeperException with error code KeeperException.NoNode will be thrown
* if no node with the given path exists.
*
* @param path
* @param watcher explicit watcher
* @return an unordered array of children of the node with the given path
* @throws InterruptedException If the server transaction is interrupted.
* @throws KeeperException If the server signals an error with a non-zero error code.
* @throws IllegalArgumentException if an invalid path is specified
*/
public List<String> getChildren(final String path, Watcher watcher)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new ChildWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getChildren);
GetChildrenRequest request = new GetChildrenRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetChildrenResponse response = new GetChildrenResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
return response.getChildren();
}
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
发送请求给服务端
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException {
ReplyHeader r = new ReplyHeader();
// 客户端与服务端的网络传输
ClientCnxn.Packet packet = this.queuePacket(h, r, request, response, (AsyncCallback)null, (String)null, (String)null, (Object)null, watchRegistration);
synchronized(packet) {
while(!packet.finished) {
packet.wait();
}
return r;
}
}
ClientCnxn.Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration) {
ClientCnxn.Packet packet = null;
LinkedList var11 = this.outgoingQueue;
synchronized(this.outgoingQueue) {
// 传输的对象都包装成Packet对象
packet = new ClientCnxn.Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (this.state.isAlive() && !this.closing) {
if (h.getType() == -11) {
this.closing = true;
}
// 放入发送队列中,等待发送
this.outgoingQueue.add(packet);
} else {
this.conLossPacket(packet);
}
}
this.sendThread.getClientCnxnSocket().wakeupCnxn();
return packet;
}
outgoingQueue的处理
服务端org.apache.zookeeper.server.FinalRequestProcessor#processRequest
处理
case OpCode.getChildren: {
lastOp = "GETC";
GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
getChildrenRequest);
DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
request.authInfo);
// 返回children,
// 这里根据客户端设置的是否有watch变量来传入watcher对象
// 如果true则将当前的ServerCnxn传入(ServerCnxn代表客户端和服务端的连接)
List<String> children = zks.getZKDatabase().getChildren(
getChildrenRequest.getPath(), null, getChildrenRequest
.getWatch() ? cnxn : null);
rsp = new GetChildrenResponse(children);
break;
}
将数据节点路径和ServerCnxn对象存储在WatcherManager的watchTable
和watch2Paths
中
public List<String> getChildren(String path, Stat stat, Watcher watcher)
throws KeeperException.NoNodeException {
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
synchronized (n) {
if (stat != null) {
n.copyStat(stat);
}
List<String> children=new ArrayList<String>(n.getChildren());
if (watcher != null) {
childWatches.addWatch(path, watcher);
}
return children;
}
}
- 当服务端处理完毕之后,客户端的SendThread线程负责接收服务端的响应,finishPacket方法会从packet中取出WatchRegistration并注册到ZKWatchManager中
/**
* This class services the outgoing request queue and generates the heart
* beats. It also spawns the ReadThread.
*/
class SendThread extends ZooKeeperThread {
private long lastPingSentNs;
private final ClientCnxnSocket clientCnxnSocket;
private Random r = new Random(System.nanoTime());
private boolean isFirstConnect = true;
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(
incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
if (replyHdr.getXid() == -2) {
// -2 is the xid for pings
if (LOG.isDebugEnabled()) {
LOG.debug("Got ping response for sessionid: 0x"
+ Long.toHexString(sessionId)
+ " after "
+ ((System.nanoTime() - lastPingSentNs) / 1000000)
+ "ms");
}
return;
}
if (replyHdr.getXid() == -4) {
// -4 is the xid for AuthPacket
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
state = States.AUTH_FAILED;
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null) );
}
if (LOG.isDebugEnabled()) {
LOG.debug("Got auth sessionid:0x"
+ Long.toHexString(sessionId));
}
return;
}
if (replyHdr.getXid() == -1) {
// -1 means notification
if (LOG.isDebugEnabled()) {
LOG.debug("Got notification sessionid:0x"
+ Long.toHexString(sessionId));
}
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
// convert from a server path to a client path
if (chrootPath != null) {
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));
else {
LOG.warn("Got server path " + event.getPath()
+ " which is too short for chroot path "
+ chrootPath);
}
}
WatchedEvent we = new WatchedEvent(event);
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x"
+ Long.toHexString(sessionId));
}
eventThread.queueEvent( we );
return;
}
// If SASL authentication is currently in progress, construct and
// send a response packet immediately, rather than queuing a
// response as with other packets.
if (tunnelAuthInProgress()) {
GetSASLRequest request = new GetSASLRequest();
request.deserialize(bbia,"token");
zooKeeperSaslClient.respondToServer(request.getToken(),
ClientCnxn.this);
return;
}
Packet packet;
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got "
+ replyHdr.getXid());
}
packet = pendingQueue.remove();
}
/*
* Since requests are processed in order, we better get a response
* to the first request!
*/
try {
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(
KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid "
+ replyHdr.getXid() + " with err " +
+ replyHdr.getErr() +
" expected Xid "
+ packet.requestHeader.getXid()
+ " for a packet with details: "
+ packet );
}
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Reading reply sessionid:0x"
+ Long.toHexString(sessionId) + ", packet:: " + packet);
}
} finally {
finishPacket(packet);
}
}
private void finishPacket(Packet p) {
int err = p.replyHeader.getErr();
if (p.watchRegistration != null) {
p.watchRegistration.register(err);
}
// Add all the removed watch events to the event queue, so that the
// clients will be notified with 'Data/Child WatchRemoved' event type.
if (p.watchDeregistration != null) {
Map<EventType, Set<Watcher>> materializedWatchers = null;
try {
materializedWatchers = p.watchDeregistration.unregister(err);
for (Entry<EventType, Set<Watcher>> entry : materializedWatchers
.entrySet()) {
Set<Watcher> watchers = entry.getValue();
if (watchers.size() > 0) {
queueEvent(p.watchDeregistration.getClientPath(), err,
watchers, entry.getKey());
// ignore connectionloss when removing from local
// session
p.replyHeader.setErr(Code.OK.intValue());
}
}
} catch (KeeperException.NoWatcherException nwe) {
LOG.error("Failed to find watcher!", nwe);
p.replyHeader.setErr(nwe.code().intValue());
} catch (KeeperException ke) {
LOG.error("Exception when removing watcher", ke);
p.replyHeader.setErr(ke.code().intValue());
}
}
if (p.cb == null) {
synchronized (p) {
p.finished = true;
p.notifyAll();
}
} else {
p.finished = true;
eventThread.queuePacket(p);
}
}
触发watcher org.apache.zookeeper.server.WatchManager#triggerWatch
Set<Watcher> triggerWatch(String path, EventType type) {
return triggerWatch(path, type, null);
}
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
// 主要做的就是从watchTable和watch2Paths中移除该路径的watcher,Watcher机制是一次性的
synchronized (this) {
watchers = watchTable.remove(path);
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"No watchers for " + path);
}
return null;
}
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
// 真正的回调和业务逻辑执行都在客户端org.apache.zookeeper.server.NIOServerCnxn#process
w.process(e);
}
return watchers;
}