当前位置: 首页 > news >正文

java多线程及线程池

线程

  • 一、什么是多线程?
  • 二、线程的生命周期
  • 三、简单地创建一个线程
    • 1、实现Runnable接口
    • 2、继承Thread类
    • 3、使用Callable和FutureTask
    • 4、三种实现方式的对比
  • 四、线程同步和锁
    • 1、为什么需要线程同步?
    • 2、线程同步的实现方式
    • 3、synchronized和ReentrantLock使用示例
  • 五、线程池
    • 1、Executor接口
    • 2、ExecutorService接口
      • isShutdown() 和 isTerminated()的区别
    • 3、Executors工厂类
    • 4、Executors示例
    • 5、一般使用ThreadPoolExecutor而不是Executors
    • 6、ThreadPoolExecutor示例
    • 7、线程池的关闭
      • shutdownNow()和shutdown()的区别
      • 注意处理InterruptedException
      • 什么是阻塞操作?

一、什么是多线程?

多线程是指在一个程序中同时运行多个线程,每个线程都是独立的执行流程,可以在同一时间内执行不同的任务,从而提高程序的并发性和效率。

二、线程的生命周期

在Thread.State枚举类中可以看到线程的如下阶段(状态)

阶段说明补充
新建(New)从新建一个线程对象到程序start() 这个线程之间的状态,都是新建状态。在这个状态下,JVM已经为此线程分配了必要的内存。
就绪(Runnable)线程对象调用start()方法后,就处于就绪状态。就绪状态的线程在获得CPU时间片后就可以开始运行。这个状态的线程位于可运行线程池中,等待被线程调度选中,获得CPU的使用权。
运行状态(Running)就绪状态下的线程在获取CPU资源后就可以执行run(),此时的线程便处于运行状态。运行状态的线程可变为就绪、阻塞及死亡三种状态。
阻塞状态(Blocked)当一个线程试图获取一个内部的对象锁(也就是进入一个synchronized块),而该锁被其他线程持有,则该线程进入阻塞状态。阻塞状态的线程在锁被释放时,将会进入就绪状态。
等待状态(Waiting)线程通过调用其自身的wait()方法、join()方法或LockSupport.park()方法,或者通过调用其他线程的join()方法,可以进入等待状态。在等待状态的线程不会被分配CPU时间片,它们只能通过被其他线程显式唤醒进入就绪状态。
超时等待状态(Timed Waiting)当线程调用了sleep(long ms),wait(long ms),join(long ms),或者LockSupport.parkNanos(), LockSupport.parkUntil()等具有指定等待时间的方法,线程就会进入超时等待状态。当超时时间到达后,线程会自动返回到就绪状态。
终止状态(Terminated)当线程的run()方法执行完毕,或者线程中断,线程就会进入终止状态在这个状态下,线程已经完成了它的全部工作。
Thread t = new Thread(); // 线程此时处于New状态
t.start(); // 线程此时处于Runnable状态
t.wait();  // 线程此时处于Waiting状态
t.join();  // 线程此时处于Waiting状态
Thread.sleep(1000); // 线程此时处于Timed Waiting状态

三、简单地创建一个线程

1、实现Runnable接口

最简洁的方式是直接用lambda表达式:

Thread thread = new Thread(() -> {
    while (true) {
	    //具体逻辑
    }
});
thread.setName("Thread-Example");
thread.start();

调用start 与 run方法的区别:
直接调用 run() 是在主线程中执行了 run(),没有启动新的线程。
使用 start() 是启动新的线程,通过新的线程间接执行 run()方法中的代码。

上面的lambda表达式相当于使用了匿名内部类:

Thread thread = new Thread(new Runnable() {
    @Override
    public void run() {
        while (true) {
            // 具体逻辑
        }
    }
});

实现Runnable接口完整的写法是这样的:

// 定义一个Runnable接口的实现类,任何你想用于多线程的类,来实现Runnable接口
class MyRunnable implements Runnable {
    @Override
    public void run() {
        // 定义线程需要执行的逻辑
        System.out.println("这是一个实现Runnable接口的线程");
    }
}

public class Main {
    public static void main(String[] args) {
        // 创建Runnable对象
        MyRunnable runnable = new MyRunnable();
        // 创建线程对象,并将Runnable对象传递给Thread构造方法
        Thread thread = new Thread(runnable);
        // 启动线程
        thread.start();
    }
}

2、继承Thread类

