当前位置: 首页 > news >正文

java 多线程之Worker Thread模式(Thread Pool模式)

Worker Thread模式

Worker的意思是工作的人,在Worker Thread模式中,工人线程Worker thread会逐个取回工作并进行处理,当所有工作全部完成后,工人线程会等待新的工作到来。

Worker Thread模式也被成为Background Thread(背景线程)模式,另外,如果从保存多个工人线程的场所这一点看,我们也可以称这种模式为Thread Pool模式。

Worker Thread模式中的角色

TransPortThread [将部件塞到传送带上](委托者)
创建表示工作请求的Cargo并将其传递给SimpleChannel 。在示例程序中,TransPortThread 相当于该角色。

2.SimpleChannel [传送带的角色](通信线路)
SimpleChannel 角色接受来自于TransPortThread 的Cargo,并将其传递给WorkerThread。在示例程序中,SimpleChannel 相当于该角色。

3.WorkerThread(工人)
WorkerThread角色从Channel中获取Cargo,并进行工作,当一项工作完成后,它会继续去获取另外的Cargo,在示例程序中,WorkerThread相当于该角色。

4.Cargo(部件)
Cargo角色是表示工作中的部件,Cargo角色中保存了进行工作所必须的信息,在示例程序中,Cargo相当于该部件角色。

Worker Thread使用场景

想象这样一个场景,一个工厂在生产货物,在一个车间里,有几个工人,每次生产部件准备后,车间外的人就将部件放到车间的传送带上,工人每次做完一个货品就从桌子上取部件。在这里,注意到,部件并不是直接交给工人的,另外一点,工人并不是做完一个部件就回家换个新人,后者在现实有点滑稽,但是在程序中却对应一个典型的线程使用方法:线程池。

所谓线程池,就是对线程的复用,当线程执行完任务之后就继续取其他任务执行,而不是销毁启动新线程执行其他任务。因为线程的启动对于系统性能开销比较大,所以这样对于系统性能的提高很有好处。

