当前位置: 首页 > news >正文

Java 并发同步工具类详解

1. 并发编程核心概念

并发编程主要解决两个核心问题:

  • 性能:通过多线程并行执行提高程序效率
  • 线程安全(正确性):确保多线程环境下程序的正确执行

并发编程的三个关键方面:

  • 分工:如何高效地拆解任务并分配给线程
  • 同步:线程之间如何协作
  • 互斥:保证同一时间只有一个线程访问共享资源

2. ReentrantLock(可重入锁)

2.1 概述

ReentrantLock 是一种可重入的独占锁,允许同一线程多次获取同一个锁而不会被阻塞。相比 synchronized,它提供了更多高级功能。

2.2 特点

  • 可中断
  • 可设置超时时间
  • 可设置为公平锁
  • 支持多个条件变量
  • 与 synchronized 一样支持可重入

2.3 常用 API

void lock() // 获取锁
void lockInterruptibly() throws InterruptedException // 可中断获取锁
boolean tryLock() // 尝试非阻塞获取锁
boolean tryLock(long time, TimeUnit unit) throws InterruptedException // 超时获取锁
void unlock() // 释放锁
Condition newCondition() // 获取等待通知组件

2.4 基本使用

// 加锁(阻塞方式)
lock.lock();
try {// 临界区代码
} finally {lock.unlock(); // 解锁
}// 尝试加锁(非阻塞方式)
if (lock.tryLock(1, TimeUnit.SECONDS)) {try {// 临界区代码} finally {lock.unlock();}
}

2.5 使用注意事项

  1. 默认情况下 ReentrantLock 为非公平锁
  2. 加锁次数和释放锁次数必须保持一致
  3. 加锁操作放在 try 代码块之前
  4. 释放锁一定要放在 finally 中

2.6 应用场景

2.6.1 独占锁:模拟抢票场景
public class ReentrantLockDemo {private final ReentrantLock lock = new ReentrantLock();private static int tickets = 8;public void buyTicket() {lock.lock();try {if (tickets > 0) {Thread.sleep(10);System.out.println(Thread.currentThread().getName() + "购买了第" + tickets-- + "张票");} else {System.out.println("票已售完," + Thread.currentThread().getName() + "抢票失败");}} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}public static void main(String[] args) {ReentrantLockDemo ticketSystem = new ReentrantLockDemo();for (int i = 1; i <= 10; i++) {new Thread(() -> ticketSystem.buyTicket(), "线程" + i).start();}}
}
2.6.2 公平锁与非公平锁
ReentrantLock lock = new ReentrantLock(); // 非公平锁(默认)
ReentrantLock fairLock = new ReentrantLock(true); // 公平锁
2.6.3 可重入锁

不断递归,不断获取锁(同一把锁)

class Counter {private final ReentrantLock lock = new ReentrantLock();public void recursiveCall(int num) {lock.lock();try {if (num == 0) return;System.out.println("执行递归,num = " + num);recursiveCall(num - 1);} finally {lock.unlock();}}
}
2.6.4 生产者消费者模式(没用过,示例代码)
class Queue {private Object[] items;private int size = 0, takeIndex, putIndex;private ReentrantLock lock;public Condition notEmpty, notFull;public Queue(int capacity) {items = new Object[capacity];lock = new ReentrantLock();notEmpty = lock.newCondition();notFull = lock.newCondition();}public void put(Object value) throws Exception {lock.lock();try {while (size == items.length) notFull.await();items[putIndex] = value;if (++putIndex == items.length) putIndex = 0;size++;notEmpty.signal();} finally {lock.unlock();}}public Object take() throws Exception {lock.lock();try {while (size == 0) notEmpty.await();Object value = items[takeIndex];if (++takeIndex == items.length) takeIndex = 0;size--;notFull.signal();return value;} finally {lock.unlock();}}
}

2.7 应用场景总结

  1. 解决多线程竞争资源问题
  2. 实现多线程任务顺序执行
  3. 实现多线程等待/通知机制
  4. 需要更细粒度控制的同步场景

3. Semaphore(信号量)

3.1 概述

Semaphore 用于控制同时访问某个资源的线程数量,维护了一个许可证计数器。

3.2 常用 API

// 构造器
Semaphore(int permits) // 指定许可证数量
Semaphore(int permits, boolean fair) // 指定许可证数量和公平性void acquire() // 获取许可(阻塞)
boolean tryAcquire() // 尝试获取许可(非阻塞)
boolean tryAcquire(long timeout, TimeUnit unit) // 超时获取许可
void release() // 释放许可

3.3 应用场景

3.3.1 接口限流
public class SemaphoreDemo {private static Semaphore semaphore = new Semaphore(2); // 同一时刻最多2个并发private static Executor executor = Executors.newFixedThreadPool(10);public static String getProductInfo() {if (!semaphore.tryAcquire()) {return "请求被流控了";}try {Thread.sleep(2000);return "返回商品详情信息";} catch (InterruptedException e) {return "处理异常";} finally {semaphore.release();}}
}
3.3.2 数据库连接池
class ConnectPool {private Connect[] connects;private boolean[] connectFlag;private Semaphore semaphore;public ConnectPool(int size) {semaphore = new Semaphore(size, true);connects = new Connect[size];connectFlag = new boolean[size];initConnects();}public Connect openConnect() throws InterruptedException {semaphore.acquire();return getConnect();}private synchronized Connect getConnect() {for (int i = 0; i < connectFlag.length; i++) {if (!connectFlag[i]) {connectFlag[i] = true;return connects[i];}}return null;}public synchronized void releaseConnect(Connect connect) {for (int i = 0; i < connects.length; i++) {if (connect == connects[i]) {connectFlag[i] = false;semaphore.release();return;}}}
}

3.4 应用场景总结

  1. 限流:限制对共享资源的并发访问数量
  2. 资源池:维护一组有限的共享资源

4. CountDownLatch(闭锁)

4.1 概述

CountDownLatch 允许一个或多个线程等待,直到其他线程完成操作集。

4.2 常用 API

// 构造器
CountDownLatch(int count) // 指定计数初始值void await() // 等待直到计数为零
boolean await(long timeout, TimeUnit unit) // 超时等待
void countDown() // 计数减一

4.3 应用场景

4.3.1 模拟百米赛跑
public class CountDownLatchDemo {private static CountDownLatch begin = new CountDownLatch(1); // 裁判private static CountDownLatch end = new CountDownLatch(8);   // 选手public static void main(String[] args) throws InterruptedException {for (int i = 1; i <= 8; i++) {new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + "已准备");begin.await();System.out.println(Thread.currentThread().getName() + "开始跑步");Thread.sleep(1000);System.out.println(Thread.currentThread().getName() + "到达终点");end.countDown();} catch (InterruptedException e) {e.printStackTrace();}}, "参赛者" + i).start();}Thread.sleep(5000);System.out.println("开始比赛");begin.countDown();end.await();System.out.println("比赛结束");}
}
4.3.2 多任务完成后合并汇总
public class CountDownLatchDemo2 {public static void main(String[] args) throws Exception {CountDownLatch countDownLatch = new CountDownLatch(5);for (int i = 0; i < 5; i++) {new Thread(() -> {try {Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(2000));System.out.println("任务完成");countDownLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();}}).start();}countDownLatch.await();System.out.println("所有任务完成,进行结果汇总");}
}

