JAVA中的多线程
目录
什么是并发?(多线程的概念)
线程的创建方式
Thred线程
Runnable接口线程
Callable接口线程
ExecutorService实现线程
线程池
newCachedThreadPool
newFixedThreadPool
newScheduledThreadPool
newSingleThreadExecutor
自定义线程池
线程池的核心参数
线程的生命周期
线程中的方法
线程安全问题
使用同步机制
使用volatile关键字
原子变量类(Atomic)
使用线程安全的集合类
避免共享可变状态
什么是并发?(多线程的概念)
-
并发指的是在同一时间段内,有多个任务交替执行,看起来是同时进行的。它提高了程序的资源利用率和响应能力
-
多线程是实现并发的主要方式之一。一个程序可以启动多个线程,多个线程之间可以同时执行不同的任务
-
在单核CPU上,多线程通过时间片轮转模拟并发;多核CPU上,线程可以真正并行执行。
线程的创建方式
Java中主要有三种创建线程的方式
Thred线程
Thread类是创建线程的方式之一,通过继承Thread然后重新run()方法,然后调用start()方法启动线程。
/** Thread线程测试方法*/
public class ThreadTest extends Thread {
@Overridepublic void run() {try {// 模拟线程执行任务System.out.println("线程 " + Thread.currentThread().getName() + " 正在执行任务...");Thread.sleep(2000); // 模拟任务耗时System.out.println("线程 " + Thread.currentThread().getName() + " 任务完成");} catch (InterruptedException e) {System.err.println("线程 " + Thread.currentThread().getName() + " 被中断: " + e.getMessage());}}
public static void main(String[] args) {ThreadTest thread1 = new ThreadTest();ThreadTest thread2 = new ThreadTest();ThreadTest thread3 = new ThreadTest();ThreadTest thread4 = new ThreadTest();ThreadTest thread5 = new ThreadTest();ThreadTest thread6 = new ThreadTest();thread1.start();thread2.start();thread3.start();thread4.start();thread5.start();thread6.start();}
}
Runnable接口线程
runnable也是实现线程的方法之一,通过实现runnable接口重写run方法,然后创建Thread对象,将runnable对象传递,调用start方法
/** 线程runnable测试*/
public class RunnableTest implements Runnable{@Overridepublic void run() {try {// 模拟线程执行任务System.out.println("线程Runnable " + Thread.currentThread().getName() + " 正在执行任务...");Thread.sleep(2000); // 模拟任务耗时System.out.println("线程Runnable " + Thread.currentThread().getName() + " 任务完成");} catch (InterruptedException e) {System.err.println("线程Runnable " + Thread.currentThread().getName() + " 被中断: " + e.getMessage());}}
public static void main(String[] args) {RunnableTest runnable1 = new RunnableTest();RunnableTest runnable2 = new RunnableTest();RunnableTest runnable3 = new RunnableTest();RunnableTest runnable4 = new RunnableTest();RunnableTest runnable5 = new RunnableTest();RunnableTest runnable6 = new RunnableTest();
Thread thread1 = new Thread(runnable1);Thread thread2 = new Thread(runnable2);Thread thread3 = new Thread(runnable3);Thread thread4 = new Thread(runnable4);Thread thread5 = new Thread(runnable5);Thread thread6 = new Thread(runnable6);
thread1.start();thread2.start();thread3.start();thread4.start();thread5.start();thread6.start();}
}
Callable接口线程
callable接口也可以实现线程,与runnable不同的是,它可以实现带返回值的结果
/** Callable测试方法*/
public class CallableTest implements Callable {@Overridepublic Object call() throws Exception {try {// 模拟任务执行System.out.println("Callable " + Thread.currentThread().getName() + " 正在执行任务...");Thread.sleep(2000); // 模拟任务耗时System.out.println("Callable " + Thread.currentThread().getName() + " 任务完成");return "任务完成";} catch (InterruptedException e) {System.err.println("Callable " + Thread.currentThread().getName() + " 被中断: " + e.getMessage());return "任务被中断";}}
public static void main(String[] args) {CallableTest callable1 = new CallableTest();CallableTest callable2 = new CallableTest();CallableTest callable3 = new CallableTest();CallableTest callable4 = new CallableTest();CallableTest callable5 = new CallableTest();CallableTest callable6 = new CallableTest();
Thread thread1 = new Thread(() -> {try {System.out.println(callable1.call());} catch (Exception e) {e.printStackTrace();}});Thread thread2 = new Thread(() -> {try {System.out.println(callable2.call());} catch (Exception e) {e.printStackTrace();}});Thread thread3 = new Thread(() -> {try {System.out.println(callable3.call());} catch (Exception e) {e.printStackTrace();}});Thread thread4 = new Thread(() -> {try {System.out.println(callable4.call());} catch (Exception e) {e.printStackTrace();}});Thread thread5 = new Thread(() -> {try {System.out.println(callable5.call());} catch (Exception e) {e.printStackTrace();}});Thread thread6 = new Thread(() -> {try {System.out.println(callable6.call());} catch (Exception e) {e.printStackTrace();}});
thread1.start();thread2.start();thread3.start();thread4.start();thread5.start();thread6.start();}
}
ExecutorService实现线程
有返回值的任务必须实现 Callable 接口,类似的,无返回值的任务必须 Runnable 接口。执行 Callable 任务后,可以获取一个 Future 的对象,在该对象上调用 get 就可以获取到 Callable 任务 返回的 Object 了,再结合线程池接口 ExecutorService 就可以实现传说中有返回结果的多线程。
线程池
Java 里面线程池的顶级接口是 Executor,但是严格意义上讲 Executor 并不是一个线程池,而 只是一个执行线程的工具。真正的线程池接口是 ExecutorService。ExecutorService实现的线程池总共有4种。
newCachedThreadPool
创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute 将重用以前构造 的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并 从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。
特点:
-
线程数量动态调整:无核心线程,最大线程数为
Integer.MAX_VALUE
。 -
空闲线程回收:线程空闲 60 秒后自动销毁。
-
任务队列:使用
SynchronousQueue
(无容量队列,任务直接交给线程或创建新线程)。
优点:
-
弹性伸缩:适合短期异步任务,避免频繁创建/销毁线程的开销。
-
自动回收:长时间空闲时不占用资源。
缺点:
-
线程数量无上限,可能耗尽系统资源(如大量并发时)。
适用场景:
-
高频短任务:例如 HTTP 请求处理、快速计算任务。
-
测试/原型开发:快速验证异步逻辑。
newFixedThreadPool
创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数Threads 线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何 线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。
特点:
-
固定线程数:核心线程数 = 最大线程数(由用户指定)。
-
任务队列:无界队列(
LinkedBlockingQueue
),任务堆积可能导致 OOM。 -
线程存活:即使空闲也不会被回收,除非调用
shutdown()
。
优点:
-
资源可控:避免线程数量爆炸。
-
稳定吞吐量:适合长期运行的任务。
缺点:
-
无界队列可能导致内存溢出(如任务提交速度远高于处理速度)。
适用场景:
-
CPU 密集型任务:线程数通常设为 CPU 核数 + 1。
-
稳定负载场景:例如后台批处理、定时统计。
newScheduledThreadPool
创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
特点:
-
支持定时/周期性任务:通过
schedule()
、scheduleAtFixedRate()
等方法实现。 -
核心线程固定,但可扩展(通过
DelayedWorkQueue
管理任务)。
优点:
-
精准调度:适合需要延迟或定期执行的任务。
缺点:
-
复杂任务调度可能引发并发问题(需自行处理同步)。
适用场景:
-
定时任务:如每天凌晨数据备份。
-
延迟任务:如订单超时取消。
-
周期性任务:如心跳检测。
newSingleThreadExecutor
Executors.newSingleThreadExecutor()返回一个线程池(这个线程池只有一个线程),这个线程 池可以在线程死后(或发生异常时)重新启动一个线程来替代原来的线程继续执行下去!
特点:
-
只有一个线程:保证任务按提交顺序串行执行。
-
自动恢复:线程异常终止后会新建一个线程继续执行后续任务。
-
任务队列:无界队列(
LinkedBlockingQueue
)。
优点:
-
顺序执行:避免多线程并发问题(如数据竞争)。
-
容错性:线程崩溃不影响后续任务。
缺点:
-
吞吐量低,不适合高并发场景。
适用场景:
-
任务需严格顺序执行:如日志写入、单线程消费队列。
-
简单异步化:如 GUI 的事件分发线程(EDT)。
package com.xiancheng;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/** ExecutorService测试类*/
public class ExecutorServiceTest {public static void main(String[] args) throws InterruptedException {// 创建固定大小的线程池//ExecutorService executorService = Executors.newFixedThreadPool(5);
// 创建可缓存的线程池//ExecutorService executorService = Executors.newCachedThreadPool();
// 创建单线程的线程池//ExecutorService executorService = Executors.newSingleThreadExecutor();
//创建scheduled线程池ExecutorService executorService = Executors.newScheduledThreadPool(5);
CountDownLatch latch = new CountDownLatch(5); // 用于等待所有任务完成
// 提交5个任务到线程池for (int i = 0; i < 5; i++) {final int taskId = i;Future<String> future = executorService.submit(() -> {try {System.out.println("任务 " + taskId + " 开始执行,线程: " + Thread.currentThread().getName());long startTime = System.currentTimeMillis();
Thread.sleep(2000); // 模拟任务耗时
long endTime = System.currentTimeMillis();System.out.println("任务 " + taskId + " 完成,耗时: " + (endTime - startTime) + "ms");return "任务" + taskId + "成功完成";} catch (InterruptedException e) {System.err.println("任务 " + taskId + " 被中断: " + e.getMessage());Thread.currentThread().interrupt();return "任务" + taskId + "被中断";} finally {latch.countDown(); // 任务完成时计数减1}});}
// 关闭线程池(不再接受新任务)executorService.shutdown();
// 等待所有任务完成latch.await();System.out.println("所有任务已完成");
// 检查线程池是否完全终止if (executorService.isTerminated()) {System.out.println("线程池已完全终止");} else {System.out.println("线程池仍在终止过程中...");}}
}
自定义线程池
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/*** 自定义线程池工具类,提供线程池的创建、任务提交和优雅关闭功能*/
public final class ThreadPoolUtil {private static final int DEFAULT_CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;private static final int DEFAULT_MAX_POOL_SIZE = DEFAULT_CORE_POOL_SIZE * 2;private static final long DEFAULT_KEEP_ALIVE_TIME = 60L;private static final int DEFAULT_QUEUE_CAPACITY = 1000;
private ThreadPoolUtil() {// 私有构造函数,防止实例化}
/*** 创建固定大小的线程池* @param poolSize 线程池大小* @param threadNamePrefix 线程名称前缀* @return 线程池实例*/public static ExecutorService createFixedThreadPool(int poolSize, String threadNamePrefix) {return new ThreadPoolExecutor(poolSize,poolSize,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),createThreadFactory(threadNamePrefix),new ThreadPoolExecutor.CallerRunsPolicy());}
/*** 创建可扩展的线程池* @param corePoolSize 核心线程数* @param maxPoolSize 最大线程数* @param queueCapacity 任务队列容量* @param threadNamePrefix 线程名称前缀* @return 线程池实例*/public static ExecutorService createScalableThreadPool(int corePoolSize,int maxPoolSize,int queueCapacity,String threadNamePrefix) {return new ThreadPoolExecutor(corePoolSize,maxPoolSize,DEFAULT_KEEP_ALIVE_TIME, TimeUnit.SECONDS,new LinkedBlockingQueue<>(queueCapacity),createThreadFactory(threadNamePrefix),new ThreadPoolExecutor.CallerRunsPolicy());}
/*** 创建IO密集型任务的线程池(线程数通常设置为CPU核心数的2倍)* @param threadNamePrefix 线程名称前缀* @return 线程池实例*/public static ExecutorService createIOIntensivePool(String threadNamePrefix) {return createScalableThreadPool(DEFAULT_CORE_POOL_SIZE,DEFAULT_MAX_POOL_SIZE,DEFAULT_QUEUE_CAPACITY,threadNamePrefix);}
/*** 创建线程工厂* @param threadNamePrefix 线程名称前缀* @return 线程工厂实例*/private static ThreadFactory createThreadFactory(String threadNamePrefix) {return new ThreadFactory() {private final AtomicInteger threadNumber = new AtomicInteger(1);
@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r, threadNamePrefix + "-" + threadNumber.getAndIncrement());if (thread.isDaemon()) {thread.setDaemon(false);}if (thread.getPriority() != Thread.NORM_PRIORITY) {thread.setPriority(Thread.NORM_PRIORITY);}return thread;}};}
/*** 提交任务到线程池并获取Future* @param executor 线程池* @param task 任务* @param <T> 返回值类型* @return Future对象*/public static <T> Future<T> submitTask(ExecutorService executor, Callable<T> task) {if (executor == null || executor.isShutdown() || executor.isTerminated()) {throw new IllegalStateException("线程池不可用");}return executor.submit(task);}
/*** 提交Runnable任务到线程池* @param executor 线程池* @param task 任务* @return Future对象*/public static Future<?> submitTask(ExecutorService executor, Runnable task) {if (executor == null || executor.isShutdown() || executor.isTerminated()) {throw new IllegalStateException("线程池不可用");}return executor.submit(task);}
/*** 优雅关闭线程池* @param executor 线程池*/public static void shutdownGracefully(ExecutorService executor) {if (executor == null || executor.isTerminated()) {return;}
try {// 禁止新任务提交executor.shutdown();// 等待未完成任务结束if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {// 超时后强制关闭executor.shutdownNow();// 再次等待任务终止if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {System.err.println("线程池关闭失败");}}} catch (InterruptedException e) {// 重新调用shutdownNowexecutor.shutdownNow();// 保留中断状态Thread.currentThread().interrupt();}}
/*** 获取线程池状态信息* @param executor 线程池* @return 状态信息字符串*/public static String getThreadPoolStatus(ExecutorService executor) {if (!(executor instanceof ThreadPoolExecutor)) {return "无法获取状态: 不是ThreadPoolExecutor实例";}
ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;return String.format("线程池状态: 核心线程数=%d, 最大线程数=%d, 当前线程数=%d, " +"活跃线程数=%d, 已完成任务数=%d, 等待任务数=%d, 任务总数=%d",pool.getCorePoolSize(),pool.getMaximumPoolSize(),pool.getPoolSize(),pool.getActiveCount(),pool.getCompletedTaskCount(),pool.getQueue().size(),pool.getTaskCount());}
}
线程池的核心参数
参数名称 | 作用 |
---|---|
corePoolSize | 核心线程数,线程池始终保持的线程数量,即使空闲也不会销毁 |
maximumPoolSize | 最大线程数,线程池允许的最大线程数量 |
keepAliveTime | 非核心线程空闲后的存活时间,超过则销毁非核心线程 |
unit | keepAliveTime的时间单位,比如秒、毫秒 |
workQueue | 任务队列,用来存储等待执行的任务(如LinkedBlockingQueue 、ArrayBlockingQueue 等) |
threadFactory | 线程工厂,用来创建线程 |
handler | 饱和策略(拒绝策略),线程池满时如何处理新任务(如抛异常、放弃任务、调用者执行等) |
以下为线程任务执行的全过程,包括这些参数的作用
当我们submit一个任务的时候,会先判断当前的线程任务 < corePoolSize,如果是就会创建一个核心线程去执行该条任务,反之会进入阻塞队列等待并且等待空闲线程去执行任务。当任务队列满了,会采取建立非核心线程去执行任务,直到=maximumPoolSize。如果超过这个线程就会执行任务队列的拒绝策略(抛弃任务、抛出异常等)。当任务逐渐变少,临时线程会在最大存活时间之后自动销毁。
线程的生命周期
状态 | 说明 |
---|---|
新建(New) | 线程对象创建,调用构造器,但尚未启动 |
就绪(Runnable) | 线程调用start() 后,等待CPU调度执行 |
运行(Running) | 线程获得CPU时间,执行代码 |
阻塞(Blocked/Waiting/Timed Waiting) | 线程等待某种条件,如等待I/O、等待锁、调用sleep() 或wait() |
终止(Terminated) | 线程执行完成或被异常终止,线程生命周期结束 |
线程中的方法
方法名 | 说明 |
---|---|
start() | 启动线程,调用后线程进入就绪状态,准备运行 |
run() | 线程执行体,线程启动后执行的方法,不能直接调用run() 来启动线程,否则变成普通方法调用 |
sleep(long millis) | 让当前线程暂停指定时间(毫秒),进入阻塞状态,不释放锁 |
join() | 当前线程等待调用join() 的线程执行完成再继续执行 |
yield() | 暂停当前线程,礼让给其他同优先级线程执行,但不保证立即暂停 |
interrupt() | 中断线程,设置线程的中断标志,可以用于结束阻塞状态下的线程 |
isInterrupted() | 检查线程是否被中断(不清除中断标志) |
setDaemon(boolean) | 设置线程是否为守护线程,守护线程会随着所有用户线程结束而结束 |
getName() /setName() | 获取或设置线程名称 |
getPriority() /setPriority(int) | 获取或设置线程优先级,范围1~10,默认5 |
currentThread() | 静态方法,获取当前执行代码的线程对象 |
线程安全问题
使用同步机制
-
synchronized关键字 修饰代码块或方法,保证同一时刻只有一个线程执行被保护的代码。
public synchronized void increment() {count++; }
-
显示锁(Lock接口) 例如
ReentrantLock
,提供比synchronized
更灵活的锁控制。Lock lock = new ReentrantLock(); lock.lock(); try {count++; } finally {lock.unlock(); }
使用volatile关键字
保证变量的可见性,禁止指令重排序,但不保证原子性。
private volatile boolean flag = true;
适用于状态标志的可见性控制。
原子变量类(Atomic)
Java提供java.util.concurrent.atomic
包下的原子变量类,比如AtomicInteger
,提供原子操作,无需加锁。
AtomicInteger count = new AtomicInteger(0);
count.incrementAndGet();
使用线程安全的集合类
如ConcurrentHashMap
、CopyOnWriteArrayList
,代替普通的HashMap
和ArrayList
。
避免共享可变状态
设计无状态或者不可变对象,减少共享变量。