当前位置: 首页 > news >正文

zookeeper Watcher

目录

    • Watcher的一些常识
      • Watcher是什么
      • Watcher怎么用
      • Watcher特性
    • 回顾回调&观察者模式&发布订阅模式
    • Zookeeper 客户端/ 服务端 watcher机制
      • 流程说明
      • getChildren 并注册watcher的例子流程说明

Watcher的一些常识

Watcher是什么

ZK中引入Watcher机制来实现分布式的通知功能。

ZK允许客户端向服务端注册一个Watcher监听,当服务点的的指定事件触发监听时,那么服务端就会向客户端发送事件通知,以便客户端完成逻辑操作(即客户端向服务端注册监听,并将watcher对象存在客户端的Watchermanager中
服务端触发事件后,向客户端发送通知,客户端收到通知后从wacherManager中取出对象来执行回调逻辑)

Watcher怎么用

参考:https://blog.csdn.net/qq_26437925/article/details/145715160 中的一个例子代码

判断特定路径,添加了Watcher 对其中的结点删除事件做出对应处理。

  zk.exists(preNodePath, new Watcher() {
    @Override
        public void process(WatchedEvent event) {
            if (event.getType() == Event.EventType.NodeDeleted) {
                // 前一个节点删除了释放锁了,就唤醒本结点
                synchronized (node) {
                    node.notify();
                }
            }
        }
    });

可以查看Watcher接口如下:
在这里插入图片描述

public class WatchedEvent {

    private final KeeperState keeperState; //用于记录Event发生时的zk状态(通知状态
    private final EventType eventType; // 记录Event的类型
    private String path; 

Watcher特性

  • 一次性:一旦一个watcher被触发,ZK都会将其从相应的的存储中移除,所以watcher是需要每注册一次,才可触发一次。
  • 客户端串行执行:客户端watcher回调过程是一个串行同步的过程
  • 轻量:watcher数据结构中只包含:通知状态、事件类型和节点路径

之所以这么说,其实可以在源码实现中找到:

org.apache.zookeeper.server.WatchManager#triggerWatch,此为真正出发watcher执行的逻辑, 后文将再次说明。

可以看到watcher执行一次就删除了,for循环逐一执行的。

Set<Watcher> triggerWatch(String path, EventType type) {
    return triggerWatch(path, type, null);
}

Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    WatchedEvent e = new WatchedEvent(type,
            KeeperState.SyncConnected, path);
    HashSet<Watcher> watchers;
    // 主要做的就是从watchTable和watch2Paths中移除该路径的watcher,Watcher机制是一次性的
    synchronized (this) {
        watchers = watchTable.remove(path);
        if (watchers == null || watchers.isEmpty()) {
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG,
                        ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                        "No watchers for " + path);
            }
            return null;
        }
        for (Watcher w : watchers) {
            HashSet<String> paths = watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);
            }
        }
    }
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        // 真正的回调和业务逻辑执行都在客户端org.apache.zookeeper.server.NIOServerCnxn#process
        w.process(e);
    }
    return watchers;
}

回顾回调&观察者模式&发布订阅模式

  • 回调的思想
  1. 类A的a()方法调用类B的b()方法
  2. 类B的b()方法执行完毕主动调用类A的callback()方法
  • 观察者模式
    在这里插入图片描述
  • 发布订阅,对比 观察者模式
    在这里插入图片描述

Zookeeper 客户端/ 服务端 watcher机制

流程说明

  1. 一般客户端调用exists/getData/getChildren注册监听,其中请求会封装为zookeeper内部的协议packet,添加到outgoingQueue队列中
  2. 然后客户端内部的SendThread这个线程会执行数据发送操作,主要是将outgoingQueue队列中的数据发送到服务端
  3. 客户端发给服务端了,等待服务端响应的packet结合放到了SendThread内部的pendingQueue队列中
  4. 客户端会将watcher存储到WatchManager的watchTablewatch2Paths中,缓存一样; 服务端仅仅只是保存了当前连接的 ServerCnxn 对象(ServerCnxn是服务端与客户端进行网络交互的一个NIO接口,代表了客户端与服务端的连接)
/**
 * Interface to a Server connection - represents a connection from a client
 * to the server.
 */
public abstract class ServerCnxn implements Stats, Watcher {

  1. 这样当指定的节点发生相关的事件时,即要处理此请求了,会判断是否有watcher,注册到WatchManager, 最后会调用WatchManager的triggerWatch方法触发相关的事件。
  2. triggerWatch就是客户端从之前WatchManager缓存取出wacther执行的过程

举例org.apache.zookeeper.server.FinalRequestProcessor#handleGetDataRequest

