当前位置: 首页 > wzjs >正文

自助式网站网站建设开发上线流程

自助式网站,网站建设开发上线流程,如何在微信上做小程序开店,临沂 企业网站建设大纲 1.基于Curator进行基本的zk数据操作 2.基于Curator实现集群元数据管理 3.基于Curator实现HA主备自动切换 4.基于Curator实现Leader选举 5.基于Curator实现分布式Barrier 6.基于Curator实现分布式计数器 7.基于Curator实现zk的节点和子节点监听机制 8.基于Curator创…

大纲

1.基于Curator进行基本的zk数据操作

2.基于Curator实现集群元数据管理

3.基于Curator实现HA主备自动切换

4.基于Curator实现Leader选举

5.基于Curator实现分布式Barrier

6.基于Curator实现分布式计数器

7.基于Curator实现zk的节点和子节点监听机制

8.基于Curator创建客户端实例的源码分析

9.Curator在启动时是如何跟zk建立连接的

10.基于Curator进行增删改查节点的源码分析

11.基于Curator的节点监听回调机制的实现源码

12.基于Curator的Leader选举机制的实现源码

1.基于Curator进行基本的zk数据操作

Guava is to Java what Curator is to ZooKeeper,引入依赖如下:

<dependencies><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>2.12.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.12.0</version></dependency>
</dependencies>

Curator实现对znode进行增删改查的示例如下,其中CuratorFramework代表一个客户端实例。注意:可以通过creatingParentsIfNeeded()方法进行指定节点的级联创建。

public class CrudDemo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);client.start();//启动客户端并建立连接System.out.println("已经启动Curator客户端");client.create().creatingParentsIfNeeded()//进行级联创建.withMode(CreateMode.PERSISTENT)//指定节点类型.forPath("/my/path", "10".getBytes());//增byte[] dataBytes = client.getData().forPath("/my/path");//查System.out.println(new String(dataBytes));client.setData().forPath("/my/path", "11".getBytes());//改dataBytes = client.getData().forPath("/my/path");System.out.println(new String(dataBytes));List<String> children = client.getChildren().forPath("/my");//查System.out.println(children);client.delete().forPath("/my/path");//删Thread.sleep(Integer.MAX_VALUE);}
}

2.基于Curator实现集群元数据管理

Curator可以操作zk。比如自研了一套分布式系统类似于Kafka、Canal,想把集群运行的核心元数据都放到zk里去。此时就可以通过Curator创建一些znode,往里面写入对应的值。

写入的值推荐用json格式,比如Kafka就是往zk写json格式数据。这样,其他客户端在需要的时候,就可以从里面读取出集群元数据了。

3.基于Curator实现HA主备自动切换

HDFS、Kafka、Canal都使用了zk进行Leader选举,所以可以基于Curator实现HA主备自动切换。

HDFS的NameNode是可以部署HA架构的,有主备两台机器。如果主机器宕机了,备用的机器可以感知到并选举为Leader,这样备用的机器就可以作为新的NameNode对外提供服务。

Kafka里的Controller负责管理整个集群的协作,Kafka中任何一个Broker都可以变成Controller,类似于Leader的角色。

Canal也会部署主备两台机器,主机器挂掉了,备用机器就可以跟上去。

4.基于Curator实现Leader选举

(1)Curator实现Leader选举的第一种方式之LeaderLatch

(2)Curator实现Leader选举的第二种方式之LeaderSelector

(1)Curator实现Leader选举的第一种方式之LeaderLatch

通过Curator的LeaderLatch来实现Leader选举:

public class LeaderLatchDemo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);client.start();//"/leader/latch"这其实是一个znode顺序节点LeaderLatch leaderLatch = new LeaderLatch(client, "/leader/latch");leaderLatch.start();leaderLatch.await();//直到等待他成为Leader再往后执行//类似于HDFS里,两台机器,其中一台成为了Leader就开始工作//另外一台机器可以通过await阻塞在这里,直到Leader挂了,自己就会成为Leader继续工作Boolean hasLeaderShip = leaderLatch.hasLeadership();//判断是否成为LeaderSystem.out.println("是否成为leader:" + hasLeaderShip);Thread.sleep(Integer.MAX_VALUE);}
}

