JUC之并发容器
文章目录
- 一、并发容器概述
- 1.1 概述
- 1.2 JUC并发容器概述
- 二、并发Map容器
- 2.1 ConcurrentHashMap
- 2.1.1 核心原理
- 2.1.2 核心流程图
- 2.1.3 示例代码
- 2.1.4 应用场景
- 2.2 ConcurrentSkipListMap
- 2.2.1 核心原理
- 2.2.2 使用示例
- 2.2.3 应用场景
- 三、并发 List 与 Set
- 3.1 CopyOnWriteArrayList
- 3.2 CopyOnWriteArraySet
- 四、并发队列
- 4.1 BlockingQueue 接口
- 4.2 ArrayBlockingQueue
- 4.3 LinkedBlockingQueue
- 4.4 SynchronousQueue
- 4.5 其他阻塞队列
- 五、其他并发容器
- 5.1 ConcurrentLinkedQueue
- 5.2 ConcurrentLinkedDeque
- 六、并发容器选择对比
- 6.1 选择依据
- 6.2 对比传统同步容器
- 七、使用经验与最佳实践
- 7.1 JUC容器的核心价值
- 7.2 关键选择原则
- 7.3 并发容器选择决策图
- 八、使用经验总结
在多线程编程中,容器是我们存储和操作数据的基本工具。然而,传统的 Java 集合框架(如 HashMap、ArrayList)并非线程安全,在并发环境下使用可能导致数据不一致甚至程序崩溃。为此,JDK 的
java.util.concurrent
(简称 JUC)包提供了一系列线程安全的并发容器,专门用于解决高并发场景下的数据存储与访问问题。
一、并发容器概述
1.1 概述
- 分而治之:通过分割数据结构,让不同线程操作不同部分,减少锁竞争
- CAS 无锁算法:使用乐观锁机制,减少传统锁带来的性能开销
与传统同步容器(如Vector
、Hashtable
)相比,JUC 并发容器具有以下优势:
- 更高的并发性能:允许多个线程同时读写
- 更精细的锁控制:避免全表锁导致的性能瓶颈
- 更丰富的功能:支持原子操作、延迟初始化等高级特性
1.2 JUC并发容器概述
容器类型 | 典型实现 | 特点 |
---|---|---|
Map | ConcurrentHashMap | 高性能线程安全哈希表 |
List | CopyOnWriteArrayList | 写时复制,读多写少 |
Set | CopyOnWriteArraySet | 基于 CopyOnWriteArrayList |
Queue | ConcurrentLinkedQueue ArrayBlockingQueue LinkedBlockingQueue | 支持并发读写 |
Deque | ConcurrentLinkedDeque | 双向队列 |
二、并发Map容器
2.1 ConcurrentHashMap
2.1.1 核心原理
- JDK 7:使用分段锁(Segment),将数据分成多个段,每个段独立加锁。
-
JDK 8+弃用分段锁,采用 CAS + synchronized
实现:
- 数组 + 链表/红黑树:当链表长度超过阈值(默认8),转换为红黑树。
- 无锁化更新:读操作不加锁,写操作通过 CAS 和 synchronized 控制。
2.1.2 核心流程图
- 支持高并发:读操作几乎无锁(使用 volatile 保证可见性),写操作只锁定单个节点
- 弱一致性:迭代器不会抛出
ConcurrentModificationException
,但可能看不到最新修改 - 初始容量 16,负载因子 0.75,扩容时容量翻倍
2.1.3 示例代码
import java.util.concurrent.ConcurrentHashMap;public class ConcurrentHashMapDemo {public static void main(String[] args) {// 创建 ConcurrentHashMapConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();// 多线程写入for (int i = 0; i < 10; i++) {String key = "key" + i;int value = i * 10;map.put(key, value); // 线程安全写入}// 多线程读取for (int i = 0; i < 10; i++) {String key = "key" + i;Integer val = map.get(key); // 无锁读取System.out.println("Key: " + key + ", Value: " + val);}}
}
2.1.4 应用场景
- 高并发环境下的缓存系统
- 计数器场景(如网站访问量统计)
- 多线程共享的配置信息存储
- 需要频繁读写的键值对存储
2.2 ConcurrentSkipListMap
ConcurrentSkipListMap
是一个基于跳表(SkipList)实现的并发有序映射,类似于TreeMap
的线程安全版本。
2.2.1 核心原理
跳表是一种可以替代平衡树的数据结构,通过在每个节点上增加多层索引,实现快速查找。
ConcurrentSkipListMap
的特点:
- 元素按 key 自然顺序或自定义比较器排序
- 支持并发的插入、删除和查找操作
- 平均时间复杂度为 O (log n),与平衡树相当
- 无锁设计,使用 CAS 操作保证线程安全
2.2.2 使用示例
package cn.tcmeta.collections;import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;public class ConcurrentSkipListMapExample {// 创建ConcurrentSkipListMap实例,按key自然排序private static final ConcurrentSkipListMap<Integer, String> skipListMap =new ConcurrentSkipListMap<>();private static final CountDownLatch latch = new CountDownLatch(3);static void main(String[] args) throws InterruptedException {// 线程1:添加偶数keynew Thread(() -> {try {for (int i = 0; i < 10; i += 2) {skipListMap.put(i, "value-" + i);Thread.sleep(10); // 模拟耗时操作}} catch (InterruptedException e) {e.printStackTrace();} finally {latch.countDown();}}, "偶数线程").start();// 线程2:添加奇数keynew Thread(() -> {try {for (int i = 1; i < 10; i += 2) {skipListMap.put(i, "value-" + i);Thread.sleep(15); // 模拟耗时操作}} catch (InterruptedException e) {e.printStackTrace();} finally {latch.countDown();}}, "奇数线程").start();// 线程3:读取数据new Thread(() -> {try {Thread.sleep(50); // 等待一会儿再读取// 打印所有键值对,会按key有序输出System.out.println("当前映射内容:");for (Map.Entry<Integer, String> entry : skipListMap.entrySet()) {System.out.println(entry.getKey() + ": " + entry.getValue());}// 演示导航方法System.out.println("\n小于5的最大key: " + skipListMap.lowerKey(5));System.out.println("大于等于5的最小key: " + skipListMap.ceilingKey(5));System.out.println("从3到7的子映射: " + skipListMap.subMap(3, true, 7, true));} catch (InterruptedException e) {e.printStackTrace();} finally {latch.countDown();}}, "读取线程").start();latch.await();System.out.println("\n最终映射大小: " + skipListMap.size());}
}
运行结果会按 key 的自然顺序输出,展示了ConcurrentSkipListMap
的有序特性和丰富的导航方法。
2.2.3 应用场景
- 需要排序的并发映射场景
- 需频繁进行范围查询的场景(如按时间范围查询日志)
- 高并发下的排行榜实现(如游戏积分排名)
三、并发 List 与 Set
3.1 CopyOnWriteArrayList
CopyOnWriteArrayList
是ArrayList
的线程安全变体,其核心思想是 “写时复制”:当需要修改集合时,会创建底层数组的副本,修改操作在副本上进行,完成后再将引用指向新数组。
关键特性:
- 读操作无锁,性能优异(直接读取 volatile 数组)
- 写操作需要复制整个数组,开销较大
- 迭代器遍历的是数组的快照,不会抛出
ConcurrentModificationException
- 适合读多写少的场景
示例代码
package cn.tcmeta.collections;import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;public class CopyOnWriteArrayListExample {private static final List<String> list = new CopyOnWriteArrayList<>();private static final CountDownLatch latch = new CountDownLatch(2);public static void main(String[] args) throws InterruptedException {// 初始化列表list.add("元素1");list.add("元素2");list.add("元素3");// 读线程:迭代列表new Thread(() -> {try {// 获取迭代器(此时的快照)Iterator<String> iterator = list.iterator();Thread.sleep(500); // 等待写线程执行修改// 遍历迭代器,仍会看到原始元素System.out.println("读线程 - 迭代器内容:");while (iterator.hasNext()) {System.out.println(iterator.next());}// 直接访问列表,能看到最新内容System.out.println("\n读线程 - 直接访问列表:");System.out.println(list);} catch (InterruptedException e) {e.printStackTrace();} finally {latch.countDown();}}, "读线程").start();// 写线程:修改列表new Thread(() -> {try {Thread.sleep(200); // 等待读线程获取迭代器list.add("元素4");list.remove("元素2");System.out.println("\n写线程 - 已添加元素4并删除元素2");} catch (InterruptedException e) {e.printStackTrace();} finally {latch.countDown();}}, "写线程").start();latch.await();}
}
写线程 - 已添加元素4并删除元素2
读线程 - 迭代器内容:
元素1
元素2
元素3读线程 - 直接访问列表:
[元素1, 元素3, 元素4]
结果展示了CopyOnWriteArrayList
的弱一致性:迭代器使用的是创建时的数组快照,而直接访问可以看到最新修改。
应用场景
- 读操作远多于写操作的场景
- 不需要实时获取最新数据的场景
- 事件监听器列表(如 GUI 中的事件监听器)
- 配置项缓存(配置不常变更,但需要频繁读取)
3.2 CopyOnWriteArraySet
CopyOnWriteArraySet
基于CopyOnWriteArrayList
实现,内部持有一个CopyOnWriteArrayList
实例,其.add () 方法实际上调用的是CopyOnWriteArrayList
的.addIfAbsent () 方法,确保元素唯一性。
public class CopyOnWriteArraySet<E> extends AbstractSet<E> implements Serializable {private final CopyOnWriteArrayList<E> al;public boolean add(E e) {return al.addIfAbsent(e);}// 其他方法...
}
它的特性和适用场景与CopyOnWriteArrayList
类似,适用于读多写少且需要保证元素唯一性的场景。
四、并发队列
队列是多线程通信的重要工具,JUC 提供了多种并发队列实现,其中BlockingQueue
接口的实现类最为常用,它们支持阻塞的插入和移除操作。
实现类 | 特点 |
---|---|
ArrayBlockingQueue | 有界队列,基于数组 |
LinkedBlockingQueue | 无界队列,基于链表 |
PriorityBlockingQueue | 优先级队列 |
SynchronousQueue | 不存储元素,仅传递数据 |
4.1 BlockingQueue 接口
BlockingQueue
继承自Queue
,并添加了以下阻塞操作:
put(e)
:将元素插入队列,若队列满则阻塞take()
:从队列获取元素,若队列空则阻塞offer(e, timeout, unit)
:插入元素,若队列满则等待指定时间poll(timeout, unit)
:获取元素,若队列空则等待指定时间
BlockingQueue
通常用于生产者 - 消费者模式,生产者线程向队列添加元素,消费者线程从队列获取元素。
4.2 ArrayBlockingQueue
ArrayBlockingQueue
是基于数组实现的有界阻塞队列,创建时必须指定容量,且容量一旦确定就不能更改。
核心特性:
- 有界队列:容量固定,满了之后生产者会被阻塞
- 公平性选择:可以指定是否按 FIFO 顺序访问(公平模式),默认是非公平的
- 内部使用独占锁(ReentrantLock)控制并发,所有操作都在同一把锁上进行
示例代码
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;public class ArrayBlockingQueueExample {// 创建容量为5的ArrayBlockingQueue,公平模式private static final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5, true);public static void main(String[] args) {// 启动3个生产者for (int i = 0; i < 3; i++) {new Thread(new Producer(i), "生产者-" + i).start();}// 启动2个消费者for (int i = 0; i < 2; i++) {new Thread(new Consumer(i), "消费者-" + i).start();}}// 生产者static class Producer implements Runnable {private int id;public Producer(int id) {this.id = id;}@Overridepublic void run() {try {for (int i = 0; i < 5; i++) {int value = id * 10 + i;queue.put(value); // 若队列满则阻塞System.out.println(Thread.currentThread().getName() + " 生产了: " + value + ", 队列大小: " + queue.size());Thread.sleep(100); // 模拟生产耗时}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}// 消费者static class Consumer implements Runnable {private int id;public Consumer(int id) {this.id = id;}@Overridepublic void run() {try {for (int i = 0; i < 7; i++) {Integer value = queue.take(); // 若队列空则阻塞System.out.println(" " + Thread.currentThread().getName() + " 消费了: " + value + ", 队列大小: " + queue.size());Thread.sleep(200); // 模拟消费耗时}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}
}
这个示例展示了 3 个生产者和 2 个消费者通过ArrayBlockingQueue
进行通信的过程,队列容量为 5,当队列满时生产者会阻塞,当队列空时消费者会阻塞。
4.3 LinkedBlockingQueue
LinkedBlockingQueue
是基于链表实现的阻塞队列,默认容量为Integer.MAX_VALUE
(可视为无界队列),也可以指定容量使其成为有界队列。
与 ArrayBlockingQueue 的对比:
特性 | ArrayBlockingQueue | LinkedBlockingQueue |
---|---|---|
数据结构 | 数组 | 链表 |
容量 | 必须指定,固定不变 | 可选,默认无界 |
锁实现 | 单锁(读写共享) | 双锁(读锁和写锁分离) |
内存占用 | 初始分配固定大小 | 动态分配,内存效率高 |
性能 | 读写操作竞争同一把锁 | 读写操作可并行,高并发下性能更好 |
应用场景
- 任务队列:如线程池中的任务队列
- 消息传递:线程间通信的消息队列
- 生产者和消费者速度不匹配的场景
4.4 SynchronousQueue
SynchronousQueue
是一个特殊的阻塞队列,它本身不存储任何元素,每个 put 操作必须等待一个 take 操作,反之亦然。可以理解为 “直接传递” 队列,生产者直接将元素传递给消费者,而不经过队列存储。
核心特性:
- 不存储元素,容量为 0
- 支持公平和非公平模式
- 适合直接传递的场景,减少中间存储开销
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;public class SynchronousQueueExample {public static void main(String[] args) {// 创建SynchronousQueue,公平模式SynchronousQueue<String> queue = new SynchronousQueue<>(true);// 生产者线程new Thread(() -> {try {String[] messages = {"消息1", "消息2", "消息3"};for (String msg : messages) {System.out.println("生产者发送: " + msg);// put会阻塞,直到有消费者接收queue.put(msg);System.out.println("生产者确认: " + msg + " 已被接收");}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}, "生产者").start();// 延迟启动消费者线程,展示阻塞效果try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}// 消费者线程new Thread(() -> {try {for (int i = 0; i < 3; i++) {// take会阻塞,直到有生产者发送消息String msg = queue.take();System.out.println(" 消费者接收: " + msg);// 模拟处理时间TimeUnit.SECONDS.sleep(1);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}, "消费者").start();}
}
生产者发送: 消息1消费者接收: 消息1
生产者确认: 消息1 已被接收
生产者发送: 消息2消费者接收: 消息2
生产者确认: 消息2 已被接收
生产者发送: 消息3消费者接收: 消息3
生产者确认: 消息3 已被接收
结果展示了SynchronousQueue
的直接传递特性:生产者必须等待消费者接收消息后才能继续发送下一条消息。
应用场景:
- 线程池:
Executors.newCachedThreadPool()
使用了SynchronousQueue
- 两个线程之间需要直接传递数据的场景
- 交替执行的任务(如生产者和消费者必须同步工作)
4.5 其他阻塞队列
-
PriorityBlockingQueue:
- 支持优先级的无界阻塞队列
- 元素按自然顺序或自定义比较器排序
- 适合需要按优先级处理任务的场景(如任务调度)
-
DelayQueue:
- 元素只有在延迟时间到期后才能被取出
- 元素必须实现
Delayed
接口 - 适合定时任务、缓存过期清理等场景
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;// 实现Delayed接口的任务类
class DelayedTask implements Delayed {private String name;private long delayTime; // 延迟时间(毫秒)private long executeTime; // 执行时间 = 当前时间 + 延迟时间public DelayedTask(String name, long delayTime) {this.name = name;this.delayTime = delayTime;this.executeTime = System.currentTimeMillis() + delayTime;}// 返回剩余延迟时间@Overridepublic long getDelay(TimeUnit unit) {long remaining = executeTime - System.currentTimeMillis();return unit.convert(remaining, TimeUnit.MILLISECONDS);}// 比较器,用于排序@Overridepublic int compareTo(Delayed o) {return Long.compare(this.executeTime, ((DelayedTask) o).executeTime);}@Overridepublic String toString() {return name + " (延迟 " + delayTime + "ms)";}
}public class DelayQueueExample {public static void main(String[] args) {DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();// 添加延迟任务delayQueue.put(new DelayedTask("任务1", 1000)); // 延迟1秒delayQueue.put(new DelayedTask("任务2", 3000)); // 延迟3秒delayQueue.put(new DelayedTask("任务3", 2000)); // 延迟2秒System.out.println("开始等待任务到期...");// 启动消费者线程new Thread(() -> {try {while (!delayQueue.isEmpty()) {// take()会阻塞直到有任务到期DelayedTask task = delayQueue.take();System.out.println("执行任务: " + task + ", 当前时间: " + System.currentTimeMillis());}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();}
}
运行结果会按延迟时间顺序执行任务,展示了DelayQueue
的延迟特性。
五、其他并发容器
5.1 ConcurrentLinkedQueue
ConcurrentLinkedQueue
是基于链表的无界非阻塞队列,使用 CAS 操作保证线程安全,适合高并发场景。
特性:
- 无界队列,理论上可以无限添加元素
- 非阻塞算法,使用 CAS 实现线程安全
- 迭代器弱一致性,不抛出
ConcurrentModificationException
- 性能优于
LinkedBlockingQueue
(无锁竞争)
5.2 ConcurrentLinkedDeque
ConcurrentLinkedDeque
是基于双向链表的无界非阻塞双端队列,支持在队列两端进行元素的添加和移除操作。
适用场景:
- 需要在两端操作队列的场景
- 实现栈(LIFO)或队列(FIFO)
- 双端生产者 - 消费者模型
六、并发容器选择对比
6.1 选择依据
场景 | 推荐容器 | 理由 |
---|---|---|
高频读写 | ConcurrentHashMap | 锁粒度小,性能高 |
读多写少 | CopyOnWriteArrayList | 无锁读取,适合静态数据 |
生产消费模型 | BlockingQueue | 自动阻塞与唤醒机制 |
优先级处理 | PriorityBlockingQueue | 内置优先级排序 |
场景 | 推荐容器 | 原因 |
---|---|---|
高并发缓存 | ConcurrentHashMap | 分段锁/CAS,高吞吐量 |
有序并发映射 | ConcurrentSkipListMap | 跳表实现,有序性保证 |
生产者消费者 | ArrayBlockingQueue/LinkedBlockingQueue | 阻塞机制,流量控制 |
高吞吐队列 | ConcurrentLinkedQueue | CAS操作,无锁高性能 |
读多写少List | CopyOnWriteArrayList | 写时复制,读无锁 |
定时任务调度 | DelayQueue | 延迟获取,时间排序 |
直接传递 | SynchronousQueue | 无缓冲,直接交接 |
6.2 对比传统同步容器
容器 | 同步方式 | 性能 | 适用场景 |
---|---|---|---|
Vector | synchronized | 低 | 不推荐使用 |
Collections.synchronizedList | synchronized | 低 | 不推荐使用 |
ConcurrentHashMap | CAS + synchronized | 高 | 高频读写 |
CopyOnWriteArrayList | 写时复制 | 读高写低 | 读多写少 |
七、使用经验与最佳实践
✅ 正确使用建议
- 优先选择 JUC 容器:避免手动加锁实现线程安全。
- 根据场景选择容器
- 读多写少 →
CopyOnWriteArrayList
- 高频读写 →
ConcurrentHashMap
- 生产消费 →
BlockingQueue
- 读多写少 →
- 避免滥用:非必要场景无需使用并发容器,避免性能浪费。
❌ 常见陷阱
- 误用
Vector
:Vector
的synchronized
会降低性能。 - 忽略容器特性:如
CopyOnWriteArrayList
写操作开销大。 - 未正确处理阻塞:如
BlockingQueue
的put/take
会阻塞线程。
7.1 JUC容器的核心价值
- 线程安全:无需手动加锁即可实现并发安全。
- 高性能:通过无锁算法、分段锁等优化性能。
- 适用广泛:覆盖 Map、List、Set、Queue 等常见集合类型。
7.2 关键选择原则
- 读多写少:优先使用
CopyOnWriteArrayList
。 - 高频读写:选择
ConcurrentHashMap
。 - 生产消费:使用
BlockingQueue
及其子类。
JUC 容器 = 线程安全 + 高性能 + 场景适配,是构建高并发应用的基石。
7.3 并发容器选择决策图
关键点:根据读写频率和阻塞需求选择合适容器。
八、使用经验总结
- 避免过度同步:JUC 并发容器内部已经实现了线程安全,使用时无需额外加锁,否则会降低性能。
- 注意弱一致性:大多数并发容器的迭代器是弱一致性的,不能保证看到最新修改,这是为了换取更高的并发性能。
- 合理设置初始容量:如
ConcurrentHashMap
、ArrayBlockingQueue
等,合理的初始容量可以减少扩容带来的性能开销。 - 谨慎使用无界容器:无界容器(如
LinkedBlockingQueue
默认设置)在生产者速度远快于消费者时可能导致内存溢出,建议根据实际场景设置合理的容量上限。 - CopyOnWrite 容器的写操作代价高:每次写操作都会复制整个数组,因此不适合频繁修改的场景,也不适合存储大量数据。
- 优先使用 JUC 容器而非同步包装器:
Collections.synchronizedXXX()
创建的同步容器性能较差,应优先选择 JUC 提供的并发容器。 - 注意线程中断:
BlockingQueue
的put
和take
方法会响应线程中断,在使用时需要妥善处理InterruptedException
。 - 理解并发容器的内存可见性:JUC 并发容器通过 volatile、CAS 和锁机制保证了内存可见性,一个线程的修改对其他线程是可见的。