 private Record handleGetDataRequest(Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {
        GetDataRequest getDataRequest = (GetDataRequest) request;
        String path = getDataRequest.getPath();
        DataNode n = zks.getZKDatabase().getNode(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, authInfo, path, null);
        Stat stat = new Stat();
        byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
        return new GetDataResponse(b, stat);
    }
  1. 客户端有一个专门处理watcher时间的线程EventThread,其保持了一个待处理事件的队列。它根据传递的事件的类型和节点信息,从客户端的ZKWatcherManager中取出相关的watcher,将其添加到EventThread事件队列中,并在去run方法中不断取出watcher事件进行处理。

  2. 通过将节点信息和事件类型进行封装成为watchedevent,并查找到到对应节点的注册的watcher,然后调用watcher的回调方法process进行处理。

在这里插入图片描述

getChildren 并注册watcher的例子流程说明

/**
     * Return the list of the children of the node of the given path.
     * <p>
     * If the watch is non-null and the call is successful (no exception is thrown),
     * a watch will be left on the node with the given path. The watch willbe
     * triggered by a successful operation that deletes the node of the given
     * path or creates/delete a child under the node.
     * <p>
     * The list of children returned is not sorted and no guarantee is provided
     * as to its natural or lexical order.
     * <p>
     * A KeeperException with error code KeeperException.NoNode will be thrown
     * if no node with the given path exists.
     *
     * @param path
     * @param watcher explicit watcher
     * @return an unordered array of children of the node with the given path
     * @throws InterruptedException If the server transaction is interrupted.
     * @throws KeeperException If the server signals an error with a non-zero error code.
     * @throws IllegalArgumentException if an invalid path is specified
     */
    public List<String> getChildren(final String path, Watcher watcher)
        throws KeeperException, InterruptedException
    {
        final String clientPath = path;
        PathUtils.validatePath(clientPath);

        // the watch contains the un-chroot path
        WatchRegistration wcb = null;
        if (watcher != null) {
            wcb = new ChildWatchRegistration(watcher, clientPath);
        }

        final String serverPath = prependChroot(clientPath);

        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.getChildren);
        GetChildrenRequest request = new GetChildrenRequest();
        request.setPath(serverPath);
        request.setWatch(watcher != null);
        GetChildrenResponse response = new GetChildrenResponse();
        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
        return response.getChildren();
    }

ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); 发送请求给服务端

public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    // 客户端与服务端的网络传输
    ClientCnxn.Packet packet = this.queuePacket(h, r, request, response, (AsyncCallback)null, (String)null, (String)null, (Object)null, watchRegistration);
    synchronized(packet) {
        while(!packet.finished) {
            packet.wait();
        }
        return r;
    }
}

ClientCnxn.Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration) {
    ClientCnxn.Packet packet = null;
    LinkedList var11 = this.outgoingQueue;
    synchronized(this.outgoingQueue) {
        // 传输的对象都包装成Packet对象
        packet = new ClientCnxn.Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        if (this.state.isAlive() && !this.closing) {
            if (h.getType() == -11) {
                this.closing = true;
            }

            // 放入发送队列中,等待发送
            this.outgoingQueue.add(packet);
        } else {
            this.conLossPacket(packet);
        }
    }

    this.sendThread.getClientCnxnSocket().wakeupCnxn();
    return packet;
}

outgoingQueue的处理
在这里插入图片描述
在这里插入图片描述
服务端org.apache.zookeeper.server.FinalRequestProcessor#processRequest处理

 case OpCode.getChildren: {
     lastOp = "GETC";
     GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
     ByteBufferInputStream.byteBuffer2Record(request.request,
             getChildrenRequest);
     DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());
     if (n == null) {
         throw new KeeperException.NoNodeException();
     }
     PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
             ZooDefs.Perms.READ,
             request.authInfo);
     // 返回children,
     // 这里根据客户端设置的是否有watch变量来传入watcher对象
    // 如果true则将当前的ServerCnxn传入(ServerCnxn代表客户端和服务端的连接)      
     List<String> children = zks.getZKDatabase().getChildren(
             getChildrenRequest.getPath(), null, getChildrenRequest
                     .getWatch() ? cnxn : null);
     rsp = new GetChildrenResponse(children);
     break;
 }