4.4 应用场景总结

  1. 并行任务同步:协调多个并行任务的完成情况
  2. 多任务汇总:统计多个线程的完成情况
  3. 资源初始化:等待资源初始化完成后开始使用

5. CyclicBarrier(循环屏障)

5.1 概述

CyclicBarrier 让一组线程等待至某个状态(屏障点)之后再全部同时执行,可以重复使用。

5.2 常用 API

// 构造器
CyclicBarrier(int parties) // 指定线程数量
CyclicBarrier(int parties, Runnable barrierAction) // 指定线程数量和屏障点操作int await() // 等待直到所有线程到达屏障点
int await(long timeout, TimeUnit unit) // 超时等待
void reset() // 重置屏障

5.3 应用场景

5.3.1 模拟人满发车
public class CyclicBarrierDemo {public static void main(String[] args) {ExecutorService executor = Executors.newFixedThreadPool(5);CyclicBarrier barrier = new CyclicBarrier(5, () -> System.out.println("人齐了,准备发车"));for (int i = 0; i < 10; i++) {executor.submit(() -> {try {System.out.println(Thread.currentThread().getName() + "马上就到");Thread.sleep(ThreadLocalRandom.current().nextInt(2000));System.out.println(Thread.currentThread().getName() + "到了,上车");barrier.await();} catch (Exception e) {e.printStackTrace();}});}executor.shutdown();}
}
5.3.2 多线程批量处理数据
class CyclicBarrierBatchProcessor {private List<Integer> data;private int batchSize;private CyclicBarrier barrier;private List<Thread> threads;public CyclicBarrierBatchProcessor(List<Integer> data, int batchSize) {this.data = data;this.batchSize = batchSize;this.threads = new ArrayList<>();}public void process(BatchTask task) {int threadCount = (data.size() + batchSize - 1) / batchSize;barrier = new CyclicBarrier(threadCount);for (int i = 0; i < threadCount; i++) {int start = i * batchSize;int end = Math.min(start + batchSize, data.size());List<Integer> batchData = data.subList(start, end);Thread thread = new Thread(() -> {task.process(batchData);try {barrier.await();} catch (Exception e) {e.printStackTrace();}});threads.add(thread);thread.start();}}public interface BatchTask {void process(List<Integer> batchData);}
}

