当前位置: 首页 > wzjs >正文

校园二手网站的建设方案小程序引流推广平台

校园二手网站的建设方案,小程序引流推广平台,wordpress知识,如何在虚拟机中建设网站目录 Watcher的一些常识Watcher是什么Watcher怎么用Watcher特性 回顾回调&观察者模式&发布订阅模式Zookeeper 客户端/ 服务端 watcher机制流程说明getChildren 并注册watcher的例子流程说明 Watcher的一些常识 Watcher是什么 ZK中引入Watcher机制来实现分布式的通知功…

目录

    • Watcher的一些常识
      • Watcher是什么
      • Watcher怎么用
      • Watcher特性
    • 回顾回调&观察者模式&发布订阅模式
    • Zookeeper 客户端/ 服务端 watcher机制
      • 流程说明
      • getChildren 并注册watcher的例子流程说明

Watcher的一些常识

Watcher是什么

ZK中引入Watcher机制来实现分布式的通知功能。

ZK允许客户端向服务端注册一个Watcher监听,当服务点的的指定事件触发监听时,那么服务端就会向客户端发送事件通知,以便客户端完成逻辑操作(即客户端向服务端注册监听,并将watcher对象存在客户端的Watchermanager中
服务端触发事件后,向客户端发送通知,客户端收到通知后从wacherManager中取出对象来执行回调逻辑)

Watcher怎么用

参考:https://blog.csdn.net/qq_26437925/article/details/145715160 中的一个例子代码