将数据节点路径和ServerCnxn对象存储在WatcherManager的watchTablewatch2Paths

 public List<String> getChildren(String path, Stat stat, Watcher watcher)
            throws KeeperException.NoNodeException {
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (n) {
            if (stat != null) {
                n.copyStat(stat);
            }
            List<String> children=new ArrayList<String>(n.getChildren());

            if (watcher != null) {
                childWatches.addWatch(path, watcher);
            }
            return children;
        }
    }
  • 当服务端处理完毕之后,客户端的SendThread线程负责接收服务端的响应,finishPacket方法会从packet中取出WatchRegistration并注册到ZKWatchManager中

/**
    * This class services the outgoing request queue and generates the heart
    * beats. It also spawns the ReadThread.
    */
   class SendThread extends ZooKeeperThread {
       private long lastPingSentNs;
       private final ClientCnxnSocket clientCnxnSocket;
       private Random r = new Random(System.nanoTime());        
       private boolean isFirstConnect = true;

       void readResponse(ByteBuffer incomingBuffer) throws IOException {
           ByteBufferInputStream bbis = new ByteBufferInputStream(
                   incomingBuffer);
           BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
           ReplyHeader replyHdr = new ReplyHeader();

           replyHdr.deserialize(bbia, "header");
           if (replyHdr.getXid() == -2) {
               // -2 is the xid for pings
               if (LOG.isDebugEnabled()) {
                   LOG.debug("Got ping response for sessionid: 0x"
                           + Long.toHexString(sessionId)
                           + " after "
                           + ((System.nanoTime() - lastPingSentNs) / 1000000)
                           + "ms");
               }
               return;
           }
           if (replyHdr.getXid() == -4) {
               // -4 is the xid for AuthPacket               
               if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                   state = States.AUTH_FAILED;                    
                   eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
                           Watcher.Event.KeeperState.AuthFailed, null) );            		            		
               }
               if (LOG.isDebugEnabled()) {
                   LOG.debug("Got auth sessionid:0x"
                           + Long.toHexString(sessionId));
               }
               return;
           }
           if (replyHdr.getXid() == -1) {
               // -1 means notification
               if (LOG.isDebugEnabled()) {
                   LOG.debug("Got notification sessionid:0x"
                       + Long.toHexString(sessionId));
               }
               WatcherEvent event = new WatcherEvent();
               event.deserialize(bbia, "response");

               // convert from a server path to a client path
               if (chrootPath != null) {
                   String serverPath = event.getPath();
                   if(serverPath.compareTo(chrootPath)==0)
                       event.setPath("/");
                   else if (serverPath.length() > chrootPath.length())
                       event.setPath(serverPath.substring(chrootPath.length()));
                   else {
                   	LOG.warn("Got server path " + event.getPath()
                   			+ " which is too short for chroot path "
                   			+ chrootPath);
                   }
               }

               WatchedEvent we = new WatchedEvent(event);
               if (LOG.isDebugEnabled()) {
                   LOG.debug("Got " + we + " for sessionid 0x"
                           + Long.toHexString(sessionId));
               }

               eventThread.queueEvent( we );
               return;
           }

           // If SASL authentication is currently in progress, construct and
           // send a response packet immediately, rather than queuing a
           // response as with other packets.
           if (tunnelAuthInProgress()) {
               GetSASLRequest request = new GetSASLRequest();
               request.deserialize(bbia,"token");
               zooKeeperSaslClient.respondToServer(request.getToken(),
                 ClientCnxn.this);
               return;
           }

           Packet packet;
           synchronized (pendingQueue) {
               if (pendingQueue.size() == 0) {
                   throw new IOException("Nothing in the queue, but got "
                           + replyHdr.getXid());
               }
               packet = pendingQueue.remove();
           }
           /*
            * Since requests are processed in order, we better get a response
            * to the first request!
            */
           try {
               if (packet.requestHeader.getXid() != replyHdr.getXid()) {
                   packet.replyHeader.setErr(
                           KeeperException.Code.CONNECTIONLOSS.intValue());
                   throw new IOException("Xid out of order. Got Xid "
                           + replyHdr.getXid() + " with err " +
                           + replyHdr.getErr() +
                           " expected Xid "
                           + packet.requestHeader.getXid()
                           + " for a packet with details: "
                           + packet );
               }

               packet.replyHeader.setXid(replyHdr.getXid());
               packet.replyHeader.setErr(replyHdr.getErr());
               packet.replyHeader.setZxid(replyHdr.getZxid());
               if (replyHdr.getZxid() > 0) {
                   lastZxid = replyHdr.getZxid();
               }
               if (packet.response != null && replyHdr.getErr() == 0) {
                   packet.response.deserialize(bbia, "response");
               }

               if (LOG.isDebugEnabled()) {
                   LOG.debug("Reading reply sessionid:0x"
                           + Long.toHexString(sessionId) + ", packet:: " + packet);
               }
           } finally {
               finishPacket(packet);
           }
       }
        