5.4 CyclicBarrier vs CountDownLatch

  • CountDownLatch 是一次性的,CyclicBarrier 可循环使用
  • CountDownLatch 中线程职责不同,CyclicBarrier 中线程职责相同

5.5 应用场景总结

  1. 多线程任务:将复杂任务分配给多个线程执行
  2. 数据处理:协调多个线程间的数据处理
  3. 重复性任务:需要重复执行的任务场景

6. Exchanger(交换器)

6.1 概述

Exchanger 是一个用于两个线程间交换数据的协作工具类。

6.2 常用 API

V exchange(V x) // 交换数据,等待另一个线程
V exchange(V x, long timeout, TimeUnit unit) // 超时交换数据

6.3 应用场景

6.3.1 模拟交易场景
public class ExchangerDemo {private static Exchanger<String> exchanger = new Exchanger<>();static String goods = "电脑";static String money = "$4000";public static void main(String[] args) {// 卖家线程new Thread(() -> {try {System.out.println("卖家已准备好货:" + goods);String received = exchanger.exchange(goods);System.out.println("卖家收到钱:" + received);} catch (InterruptedException e) {e.printStackTrace();}}).start();// 买家线程new Thread(() -> {try {System.out.println("买家已准备好钱:" + money);String received = exchanger.exchange(money);System.out.println("买家收到货:" + received);} catch (InterruptedException e) {e.printStackTrace();}}).start();}
}
6.3.2 数据校验场景
public class ExchangerDemo2 {private static final Exchanger<String> exchanger = new Exchanger<>();private static ExecutorService threadPool = Executors.newFixedThreadPool(2);public static void main(String[] args) {threadPool.execute(() -> {try {String dataA = "12379871924sfkhfksdhfks";exchanger.exchange(dataA);} catch (InterruptedException e) {e.printStackTrace();}});threadPool.execute(() -> {try {String dataB = "32423423jknjkfsbfj";String dataA = exchanger.exchange(dataB);System.out.println("数据一致性检查:" + dataA.equals(dataB));} catch (InterruptedException e) {e.printStackTrace();}});threadPool.shutdown();}
}

6.4 应用场景总结

  1. 数据交换:两个线程间进行数据交换
  2. 数据采集:在采集线程和处理线程间交换数据
  3. 数据校对:两个线程生产的数据进行相互校验

7. Phaser(阶段协同器)

7.1 概述

Phaser 是更灵活的阶段同步器,可以视为 CyclicBarrier 和 CountDownLatch 的进化版,支持动态调整参与线程数量。

7.2 常用 API

// 构造器
Phaser()
Phaser(int parties)
Phaser(Phaser parent)
Phaser(Phaser parent, int parties)int register() // 注册一个新参与者
int bulkRegister(int parties) // 批量注册参与者
int arrive() // 到达但不等待
int arriveAndAwaitAdvance() // 到达并等待其他参与者
int arriveAndDeregister() // 到达并注销注册
int awaitAdvance(int phase) // 等待特定阶段完成

7.3 应用场景

7.3.1 多线程批量处理数据
public class PhaserBatchProcessorDemo {private final List<String> data;private final int batchSize;private final int threadCount;private final Phaser phaser;private final List<String> processedData;public PhaserBatchProcessorDemo(List<String> data, int batchSize, int threadCount) {this.data = data;this.batchSize = batchSize;this.threadCount = threadCount;this.phaser = new Phaser(1);this.processedData = new ArrayList<>();}public void process() {for (int i = 0; i < threadCount; i++) {phaser.register();new Thread(new BatchProcessor(i)).start();}phaser.arriveAndDeregister();}private class BatchProcessor implements Runnable {private final int threadIndex;private int index = 0;public BatchProcessor(int threadIndex) {this.threadIndex = threadIndex;}@Overridepublic void run() {while (!phaser.isTerminated()) {phaser.arriveAndAwaitAdvance();List<String> batch = new ArrayList<>();synchronized (data) {while (index < data.size() && batch.size() < batchSize) {String item = data.get(index);if (!processedData.contains(item)) {batch.add(item);processedData.add(item);}index++;}}for (String item : batch) {System.out.println("线程" + threadIndex + "处理数据:" + item);}phaser.arriveAndAwaitAdvance();if (batch.isEmpty() || index >= data.size()) {phaser.arriveAndDeregister();break;}}}}
}
7.3.2 阶段性任务:模拟公司团建
public class PhaserDemo {public static void main(String[] args) {Phaser phaser = new Phaser() {@Overrideprotected boolean onAdvance(int phase, int registeredParties) {int staffs = registeredParties - 1;switch (phase) {case 0: System.out.println("大家到公司了,出发去公园,人数:" + staffs); break;case 1: System.out.println("大家到公园了,出发去餐厅,人数:" + staffs); break;case 2: System.out.println("大家到餐厅了,开始用餐,人数:" + staffs); break;}return registeredParties == 1;}};phaser.register(); // 主线程注册// 创建参与团建的员工线程for (int i = 0; i < 3; i++) {phaser.register();new Thread(new StaffTask(phaser), "员工" + i).start();}}static class StaffTask implements Runnable {private final Phaser phaser;private static final Random random = new Random();public StaffTask(Phaser phaser) {this.phaser = phaser;}@Overridepublic void run() {try {step1Task();phaser.arriveAndAwaitAdvance();step2Task();phaser.arriveAndAwaitAdvance();step3Task();phaser.arriveAndAwaitAdvance();step4Task();phaser.arriveAndDeregister();} catch (InterruptedException e) {e.printStackTrace();}}private void step1Task() throws InterruptedException {System.out.println(Thread.currentThread().getName() + "从家出发了");Thread.sleep(random.nextInt(2000));System.out.println(Thread.currentThread().getName() + "到达公司");}// 其他步骤方法类似...}
}

7.4 应用场景总结

