线程3 JavaEE(阻塞队列,线程池)
目录
阻塞队列
生产者消费者模型
优点
分布式系统
缺点
java标准库提供的
BlockingQueue
生产者消费者模型代码
模拟实现
线程池
java标准库提供的
参数说明
拒绝策略
工厂设计模式
简化版 Exectors
给线程池加任务
模拟实现
定时器
java标准库提供的
模拟实现
阻塞队列
一种特殊的队列同样满足队列的先进先出原则,天然是线程安全的,带有阻塞的特性(当队列为空时,出队列操作进入阻塞等待入队了使队列不为空,入队列操作,如果队列满阻塞等待出队列使队列不满)
生产者消费者模型
主要目的是减少锁竞争
针对资源分为生产者,消费者 生产资源的是生产者,消耗资源的是消费者 针对不同资源既可以是生产者,也可以是消费者。就像工厂流水线:
机器生产零件,工人打包零件成箱,机械臂搬运箱子
之于零件机器是生产者,工人是消费者,之于箱子工人是生产者,机械臂是消费者
机器生产出零件,工人打包是同时进行的,当机器生产的速度比工人包装快时,工人来不及包装把盛零件的容器满了,机器通知生产等待工人包装以免损坏零件,的同理工人和机械臂也是这样。
优点
1.减少资源竞争,提高效率
2. 更好的降低了模块之间的解耦合(分布式系统)
3.削峰填谷
分布式系统

这样的系统,客户端和服务器通过网络连接 ABC三个服务器直接调用,这时对系统中服务器调整,动一个则牵一发而动全身,耦合比较大,比如改了C,B就可能要改。
此时要想降低耦合就可以使用阻塞队列