private void finishPacket(Packet p) {
        int err = p.replyHeader.getErr();
        if (p.watchRegistration != null) {
            p.watchRegistration.register(err);
        }
        // Add all the removed watch events to the event queue, so that the
        // clients will be notified with 'Data/Child WatchRemoved' event type.
        if (p.watchDeregistration != null) {
            Map<EventType, Set<Watcher>> materializedWatchers = null;
            try {
                materializedWatchers = p.watchDeregistration.unregister(err);
                for (Entry<EventType, Set<Watcher>> entry : materializedWatchers
                        .entrySet()) {
                    Set<Watcher> watchers = entry.getValue();
                    if (watchers.size() > 0) {
                        queueEvent(p.watchDeregistration.getClientPath(), err,
                                watchers, entry.getKey());
                        // ignore connectionloss when removing from local
                        // session
                        p.replyHeader.setErr(Code.OK.intValue());
                    }
                }
            } catch (KeeperException.NoWatcherException nwe) {
                LOG.error("Failed to find watcher!", nwe);
                p.replyHeader.setErr(nwe.code().intValue());
            } catch (KeeperException ke) {
                LOG.error("Exception when removing watcher", ke);
                p.replyHeader.setErr(ke.code().intValue());
            }
        }

        if (p.cb == null) {
            synchronized (p) {
                p.finished = true;
                p.notifyAll();
            }
        } else {
            p.finished = true;
            eventThread.queuePacket(p);
        }
    }

触发watcher org.apache.zookeeper.server.WatchManager#triggerWatch

 Set<Watcher> triggerWatch(String path, EventType type) {
        return triggerWatch(path, type, null);
    }

    Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        WatchedEvent e = new WatchedEvent(type,
                KeeperState.SyncConnected, path);
        HashSet<Watcher> watchers;
        // 主要做的就是从watchTable和watch2Paths中移除该路径的watcher,Watcher机制是一次性的
        synchronized (this) {
            watchers = watchTable.remove(path);
            if (watchers == null || watchers.isEmpty()) {
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(LOG,
                            ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                            "No watchers for " + path);
                }
                return null;
            }
            for (Watcher w : watchers) {
                HashSet<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            // 真正的回调和业务逻辑执行都在客户端org.apache.zookeeper.server.NIOServerCnxn#process
            w.process(e);
        }
        return watchers;
    }

相关文章:

  • UEFI Spec 学习笔记---9 - Protocols — EFI Loaded Image
  • 【Ubuntu】GPU显存被占用,但显示没有使用GPU的进程
  • 虚拟机和主机可互相复制粘贴
  • 基于WebGIS技术的校园地图导航系统架构与核心功能设计
  • Linux 内核是如何检测可用物理内存地址范围的?
  • DeepSeek提示词高阶用法全解析:职场效率提升的10个实战案例*——让AI成为你的全能职场助手
  • Java的常用数据类型有哪些?
  • python数据容器
  • spring boot知识点4
  • Android SDK封装打包流程详解
  • [展示]Webrtc NoiseSuppressor降噪模块嵌入式平台移植
  • Ubuntu20.04.2安装Vmware tools
  • git空文件夹不能提交问题
  • Git命令详解与工作流介绍:全面掌握版本控制系统的操作指南
  • 火绒终端安全管理系统V2.0【系统防御功能】
  • 校招后台开发:JAVA和GO选哪一个?
  • 设计模式教程:代理模式(Proxy Pattern)
  • python中的Pillow 库安装后需要重启吗?
  • Python正则表达式处理中日韩字符过滤全解析
  • 基于Spring Security 6的OAuth2 系列之二十 - 高级特性--令牌交换(Token Exchange)
  • dede 网站地图样式/如何在手机上开自己的网站
  • 百度云主机做网站/热点新闻
  • 超级折扣2WordPress/网站为什么要seo?
  • 中国建设银行企业信息门户网站/微信小程序开发流程
  • 做网站侵权吗/网页设计培训学校
  • 小程序代理加盟前景/行者seo