(2)Curator实现Leader选举的第二种方式之LeaderSelector

通过Curator的LeaderSelector来实现Leader选举如下:其中,LeaderSelector有两个监听器,可以关注连接状态。

public class LeaderSelectorDemo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);client.start();LeaderSelector leaderSelector = new LeaderSelector(client,"/leader/election",new LeaderSelectorListener() {public void takeLeadership(CuratorFramework curatorFramework) throws Exception {System.out.println("你已经成为了Leader......");//在这里干Leader所有的事情,此时方法不能退出Thread.sleep(Integer.MAX_VALUE);}public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {System.out.println("连接状态的变化,已经不是Leader......");if (connectionState.equals(ConnectionState.LOST)) {throw new CancelLeadershipException();}}});leaderSelector.start();//尝试和其他节点在节点"/leader/election"上进行竞争成为LeaderThread.sleep(Integer.MAX_VALUE);}
}

5.基于Curator实现的分布式Barrier

(1)分布式Barrier

(2)分布式双重Barrier

(1)分布式Barrier

很多台机器都可以创建一个Barrier,此时它们都被阻塞了。除非满足一个条件(setBarrier()或removeBarrier()),才能不再阻塞它们。

//DistributedBarrier
public class DistributedBarrierDemo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);client.start();DistributedBarrier barrier = new DistributedBarrier(client, "/barrier");barrier.waitOnBarrier();}
}

(2)分布式双重Barrier

//DistributedDoubleBarrier
public class DistributedDoubleBarrierDemo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);client.start();DistributedDoubleBarrier doubleBarrier = new DistributedDoubleBarrier(client, "/barrier/double", 10);doubleBarrier.enter();//每台机器都会阻塞在enter这里//直到10台机器都调用了enter,就会从enter这里往下执行//此时可以做一些计算任务doubleBarrier.leave();//每台机器都会阻塞在leave这里,直到10台机器都调用了leave//此时就可以继续往下执行}
}

6.基于Curator实现分布式计数器

如果真的要实现分布式计数器,最好用Redis来实现。因为Redis的并发量更高,性能更好,功能更加的强大,而且还可以使用lua脚本嵌入进去实现复杂的业务逻辑。但是Redis天生的异步同步机制,存在机器宕机导致的数据不同步风险。然而zk在ZAB协议下的数据同步机制,则不会出现宕机导致数据不同步的问题。

//SharedCount:通过一个节点的值来实现
public class SharedCounterDemo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);client.start();SharedCount sharedCount = new SharedCount(client, "/shared/count", 0);sharedCount.start();sharedCount.addListener(new SharedCountListener() {public void countHasChanged(SharedCountReader sharedCountReader, int i) throws Exception {System.out.println("分布式计数器变化了......");}public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {System.out.println("连接状态变化了.....");}});Boolean result = sharedCount.trySetCount(1);System.out.println(sharedCount.getCount());}
}

7.基于Curator实现zk的节点和子节点监听机制

(1)基于Curator实现zk的子节点监听机制

(2)基于Curator实现zk的节点数据监听机制

我们使用zk主要用于:

一.对元数据进行增删改查、监听元数据的变化

二.进行Leader选举

有三种类型的节点可以监听:

一.子节点监听PathCache

二.节点监听NodeCache

三.整个节点以下的树监听TreeCache

(1)基于Curator实现zk的子节点监听机制

下面是PathCache实现的子节点监听示例:

public class PathCacheDemo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);client.start();PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/cluster", true);//cache就是把zk里的数据缓存到客户端里来//可以针对这个缓存的数据加监听器,去观察zk里的数据的变化pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {}});pathChildrenCache.start();}
}

(2)基于Curator实现zk的节点数据监听机制

下面是NodeCache实现的节点监听示例:

