java多线程及线程池
线程
- 一、什么是多线程?
- 二、线程的生命周期
- 三、简单地创建一个线程
- 1、实现Runnable接口
- 2、继承Thread类
- 3、使用Callable和FutureTask
- 4、三种实现方式的对比
- 四、线程同步和锁
- 1、为什么需要线程同步?
- 2、线程同步的实现方式
- 3、synchronized和ReentrantLock使用示例
- 五、线程池
- 1、Executor接口
- 2、ExecutorService接口
- isShutdown() 和 isTerminated()的区别
- 3、Executors工厂类
- 4、Executors示例
- 5、一般使用ThreadPoolExecutor而不是Executors
- 6、ThreadPoolExecutor示例
- 7、线程池的关闭
- shutdownNow()和shutdown()的区别
- 注意处理InterruptedException
- 什么是阻塞操作?
一、什么是多线程?
多线程是指在一个程序中同时运行多个线程,每个线程都是独立的执行流程,可以在同一时间内执行不同的任务,从而提高程序的并发性和效率。
二、线程的生命周期
在Thread.State枚举类中可以看到线程的如下阶段(状态)
阶段 | 说明 | 补充 |
---|---|---|
新建(New) | 从新建一个线程对象到程序start() 这个线程之间的状态,都是新建状态。 | 在这个状态下,JVM已经为此线程分配了必要的内存。 |
就绪(Runnable) | 线程对象调用start()方法后,就处于就绪状态。就绪状态的线程在获得CPU时间片后就可以开始运行。 | 这个状态的线程位于可运行线程池中,等待被线程调度选中,获得CPU的使用权。 |
运行状态(Running) | 就绪状态下的线程在获取CPU资源后就可以执行run(),此时的线程便处于运行状态。 | 运行状态的线程可变为就绪、阻塞及死亡三种状态。 |
阻塞状态(Blocked) | 当一个线程试图获取一个内部的对象锁(也就是进入一个synchronized块),而该锁被其他线程持有,则该线程进入阻塞状态。 | 阻塞状态的线程在锁被释放时,将会进入就绪状态。 |
等待状态(Waiting) | 线程通过调用其自身的wait()方法、join()方法或LockSupport.park()方法,或者通过调用其他线程的join()方法,可以进入等待状态。 | 在等待状态的线程不会被分配CPU时间片,它们只能通过被其他线程显式唤醒进入就绪状态。 |
超时等待状态(Timed Waiting) | 当线程调用了sleep(long ms),wait(long ms),join(long ms),或者LockSupport.parkNanos(), LockSupport.parkUntil()等具有指定等待时间的方法,线程就会进入超时等待状态。 | 当超时时间到达后,线程会自动返回到就绪状态。 |
终止状态(Terminated) | 当线程的run()方法执行完毕,或者线程中断,线程就会进入终止状态 | 在这个状态下,线程已经完成了它的全部工作。 |
Thread t = new Thread(); // 线程此时处于New状态
t.start(); // 线程此时处于Runnable状态
t.wait(); // 线程此时处于Waiting状态
t.join(); // 线程此时处于Waiting状态
Thread.sleep(1000); // 线程此时处于Timed Waiting状态
三、简单地创建一个线程
1、实现Runnable接口
最简洁的方式是直接用lambda表达式:
Thread thread = new Thread(() -> {
while (true) {
//具体逻辑
}
});
thread.setName("Thread-Example");
thread.start();
调用start 与 run方法的区别:
直接调用 run() 是在主线程中执行了 run(),没有启动新的线程。
使用 start() 是启动新的线程,通过新的线程间接执行 run()方法中的代码。
上面的lambda表达式相当于使用了匿名内部类:
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
// 具体逻辑
}
}
});
实现Runnable接口完整的写法是这样的:
// 定义一个Runnable接口的实现类,任何你想用于多线程的类,来实现Runnable接口
class MyRunnable implements Runnable {
@Override
public void run() {
// 定义线程需要执行的逻辑
System.out.println("这是一个实现Runnable接口的线程");
}
}
public class Main {
public static void main(String[] args) {
// 创建Runnable对象
MyRunnable runnable = new MyRunnable();
// 创建线程对象,并将Runnable对象传递给Thread构造方法
Thread thread = new Thread(runnable);
// 启动线程
thread.start();
}
}
2、继承Thread类
// 定义一个Thread的子类
class MyThread extends Thread {
@Override
public void run() {
// 定义线程需要执行的逻辑
System.out.println("这是一个继承Thread类的线程");
}
}
public class Main {
public static void main(String[] args) {
// 创建线程对象
MyThread thread = new MyThread();
// 启动线程
thread.start();
}
}
3、使用Callable和FutureTask
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
// 线程需要执行的逻辑
System.out.println("这是一个实现Callable接口的线程");
return "子线程执行完成,返回结果。";
}
}
public class Main {
public static void main(String[] args) {
MyCallable callable = new MyCallable();
FutureTask<String> futureTask = new FutureTask<>(callable);
Thread thread = new Thread(futureTask);
thread.start();
try {
String result = futureTask.get();
System.out.println("主线程获取到结果:" + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
4、三种实现方式的对比
特性 | 继承Thread类 | 实现Runnable接口 | 使用Callable和FutureTask |
---|---|---|---|
继承/实现 | 继承Thread类 | 实现Runnable接口 | 实现Callable接口 |
单继承限制 | 受单继承限制,无法继承其他类 | 可以继承其他类 | 可以继承其他类 |
代码复用性 | 代码复用性较差 | 代码复用性更好 | 代码复用性更好 |
适用场景 | 简单的线程逻辑,不需要继承其他类 | 复杂逻辑,需要继承其他类的场景 | 需要返回结果的异步任务,结合线程池使用 |
资源共享 | 不支持资源共享 | 支持资源共享 | 支持资源共享 |
结果返回 | 不支持 | 不支持 | 支持 |
异常处理 | run()方法不声明异常,需在方法内处理 | run()方法不声明异常,需在方法内处理 | call()方法可声明异常,主线程可捕获 |
四、线程同步和锁
1、为什么需要线程同步?
在多线程环境中,多个线程可能会同时访问和修改同一个共享资源,这可能导致数据不一致和其他并发问题。例如,在银行账户的存取款操作中,如果不进行适当的同步,可能会出现以下情况:
数据不一致:两个线程同时读取账户余额,各自进行操作后,写回不同的结果,导致账户余额错误。
竞态条件:多个线程在竞争共享资源,导致程序执行结果不可预测。
为了避免这些问题,必须使用线程同步机制,确保在同一时间只有一个线程能够访问和修改共享资源。
2、线程同步的实现方式
Java提供了多种线程同步机制,包括:
synchronized关键字:可以修饰方法或代码块,确保在同一时间只有一个线程执行同步代码。
Lock接口:提供了更灵活的锁定机制,可以实现更复杂的同步逻辑。
atomic变量:使用Atomic类来实现线程安全的变量操作,避免加锁。
volatile关键字:确保变量的内存可见性,避免由于缓存不一致导致的问题。
3、synchronized和ReentrantLock使用示例
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
// 银行账户类
class BankAccount {
private double balance;
// 使用synchronized关键字修饰的存款方法
public synchronized void deposit(double amount) {
balance += amount;
System.out.println("存款后余额:" + balance);
}
// 使用synchronized关键字修饰的取款方法
public synchronized void withdraw(double amount) {
if (balance >= amount) {
balance -= amount;
System.out.println("取款后余额:" + balance);
} else {
System.out.println("余额不足!");
}
}
// 使用Lock实现的存款方法
private final Lock lock = new ReentrantLock();
public void depositWithLock(double amount) {
lock.lock();
try {
balance += amount;
System.out.println("Lock存款后余额:" + balance);
} finally {
lock.unlock();
}
}
// 使用Lock实现的取款方法
public void withdrawWithLock(double amount) {
lock.lock();
try {
if (balance >= amount) {
balance -= amount;
System.out.println("Lock取款后余额:" + balance);
} else {
System.out.println("Lock余额不足!");
}
} finally {
lock.unlock();
}
}
}
// 模拟银行账户操作的线程
class TransactionThread implements Runnable {
private BankAccount account;
private double amount;
private boolean isUsingSynchronized;
public TransactionThread(BankAccount account, double amount, boolean isUsingSynchronized) {
this.account = account;
this.amount = amount;
this.isUsingSynchronized = isUsingSynchronized;
}
@Override
public void run() {
try {
for (int i = 0; i < 5; i++) {
if (isUsingSynchronized) {
account.deposit(amount);
account.withdraw(amount);
} else {
account.depositWithLock(amount);
account.withdrawWithLock(amount);
}
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public class ThreadSynchronizationExample {
public static void main(String[] args) {
// 创建银行账户实例
BankAccount account = new BankAccount();
// 使用synchronized关键字的线程
System.out.println("=== 使用synchronized关键字 ===");
Thread thread1 = new Thread(new TransactionThread(account, 100, true));
Thread thread2 = new Thread(new TransactionThread(account, 50, true));
thread1.start();
thread2.start();
// 等待synchronized线程完成
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 使用Lock接口的线程
System.out.println("\n=== 使用Lock接口 ===");
Thread thread3 = new Thread(new TransactionThread(account, 200, false));
Thread thread4 = new Thread(new TransactionThread(account, 150, false));
thread3.start();
thread4.start();
// 等待Lock线程完成
try {
thread3.join();
thread4.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
五、线程池
Java的java.util.concurrent(简称JUC)包中提供了一套丰富的线程池工具。
该部分内容参考原文链接
1、Executor接口
Executor接口是JUC包中定义的一个执行器接口,它只有一个execute方法,接收一个Runnable对象作为参数,并执行Runnable中的操作。这个接口非常简单,但它定义了执行器的基本功能。
public interface Executor {
void execute(Runnable command);
}
在实际应用中,我们通常不会直接使用Executor接口,而是使用它的子接口ExecutorService,它提供了更丰富的功能。
2、ExecutorService接口
ExecutorService接口继承自Executor接口,并增加了关于执行器服务的定义。它提供了一系列的方法,包括关闭执行器、立即关闭、检查执行器是否关闭、等待任务终止、提交有返回值的任务以及批量提交任务等。
public interface ExecutorService extends Executor {
void shutdown(); //关闭执行器,已提交的任务会继续执行,但不接受新的任务。
List<Runnable> shutdownNow(); //立即关闭执行器,尝试停止所有正在执行的任务,并返回等待执行的任务列表。
boolean isShutdown(); //检查执行器是否已关闭。
boolean isTerminated(); //检查执行器是否已终止,即所有任务都已完成。
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; //等待任务终止,如果超过指定时间则返回false。
<T> Future<T> submit(Callable<T> task); //提交一个有返回值的任务,并返回一个Future对象,通过该对象可以查看任务执行是否完成,并获取返回值。
<T> Future<T> submit(Runnable task, T result); //提交一个Runnable任务和一个结果值,当任务执行完成后,返回该结果值。注意这个结果值是在任务执行前就确定的,与任务的实际执行结果无关。如果希望获取任务的实际执行结果,应该使用Callable任务。
Future<?> submit(Runnable task); //提交一个Runnable任务,并返回一个Future对象。由于Runnable任务没有返回值,所以这个Future对象的get方法将返回null。这个方法主要用于将Runnable任务转换为Future对象,以便使用Future的相关功能(如取消任务、检查任务是否完成等)。但这个用法并不常见,因为Runnable任务本身就不支持返回值。更常见的做法是直接使用execute(Runnable command)方法执行Runnable任务。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; //批量提交Callable任务,并返回一个Future对象的列表。当所有任务都完成后,可以通过这些Future对象获取任务的返回值。如果某个任务执行失败,那么对应的Future对象的get方法将抛出ExecutionException异常。这个方法会等待所有任务都完成后才返回。如果希望设置超时时间,可以使用另一个重载版本的方法。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; //批量提交Callable任务,并返回第一个成功完成的任务的返回值。当找到第一个成功完成的任务后,该方法会立即返回,而不会等待其他任务完成。如果所有任务都失败,那么该方法将抛出ExecutionException异常。这个方法通常用于实现“多个路径中选择一个最快路径”的场景。同样地,这个方法也有一个设置超时时间的重载版本。
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
isShutdown() 和 isTerminated()的区别
isShutdown(): 判断线程池是否调用了 shutdown() 或 shutdownNow()。
isTerminated(): 判断线程池是否已完全终止(即所有任务都已完成)。
3、Executors工厂类
Executors是一个工厂类,它提供了一系列静态方法来创建不同类型的线程池。这些线程池都是ExecutorService接口的实现类。通过Executors的工厂方法,我们可以非常方便地创建和管理线程池。
需要注意的是,尽量不要用该类创建线程池。
因为Executors.newCachedThreadPool()根据需求创建线程,最大线程数为Integer.MAX_VALUE,可能导致过多线程消耗资源。
而Executors.newFixedThreadPool(int)和Executors.newSingleThreadExecutor()虽然限制了线程池的数量,但队列最大容量为Integer.MAX_VALUE,也可能导致内存溢出。
因此我们一般使用ThreadPoolExecutor
//创建一个大小为10的固定线程池。
//创建时指定线程池的大小,当有新任务提交时,如果线程池中有空闲线程,则使用空闲线程执行任务;如果没有空闲线程,则新任务会等待直到有线程空闲出来。这种线程池适用于已知并发压力的情况下,对线程数做限制,避免由于大量线程的创建和销毁带来的性能开销。
ExecutorService executor = Executors.newFixedThreadPool(10);
//创建一个单线程执行器。
//所有提交的任务都会按照提交的顺序依次执行,这种线程池适用于需要保证任务执行顺序的场景。
ExecutorService executor = Executors.newSingleThreadExecutor();
//创建一个可缓存的线程池。
//可缓存的线程池,这种线程池会根据需要创建线程来执行任务,并且可以重复利用已存在的线程来执行新的任务。当线程池中的线程在一定时间内没有执行任务时,它会被自动销毁以释放资源。
//这种线程池适用于并发压力较大且任务执行时间较短的场景,如Web服务器处理HTTP请求等。但需要注意的是,在实际应用中我们可能需要更加谨慎地使用CachedThreadPool,因为如果不当使用可能会导致系统资源耗尽(如创建过- 多的线程导致内存溢出等)。因此在使用CachedThreadPool时需要特别关注任务的执行时间和数量以及系统的资源状况等因素。
ExecutorService executor = Executors.newCachedThreadPool();
4、Executors示例
@Component
public class ThreadMgr {
private volatile ExecutorService testThreadPoolExecutor;
public ExecutorService getTestExecutor() {
if (testThreadPoolExecutor == null) {
synchronized (this) {
if (testThreadPoolExecutor == null) {
testThreadPoolExecutor = Executors.newFixedThreadPool(3);
}
}
}
return testThreadPoolExecutor;
}
}
@Component
public class TaskInit implements ApplicationRunner {
@Autowired
private ThreadMgr threadMgr;
@Override
public void run(ApplicationArguments args) {
//省略数据库查询等逻辑
//获取线程池并执行相关逻辑
threadMgr.getTestExecutor().submit(() -> XXX);
}
}
注意这里的TaskInit去继承ApplicationRunner并把逻辑放在run里面,只是为了程序启动时执行run方法中的内容。而threadMgr.getTaskExecutor().submit(() -> XXX);
这一句中lambda表达式后面的部分(即XXX,可以是调用的方法等),实际上还是实现了Runnable的run方法,相当于
threadMgr.getTaskExecutor().submit(new Runnable() {
@Override
public void run() {
// 具体逻辑
XXX
}
});
5、一般使用ThreadPoolExecutor而不是Executors
ThreadPoolExecutor继承自AbstractExecutorService,而AbstractExecutorService实现了ExecutorService接口。
ThreadPoolExecutor也是Executors的底层实现。
该部分内容参考博客原文
ThreadPoolExecutor构造函数:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler){
//省略具体内容
}
参数 | 含义 | 是否必填 |
---|---|---|
corePoolSize | 线程池中核心线程数的最大值 | 是 |
maximumPoolSize | 线程池中能拥有最多线程数 | 是 |
keepAliveTime | 表示空闲线程的存活时间 | 是 |
TimeUnit | keepAliveTime的单位 | 是 |
workQueue | 用于缓存任务的阻塞队列 | 是 |
threadFactory | 指定创建线程的工厂,不指定时默认使用Executors.defaultThreadFactory() | 否 |
handler | 表示当 workQueue 已满,且池中的线程数达到 maximumPoolSize 时,线程池拒绝添加新任务时采取的策略;不指定时默认策略为AbortPolicy(),即抛出异常,由开发人员进行处理。 | 否 |
6、ThreadPoolExecutor示例
@Component
public class ThreadMgr {
private volatile ExecutorService testThreadPoolExecutor2;
public ExecutorService getTestExecutor2() {
if (testThreadPoolExecutor2 == null) {
synchronized (this) {
if (testThreadPoolExecutor2 == null) {
//这里只是给了简单的示例,实际可以通过读取配置等方式限制
testThreadPoolExecutor2 = new ThreadPoolExecutor(10,20,6000,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(20000));
}
}
}
return testThreadPoolExecutor2;
}
}
//其他类中调用
threadMgr.getTestExecutor2().submit(() -> XXX);
注意这里的synchronized 主要用于确保仅在一个线程中初始化 testThreadPoolExecutor2,确保线程池的单例性。
使用线程池中的任务时,仍需确保任务自身的线程安全,特别是涉及共享状态时。
7、线程池的关闭
前文我们已经明白,可以通过ExecutorService类的shutdownNow()和shutdown()去关闭线程池。
shutdownNow()和shutdown()的区别
调用shutdown(); 已提交的任务会继续执行,但不接受新的任务。
但调用shutdownNow(); 则会去尝试停止所有正在执行的任务。但是注意,并不是所有任务都一定会成功被关闭。
shutdownNow() 通过调用 Thread.interrupt() 向所有由 executor 管理的线程发送中断信号。
这些线程的中断标志会被设置为 true,并且如果它们在执行阻塞操作,会抛出InterruptedException。但如果是非阻塞操作,则不会被打断,因此我们在代码中也可以加入if(Thread.currentThread().isInterrupted())的判断,来检查线程是否中断,以及时进行需要的处理。
注意处理InterruptedException
在某些情况下,即使线程被中断,JVM也可能不会抛出 InterruptedException。因此在有阻塞操作的代码块中手动catch InterruptedException,catch后调用Thread.currentThread().interrupt(); 可以确保其他依赖于中断的方法可以正常工作。
什么是阻塞操作?
1、等待与睡眠
Thread.sleep(long millis):使当前线程进入睡眠状态,直到指定时间过去或被中断。
Object.wait():使当前线程等待,直到其他线程调用 notify() 或 notifyAll()。
Thread.join():等待指定线程完成执行。
2、I/O 操作
文件输入输出:如 FileInputStream.read(), FileOutputStream.write()
网络通信:如 Socket.receive(), Socket.send()
数据库操作(查询、事务提交或回滚):如 ResultSet.next(), Statement.executeQuery()
3、 锁与同步
使用 synchronized 关键字的同步块:如果其他线程持有锁,当前线程会阻塞,直到锁被释放。
使用 Lock 接口:如 lock.lock()(如果锁已被占用,线程会阻塞,直到获取锁)。
使用 Semaphore:如果信号量不可用,线程会阻塞,直到信号量被释放。
4、线程通信
使用 Object.wait() 和 Object.notify() 进行线程间通信。
使用 Condition 接口:如 await(),会阻塞线程,直到被信号唤醒。
5、 远程调用
REST API 调用:如 HttpURLConnection 或 OkHttp 发送请求,等待服务器响应。
RPC(远程过程调用):如 gRPC、Dubbo 等,等待远程服务返回结果。
6、其他阻塞操作
使用 BlockingQueue 的 put() 和 take() 方法:如果队列已满(put())或为空(take()),线程会阻塞。
使用 CountDownLatch.await():等待计数器归零。
使用 CyclicBarrier.await():等待所有线程到达屏障。