Semaphore 信号量深度解析
目录
- 第一章:Semaphore 基础概念
- 第二章:Semaphore 内部原理
- 第三章:Semaphore 常用方法
- 第四章:线程同步和互斥实现
- 第五章:Semaphore 应用场景
- 第六章:AQS 条件队列使用
- 第七章:acquire 和 release 实现
- 第八章:经典场景分析
第一章:Semaphore 基础概念
1. 什么是 Semaphore?
答案:
Semaphore(信号量)是 Java 并发包中的一个同步工具,用于控制同时访问特定资源的线程数量。它维护了一组许可证(permits),线程在访问资源前必须先获取许可证,访问完成后释放许可证。
核心特性:
- 许可证管理:维护一定数量的许可证
- 访问控制:控制同时访问资源的线程数量
- 公平性支持:支持公平和非公平两种模式
- 可重入性:不支持重入,但可以获取多个许可证
基本概念:
// 创建一个有 3 个许可证的信号量
Semaphore semaphore = new Semaphore(3);// 获取许可证
semaphore.acquire();// 释放许可证
semaphore.release();
2. Semaphore 内部原理
答案:
Semaphore 基于 AQS(AbstractQueuedSynchronizer)实现,采用共享模式:
核心组件:
- AQS 同步器:管理线程的等待和唤醒
- 许可证计数:使用 AQS 的 state 字段存储许可证数量
- 等待队列:使用 AQS 的 CLH 队列管理等待线程
内部结构:
public class Semaphore {private final Sync sync;abstract static class Sync extends AbstractQueuedSynchronizer {// 许可证数量存储在 state 中Sync(int permits) {setState(permits);}// 获取许可证protected int tryAcquireShared(int acquires) {// 实现逻辑}// 释放许可证protected boolean tryReleaseShared(int releases) {// 实现逻辑}}
}
工作原理:
- 初始化:设置许可证数量到 AQS 的 state 字段
- 获取许可证:调用
tryAcquireShared()
减少 state 值 - 释放许可证:调用
tryReleaseShared()
增加 state 值 - 等待机制:当许可证不足时,线程进入等待队列
第二章:Semaphore 内部原理
3. Semaphore 常用方法有哪些?
答案:
核心方法:
-
构造函数:
Semaphore(int permits) // 创建非公平信号量 Semaphore(int permits, boolean fair) // 创建指定公平性的信号量
-
获取许可证:
void acquire() // 获取1个许可证,阻塞 void acquire(int permits) // 获取指定数量许可证,阻塞 boolean tryAcquire() // 尝试获取1个许可证,非阻塞 boolean tryAcquire(int permits) // 尝试获取指定数量许可证,非阻塞 boolean tryAcquire(long timeout, TimeUnit unit) // 超时获取许可证
-
释放许可证:
void release() // 释放1个许可证 void release(int permits) // 释放指定数量许可证
-
查询方法:
int availablePermits() // 获取当前可用许可证数量 int drainPermits() // 获取并清空所有许可证 boolean isFair() // 判断是否为公平模式 boolean hasQueuedThreads() // 判断是否有等待线程 int getQueueLength() // 获取等待队列长度
-
其他方法:
void reducePermits(int reduction) // 减少许可证数量 Collection<Thread> getQueuedThreads() // 获取等待线程集合
4. 如何实现线程同步和互斥的?
答案:
线程同步实现:
-
基于 AQS 的共享模式:
// 获取许可证 public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1); }// 释放许可证 public void release() {sync.releaseShared(1); }
-
许可证管理:
// 获取许可证时减少计数 protected int tryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 || compareAndSetState(available, remaining)) {return remaining;}} }// 释放许可证时增加计数 protected boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) {throw new Error("Maximum permit count exceeded");}if (compareAndSetState(current, next)) {return true;}} }
互斥实现:
- 当许可证数量为 1 时,Semaphore 退化为互斥锁
- 同一时间只有一个线程能获取到许可证
- 其他线程必须等待许可证被释放
同步机制:
- 使用 AQS 的 CLH 队列管理等待线程
- 支持中断和超时机制
- 支持公平和非公平两种模式
第三章:Semaphore 常用方法
5. Semaphore 适合用在什么场景?
答案:
主要应用场景:
-
资源池管理:
public class ConnectionPool {private final Semaphore semaphore;private final Queue<Connection> connections;public ConnectionPool(int maxConnections) {this.semaphore = new Semaphore(maxConnections);this.connections = new ConcurrentLinkedQueue<>();// 初始化连接池}public Connection getConnection() throws InterruptedException {semaphore.acquire(); // 获取许可证return connections.poll();}public void releaseConnection(Connection conn) {connections.offer(conn);semaphore.release(); // 释放许可证} }
-
限流控制:
public class RateLimiter {private final Semaphore semaphore;public RateLimiter(int maxRequests) {this.semaphore = new Semaphore(maxRequests);}public boolean tryAcquire() {return semaphore.tryAcquire();}public void release() {semaphore.release();} }
-
生产者消费者模式:
public class ProducerConsumer {private final Semaphore producerSemaphore;private final Semaphore consumerSemaphore;private final Queue<Integer> buffer;public ProducerConsumer(int bufferSize) {this.producerSemaphore = new Semaphore(bufferSize);this.consumerSemaphore = new Semaphore(0);this.buffer = new ConcurrentLinkedQueue<>();}public void produce(int item) throws InterruptedException {producerSemaphore.acquire(); // 等待缓冲区空间buffer.offer(item);consumerSemaphore.release(); // 通知消费者}public int consume() throws InterruptedException {consumerSemaphore.acquire(); // 等待数据int item = buffer.poll();producerSemaphore.release(); // 释放缓冲区空间return item;} }
-
数据库连接池:
public class DatabasePool {private final Semaphore semaphore;private final List<Connection> connections;public DatabasePool(int poolSize) {this.semaphore = new Semaphore(poolSize);this.connections = new ArrayList<>();// 初始化连接池}public Connection getConnection() throws InterruptedException {semaphore.acquire();synchronized (connections) {return connections.remove(0);}}public void returnConnection(Connection conn) {synchronized (connections) {connections.add(conn);}semaphore.release();} }
-
文件访问控制:
public class FileAccessController {private final Semaphore semaphore;public FileAccessController(int maxConcurrentAccess) {this.semaphore = new Semaphore(maxConcurrentAccess);}public void accessFile(String fileName) throws InterruptedException {semaphore.acquire();try {// 访问文件System.out.println("访问文件: " + fileName);} finally {semaphore.release();}} }
第四章:线程同步和互斥实现
6. 单独使用 Semaphore 是不会使用到 AQS 的条件队列?
答案:
正确,单独使用 Semaphore 不会使用到 AQS 的条件队列。
原因分析:
-
Semaphore 使用共享模式:
- Semaphore 基于 AQS 的共享模式实现
- 使用
acquireShared()
和releaseShared()
方法 - 不涉及条件队列(Condition)
-
条件队列的使用场景:
// 条件队列通常用于 ReentrantLock ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition();// 等待条件 condition.await();// 通知条件 condition.signal();
-
Semaphore 的等待机制:
// Semaphore 使用 AQS 的同步队列,不是条件队列 public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1); }
-
AQS 的两种队列:
- 同步队列:用于管理等待获取锁的线程
- 条件队列:用于管理等待特定条件的线程
总结:
- Semaphore 只使用 AQS 的同步队列
- 条件队列主要用于 ReentrantLock 的 Condition
- Semaphore 通过许可证数量控制线程访问,不需要条件队列
第五章:Semaphore 应用场景
7. Semaphore 中申请令牌(acquire)、释放令牌(release)的实现?
答案:
acquire 实现:
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits);
}
核心实现逻辑:
// AQS 中的 acquireSharedInterruptibly 方法
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}// Semaphore 中的 tryAcquireShared 实现
protected int tryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;// 如果剩余许可证为负数,或者 CAS 操作成功,则返回剩余数量if (remaining < 0 || compareAndSetState(available, remaining)) {return remaining;}}
}
release 实现:
public void release() {sync.releaseShared(1);
}public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.releaseShared(permits);
}
核心实现逻辑:
// AQS 中的 releaseShared 方法
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}// Semaphore 中的 tryReleaseShared 实现
protected boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;// 检查是否溢出if (next < current) {throw new Error("Maximum permit count exceeded");}// CAS 操作更新状态if (compareAndSetState(current, next)) {return true;}}
}
实现特点:
- 无锁设计:使用 CAS 操作避免锁竞争
- 自旋重试:CAS 失败时自旋重试
- 溢出检查:防止许可证数量溢出
- 中断支持:支持线程中断
第六章:AQS 条件队列使用
8. Semaphore 初始化有10个令牌,11个线程同时各调用1次acquire方法,会发生什么?
答案:
会发生以下情况:
-
前10个线程:
- 成功获取许可证
- 继续执行后续代码
- 许可证数量变为 0
-
第11个线程:
- 调用
acquire()
方法 - 发现许可证不足(available = 0, acquires = 1, remaining = -1)
- 进入等待状态,被加入 AQS 的等待队列
- 线程被阻塞,等待许可证被释放
- 调用
代码演示:
public class SemaphoreTest1 {public static void main(String[] args) throws InterruptedException {Semaphore semaphore = new Semaphore(10);CountDownLatch latch = new CountDownLatch(11);for (int i = 0; i < 11; i++) {final int threadId = i;new Thread(() -> {try {System.out.println("线程 " + threadId + " 尝试获取许可证");semaphore.acquire();System.out.println("线程 " + threadId + " 获取许可证成功");// 模拟工作Thread.sleep(2000);} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {semaphore.release();System.out.println("线程 " + threadId + " 释放许可证");latch.countDown();}}).start();}latch.await();}
}
执行结果:
线程 0 尝试获取许可证
线程 0 获取许可证成功
线程 1 尝试获取许可证
线程 1 获取许可证成功
...
线程 9 尝试获取许可证
线程 9 获取许可证成功
线程 10 尝试获取许可证
(线程 10 被阻塞,等待许可证)
9. Semaphore 初始化有10个令牌,一个线程重复调用11次acquire方法,会发生什么?
答案:
会发生以下情况:
-
前10次调用:
- 成功获取许可证
- 许可证数量逐渐减少:10 → 9 → 8 → … → 1 → 0
-
第11次调用:
- 调用
acquire()
方法 - 发现许可证不足(available = 0, acquires = 1, remaining = -1)
- 线程被阻塞,等待许可证被释放
- 如果其他线程不释放许可证,该线程将永远等待
- 调用
代码演示:
public class SemaphoreTest2 {public static void main(String[] args) throws InterruptedException {Semaphore semaphore = new Semaphore(10);new Thread(() -> {try {for (int i = 0; i < 11; i++) {System.out.println("尝试获取第 " + (i + 1) + " 个许可证");semaphore.acquire();System.out.println("成功获取第 " + (i + 1) + " 个许可证,剩余: " + semaphore.availablePermits());}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();Thread.sleep(1000);System.out.println("当前可用许可证: " + semaphore.availablePermits());}
}
执行结果:
尝试获取第 1 个许可证
成功获取第 1 个许可证,剩余: 9
尝试获取第 2 个许可证
成功获取第 2 个许可证,剩余: 8
...
尝试获取第 10 个许可证
成功获取第 10 个许可证,剩余: 0
尝试获取第 11 个许可证
(线程被阻塞,等待许可证)
当前可用许可证: 0
10. Semaphore 初始化有1个令牌,1个线程调用一次acquire方法,然后调用两次release方法,之后另外一个线程调用acquire(2)方法,此线程能够获取到足够的令牌并继续运行吗?
答案:
能够获取到足够的令牌并继续运行。
分析过程:
-
初始状态:许可证数量 = 1
-
第一个线程操作:
- 调用
acquire()
:许可证数量 = 1 - 1 = 0 - 调用
release()
:许可证数量 = 0 + 1 = 1 - 调用
release()
:许可证数量 = 1 + 1 = 2
- 调用
-
第二个线程操作:
- 调用
acquire(2)
:许可证数量 = 2 - 2 = 0 - 成功获取2个许可证,继续运行
- 调用
代码演示:
public class SemaphoreTest3 {public static void main(String[] args) throws InterruptedException {Semaphore semaphore = new Semaphore(1);CountDownLatch latch = new CountDownLatch(2);// 第一个线程new Thread(() -> {try {System.out.println("线程1: 获取许可证,当前可用: " + semaphore.availablePermits());semaphore.acquire();System.out.println("线程1: 获取成功,剩余: " + semaphore.availablePermits());Thread.sleep(1000);System.out.println("线程1: 释放许可证,当前可用: " + semaphore.availablePermits());semaphore.release();System.out.println("线程1: 释放后,剩余: " + semaphore.availablePermits());System.out.println("线程1: 再次释放许可证,当前可用: " + semaphore.availablePermits());semaphore.release();System.out.println("线程1: 再次释放后,剩余: " + semaphore.availablePermits());} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {latch.countDown();}}).start();Thread.sleep(500);// 第二个线程new Thread(() -> {try {System.out.println("线程2: 尝试获取2个许可证,当前可用: " + semaphore.availablePermits());semaphore.acquire(2);System.out.println("线程2: 成功获取2个许可证,剩余: " + semaphore.availablePermits());} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {latch.countDown();}}).start();latch.await();}
}
执行结果:
线程1: 获取许可证,当前可用: 1
线程1: 获取成功,剩余: 0
线程1: 释放许可证,当前可用: 0
线程1: 释放后,剩余: 1
线程1: 再次释放许可证,当前可用: 1
线程1: 再次释放后,剩余: 2
线程2: 尝试获取2个许可证,当前可用: 2
线程2: 成功获取2个许可证,剩余: 0
11. Semaphore 初始化有2个令牌,一个线程调用1次release方法,然后一次性获取3个令牌,会获取到吗?
答案:
会获取到。
分析过程:
-
初始状态:许可证数量 = 2
-
第一个线程操作:
- 调用
release()
:许可证数量 = 2 + 1 = 3
- 调用
-
第二个线程操作:
- 调用
acquire(3)
:许可证数量 = 3 - 3 = 0 - 成功获取3个许可证
- 调用
代码演示:
public class SemaphoreTest4 {public static void main(String[] args) throws InterruptedException {Semaphore semaphore = new Semaphore(2);CountDownLatch latch = new CountDownLatch(2);// 第一个线程new Thread(() -> {try {System.out.println("线程1: 释放许可证,当前可用: " + semaphore.availablePermits());semaphore.release();System.out.println("线程1: 释放后,剩余: " + semaphore.availablePermits());} catch (Exception e) {e.printStackTrace();} finally {latch.countDown();}}).start();Thread.sleep(500);// 第二个线程new Thread(() -> {try {System.out.println("线程2: 尝试获取3个许可证,当前可用: " + semaphore.availablePermits());semaphore.acquire(3);System.out.println("线程2: 成功获取3个许可证,剩余: " + semaphore.availablePermits());} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {latch.countDown();}}).start();latch.await();}
}
执行结果:
线程1: 释放许可证,当前可用: 2
线程1: 释放后,剩余: 3
线程2: 尝试获取3个许可证,当前可用: 3
线程2: 成功获取3个许可证,剩余: 0
第七章:acquire 和 release 实现
12. Semaphore 的公平模式和非公平模式有什么区别?
答案:
公平模式:
// 创建公平信号量
Semaphore fairSemaphore = new Semaphore(2, true);
非公平模式:
// 创建非公平信号量
Semaphore nonFairSemaphore = new Semaphore(2, false);
区别:
-
获取许可证的顺序:
- 公平模式:按照线程等待的先后顺序获取许可证
- 非公平模式:新来的线程可能插队获取许可证
-
性能差异:
- 公平模式:性能较低,需要维护等待顺序
- 非公平模式:性能较高,允许插队
-
实现差异:
// 公平模式 static final class FairSync extends Sync {protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 || compareAndSetState(available, remaining))return remaining;}} }// 非公平模式 static final class NonfairSync extends Sync {protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);} }
第八章:经典场景分析
13. Semaphore 的最佳实践
答案:
最佳实践:
-
正确使用 try-finally:
public void useSemaphore() {try {semaphore.acquire();// 业务逻辑} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {semaphore.release();} }
-
避免死锁:
// 避免多个 Semaphore 的嵌套使用 // 避免在持有许可证时等待其他资源
-
合理设置许可证数量:
// 根据系统资源合理设置许可证数量 Semaphore semaphore = new Semaphore(Runtime.getRuntime().availableProcessors());
-
监控和调试:
public void monitorSemaphore() {System.out.println("可用许可证: " + semaphore.availablePermits());System.out.println("等待队列长度: " + semaphore.getQueueLength());System.out.println("是否有等待线程: " + semaphore.hasQueuedThreads()); }
14. Semaphore 与其他同步工具的比较
答案:
特性 | Semaphore | CountDownLatch | CyclicBarrier | ReentrantLock |
---|---|---|---|---|
用途 | 控制并发数量 | 等待多个任务完成 | 多线程同步点 | 互斥访问 |
可重用性 | 是 | 否 | 是 | 是 |
许可证数量 | 可动态调整 | 固定 | 固定 | 1个 |
等待机制 | 等待许可证 | 等待计数归零 | 等待所有线程到达 | 等待锁释放 |
适用场景 | 资源池、限流 | 启动等待 | 分阶段处理 | 临界区保护 |
总结
Semaphore 是 Java 并发编程中的重要工具,它通过许可证机制控制同时访问特定资源的线程数量。基于 AQS 的共享模式实现,支持公平和非公平两种模式,广泛应用于资源池管理、限流控制、生产者消费者模式等场景。
关键要点:
- Semaphore 基于 AQS 的共享模式实现
- 使用许可证数量控制并发访问
- 支持公平和非公平两种模式
- 不涉及 AQS 的条件队列
- 可以动态调整许可证数量
- 适用于资源池管理和限流控制
- 需要正确使用 try-finally 确保资源释放