判断特定路径,添加了Watcher 对其中的结点删除事件做出对应处理。

  zk.exists(preNodePath, new Watcher() {@Overridepublic void process(WatchedEvent event) {if (event.getType() == Event.EventType.NodeDeleted) {// 前一个节点删除了释放锁了,就唤醒本结点synchronized (node) {node.notify();}}}});

可以查看Watcher接口如下:
在这里插入图片描述

public class WatchedEvent {private final KeeperState keeperState; //用于记录Event发生时的zk状态(通知状态private final EventType eventType; // 记录Event的类型private String path; 

Watcher特性

  • 一次性:一旦一个watcher被触发,ZK都会将其从相应的的存储中移除,所以watcher是需要每注册一次,才可触发一次。
  • 客户端串行执行:客户端watcher回调过程是一个串行同步的过程
  • 轻量:watcher数据结构中只包含:通知状态、事件类型和节点路径

之所以这么说,其实可以在源码实现中找到:

org.apache.zookeeper.server.WatchManager#triggerWatch,此为真正出发watcher执行的逻辑, 后文将再次说明。

可以看到watcher执行一次就删除了,for循环逐一执行的。

Set<Watcher> triggerWatch(String path, EventType type) {return triggerWatch(path, type, null);
}Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {WatchedEvent e = new WatchedEvent(type,KeeperState.SyncConnected, path);HashSet<Watcher> watchers;// 主要做的就是从watchTable和watch2Paths中移除该路径的watcher,Watcher机制是一次性的synchronized (this) {watchers = watchTable.remove(path);if (watchers == null || watchers.isEmpty()) {if (LOG.isTraceEnabled()) {ZooTrace.logTraceMessage(LOG,ZooTrace.EVENT_DELIVERY_TRACE_MASK,"No watchers for " + path);}return null;}for (Watcher w : watchers) {HashSet<String> paths = watch2Paths.get(w);if (paths != null) {paths.remove(path);}}}for (Watcher w : watchers) {if (supress != null && supress.contains(w)) {continue;}// 真正的回调和业务逻辑执行都在客户端org.apache.zookeeper.server.NIOServerCnxn#processw.process(e);}return watchers;
}

回顾回调&观察者模式&发布订阅模式

  • 回调的思想
  1. 类A的a()方法调用类B的b()方法
  2. 类B的b()方法执行完毕主动调用类A的callback()方法
  • 观察者模式
    在这里插入图片描述
  • 发布订阅,对比 观察者模式
    在这里插入图片描述

Zookeeper 客户端/ 服务端 watcher机制

流程说明

  1. 一般客户端调用exists/getData/getChildren注册监听,其中请求会封装为zookeeper内部的协议packet,添加到outgoingQueue队列中
  2. 然后客户端内部的SendThread这个线程会执行数据发送操作,主要是将outgoingQueue队列中的数据发送到服务端
  3. 客户端发给服务端了,等待服务端响应的packet结合放到了SendThread内部的pendingQueue队列中
  4. 客户端会将watcher存储到WatchManager的watchTablewatch2Paths中,缓存一样; 服务端仅仅只是保存了当前连接的 ServerCnxn 对象(ServerCnxn是服务端与客户端进行网络交互的一个NIO接口,代表了客户端与服务端的连接)
/*** Interface to a Server connection - represents a connection from a client* to the server.*/
public abstract class ServerCnxn implements Stats, Watcher {

  1. 这样当指定的节点发生相关的事件时,即要处理此请求了,会判断是否有watcher,注册到WatchManager, 最后会调用WatchManager的triggerWatch方法触发相关的事件。
  2. triggerWatch就是客户端从之前WatchManager缓存取出wacther执行的过程

举例org.apache.zookeeper.server.FinalRequestProcessor#handleGetDataRequest

 private Record handleGetDataRequest(Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {GetDataRequest getDataRequest = (GetDataRequest) request;String path = getDataRequest.getPath();DataNode n = zks.getZKDatabase().getNode(path);if (n == null) {throw new KeeperException.NoNodeException();}zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, authInfo, path, null);Stat stat = new Stat();byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);return new GetDataResponse(b, stat);}
  1. 客户端有一个专门处理watcher时间的线程EventThread,其保持了一个待处理事件的队列。它根据传递的事件的类型和节点信息,从客户端的ZKWatcherManager中取出相关的watcher,将其添加到EventThread事件队列中,并在去run方法中不断取出watcher事件进行处理。

  2. 通过将节点信息和事件类型进行封装成为watchedevent,并查找到到对应节点的注册的watcher,然后调用watcher的回调方法process进行处理。

在这里插入图片描述

getChildren 并注册watcher的例子流程说明

/*** Return the list of the children of the node of the given path.* <p>* If the watch is non-null and the call is successful (no exception is thrown),* a watch will be left on the node with the given path. The watch willbe* triggered by a successful operation that deletes the node of the given* path or creates/delete a child under the node.* <p>* The list of children returned is not sorted and no guarantee is provided* as to its natural or lexical order.* <p>* A KeeperException with error code KeeperException.NoNode will be thrown* if no node with the given path exists.** @param path* @param watcher explicit watcher* @return an unordered array of children of the node with the given path* @throws InterruptedException If the server transaction is interrupted.* @throws KeeperException If the server signals an error with a non-zero error code.* @throws IllegalArgumentException if an invalid path is specified*/public List<String> getChildren(final String path, Watcher watcher)throws KeeperException, InterruptedException{final String clientPath = path;PathUtils.validatePath(clientPath);// the watch contains the un-chroot pathWatchRegistration wcb = null;if (watcher != null) {wcb = new ChildWatchRegistration(watcher, clientPath);}final String serverPath = prependChroot(clientPath);RequestHeader h = new RequestHeader();h.setType(ZooDefs.OpCode.getChildren);GetChildrenRequest request = new GetChildrenRequest();request.setPath(serverPath);request.setWatch(watcher != null);GetChildrenResponse response = new GetChildrenResponse();ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);if (r.getErr() != 0) {throw KeeperException.create(KeeperException.Code.get(r.getErr()),clientPath);}return response.getChildren();}

ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); 发送请求给服务端

public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException {ReplyHeader r = new ReplyHeader();// 客户端与服务端的网络传输ClientCnxn.Packet packet = this.queuePacket(h, r, request, response, (AsyncCallback)null, (String)null, (String)null, (Object)null, watchRegistration);synchronized(packet) {while(!packet.finished) {packet.wait();}return r;}
}ClientCnxn.Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration) {ClientCnxn.Packet packet = null;LinkedList var11 = this.outgoingQueue;synchronized(this.outgoingQueue) {// 传输的对象都包装成Packet对象packet = new ClientCnxn.Packet(h, r, request, response, watchRegistration);packet.cb = cb;packet.ctx = ctx;packet.clientPath = clientPath;packet.serverPath = serverPath;if (this.state.isAlive() && !this.closing) {if (h.getType() == -11) {this.closing = true;}// 放入发送队列中,等待发送this.outgoingQueue.add(packet);} else {this.conLossPacket(packet);}}this.sendThread.getClientCnxnSocket().wakeupCnxn();return packet;
}

outgoingQueue的处理
在这里插入图片描述
在这里插入图片描述
服务端org.apache.zookeeper.server.FinalRequestProcessor#processRequest处理

 case OpCode.getChildren: {lastOp = "GETC";GetChildrenRequest getChildrenRequest = new GetChildrenRequest();ByteBufferInputStream.byteBuffer2Record(request.request,getChildrenRequest);DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());if (n == null) {throw new KeeperException.NoNodeException();}PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),ZooDefs.Perms.READ,request.authInfo);// 返回children,// 这里根据客户端设置的是否有watch变量来传入watcher对象// 如果true则将当前的ServerCnxn传入(ServerCnxn代表客户端和服务端的连接)      List<String> children = zks.getZKDatabase().getChildren(getChildrenRequest.getPath(), null, getChildrenRequest.getWatch() ? cnxn : null);rsp = new GetChildrenResponse(children);break;}

将数据节点路径和ServerCnxn对象存储在WatcherManager的watchTablewatch2Paths

 public List<String> getChildren(String path, Stat stat, Watcher watcher)throws KeeperException.NoNodeException {DataNode n = nodes.get(path);if (n == null) {throw new KeeperException.NoNodeException();}synchronized (n) {if (stat != null) {n.copyStat(stat);}List<String> children=new ArrayList<String>(n.getChildren());if (watcher != null) {childWatches.addWatch(path, watcher);}return children;}}
  • 当服务端处理完毕之后,客户端的SendThread线程负责接收服务端的响应,finishPacket方法会从packet中取出WatchRegistration并注册到ZKWatchManager中

/*** This class services the outgoing request queue and generates the heart* beats. It also spawns the ReadThread.*/class SendThread extends ZooKeeperThread {private long lastPingSentNs;private final ClientCnxnSocket clientCnxnSocket;private Random r = new Random(System.nanoTime());        private boolean isFirstConnect = true;void readResponse(ByteBuffer incomingBuffer) throws IOException {ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);ReplyHeader replyHdr = new ReplyHeader();replyHdr.deserialize(bbia, "header");if (replyHdr.getXid() == -2) {// -2 is the xid for pingsif (LOG.isDebugEnabled()) {LOG.debug("Got ping response for sessionid: 0x"+ Long.toHexString(sessionId)+ " after "+ ((System.nanoTime() - lastPingSentNs) / 1000000)+ "ms");}return;}if (replyHdr.getXid() == -4) {// -4 is the xid for AuthPacket               if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {state = States.AUTH_FAILED;                    eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null) );            		            		}if (LOG.isDebugEnabled()) {LOG.debug("Got auth sessionid:0x"+ Long.toHexString(sessionId));}return;}if (replyHdr.getXid() == -1) {// -1 means notificationif (LOG.isDebugEnabled()) {LOG.debug("Got notification sessionid:0x"+ Long.toHexString(sessionId));}WatcherEvent event = new WatcherEvent();event.deserialize(bbia, "response");// convert from a server path to a client pathif (chrootPath != null) {String serverPath = event.getPath();if(serverPath.compareTo(chrootPath)==0)event.setPath("/");else if (serverPath.length() > chrootPath.length())event.setPath(serverPath.substring(chrootPath.length()));else {LOG.warn("Got server path " + event.getPath()+ " which is too short for chroot path "+ chrootPath);}}WatchedEvent we = new WatchedEvent(event);if (LOG.isDebugEnabled()) {LOG.debug("Got " + we + " for sessionid 0x"+ Long.toHexString(sessionId));}eventThread.queueEvent( we );return;}// If SASL authentication is currently in progress, construct and// send a response packet immediately, rather than queuing a// response as with other packets.if (tunnelAuthInProgress()) {GetSASLRequest request = new GetSASLRequest();request.deserialize(bbia,"token");zooKeeperSaslClient.respondToServer(request.getToken(),ClientCnxn.this);return;}Packet packet;synchronized (pendingQueue) {if (pendingQueue.size() == 0) {throw new IOException("Nothing in the queue, but got "+ replyHdr.getXid());}packet = pendingQueue.remove();}/** Since requests are processed in order, we better get a response* to the first request!*/try {if (packet.requestHeader.getXid() != replyHdr.getXid()) {packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());throw new IOException("Xid out of order. Got Xid "+ replyHdr.getXid() + " with err " ++ replyHdr.getErr() +" expected Xid "+ packet.requestHeader.getXid()+ " for a packet with details: "+ packet );}packet.replyHeader.setXid(replyHdr.getXid());packet.replyHeader.setErr(replyHdr.getErr());packet.replyHeader.setZxid(replyHdr.getZxid());if (replyHdr.getZxid() > 0) {lastZxid = replyHdr.getZxid();}if (packet.response != null && replyHdr.getErr() == 0) {packet.response.deserialize(bbia, "response");}if (LOG.isDebugEnabled()) {LOG.debug("Reading reply sessionid:0x"+ Long.toHexString(sessionId) + ", packet:: " + packet);}} finally {finishPacket(packet);}}private void finishPacket(Packet p) {int err = p.replyHeader.getErr();if (p.watchRegistration != null) {p.watchRegistration.register(err);}// Add all the removed watch events to the event queue, so that the// clients will be notified with 'Data/Child WatchRemoved' event type.if (p.watchDeregistration != null) {Map<EventType, Set<Watcher>> materializedWatchers = null;try {materializedWatchers = p.watchDeregistration.unregister(err);for (Entry<EventType, Set<Watcher>> entry : materializedWatchers.entrySet()) {Set<Watcher> watchers = entry.getValue();if (watchers.size() > 0) {queueEvent(p.watchDeregistration.getClientPath(), err,watchers, entry.getKey());// ignore connectionloss when removing from local// sessionp.replyHeader.setErr(Code.OK.intValue());}}} catch (KeeperException.NoWatcherException nwe) {LOG.error("Failed to find watcher!", nwe);p.replyHeader.setErr(nwe.code().intValue());} catch (KeeperException ke) {LOG.error("Exception when removing watcher", ke);p.replyHeader.setErr(ke.code().intValue());}}if (p.cb == null) {synchronized (p) {p.finished = true;p.notifyAll();}} else {p.finished = true;eventThread.queuePacket(p);}}

触发watcher org.apache.zookeeper.server.WatchManager#triggerWatch

 Set<Watcher> triggerWatch(String path, EventType type) {return triggerWatch(path, type, null);}Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {WatchedEvent e = new WatchedEvent(type,KeeperState.SyncConnected, path);HashSet<Watcher> watchers;// 主要做的就是从watchTable和watch2Paths中移除该路径的watcher,Watcher机制是一次性的synchronized (this) {watchers = watchTable.remove(path);if (watchers == null || watchers.isEmpty()) {if (LOG.isTraceEnabled()) {ZooTrace.logTraceMessage(LOG,ZooTrace.EVENT_DELIVERY_TRACE_MASK,"No watchers for " + path);}return null;}for (Watcher w : watchers) {HashSet<String> paths = watch2Paths.get(w);if (paths != null) {paths.remove(path);}}}for (Watcher w : watchers) {if (supress != null && supress.contains(w)) {continue;}// 真正的回调和业务逻辑执行都在客户端org.apache.zookeeper.server.NIOServerCnxn#processw.process(e);}return watchers;}
http://www.dtcms.com/wzjs/184496.html

相关文章:

  • 网站seo诊断报告例子新闻稿件
  • 饮食中心网站建设方案百度指数分析大数据
  • .net开发的网站能做优化吗成都爱站网seo站长查询工具
  • 好听的公司名字大全集北京网站建设优化
  • 南京企业网站cps广告是什么意思
  • 全景网站建设手机网站制作软件
  • 建设网站需要机房吗网站代理公司
  • 英语培训学校网站怎么做如何推销自己的产品
  • 网站建设所需素材深圳网络公司推广
  • 哈尔滨网站搜索优化公司seo什么职位
  • vultr怎么做网站百度收录快速提交
  • 手机设计长沙专业seo优化公司
  • 网站建设公司程序今天的新闻 最新消息
  • 网站运营规划seo搜索优化技术
  • 快速网站建设费用企业营销策划书如何编写
  • 局域网怎么做网站长春网站制作系统
  • 游戏网站开发试验报告今日中央新闻
  • 给网站开发自己的一封信推荐就业的培训机构
  • 网页设计网站教程抖音竞价推广怎么做
  • 做英文网站 赚钱台州网站建设方案推广
  • 深圳网站seo公司seo中文意思是
  • 安装网站seo搜索优化推广
  • 免费的黄冈网站有哪些下载软件如何推广小程序
  • 怎么查那些人输入做网站东莞网络推广策略
  • 网页图片怎么居中郑州搜索引擎优化公司
  • 网络视频网站建设多少钱网站自动提交收录
  • 北京模板网站建设全包优质外链
  • 做网站的公司 经营范围中国互联网公司排名
  • 公司品牌logo设计商标设计网站搜索引擎优化方案的案例
  • 企业网站建设算什么费用任务推广引流平台