当前位置: 首页 > news >正文

JAVA中的多线程

目录

什么是并发?(多线程的概念)

线程的创建方式

Thred线程

Runnable接口线程

Callable接口线程

ExecutorService实现线程

线程池

newCachedThreadPool

newFixedThreadPool

newScheduledThreadPool

newSingleThreadExecutor

自定义线程池

线程池的核心参数

线程的生命周期

线程中的方法

线程安全问题

使用同步机制

使用volatile关键字

原子变量类(Atomic)

使用线程安全的集合类

避免共享可变状态


什么是并发?(多线程的概念)

  • 并发指的是在同一时间段内,有多个任务交替执行,看起来是同时进行的。它提高了程序的资源利用率和响应能力

  • 多线程是实现并发的主要方式之一。一个程序可以启动多个线程,多个线程之间可以同时执行不同的任务

  • 单核CPU上,多线程通过时间片轮转模拟并发;多核CPU上,线程可以真正并行执行。

线程的创建方式

Java中主要有三种创建线程的方式

Thred线程

Thread类是创建线程的方式之一,通过继承Thread然后重新run()方法,然后调用start()方法启动线程。

/** Thread线程测试方法*/
public class ThreadTest extends Thread {
​@Overridepublic void run() {try {// 模拟线程执行任务System.out.println("线程 " + Thread.currentThread().getName() + " 正在执行任务...");Thread.sleep(2000); // 模拟任务耗时System.out.println("线程 " + Thread.currentThread().getName() + " 任务完成");} catch (InterruptedException e) {System.err.println("线程 " + Thread.currentThread().getName() + " 被中断: " + e.getMessage());}}
​public static void main(String[] args) {ThreadTest thread1 = new ThreadTest();ThreadTest thread2 = new ThreadTest();ThreadTest thread3 = new ThreadTest();ThreadTest thread4 = new ThreadTest();ThreadTest thread5 = new ThreadTest();ThreadTest thread6 = new ThreadTest();thread1.start();thread2.start();thread3.start();thread4.start();thread5.start();thread6.start();}
}

Runnable接口线程

runnable也是实现线程的方法之一,通过实现runnable接口重写run方法,然后创建Thread对象,将runnable对象传递,调用start方法

/** 线程runnable测试*/ 
public class RunnableTest implements Runnable{@Overridepublic void run() {try {// 模拟线程执行任务System.out.println("线程Runnable " + Thread.currentThread().getName() + " 正在执行任务...");Thread.sleep(2000); // 模拟任务耗时System.out.println("线程Runnable " + Thread.currentThread().getName() + " 任务完成");} catch (InterruptedException e) {System.err.println("线程Runnable " + Thread.currentThread().getName() + " 被中断: " + e.getMessage());}}
​public static void main(String[] args) {RunnableTest runnable1 = new RunnableTest();RunnableTest runnable2 = new RunnableTest();RunnableTest runnable3 = new RunnableTest();RunnableTest runnable4 = new RunnableTest();RunnableTest runnable5 = new RunnableTest();RunnableTest runnable6 = new RunnableTest();
​Thread thread1 = new Thread(runnable1);Thread thread2 = new Thread(runnable2);Thread thread3 = new Thread(runnable3);Thread thread4 = new Thread(runnable4);Thread thread5 = new Thread(runnable5);Thread thread6 = new Thread(runnable6);
​thread1.start();thread2.start();thread3.start();thread4.start();thread5.start();thread6.start();}
}

Callable接口线程

callable接口也可以实现线程,与runnable不同的是,它可以实现带返回值的结果

/**  Callable测试方法*/
public class CallableTest implements Callable {@Overridepublic Object call() throws Exception {try {// 模拟任务执行System.out.println("Callable " + Thread.currentThread().getName() + " 正在执行任务...");Thread.sleep(2000); // 模拟任务耗时System.out.println("Callable " + Thread.currentThread().getName() + " 任务完成");return "任务完成";} catch (InterruptedException e) {System.err.println("Callable " + Thread.currentThread().getName() + " 被中断: " + e.getMessage());return "任务被中断";}}
​public static void main(String[] args) {CallableTest callable1 = new CallableTest();CallableTest callable2 = new CallableTest();CallableTest callable3 = new CallableTest();CallableTest callable4 = new CallableTest();CallableTest callable5 = new CallableTest();CallableTest callable6 = new CallableTest();
​Thread thread1 = new Thread(() -> {try {System.out.println(callable1.call());} catch (Exception e) {e.printStackTrace();}});Thread thread2 = new Thread(() -> {try {System.out.println(callable2.call());} catch (Exception e) {e.printStackTrace();}});Thread thread3 = new Thread(() -> {try {System.out.println(callable3.call());} catch (Exception e) {e.printStackTrace();}});Thread thread4 = new Thread(() -> {try {System.out.println(callable4.call());} catch (Exception e) {e.printStackTrace();}});Thread thread5 = new Thread(() -> {try {System.out.println(callable5.call());} catch (Exception e) {e.printStackTrace();}});Thread thread6 = new Thread(() -> {try {System.out.println(callable6.call());} catch (Exception e) {e.printStackTrace();}});
​thread1.start();thread2.start();thread3.start();thread4.start();thread5.start();thread6.start();}
}

ExecutorService实现线程

有返回值的任务必须实现 Callable 接口,类似的,无返回值的任务必须 Runnable 接口。执行 Callable 任务后,可以获取一个 Future 的对象,在该对象上调用 get 就可以获取到 Callable 任务 返回的 Object 了,再结合线程池接口 ExecutorService 就可以实现传说中有返回结果的多线程。

线程池

Java 里面线程池的顶级接口是 Executor,但是严格意义上讲 Executor 并不是一个线程池,而 只是一个执行线程的工具。真正的线程池接口是 ExecutorServiceExecutorService实现的线程池总共有4种。

newCachedThreadPool

创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute 将重用以前构造 的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并 从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。

特点

  • 线程数量动态调整:无核心线程,最大线程数为 Integer.MAX_VALUE

  • 空闲线程回收:线程空闲 60 秒后自动销毁。

  • 任务队列:使用 SynchronousQueue(无容量队列,任务直接交给线程或创建新线程)。

优点

  • 弹性伸缩:适合短期异步任务,避免频繁创建/销毁线程的开销。

  • 自动回收:长时间空闲时不占用资源。

缺点

  • 线程数量无上限,可能耗尽系统资源(如大量并发时)。

适用场景

  • 高频短任务:例如 HTTP 请求处理、快速计算任务。

  • 测试/原型开发:快速验证异步逻辑。

newFixedThreadPool

创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数Threads 线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何 线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。

特点

  • 固定线程数:核心线程数 = 最大线程数(由用户指定)。

  • 任务队列:无界队列(LinkedBlockingQueue),任务堆积可能导致 OOM。

  • 线程存活:即使空闲也不会被回收,除非调用 shutdown()

优点

  • 资源可控:避免线程数量爆炸。

  • 稳定吞吐量:适合长期运行的任务。

缺点

  • 无界队列可能导致内存溢出(如任务提交速度远高于处理速度)。

适用场景

  • CPU 密集型任务:线程数通常设为 CPU 核数 + 1。

  • 稳定负载场景:例如后台批处理、定时统计。

newScheduledThreadPool

创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。

特点

  • 支持定时/周期性任务:通过 schedule()scheduleAtFixedRate() 等方法实现。

  • 核心线程固定,但可扩展(通过 DelayedWorkQueue 管理任务)。

优点

  • 精准调度:适合需要延迟或定期执行的任务。

缺点

  • 复杂任务调度可能引发并发问题(需自行处理同步)。

适用场景

  • 定时任务:如每天凌晨数据备份。

  • 延迟任务:如订单超时取消。

  • 周期性任务:如心跳检测。

newSingleThreadExecutor

Executors.newSingleThreadExecutor()返回一个线程池(这个线程池只有一个线程),这个线程 池可以在线程死后(或发生异常时)重新启动一个线程来替代原来的线程继续执行下去!

特点

  • 只有一个线程:保证任务按提交顺序串行执行。

  • 自动恢复:线程异常终止后会新建一个线程继续执行后续任务。

  • 任务队列:无界队列(LinkedBlockingQueue)。

优点

  • 顺序执行:避免多线程并发问题(如数据竞争)。

  • 容错性:线程崩溃不影响后续任务。

缺点

  • 吞吐量低,不适合高并发场景。

适用场景

  • 任务需严格顺序执行:如日志写入、单线程消费队列。

  • 简单异步化:如 GUI 的事件分发线程(EDT)。

package com.xiancheng;
​
​
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
​
/** ExecutorService测试类*/
public class ExecutorServiceTest {public static void main(String[] args) throws InterruptedException {// 创建固定大小的线程池//ExecutorService executorService = Executors.newFixedThreadPool(5);
​// 创建可缓存的线程池//ExecutorService executorService = Executors.newCachedThreadPool();
​// 创建单线程的线程池//ExecutorService executorService = Executors.newSingleThreadExecutor();
​//创建scheduled线程池ExecutorService executorService = Executors.newScheduledThreadPool(5);
​CountDownLatch latch = new CountDownLatch(5); // 用于等待所有任务完成
​// 提交5个任务到线程池for (int i = 0; i < 5; i++) {final int taskId = i;Future<String> future = executorService.submit(() -> {try {System.out.println("任务 " + taskId + " 开始执行,线程: " + Thread.currentThread().getName());long startTime = System.currentTimeMillis();
​Thread.sleep(2000); // 模拟任务耗时
​long endTime = System.currentTimeMillis();System.out.println("任务 " + taskId + " 完成,耗时: " + (endTime - startTime) + "ms");return "任务" + taskId + "成功完成";} catch (InterruptedException e) {System.err.println("任务 " + taskId + " 被中断: " + e.getMessage());Thread.currentThread().interrupt();return "任务" + taskId + "被中断";} finally {latch.countDown(); // 任务完成时计数减1}});}
​// 关闭线程池(不再接受新任务)executorService.shutdown();
​// 等待所有任务完成latch.await();System.out.println("所有任务已完成");
​// 检查线程池是否完全终止if (executorService.isTerminated()) {System.out.println("线程池已完全终止");} else {System.out.println("线程池仍在终止过程中...");}}
}
​

自定义线程池

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
​
/*** 自定义线程池工具类,提供线程池的创建、任务提交和优雅关闭功能*/
public final class ThreadPoolUtil {private static final int DEFAULT_CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;private static final int DEFAULT_MAX_POOL_SIZE = DEFAULT_CORE_POOL_SIZE * 2;private static final long DEFAULT_KEEP_ALIVE_TIME = 60L;private static final int DEFAULT_QUEUE_CAPACITY = 1000;
​private ThreadPoolUtil() {// 私有构造函数,防止实例化}
​/*** 创建固定大小的线程池* @param poolSize 线程池大小* @param threadNamePrefix 线程名称前缀* @return 线程池实例*/public static ExecutorService createFixedThreadPool(int poolSize, String threadNamePrefix) {return new ThreadPoolExecutor(poolSize,poolSize,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),createThreadFactory(threadNamePrefix),new ThreadPoolExecutor.CallerRunsPolicy());}
​/*** 创建可扩展的线程池* @param corePoolSize 核心线程数* @param maxPoolSize 最大线程数* @param queueCapacity 任务队列容量* @param threadNamePrefix 线程名称前缀* @return 线程池实例*/public static ExecutorService createScalableThreadPool(int corePoolSize,int maxPoolSize,int queueCapacity,String threadNamePrefix) {return new ThreadPoolExecutor(corePoolSize,maxPoolSize,DEFAULT_KEEP_ALIVE_TIME, TimeUnit.SECONDS,new LinkedBlockingQueue<>(queueCapacity),createThreadFactory(threadNamePrefix),new ThreadPoolExecutor.CallerRunsPolicy());}
​/*** 创建IO密集型任务的线程池(线程数通常设置为CPU核心数的2倍)* @param threadNamePrefix 线程名称前缀* @return 线程池实例*/public static ExecutorService createIOIntensivePool(String threadNamePrefix) {return createScalableThreadPool(DEFAULT_CORE_POOL_SIZE,DEFAULT_MAX_POOL_SIZE,DEFAULT_QUEUE_CAPACITY,threadNamePrefix);}
​/*** 创建线程工厂* @param threadNamePrefix 线程名称前缀* @return 线程工厂实例*/private static ThreadFactory createThreadFactory(String threadNamePrefix) {return new ThreadFactory() {private final AtomicInteger threadNumber = new AtomicInteger(1);
​@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r, threadNamePrefix + "-" + threadNumber.getAndIncrement());if (thread.isDaemon()) {thread.setDaemon(false);}if (thread.getPriority() != Thread.NORM_PRIORITY) {thread.setPriority(Thread.NORM_PRIORITY);}return thread;}};}
​/*** 提交任务到线程池并获取Future* @param executor 线程池* @param task 任务* @param <T> 返回值类型* @return Future对象*/public static <T> Future<T> submitTask(ExecutorService executor, Callable<T> task) {if (executor == null || executor.isShutdown() || executor.isTerminated()) {throw new IllegalStateException("线程池不可用");}return executor.submit(task);}
​/*** 提交Runnable任务到线程池* @param executor 线程池* @param task 任务* @return Future对象*/public static Future<?> submitTask(ExecutorService executor, Runnable task) {if (executor == null || executor.isShutdown() || executor.isTerminated()) {throw new IllegalStateException("线程池不可用");}return executor.submit(task);}
​/*** 优雅关闭线程池* @param executor 线程池*/public static void shutdownGracefully(ExecutorService executor) {if (executor == null || executor.isTerminated()) {return;}
​try {// 禁止新任务提交executor.shutdown();// 等待未完成任务结束if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {// 超时后强制关闭executor.shutdownNow();// 再次等待任务终止if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {System.err.println("线程池关闭失败");}}} catch (InterruptedException e) {// 重新调用shutdownNowexecutor.shutdownNow();// 保留中断状态Thread.currentThread().interrupt();}}
​/*** 获取线程池状态信息* @param executor 线程池* @return 状态信息字符串*/public static String getThreadPoolStatus(ExecutorService executor) {if (!(executor instanceof ThreadPoolExecutor)) {return "无法获取状态: 不是ThreadPoolExecutor实例";}
​ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;return String.format("线程池状态: 核心线程数=%d, 最大线程数=%d, 当前线程数=%d, " +"活跃线程数=%d, 已完成任务数=%d, 等待任务数=%d, 任务总数=%d",pool.getCorePoolSize(),pool.getMaximumPoolSize(),pool.getPoolSize(),pool.getActiveCount(),pool.getCompletedTaskCount(),pool.getQueue().size(),pool.getTaskCount());}
}

线程池的核心参数

参数名称作用
corePoolSize核心线程数,线程池始终保持的线程数量,即使空闲也不会销毁
maximumPoolSize最大线程数,线程池允许的最大线程数量
keepAliveTime非核心线程空闲后的存活时间,超过则销毁非核心线程
unitkeepAliveTime的时间单位,比如秒、毫秒
workQueue任务队列,用来存储等待执行的任务(如LinkedBlockingQueueArrayBlockingQueue等)
threadFactory线程工厂,用来创建线程
handler饱和策略(拒绝策略),线程池满时如何处理新任务(如抛异常、放弃任务、调用者执行等)

以下为线程任务执行的全过程,包括这些参数的作用

当我们submit一个任务的时候,会先判断当前的线程任务 < corePoolSize,如果是就会创建一个核心线程去执行该条任务,反之会进入阻塞队列等待并且等待空闲线程去执行任务。当任务队列满了,会采取建立非核心线程去执行任务,直到=maximumPoolSize如果超过这个线程就会执行任务队列的拒绝策略(抛弃任务、抛出异常等)。当任务逐渐变少,临时线程会在最大存活时间之后自动销毁。

线程的生命周期

状态说明
新建(New)线程对象创建,调用构造器,但尚未启动
就绪(Runnable)线程调用start()后,等待CPU调度执行
运行(Running)线程获得CPU时间,执行代码
阻塞(Blocked/Waiting/Timed Waiting)线程等待某种条件,如等待I/O、等待锁、调用sleep()wait()
终止(Terminated)线程执行完成或被异常终止,线程生命周期结束

线程中的方法

方法名说明
start()启动线程,调用后线程进入就绪状态,准备运行
run()线程执行体,线程启动后执行的方法,不能直接调用run()来启动线程,否则变成普通方法调用
sleep(long millis)让当前线程暂停指定时间(毫秒),进入阻塞状态,不释放锁
join()当前线程等待调用join()的线程执行完成再继续执行
yield()暂停当前线程,礼让给其他同优先级线程执行,但不保证立即暂停
interrupt()中断线程,设置线程的中断标志,可以用于结束阻塞状态下的线程
isInterrupted()检查线程是否被中断(不清除中断标志)
setDaemon(boolean)设置线程是否为守护线程,守护线程会随着所有用户线程结束而结束
getName()/setName()获取或设置线程名称
getPriority()/setPriority(int)获取或设置线程优先级,范围1~10,默认5
currentThread()静态方法,获取当前执行代码的线程对象

线程安全问题

使用同步机制

  • synchronized关键字 修饰代码块或方法,保证同一时刻只有一个线程执行被保护的代码。

    public synchronized void increment() {count++;
    }

  • 显示锁(Lock接口) 例如ReentrantLock,提供比synchronized更灵活的锁控制。

    Lock lock = new ReentrantLock();
    lock.lock();
    try {count++;
    } finally {lock.unlock();
    }

使用volatile关键字

保证变量的可见性,禁止指令重排序,但不保证原子性。

private volatile boolean flag = true;

适用于状态标志的可见性控制。

原子变量类(Atomic)

Java提供java.util.concurrent.atomic包下的原子变量类,比如AtomicInteger,提供原子操作,无需加锁。

AtomicInteger count = new AtomicInteger(0);
count.incrementAndGet();

使用线程安全的集合类

ConcurrentHashMapCopyOnWriteArrayList,代替普通的HashMapArrayList

避免共享可变状态

设计无状态或者不可变对象,减少共享变量。

相关文章:

  • 对DOM操作 与 jQuery的简单理解(通俗
  • 【数组和二分查找】
  • 鹰盾播放器禁止录屏操作的深度技术解析与全栈实现方案
  • 《高等数学》(同济大学·第7版)第三章第五节“函数的极值与最大值最小值“
  • SpringDoc集成到Springboot
  • 【PhysUnits】17.5 实现常量除法(div.rs)
  • git clone 时报错超时的问题解决方案
  • windows mysql zip部署
  • 国产 AI 绘画新标杆:HiDream-I1 的技术突破与创作
  • Python入门手册:常用的Python标准库
  • 企业中使用 MCP Server 实现业务打通
  • 全国大学生计算机应用能力与数字素养大赛 C语言程序设计赛项——本科组练习
  • 人工智能增强入侵检测系统以对抗高级持续性杀伤链
  • 《信号与系统》第 7 章 采样
  • 1.一起学习仓颉-编译环境,ide,输出hello,world
  • 鹰盾加密器基于AI的视频个性化压缩技术深度解析:从智能分析到无损压缩实践
  • Pytest断言全解析:掌握测试验证的核心艺术
  • Spring Boot 4.0.0 新特性详解:深入解读 Spring Framework 7.0.0
  • 通过Wrangler CLI在worker中创建数据库和表
  • 【群体结构 ADMIXTURE之一】: fast ancestry estimation
  • 网站制作布局/海南快速seo排名优化
  • 日本不限制内容服务器/seo系统培训课程
  • 淮北做网站的公司/免费网站建设制作
  • 新疆人防建设网站/最新疫情消息
  • 做网站用win2008系统/网站域名ip查询
  • 思创医惠网站建设/seo排名点击工具