在B和C之间加入一个阻塞队列就可以降低耦合,B只需要把信息放到阻塞队列中,C也只需从阻塞队列中取出元素即可,同时这个阻塞队列充当了一个缓冲区的作用,B处理的信息可能C来不及处理,但加入阻塞队列,B就可以把处理好的放进队列中,C有空了从队列中取。B 无需等待 C 处理完成,只需将数据放入队列即可继续处理自身任务,解决了 “生产者(B)速度快于消费者(C)” 时的阻塞问题;同时队列可以暂存请求,避免 C 因突发压力(如瞬时高并发)而崩溃,起到了 “削峰填谷” 的缓冲作用。
缺点
使系统结构更加复杂,引入队列层数太多,网络开销增加,效率会受影响(锁竞争,内存管理,自身维护的开销)
java标准库提供的
BlockingQueue
BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // 括号内填的是队列的最大容量//BlockingQueue<String> queue = new LinkedBlockingQueue<>(100);//入队列 put //queue.put("aaa");//出队列 take //String s = queue.take();//put take 带有阻塞 出发阻塞需要抛出异常
生产者消费者模型代码
package demo;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class demo_productAndConsumer {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> queue = new LinkedBlockingQueue<>();Thread producer = new Thread(() -> {int count = 0;while(true){try {queue.put("" + count);System.out.println("生产了一个" + count);count++;Thread.sleep(1000); // 可通过时间的修改来控制生产消费速度} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread consumer = new Thread(() -> {int count = 0;while(true){try {String s = queue.take();System.out.println("消耗了一个");count--;Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});producer.start();consumer.start();producer.join();consumer.join();}
}

模拟实现
myBlockingQueue
基于数组写一个不带泛型参数的阻塞队列 以String 为例

package ThreadDemo;public class myBlockingQueue {private String[] arr = null;private int head = 0;private int tail = 0;private int size = 0;private Object locker = new Object();public myBlockingQueue(int cap){this.arr = new String[cap];}public void put(String elem) throws InterruptedException {synchronized (locker) {while (size >= arr.length) {locker.wait();}// 新元素入队arr[tail] = elem; // 修改操作 有线程安全问题tail++;if (tail >= arr.length) {tail = 0;}size++;locker.notify();// 唤醒 take 中的阻塞}}public String take() throws InterruptedException {synchronized (locker) {while (size == 0) {locker.wait();}// 出队String res = arr[head]; // 修改 有线程安全问题head++;size--;if (head >= arr.length) {head = 0;}locker.notify();return res;}}
}
线程池
要创建/销毁的线程多了,创建销毁线程的开销也是很大的,为了提升效率就引入了线程池,提前创建一些线程用的时候直接从池子中调,用完再还给池子。
java标准库提供的
ThreadPoolExecutor (int corePoolSize,int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory ,RejectedExecutionHandler handler )
参数说明
int corePoolSize 核心线程数 线程创建时就申请的线程当任务多到核心线程处理不过来就会申请一些临时线程,用完销毁但不销毁线程池的情况下,核心线程不会销毁
int maximumPool 最大线程数总数
long keepAliveTime 临时线程最大存活时间 临时线程没任务时最大存活时间
TimeUnit unit 时间单位
BlockLonkingQueue<Runnable> workQueue 线程池要执行的任务
ThreadFactory threadFactory 线程工厂
RejectedExecutionHandler handler 拒绝策略
拒绝策略
线程池的任务队列是一个阻塞队列,当任务队列满了再添加新任务会触发阻塞,实际中我们并不希望除特殊说明发生突发性的阻塞,影响程序。
AlortPolicy 终止 抛出异常
CallerRunsPolicy 调用者负责执行任务 谁请求谁执行
正常应该是线程池中线程执行但现在池中繁忙,谁请求谁执行
DiscardOldestPolicy 丢弃队列中最老的任务
DiscardPolicy 丢弃队列中最新的任务
工厂设计模式
通过“工厂类”统一创建对象,让使用者无需关注对象的具体创建细节,只需通过工厂获取所需实例。
正常通过构造方法创建对象会有一些小问题
例:
表示一个点的坐标有多种表示方法(直角坐标系,极坐标)

通过类的构造方法初始化对象,两种坐标表示方法,要传入不同的参数,这种情况我们的目的是触发方法重载,但两个方法都只有两个参数并不能触发重载。
此时我们就可以创建一个工厂类
通过工厂类(PointBuilder)的工具方法(Builder)静态方法,这时在工厂中无需创建Builder对象可直接通过PointBuiler 直接调用类中的方法
package ThreadDemo;class PointBuilder{public static PointFactory buildPointBy_xy(double x, double y){//针对P初始化return p;}public static PointFactory buildPointBy_ra(double r, double a){//针对P初始化return p;}
}
public class PointFactory {private double x;private double y;public PointFactary(double x, double y){this.x = x;this.y = y;}public static void main(String[] args){PointFactory p = PointBuilder.buildPointBy_xy(1,2);}
}
简化版 Exectors
对 ThreadPoolExecutor 进行了封装 本质上还是工厂类提供了不同工厂方法初始化来ThreadPoolExecutor 来创建线程池
创建格式:Executor

| newFixedThreadPool() | 固定线程数的线程池,最大线程数和核心线程数都为 括号内的值图中为4. |
| newCachedThreadPool() | 核心线程数为零 最大为 int 最大值 2³¹ - 1 |
| newSingleThreadExecutor() | 固定只有一个线程的线程池 |
| newScheduledThreadPool() | 本质为定时器,放到池中的任务会在一定时间后进行 |
给线程池加任务
Collable 有返回值 Runnable 无返回值 两者皆可描述任务
ExecutorService 提供了 submit 方法来提交任务
用法 加100个任务
package demo;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class demo_ExecutorService_submit {private ExecutorService service = null;public demo_ExecutorService_submit(int cap){service = Executors.newFixedThreadPool(cap);}public void putTask(Runnable task){for (int i = 0; i < 100; i++){int id = i;service.submit(new Runnable(){public void run(){System.out.println("执行一个任务" + id);}});}}
}
模拟实现
简单模拟实现固定数目的线程池
通过阻塞队列实现,传入参数限制线程数,线程不断从队列中获取任务并执行。
package demo;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class demo_myFixedThreadPool {private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();public demo_myFixedThreadPool (int n){for(int i = 0; i<n; i++){Thread t = new Thread(() -> {try {while(true){Runnable task = queue.take(); // 取出任务task.run(); // 执行任务}} catch (InterruptedException e) {throw new RuntimeException(e);}});t.start();}}public void putTask(Runnable task) throws InterruptedException {queue.put(task);}
}
定时器
可以按照预定的时间间隔或在特定时间点执行任务
java标准库提供的
Timer 简单定时器
用法: Timer timer = new Timer();
timer.schedule(任务,时间);
package demo;import java.util.Timer;
import java.util.TimerTask;public class demo_Timer {public static void main(String[] args) {Timer timer = new Timer();timer.schedule(new TimerTask() {@Overridepublic void run() {System.out.println("定时器执行任务");}},3000);// 3秒后执行System.out.println("程序启动");}}
模拟实现
同时管理多个任务,用堆组织任务
堆本质是个优先级队列,我们要指定一个优先级规则 用任务执行时间排序
MyTimerTask 类
import java.util.PriorityQueue;class myTimerTask implements Comparable<myTimerTask>{private Runnable task;private long time;private long delay;public myTimerTask(Runnable task,long delay){this.task = task;this.time = System.currentTimeMillis() + delay;}public long getTime(){return time;}public Runnable getTask(){return task;}public int compareTo(myTimerTask o){return (int)(this.time - o.time);}
}
myTimer
启动一个线程,循环从队列中获取任务、判断执行时间,并执行到期任务。
public class myTimer {private PriorityQueue<myTimerTask> queue = new PriorityQueue<>();private Object locker = new Object();public myTimer(){// 创建一个线程 循环的从队列中不断去取出元素,到时间执行,不到时间Thread t = new Thread(()-> {// 队列为空while(true){synchronized(locker){if(queue.isEmpty()){try {locker.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}myTimerTask task = queue.peek();long currentTime = System.currentTimeMillis();if(currentTime < task.getTime()){synchronized(locker){try {locker.wait(task.getTime() - currentTime);} catch (InterruptedException e) {e.printStackTrace();}}}else{task.getTask().run();queue.poll();}}}});t.start();}public void schedule(Runnable task,long delay){synchronized(locker){queue.offer(new myTimerTask(task,delay));locker.notify();}}
测试
public static void main(String[] args) {myTimer timer = new myTimer();timer.schedule(() -> {System.out.println("定时执行任务 3000");}, 3000);timer.schedule(() -> {System.out.println("定时执行任务 2000");}, 2000);timer.schedule(() -> {System.out.println("定时执行任务 1000");}, 1000);}

我是个十七岁的小男孩,我不怕面对世界变多快,做过自己觉得好傻的事,那是多么纯真的年代,
那是多么纯洁的相爱
------- 十七岁 DT
⭐❤️👍
