史上最详细Java并发多线程(面试必备,一篇足矣)
第一章:线程基础
1.1 线程与进程
进程:系统资源分配的基本单位,拥有独立的内存空间
线程:CPU调度的基本单位,共享进程内存空间
关系:一个进程可包含多个线程,线程切换成本远低于进程
1.2 线程的创建方式
1.2.1 继承Thread类
public class MyThread extends Thread {@Overridepublic void run() {System.out.println("Thread running: " + Thread.currentThread().getName());}public static void main(String[] args) {MyThread thread = new MyThread();thread.start(); // 启动线程,不能直接调用run()}
}
1.2.2 实现Runnable接口
public class MyRunnable implements Runnable {@Overridepublic void run() {System.out.println("Runnable running: " + Thread.currentThread().getName());}public static void main(String[] args) {Thread thread = new Thread(new MyRunnable());thread.start();}
}
1.2.3 使用Callable和Future
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;public class MyCallable implements Callable<Integer> {@Overridepublic Integer call() throws Exception {return 1 + 1;}public static void main(String[] args) throws Exception {FutureTask<Integer> futureTask = new FutureTask<>(new MyCallable());new Thread(futureTask).start();System.out.println("Result: " + futureTask.get()); // 获取返回值}
}
1.3 线程生命周期与状态转换
1.3.1 六种状态定义
- NEW:线程创建但未启动
- RUNNABLE:可运行状态(包含就绪和运行中)
- BLOCKED:等待监视器锁的阻塞状态
- WAITING:无限期等待状态
- TIMED_WAITING:限期等待状态
- TERMINATED:终止状态
1.3.2 状态转换完整图示
┌───────────┐ start() ┌───────────┐
│ NEW ├────────────────►│ RUNNABLE │
└───────────┘ └─────┬─────┘│┌───────────────────────────┼───────────────────────────┐│ │ │▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────────────┐
│ BLOCKED │ │ WAITING │ │ TIMED_WAITING │
└─────┬─────┘ └─────┬─────┘ └─────────┬─────────┘│ │ │└───────────────────────┼───────────────────────────┘│▼┌───────────┐│TERMINATED │└───────────┘
1.3.3 状态转换触发条件
状态转换 | 触发条件 |
---|---|
NEW→RUNNABLE | 调用start()方法 |
RUNNABLE→BLOCKED | 进入synchronized块/方法未获取到锁 |
RUNNABLE→WAITING | 调用Object.wait()、Thread.join()、LockSupport.park() |
RUNNABLE→TIMED_WAITING | 调用Thread.sleep(long)、Object.wait(long)、Thread.join(long) |
BLOCKED/WAITING/TIMED_WAITING→RUNNABLE | 获取到锁、被唤醒、超时 |
所有状态→TERMINATED | run()执行完毕或抛出未捕获异常 |
1.4 并发带来的问题
不可见性,乱序性,非原子性 ,这里先介绍一个概念
1.4.1 JMM
Java 内存模型(Java Memory Model,JMM)规范了Java 虚拟机与计算机内存是如何协同工作的。Java 虚拟机是一个完整的计算机的一个模型,因此这个模型自然也包含一个内存模型——又称为 Java 内存模型。
JVM 主内存与工作内存
Java 内存模型中规定了所有的变量都存储在主内存中,每条线程还有自己的工作内存,线程对变量的所有操作都必须在工作内存中进行,而不能直接读写主内存中的变量。
这里的工作内存是 JMM 的一个抽象概念,也叫本地内存,其存储了该线程读/写共享变量的副本。
1.4.2 并发带来的问题
非原子性问题
原子性指一个操作是不可分割的,要么全部执行完成,要么完全不执行。若操作被拆分,多线程并发时会导致中间状态被干扰。
public class AtomicityDemo {private static int count = 0;public static void main(String[] args) throws InterruptedException {// 1000个线程,每个线程对count自增1000次Thread[] threads = new Thread[1000];for (int i = 0; i < 1000; i++) {threads[i] = new Thread(() -> {for (int j = 0; j < 1000; j++) {count++; // 非原子操作}});threads[i].start();}// 等待所有线程执行完毕for (int i = 0; i < 1000; i++) {threads[i].join();}System.out.println("最终count值:" + count); // 预期1000000,实际远小于该值}
}
现象:最终输出的 count 值远小于预期的 1000000。
原因:count++ 并非原子操作,实际分为三步:
读取 count 的当前值;
将值 + 1;
将结果写回 count。
多线程并发时,多个线程可能同时读取到同一个值,导致 “丢失更新”(例如两个线程同时读取到 100,都 + 1 后写回 101,实际应是 102)。
因为JVM 底层执行的时候是指令执行的,我们写的Java代码会被翻译为指令,说白了就是 你 在没有锁的情况下 写 ++ 这种可拆的操作会有风险,结果可能会和预期不一致
不可见性问题
可见性指一个线程对共享变量的修改,其他线程能立即看到。若缺乏可见性,线程可能一直使用旧数据,导致逻辑错误。
public class VisibilityDemo {private static boolean flag = false; // 未加volatilepublic static void main(String[] args) throws InterruptedException {// 线程1:循环等待flag变为trueThread t1 = new Thread(() -> {while (!flag) {// 空循环,等待flag更新}System.out.println("线程1:检测到flag变为true,退出循环");});t1.start();// 主线程:1秒后将flag改为trueThread.sleep(1000);flag = true;System.out.println("主线程:已将flag设为true");}
}
现象:线程 1 可能永远不会退出循环,始终打印 “主线程:已将 flag 设为 true”,但线程 1 无输出。
原因:线程 1 运行时,会将 flag 从主内存加载到自己的工作内存中;
由于 flag 没有volatile修饰,主线程对 flag 的修改可能仅更新自己的工作内存,未及时同步到主内存;
线程 1 始终读取自己工作内存中的旧值(false),导致循环无法退出。
有序性问题
有序性指程序执行顺序与代码顺序一致。编译器或 CPU 为优化性能,可能对指令重排序,若重排序破坏了逻辑依赖,会导致并发问题。
public class Singleton {private static Singleton instance; // 未加volatileprivate Singleton() {}public static Singleton getInstance() {if (instance == null) { // 第一次检查synchronized (Singleton.class) {if (instance == null) { // 第二次检查instance = new Singleton(); // 可能发生指令重排序}}}return instance;}
}
现象:多线程环境下,可能获取到未初始化完成的 Singleton 实例,导致空指针异常。
原因:instance = new Singleton() 可拆分为 3 步:
分配内存空间(memory = allocate ());
初始化对象(ctorInstance (memory));
将 instance 指向内存地址(instance = memory)。编译器可能重排序为 1→3→2:线程 A 执行到 3 时,instance 已非 null(指向未初始化的内存);
此时线程 B 进入getInstance(),第一次检查发现 instance≠null,直接返回未初始化的实例,导致使用时出错。
第二章:同步机制与锁
2.1 synchronized关键字详解
2.1.1 锁对象类型及区别
锁类型 | 语法形式 | 锁对象 | 作用范围 | 示例 |
---|---|---|---|---|
实例方法锁 | synchronized void method(){} | 当前实例(this) | 整个方法 | public synchronized void add() |
静态方法锁 | static synchronized void method(){} | 类的Class对象 | 整个静态方法 | public static synchronized void increment() |
代码块锁-实例 | synchronized(obj){} | 指定实例对象 | 代码块 | synchronized(this){ ... } |
代码块锁-类 | synchronized(Class.class){} | 类的Class对象 | 代码块 | synchronized(MyClass.class){ ... } |
实例锁与类锁的独立性:
public class SyncDemo {// 实例锁public synchronized void instanceLock() {}// 类锁public static synchronized void classLock() {}public static void main(String[] args) {SyncDemo demo = new SyncDemo();// 可以同时获取实例锁和类锁,两者相互独立new Thread(demo::instanceLock).start();new Thread(SyncDemo::classLock).start();}
}
2.1.2 底层实现:对象头与Monitor
对象头结构(64位JVM):
┌─────────────────────────────────────────────────────────────┐
│ Object Header (128 bits) │
├───────────────────────┬───────────────────────────────────┤
│ Mark Word (64 bits) │ Class Metadata (64 bits)│
└───────────────────────┴───────────────────────────────────┘
在 Hotspot 虚拟机中,对象在内存中的布局分为三块区域:对象头、实例数据和对齐填充;Java 对象头是实现 synchronized 的锁对象的基础,一般而言,synchronized 使用的锁对象是存储在 Java 对象头里。它是轻量级锁和偏向锁的关键。
对象头中有一块区域称为 Mark Word,用于存储对象自身的运行时数据,如哈希码(HashCode)、GC 分代年龄、锁状态标志、线程持有的锁、偏向线程ID等等。
32 位操作系统 Mark Word 为 32bit 为,64 位操作系统Mark Word为64bit. 下面就是对象头的一些信息:
Mark Word状态变化:
锁状态 | 结构 | 锁标志位 | 偏向锁标志 |
---|---|---|---|
无锁 | hashcode(31) + 分代年龄(4) + 0 + 001 | 001 | 0 |
偏向锁 | 线程ID(54) + epoch(2) + 分代年龄(4) + 1 + 001 | 001 | 1 |
轻量级锁 | 指向栈中锁记录的指针(62) + 00 | 00 | - |
重量级锁 | 指向Monitor的指针(62) + 10 | 10 | - |
GC标记 | 空 + 11 | 11 | - |
Monitor机制:
- 每个对象关联一个Monitor(管程)
- 包含WaitSet(等待队列)、EntryList(阻塞队列)和Owner(持有线程)
- 执行synchronized时,线程通过CAS竞争Monitor的Owner
2.1.3 锁升级完整流程
下面先介绍一些常见锁的概念
乐观锁/悲观锁
乐观锁与悲观锁不是指具体的什么类型的锁,而是指看待并发同步的角度。乐观锁则认为对于同一个数据的并发操作,是不会发生修改的。在更新数据的时候,会采用尝试更新,不断重新的方式更新数据。乐观的认为,不加锁的并发操作是没有事情的。
悲观锁认为对于同一个数据的并发操作,一定是会发生修改的,哪怕没有修改,也会认为修改。因此对于同一个数据的并发操作,悲观锁采取加锁的形式。悲观的认为,不加锁的并发操作一定会出问题。
从上面的描述我们可以看出,悲观锁适合写操作非常多的场景,乐观锁适合读操作非常多的场景,不加锁会带来大量的性能提升。
悲观锁在 Java 中的使用,就是利用各种锁。
乐观锁在 Java 中的使用,是无锁编程,常常采用的是CAS 算法,典型的例子就是原子类,通过 CAS 自旋实现原子操作的更新。
可重入锁
可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁。对于 ReentrantLock 而言, 他的名字就可以看出是一个可重入锁,其名字是 ReentrantLock 重新进入锁。
对于 Synchronized 而言,也是一个可重入锁。可重入锁的一个好处是可一定程度避免死锁。
CAS
CAS(Compare-And-Swap) :比较并交换,该算法是硬件对于并发操作的支持。CAS 是乐观锁的一种实现,他采用的是自旋的思想,是一种轻量级的锁机制。即***每次判断我的预期值和内存中的值是不是相同,如果不相同则说明该内存值已经被其他线程更新过了,因此需要拿到该最新值作为预期值,重新判断。而该线程不断的循环判断是否该内存值已经被其他线程更新过了,这就是自旋的思想。***底层是通过 Unsafe 类中的 compareAndSwapInt 等方法实现.
读写锁
读写锁特点: 读读不互斥,读写互斥,写写互斥
加读锁是防止在另外的线程在此时写入数据,防止读取脏数据提高读的效率
分段锁
分段锁并非一种实际的锁,而是一种思想,用于将数据分段,并在每个分段上都会单独加锁,把锁进一步细粒度化,以提高并发效率。
自旋锁
所谓自旋其实指的就是自己重试,当线程抢锁失败后,重试几次,要是抢到锁了就继续,要是抢不到就持续循环尝试(自旋转)。说白了还是为了尽量不要阻塞线程。由此可见,自旋锁是是比较消耗 CPU 的,因为要不断的循环重试,不会释放CPU资源。另外,加锁时间普遍较短的场景非常适合自旋锁,可以极大提高锁的效率。
共享锁/独占锁
共享锁是指该锁可被多个线程所持有,并发访问共享资源。
独占锁也叫互斥锁,是指该锁一次只能被一个线程所持有。
对于 Java ReentrantLock,Synchronized 而言,都是独享锁。ReadWriteLock 是接口,其实现类(如 ReentrantReadWriteLock)支持读写分离锁。读锁的共享锁可保证并发读是非常高效的,读写,写读,写写的过程是互斥的。
1. 偏向锁
- 适用场景:单线程重复获取锁
- 实现:在Mark Word中存储线程ID,后续获取只需比对ID
- 撤销条件:当第二个线程尝试获取锁时,触发偏向锁撤销
2. 轻量级锁
- 适用场景:多线程交替执行同步块
- 实现:
- 线程在栈帧中创建Lock Record
- CAS将Mark Word复制到Lock Record并替换为指针
- 竞争时通过自旋尝试获取锁
3. 重量级锁
- 适用场景:多线程激烈竞争
- 实现:向操作系统申请互斥量(Mutex),线程阻塞等待
- 性能影响:涉及内核态切换,开销较大
升级触发条件:
无锁 → 偏向锁(首次获取)→ 轻量级锁(第二个线程竞争)→ 重量级锁(自旋失败或第三个线程竞争)
锁的状态升级是只针对 synchronized 来说的
代码演示锁升级:
import org.openjdk.jol.info.ClassLayout;public class LockUpgradeDemo {public static void main(String[] args) throws InterruptedException {Object lock = new Object();System.out.println("无锁状态:");System.out.println(ClassLayout.parseInstance(lock).toPrintable());// 偏向锁synchronized (lock) {System.out.println("\n偏向锁状态:");System.out.println(ClassLayout.parseInstance(lock).toPrintable());}// 轻量级锁(另一个线程竞争)Thread t1 = new Thread(() -> {synchronized (lock) {try { Thread.sleep(100); } catch (InterruptedException e) {}}});t1.start();t1.join();System.out.println("\n轻量级锁状态:");System.out.println(ClassLayout.parseInstance(lock).toPrintable());// 重量级锁(多线程竞争)Thread t2 = new Thread(() -> synchronized (lock) {});Thread t3 = new Thread(() -> synchronized (lock) {});t2.start();t3.start();t2.join();t3.join();System.out.println("\n重量级锁状态:");System.out.println(ClassLayout.parseInstance(lock).toPrintable());}
}
2.2 ReentrantLock深入解析
ReentrantLock 是 java.util.concurrent.locks 包下的类, 实现Lock接口,Lock 的意义在于提供区别于 synchronized 的另一种具有更多广泛操作的同步方式,它能支持更多灵活的结构
ReentrantLock 基于 AQS,在并发编程中它可以实现公平锁和非公平锁来对共享资源进行同步,同时和 synchronized 一样,ReentrantLock 支持可重入,除此之外,ReentrantLock 在调度上更灵活,支持更多丰富的功能。
ReentrantLock 总共有三个内部类,并且三个内部类是紧密相关的
public class ReentrantLock implements Lock {// 1. 抽象同步器:继承自 AQS,定义锁的基本操作abstract static class Sync extends AbstractQueuedSynchronizer { ... }// 2. 非公平锁实现:继承自 Syncstatic final class NonfairSync extends Sync { ... }// 3. 公平锁实现:继承自 Syncstatic final class FairSync extends Sync { ... }// 4. 条件对象(非内部类,但与锁密切相关)public class ConditionObject implements Condition { ... }
}
2.2.1 类结构与继承关系
虚线表示实现接口,实线表示继承
AQS 在 ReentrantLock 中的关键属性
public class ReentrantLock implements Lock {private final Sync sync; // 锁的核心实现,继承自 AQS// 抽象同步器,继承自 AQSabstract static class Sync extends AbstractQueuedSynchronizer {abstract void lock(); // 由子类实现的加锁方法// 其他方法...}// 非公平锁实现static final class NonfairSync extends Sync { ... }// 公平锁实现static final class FairSync extends Sync { ... }
}
2.2.2 公平锁与非公平锁实现对比
1. 加锁流程差异
- 非公平锁加锁流程
static final class NonfairSync extends Sync {final void lock() {// 关键区别:直接尝试 CAS 抢占锁,不管队列中是否有等待线程if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1); // 失败则进入 AQS 的正常获取流程}// 重写 AQS 的 tryAcquire 方法protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}
}// Sync 类中的非公平获取锁方法
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 关键区别:不检查队列,直接尝试 CASif (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}
抢占式获取:线程进入 lock() 时直接尝试 CAS 修改 state,即使队列中已有等待线程。
失败后入队:如果 CAS 失败(锁已被占用),才进入 AQS 队列等待。
公平锁加锁流程
static final class FairSync extends Sync {final void lock() {acquire(1); // 调用 AQS 的 acquire 方法}// 重写 AQS 的 tryAcquire 方法protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState(); // 获取 AQS 的同步状态if (c == 0) {// 关键区别:先检查队列中是否有前驱节点,再尝试 CASif (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current); // 设置当前线程为锁的持有者return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires; // 重入锁计数增加if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
}
hasQueuedPredecessors():检查队列中是否有其他线程在等待(判断当前线程是否是队列的第一个节点)。
如果有前驱节点,则当前线程必须入队等待,保证公平性。
CAS 操作:仅当队列中没有前驱节点时,才尝试通过 CAS 修改 state。
2. tryAcquire方法核心区别
-
非公平锁
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 不检查队列,直接CASif (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 重入逻辑...return false; }
-
公平锁:
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 检查队列中是否有前驱线程if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 重入逻辑...return false; }
3. 性能对比
特性 | 公平锁 | 非公平锁 |
---|---|---|
吞吐量 | 较低 | 较高 |
响应时间 | 稳定(FIFO) | 可能波动 |
上下文切换 | 频繁 | 较少 |
饥饿风险 | 无 | 可能有 |
适用场景 | 对顺序敏感场景 | 高并发吞吐量优先 |
2.3 CAS与原子操作
2.3.1 CAS原理深度解析
CAS(Compare-And-Swap) 是乐观锁的核心实现,包含三个操作数:
- V:要更新的内存位置
- A:预期值
- B:新值
CPU指令实现:
- x86架构:
lock cmpxchg
指令 - ARM架构:
ldrex/strex
指令组合
Java实现:
// Unsafe类的CAS方法
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int update);
AtomicInteger自增实现:
public class AtomicInteger extends Number implements java.io.Serializable {private static final Unsafe unsafe = Unsafe.getUnsafe();private static final long valueOffset;static {try {valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));} catch (Exception ex) { throw new Error(ex); }}private volatile int value;public final int incrementAndGet() {return unsafe.getAndAddInt(this, valueOffset, 1) + 1;}
}
2.3.2 ABA问题及解决方案
ABA问题:当变量从A变为B再变回A时,CAS无法检测中间变化
解决方案:
- 版本号机制:每次更新增加版本号
- AtomicStampedReference:Java提供的带版本戳的原子引用
代码示例:
import java.util.concurrent.atomic.AtomicStampedReference;public class ABADemo {public static void main(String[] args) {// 初始值100,版本号1AtomicStampedReference<Integer> asr = new AtomicStampedReference<>(100, 1);// 线程1执行ABA操作new Thread(() -> {int stamp = asr.getStamp();System.out.println("线程1初始版本号: " + stamp);// A→Basr.compareAndSet(100, 101, stamp, stamp + 1);System.out.println("线程1修改后版本号: " + asr.getStamp());// B→Aasr.compareAndSet(101, 100, asr.getStamp(), asr.getStamp() + 1);System.out.println("线程1再次修改后版本号: " + asr.getStamp());}, "线程1").start();// 线程2尝试修改new Thread(() -> {int stamp = asr.getStamp();System.out.println("线程2初始版本号: " + stamp);try { Thread.sleep(1000); } catch (InterruptedException e) {}// 版本号已变化,修改失败boolean success = asr.compareAndSet(100, 200, stamp, stamp + 1);System.out.println("线程2修改是否成功: " + success);System.out.println("当前值: " + asr.getReference());System.out.println("当前版本号: " + asr.getStamp());}, "线程2").start();}
}
输出结果:
线程1初始版本号: 1
线程1修改后版本号: 2
线程1再次修改后版本号: 3
线程2初始版本号: 1
线程2修改是否成功: false
当前值: 100
当前版本号: 3
第三章:AQS框架详解
AQS 的 全 称 为 ( AbstractQueuedSynchronizer ),这个类在java.util.concurrent.locks 包下面。
AQS 是一个用来构建锁和同步器的框架,使用 AQS 能简单且高效地构造出同步 器 , 是 JUC 中 核 心 的 组 件 , 比如我们提到的ReentrantLock,CountDownLatch 等等都是基于 AQS 来实现。只要搞懂了 AQS,那么 JUC 中绝大部分的 api 都能掌握。
3.1 AQS核心结构
AbstractQueuedSynchronizer是Java并发工具的基础框架,核心组成:
- 状态变量:
volatile int state
,表示同步状态 - 同步队列:CLH双向链表,存储等待线程
- 条件队列:单向链表,用于Condition等待/通知
Node节点结构:
static final class Node {volatile int waitStatus; // 等待状态volatile Node prev; // 前驱节点volatile Node next; // 后继节点volatile Thread thread; // 等待线程Node nextWaiter; // 条件队列后继节点
}
3.2 独占锁获取流程(acquire)
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
流程分解:
- tryAcquire:尝试获取锁(子类实现)
- addWaiter:将线程封装为Node加入队列尾部
- acquireQueued:在队列中自旋等待获取锁
- selfInterrupt:处理中断状态
3.3 Condition条件变量
Condition提供比Object.wait/notify更灵活的线程协作机制:
- 每个Lock可以创建多个Condition
- 支持精确的线程唤醒控制
实现原理:
- 等待队列:每个Condition维护一个单向等待队列
- await():释放锁并加入等待队列
- signal():将等待队列节点转移到同步队列
代码示例:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class ConditionDemo {private static final Lock lock = new ReentrantLock();private static final Condition condition = lock.newCondition();private static boolean flag = false;public static void main(String[] args) {// 等待线程new Thread(() -> {lock.lock();try {while (!flag) {System.out.println("条件不满足,进入等待");condition.await(); // 释放锁并等待}System.out.println("条件满足,继续执行");} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}, "等待线程").start();// 通知线程new Thread(() -> {lock.lock();try {flag = true;System.out.println("条件已满足,发出通知");condition.signal(); // 唤醒等待线程} finally {lock.unlock();}}, "通知线程").start();}
}
第四章:并发工具与实战
4.1 线程池原理与使用
核心参数:
public ThreadPoolExecutor(int corePoolSize, // 核心线程数int maximumPoolSize, // 最大线程数long keepAliveTime, // 非核心线程空闲时间TimeUnit unit, // 时间单位BlockingQueue<Runnable> workQueue, // 任务队列ThreadFactory threadFactory, // 线程工厂RejectedExecutionHandler handler // 拒绝策略
)
工作流程:
- 提交任务时,若核心线程未满则创建核心线程
- 核心线程满则加入任务队列
- 队列满则创建非核心线程
- 总线程数达最大值则执行拒绝策略
常用线程池:
- FixedThreadPool:固定大小线程池
- CachedThreadPool:可缓存线程池
- ScheduledThreadPool:定时任务线程池
- SingleThreadExecutor:单线程线程池
4.2 并发容器
容器类 | 特点 | 适用场景 |
---|---|---|
ConcurrentHashMap | 分段锁/CAS实现,支持高并发读写 | 高频读写场景 |
CopyOnWriteArrayList | 写时复制,读无锁 | 读多写少场景 |
ConcurrentLinkedQueue | 无锁队列,CAS操作 | 高并发生产者消费者 |
BlockingQueue | 支持阻塞的队列 | 线程间通信 |
4.3 synchronized与ReentrantLock对比
特性 | synchronized | ReentrantLock |
---|---|---|
实现层面 | JVM内置锁 | JDK API |
锁类型 | 可重入、非公平 | 可重入、可公平/非公平 |
灵活性 | 低(自动释放) | 高(可中断、超时、尝试获取) |
性能 | JDK1.6后优化,与Lock接近 | 高并发下更优 |
条件变量 | 一个对象一个条件队列 | 可创建多个Condition |
底层实现 | 对象头+Monitor | AQS框架+CAS |
代码示例对比:
// synchronized实现
public class SyncExample {private final Object lock = new Object();public void doSync() {synchronized (lock) {// 临界区}}
}// ReentrantLock实现
public class LockExample {private final Lock lock = new ReentrantLock();public void doLock() {lock.lock();try {// 临界区} finally {lock.unlock(); // 必须手动释放}}
}
第五章:总结与最佳实践
5.1 核心知识点总结
- 线程模型:掌握线程状态转换及生命周期管理
- 锁机制:理解synchronized锁升级过程和ReentrantLock实现原理
- AQS框架:熟悉同步队列和条件队列的工作机制
- 并发工具:合理选择线程池参数和并发容器
- 性能优化:根据场景选择合适的锁策略和同步机制
5.2 并发编程最佳实践
- 减少锁持有时间:同步块尽可能小
- 降低锁粒度:如ConcurrentHashMap的分段锁思想
- 避免死锁:固定加锁顺序、使用tryLock超时机制
- 优先使用并发容器:减少手动同步
- 合理设置线程池参数:根据CPU核心数和任务类型调整
- 使用原子类:无锁情况下替代锁机制
- 避免过度同步:不需要同步的代码不要加锁
5.3 常见问题排查
-
死锁检测:使用jstack命令分析线程状态
-
锁竞争:通过JConsole观察线程阻塞情况
-
内存可见性:正确使用volatile和final
-
线程泄漏:确保线程池正确关闭
-
CPU过高:检查是否存在无限循环或过度自旋
通过深入理解Java并发编程的底层原理和框架实现,能够编写出高效、安全的多线程程序,在面对高并发场景时能够做出合理的技术选型和性能优化。
一、高频场景:轻量级同步
1. synchronized 关键字
优势:语法简单,自动释放锁,JVM 对其进行了多种优化(偏向锁、轻量级锁)。
适用场景:简单同步块、方法级同步。
public class Counter {private int count = 0;public synchronized void increment() {count++;}
}
2. ReentrantLock(非公平模式)
优势:比 synchronized 更灵活(可中断、超时、条件变量),竞争激烈时性能更优。
适用场景:复杂同步逻辑,如需要尝试锁、可中断锁的场景。
private final ReentrantLock lock = new ReentrantLock();public void doCriticalWork() {lock.lock();try {// 临界区代码} finally {lock.unlock();}
}
二、读多写少场景:读写锁
1. ReentrantReadWriteLock
特性:读锁共享,写锁独占,读写互斥。
适用场景:缓存更新、数据库读写分离等读多写少的场景。
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
private Map<String, Object> cache = new HashMap<>();public Object get(String key) {readLock.lock();try {return cache.get(key);} finally {readLock.unlock();}
}public void put(String key, Object value) {writeLock.lock();try {cache.put(key, value);} finally {writeLock.unlock();}
}
2. StampedLock(Java 8+)
特性:支持乐观读锁(无锁机制),读性能更高。
适用场景:读极多、写极少且写操作耗时短的场景(如缓存)。
private final StampedLock lock = new StampedLock();
private double x, y;public void move(double deltaX, double deltaY) {long stamp = lock.writeLock();try {x += deltaX;y += deltaY;} finally {lock.unlockWrite(stamp);}
}public double distanceFromOrigin() {long stamp = lock.tryOptimisticRead();double currentX = x, currentY = y;if (!lock.validate(stamp)) {stamp = lock.readLock();try {currentX = x;currentY = y;} finally {lock.unlockRead(stamp);}}return Math.hypot(currentX, currentY);
}
第6章 JUC常用类底层实现原理
JUC 是 java.util.concurrent 包的缩写,是 Java 并发编程的核心工具类库,主要用于解决多线程环境下的线程同步、并发控制、线程安全等问题。
6.1 ConcurrentHashMap
6.1.1 实现演进
JDK7 vs JDK8+对比
版本 | 核心技术 | 数据结构 | 并发度 | 锁粒度 |
---|---|---|---|---|
JDK7 | 分段锁(Segment) | Segment数组 + HashEntry数组 + 链表 | 等于Segment数量(默认16) | Segment级 |
JDK8+ | CAS + synchronized | Node数组 + 链表 + 红黑树 | 等于CPU核心数 | 桶头节点级 |
JDK8+核心改进
- 取消分段锁:减少锁竞争,提高并发度
- 红黑树优化:链表长度>8时转为红黑树,查询复杂度从O(n)→O(log n)
- CAS无锁操作:空桶插入时使用CAS避免加锁
- synchronized细粒度锁:仅锁定当前桶的头节点
6.1.2 核心源码分析
1. 节点结构
// 普通节点
static class Node<K,V> implements Map.Entry<K,V> {final int hash;final K key;volatile V val;volatile Node<K,V> next;// ...
}// 红黑树节点容器
static final class TreeBin<K,V> extends Node<K,V> {TreeNode<K,V> root;volatile TreeNode<K,V> first;volatile Thread waiter;volatile int lockState;// ...
}
2. putVal方法核心逻辑
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();int hash = spread(key.hashCode()); // 哈希扰动int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;if (tab == null || (n = tab.length) == 0)tab = initTable(); // 延迟初始化else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// CAS无锁插入新节点if (casTabAt(tab, i, null, new Node<>(hash, key, value)))break;}else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f); // 协助扩容else {V oldVal = null;// 锁定桶头节点synchronized (f) {if (tabAt(tab, i) == f) { // 二次检查if (fh >= 0) { // 链表节点binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<>(hash, key, value);break;}}}else if (f instanceof TreeBin) { // 红黑树节点Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i); // 链表转红黑树if (oldVal != null)return oldVal;break;}}}addCount(1L, binCount); // 更新元素计数return null;
}
6.1.3 高效原因分析
- 细粒度锁:仅锁定当前操作的桶,不影响其他桶并发访问
- 无锁读操作:volatile数组保证读可见性,无需加锁
- 红黑树优化:解决长链表查询性能问题
- 分散扩容:扩容时多个线程可协助迁移数据,提高效率
- 计数优化:通过baseCount + CounterCell数组实现高效并发计数
6.2 LongAdder
6.2.1 分段累加设计
核心原理
- 继承关系:
LongAdder extends Striped64
- 核心字段:
transient volatile long base; // 基础值,低竞争时使用 transient volatile Cell[] cells; // 分段数组,高竞争时使用 transient volatile int cellsBusy; // 初始化/扩容锁(0-无锁,1-锁定)
Cell类(避免伪共享)
@sun.misc.Contended // 缓存行填充,避免伪共享
static final class Cell {volatile long value;Cell(long x) { value = x; }final boolean cas(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);}
}
6.2.2 累加流程
- 低竞争:直接CAS更新base值
- 高竞争:根据线程哈希定位到cells数组元素进行CAS更新
- 动态扩容:当cells元素竞争激烈时,数组容量翻倍(最大2^24)
add方法源码
public void add(long x) {Cell[] cs; long b, v; int m; Cell c;// 1. 尝试直接更新baseif ((cs = cells) == null || !casBase(b = base, b + x)) {boolean uncontended = true;int h = getProbe(); // 线程哈希值// 2. 检查cells状态并尝试更新if (cs == null || (m = cs.length - 1) < 0 ||(c = cs[m & h]) == null ||!(uncontended = c.cas(v = c.value, v + x)))// 3. 复杂情况处理(初始化/扩容/重试)longAccumulate(x, null, uncontended, h);}
}
6.2.3 与AtomicLong对比
特性 | AtomicLong | LongAdder |
---|---|---|
实现 | 单一变量CAS | 分段CAS + 汇总 |
低并发性能 | 相近 | 相近 |
高并发性能 | 较差(CAS冲突严重) | 优异(分散竞争) |
内存占用 | 低 | 高(Cell数组) |
功能完整性 | 支持多种原子操作 | 仅支持加减操作 |
适用场景 | 功能需求多,并发低 | 计数场景,高并发 |
6.3 ThreadPoolExecutor
6.3.1 核心参数详解
构造函数
public ThreadPoolExecutor(int corePoolSize, // 核心线程数int maximumPoolSize, // 最大线程数long keepAliveTime, // 非核心线程空闲时间TimeUnit unit, // 时间单位BlockingQueue<Runnable> workQueue, // 任务队列ThreadFactory threadFactory, // 线程工厂RejectedExecutionHandler handler // 拒绝策略
)
工作队列类型
队列类型 | 特点 | 适用场景 |
---|---|---|
ArrayBlockingQueue | 有界,数组结构 | 资源有限,需控制队列大小 |
LinkedBlockingQueue | 无界/有界,链表结构 | 任务量大,无界时需防止OOM |
SynchronousQueue | 无缓冲,直接移交 | 任务需立即处理,配合无界线程池 |
PriorityBlockingQueue | 优先级排序 | 需按优先级处理任务 |
6.3.2 拒绝策略深度解析
内置策略对比
策略类 | 处理方式 | 适用场景 |
---|---|---|
AbortPolicy | 抛出RejectedExecutionException | 关键任务,需明确失败 |
CallerRunsPolicy | 由提交任务的线程执行 | 非核心任务,限流保护 |
DiscardPolicy | 静默丢弃任务 | 可丢失任务(如日志收集) |
DiscardOldestPolicy | 丢弃队列头部任务 | 允许任务过期(如缓存更新) |
自定义拒绝策略示例
// 保存被拒绝任务到数据库重试
RejectedExecutionHandler dbRetryHandler = (r, executor) -> {if (!executor.isShutdown()) {try {// 保存任务到数据库retryTaskService.save(new RetryTask(r));} catch (Exception e) {log.error("保存重试任务失败", e);// 保存失败时可降级为CallerRunsPolicynew ThreadPoolExecutor.CallerRunsPolicy().rejectedExecution(r, executor);}}
};
6.3.3 线程池状态管理
- 状态定义:
private static final int RUNNING = -1 << COUNT_BITS; // 运行中 private static final int SHUTDOWN = 0 << COUNT_BITS; // 已关闭 private static final int STOP = 1 << COUNT_BITS; // 已停止 private static final int TIDYING = 2 << COUNT_BITS; // 整理中 private static final int TERMINATED = 3 << COUNT_BITS; // 已终止
- 状态转换:
RUNNING → SHUTDOWN → TIDYING → TERMINATED
或RUNNING → STOP → TIDYING → TERMINATED
6.4 CompletableFuture
6.4.1 异步编程模型
核心接口
Future
:基础异步结果接口CompletionStage
:提供链式异步操作能力
常用方法分类
方法类型 | 作用 | 示例 |
---|---|---|
创建异步任务 | 启动异步计算 | supplyAsync(Supplier) runAsync(Runnable) |
结果处理 | 转换/消费异步结果 | thenApply(Function) thenAccept(Consumer) |
任务组合 | 组合多个异步任务 | thenCombine(...) allOf(...) anyOf(...) |
异常处理 | 捕获异步异常 | exceptionally(Function) handle(BiFunction) |
6.4.2 核心实现原理
1. 异步执行机制
- 默认使用
ForkJoinPool.commonPool()
- 可指定自定义线程池:
supplyAsync(Supplier, Executor)
2. 链式调用实现
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {return uniApplyStage(null, fn);
}private <V> CompletableFuture<V> uniApplyStage(Executor e, Function<? super T, ? extends V> f) {if (f == null) throw new NullPointerException();CompletableFuture<V> d = new CompletableFuture<V>();if (e != null || !d.uniApply(this, f, null)) {UniApply<T,V> c = new UniApply<T,V>(e, this, d, f);push(c); // 添加到完成队列c.tryFire(SYNC); // 尝试执行}return d;
}
3. 异常传播机制
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {return uniExceptionallyStage(fn);
}private CompletableFuture<T> uniExceptionallyStage(Function<Throwable, ? extends T> fn) {if (fn == null) throw new NullPointerException();CompletableFuture<T> d = new CompletableFuture<T>();if (!d.uniExceptionally(this, fn)) {UniExceptionally<T> c = new UniExceptionally<T>(this, d, fn);push(c);c.tryFire(SYNC);}return d;
}
6.4.3 高级应用示例
1. 多任务组合
// 两个独立任务组合
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);System.out.println(combined.join()); // 输出: Hello World
2. 超时处理
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try { Thread.sleep(3000); } catch (InterruptedException e) {}return "Result";
}).orTimeout(2, TimeUnit.SECONDS) // 超时设置.exceptionally(ex -> "Timeout fallback");System.out.println(future.join()); // 输出: Timeout fallback
6.5 同步工具类
6.5.1 Semaphore(信号量)
基于AQS共享模式实现
- 状态表示:AQS的state表示可用许可数
- 获取许可:
acquire()
→tryAcquireShared(int)
→ CAS减少state - 释放许可:
release()
→tryReleaseShared(int)
→ CAS增加state
核心源码
// 非公平模式获取
final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}
}// 释放许可
protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}
}
应用场景:连接池限流
public class ConnectionPool {private final Semaphore semaphore;private final List<Connection> connections;public ConnectionPool(int size) {this.semaphore = new Semaphore(size);this.connections = new ArrayList<>(size);// 初始化连接...}public Connection getConnection() throws InterruptedException {semaphore.acquire(); // 获取许可synchronized (connections) {return connections.remove(0);}}public void releaseConnection(Connection conn) {synchronized (connections) {connections.add(conn);}semaphore.release(); // 释放许可}
}
6.5.2 CountDownLatch与CyclicBarrier对比
特性 | CountDownLatch | CyclicBarrier |
---|---|---|
核心功能 | 一个线程等待其他线程完成 | 多个线程互相等待 |
计数器 | 一次性,减至0后不可重置 | 可通过reset()重置 |
阻塞线程 | 仅等待线程阻塞 | 所有参与线程阻塞 |
触发动作 | 等待线程继续执行 | 所有线程到达后执行屏障任务 |
底层实现 | AQS共享模式 | ReentrantLock + Condition |
CountDownLatch示例(等待子任务)
CountDownLatch latch = new CountDownLatch(3);// 启动3个子任务
for (int i = 0; i < 3; i++) {new Thread(() -> {try {// 子任务执行Thread.sleep(1000);System.out.println("子任务完成");} catch (InterruptedException e) {e.printStackTrace();} finally {latch.countDown(); // 计数器减1}}).start();
}latch.await(); // 等待计数器为0
System.out.println("所有子任务完成,继续执行主线程");
CyclicBarrier示例(多线程同步)
// 3个线程到达后执行屏障任务
CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("所有线程到达屏障,执行后续任务")
);for (int i = 0; i < 3; i++) {new Thread(() -> {try {// 线程任务Thread.sleep(new Random().nextInt(1000));System.out.println(Thread.currentThread().getName() + "到达屏障");barrier.await(); // 等待其他线程System.out.println(Thread.currentThread().getName() + "继续执行");} catch (Exception e) {e.printStackTrace();}}).start();
}
6.6 CopyOnWriteArrayList
6.6.1 写时复制机制
核心原理
- 读操作:直接访问volatile数组,无需加锁
- 写操作:复制新数组 → 修改新数组 → 替换旧数组(加锁保证原子性)
核心源码
// 读操作(无锁)
public E get(int index) {return get(getArray(), index);
}// 写操作(加锁复制)
public boolean add(E e) {final ReentrantLock lock = this.lock;lock.lock();try {Object[] elements = getArray();int len = elements.length;Object[] newElements = Arrays.copyOf(elements, len + 1);newElements[len] = e;setArray(newElements);return true;} finally {lock.unlock();}
}
6.6.2 适用场景与局限性
适用场景
- 读多写少:如配置列表、监听器集合
- 迭代安全:不希望迭代时抛出ConcurrentModificationException
局限性
- 内存开销大:每次写操作复制整个数组
- 数据一致性弱:读操作可能获取旧数据
- 写性能差:不适合频繁修改的场景
与Collections.synchronizedList对比
特性 | CopyOnWriteArrayList | Collections.synchronizedList |
---|---|---|
读性能 | 极高(无锁) | 一般(需加锁) |
写性能 | 差(数组复制) | 一般(加锁) |
内存占用 | 高 | 低 |
迭代安全性 | 安全(快照迭代) | 不安全(可能抛异常) |
适用场景 | 读多写极少 | 读写均衡 |
6.7 JUC类高效设计总结
JUC包各类的高效设计可归纳为以下核心思想:
- 分而治之:将竞争分散到多个单元(如LongAdder的Cell数组、ConcurrentHashMap的桶)
- 读写分离:读操作无锁化(如CopyOnWriteArrayList的写时复制、ConcurrentHashMap的volatile数组)
- CAS优化:使用无锁CAS操作减少锁开销(如Atomic系列、AQS核心操作)
- 锁粒度细化:降低锁竞争范围(如ConcurrentHashMap的桶级锁、ReentrantLock的对象级锁)
- 队列化管理:使用FIFO队列有序管理竞争线程(AQS同步队列)
- 懒加载:延迟初始化资源,减少初始开销(如ThreadPoolExecutor的核心线程创建)
- 自适应策略:根据竞争情况动态调整策略(如LongAdder的动态扩容、synchronized的锁升级)
理解这些设计思想,不仅能更好地使用JUC类,还能在自定义并发组件时借鉴这些高效模式。