Java 并发同步工具类详解
1. 并发编程核心概念
并发编程主要解决两个核心问题:
- 性能:通过多线程并行执行提高程序效率
- 线程安全(正确性):确保多线程环境下程序的正确执行
并发编程的三个关键方面:
- 分工:如何高效地拆解任务并分配给线程
- 同步:线程之间如何协作
- 互斥:保证同一时间只有一个线程访问共享资源
2. ReentrantLock(可重入锁)
2.1 概述
ReentrantLock 是一种可重入的独占锁,允许同一线程多次获取同一个锁而不会被阻塞。相比 synchronized,它提供了更多高级功能。
2.2 特点
- 可中断
- 可设置超时时间
- 可设置为公平锁
- 支持多个条件变量
- 与 synchronized 一样支持可重入
2.3 常用 API
void lock() // 获取锁
void lockInterruptibly() throws InterruptedException // 可中断获取锁
boolean tryLock() // 尝试非阻塞获取锁
boolean tryLock(long time, TimeUnit unit) throws InterruptedException // 超时获取锁
void unlock() // 释放锁
Condition newCondition() // 获取等待通知组件
2.4 基本使用
// 加锁(阻塞方式)
lock.lock();
try {// 临界区代码
} finally {lock.unlock(); // 解锁
}// 尝试加锁(非阻塞方式)
if (lock.tryLock(1, TimeUnit.SECONDS)) {try {// 临界区代码} finally {lock.unlock();}
}
2.5 使用注意事项
- 默认情况下 ReentrantLock 为非公平锁
- 加锁次数和释放锁次数必须保持一致
- 加锁操作放在 try 代码块之前
- 释放锁一定要放在 finally 中
2.6 应用场景
2.6.1 独占锁:模拟抢票场景
public class ReentrantLockDemo {private final ReentrantLock lock = new ReentrantLock();private static int tickets = 8;public void buyTicket() {lock.lock();try {if (tickets > 0) {Thread.sleep(10);System.out.println(Thread.currentThread().getName() + "购买了第" + tickets-- + "张票");} else {System.out.println("票已售完," + Thread.currentThread().getName() + "抢票失败");}} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}public static void main(String[] args) {ReentrantLockDemo ticketSystem = new ReentrantLockDemo();for (int i = 1; i <= 10; i++) {new Thread(() -> ticketSystem.buyTicket(), "线程" + i).start();}}
}
2.6.2 公平锁与非公平锁
ReentrantLock lock = new ReentrantLock(); // 非公平锁(默认)
ReentrantLock fairLock = new ReentrantLock(true); // 公平锁
2.6.3 可重入锁
不断递归,不断获取锁(同一把锁)
class Counter {private final ReentrantLock lock = new ReentrantLock();public void recursiveCall(int num) {lock.lock();try {if (num == 0) return;System.out.println("执行递归,num = " + num);recursiveCall(num - 1);} finally {lock.unlock();}}
}
2.6.4 生产者消费者模式(没用过,示例代码)
class Queue {private Object[] items;private int size = 0, takeIndex, putIndex;private ReentrantLock lock;public Condition notEmpty, notFull;public Queue(int capacity) {items = new Object[capacity];lock = new ReentrantLock();notEmpty = lock.newCondition();notFull = lock.newCondition();}public void put(Object value) throws Exception {lock.lock();try {while (size == items.length) notFull.await();items[putIndex] = value;if (++putIndex == items.length) putIndex = 0;size++;notEmpty.signal();} finally {lock.unlock();}}public Object take() throws Exception {lock.lock();try {while (size == 0) notEmpty.await();Object value = items[takeIndex];if (++takeIndex == items.length) takeIndex = 0;size--;notFull.signal();return value;} finally {lock.unlock();}}
}
2.7 应用场景总结
- 解决多线程竞争资源问题
- 实现多线程任务顺序执行
- 实现多线程等待/通知机制
- 需要更细粒度控制的同步场景
3. Semaphore(信号量)
3.1 概述
Semaphore 用于控制同时访问某个资源的线程数量,维护了一个许可证计数器。
3.2 常用 API
// 构造器
Semaphore(int permits) // 指定许可证数量
Semaphore(int permits, boolean fair) // 指定许可证数量和公平性void acquire() // 获取许可(阻塞)
boolean tryAcquire() // 尝试获取许可(非阻塞)
boolean tryAcquire(long timeout, TimeUnit unit) // 超时获取许可
void release() // 释放许可
3.3 应用场景
3.3.1 接口限流
public class SemaphoreDemo {private static Semaphore semaphore = new Semaphore(2); // 同一时刻最多2个并发private static Executor executor = Executors.newFixedThreadPool(10);public static String getProductInfo() {if (!semaphore.tryAcquire()) {return "请求被流控了";}try {Thread.sleep(2000);return "返回商品详情信息";} catch (InterruptedException e) {return "处理异常";} finally {semaphore.release();}}
}
3.3.2 数据库连接池
class ConnectPool {private Connect[] connects;private boolean[] connectFlag;private Semaphore semaphore;public ConnectPool(int size) {semaphore = new Semaphore(size, true);connects = new Connect[size];connectFlag = new boolean[size];initConnects();}public Connect openConnect() throws InterruptedException {semaphore.acquire();return getConnect();}private synchronized Connect getConnect() {for (int i = 0; i < connectFlag.length; i++) {if (!connectFlag[i]) {connectFlag[i] = true;return connects[i];}}return null;}public synchronized void releaseConnect(Connect connect) {for (int i = 0; i < connects.length; i++) {if (connect == connects[i]) {connectFlag[i] = false;semaphore.release();return;}}}
}
3.4 应用场景总结
- 限流:限制对共享资源的并发访问数量
- 资源池:维护一组有限的共享资源
4. CountDownLatch(闭锁)
4.1 概述
CountDownLatch 允许一个或多个线程等待,直到其他线程完成操作集。
4.2 常用 API
// 构造器
CountDownLatch(int count) // 指定计数初始值void await() // 等待直到计数为零
boolean await(long timeout, TimeUnit unit) // 超时等待
void countDown() // 计数减一
4.3 应用场景
4.3.1 模拟百米赛跑
public class CountDownLatchDemo {private static CountDownLatch begin = new CountDownLatch(1); // 裁判private static CountDownLatch end = new CountDownLatch(8); // 选手public static void main(String[] args) throws InterruptedException {for (int i = 1; i <= 8; i++) {new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + "已准备");begin.await();System.out.println(Thread.currentThread().getName() + "开始跑步");Thread.sleep(1000);System.out.println(Thread.currentThread().getName() + "到达终点");end.countDown();} catch (InterruptedException e) {e.printStackTrace();}}, "参赛者" + i).start();}Thread.sleep(5000);System.out.println("开始比赛");begin.countDown();end.await();System.out.println("比赛结束");}
}
4.3.2 多任务完成后合并汇总
public class CountDownLatchDemo2 {public static void main(String[] args) throws Exception {CountDownLatch countDownLatch = new CountDownLatch(5);for (int i = 0; i < 5; i++) {new Thread(() -> {try {Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(2000));System.out.println("任务完成");countDownLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();}}).start();}countDownLatch.await();System.out.println("所有任务完成,进行结果汇总");}
}
4.4 应用场景总结
- 并行任务同步:协调多个并行任务的完成情况
- 多任务汇总:统计多个线程的完成情况
- 资源初始化:等待资源初始化完成后开始使用
5. CyclicBarrier(循环屏障)
5.1 概述
CyclicBarrier 让一组线程等待至某个状态(屏障点)之后再全部同时执行,可以重复使用。
5.2 常用 API
// 构造器
CyclicBarrier(int parties) // 指定线程数量
CyclicBarrier(int parties, Runnable barrierAction) // 指定线程数量和屏障点操作int await() // 等待直到所有线程到达屏障点
int await(long timeout, TimeUnit unit) // 超时等待
void reset() // 重置屏障
5.3 应用场景
5.3.1 模拟人满发车
public class CyclicBarrierDemo {public static void main(String[] args) {ExecutorService executor = Executors.newFixedThreadPool(5);CyclicBarrier barrier = new CyclicBarrier(5, () -> System.out.println("人齐了,准备发车"));for (int i = 0; i < 10; i++) {executor.submit(() -> {try {System.out.println(Thread.currentThread().getName() + "马上就到");Thread.sleep(ThreadLocalRandom.current().nextInt(2000));System.out.println(Thread.currentThread().getName() + "到了,上车");barrier.await();} catch (Exception e) {e.printStackTrace();}});}executor.shutdown();}
}
5.3.2 多线程批量处理数据
class CyclicBarrierBatchProcessor {private List<Integer> data;private int batchSize;private CyclicBarrier barrier;private List<Thread> threads;public CyclicBarrierBatchProcessor(List<Integer> data, int batchSize) {this.data = data;this.batchSize = batchSize;this.threads = new ArrayList<>();}public void process(BatchTask task) {int threadCount = (data.size() + batchSize - 1) / batchSize;barrier = new CyclicBarrier(threadCount);for (int i = 0; i < threadCount; i++) {int start = i * batchSize;int end = Math.min(start + batchSize, data.size());List<Integer> batchData = data.subList(start, end);Thread thread = new Thread(() -> {task.process(batchData);try {barrier.await();} catch (Exception e) {e.printStackTrace();}});threads.add(thread);thread.start();}}public interface BatchTask {void process(List<Integer> batchData);}
}
5.4 CyclicBarrier vs CountDownLatch
- CountDownLatch 是一次性的,CyclicBarrier 可循环使用
- CountDownLatch 中线程职责不同,CyclicBarrier 中线程职责相同
5.5 应用场景总结
- 多线程任务:将复杂任务分配给多个线程执行
- 数据处理:协调多个线程间的数据处理
- 重复性任务:需要重复执行的任务场景
6. Exchanger(交换器)
6.1 概述
Exchanger 是一个用于两个线程间交换数据的协作工具类。
6.2 常用 API
V exchange(V x) // 交换数据,等待另一个线程
V exchange(V x, long timeout, TimeUnit unit) // 超时交换数据
6.3 应用场景
6.3.1 模拟交易场景
public class ExchangerDemo {private static Exchanger<String> exchanger = new Exchanger<>();static String goods = "电脑";static String money = "$4000";public static void main(String[] args) {// 卖家线程new Thread(() -> {try {System.out.println("卖家已准备好货:" + goods);String received = exchanger.exchange(goods);System.out.println("卖家收到钱:" + received);} catch (InterruptedException e) {e.printStackTrace();}}).start();// 买家线程new Thread(() -> {try {System.out.println("买家已准备好钱:" + money);String received = exchanger.exchange(money);System.out.println("买家收到货:" + received);} catch (InterruptedException e) {e.printStackTrace();}}).start();}
}
6.3.2 数据校验场景
public class ExchangerDemo2 {private static final Exchanger<String> exchanger = new Exchanger<>();private static ExecutorService threadPool = Executors.newFixedThreadPool(2);public static void main(String[] args) {threadPool.execute(() -> {try {String dataA = "12379871924sfkhfksdhfks";exchanger.exchange(dataA);} catch (InterruptedException e) {e.printStackTrace();}});threadPool.execute(() -> {try {String dataB = "32423423jknjkfsbfj";String dataA = exchanger.exchange(dataB);System.out.println("数据一致性检查:" + dataA.equals(dataB));} catch (InterruptedException e) {e.printStackTrace();}});threadPool.shutdown();}
}
6.4 应用场景总结
- 数据交换:两个线程间进行数据交换
- 数据采集:在采集线程和处理线程间交换数据
- 数据校对:两个线程生产的数据进行相互校验
7. Phaser(阶段协同器)
7.1 概述
Phaser 是更灵活的阶段同步器,可以视为 CyclicBarrier 和 CountDownLatch 的进化版,支持动态调整参与线程数量。
7.2 常用 API
// 构造器
Phaser()
Phaser(int parties)
Phaser(Phaser parent)
Phaser(Phaser parent, int parties)int register() // 注册一个新参与者
int bulkRegister(int parties) // 批量注册参与者
int arrive() // 到达但不等待
int arriveAndAwaitAdvance() // 到达并等待其他参与者
int arriveAndDeregister() // 到达并注销注册
int awaitAdvance(int phase) // 等待特定阶段完成
7.3 应用场景
7.3.1 多线程批量处理数据
public class PhaserBatchProcessorDemo {private final List<String> data;private final int batchSize;private final int threadCount;private final Phaser phaser;private final List<String> processedData;public PhaserBatchProcessorDemo(List<String> data, int batchSize, int threadCount) {this.data = data;this.batchSize = batchSize;this.threadCount = threadCount;this.phaser = new Phaser(1);this.processedData = new ArrayList<>();}public void process() {for (int i = 0; i < threadCount; i++) {phaser.register();new Thread(new BatchProcessor(i)).start();}phaser.arriveAndDeregister();}private class BatchProcessor implements Runnable {private final int threadIndex;private int index = 0;public BatchProcessor(int threadIndex) {this.threadIndex = threadIndex;}@Overridepublic void run() {while (!phaser.isTerminated()) {phaser.arriveAndAwaitAdvance();List<String> batch = new ArrayList<>();synchronized (data) {while (index < data.size() && batch.size() < batchSize) {String item = data.get(index);if (!processedData.contains(item)) {batch.add(item);processedData.add(item);}index++;}}for (String item : batch) {System.out.println("线程" + threadIndex + "处理数据:" + item);}phaser.arriveAndAwaitAdvance();if (batch.isEmpty() || index >= data.size()) {phaser.arriveAndDeregister();break;}}}}
}
7.3.2 阶段性任务:模拟公司团建
public class PhaserDemo {public static void main(String[] args) {Phaser phaser = new Phaser() {@Overrideprotected boolean onAdvance(int phase, int registeredParties) {int staffs = registeredParties - 1;switch (phase) {case 0: System.out.println("大家到公司了,出发去公园,人数:" + staffs); break;case 1: System.out.println("大家到公园了,出发去餐厅,人数:" + staffs); break;case 2: System.out.println("大家到餐厅了,开始用餐,人数:" + staffs); break;}return registeredParties == 1;}};phaser.register(); // 主线程注册// 创建参与团建的员工线程for (int i = 0; i < 3; i++) {phaser.register();new Thread(new StaffTask(phaser), "员工" + i).start();}}static class StaffTask implements Runnable {private final Phaser phaser;private static final Random random = new Random();public StaffTask(Phaser phaser) {this.phaser = phaser;}@Overridepublic void run() {try {step1Task();phaser.arriveAndAwaitAdvance();step2Task();phaser.arriveAndAwaitAdvance();step3Task();phaser.arriveAndAwaitAdvance();step4Task();phaser.arriveAndDeregister();} catch (InterruptedException e) {e.printStackTrace();}}private void step1Task() throws InterruptedException {System.out.println(Thread.currentThread().getName() + "从家出发了");Thread.sleep(random.nextInt(2000));System.out.println(Thread.currentThread().getName() + "到达公司");}// 其他步骤方法类似...}
}
7.4 应用场景总结
- 多线程任务分配:协调复杂任务的多个执行线程
- 多级任务流程:实现多级任务流程控制
- 模拟并行计算:协调并行计算中的多个阶段
- 动态线程管理:需要动态增加或减少参与线程的场景
8. 总结对比
工具类 | 特点 | 适用场景 |
---|---|---|
ReentrantLock | 可重入、可中断、可公平、支持条件变量 | 需要细粒度控制的同步场景 |
Semaphore | 控制并发访问数量 | 限流、资源池管理 |
CountDownLatch | 一次性、等待多个线程完成 | 初始化、任务开始前的准备工作 |
CyclicBarrier | 可重复使用、线程相互等待 | 分阶段任务、重复性任务 |
Exchanger | 两个线程间数据交换 | 数据交换、数据校对 |
Phaser | 灵活的阶段同步、动态调整 | 复杂多阶段任务、动态线程管理 |
选择合适的同步工具类可以大大提高并发程序的性能和可维护性。在实际开发中,应根据具体需求选择最合适的工具,并注意正确使用以避免死锁和性能问题。