public class NodeCacheDemo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);final CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);client.start();final NodeCache nodeCache = new NodeCache(client, "/cluster");nodeCache.getListenable().addListener(new NodeCacheListener() {public void nodeChanged() throws Exception {Stat stat = client.checkExists().forPath("/cluster");if (stat == null) {} else {nodeCache.getCurrentData();}}});nodeCache.start();}
}

8.基于Curator创建客户端实例的源码分析

(1)创建CuratorFramework实例使用了构造器模式

(2)创建CuratorFramework实例会初始化CuratorZooKeeperClient实例

(1)创建CuratorFramework实例使用了构造器模式

CuratorFrameworkFactory.newClient()方法使用了构造器模式。首先通过builder()方法创建出Builder实例对象,然后把参数都设置成Builder实例对象的属性,最后通过build()方法把Builder实例对象传入目标类的构造方法中。

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端");}
}public class CuratorFrameworkFactory {//创建CuratorFramework实例使用了构造器模式public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) {return builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();}...public static Builder builder() {return new Builder();}public static class Builder {...private EnsembleProvider ensembleProvider;private int sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT_MS;private int connectionTimeoutMs = DEFAULT_CONNECTION_TIMEOUT_MS;private RetryPolicy retryPolicy;...public Builder connectString(String connectString) {ensembleProvider = new FixedEnsembleProvider(connectString);return this;}public Builder sessionTimeoutMs(int sessionTimeoutMs) {this.sessionTimeoutMs = sessionTimeoutMs;return this;}public Builder connectionTimeoutMs(int connectionTimeoutMs) {this.connectionTimeoutMs = connectionTimeoutMs;return this;}public Builder retryPolicy(RetryPolicy retryPolicy) {this.retryPolicy = retryPolicy;return this;}...public CuratorFramework build() {return new CuratorFrameworkImpl(this);}}...
}public class CuratorFrameworkImpl implements CuratorFramework {...public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) {ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());this.client = new CuratorZookeeperClient(localZookeeperFactory,builder.getEnsembleProvider(),builder.getSessionTimeoutMs(),builder.getConnectionTimeoutMs(),builder.getWaitForShutdownTimeoutMs(),new Watcher() {//这里注册了一个zk的watcher@Overridepublic void process(WatchedEvent watchedEvent) {CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);processEvent(event);}},builder.getRetryPolicy(),builder.canBeReadOnly(),builder.getConnectionHandlingPolicy());...}...
}

(2)创建CuratorFramework实例会初始化CuratorZooKeeperClient实例

CuratorFramework实例代表了一个zk客户端,CuratorFramework初始化时会初始化一个CuratorZooKeeperClient实例。

CuratorZooKeeperClient是Curator封装ZooKeeper的客户端。

初始化CuratorZooKeeperClient时会传入一个Watcher监听器。

所以CuratorFrameworkFactory的newClient()方法的主要工作是:初始化CuratorFramework -> 初始化CuratorZooKeeperClient -> 初始化ZookeeperFactory + 注册一个Watcher。

客户端发起与zk的连接,以及注册Watcher监听器,则是由CuratorFramework的start()方法触发的。

9.Curator启动时是如何跟zk建立连接的

ConnectionStateManager的start()方法会启动一个线程处理eventQueue。eventQueue里存放了与zk的网络连接变化事件,eventQueue收到这种事件便会通知ConnectionStateListener。

CuratorZookeeperClient的start()方法会初始化好原生zk客户端,和zk服务器建立一个TCP长连接,而且还会注册一个ConnectionState类型的Watcher监听器,以便能收到zk服务端发送的通知事件。

public class CuratorFrameworkImpl implements CuratorFramework {private final CuratorZookeeperClient client;private final ConnectionStateManager connectionStateManager;private volatile ExecutorService executorService;...public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) {...this.client = new CuratorZookeeperClient(...);connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerDecorator());...}...@Overridepublic void start() {log.info("Starting");if (!state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED)) {throw new IllegalStateException("Cannot be started more than once");}...//1.启动一个线程监听和zk网络连接的变化事件connectionStateManager.start();//2.添加一个监听器监听和zk网络连接的变化final ConnectionStateListener listener = new ConnectionStateListener() {@Overridepublic void stateChanged(CuratorFramework client, ConnectionState newState) {if (ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState) {logAsErrorConnectionErrors.set(true);}}@Overridepublic boolean doNotDecorate() {return true;}};this.getConnectionStateListenable().addListener(listener);//3.创建原生zk客户端client.start();//4.创建一个线程池,执行后台的操作executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);executorService.submit(new Callable<Object>() {@Overridepublic Object call() throws Exception {backgroundOperationsLoop();return null;}});if (ensembleTracker != null) {ensembleTracker.start();}log.info(schemaSet.toDocumentation());}...
}public class ConnectionStateManager implements Closeable {private final ExecutorService service;private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);...public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent, ConnectionStateListenerDecorator connectionStateListenerDecorator) {...service = Executors.newSingleThreadExecutor(threadFactory);...}...public void start() {Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");//启动一个线程service.submit(new Callable<Object>() {@Overridepublic Object call() throws Exception {processEvents();return null;}});}private void processEvents() {while (state.get() == State.STARTED) {int useSessionTimeoutMs = getUseSessionTimeoutMs();long elapsedMs = startOfSuspendedEpoch == 0 ? useSessionTimeoutMs / 2 : System.currentTimeMillis() - startOfSuspendedEpoch;long pollMaxMs = useSessionTimeoutMs - elapsedMs;final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);if (newState != null) {if (listeners.size() == 0) {log.warn("There are no ConnectionStateListeners registered.");}listeners.forEach(listener -> listener.stateChanged(client, newState));} else if (sessionExpirationPercent > 0) {synchronized(this) {checkSessionExpiration();}}}}...
}public class CuratorZookeeperClient implements Closeable {private final ConnectionState state;...public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,int sessionTimeoutMs, int connectionTimeoutMs, int waitForShutdownTimeoutMs, Watcher watcher,RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy) {...state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly, connectionHandlingPolicy);...}...public void start() throws Exception {log.debug("Starting");if (!started.compareAndSet(false, true)) {throw new IllegalStateException("Already started");}state.start();}...
}class ConnectionState implements Watcher, Closeable {private final HandleHolder zooKeeper;ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy) {this.ensembleProvider = ensembleProvider;this.sessionTimeoutMs = sessionTimeoutMs;this.connectionTimeoutMs = connectionTimeoutMs;this.tracer = tracer;this.connectionHandlingPolicy = connectionHandlingPolicy;if (parentWatcher != null) {parentWatchers.offer(parentWatcher);}//把自己作为Watcher注册给HandleHolderzooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);}...void start() throws Exception {log.debug("Starting");ensembleProvider.start();reset();}synchronized void reset() throws Exception {log.debug("reset");instanceIndex.incrementAndGet();isConnected.set(false);connectionStartMs = System.currentTimeMillis();//创建客户端与zk的连接zooKeeper.closeAndReset();zooKeeper.getZooKeeper();//initiate connection}...
}class HandleHolder {private final ZookeeperFactory zookeeperFactory;private final Watcher watcher;private final EnsembleProvider ensembleProvider;private final int sessionTimeout;private final boolean canBeReadOnly;private volatile Helper helper;...HandleHolder(ZookeeperFactory zookeeperFactory, Watcher watcher, EnsembleProvider ensembleProvider, int sessionTimeout, boolean canBeReadOnly) {this.zookeeperFactory = zookeeperFactory;this.watcher = watcher;this.ensembleProvider = ensembleProvider;this.sessionTimeout = sessionTimeout;this.canBeReadOnly = canBeReadOnly;}private interface Helper {ZooKeeper getZooKeeper() throws Exception;String getConnectionString();int getNegotiatedSessionTimeoutMs();}ZooKeeper getZooKeeper() throws Exception {return (helper != null) ? helper.getZooKeeper() : null;}void closeAndReset() throws Exception {internalClose(0);helper = new Helper() {private volatile ZooKeeper zooKeeperHandle = null;private volatile String connectionString = null;@Overridepublic ZooKeeper getZooKeeper() throws Exception {synchronized(this) {if (zooKeeperHandle == null) {connectionString = ensembleProvider.getConnectionString();//创建和zk的连接,初始化变量zooKeeperHandlezooKeeperHandle = zookeeperFactory.newZooKeeper(connectionString, sessionTimeout, watcher, canBeReadOnly);}...return zooKeeperHandle;}}@Overridepublic String getConnectionString() {return connectionString;}@Overridepublic int getNegotiatedSessionTimeoutMs() {return (zooKeeperHandle != null) ? zooKeeperHandle.getSessionTimeout() : 0;}};}...
}//创建客户端与zk的连接
public class DefaultZookeeperFactory implements ZookeeperFactory {@Overridepublic ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception {return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);}
}

10.基于Curator进行增删改查节点的源码分析

(1)基于Curator创建znode节点

(2)基于Curator查询znode节点

(3)基于Curator修改znode节点

(4)基于Curator删除znode节点

Curator的CURD操作,底层都是通过调用zk原生的API来完成的。

(1)基于Curator创建znode节点

创建节点也使用了构造器模式:首先通过CuratorFramework的create()方法创建一个CreateBuilder实例,然后通过CreateBuilder的withMode()等方法设置CreateBuilder的变量,最后通过CreateBuilder的forPath()方法 + 重试调用来创建znode节点。

创建节点时会调用CuratorFramework的getZooKeeper()方法获取zk客户端实例,之后就是通过原生zk客户端的API去创建节点了。

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端");//创建节点client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/my/path", "100".getBytes());}
}public class CuratorFrameworkImpl implements CuratorFramework {...@Overridepublic CreateBuilder create() {checkState();return new CreateBuilderImpl(this);}...
}public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, BackgroundOperation<PathAndBytes>, ErrorListenerPathAndBytesable<String> { private final CuratorFrameworkImpl client;private CreateMode createMode;private Backgrounding backgrounding;private boolean createParentsIfNeeded;...CreateBuilderImpl(CuratorFrameworkImpl client) {this.client = client;createMode = CreateMode.PERSISTENT;backgrounding = new Backgrounding();acling = new ACLing(client.getAclProvider());createParentsIfNeeded = false;createParentsAsContainers = false;compress = false;setDataIfExists = false;storingStat = null;ttl = -1;}@Overridepublic String forPath(final String givenPath, byte[] data) throws Exception {if (compress) {data = client.getCompressionProvider().compress(givenPath, data);}final String adjustedPath = adjustPath(client.fixForNamespace(givenPath, createMode.isSequential()));List<ACL> aclList = acling.getAclList(adjustedPath);client.getSchemaSet().getSchema(givenPath).validateCreate(createMode, givenPath, data, aclList);String returnPath = null;if (backgrounding.inBackground()) {pathInBackground(adjustedPath, data, givenPath);} else {//创建节点String path = protectedPathInForeground(adjustedPath, data, aclList);returnPath = client.unfixForNamespace(path);}return returnPath;}private String protectedPathInForeground(String adjustedPath, byte[] data, List<ACL> aclList) throws Exception {return pathInForeground(adjustedPath, data, aclList);}private String pathInForeground(final String path, final byte[] data, final List<ACL> aclList) throws Exception {OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("CreateBuilderImpl-Foreground");final AtomicBoolean firstTime = new AtomicBoolean(true);//重试调用String returnPath = RetryLoop.callWithRetry(client.getZookeeperClient(),new Callable<String>() {@Overridepublic String call() throws Exception {boolean localFirstTime = firstTime.getAndSet(false) && !debugForceFindProtectedNode;protectedMode.checkSetSessionId(client, createMode);String createdPath = null;if (!localFirstTime && protectedMode.doProtected()) {debugForceFindProtectedNode = false;createdPath = findProtectedNodeInForeground(path);}if (createdPath == null) {//在创建znode节点的时候,首先会调用CuratorFramework.getZooKeeper()获取zk客户端实例//之后就是通过原生zk客户端的API去创建节点了try {if (client.isZk34CompatibilityMode()) {createdPath = client.getZooKeeper().create(path, data, aclList, createMode);} else {createdPath = client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl);}} catch (KeeperException.NoNodeException e) {if (createParentsIfNeeded) {//这就是级联创建节点的实现ZKPaths.mkdirs(client.getZooKeeper(), path, false, acling.getACLProviderForParents(), createParentsAsContainers);if (client.isZk34CompatibilityMode()) {createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);} else {createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode, storingStat, ttl);}} else {throw e;}} catch (KeeperException.NodeExistsException e) {if (setDataIfExists) {Stat setStat = client.getZooKeeper().setData(path, data, setDataIfExistsVersion);if (storingStat != null) {DataTree.copyStat(setStat, storingStat);}createdPath = path;} else {throw e;}}}if (failNextCreateForTesting) {failNextCreateForTesting = false;throw new KeeperException.ConnectionLossException();}return createdPath;}});trace.setRequestBytesLength(data).setPath(path).commit();return returnPath;}...
}public class CuratorFrameworkImpl implements CuratorFramework {private final CuratorZookeeperClient client;public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) {ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());this.client = new CuratorZookeeperClient(localZookeeperFactory,builder.getEnsembleProvider(),builder.getSessionTimeoutMs(),builder.getConnectionTimeoutMs(),builder.getWaitForShutdownTimeoutMs(),new Watcher() {...},builder.getRetryPolicy(),builder.canBeReadOnly(),builder.getConnectionHandlingPolicy());...}...ZooKeeper getZooKeeper() throws Exception {return client.getZooKeeper();}...
}public class CuratorZookeeperClient implements Closeable {private final ConnectionState state;...public ZooKeeper getZooKeeper() throws Exception {Preconditions.checkState(started.get(), "Client is not started");return state.getZooKeeper();}...
}class ConnectionState implements Watcher, Closeable {private final HandleHolder zooKeeper;...ZooKeeper getZooKeeper() throws Exception {if (SessionFailRetryLoop.sessionForThreadHasFailed()) {throw new SessionFailRetryLoop.SessionFailedException();}Exception exception = backgroundExceptions.poll();if (exception != null) {new EventTrace("background-exceptions", tracer.get()).commit();throw exception;}boolean localIsConnected = isConnected.get();if (!localIsConnected) {checkTimeouts();}//通过HandleHolder获取ZooKeeper实例return zooKeeper.getZooKeeper();}...
}

(2)基于Curator查询znode节点

查询节点也使用了构造器模式:首先通过CuratorFramework的getData()方法创建一个GetDataBuilder实例,然后通过GetDataBuilder的forPath()方法 + 重试调用来查询znode节点。

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端");//查询节点byte[] dataBytes = client.getData().forPath("/my/path");System.out.println(new String(dataBytes));//查询子节点List<String> children = client.getChildren().forPath("/my");System.out.println(children);}
}public class CuratorFrameworkImpl implements CuratorFramework {...@Overridepublic GetDataBuilder getData() {checkState();return new GetDataBuilderImpl(this);}@Overridepublic GetChildrenBuilder getChildren() {checkState();return new GetChildrenBuilderImpl(this);}...
}public class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>, ErrorListenerPathable<byte[]> {private final CuratorFrameworkImpl  client;...@Overridepublic byte[] forPath(String path) throws Exception {client.getSchemaSet().getSchema(path).validateWatch(path, watching.isWatched() || watching.hasWatcher());path = client.fixForNamespace(path);byte[] responseData = null;if (backgrounding.inBackground()) {client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null);} else {//查询节点responseData = pathInForeground(path);}return responseData;}private byte[] pathInForeground(final String path) throws Exception {OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("GetDataBuilderImpl-Foreground");//重试调用byte[] responseData = RetryLoop.callWithRetry(client.getZookeeperClient(),new Callable<byte[]>() {@Overridepublic byte[] call() throws Exception {byte[] responseData;//通过CuratorFramework获取原生zk客户端实例,然后调用其getData()获取节点if (watching.isWatched()) {responseData = client.getZooKeeper().getData(path, true, responseStat);} else {responseData = client.getZooKeeper().getData(path, watching.getWatcher(path), responseStat);watching.commitWatcher(KeeperException.NoNodeException.Code.OK.intValue(), false);}return responseData;}});trace.setResponseBytesLength(responseData).setPath(path).setWithWatcher(watching.hasWatcher()).setStat(responseStat).commit();return decompress ? client.getCompressionProvider().decompress(path, responseData) : responseData;}...
}

(3)基于Curator修改znode节点

修改节点也使用了构造器模式:首先通过CuratorFramework的setData()方法创建一个SetDataBuilder实例,然后通过SetDataBuilder的forPath()方法 + 重试调用来修改znode节点。

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端");//修改节点client.setData().forPath("/my/path", "110".getBytes());byte[] dataBytes = client.getData().forPath("/my/path");System.out.println(new String(dataBytes));}
}public class CuratorFrameworkImpl implements CuratorFramework {...@Overridepublic SetDataBuilder setData() {checkState();return new SetDataBuilderImpl(this);}...
}public class SetDataBuilderImpl implements SetDataBuilder, BackgroundOperation<PathAndBytes>, ErrorListenerPathAndBytesable<Stat> {private final CuratorFrameworkImpl client;...@Overridepublic Stat forPath(String path, byte[] data) throws Exception {client.getSchemaSet().getSchema(path).validateGeneral(path, data, null);if (compress) {data = client.getCompressionProvider().compress(path, data);}path = client.fixForNamespace(path);Stat resultStat = null;if (backgrounding.inBackground()) {client.processBackgroundOperation(new OperationAndData<>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null, backgrounding.getContext(), null), null);} else {//修改节点resultStat = pathInForeground(path, data);}return resultStat;}private Stat pathInForeground(final String path, final byte[] data) throws Exception {OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("SetDataBuilderImpl-Foreground");//重试调用Stat resultStat = RetryLoop.callWithRetry(client.getZookeeperClient(),new Callable<Stat>() {@Overridepublic Stat call() throws Exception {//通过CuratorFramework获取原生zk客户端实例,然后调用其setData()修改节点return client.getZooKeeper().setData(path, data, version);}});trace.setRequestBytesLength(data).setPath(path).setStat(resultStat).commit();return resultStat;}...
}

(4)基于Curator删除znode节点

删除节点也使用了构造器模式:首先通过CuratorFramework的delete()方法创建一个DeleteBuilder实例,然后通过DeleteBuilder的forPath()方法 + 重试调用来删除znode节点。

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端");//删除节点client.delete().forPath("/my/path");}
}public class CuratorFrameworkImpl implements CuratorFramework {...@Overridepublic DeleteBuilder delete() {checkState();return new DeleteBuilderImpl(this);}...
}public class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>, ErrorListenerPathable<Void> {private final CuratorFrameworkImpl client;...@Overridepublic Void forPath(String path) throws Exception {client.getSchemaSet().getSchema(path).validateDelete(path);final String unfixedPath = path;path = client.fixForNamespace(path);if (backgrounding.inBackground()) {OperationAndData.ErrorCallback<String> errorCallback = null;if (guaranteed) {errorCallback = new OperationAndData.ErrorCallback<String>() {@Overridepublic void retriesExhausted(OperationAndData<String> operationAndData) {client.getFailedDeleteManager().addFailedOperation(unfixedPath);}};}client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), errorCallback, backgrounding.getContext(), null), null);} else {//删除节点pathInForeground(path, unfixedPath);}return null;}private void pathInForeground(final String path, String unfixedPath) throws Exception {OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("DeleteBuilderImpl-Foreground");//重试调用RetryLoop.callWithRetry(client.getZookeeperClient(),new Callable<Void>() {@Overridepublic Void call() throws Exception {try {//通过CuratorFramework获取原生zk客户端实例,然后调用其delete()删除节点client.getZooKeeper().delete(path, version);} catch (KeeperException.NoNodeException e) {if (!quietly) {throw e;}} catch (KeeperException.NotEmptyException e) {if (deletingChildrenIfNeeded) {ZKPaths.deleteChildren(client.getZooKeeper(), path, true);} else {throw e;}}return null;}});trace.setPath(path).commit();}
}


