当前位置: 首页 > news >正文

Semaphore的详细源码剖析

Semaphore的详细源码剖析


欢迎来到我的博客:TWind的博客

我的CSDN::Thanwind-CSDN博客

我的掘金:Thanwinde 的个人主页

0.前言

SemaphoreJUC中同步器最应该是最简单的一个了,它提供了“资源”的概念,用来控制共享锁的无限共享,抑制写锁的饥饿情况

其中大量的方法在ReentrantReadWriteLock中重复,本身也采用了AQS提供的同步队列,大量减少了代码的重复度

没有StampLock一般的复杂至极的队列代码的都是好同步器

强烈建议先阅读完《ReentrantLock的详细源码剖析》,《ReentrantReadWriteLock的源码详细剖析》再行阅读!

因为这里面的代码大多数是AQS的方法的使用!


1.基础部分

​ Semaphore像ReentrantReadWriteLock一样分为了公平模式和非公平模式,其由sync的类型决定

我们先来看基础的Sycn提供了什么方法:

abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 1192457210091910933L;Sync(int permits) {setState(permits);}final int getPermits() {return getState();}final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}}final void reducePermits(int reductions) {for (;;) {int current = getState();int next = current - reductions;if (next > current) // underflowthrow new Error("Permit count underflow");if (compareAndSetState(current, next))return;}}final int drainPermits() {for (;;) {int current = getState();if (current == 0 || compareAndSetState(current, 0))return current;}}
}

其继承了AQS,意味着其中的共享锁的获取,释放,CLH队列的维护都不用操心,本文也不会涉及,想要参阅的读者可以看我的《ReentrantReadWriteLock的源码详细剖析》,这里面对共享锁的获取以及维护的源码有详细的分析

在解析方法之前,我们先来说一说资源的概念:


资源

Semaphore虽然采用了AQS的同步队列,但是其用了且仅用了共享模式!

了解过ReentrantReadWriteLock源码的人都知道,ReentrantReadWriteLock里面的读锁就是共享锁,理论上可以无限并发

Semaphore单独取出了其中的“读锁”,并进行了限制:资源

资源有点类似于停车场的车位,车进去会消耗一个车位,出来后会加回一个车位

线程也是如此,拿到锁后就少一个资源,释放锁就多一个资源,没资源就拿不到锁

这就限制了读锁的无限并发

这种设计非常适合于线程池,连接池的实现,实际上,这两者都是直接基于Semaphore,或是采用其思想建立的


我们来一一看看其方法:

构造方法

public Semaphore(int permits) {sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Sync(int permits) {setState(permits);
}
NonfairSync(int permits) {super(permits);
}
FairSync(int permits) {super(permits);
}

可以看到,在新建Semaphore对象,指定资源数,公平模式就能完成对资源,模式(默认非公平)的指定


nonfairTryAcquireShared()

非公平的获取资源

final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();	//获取资源数int remaining = available - acquires;//计算还剩的资源数if (remaining < 0 ||	//如果小于0,就不会执行CAS,返回一个负数compareAndSetState(available, remaining))	//大于0,CAS自旋修改return remaining;	//返回}
}

返回一个负数时,Semaphore约定:返回0,负数时视为失败

这样就完成了非公平的获取资源


tryReleaseShared()

protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();	//获取当前资源int next = current + releases;	//加上释放资源if (next < current) // 反而小于原来的,说明溢出了int成了负数throw new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))	//正常自旋CAS返回return true;}
}

朴实无华的释放资源,CAS自旋尝试,判断溢出抛错


reducePermits()

final void reducePermits(int reductions) {for (;;) {int current = getState();int next = current - reductions;if (next > current) // underflowthrow new Error("Permit count underflow");if (compareAndSetState(current, next))return;}
}

和上面一模一样的代码,区别是减

是用来手动控制资源的


drainPermits()

final int drainPermits() {for (;;) {int current = getState();//获取资源if (current == 0 || compareAndSetState(current, 0))//看看如果资源是0就返回,不是的话CAS自旋为0return current;}
}

这个用法是把资源手动设为0,一般是用来重置


NonfairSync

static final class NonfairSync extends Sync {private static final long serialVersionUID = -2694183684443567898L;NonfairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}
}

这是非公平模式,只修改了一个地方:把获取共享锁的方式改成了nonfairTryAcquireShared,非公平模式


FairSync

static final class FairSync extends Sync {private static final long serialVersionUID = 2014338818796000944L;FairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())	//如果队列中存在节点就返回-1,失败return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))	//和之前一样的方式,CAS自旋扣资源(前提有资源)return remaining;}}
}

这是公平模式,也只更改了获取锁的方式:tryAcquireShared

至于为什么只try一次,因为JUC中try系的代码一般不会自旋,就是自旋也是CAS自旋,都是嵌套在其他方法里面使用

tryAcquireShared就会嵌套在 acquireShared里面使用


2.源码方法解析

其实Semaphore并没有许多的方法,大部分的方法都在AQS里面,我只会对AQS其中比较特别的进行讲解,其他的都能在我之前的文章中找到


acquire()

public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}

简单的获取共享锁,跟入:

这个方法已经在AQS里面了

public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}

会抛出中断异常,说明是会响应中断的类型

这里先会tryAcquireShared快速尝试一次,再调用doAcquireSharedInterruptibly

tryAcquireShared是由你的公平还是非公平决定的,也就是通过这个决定了你会不会“插队”

跟入:

    private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);	//新建一个共享节点接在队列尾boolean failed = true;try {for (;;) {final Node p = node.predecessor();	//获取前驱节点if (p == head) {int r = tryAcquireShared(arg);	//如果前驱节点是头节点的话,就可以尝试去拿锁了if (r >= 0) {					//r>=0说明拿到了,返回值r是剩余资源数setHeadAndPropagate(node, r);	//把自己设成头节点并传播p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&	//如果没拿到就处理后事并parkparkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}

