分布式锁—7.Curator的分布式锁二
大纲
1.Curator的可重入锁的源码
2.Curator的非可重入锁的源码
3.Curator的可重入读写锁的源码
4.Curator的MultiLock源码
5.Curator的Semaphore源码
3.Curator的可重入读写锁的源码
(1)Curator的可重入读写锁InterProcessReadWriteLock的使用
(2)Curator的可重入读写锁InterProcessReadWriteLock的初始化
(3)InterProcessMutex获取锁的源码
(4)先获取读锁 + 后获取读锁的情形分析
(5)先获取读锁 + 后获取写锁的情形分析
(6)先获取写锁 + 后获取读锁的情形分析
(7)先获取写锁 + 再获取写锁的情形分析
(1)Curator的可重入读写锁InterProcessReadWriteLock的使用
public class Demo {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
final CuratorFramework client = CuratorFrameworkFactory.newClient(
"127.0.0.1:2181",//zk的地址
5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
3000,//连接zk时的超时时间
retryPolicy
);
client.start();
System.out.println("已经启动Curator客户端,完成zk的连接");
//读写锁
InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/locks");
lock.readLock().acquire();
lock.readLock().release();
lock.writeLock().acquire();
lock.writeLock().release();
}
}
(2)Curator的可重入读写锁InterProcessReadWriteLock的初始化
读锁和写锁都是基于可重入锁InterProcessMutex的子类来实现的。读锁和写锁的获取锁和释放锁逻辑,就是使用InterProcessMutex的逻辑。
public class InterProcessReadWriteLock {
private final InterProcessMutex readMutex;//读锁
private final InterProcessMutex writeMutex;//写锁
//must be the same length. LockInternals depends on it
private static final String READ_LOCK_NAME = "__READ__";
private static final String WRITE_LOCK_NAME = "__WRIT__";
...
//InterProcessReadWriteLock的初始化
public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData) {
lockData = (lockData == null) ? null : Arrays.copyOf(lockData, lockData.length);
//写锁的初始化
writeMutex = new InternalInterProcessMutex(
client,
basePath,
WRITE_LOCK_NAME,//写锁的lockName='__WRIT__'
lockData,
1,//写锁的maxLeases
new SortingLockInternalsDriver() {
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
}
}
);
//读锁的初始化
readMutex = new InternalInterProcessMutex(
client,
basePath,
READ_LOCK_NAME,//读锁的lockName='__READ__'
lockData,
Integer.MAX_VALUE,//读锁的maxLeases
new SortingLockInternalsDriver() {
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
return readLockPredicate(children, sequenceNodeName);
}
}
);
}
private static class InternalInterProcessMutex extends InterProcessMutex {
private final String lockName;
private final byte[] lockData;
InternalInterProcessMutex(CuratorFramework client, String path, String lockName, byte[] lockData, int maxLeases, LockInternalsDriver driver) {
super(client, path, lockName, maxLeases, driver);
this.lockName = lockName;
this.lockData = lockData;
}
...
}
public InterProcessMutex readLock() {
return readMutex;
}
public InterProcessMutex writeLock() {
return writeMutex;
}
...
}
(3)InterProcessMutex获取锁的源码
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
private final LockInternals internals;
private final String basePath;
private static final String LOCK_NAME = "lock-";
//一个线程对应一个LockData数据对象
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
...
//初始化InterProcessMutex
InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
//设置锁的路径
basePath = PathUtils.validatePath(path);
//初始化LockInternals
internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
@Override
public void acquire() throws Exception {
//获取分布式锁,会一直阻塞等待直到获取成功
//相同的线程可以重入锁,每一次调用acquire()方法都要匹配一个release()方法的调用
if (!internalLock(-1, null)) {
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
private boolean internalLock(long time, TimeUnit unit) throws Exception {
//获取当前线程
Thread currentThread = Thread.currentThread();
//获取当前线程对应的LockData数据
LockData lockData = threadData.get(currentThread);
if (lockData != null) {
//可重入计算
lockData.lockCount.incrementAndGet();
return true;
}
//调用LockInternals.attemptLock()方法尝试获取锁,默认情况下,传入的time=-1,表示等待获取锁的时间
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if (lockPath != null) {
//获取锁成功,将当前线程 + 其创建的临时顺序节点路径,封装成一个LockData对象
LockData newLockData = new LockData(currentThread, lockPath);
//然后把该LockData对象存放到InterProcessMutex.threadData这个Map中
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
//LockData是InterProcessMutex的一个静态内部类
private static class LockData {
final Thread owningThread;
final String lockPath;
final AtomicInteger lockCount = new AtomicInteger(1);//用于锁的重入次数计数
private LockData(Thread owningThread, String lockPath) {
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
protected byte[] getLockNodeBytes() {
return null;
}
...
}
public class LockInternals {
private final LockInternalsDriver driver;
LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases) {
this.driver = driver;
this.path = ZKPaths.makePath(path, lockName);//生成要创建的临时节点路径名称
...
}
...
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
//获取当前时间
final long startMillis = System.currentTimeMillis();
//默认情况下millisToWait=null
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
//默认情况下localLockNodeBytes也是null
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;//是否已经获取到锁
boolean isDone = false;//是否正在获取锁
while (!isDone) {
isDone = true;
//1.这里是关键性的加锁代码,会去级联创建一个临时顺序节点
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
//2.检查是否获取到了锁
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
if (hasTheLock) {
return ourPath;
}
return null;
}
private final Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
//唤醒LockInternals中被wait()阻塞的线程
client.postSafeNotify(LockInternals.this);
}
};
//检查是否获取到了锁
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
boolean haveTheLock = false;
boolean doDelete = false;
...
while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
//3.获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
List<String> children = getSortedChildren();
//4.获取当前客户端线程尝试获取分布式锁时创建的临时顺序节点的名称
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
//5.获取当前线程创建的节点在节点列表中的位置 + 是否可以获取锁 + 前一个节点的路径名称
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if (predicateResults.getsTheLock()) {//获取锁成功
//返回true
haveTheLock = true;
} else {//获取锁失败
//获取前一个节点路径名称
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this) {
//use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
//通过getData()获取前一个节点路径在zk的信息,并添加watch监听
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
//默认情况下,millisToWait = null
if (millisToWait != null) {
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if (millisToWait <= 0) {
doDelete = true;//timed out - delete our node
break;
}
wait(millisToWait);//阻塞
} else {
wait();//阻塞
}
}
}
}
...
return haveTheLock;
}
List<String> getSortedChildren() throws Exception {
//获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
return getSortedChildren(client, basePath, lockName, driver);
}
public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception {
//获取各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
List<String> children = client.getChildren().forPath(basePath);
//对节点名称进行排序
List<String> sortedList = Lists.newArrayList(children);
Collections.sort(
sortedList,
new Comparator<String>() {
@Override
public int compare(String lhs, String rhs) {
return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));
}
}
);
return sortedList;
}
...
}
public class StandardLockInternalsDriver implements LockInternalsDriver {
...
//级联创建一个临时顺序节点
@Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
String ourPath;
//默认情况下传入的lockNodeBytes=null
if (lockNodeBytes != null) {
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
} else {
//创建临时顺序节点
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
//获取当前线程创建的节点在节点列表中的位置以及是否可以获取锁
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
//根据节点名称获取当前线程创建的临时顺序节点在节点列表中的位置
int ourIndex = children.indexOf(sequenceNodeName);
validateOurIndex(sequenceNodeName, ourIndex);
//maxLeases代表的是同时允许多少个客户端可以获取到锁
//getsTheLock为true表示可以获取锁,getsTheLock为false表示获取锁失败
boolean getsTheLock = ourIndex < maxLeases;
//获取当前节点需要watch的前一个节点路径
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
}
...
}
(4)先获取读锁 + 后获取读锁的情形分析
当线程创建完临时顺序节点,并获取到排好序的节点列表children后,执行LockInternalsDriver的getsTheLock()方法获取能否成功加锁的信息时,会执行到InterProcessReadWriteLock的readLockPredicate()方法。
由于此时firstWriteIndex = Integer.MAX_VALUE,所以无论多少线程尝试获取读锁,都能满足ourIndex < firstWriteIndex,也就是getsTheLock的值会为true,即表示可以获取读锁。
所以读读不互斥。
public class InterProcessReadWriteLock {
...
//sequenceNodeName是当前线程创建的临时顺序节点的路径名称
private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception {
if (writeMutex.isOwnedByCurrentThread()) {
return new PredicateResults(null, true);
}
int index = 0;
int firstWriteIndex = Integer.MAX_VALUE;
int ourIndex = -1;
for (String node : children) {
if (node.contains(WRITE_LOCK_NAME)) {
firstWriteIndex = Math.min(index, firstWriteIndex);
} else if (node.startsWith(sequenceNodeName)) {
//找出当前线程创建的临时顺序节点在节点列表中的位置,用ourIndex表示
ourIndex = index;
break;
}
++index;
}
StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);
boolean getsTheLock = (ourIndex < firstWriteIndex);
String pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);
return new PredicateResults(pathToWatch, getsTheLock);
}
...
}
(5)先获取读锁 + 后获取写锁的情形分析
一.假设客户端线程1首先成功获取了读锁
那么在/locks目录下,此时已经有了如下这个读锁的临时顺序节点。
/locks/43f3-4c2f-ba98-07a641d351f2-__READ__0000000004
二.然后另一个客户端线程2过来尝试获取写锁
于是该线程2会也会先在/locks目录下创建出如下写锁的临时顺序节点:
/locks/9361-4fb7-8420-a8d4911d2c99-__WRIT__0000000005
接着该线程会获取/locks目录的当前子节点列表并进行排序,结果如下:
[43f3-4c2f-ba98-07a641d351f2-__READ__0000000004,
9361-4fb7-8420-a8d4911d2c99-__WRIT__0000000005]
然后会执行StandardLockInternalsDriver的getsTheLock()方法。由于初始化写锁时,设置了其maxLeases是1,而在StandardLockInternalsDriver的getsTheLock()方法中,判断线程能成功获取写锁的依据是:ourIndex < maxLeases。即如果要成功获取写锁,那么线程创建的节点在子节点列表里必须排第一。
而此时,由于之前已有线程获取过一个读锁,而后来又有其他线程往里面创建一个写锁的临时顺序节点。所以写锁的临时顺序节点在子节点列表children里排第二,ourIndex是1。所以index = 1 < maxLeases = 1,条件不成立。
因此,此时客户端线程2获取写锁失败。于是该线程便会给前一个节点添加一个监听器,并调用wait()方法把自己挂起。如果前面一个节点被删除释放了锁,那么该线程就会被唤醒,从而再次尝试判断自己创建的节点是否在当前子节点列表中排第一。如果是,那么就表示获取写锁成功。
public class StandardLockInternalsDriver implements LockInternalsDriver {
...
//获取当前线程创建的节点在节点列表中的位置以及是否可以获取锁
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
//根据节点名称获取当前线程创建的临时顺序节点在节点列表中的位置
int ourIndex = children.indexOf(sequenceNodeName);
validateOurIndex(sequenceNodeName, ourIndex);
//maxLeases代表的是同时允许多少个客户端可以获取到锁
//getsTheLock为true表示可以获取锁,getsTheLock为false表示获取锁失败
boolean getsTheLock = ourIndex < maxLeases;
//获取当前节点需要watch的前一个节点路径
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
}
...
}
(6)先获取写锁 + 后获取读锁的情形分析
一.假设客户端线程1先获取了写锁
那么在/locks目录下,此时已经有了如下这个写锁的临时顺序节点。
/locks/4383-466e-9b86-fda522ea061a-__WRIT__0000000006
二.然后另一个客户端线程2过来尝试获取读锁
于是该线程2会也会先在/locks目录下创建出如下读锁的临时顺序节点:
/locks/5ba2-488f-93a4-f85fafd5cc32-__READ__0000000007
接着该线程会获取/locks目录的当前子节点列表并进行排序,结果如下:
[4383-466e-9b86-fda522ea061a-__WRIT__0000000006,
5ba2-488f-93a4-f85fafd5cc32-__READ__0000000007]
然后会执行LockInternalsDriver的getsTheLock()方法获取能否加锁的信息,也就是会执行InterProcessReadWriteLock的readLockPredicate()方法。
public class InterProcessReadWriteLock {
...
//sequenceNodeName是当前线程创建的临时顺序节点的路径名称
private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception {
//如果是同一个客户端线程,先加写锁,再加读锁,是可以成功的,不会互斥
if (writeMutex.isOwnedByCurrentThread()) {
return new PredicateResults(null, true);
}
int index = 0;
int firstWriteIndex = Integer.MAX_VALUE;
int ourIndex = -1;
for (String node : children) {
if (node.contains(WRITE_LOCK_NAME)) {
firstWriteIndex = Math.min(index, firstWriteIndex);
} else if (node.startsWith(sequenceNodeName)) {
//找出当前线程创建的临时顺序节点在节点列表中的位置,用ourIndex表示
ourIndex = index;
break;
}
++index;
}
StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);
boolean getsTheLock = (ourIndex < firstWriteIndex);
String pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);
return new PredicateResults(pathToWatch, getsTheLock);
}
...
}
在InterProcessReadWriteLock的readLockPredicate()方法中,如果是同一个客户端线程,先获取写锁,再获取读锁,是不会互斥的。如果是不同的客户端线程,线程1先获取写锁,线程2再获取读锁,则互斥。因为线程2执行readLockPredicate()方法在遍历子节点列表(children)时,如果在子节点列表(children)中发现了一个写锁,会设置firstWriteIndex=0。而此时线程2创建的临时顺序节点的ourIndex=1,所以不满足ourIndex(1) < firstWriteIndex(0),于是线程2获取读锁失败。
总结,获取读锁时,在当前线程创建的节点前面:如果还有写锁对应的节点,那么firstWriteIndex就会被重置为具体位置。如果没有写锁对应的节点,那么firstWriteIndex就是MAX_VALUE。而只要firstWriteIndex为MAX_VALUE,那么就可以不断允许获取读锁。
(7)先获取写锁 + 再获取写锁的情形分析
如果客户端线程1先获取了写锁,然后后面客户端线程2来获取这个写锁。此时线程2会发现自己创建的节点排在节点列表中的第二,不是第一。于是获取写锁失败,进行阻塞挂起。等线程1释放了写锁后,才会唤醒线程2继续尝试获取写锁。
4.Curator的MultiLock源码
(1)Curator的MultiLock的使用
(2)Curator的MultiLock的源码
(1)Curator的MultiLock的使用
public class Demo {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
final CuratorFramework client = CuratorFrameworkFactory.newClient(
"127.0.0.1:2181",//zk的地址
5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
3000,//连接zk时的超时时间
retryPolicy
);
client.start();
System.out.println("已经启动Curator客户端,完成zk的连接");
//MultiLock
InterProcessLock lock1 = new InterProcessMutex(client, "/locks/lock_01");
InterProcessLock lock2 = new InterProcessMutex(client, "/locks/lock_02");
InterProcessLock lock3 = new InterProcessMutex(client, "/locks/lock_03");
List<InterProcessLock> locks = new ArrayList<InterProcessLock>();
locks.add(lock1);
locks.add(lock2);
locks.add(lock3);
InterProcessMultiLock multiLock = new InterProcessMultiLock(locks);
}
}
(2)Curator的MultiLock的源码
MultiLock原理:依次遍历获取每个锁,阻塞直到获取每个锁为止,然后返回true。如果过程中有报错,依次释放已经获取到的锁,然后返回false。
public class InterProcessMultiLock implements InterProcessLock {
private final List<InterProcessLock> locks;
public InterProcessMultiLock(List<InterProcessLock> locks) {
this.locks = ImmutableList.copyOf(locks);
}
//获取锁
@Override
public void acquire() throws Exception {
acquire(-1, null);
}
@Override
public boolean acquire(long time, TimeUnit unit) throws Exception {
Exception exception = null;
List<InterProcessLock> acquired = Lists.newArrayList();
boolean success = true;
//依次遍历获取每个锁,阻塞直到获取每个锁为止
for (InterProcessLock lock : locks) {
try {
if (unit == null) {
lock.acquire();
acquired.add(lock);
} else {
if (lock.acquire(time, unit)) {
acquired.add(lock);
} else {
success = false;
break;
}
}
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
success = false;
exception = e;
}
}
if (!success) {
for (InterProcessLock lock : reverse(acquired)) {
try {
lock.release();
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
// ignore
}
}
}
if (exception != null) {
throw exception;
}
return success;
}
@Override
public synchronized void release() throws Exception {
Exception baseException = null;
for (InterProcessLock lock : reverse(locks)) {
try {
lock.release();
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
if (baseException == null) {
baseException = e;
} else {
baseException = new Exception(baseException);
}
}
}
if (baseException != null) {
throw baseException;
}
}
...
}
5.Curator的Semaphore源码
(1)基于InterProcessSemaphoreV2使用Semaphore
(2)InterProcessSemaphoreV2的初始化
(3)InterProcessSemaphoreV2.acquire()方法获取Semaphore的Lease
(4)InterProcessSemaphoreV2.returnLease()方法释放Semaphore的Lease
Semaphore信号量,就是指定同时可以有多个线程获取到锁。
(1)基于InterProcessSemaphoreV2使用Semaphore
public class Demo {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
final CuratorFramework client = CuratorFrameworkFactory.newClient(
"127.0.0.1:2181",//zk的地址
5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
3000,//连接zk时的超时时间
retryPolicy
);
client.start();
System.out.println("已经启动Curator客户端,完成zk的连接");
//获取Semaphore
InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/semaphore", 3);
Lease lease = semaphore.acquire();//获取Semaphore的一个锁
Thread.sleep(3000);
semaphore.returnLease(lease);//向Semaphore返还一个锁
}
}
(2)InterProcessSemaphoreV2的初始化
public class InterProcessSemaphoreV2 {
private final WatcherRemoveCuratorFramework client;
private final InterProcessMutex lock;
private final String leasesPath;
private volatile int maxLeases;
...
//maxLeases表示该实例可以允许获取的lease数量
public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases) {
this(client, path, maxLeases, null);
}
//初始化InterProcessSemaphoreV2时,传入的参数path = "/semaphore",参数maxLeases = 3
private InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases, SharedCountReader count) {
this.client = client.newWatcherRemoveCuratorFramework();
path = PathUtils.validatePath(path);
//锁的path是ZKPaths.makePath(path, LOCK_PARENT) => '/semaphore/locks'
//初始化一个InterProcessMutex分布式锁
this.lock = new InterProcessMutex(client, ZKPaths.makePath(path, LOCK_PARENT));
this.maxLeases = (count != null) ? count.getCount() : maxLeases;
//lease的path是:'/semaphore/leases'
this.leasesPath = ZKPaths.makePath(path, LEASE_PARENT);
...
}
...
}
(3)InterProcessSemaphoreV2.acquire()方法获取Semaphore的Lease
客户端线程尝试获取Semaphore的一个Lease。
步骤一:首先会获取初始化时创建的锁InterProcessMutex
锁的路径是:/semaphore/locks。当多个客户端线程同时执行acquire()获取Lease时只会有一个线程成功,而其他线程会基于锁路径下的临时顺序节点来排队获取锁。
步骤二:获取锁成功后才会尝试获取Semaphore的Lease
Lease的路径是:/semaphore/leases。此时会先到'/semaphore/leases'目录下创建一个临时顺序节点,然后会调用InterProcessSemaphoreV2的makeLease()方法创建一个Lease。这个Lease对象就是客户端线程成功获取Semaphore的一个Lease。
创建完Lease对象后,接着会进入一个for循环,会先获取/semaphore/leases目录下的所有临时顺序节点,并添加监听。然后判断/semaphore/leases目录下节点的数量是否大于maxLeases。如果临时顺序节点的数量小于maxLeases,那么说明当前客户端线程成功获取Semaphore的Lease,于是退出循环。如果临时顺序节点的数量大于maxLeases,那么当前客户端线程就要调用wait()进行阻塞等待。
public class InterProcessSemaphoreV2 {
private final InterProcessMutex lock;
private final Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
//唤醒在InterProcessSemaphoreV2对象中执行wait()而被阻塞的线程
client.postSafeNotify(InterProcessSemaphoreV2.this);
}
};
...
public Lease acquire() throws Exception {
Collection<Lease> leases = acquire(1, 0, null);
return leases.iterator().next();
}
public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception {
long startMs = System.currentTimeMillis();
boolean hasWait = (unit != null);
long waitMs = hasWait ? TimeUnit.MILLISECONDS.convert(time, unit) : 0;
Preconditions.checkArgument(qty > 0, "qty cannot be 0");
ImmutableList.Builder<Lease> builder = ImmutableList.builder();
boolean success = false;
try {
while (qty-- > 0) {
int retryCount = 0;
long startMillis = System.currentTimeMillis();
boolean isDone = false;
while (!isDone) {
switch (internalAcquire1Lease(builder, startMs, hasWait, waitMs)) {
case CONTINUE: {
isDone = true;
break;
}
case RETURN_NULL: {
return null;
}
case RETRY_DUE_TO_MISSING_NODE: {
if (!client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
throw new KeeperException.NoNodeException("Sequential path not found - possible session loss");
}
//try again
break;
}
}
}
}
success = true;
} finally {
if (!success) {
returnAll(builder.build());
}
}
return builder.build();
}
private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder<Lease> builder, long startMs, boolean hasWait, long waitMs) throws Exception {
if (client.getState() != CuratorFrameworkState.STARTED) {
return InternalAcquireResult.RETURN_NULL;
}
if (hasWait) {
long thisWaitMs = getThisWaitMs(startMs, waitMs);
if (!lock.acquire(thisWaitMs, TimeUnit.MILLISECONDS)) {
return InternalAcquireResult.RETURN_NULL;
}
} else {
//1.首先获取一个分布式锁
lock.acquire();
}
Lease lease = null;
boolean success = false;
try {
//2.尝试获取Semaphore的Lease:创建一个临时顺序节点
PathAndBytesable<String> createBuilder = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
String nodeName = ZKPaths.getNodeFromPath(path);
lease = makeLease(path);
...
try {
synchronized(this) {
for(;;) {
List<String> children;
//3.获取./lease目录下的所有临时顺序节点,并添加watcher监听
children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
...
//4.判断临时顺序节点的数量是否大于maxLeases
//maxLeases表示最多允许多少个客户端线程获取Semaphore的Lease
if (children.size() <= maxLeases) {
//如果临时顺序节点的数量小于maxLeases
//那么说明当前客户端线程成功获取Semaphore的Lease,于是退出循环
break;
}
//如果临时顺序节点的数量大于maxLeases
//那么当前客户端线程就要调用wait()进行阻塞等待
if (hasWait) {
long thisWaitMs = getThisWaitMs(startMs, waitMs);
if (thisWaitMs <= 0) {
return InternalAcquireResult.RETURN_NULL;
}
...
wait(thisWaitMs);
} else {
...
wait();
}
}
success = true;
}
} finally {
if (!success) {
returnLease(lease);
}
client.removeWatchers();
}
} finally {
//释放掉之前获取的锁
lock.release();
}
builder.add(Preconditions.checkNotNull(lease));
return InternalAcquireResult.CONTINUE;
}
...
}
(4)InterProcessSemaphoreV2.returnLease()方法释放Semaphore的Lease
执行InterProcessSemaphoreV2的returnLease()方法时,最终会执行makeLease()生成的Lease对象的close()方法,而close()方法会删除在/semaphore/leases目录下创建的临时顺序节点。
当/semaphore/leases目录下的节点发生变化时,那些对该目录进行Watcher监听的客户端就会收到通知,于是就会执行Watcher里的process()方法,唤醒执行wait()时被阻塞的线程,从而让这些没有成功获取Semaphore的Lease的线程继续尝试获取Lease。
public class InterProcessSemaphoreV2 {
...
public void returnLease(Lease lease) {
//执行Lease的close()方法
CloseableUtils.closeQuietly(lease);
}
private Lease makeLease(final String path) {
return new Lease() {
@Override
public void close() throws IOException {
try {
client.delete().guaranteed().forPath(path);
} catch (KeeperException.NoNodeException e) {
log.warn("Lease already released", e);
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
throw new IOException(e);
}
}
@Override
public byte[] getData() throws Exception {
return client.getData().forPath(path);
}
@Override
public String getNodeName() {
return ZKPaths.getNodeFromPath(path);
}
};
}
...
}