当前位置: 首页 > wzjs >正文

西安网站公司比较大的容易被百度收录的网站

西安网站公司比较大的,容易被百度收录的网站,wordpress 微信通知,北京手机网站建设公司大纲 1.ZooKeeper如何进行序列化 2.深入分析Jute的底层实现原理 3.ZooKeeper的网络通信协议详解 4.客户端的核心组件和初始化过程 5.客户端核心组件HostProvider 6.客户端核心组件ClientCnxn 7.客户端工作原理之会话创建过程 6.客户端核心组件ClientCnxn (1)客户端核心…

大纲

1.ZooKeeper如何进行序列化

2.深入分析Jute的底层实现原理

3.ZooKeeper的网络通信协议详解

4.客户端的核心组件和初始化过程

5.客户端核心组件HostProvider

6.客户端核心组件ClientCnxn

7.客户端工作原理之会话创建过程

6.客户端核心组件ClientCnxn

(1)客户端核心类ClientCnxn和Packet

(2)请求队列outgoingQueue与响应等待队列pendingQueue

(3)SendThread

(4)EventThread

(5)总结

(1)客户端核心类ClientCnxn和Packet

一.ClientCnxn

ClientCnxn是zk客户端的核心工作类,负责维护客户端与服务端间的网络连接并进行一系列网络通信。

二.Packet

Packet是ClientCnxn内部定义的、作为zk客户端中请求与响应的载体。也就是说Packet可以看作是一个用来进行网络通信的数据结构,Packet的主要作用是封装网络通信协议层的数据。

Packet中包含了一些请求协议的相关属性字段:请求头信息requestHeader、响应头信息replyHeader、请求体request、响应体response、节点路径clientPath以及serverPath、Watcher监控信息。

Packet的createBB()方法负责对Packet对象进行序列化,最终生成可用于底层网络传输的ByteBuffer对象。该方法只会将requestHeader、request和readOnly三个属性进行序列化。Packet的其余属性保存在客户端的上下文,不进行服务端的网络传输。