  1. 多线程任务分配:协调复杂任务的多个执行线程
  2. 多级任务流程:实现多级任务流程控制
  3. 模拟并行计算:协调并行计算中的多个阶段
  4. 动态线程管理:需要动态增加或减少参与线程的场景

8. 总结对比

工具类特点适用场景
ReentrantLock可重入、可中断、可公平、支持条件变量需要细粒度控制的同步场景
Semaphore控制并发访问数量限流、资源池管理
CountDownLatch一次性、等待多个线程完成初始化、任务开始前的准备工作
CyclicBarrier可重复使用、线程相互等待分阶段任务、重复性任务
Exchanger两个线程间数据交换数据交换、数据校对
Phaser灵活的阶段同步、动态调整复杂多阶段任务、动态线程管理

选择合适的同步工具类可以大大提高并发程序的性能和可维护性。在实际开发中,应根据具体需求选择最合适的工具,并注意正确使用以避免死锁和性能问题。

http://www.dtcms.com/a/339768.html

相关文章:

  • WordPress 从删除文章后(清空回收站)保存被删除文章的链接到txt
  • 24.早期目标检测
  • Nacos-7--扩展一下:0-RTT和1-RTT怎么理解?
  • 【unitrix数间混合计算】3.2 非零标记trait(non_zero.rs)
  • JVM垃圾回收(GC)深度解析:原理、调优与问题排查
  • libvaapi,libva-utils源码获取并编译测试
  • 深入理解AQS:并发编程的基石
  • django生成迁移文件,执行生成到数据库
  • sfc_os!SfcValidateDLL函数分析之SfcGetValidationData
  • Android音频学习(十三)——音量配置文件分析
  • Python数据分析:DataFrame,reindex,重建索引。有时候整型变浮点型,有时候又不变?
  • FPGA 在情绪识别领域的护理应用(一)
  • 第二十六天:static、const、#define的用法和区别
  • Java:Assert与 Return
  • ZKmall开源商城跨境物流解决方案:让全球配送从复杂到可控的实战之路
  • 深入理解 MySQL 主从同步
  • 【弦乐教程】弦乐家族与音源解析:从乐器到音色的全面认识
  • nodejs使用
  • python matplotlib库如何使用
  • 构造:算法设计中的“魔法工具箱
  • 【C++】C++ 的护身符:解锁 try-catch 异常处理
  • IPD流程执行检查表
  • pnpm 和 npm 差异
  • Spring事务基础:你在入门时踩过的所有坑
  • MoonBit Perals Vol.06: Moonbit 与 LLVM 共舞 (上):编译前端实现
  • 【深度解析】2025年中国GEO优化公司:如何驱动“答案营销”
  • python学习DAY46打卡
  • Vulkan笔记(十)-图形管道的七个配置项
  • 微服务-07.微服务拆分-微服务项目结构说明
  • VulKan笔记(九)-着色器