当前位置: 首页 > wzjs >正文

怎么用txt做网站全球搜索引擎入口

怎么用txt做网站,全球搜索引擎入口,可以建网站的路由器,陕西网站建设营销推广大纲 1.Curator的可重入锁的源码 2.Curator的非可重入锁的源码 3.Curator的可重入读写锁的源码 4.Curator的MultiLock源码 5.Curator的Semaphore源码 1.Curator的可重入锁的源码 (1)InterProcessMutex获取分布式锁 (2)InterProcessMutex的初始化 (3)InterProcessMutex.…

大纲

1.Curator的可重入锁的源码

2.Curator的非可重入锁的源码

3.Curator的可重入读写锁的源码

4.Curator的MultiLock源码

5.Curator的Semaphore源码

1.Curator的可重入锁的源码

(1)InterProcessMutex获取分布式锁

(2)InterProcessMutex的初始化

(3)InterProcessMutex.acquire()尝试获取锁

(4)LockInternals.attemptLock()尝试获取锁

(5)不同客户端线程获取锁时的互斥实现

(6)同一客户端线程可重入加锁的实现

(7)客户端线程释放锁的实现

(8)客户端线程释放锁后其他线程获取锁的实现

(9)InterProcessMutex就是一个公平锁

(1)InterProcessMutex获取分布式锁

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000, 3000, retryPolicy);client.start();System.out.println("已经启动Curator客户端");//获取分布式锁InterProcessMutex lock = new InterProcessMutex(client, "/locks/myLock");lock.acquire();Thread.sleep(1000);lock.release();}
}

(2)InterProcessMutex的初始化

设置锁的节点路径basePath + 初始化一个LockInternals对象实例。

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {private final LockInternals internals;private final String basePath;private static final String LOCK_NAME = "lock-";...public InterProcessMutex(CuratorFramework client, String path) {this(client, path, new StandardLockInternalsDriver());}public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {this(client, path, LOCK_NAME, 1, driver);}//初始化InterProcessMutexInterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {//1.设置锁的节点路径basePath = PathUtils.validatePath(path);//2.初始化一个LockInternals对象实例internals = new LockInternals(client, driver, path, lockName, maxLeases);}
}public class LockInternals {private final LockInternalsDriver driver;private final String lockName;private volatile int maxLeases;private final WatcherRemoveCuratorFramework client;private final String basePath;private final String path;...LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases) {this.driver = driver;this.lockName = lockName;this.maxLeases = maxLeases;this.client = client.newWatcherRemoveCuratorFramework();this.basePath = PathUtils.validatePath(path);this.path = ZKPaths.makePath(path, lockName);}...
}

(3)InterProcessMutex.acquire()尝试获取锁

LockData是InterProcessMutex的一个静态内部类。一个线程对应一个LockData实例对象,用来描述线程持有的锁的具体情况。多个线程对应的LockData存放在一个叫threadData的ConcurrentMap中。LockData中有一个原子变量lockCount,用于锁的重入次数计数。

在执行InterProcessMutex的acquire()方法尝试获取锁时:首先会尝试取出当前线程对应的LockData数据,判断是否存在。如果存在,则说明锁正在被当前线程重入,重入次数自增后直接返回。如果不存在,则调用LockInternals的attemptLock()方法尝试获取锁。默认情况下,attemptLock()方法传入的等待获取锁的时间time = -1。

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {private final LockInternals internals;private final String basePath;private static final String LOCK_NAME = "lock-";//一个线程对应一个LockData数据对象private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();...//初始化InterProcessMutexInterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {//设置锁的路径basePath = PathUtils.validatePath(path);//初始化LockInternalsinternals = new LockInternals(client, driver, path, lockName, maxLeases);}@Overridepublic void acquire() throws Exception {//获取分布式锁,会一直阻塞等待直到获取成功//相同的线程可以重入锁,每一次调用acquire()方法都要匹配一个release()方法的调用if (!internalLock(-1, null)) {throw new IOException("Lost connection while trying to acquire lock: " + basePath);}}private boolean internalLock(long time, TimeUnit unit) throws Exception {//获取当前线程Thread currentThread = Thread.currentThread();//获取当前线程对应的LockData数据LockData lockData = threadData.get(currentThread);if (lockData != null) {//可重入计算lockData.lockCount.incrementAndGet();return true;}//调用LockInternals.attemptLock()方法尝试获取锁,默认情况下,传入的time=-1,表示等待获取锁的时间String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());if (lockPath != null) {//获取锁成功,将当前线程 + 其创建的临时顺序节点路径,封装成一个LockData对象LockData newLockData = new LockData(currentThread, lockPath);//然后把该LockData对象存放到InterProcessMutex.threadData这个Map中threadData.put(currentThread, newLockData);return true;}return false;}//LockData是InterProcessMutex的一个静态内部类private static class LockData {final Thread owningThread;final String lockPath;final AtomicInteger lockCount = new AtomicInteger(1);//用于锁的重入次数计数private LockData(Thread owningThread, String lockPath) {this.owningThread = owningThread;this.lockPath = lockPath;}}protected byte[] getLockNodeBytes() {return null;}...
}