public class ApiOperatorDemo implements Watcher {private final static String CONNECT_STRING = "192.168.30.10:2181";private static CountDownLatch countDownLatch = new CountDownLatch(1);private static ZooKeeper zookeeper;public static void main(String[] args) throws Exception {zookeeper = new ZooKeeper(CONNECT_STRING, 5000, new ApiOperatorDemo());countDownLatch.await();String result = zookeeper.setData("/node", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}@Overridepublic void process(WatchedEvent watchedEvent) {//如果当前的连接状态是连接成功的,那么通过计数器去控制if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {countDownLatch.countDown();}}
}public class ZooKeeper implements AutoCloseable {protected final ClientCnxn cnxn;public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {if (clientConfig == null) {clientConfig = new ZKClientConfig();}this.clientConfig = clientConfig;watchManager = defaultWatchManager();watchManager.defaultWatcher = watcher;ConnectStringParser connectStringParser = new ConnectStringParser(connectString);hostProvider = aHostProvider;//创建ClientCnxn实例cnxn = createConnection(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly);cnxn.start();}protected ClientCnxn createConnection(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly) throws IOException {return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this, watchManager, clientCnxnSocket, canBeReadOnly);}private ClientCnxnSocket getClientCnxnSocket() throws IOException {String clientCnxnSocketName = getClientConfig().getProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);if (clientCnxnSocketName == null || clientCnxnSocketName.equals(ClientCnxnSocketNIO.class.getSimpleName())) {clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();} else if (clientCnxnSocketName.equals(ClientCnxnSocketNetty.class.getSimpleName())) {clientCnxnSocketName = ClientCnxnSocketNetty.class.getName();}Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName).getDeclaredConstructor(ZKClientConfig.class);ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());return clientCxnSocket;}...public Stat setData(final String path, byte data[], int version) {final String clientPath = path;PathUtils.validatePath(clientPath);final String serverPath = prependChroot(clientPath);RequestHeader h = new RequestHeader();h.setType(ZooDefs.OpCode.setData);SetDataRequest request = new SetDataRequest();request.setPath(serverPath);request.setData(data);request.setVersion(version);SetDataResponse response = new SetDataResponse();//提交请求ReplyHeader r = cnxn.submitRequest(h, request, response, null);...return response.getStat();}...
}public class ClientCnxn {final SendThread sendThread;final EventThread eventThread;public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {...sendThread = new SendThread(clientCnxnSocket);eventThread = new EventThread();...}public void start() {sendThread.start();eventThread.start();}...public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) {return submitRequest(h, request, response, watchRegistration, null);}public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) {ReplyHeader r = new ReplyHeader();//封装成Packet对象Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration, watchDeregistration);synchronized (packet) {if (requestTimeout > 0) {waitForPacketFinish(r, packet);} else {while (!packet.finished) {packet.wait();}}}if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {sendThread.cleanAndNotifyState();}return r;}public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath,String serverPath, Object ctx, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) {Packet packet = null;packet = new Packet(h, r, request, response, watchRegistration);packet.cb = cb;packet.ctx = ctx;packet.clientPath = clientPath;packet.serverPath = serverPath;packet.watchDeregistration = watchDeregistration;synchronized (state) {if (!state.isAlive() || closing) {conLossPacket(packet);} else {if (h.getType() == OpCode.closeSession) {closing = true;}//将Packet对象添加到outgoingQueue队列,后续请求的发送交给SendThread来处理outgoingQueue.add(packet);}}sendThread.getClientCnxnSocket().packetAdded();return packet;}...static class Packet {RequestHeader requestHeader;//请求头ReplyHeader replyHeader;//响应头Record request;//请求体Record response;//响应体ByteBuffer bb;String clientPath;//节点路径String serverPath;//节点路径boolean finished;AsyncCallback cb;Object ctx;WatchRegistration watchRegistration;public boolean readOnly;WatchDeregistration watchDeregistration;...public void createBB() {try {ByteArrayOutputStream baos = new ByteArrayOutputStream();BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);boa.writeInt(-1, "len"); // We'll fill this in laterif (requestHeader != null) {requestHeader.serialize(boa, "header");}if (request instanceof ConnectRequest) {request.serialize(boa, "connect");// append "am-I-allowed-to-be-readonly" flagboa.writeBool(readOnly, "readOnly");} else if (request != null) {request.serialize(boa, "request");}baos.close();this.bb = ByteBuffer.wrap(baos.toByteArray());this.bb.putInt(this.bb.capacity() - 4);this.bb.rewind();} catch (IOException e) {LOG.warn("Ignoring unexpected exception", e);}}}...
}

(2)请求队列outgoingQueue与响应等待队列pendingQueue

ClientCnxn中有两个核心的队列outgoingQueue和pendingQueue,分别代表客户端的请求发送队列和服务端的响应等待队列。

outgoingQueue队列是一个客户端的请求发送队列,专门用于存储那些需要发送到服务端的Packet集合。

pendingQueue队列是一个服务端的响应等待队列,用于存储已从客户端发送到服务端,但是需要等待服务端响应的Packet集合。

当zk客户端对请求信息进行封装和序列化后,zk不会立刻就将一个请求信息通过网络直接发送给服务端,而是会先将请求信息添加到请求队列中,之后通过SendThread线程来处理相关的请求发送操作。

public class ClientCnxn {final SendThread sendThread;final EventThread eventThread;private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();...public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {...sendThread = new SendThread(clientCnxnSocket);eventThread = new EventThread();...}public void start() {sendThread.start();eventThread.start();}...class SendThread extends ZooKeeperThread {private final ClientCnxnSocket clientCnxnSocket;SendThread(ClientCnxnSocket clientCnxnSocket) {super(makeThreadName("-SendThread()"));state = States.CONNECTING;this.clientCnxnSocket = clientCnxnSocket;setDaemon(true);}...@Overridepublic void run() {...while (state.isAlive()) {...//通过clientCnxnSocket.doTransport方法处理请求发送和响应接收clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);...}...}}
}

一.请求发送

SendThread线程在调用ClientCnxnSocket的doTransport()方法时,会从ClientCnxn的outgoingQueue队列中提取出一个可发送的Packet对象,同时生成一个客户端请求序号XID并将其设置到Packet对象的请求头中,然后再调用Packet对象的createBB方法进行序列化,最后才发送出去。

请求发送完毕后,会立即将该Packet对象保存到pendingQueue队列中,以便等待服务端的响应返回后可以进行相应的处理。

public class ClientCnxnSocketNIO extends ClientCnxnSocket {private final Selector selector = Selector.open();protected ClientCnxn.SendThread sendThread;protected LinkedBlockingDeque<Packet> outgoingQueue;...@Overridevoid doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn) {selector.select(waitTimeOut);Set<SelectionKey> selected;synchronized (this) {selected = selector.selectedKeys();}updateNow();for (SelectionKey k : selected) {SocketChannel sc = ((SocketChannel) k.channel());if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {if (sc.finishConnect()) {updateLastSendAndHeard();updateSocketAddresses();sendThread.primeConnection();}} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {//通过doIO方法处理请求发送和响应接收doIO(pendingQueue, cnxn);}}if (sendThread.getZkState().isConnected()) {if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {enableWrite();}}selected.clear();}void doIO(List<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {SocketChannel sock = (SocketChannel) sockKey.channel();//处理响应接收if (sockKey.isReadable()) {...}//处理请求发送if (sockKey.isWritable()) {//从outgoingQueue队列中提取出一个可发送的Packet对象Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());if (p != null) {updateLastSend();if (p.bb == null) {if ((p.requestHeader != null) && (p.requestHeader.getType() != OpCode.ping) && (p.requestHeader.getType() != OpCode.auth)) {//同时生成一个客户端请求序号XID并将其设置到Packet对象的请求头中p.requestHeader.setXid(cnxn.getXid());}//进行序列化p.createBB();}//发送请求给服务端sock.write(p.bb);if (!p.bb.hasRemaining()) {sentCount.getAndIncrement();outgoingQueue.removeFirstOccurrence(p);if (p.requestHeader != null && p.requestHeader.getType() != OpCode.ping && p.requestHeader.getType() != OpCode.auth) {synchronized (pendingQueue) {pendingQueue.add(p);}}}}if (outgoingQueue.isEmpty()) {disableWrite();} else if (!initialized && p != null && !p.bb.hasRemaining()) {disableWrite();} else {enableWrite();}}}...
}

二.响应接收

客户端获取到来自服务端的响应后,其中的SendThread线程在调用ClientCnxnSocket的doTransport()方法时,便会调用ClientCnxnSocket的doIO()方法,根据不同的响应进行不同的处理。

情况一:如果检测到当前客户端尚未进行初始化,则客户端和服务端还在创建会话,那么此时就直接将收到的ByteBuffer序列化成ConnectResponse对象。

情况二:如果接收到的服务端响应是一个事件,那么此时就会将接收到的ByteBuffer序列化成WatcherEvent对象,并将WatchedEvent对象放入待处理队列waitingEvents中。

情况三:如果接收到的服务端响应是一个常规的请求响应,那么就从pendingQueue队列中取出一个Packet对象来进行处理;此时zk客户端会检验服务端响应中包含的XID值来确保请求处理的顺序性,然后再将接收到的ByteBuffer序列化成相应的Response对象。

最后,会在finishPacket()方法中处理Packet对象中关联的Watcher事件。

public class ClientCnxnSocketNIO extends ClientCnxnSocket {...void doIO(List<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {SocketChannel sock = (SocketChannel) sockKey.channel();//处理响应接收if (sockKey.isReadable()) {int rc = sock.read(incomingBuffer);...if (!incomingBuffer.hasRemaining()) {incomingBuffer.flip();if (incomingBuffer == lenBuffer) {recvCount.getAndIncrement();readLength();} else if (!initialized) {//如果检测到当前客户端的网络练车ClientCnxnSocket尚未进行初始化readConnectResult();enableRead();if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {enableWrite();}lenBuffer.clear();incomingBuffer = lenBuffer;updateLastHeard();initialized = true;} else {//处理服务端返回的响应sendThread.readResponse(incomingBuffer);lenBuffer.clear();incomingBuffer = lenBuffer;updateLastHeard();}}}//处理请求发送if (sockKey.isWritable()) {...}}...
}abstract class ClientCnxnSocket {protected ByteBuffer incomingBuffer = lenBuffer;...void readConnectResult() throws IOException {ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);//将接收到的ByteBuffer序列化成ConnectResponse对象ConnectResponse conRsp = new ConnectResponse();conRsp.deserialize(bbia, "connect");// read "is read-only" flagboolean isRO = false;isRO = bbia.readBool("readOnly");this.sessionId = conRsp.getSessionId();//通过SendThread.onConnected方法建立连接sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);}...
}public class ClientCnxn {...class SendThread extends ZooKeeperThread {private final ClientCnxnSocket clientCnxnSocket;...void readResponse(ByteBuffer incomingBuffer) throws IOException {ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);ReplyHeader replyHdr = new ReplyHeader();replyHdr.deserialize(bbia, "header");...//如果服务端返回的响应是一个事件if (replyHdr.getXid() == -1) {//将接收到的ByteBuffer序列化成WatcherEvent对象WatcherEvent event = new WatcherEvent();event.deserialize(bbia, "response");...WatchedEvent we = new WatchedEvent(event);...//将WatchedEvent对象放入待处理队列waitingEvents中eventThread.queueEvent( we );return;}...//如果服务端返回的响应是一个常规的请求响应Packet packet;synchronized (pendingQueue) {//从pendingQueue队列中取出一个Packetpacket = pendingQueue.remove();}try {if (packet.requestHeader.getXid() != replyHdr.getXid()) {packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());throw new IOException("...");}packet.replyHeader.setXid(replyHdr.getXid());packet.replyHeader.setErr(replyHdr.getErr());packet.replyHeader.setZxid(replyHdr.getZxid());if (replyHdr.getZxid() > 0) {lastZxid = replyHdr.getZxid();}//将接收到的ByteBuffer序列化成Response对象if (packet.response != null && replyHdr.getErr() == 0) {packet.response.deserialize(bbia, "response");}} finally {finishPacket(packet);}}}...protected void finishPacket(Packet p) {int err = p.replyHeader.getErr();if (p.watchRegistration != null) {p.watchRegistration.register(err);}if (p.watchDeregistration != null) {Map<EventType, Set<Watcher>> materializedWatchers = null;materializedWatchers = p.watchDeregistration.unregister(err);for (Entry<EventType, Set<Watcher>> entry : materializedWatchers.entrySet()) {Set<Watcher> watchers = entry.getValue();if (watchers.size() > 0) {queueEvent(p.watchDeregistration.getClientPath(), err, watchers, entry.getKey());p.replyHeader.setErr(Code.OK.intValue());}}}if (p.cb == null) {synchronized (p) {p.finished = true;//客户端封装好Packet发送请求时,会调用Packet对象的wait()方法进行阻塞,这里就进行了通知p.notifyAll();}} else {p.finished = true;eventThread.queuePacket(p);}}...
}

(3)SendThread

SendThread是客户端ClientCnxn内部的一个IO调度线程,SendThread的作用是用于管理客户端和服务端之间的所有网络IO操作。

在zk客户端的实际运行过程中:

一.一方面SendThread会维护客户端与服务端之间的会话生命周期

通过在一定的周期频率内向服务端发送一个PING包来实现心跳检测。同时如果客户端和服务端出现TCP连接断开,就会自动完成重连操作。

二.另一方面SendThread会管理客户端所有的请求发送和响应接收操作

将上层客户端API操作转换成相应的请求协议并发送到服务端,并且完成对同步调用的返回和异步调用的回调,同时SendThread还负责将来自服务端的事件传递给EventThread去处理。

注意:为了向服务端证明自己还存活,客户端会周期性发送Ping包给服务端。服务端收到Ping包之后,会根据当前时间重置与客户端的Session时间,更新该Session的请求延迟时间,进而保持客户端与服务端的连接状态。

public class ClientCnxn {...class SendThread extends ZooKeeperThread {private final ClientCnxnSocket clientCnxnSocket;...//处理服务端的响应void readResponse(ByteBuffer incomingBuffer) throws IOException {...eventThread.queueEvent( we );...finishPacket(packet);...}@Overridepublic void run() {...while (state.isAlive()) {...//出现TCP连接断开,就会自动完成重连操作if (!clientCnxnSocket.isConnected()) {if (rwServerAddress != null) {serverAddress = rwServerAddress;rwServerAddress = null;} else {serverAddress = hostProvider.next(1000);}startConnect(serverAddress);clientCnxnSocket.updateLastSendAndHeard();}...if (state.isConnected()) {...//发送PING包进行心跳检测sendPing();...}}...//处理请求发送和响应接收clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);}private void sendPing() {lastPingSentNs = System.nanoTime();RequestHeader h = new RequestHeader(-2, OpCode.ping);queuePacket(h, null, null, null, null, null, null, null, null);}...}...
}

(4)EventThread

EventThread是客户端ClientCnxn内部的另一个核心线程,EventThread负责触发客户端注册的Watcher监听和异步接口注册的回调。

EventThread中有一个waitingEvents队列,临时存放要被触发的Object,这些Object包括客户端注册的Watcher监听和异步接口中注册的回调。

EventThread会不断从waitingEvents队列中取出Object,然后识别出具体类型是Watcher监听还是AsyncCallback回调,最后分别调用process()方法和processResult()方法来实现事件触发和回调。

public class ClientCnxn {...class EventThread extends ZooKeeperThread {private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();EventThread() {super(makeThreadName("-EventThread"));setDaemon(true);}@Overridepublic void run() {isRunning = true;while (true) {Object event = waitingEvents.take();if (event == eventOfDeath) {wasKilled = true;} else {processEvent(event);}if (wasKilled) {synchronized (waitingEvents) {if (waitingEvents.isEmpty()) {isRunning = false;break;}}}}}public void queuePacket(Packet packet) {if (wasKilled) {synchronized (waitingEvents) {if (isRunning) waitingEvents.add(packet);else processEvent(packet);}} else {waitingEvents.add(packet);}}...}...
}

(5)总结

客户端ClientCnxn的工作原理:

当客户端向服务端发送请求操作时,首先会将请求信息封装成Packet对象并加入outgoingQueue请求队列中,之后通过SendThread网络IO调度线程将请求发送给服务端。当客户端接收到服务端响应时,通过EventThread线程来处理服务端响应及触发Watcher监听和异步回调。

7.客户端工作原理之会话创建过程

(1)初始化阶段:实例化ZooKeeper对象

(2)会话创建阶段:建立连接并发送会话创建请求

(3)响应处理阶段:接收会话创建请求的响应

(1)初始化阶段:实例化ZooKeeper对象

一.初始化ZooKeeper对象

二.设置会话默认的Watcher

三.构造服务器地址管理器StaticHostProvider

四.创建并初始化客户端的网络连接器ClientCnxn

五.初始化SendThread和EventThread

一.初始化ZooKeeper对象

通过调用ZooKeeper的构造方法来实例化一个ZooKeeper对象。在初始化过程中,会创建客户端的Watcher管理器ZKWatchManager。

二.设置会话默认的Watcher

如果在ZooKeeper的构造方法中传入了一个Watcher对象,那么客户端会将该对象作为默认的Watcher,保存在客户端的Watcher管理器ZKWatchManager中。

三.构造服务器地址管理器StaticHostProvider

在ZooKeeper构造方法中传入的服务器地址字符串,客户端会将其存放在服务器地址列表管理器StaticHostProvider中。

四.创建并初始化客户端的网络连接器ClientCnxn

创建的网络连接器ClientXnxn是用来管理客户端与服务端的网络交互。ClientCnxn中有两个核心的队列outgoingQueue和pendingQueue,分别代表客户端的请求发送队列和服务端的响应等待队列。ClientCnxn是客户端的网络连接器,ClientCnxnSocket是客户端的网络连接,ClientCnxn构造方法会传入ClientCnxnSocket。

五.初始化SendThread和EventThread

ClientCnxn的构造方法会创建两个核心线程SendThread和EventThread。SendThread用于管理客户端和服务端之间的所有网络IO,EventThread用于处理客户端的事件,比如Watcher和回调等。

初始化SendThread时,会将ClientCnxnSocket分配给SendThread作为底层网络IO处理器。初始化EventThread时,会初始化队列waitingEvents用于存放所有等待被客户端处理的事件。

public class CreateSessionDemo {private final static String CONNECTSTRING = "192.168.1.5:2181";private static CountDownLatch countDownLatch = new CountDownLatch(1);public static void main(String[] args) throws Exception {//创建zkZooKeeper zooKeeper = new ZooKeeper(CONNECTSTRING, 5000, new Watcher() {public void process(WatchedEvent watchedEvent) {//如果当前的连接状态是连接成功, 则通过计数器去控制, 否则进行阻塞, 因为连接是需要时间的//如果已经获得连接了, 那么状态会是SyncConnectedif (watchedEvent.getState() == Event.KeeperState.SyncConnected){countDownLatch.countDown();System.out.println(watchedEvent.getState());}//如果数据发生了变化if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {System.out.println("节点发生了变化, 路径: " + watchedEvent.getPath());}}});//进行阻塞countDownLatch.await();...}
}public class ZooKeeper implements AutoCloseable {protected final ClientCnxn cnxn;protected final ZKWatchManager watchManager;//ZKWatchManager实现了ClientWatchManager...//1.初始化ZooKeeper对象public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {...//创建客户端的Watcher管理器ZKWatchManagerwatchManager = defaultWatchManager();//2.设置会话默认的Watcher,保存在客户端的Watcher管理器ZKWatchManager中watchManager.defaultWatcher = watcher;ConnectStringParser connectStringParser = new ConnectStringParser(connectString);//3.构造服务器地址列表管理器StaticHostProviderhostProvider = aHostProvider;//4.创建并初始化客户端的网络连接器ClientCnxn + 5.初始化SendThread和EventThreadcnxn = createConnection(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly);//6.启动SendThread和EventThreadcnxn.start();}protected ClientCnxn createConnection(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly) throws IOException { return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this, watchManager, clientCnxnSocket, canBeReadOnly);}//从配置中获取客户端使用的网络连接配置:使用NIO还是Netty,然后通过反射进行实例化客户端Socketprivate ClientCnxnSocket getClientCnxnSocket() throws IOException {String clientCnxnSocketName = getClientConfig().getProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);if (clientCnxnSocketName == null) {clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();}Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName).getDeclaredConstructor(ZKClientConfig.class);ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());return clientCxnSocket;}static class ZKWatchManager implements ClientWatchManager {private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();protected volatile Watcher defaultWatcher;...}protected ZKWatchManager defaultWatchManager() {//创建客户端的Watcher管理器ZKWatchManagerreturn new ZKWatchManager(getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET));}...
}public class ClientCnxn {final SendThread sendThread;final EventThread eventThread;private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();private final HostProvider hostProvider;public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {...this.hostProvider = hostProvider;//5.初始化SendThread和EventThreadsendThread = new SendThread(clientCnxnSocket);eventThread = new EventThread();...}class SendThread extends ZooKeeperThread {private final ClientCnxnSocket clientCnxnSocket;...SendThread(ClientCnxnSocket clientCnxnSocket) {super(makeThreadName("-SendThread()"));//客户端刚开始创建ZooKeeper对象时,设置其会话状态为CONNECTINGstate = States.CONNECTING;this.clientCnxnSocket = clientCnxnSocket;//设置为守护线程setDaemon(true);}...}class EventThread extends ZooKeeperThread {private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();EventThread() {super(makeThreadName("-EventThread"));setDaemon(true);}}...
}

(2)会话创建阶段:建立连接并发送会话创建请求

一.启动SendThread和EventThread

二.获取一个服务端地址

三.创建TCP连接

四.构造ConnectRequest请求

五.发送ConnectRequest请求

一.启动SendThread和EventThread

即执行SendThread和EventThread的run()方法。

二.获取一个服务端地址

在开始创建TCP连接前,SendThread需要先获取一个zk服务端地址,也就是通过StaticHostProvider的next()方法获取出一个地址。

然后把该地址委托给初始化SendThread时传入的ClientCnxnSocket去创建一个TCP连接。

三.创建TCP连接

首先在SocketChannel中注册OP_CONNECT,表明发起建立TCP连接的请求。

然后执行SendThread的primeConnection()方法发起创建TCP长连接的请求。

四.构造ConnectRequest请求

SendThread的primeConnection()方法会构造出一个ConnectRequest请求,ConnectRequest请求代表着客户端向服务端发起的是一个创建会话请求。

SendThread的primeConnection()方法会将该请求包装成IO层的Packet对象,然后将该Packet对象放入outgoingQueue请求发送队列中。

五.发送ConnectRequest请求

ClientCnxnSocket会从outgoingQueue请求发送队列取出待发送的Packet,然后将其序列化成ByteBuffer后再发送给服务端。

ClientCnxnSocket是客户端的网络连接,ClientCnxn是客户端的网络连接器。

public class ZooKeeper implements AutoCloseable {protected final ClientCnxn cnxn;...//初始化ZooKeeper对象public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {...cnxn = createConnection(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly);//启动SendThread和EventThreadcnxn.start();}...
}public class ClientCnxn {//1.启动SendThread和EventThreadpublic void start() {sendThread.start();eventThread.start();}class SendThread extends ZooKeeperThread {private final ClientCnxnSocket clientCnxnSocket;...SendThread(ClientCnxnSocket clientCnxnSocket) {super(makeThreadName("-SendThread()"));//客户端刚开始创建ZooKeeper对象时,设置其会话状态为CONNECTINGstate = States.CONNECTING;this.clientCnxnSocket = clientCnxnSocket;//设置为守护线程setDaemon(true);}@Overridepublic void run() {clientCnxnSocket.introduce(this, sessionId, outgoingQueue);InetSocketAddress serverAddress = null;...while (state.isAlive()) {...//2.获取其中一个zk服务端的地址serverAddress = hostProvider.next(1000);//向zk服务端发起建立连接请求startConnect(serverAddress);...//4.构造请求 + 5.发送请求 + 处理响应clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);}...}private void startConnect(InetSocketAddress addr) throws IOException {...//3.委托给初始化SendThread时传给SendThread的clientCnxnSocket去创建TCP连接//接下来以ClientCnxnSocketNetty的connect为例clientCnxnSocket.connect(addr);}void primeConnection() throws IOException {...long sessId = (seenRwServerBefore) ? sessionId : 0;//4.构造ConnectRequest请求-会话创建请求ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);...//把会话创建请求放入请求发送队列outgoingQueueoutgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));...}...}...
}public class ClientCnxnSocketNIO extends ClientCnxnSocket {...void connect(InetSocketAddress addr) throws IOException {SocketChannel sock = createSock();//3.创建TCP长连接registerAndConnect(sock, addr);initialized = false;//Reset incomingBufferlenBuffer.clear();incomingBuffer = lenBuffer;}void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {//先在SocketChannel中注册OP_CONNECT事件,表明发起建立TCP连接的请求sockKey = sock.register(selector, SelectionKey.OP_CONNECT);boolean immediateConnect = sock.connect(addr);if (immediateConnect) {sendThread.primeConnection();}}void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn) throws IOException, InterruptedException {selector.select(waitTimeOut);Set<SelectionKey> selected;synchronized (this) {selected = selector.selectedKeys();}...for (SelectionKey k : selected) {SocketChannel sc = ((SocketChannel) k.channel());if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {//对于要发起建立TCP连接的请求,则执行sendThread.primeConnection()方法if (sc.finishConnect()) {updateLastSendAndHeard();updateSocketAddresses();//比如处理发送会话创建的请求sendThread.primeConnection();}} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {//处理建立好TCP连接后的其他读写请求doIO(pendingQueue, cnxn);}}...}void doIO(List<Packet> pendingQueue, ClientCnxn cnxn) {SocketChannel sock = (SocketChannel) sockKey.channel();...//6.接收服务端对会话创建请求的响应if (sockKey.isReadable()) {...}//5.发送会话创建请求if (sockKey.isWritable()) {//从outgoingQueue中取出会话创建请求的Packet对象Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());...//进行序列化后发送到服务端p.createBB();sock.write(p.bb);outgoingQueue.removeFirstOccurrence(p);pendingQueue.add(p);...}...}...
}

(3)响应处理阶段:接收会话创建请求的响应

一.接收服务端对会话创建请求的响应

二.处理会话创建请求的响应

三.更新ClientCnxn客户端连接器

四.生成SyncConnected-None事件

五.从ZKWatchManager查询Watcher

六.EventThread线程触发处理Watcher

一.接收服务端对会话创建请求的响应

客户端的网络连接接收到服务端响应后,会先判断自己是否已被初始化。如果尚未初始化,那么就认为该响应是会话创建请求的响应,直接通过ClientCnxnSocket的readConnectResult()方法进行处理。ClientCnxnSocket是客户端的网络连接,ClientCnxn是客户端的网络连接器。

二.处理会话创建请求的响应

ClientCnxnSocket的readConnectResult()方法会对响应进行反序列化,也就是反序列化成ConnectResponse对象,然后再从该对象中获取出会话ID。

三.更新ClientCnxn客户端连接器

服务端的响应表明连接成功,那么就需要通知SendThread线程,通过SendThread线程进一步更新ClientCnxn客户端连接器的信息,包括readTimeout、connectTimeout、会话状态、HostProvider.lastIndex。

四.生成SyncConnected-None事件

为了让上层应用感知会话已成功创建,SendThread会生成一个SyncConnected-None事件代表会话创建成功,并将该事件通过EventThread的queueEvent()方法传递给EventThread线程。

五.从ZKWatchManager查询Watcher

EventThread线程通过queueEvent方法收到事件后,会从ZKWatchManager管理器查询出对应的Watcher,然后将Watcher放到EventThread的waitingEvents队列中。

客户端的Watcher管理器是ZKWatchManager。

服务端的Watcher管理器是WatchManager。

六.EventThread线程触发处理Watcher

EventThread线程会不断从waitingEvents队列取出待处理的Watcher对象,然后调用Watcher的process()方法来触发Watcher。

public class ClientCnxnSocketNIO extends ClientCnxnSocket {...void doIO(List<Packet> pendingQueue, ClientCnxn cnxn) {SocketChannel sock = (SocketChannel) sockKey.channel();...//1.接收服务端对会话创建请求的响应if (sockKey.isReadable()) {int rc = sock.read(incomingBuffer);if (!incomingBuffer.hasRemaining()) {incomingBuffer.flip();if (incomingBuffer == lenBuffer) {recvCount.getAndIncrement();readLength();} else if (!initialized) {//判断客户端的网络连接是否已初始化//收到服务端响应时,还没有建立连接,说明这次响应是对建立TCP连接的响应//2.处理会话创建请求的响应readConnectResult();enableRead();if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {enableWrite();}lenBuffer.clear();incomingBuffer = lenBuffer;updateLastHeard();initialized = true;//设置客户端的网络连接为已初始化} else {//处理服务端的非建立连接请求的响应sendThread.readResponse(incomingBuffer);lenBuffer.clear();incomingBuffer = lenBuffer;updateLastHeard();}}}if (sockKey.isWritable()) {...}}...
}abstract class ClientCnxnSocket {...void readConnectResult() throws IOException {//对会话创建请求的响应进行反序列化ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);ConnectResponse conRsp = new ConnectResponse();conRsp.deserialize(bbia, "connect");boolean isRO = false;try {isRO = bbia.readBool("readOnly");} catch (IOException e) {LOG.warn("Connected to an old server; r-o mode will be unavailable");}this.sessionId = conRsp.getSessionId();//3.更新ClientCnxn客户端连接器:包括状态、HostProvider的lastIndex游标sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);}...
}public class ClientCnxn {...class SendThread extends ZooKeeperThread {private final ClientCnxnSocket clientCnxnSocket;...void onConnected(int _negotiatedSessionTimeout, long _sessionId, byte[] _sessionPasswd, boolean isRO) throws IOException {negotiatedSessionTimeout = _negotiatedSessionTimeout;if (negotiatedSessionTimeout <= 0) {changeZkState(States.CLOSED);eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null));eventThread.queueEventOfDeath();}readTimeout = negotiatedSessionTimeout * 2 / 3;connectTimeout = negotiatedSessionTimeout / hostProvider.size();hostProvider.onConnected();sessionId = _sessionId;sessionPasswd = _sessionPasswd;changeZkState((isRO) ? States.CONNECTEDREADONLY : States.CONNECTED);KeeperState eventState = (isRO) ? KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;//4.生成SyncConnected-None事件eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, eventState, null));}}private final ClientWatchManager watcher;class EventThread extends ZooKeeperThread {private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();public void queueEvent(WatchedEvent event) {queueEvent(event, null);}private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {if (event.getType() == EventType.None && sessionState == event.getState()) {return;}sessionState = event.getState();final Set<Watcher> watchers;if (materializedWatchers == null) {watchers = watcher.materialize(event.getState(), event.getType(), event.getPath());} else {watchers = new HashSet<Watcher>();watchers.addAll(materializedWatchers);}WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);waitingEvents.add(pair);}public void run() {isRunning = true;while (true) {Object event = waitingEvents.take();if (event == eventOfDeath) {wasKilled = true;} else {//5.EventThread触发处理WatcherprocessEvent(event);}if (wasKilled)synchronized (waitingEvents) {if (waitingEvents.isEmpty()) {isRunning = false;break;}}}}}private void processEvent(Object event) {if (event instanceof WatcherSetEventPair) {WatcherSetEventPair pair = (WatcherSetEventPair) event;for (Watcher watcher : pair.watchers) {watcher.process(pair.event);}}...}...}
}
http://www.dtcms.com/wzjs/17667.html

相关文章:

  • 监狱门户网站的建设产品运营推广方案
  • 个人备案网站可以做电商吗百度资源平台链接提交
  • 手机端网站建设教程视频太原百度推广排名优化
  • 阜阳营销型网站建设柳市网站制作
  • wordpress google提交安徽网络推广和优化
  • 蕲春做网站百度账号注册中心
  • 阿里云搭建多个网站灰色关键词代发可测试
  • 西宁网站制作费用是多少钱外贸国际网站推广
  • 外贸网站建设的败笔防疫管控优化措施
  • wordpress授权小程序高州网站seo
  • 注册域名之后怎么做网站百度一下电脑版首页
  • 虚拟主机控制面板怎么建设网站百度一下百度网页版进入
  • 胶南建网站网站如何被搜索引擎收录
  • 做网站要用什么服务器吗厦门seo怎么做
  • 展示型网站建设流程方案网络设计
  • 海派虫网站推广软件百度推广代理开户
  • 大于二高端网站建设专业的网站优化公司排名
  • 建站系统做的网站百度可以搜索到吗手机推广app
  • 如何给自己的网站做优化百度搜索量怎么查
  • 广州建设网站怎么做惠州市seo广告优化营销工具
  • 效果图在线搜索优化软件
  • 计算机毕设做网站杭州排名优化软件
  • 云南旅游攻略6天5晚多少钱哈尔滨seo服务
  • 同一个域名两个网站网络舆情管控
  • 哪个网站可以做社工试题搜索引擎seo优化平台
  • 北京的网站开发公司怎么样在百度上免费推广
  • 电脑配件电子商务网站设计方案无锡网站关键词推广
  • 买源码的网站今日军事新闻头条
  • wordpress paypal插件北京首页关键词优化
  • 网站建设douyanet爱站网seo