这是经典的AQS获取锁的结构,关于这种结构详细了解请去我之前的文章中了解,这里不过多赘述~~(有一说一你不看的话后面都看不懂)~~

private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}
}

这里我们能看到,是会先将这个节点设置头节点,下面会进行传播,前提是条件满足:

  • propagate > 0 ,代表还有资源
  • h == null 老头节点为空
  • h.waitStatus < 0 老头节点状态是-1或-3,应该进行尝试传播
  • (h = head) == null 新头为空
  • h.waitStatus < 0 新头状态为-1,-3,应该尝试传播

如果满足任意一个,就会看看有没有后继,后继是不是共享节点来执行doReleaseShared来处理传播与共享

[!CAUTION]

在我的《ReentrantReadWriteLock的源码详细剖析》里面有对这一系列的代码的详细解析,强烈建议先在其中学习后再继续阅读,这里的代码对于整个共享锁来说非常重要!


acquireUninterruptibly()

字面意思,它不会抛出异常,我们直接进入其AQS的方法

private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}

可以看到,它和上面代码的唯一区别就是不会抛出中断异常,只是会以一个interrupted为true来表示

但是对于共享模式的获取锁,并不会在外部检测中断并手动中断来防止吞掉中断标记,也就是说,完全忽略了中断带来的影响


tryAcquire()

public boolean tryAcquire() {return sync.nonfairTryAcquireShared(1) >= 0;
}

极其的简单,调一下非公平的尝试获取一次锁就完了


tryAcquire(long timeout, TimeUnit unit)

public boolean tryAcquire(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

也是非常的简单,跟入:

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)throws InterruptedException {if (nanosTimeout <= 0L)return false;final long deadline = System.nanoTime() + nanosTimeout;	//截止时间final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return true;}}nanosTimeout = deadline - System.nanoTime();	//超出时间了直接返回falseif (nanosTimeout <= 0L)return false;if (shouldParkAfterFailedAcquire(p, node) &&nanosTimeout > spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);if (Thread.interrupted())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}

就只是加入了一个超时时间限制而已

我之前的文章对这些都有详细的解释


release()

public void release() {sync.releaseShared(1);
}

跟入:

public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}

先直接释放(自定义的方法),再调用doReleaseShared来唤醒

private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck casesunparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}if (h == head)                   // loop if head changedbreak;}
}

[!CAUTION]

这里的代码非常重要!建议到我的《ReentrantReadWriteLock的源码详细剖析》里面详细看一下!这涉及到共享锁的传播原理


acquire(int permits)

public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits);
}

这里是尝试获取多个资源,跟入:

public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}

其中doAcquireSharedInterruptibly(arg);和上面的一模一样,区别就是arg从0变成了1而已


tryAcquire(int permits)

public boolean tryAcquire(int permits) {if (permits < 0) throw new IllegalArgumentException();return sync.nonfairTryAcquireShared(permits) >= 0;
}

和上方的tryAcquire基本一模一样,只是获取的资源不是0而是permits了,不再赘述


tryAcquire(int permits, long timeout, TimeUnit unit)

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

指定资源和超时时间的tryAcquire,本质是tryAcquire(long timeout, TimeUnit unit)把arg改成了permits而不是1,不再赘述


release(int permits)

带指定数量的释放资源,不再赘述

public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.releaseShared(permits);
}

3. 总结

有一说一,Semaphore真的是最简单的同步器,那么多方法基本没什么可以讲的

但是简单的前提是建立在你懂得AQS的原理,共享锁的传播方式的基础之上的

所以我还是建议能看到这里的读者,一定要去把AQS,共享锁的机制搞明白(欢迎看我的文章!)

JUC的锁应该已经全部分析完了,接下来应该会介绍一下线程安全类之类的东西了,,,

相关文章:

  • 组合模式深度解析:构建灵活树形结构的终极指南
  • 变更需求代价:影响分析
  • OpenCv实战笔记(2)基于opencv和qt对图像进行灰度化 → 降噪 → 边缘检测预处理及显示
  • AUTOSAR_BSW_从入门到精通学习笔记系列_EcuM
  • 仓颉编程语言:面向未来的全场景智能开发新范式
  • LeetCode 102题解 | 二叉树的层序遍历
  • BUUCTF——Fake XML cookbook
  • 13:图像处理—畸变矫正详解
  • 57认知干货:AI机器人产业
  • AIDC智算中心建设:计算力核心技术解析
  • 【深入浅出MySQL】之数据类型介绍
  • ES6入门---第三单元 模块一:类、继承
  • 分享一个Android中文汉字手写输入法并带有形近字联想功能
  • DeepSeek Copilot idea插件推荐
  • Allegro23.1新功能之如何设置高压爬电间距规则操作指导
  • Mamba+Attention+CNN 预测模型:破局长程依赖的计算机视觉新范式
  • ActiveMQ 与其他 MQ 的对比分析:Kafka/RocketMQ 的选型参考(二)
  • 【JLINK调试器】适配【大华HC32F4A0芯片】的完整解决方案
  • 数据结构--树状数组
  • opencv的contours
  • 贵州黔西市载人游船倾覆事故已致3人遇难,14人正在搜救中
  • 老人误操作免密支付买几百只鸡崽,经济日报:支付要便捷也要安全
  • 印尼巴厘岛多地停电,疑似海底电缆发生故障
  • 德雷克海峡发生7.4级地震,震源深度10千米
  • 2025年五一档电影新片票房破3亿
  • 增诉滥用职权罪,尹锡悦遭韩国检方追加起诉