文章转载自:

http://LRcD3pyr.Ljwyc.cn
http://7atgN6cM.Ljwyc.cn
http://ttZXLr7u.Ljwyc.cn
http://jn3rlvoO.Ljwyc.cn
http://KDt9qXvQ.Ljwyc.cn
http://UeIfGWW7.Ljwyc.cn
http://NL4MZBRL.Ljwyc.cn
http://TWMs7Dzy.Ljwyc.cn
http://a5NnduVT.Ljwyc.cn
http://vGmMNywK.Ljwyc.cn
http://6KLKAx8i.Ljwyc.cn
http://5CusMe0l.Ljwyc.cn
http://klh7iq18.Ljwyc.cn
http://ZjsEAnzb.Ljwyc.cn
http://rDZxzsXO.Ljwyc.cn
http://RR7pAIb7.Ljwyc.cn
http://t3mPW7Kq.Ljwyc.cn
http://QnqOopSj.Ljwyc.cn
http://UEYZ4DBT.Ljwyc.cn
http://7ysBON2E.Ljwyc.cn
http://4n38Ss3N.Ljwyc.cn
http://i7vRpiCG.Ljwyc.cn
http://pqmlT5wP.Ljwyc.cn
http://sOV8oTIW.Ljwyc.cn
http://8ZsfJtFm.Ljwyc.cn
http://c9HLwZCW.Ljwyc.cn
http://r42rwXp9.Ljwyc.cn
http://yEf45lsj.Ljwyc.cn
http://KfbOSR5E.Ljwyc.cn
http://CkrDUb6F.Ljwyc.cn
http://www.dtcms.com/wzjs/736436.html