(4)LockInternals.attemptLock()尝试获取锁

先创建临时节点,再判断是否满足获取锁的条件。

步骤一:首先调用LockInternalsDriver的createsTheLock()方法创建一个临时顺序节点。其中creatingParentContainersIfNeeded()表示级联创建,forPath(path)表示创建的节点路径名称,withMode(CreateMode.EPHEMERAL_SEQUENTIAL)表示临时顺序节点。

步骤二:然后调用LockInternals的internalLockLoop()方法检查是否获取到了锁。在LockInternals的internalLockLoop()方法的while循环中,会先获取排好序的客户端线程尝试获取锁时创建的临时顺序节点名称列表。然后获取当前客户端线程尝试获取锁时创建的临时顺序节点的名称,再根据名称获取在节点列表中的位置 + 是否可以获取锁 + 前一个节点的路径,也就是获取一个封装好这些信息的PredicateResults对象。

具体会根据节点名称获取当前线程创建的临时顺序节点在节点列表的位置,然后会比较当前线程创建的节点的位置和maxLeases的大小。其中maxLeases代表了同时允许多少个客户端可以获取到锁,默认是1。如果当前线程创建的节点的位置小,则表示可以获取锁。如果当前线程创建的节点的位置大,则表示获取锁失败。

获取锁成功,则会中断LockInternals的internalLockLoop()方法的while循环,然后向外返回当前客户端线程创建的临时顺序节点路径。接着在InterProcessMutex的internalLock()方法中,会将当前线程 + 其创建的临时顺序节点路径,封装成一个LockData对象,然后把该LockData对象存放到InterProcessMutex.threadData这个Map中。

获取锁失败,则通过PredicateResults对象先获取前一个节点路径名称。然后通过getData()方法获取前一个节点路径在zk的信息,并添加Watcher监听。该Watcher监听主要是用来唤醒在LockInternals中被wait()阻塞的线程。添加完Watcher监听后,便会调用wait()方法将当前线程挂起。

所以前一个节点发生变化时,便会通知添加的Watcher监听。然后便会唤醒阻塞的线程,继续执行internalLockLoop()方法的while循环。while循环又会继续获取排序的节点列表 + 判断当前线程是否已获取锁。

public class LockInternals {private final LockInternalsDriver driver;LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases) {this.driver = driver;this.path = ZKPaths.makePath(path, lockName);//生成要创建的临时节点路径名称...}...String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {//获取当前时间final long startMillis = System.currentTimeMillis();//默认情况下millisToWait=nullfinal Long millisToWait = (unit != null) ? unit.toMillis(time) : null;//默认情况下localLockNodeBytes也是nullfinal byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;int retryCount = 0;String ourPath = null;boolean hasTheLock = false;//是否已经获取到锁boolean isDone = false;//是否正在获取锁while (!isDone) {isDone = true;//1.这里是关键性的加锁代码,会去级联创建一个临时顺序节点ourPath = driver.createsTheLock(client, path, localLockNodeBytes);//2.检查是否获取到了锁hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);}if (hasTheLock) {return ourPath;}return null;}private final Watcher watcher = new Watcher() {@Overridepublic void process(WatchedEvent event) {//唤醒LockInternals中被wait()阻塞的线程client.postSafeNotify(LockInternals.this);}};//检查是否获取到了锁private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {boolean haveTheLock = false;boolean doDelete = false;...while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {//3.获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表List<String> children = getSortedChildren();//4.获取当前客户端线程尝试获取分布式锁时创建的临时顺序节点的名称String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash//5.获取当前线程创建的节点在节点列表中的位置 + 是否可以获取锁 + 前一个节点的路径名称PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);if (predicateResults.getsTheLock()) {//获取锁成功//返回truehaveTheLock = true;} else {//获取锁失败//获取前一个节点路径名称String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();synchronized(this) {//use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak//通过getData()获取前一个节点路径在zk的信息,并添加watch监听client.getData().usingWatcher(watcher).forPath(previousSequencePath);//默认情况下,millisToWait = nullif (millisToWait != null) {millisToWait -= (System.currentTimeMillis() - startMillis);startMillis = System.currentTimeMillis();if (millisToWait <= 0) {doDelete = true;//timed out - delete our nodebreak;}wait(millisToWait);//阻塞} else {wait();//阻塞}}}}...return haveTheLock;}List<String> getSortedChildren() throws Exception {//获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表return getSortedChildren(client, basePath, lockName, driver);}public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception {//获取各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表List<String> children = client.getChildren().forPath(basePath);//对节点名称进行排序List<String> sortedList = Lists.newArrayList(children);Collections.sort(sortedList,new Comparator<String>() {@Overridepublic int compare(String lhs, String rhs) {return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));}});return sortedList;}...
}public class StandardLockInternalsDriver implements LockInternalsDriver {...//级联创建一个临时顺序节点@Overridepublic String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {String ourPath;//默认情况下传入的lockNodeBytes=nullif (lockNodeBytes != null) {ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);} else {//创建临时顺序节点ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);}return ourPath;}//获取当前线程创建的节点在节点列表中的位置以及是否可以获取锁@Overridepublic PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {//根据节点名称获取当前线程创建的临时顺序节点在节点列表中的位置int ourIndex = children.indexOf(sequenceNodeName);validateOurIndex(sequenceNodeName, ourIndex);//maxLeases代表的是同时允许多少个客户端可以获取到锁//getsTheLock为true表示可以获取锁,getsTheLock为false表示获取锁失败boolean getsTheLock = ourIndex < maxLeases;//获取当前节点需要watch的前一个节点路径String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);return new PredicateResults(pathToWatch, getsTheLock);}...
}