Main.java
public class WorkClient {public static void main(String[] args) {// 创建容量为5的SimpleChannel实例(数据缓冲通道)SimpleChannel simpleChannel = new SimpleChannel(5);// 启动工作通道开始处理数据(消费者启动)simpleChannel.startWork();// 启动三个数据传输线程(生产者角色)new TransPortThread("june", simpleChannel).start();new TransPortThread("july", simpleChannel).start();new TransPortThread("alen", simpleChannel).start();}}
Cargo.java

角色:部件或货物

import java.util.Optional;/*** 货物类,包含货物编号和名称,并提供执行操作时打印线程执行信息的功能** @author [请填写作者名称]* @version 1.0*/
public class Cargo {/*** 货物名称*/private final String name;/*** 货物编号*/private final int num;/*** 构造方法,初始化货物信息** @param name 货物名称* @param num  货物编号*/public Cargo(String name, int num) {this.name = name;this.num = num;}/*** 执行货物处理操作,打印当前线程名称和货物信息*/public void execute(){Optional.of(Thread.currentThread().getName()+" executed "+this.toString()).ifPresent(System.out::println);}/*** 获取货物信息的字符串表示** @return 格式化后的货物信息(例如:Cargo ==> NO.123 Name.ItemA)*/@Overridepublic String toString() {return "Cargo ==> NO."+num+" Name."+name;}
}
SimpleChannel.java

职责:将部件或货物从源头传给处理端

import java.util.Arrays;/*** 简单线程通信通道,管理货物队列和工作线程池* 使用环形缓冲区实现生产者-消费者模式,通过synchronized和wait/notify机制保证线程安全*/
public class SimpleChannel {/** 缓冲区最大容量 */private static final int BUFFER_SIZE = 100;/** 环形缓冲区数组,存储货物对象 */private final Cargo[] cargoQueue;/** 队列头指针,指向下一个要取出的元素位置 */private int headIndex;/** 当前队列中的元素数量 */private int count;/** 队列尾指针,指向下一个要插入的元素位置 */private int tailIndex;/** 工作线程池 */private final WorkerThread[] workerPool;/*** 构造方法初始化通道* @param workerCount 工作线程数量*/public SimpleChannel(int workerCount) {this.cargoQueue = new Cargo[BUFFER_SIZE];this.headIndex = 0;this.count = 0;this.tailIndex = 0;this.workerPool = new WorkerThread[workerCount];this.init();}/*** 初始化工作线程池* 为每个线程设置名称并绑定当前通道实例*/private void init() {for (int i = 0; i < workerPool.length; i++) {workerPool[i] = new WorkerThread("worker-"+i, this);}}/*** 启动所有工作线程开始工作* 通过遍历线程池调用start()方法启动每个线程*/public void startWork() {Arrays.asList(workerPool).forEach(WorkerThread::start);}/*** 生产者添加货物到缓冲区* @param cargo 要添加的货物对象* @throws InterruptedException 线程中断异常*/public synchronized void pushCargo(Cargo cargo) {while (count >= BUFFER_SIZE) {try {this.wait(); // 缓冲区满时等待} catch (InterruptedException e) {// 异常处理(建议补充日志)}}// 添加货物到尾部并更新指针this.cargoQueue[tailIndex] = cargo;this.tailIndex = (this.tailIndex + 1) % this.cargoQueue.length;this.count++;this.notifyAll(); // 通知等待线程}/*** 消费者从缓冲区取出货物* @return 取出的货物对象* @throws InterruptedException 线程中断异常*/public synchronized Cargo popCargo() {while (count <= 0) {try {this.wait(); // 缓冲区空时等待} catch (InterruptedException e) {// 异常处理(建议补充日志)}}// 取出头部元素并更新指针Cargo cargo = this.cargoQueue[headIndex];this.headIndex = (this.headIndex + 1) % this.cargoQueue.length;this.count--;this.notifyAll(); // 通知等待线程return cargo;}
}
TransPortThread .java

职责:将请求的信息塞到传输通道上去

/*** 运输线程,负责持续生成货物并推送到传输通道 {@link SimpleChannel}*/
public class TransPortThread extends Thread {private SimpleChannel simpleChannel;/*** 静态随机数生成器,用于生成随机休眠时间*/private static final Random RANDOM = new Random(System.currentTimeMillis());/*** 构造方法** @param name 线程名称* @param simpleChannel 货物传输通道*/public TransPortThread(String name, SimpleChannel simpleChannel) {super(name);this.simpleChannel = simpleChannel;}@Overridepublic void run() {try {int i = 0;// 无限循环持续生成货物while (true){Cargo cargo = new Cargo(getName(), i++); // 创建货物对象(包含线程名和序号)this.simpleChannel.pushCargo(cargo);   // 将货物推送到通道Thread.sleep(RANDOM.nextInt(1000));    // 随机休眠 0-1000ms}} catch (InterruptedException e) {// 线程中断时的异常处理(当前空实现)}}
}
WorkerThread.java

职责:从传输带上获取部件或货物

import java.util.Random;
/*** 工作线程类,负责从共享通道中持续获取任务并执行*/
public class WorkerThread extends Thread {/*** 存储任务通道实例,用于获取待处理的任务对象*/private final SimpleChannel simpleChannel;/*** 静态随机数生成器,用于生成随机休眠时间(模拟任务执行间隔)*/private static final Random random = new Random(System.currentTimeMillis());/*** 构造方法初始化线程名称和任务通道* @param name 线程名称标识* @param simpleChannel 任务通道对象,用于获取待处理任务*/public WorkerThread(String name, SimpleChannel simpleChannel) {super(name);this.simpleChannel = simpleChannel;}/*** 线程执行主体方法,持续循环执行以下操作:* 1. 从通道中获取任务对象* 2. 执行任务* 3. 随机休眠(模拟任务处理间隔)*/@Overridepublic void run() {while (true) {try {// 从通道中获取任务对象Cargo cargo = simpleChannel.popCargo();// 执行任务逻辑cargo.execute();// 随机休眠0-1000毫秒(模拟任务间隔)Thread.sleep(random.nextInt(1000));} catch (Exception e) {// 异常处理:打印堆栈信息(实际应用中建议添加更完善的异常处理)e.printStackTrace();}}}
}

执行的结果

worker-1 executed Cargo ==> NO.0 Name.alen
worker-0 executed Cargo ==> NO.0 Name.june
worker-4 executed Cargo ==> NO.0 Name.july
worker-3 executed Cargo ==> NO.1 Name.june
worker-0 executed Cargo ==> NO.1 Name.alen
worker-2 executed Cargo ==> NO.1 Name.july
worker-1 executed Cargo ==> NO.2 Name.june
worker-0 executed Cargo ==> NO.2 Name.july
worker-2 executed Cargo ==> NO.2 Name.alen
worker-3 executed Cargo ==> NO.3 Name.june
worker-4 executed Cargo ==> NO.4 Name.june
worker-2 executed Cargo ==> NO.3 Name.alen
worker-3 executed Cargo ==> NO.3 Name.july
worker-4 executed Cargo ==> NO.5 Name.june
worker-1 executed Cargo ==> NO.6 Name.june
worker-0 executed Cargo ==> NO.4 Name.alen
worker-2 executed Cargo ==> NO.7 Name.june
worker-3 executed Cargo ==> NO.5 Name.alen
worker-2 executed Cargo ==> NO.4 Name.july
worker-4 executed Cargo ==> NO.6 Name.alen
worker-0 executed Cargo ==> NO.5 Name.july
worker-2 executed Cargo ==> NO.8 Name.june
worker-3 executed Cargo ==> NO.7 Name.alen
worker-1 executed Cargo ==> NO.6 Name.july
worker-4 executed Cargo ==> NO.8 Name.alen
worker-4 executed Cargo ==> NO.9 Name.june
worker-2 executed Cargo ==> NO.10 Name.june
worker-3 executed Cargo ==> NO.9 Name.alen
worker-1 executed Cargo ==> NO.7 Name.july
worker-2 executed Cargo ==> NO.10 Name.alen
worker-4 executed Cargo ==> NO.11 Name.alen
worker-3 executed Cargo ==> NO.11 Name.june
worker-0 executed Cargo ==> NO.8 Name.july
worker-4 executed Cargo ==> NO.12 Name.alen
worker-3 executed Cargo ==> NO.12 Name.june
worker-4 executed Cargo ==> NO.13 Name.alen
worker-1 executed Cargo ==> NO.9 Name.july
worker-2 executed Cargo ==> NO.14 Name.alen
worker-0 executed Cargo ==> NO.13 Name.june
worker-4 executed Cargo ==> NO.15 Name.alen
worker-2 executed Cargo ==> NO.10 Name.july
worker-1 executed Cargo ==> NO.14 Name.june
worker-3 executed Cargo ==> NO.15 Name.june
worker-4 executed Cargo ==> NO.16 Name.alen
worker-3 executed Cargo ==> NO.11 Name.july
worker-2 executed Cargo ==> NO.12 Name.july
worker-0 executed Cargo ==> NO.16 Name.june
worker-1 executed Cargo ==> NO.17 Name.june
worker-2 executed Cargo ==> NO.18 Name.june
worker-2 executed Cargo ==> NO.17 Name.alen
worker-4 executed Cargo ==> NO.13 Name.july
worker-0 executed Cargo ==> NO.18 Name.alen
worker-0 executed Cargo ==> NO.19 Name.june
worker-3 executed Cargo ==> NO.14 Name.july
worker-1 executed Cargo ==> NO.19 Name.alen
worker-2 executed Cargo ==> NO.20 Name.june
worker-3 executed Cargo ==> NO.21 Name.june
worker-0 executed Cargo ==> NO.15 Name.july
worker-3 executed Cargo ==> NO.20 Name.alen
worker-4 executed Cargo ==> NO.21 Name.alen
worker-3 executed Cargo ==> NO.22 Name.june
worker-1 executed Cargo ==> NO.16 Name.july
worker-2 executed Cargo ==> NO.17 Name.july
worker-0 executed Cargo ==> NO.23 Name.june

相关文章:

  • 基于Django框架的图书索引智能排序系统设计与实现(源码+lw+部署文档+讲解),源码可白嫖!
  • 大数据开发核心技术难点:数据倾斜问题深度解析
  • docker harbor私有仓库登录报错
  • CASS 用户坐标系转换到世界坐标系
  • 阿里云ECS访问不了
  • 【NLP 64、基于LLM的垂直领域【特定领域】问答方案】
  • Java与MySQL数据库连接的JDBC驱动配置教程
  • ORA-00600: internal error code, arguments: [kcratr_nab_less_than_odr], [1],
  • RabbitMQ原理及代码示例
  • ESP32之OTA固件升级流程,基于VSCode环境下的ESP-IDF开发,基于阿里云物联网平台MQTT-TLS连接通信(附源码)
  • 2025华中杯B题——AI实现
  • Ubuntu20.04配置cartographer记录
  • 函数递归:递归的概念
  • C#日志辅助类(Log4Net)实现
  • Redis之全局唯一ID
  • 2. 判断列表元素的单一性
  • IO、存储、硬盘、文件系统相关常识
  • IT资产管理(一)之GLPI安装及部署
  • 【信息系统项目管理师】高分论文:论信息系统项目的质量管理(视频大数据平台项目)
  • 数智化招标采购系统分类及功能亮点
  • 中美“第二阶段”贸易协定是否会在会谈中提出?商务部回应
  • 视频|漫画家寂地:古老丝路上的文化与交流留下的独特印记
  • 首家股份行旗下AIC来了,兴银金融资产投资有限公司获批筹建
  • 中标多家学校采购项目的App查成绩需付费?涉事公司回应
  • “穿越看洪武”,明太祖及其皇后像台北故宫博物院南园展出
  • 今年五一档电影票房已破7亿