// 定义一个Thread的子类
class MyThread extends Thread {
    @Override
    public void run() {
        // 定义线程需要执行的逻辑
        System.out.println("这是一个继承Thread类的线程");
    }
}

public class Main {
    public static void main(String[] args) {
        // 创建线程对象
        MyThread thread = new MyThread();
        // 启动线程
        thread.start();
    }
}

3、使用Callable和FutureTask

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class MyCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        // 线程需要执行的逻辑
        System.out.println("这是一个实现Callable接口的线程");
        return "子线程执行完成,返回结果。";
    }
}

public class Main {
    public static void main(String[] args) {
        MyCallable callable = new MyCallable();
        FutureTask<String> futureTask = new FutureTask<>(callable);
        Thread thread = new Thread(futureTask);
        thread.start();

        try {
            String result = futureTask.get();
            System.out.println("主线程获取到结果:" + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

4、三种实现方式的对比

特性继承Thread类实现Runnable接口使用Callable和FutureTask
继承/实现继承Thread类实现Runnable接口实现Callable接口
单继承限制受单继承限制,无法继承其他类可以继承其他类可以继承其他类
代码复用性代码复用性较差代码复用性更好代码复用性更好
适用场景简单的线程逻辑,不需要继承其他类复杂逻辑,需要继承其他类的场景需要返回结果的异步任务,结合线程池使用
资源共享不支持资源共享支持资源共享支持资源共享
结果返回不支持不支持支持
异常处理run()方法不声明异常,需在方法内处理run()方法不声明异常,需在方法内处理call()方法可声明异常,主线程可捕获

四、线程同步和锁

1、为什么需要线程同步?

在多线程环境中,多个线程可能会同时访问和修改同一个共享资源,这可能导致数据不一致和其他并发问题。例如,在银行账户的存取款操作中,如果不进行适当的同步,可能会出现以下情况:

数据不一致:两个线程同时读取账户余额,各自进行操作后,写回不同的结果,导致账户余额错误。
竞态条件:多个线程在竞争共享资源,导致程序执行结果不可预测。
为了避免这些问题,必须使用线程同步机制,确保在同一时间只有一个线程能够访问和修改共享资源。

2、线程同步的实现方式

Java提供了多种线程同步机制,包括:

synchronized关键字:可以修饰方法或代码块,确保在同一时间只有一个线程执行同步代码。
Lock接口:提供了更灵活的锁定机制,可以实现更复杂的同步逻辑。
atomic变量:使用Atomic类来实现线程安全的变量操作,避免加锁。
volatile关键字:确保变量的内存可见性,避免由于缓存不一致导致的问题。

3、synchronized和ReentrantLock使用示例

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// 银行账户类
class BankAccount {
    private double balance;

    // 使用synchronized关键字修饰的存款方法
    public synchronized void deposit(double amount) {
        balance += amount;
        System.out.println("存款后余额:" + balance);
    }

    // 使用synchronized关键字修饰的取款方法
    public synchronized void withdraw(double amount) {
        if (balance >= amount) {
            balance -= amount;
            System.out.println("取款后余额:" + balance);
        } else {
            System.out.println("余额不足!");
        }
    }

    // 使用Lock实现的存款方法
    private final Lock lock = new ReentrantLock();

    public void depositWithLock(double amount) {
        lock.lock();
        try {
            balance += amount;
            System.out.println("Lock存款后余额:" + balance);
        } finally {
            lock.unlock();
        }
    }

    // 使用Lock实现的取款方法
    public void withdrawWithLock(double amount) {
        lock.lock();
        try {
            if (balance >= amount) {
                balance -= amount;
                System.out.println("Lock取款后余额:" + balance);
            } else {
                System.out.println("Lock余额不足!");
            }
        } finally {
            lock.unlock();
        }
    }
}

// 模拟银行账户操作的线程
class TransactionThread implements Runnable {
    private BankAccount account;
    private double amount;
    private boolean isUsingSynchronized;

    public TransactionThread(BankAccount account, double amount, boolean isUsingSynchronized) {
        this.account = account;
        this.amount = amount;
        this.isUsingSynchronized = isUsingSynchronized;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 5; i++) {
                if (isUsingSynchronized) {
                    account.deposit(amount);
                    account.withdraw(amount);
                } else {
                    account.depositWithLock(amount);
                    account.withdrawWithLock(amount);
                }
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

public class ThreadSynchronizationExample {
    public static void main(String[] args) {
        // 创建银行账户实例
        BankAccount account = new BankAccount();

        // 使用synchronized关键字的线程
        System.out.println("=== 使用synchronized关键字 ===");
        Thread thread1 = new Thread(new TransactionThread(account, 100, true));
        Thread thread2 = new Thread(new TransactionThread(account, 50, true));
        thread1.start();
        thread2.start();

        // 等待synchronized线程完成
        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 使用Lock接口的线程
        System.out.println("\n=== 使用Lock接口 ===");
        Thread thread3 = new Thread(new TransactionThread(account, 200, false));
        Thread thread4 = new Thread(new TransactionThread(account, 150, false));
        thread3.start();
        thread4.start();

        // 等待Lock线程完成
        try {
            thread3.join();
            thread4.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

五、线程池

Java的java.util.concurrent(简称JUC)包中提供了一套丰富的线程池工具。
该部分内容参考原文链接

1、Executor接口

Executor接口是JUC包中定义的一个执行器接口,它只有一个execute方法,接收一个Runnable对象作为参数,并执行Runnable中的操作。这个接口非常简单,但它定义了执行器的基本功能。

public interface Executor {
    void execute(Runnable command);
}

在实际应用中,我们通常不会直接使用Executor接口,而是使用它的子接口ExecutorService,它提供了更丰富的功能。

2、ExecutorService接口

ExecutorService接口继承自Executor接口,并增加了关于执行器服务的定义。它提供了一系列的方法,包括关闭执行器、立即关闭、检查执行器是否关闭、等待任务终止、提交有返回值的任务以及批量提交任务等。

public interface ExecutorService extends Executor {
    void shutdown(); //关闭执行器,已提交的任务会继续执行,但不接受新的任务。
    List<Runnable> shutdownNow(); //立即关闭执行器,尝试停止所有正在执行的任务,并返回等待执行的任务列表。
    boolean isShutdown(); //检查执行器是否已关闭。
    boolean isTerminated();  //检查执行器是否已终止,即所有任务都已完成。
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; //等待任务终止,如果超过指定时间则返回false。
    <T> Future<T> submit(Callable<T> task); //提交一个有返回值的任务,并返回一个Future对象,通过该对象可以查看任务执行是否完成,并获取返回值。
    <T> Future<T> submit(Runnable task, T result); //提交一个Runnable任务和一个结果值,当任务执行完成后,返回该结果值。注意这个结果值是在任务执行前就确定的,与任务的实际执行结果无关。如果希望获取任务的实际执行结果,应该使用Callable任务。
    Future<?> submit(Runnable task); //提交一个Runnable任务,并返回一个Future对象。由于Runnable任务没有返回值,所以这个Future对象的get方法将返回null。这个方法主要用于将Runnable任务转换为Future对象,以便使用Future的相关功能(如取消任务、检查任务是否完成等)。但这个用法并不常见,因为Runnable任务本身就不支持返回值。更常见的做法是直接使用execute(Runnable command)方法执行Runnable任务。                        
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; //批量提交Callable任务,并返回一个Future对象的列表。当所有任务都完成后,可以通过这些Future对象获取任务的返回值。如果某个任务执行失败,那么对应的Future对象的get方法将抛出ExecutionException异常。这个方法会等待所有任务都完成后才返回。如果希望设置超时时间,可以使用另一个重载版本的方法。
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; //批量提交Callable任务,并返回第一个成功完成的任务的返回值。当找到第一个成功完成的任务后,该方法会立即返回,而不会等待其他任务完成。如果所有任务都失败,那么该方法将抛出ExecutionException异常。这个方法通常用于实现“多个路径中选择一个最快路径”的场景。同样地,这个方法也有一个设置超时时间的重载版本。
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

isShutdown() 和 isTerminated()的区别

isShutdown(): 判断线程池是否调用了 shutdown() 或 shutdownNow()。
isTerminated(): 判断线程池是否已完全终止(即所有任务都已完成)。

3、Executors工厂类

Executors是一个工厂类,它提供了一系列静态方法来创建不同类型的线程池。这些线程池都是ExecutorService接口的实现类。通过Executors的工厂方法,我们可以非常方便地创建和管理线程池。

需要注意的是,尽量不要用该类创建线程池。
因为Executors.newCachedThreadPool()根据需求创建线程,最大线程数为Integer.MAX_VALUE,可能导致过多线程消耗资源。
而Executors.newFixedThreadPool(int)和Executors.newSingleThreadExecutor()虽然限制了线程池的数量,但队列最大容量为Integer.MAX_VALUE,也可能导致内存溢出。
因此我们一般使用ThreadPoolExecutor

//创建一个大小为10的固定线程池。
//创建时指定线程池的大小,当有新任务提交时,如果线程池中有空闲线程,则使用空闲线程执行任务;如果没有空闲线程,则新任务会等待直到有线程空闲出来。这种线程池适用于已知并发压力的情况下,对线程数做限制,避免由于大量线程的创建和销毁带来的性能开销。
ExecutorService executor = Executors.newFixedThreadPool(10);  
//创建一个单线程执行器。
//所有提交的任务都会按照提交的顺序依次执行,这种线程池适用于需要保证任务执行顺序的场景。
ExecutorService executor = Executors.newSingleThreadExecutor(); 
//创建一个可缓存的线程池。
//可缓存的线程池,这种线程池会根据需要创建线程来执行任务,并且可以重复利用已存在的线程来执行新的任务。当线程池中的线程在一定时间内没有执行任务时,它会被自动销毁以释放资源。
//这种线程池适用于并发压力较大且任务执行时间较短的场景,如Web服务器处理HTTP请求等。但需要注意的是,在实际应用中我们可能需要更加谨慎地使用CachedThreadPool,因为如果不当使用可能会导致系统资源耗尽(如创建过- 多的线程导致内存溢出等)。因此在使用CachedThreadPool时需要特别关注任务的执行时间和数量以及系统的资源状况等因素。
ExecutorService executor = Executors.newCachedThreadPool(); 

4、Executors示例

@Component
public class ThreadMgr {
    private volatile ExecutorService testThreadPoolExecutor;
    public ExecutorService getTestExecutor() {
        if (testThreadPoolExecutor == null) {
            synchronized (this) {
                if (testThreadPoolExecutor == null) {
                    testThreadPoolExecutor = Executors.newFixedThreadPool(3);
                }
            }
        }
        return testThreadPoolExecutor;
    }
}

@Component
public class TaskInit implements ApplicationRunner {
	@Autowired
    private ThreadMgr threadMgr;

	@Override
    public void run(ApplicationArguments args) {
    	//省略数据库查询等逻辑
    	//获取线程池并执行相关逻辑
    	threadMgr.getTestExecutor().submit(() -> XXX);
    }
}

注意这里的TaskInit去继承ApplicationRunner并把逻辑放在run里面,只是为了程序启动时执行run方法中的内容。而threadMgr.getTaskExecutor().submit(() -> XXX);这一句中lambda表达式后面的部分(即XXX,可以是调用的方法等),实际上还是实现了Runnable的run方法,相当于

threadMgr.getTaskExecutor().submit(new Runnable() {
    @Override
    public void run() {
        // 具体逻辑
        XXX
    }
});

5、一般使用ThreadPoolExecutor而不是Executors

ThreadPoolExecutor继承自AbstractExecutorService,而AbstractExecutorService实现了ExecutorService接口。
ThreadPoolExecutor也是Executors的底层实现。

该部分内容参考博客原文

ThreadPoolExecutor构造函数:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler){
	//省略具体内容
}
参数含义是否必填
corePoolSize线程池中核心线程数的最大值
maximumPoolSize线程池中能拥有最多线程数
keepAliveTime表示空闲线程的存活时间
TimeUnitkeepAliveTime的单位
workQueue用于缓存任务的阻塞队列
threadFactory指定创建线程的工厂,不指定时默认使用Executors.defaultThreadFactory()
handler表示当 workQueue 已满,且池中的线程数达到 maximumPoolSize 时,线程池拒绝添加新任务时采取的策略;不指定时默认策略为AbortPolicy(),即抛出异常,由开发人员进行处理。

6、ThreadPoolExecutor示例

@Component
public class ThreadMgr {

    private volatile ExecutorService testThreadPoolExecutor2;
    
    public ExecutorService getTestExecutor2() {
        if (testThreadPoolExecutor2 == null) {
            synchronized (this) {
                if (testThreadPoolExecutor2 == null) {
                	//这里只是给了简单的示例,实际可以通过读取配置等方式限制
                    testThreadPoolExecutor2 = new ThreadPoolExecutor(10,20,6000,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(20000));
                }
            }
        }
        return testThreadPoolExecutor2;
    }
}

//其他类中调用
threadMgr.getTestExecutor2().submit(() -> XXX);

注意这里的synchronized 主要用于确保仅在一个线程中初始化 testThreadPoolExecutor2,确保线程池的单例性。
使用线程池中的任务时,仍需确保任务自身的线程安全,特别是涉及共享状态时。

7、线程池的关闭

前文我们已经明白,可以通过ExecutorService类的shutdownNow()和shutdown()去关闭线程池。

shutdownNow()和shutdown()的区别

调用shutdown(); 已提交的任务会继续执行,但不接受新的任务。
但调用shutdownNow(); 则会去尝试停止所有正在执行的任务。但是注意,并不是所有任务都一定会成功被关闭。

shutdownNow() 通过调用 Thread.interrupt() 向所有由 executor 管理的线程发送中断信号。
这些线程的中断标志会被设置为 true,并且如果它们在执行阻塞操作,会抛出InterruptedException。但如果是非阻塞操作,则不会被打断,因此我们在代码中也可以加入if(Thread.currentThread().isInterrupted())的判断,来检查线程是否中断,以及时进行需要的处理。

注意处理InterruptedException

在某些情况下,即使线程被中断,JVM也可能不会抛出 InterruptedException。因此在有阻塞操作的代码块中手动catch InterruptedException,catch后调用Thread.currentThread().interrupt(); 可以确保其他依赖于中断的方法可以正常工作。

什么是阻塞操作?

1、等待与睡眠

Thread.sleep(long millis):使当前线程进入睡眠状态,直到指定时间过去或被中断。
Object.wait():使当前线程等待,直到其他线程调用 notify() 或 notifyAll()。
Thread.join():等待指定线程完成执行。

2、I/O 操作

文件输入输出:如 FileInputStream.read(), FileOutputStream.write()
网络通信:如 Socket.receive(), Socket.send()
数据库操作(查询、事务提交或回滚):如 ResultSet.next(), Statement.executeQuery()

3、 锁与同步

使用 synchronized 关键字的同步块:如果其他线程持有锁,当前线程会阻塞,直到锁被释放。
使用 Lock 接口:如 lock.lock()(如果锁已被占用,线程会阻塞,直到获取锁)。
使用 Semaphore:如果信号量不可用,线程会阻塞,直到信号量被释放。

4、线程通信

使用 Object.wait() 和 Object.notify() 进行线程间通信。
使用 Condition 接口:如 await(),会阻塞线程,直到被信号唤醒。

5、 远程调用

REST API 调用:如 HttpURLConnection 或 OkHttp 发送请求,等待服务器响应。
RPC(远程过程调用):如 gRPC、Dubbo 等,等待远程服务返回结果。

6、其他阻塞操作

使用 BlockingQueue 的 put() 和 take() 方法:如果队列已满(put())或为空(take()),线程会阻塞。
使用 CountDownLatch.await():等待计数器归零。
使用 CyclicBarrier.await():等待所有线程到达屏障。

相关文章:

  • Web自动化中Selenium下Chrome与Edge的Webdriver常用Options参数
  • 鸿蒙app 开发中 对于数组方法 filter 的理解
  • 洛谷B3629
  • C#初级教程(7)——初级期末检测
  • 代码随想录第十六天|二叉树part05--654.最大二叉树、617.合并二叉树、700.二叉搜索树中的搜索、98.验证二叉搜索树
  • 前端面试-网络协议篇
  • 创建监听器报错“一个或多个listeners启动失败”
  • Java之泛型
  • Windows安装MySQL指南
  • SAP on Microsoft Azure Architecture and Administration (Ravi Kashyap)
  • LangChain系列:精通LangChain的合并文档链
  • 【论文阅读】SAM-CP:将SAM与组合提示结合起来的多功能分割
  • LeetCode 236.二叉树的最近公共祖先
  • 抗辐照加固CAN FD芯片的商业航天与车规级应用解析
  • 常用Web性能指标
  • idea安装硅基流动中免费的deepseek(2025)
  • C++标准库——move和forward
  • Windows辉煌的发展历程
  • AMBA-CHI协议详解(十九)
  • Docker用户的困境:免费项目的减少与成本的增加
  • 第78届戛纳电影节开幕,罗伯特·德尼罗领取终身成就奖
  • 最高降价三成,苹果中国iPhone开启大促销,能拉动多少销量?
  • 挖掘机4月销量同比增17.6%,出口增幅创近两年新高
  • 外交部:中方期待印巴巩固和延续停火势头,避免冲突再起
  • 巫蛊:文化的历史暗流
  • 特朗普将启的中东行会如何影响伊美核谈判?专家分析