(5)不同客户端线程获取锁时的互斥实现

maxLeases代表了同时允许多少个客户端可以获取到锁,默认值是1。能否获取锁的判断就是:线程创建的节点的位置outIndex < maxLeases。当线程1创建的节点在节点列表中排第一时,满足outIndex = 0 < maxLeases = 1,可以获取锁。当线程2创建的节点再节点列表中排第二时,不满足outIndex = 1 < maxLeases = 1,所以不能获取锁。从而实现线程1和线程2获取锁时的互斥。

(6)同一客户端线程可重入加锁的实现

客户端线程重复获取锁时,会重复调用InterProcessMutex的internalLock()方法。在InterProcessMutex的internalLock()方法中:线程第一次获取锁成功会创建一个LockData对象,并存放在一个Map中。线程第二次获取锁时,便会从这个Map中取出这个LockData对象,并对LockData对象中的重入计数器lockCount进行递增,接着就返回true。以此实现可重入加锁。

(7)客户端线程释放锁的实现

客户端线程释放锁时会调用InterProcessMutex的release()方法。

首先对LockData里的重入计数器进行递减。当重入计数器大于0时,直接返回。当重入计数器为0时才执行下一步删除节点的操作。

然后删除客户端线程创建的临时顺序节点,client.delete().guaranteed().forPath(ourPath)。

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {private final LockInternals internals;private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();...@Overridepublic void release() throws Exception {//获取当前线程Thread currentThread = Thread.currentThread();//获取当前线程对应的LockData对象LockData lockData = threadData.get(currentThread);if (lockData == null) {throw new IllegalMonitorStateException("You do not own the lock: " + basePath);}//1.首先对LockData里的重入计数器lockCount进行递减int newLockCount = lockData.lockCount.decrementAndGet();if (newLockCount > 0) {//当重入计数器大于0时,直接返回return;}if (newLockCount < 0) {throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);}try {//2.当重入计数器为0时执行删除节点的操作internals.releaseLock(lockData.lockPath);} finally {threadData.remove(currentThread);}}...
}public class LockInternals {...final void releaseLock(String lockPath) throws Exception {client.removeWatchers();revocable.set(null);deleteOurPath(lockPath);}private void deleteOurPath(String ourPath) throws Exception {//删除节点client.delete().guaranteed().forPath(ourPath);}...
}

(8)客户端线程释放锁后其他线程获取锁的实现

由于在节点列表里排第二的节点对应的线程会监听排第一的节点,而当持有锁的客户端线程释放锁后,排第一的节点会被删除掉。所以在节点列表里排第二的节点对应的客户端,便会收到zk的通知。于是会回调执行该线程添加的Watcher的process()方法,也就是唤醒该线程,让其继续执行while循环获取锁。

public class LockInternals {...private final Watcher watcher = new Watcher() {@Overridepublic void process(WatchedEvent event) {//唤醒LockInternals中被wait()阻塞的线程client.postSafeNotify(LockInternals.this);}};//检查是否获取到了锁private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {boolean haveTheLock = false;boolean doDelete = false;...while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {//3.获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表List<String> children = getSortedChildren();//4.获取当前客户端线程尝试获取分布式锁时创建的临时顺序节点的名称String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash//5.获取当前线程创建的节点在节点列表中的位置+是否可以获取锁+前一个节点的路径名称PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);if (predicateResults.getsTheLock()) {//获取锁成功//返回truehaveTheLock = true;} else {//获取锁失败//获取前一个节点路径名称String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();synchronized(this) {//use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak//通过getData()获取前一个节点路径在zk的信息,并添加watch监听client.getData().usingWatcher(watcher).forPath(previousSequencePath);//默认情况下,millisToWait = nullif (millisToWait != null) {millisToWait -= (System.currentTimeMillis() - startMillis);startMillis = System.currentTimeMillis();if (millisToWait <= 0) {doDelete = true;//timed out - delete our nodebreak;}wait(millisToWait);//阻塞} else {wait();//阻塞}}}}...return haveTheLock;}...
}

(9)InterProcessMutex就是一个公平锁

因为所有客户端线程都会创建一个顺序节点,然后按申请锁的顺序进行排序。最后会依次按自己所在的排序来尝试获取锁,实现了所有客户端排队获取锁。

图片

2.Curator的非可重入锁的源码

(1)Curator的非可重入锁InterProcessSemaphoreMutex的使用

(2)Curator的非可重入锁InterProcessSemaphoreMutex的源码

(1)Curator的非可重入锁InterProcessSemaphoreMutex的使用

非可重入锁:同一个时间只能有一个客户端线程获取到锁,其他线程都要排队,而且同一个客户端线程是不可重入加锁的。

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);final CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端,完成zk的连接");//非可重入锁InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(client, "/locks");lock.acquire();Thread.sleep(3000);lock.release();}
}

(2)Curator的非可重入锁InterProcessSemaphoreMutex的源码

Curator的非可重入锁是基于Semaphore来实现的,也就是将Semaphore允许获取Lease的客户端线程数设置为1,从而实现同一时间只能有一个客户端线程获取到Lease。

public class InterProcessSemaphoreMutex implements InterProcessLock {private final InterProcessSemaphoreV2 semaphore;private final WatcherRemoveCuratorFramework watcherRemoveClient;private volatile Lease lease;public InterProcessSemaphoreMutex(CuratorFramework client, String path) {watcherRemoveClient = client.newWatcherRemoveCuratorFramework();this.semaphore = new InterProcessSemaphoreV2(watcherRemoveClient, path, 1);}@Overridepublic void acquire() throws Exception {//获取非可重入锁就是获取Semaphore的Leaselease = semaphore.acquire();}@Overridepublic boolean acquire(long time, TimeUnit unit) throws Exception {Lease acquiredLease = semaphore.acquire(time, unit);if (acquiredLease == null) {return false;}lease = acquiredLease;return true;}@Overridepublic void release() throws Exception {//释放非可重入锁就是释放Semaphore的LeaseLease lease = this.lease;Preconditions.checkState(lease != null, "Not acquired");this.lease = null;lease.close();watcherRemoveClient.removeWatchers();}
}
http://www.dtcms.com/wzjs/211751.html

相关文章:

  • 网站开发多少钱一个月windows优化大师有哪些功能
  • 做粤菜的视频网站sem和seo有什么区别
  • 什么程序做网站安全杭州网站定制
  • 网站建设的费用包括成都网站设计公司
  • app开发流程 网站开发公司推广文案
  • 工商登记代理代办山东seo百度推广
  • 企业网站模板谷歌全球营销
  • 网站开发好吗怎么弄属于自己的网站
  • 深圳手机商城网站设计宁波江北区网站推广联系方式
  • 网站建设龙岗如何制作一个网页页面
  • 唱片公司网站模板免费发广告的网站大全
  • 可以举报一个做网络网站发大财吗专门看广告的网站
  • 做淘宝代销哪个网站好互联网公司排名
  • apache发布多个网站宁波网站制作优化服务
  • 内部网站开发百度推广下载安装
  • 郑州市城乡建设厅网站网络推广是什么意思
  • 椒江建设网保障性阳光工程网站全网营销的公司
  • 连锁品牌网站建设济南seo排名搜索
  • 北京地铁优化搜索引擎优化的核心本质
  • 江苏seo推广网站建设餐饮店如何引流与推广
  • 网站运营需要 做哪些工作如何让产品吸引顾客
  • 网站建设wbs网络推广的工作内容是什么
  • 杭州高端响应式网站百度网站安全检测
  • wordpress企业网站模板破解宁波网站制作优化服务公司
  • 深圳定制网站公司海外推广专员
  • 国家社会保险公共服务平台百度seo关键词怎么做
  • 怎么搭建购物网站北京网络seo经理
  • 用js做跳转到其他网站搜多多搜索引擎入口
  • 针式个人知识库管理系统优化推广排名网站教程
  • 大庆建设局网站迁址小红书推广策略