相关文章:

  • 怎么用自己电脑做网站物流网站
  • 采集网站seo国内Wordpress博客平台
  • 泉州网站建设公司推荐网站制作引擎
  • 美乐乐网站源码网站icp备案系统下载
  • 深圳市宝安区做网站建设的企业海外网络推广收费
  • 如何建立一个网站及app网络营销总监岗位职责
  • 网站建设的地方网站开发流程指什么
  • 专业的传媒行业网站开发广告联盟怎么接单
  • wordpress 网站标题图设计师的网站有哪些
  • 兰州建设工程信息网站山西省网站建设哪里好
  • 网站域名用公司注册信息查询做非遗网站的原因
  • 宁波住房和城乡建设网站建设工程立项在哪个网站查询
  • 如何建设国际网站网站的技术支持
  • 网站策划书如何做网站建设那个最好
  • wordpress设置网站主题网站建设互联
  • 深圳网站建设中为wordpress 登录页面变了
  • 如何做网站的的关键词湘潭网站建设公司
  • 联通公司网站谁做的网站维护中要多久才能重新进入
  • 网站推广营销服务wordpress 群发邮件
  • wap网站模板下载域名已更改请拿笔记住
  • 韩国优秀电商网站wordpress new
  • 关于建设网站业务系统的请示做网站切图软件
  • 北京做网站的好公司导购网站怎么做的
  • 找别人做的淘客网站 会不会有问题手机编程软件python
  • 吉林省 网站建设wordpress加载特效插件
  • 深圳龙华网站建设公司龙华营销型网站制作哪家好
  • 做外贸做网站做网站图片为什么不清晰
  • 成都百度网站设计公司服务器和域名有免费申请
  • 园林公司做网站的好处邢台市地图全图高清版
  • 网站域名实名认证官网房在线房产中介管理系统