Semaphore的详细源码剖析
Semaphore的详细源码剖析
欢迎来到我的博客:TWind的博客
我的CSDN::Thanwind-CSDN博客
我的掘金:Thanwinde 的个人主页
0.前言
SemaphoreJUC中同步器最应该是最简单的一个了,它提供了“资源”的概念,用来控制共享锁的无限共享,抑制写锁的饥饿情况
其中大量的方法在ReentrantReadWriteLock中重复,本身也采用了AQS提供的同步队列,大量减少了代码的重复度
没有StampLock一般的复杂至极的队列代码的都是好同步器
强烈建议先阅读完《ReentrantLock的详细源码剖析》,《ReentrantReadWriteLock的源码详细剖析》再行阅读!
因为这里面的代码大多数是AQS的方法的使用!
1.基础部分
Semaphore像ReentrantReadWriteLock一样分为了公平模式和非公平模式,其由sync的类型决定
我们先来看基础的Sycn提供了什么方法:
abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 1192457210091910933L;Sync(int permits) {setState(permits);}final int getPermits() {return getState();}final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}}final void reducePermits(int reductions) {for (;;) {int current = getState();int next = current - reductions;if (next > current) // underflowthrow new Error("Permit count underflow");if (compareAndSetState(current, next))return;}}final int drainPermits() {for (;;) {int current = getState();if (current == 0 || compareAndSetState(current, 0))return current;}}
}
其继承了AQS,意味着其中的共享锁的获取,释放,CLH队列的维护都不用操心,本文也不会涉及,想要参阅的读者可以看我的《ReentrantReadWriteLock的源码详细剖析》,这里面对共享锁的获取以及维护的源码有详细的分析
在解析方法之前,我们先来说一说资源的概念:
资源
Semaphore虽然采用了AQS的同步队列,但是其用了且仅用了共享模式!
了解过ReentrantReadWriteLock源码的人都知道,ReentrantReadWriteLock里面的读锁就是共享锁,理论上可以无限并发
Semaphore单独取出了其中的“读锁”,并进行了限制:资源
资源有点类似于停车场的车位,车进去会消耗一个车位,出来后会加回一个车位
线程也是如此,拿到锁后就少一个资源,释放锁就多一个资源,没资源就拿不到锁
这就限制了读锁的无限并发
这种设计非常适合于线程池,连接池的实现,实际上,这两者都是直接基于Semaphore,或是采用其思想建立的
我们来一一看看其方法:
构造方法
public Semaphore(int permits) {sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Sync(int permits) {setState(permits);
}
NonfairSync(int permits) {super(permits);
}
FairSync(int permits) {super(permits);
}
可以看到,在新建Semaphore对象,指定资源数,公平模式就能完成对资源,模式(默认非公平)的指定
nonfairTryAcquireShared()
非公平的获取资源
final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState(); //获取资源数int remaining = available - acquires;//计算还剩的资源数if (remaining < 0 || //如果小于0,就不会执行CAS,返回一个负数compareAndSetState(available, remaining)) //大于0,CAS自旋修改return remaining; //返回}
}
返回一个负数时,Semaphore约定:返回0,负数时视为失败
这样就完成了非公平的获取资源
tryReleaseShared()
protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState(); //获取当前资源int next = current + releases; //加上释放资源if (next < current) // 反而小于原来的,说明溢出了int成了负数throw new Error("Maximum permit count exceeded");if (compareAndSetState(current, next)) //正常自旋CAS返回return true;}
}
朴实无华的释放资源,CAS自旋尝试,判断溢出抛错
reducePermits()
final void reducePermits(int reductions) {for (;;) {int current = getState();int next = current - reductions;if (next > current) // underflowthrow new Error("Permit count underflow");if (compareAndSetState(current, next))return;}
}
和上面一模一样的代码,区别是减
是用来手动控制资源的
drainPermits()
final int drainPermits() {for (;;) {int current = getState();//获取资源if (current == 0 || compareAndSetState(current, 0))//看看如果资源是0就返回,不是的话CAS自旋为0return current;}
}
这个用法是把资源手动设为0,一般是用来重置
NonfairSync
static final class NonfairSync extends Sync {private static final long serialVersionUID = -2694183684443567898L;NonfairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}
}
这是非公平模式,只修改了一个地方:把获取共享锁的方式改成了nonfairTryAcquireShared,非公平模式
FairSync
static final class FairSync extends Sync {private static final long serialVersionUID = 2014338818796000944L;FairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors()) //如果队列中存在节点就返回-1,失败return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining)) //和之前一样的方式,CAS自旋扣资源(前提有资源)return remaining;}}
}
这是公平模式,也只更改了获取锁的方式:tryAcquireShared
至于为什么只try一次,因为JUC中try系的代码一般不会自旋,就是自旋也是CAS自旋,都是嵌套在其他方法里面使用
tryAcquireShared就会嵌套在 acquireShared里面使用
2.源码方法解析
其实Semaphore并没有许多的方法,大部分的方法都在AQS里面,我只会对AQS其中比较特别的进行讲解,其他的都能在我之前的文章中找到
acquire()
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}
简单的获取共享锁,跟入:
这个方法已经在AQS里面了
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}
会抛出中断异常,说明是会响应中断的类型
这里先会tryAcquireShared快速尝试一次,再调用doAcquireSharedInterruptibly
tryAcquireShared是由你的公平还是非公平决定的,也就是通过这个决定了你会不会“插队”
跟入:
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED); //新建一个共享节点接在队列尾boolean failed = true;try {for (;;) {final Node p = node.predecessor(); //获取前驱节点if (p == head) {int r = tryAcquireShared(arg); //如果前驱节点是头节点的话,就可以尝试去拿锁了if (r >= 0) { //r>=0说明拿到了,返回值r是剩余资源数setHeadAndPropagate(node, r); //把自己设成头节点并传播p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) && //如果没拿到就处理后事并parkparkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}
这是经典的AQS获取锁的结构,关于这种结构详细了解请去我之前的文章中了解,这里不过多赘述~~(有一说一你不看的话后面都看不懂)~~
private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}
}
这里我们能看到,是会先将这个节点设置头节点,下面会进行传播,前提是条件满足:
- propagate > 0 ,代表还有资源
- h == null 老头节点为空
- h.waitStatus < 0 老头节点状态是-1或-3,应该进行尝试传播
- (h = head) == null 新头为空
- h.waitStatus < 0 新头状态为-1,-3,应该尝试传播
如果满足任意一个,就会看看有没有后继,后继是不是共享节点来执行doReleaseShared来处理传播与共享
[!CAUTION]
在我的《ReentrantReadWriteLock的源码详细剖析》里面有对这一系列的代码的详细解析,强烈建议先在其中学习后再继续阅读,这里的代码对于整个共享锁来说非常重要!
acquireUninterruptibly()
字面意思,它不会抛出异常,我们直接进入其AQS的方法
private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}
可以看到,它和上面代码的唯一区别就是不会抛出中断异常,只是会以一个interrupted为true来表示
但是对于共享模式的获取锁,并不会在外部检测中断并手动中断来防止吞掉中断标记,也就是说,完全忽略了中断带来的影响
tryAcquire()
public boolean tryAcquire() {return sync.nonfairTryAcquireShared(1) >= 0;
}
极其的简单,调一下非公平的尝试获取一次锁就完了
tryAcquire(long timeout, TimeUnit unit)
public boolean tryAcquire(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
也是非常的简单,跟入:
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)throws InterruptedException {if (nanosTimeout <= 0L)return false;final long deadline = System.nanoTime() + nanosTimeout; //截止时间final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return true;}}nanosTimeout = deadline - System.nanoTime(); //超出时间了直接返回falseif (nanosTimeout <= 0L)return false;if (shouldParkAfterFailedAcquire(p, node) &&nanosTimeout > spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);if (Thread.interrupted())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}
就只是加入了一个超时时间限制而已
我之前的文章对这些都有详细的解释
release()
public void release() {sync.releaseShared(1);
}
跟入:
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}
先直接释放(自定义的方法),再调用doReleaseShared来唤醒
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}
}
[!CAUTION]
这里的代码非常重要!建议到我的《ReentrantReadWriteLock的源码详细剖析》里面详细看一下!这涉及到共享锁的传播原理
acquire(int permits)
public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits);
}
这里是尝试获取多个资源,跟入:
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}
其中doAcquireSharedInterruptibly(arg);和上面的一模一样,区别就是arg
从0变成了1而已
tryAcquire(int permits)
public boolean tryAcquire(int permits) {if (permits < 0) throw new IllegalArgumentException();return sync.nonfairTryAcquireShared(permits) >= 0;
}
和上方的tryAcquire基本一模一样,只是获取的资源不是0而是permits了,不再赘述
tryAcquire(int permits, long timeout, TimeUnit unit)
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
指定资源和超时时间的tryAcquire,本质是tryAcquire(long timeout, TimeUnit unit)把arg改成了permits而不是1,不再赘述
release(int permits)
带指定数量的释放资源,不再赘述
public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.releaseShared(permits);
}
3. 总结
有一说一,Semaphore真的是最简单的同步器,那么多方法基本没什么可以讲的
但是简单的前提是建立在你懂得AQS的原理,共享锁的传播方式的基础之上的
所以我还是建议能看到这里的读者,一定要去把AQS,共享锁的机制搞明白(欢迎看我的文章!)
JUC的锁应该已经全部分析完了,接下来应该会介绍一下线程安全类之类的东西了,,,