JavaEE初阶——多线程3(案例)
目录
1.单例模式
饿汉模式
懒汉模式
改进1:
改进2:
2.阻塞队列
生产者和消费者模型
标准库中的阻塞队列
如何自己实现一个阻塞队列
3.线程池
Executors创建线程池的几种方式
再谈ThreadPoolExecutor提供的参数
工厂模式
实现一个线程池
4.定时器
标准库中的定时器
实现一个定时器
1.单例模式
能保证某个类在程序中只能存在唯一一份实例,而不能创建多个实例(强制要求一个类不能创建多个对象)
饿汉模式
通俗理解为尽量早的去创建线程
//通过类加载的同时,去创建实例
public class Singleton {
private static Singleton instance =new Singleton();
//private 是构造方法这里的点睛之笔
private Singleton(){
}
//后续通过getInstance方法获得实例
public static Singleton getInstance(){
return instance;
}
}public class Singleton {private static Singleton instance =new Singleton();//private 是构造方法这里的点睛之笔Singleton(){} //后续通过getInstance方法获得实例public static Singleton getInstance(){return instance;} }
懒汉模式
通俗理解为尽量晚的去创建线程
//类加载时不创建实例,第一次使用时创建实例
public class Singleton {
private static Singleton instance=null;
private Singleton(){
}
public static Singleton getInstance(){
//如果没有创建实例,那么此时就创建实例(第一次使用时创建实例)
if (instance==null){
instance=new Singleton();
}
return instance;
}
}public class Singleton1{private static Singleton instance=null;private Singleton(){}public static Singleton getInstance(){//如果没有创建实例,那么此时就创建实例if (instance==null){instance=new Singleton();}return instance;} }
线程安全问题会出现在首次创建实例的时候,当多个线程同时调用getiInstance方法时,可能会创建多个案例
改进1:
使用synchronized加锁,解决线程安全问题
public class Singleton1 {
private static Singleton instance=null;
private Singleton1(){
}
public synchronized static Singleton getInstance(){
//如果没有创建实例,那么此时就创建实例
if (instance==null){
instance=new Singleton();
}
return instance;
}
}public class Singleton1 {private static Singleton instance=null;private Singleton1(){}public synchronized static Singleton getInstance(){//如果没有创建实例,那么此时就创建实例if (instance==null){instance=new Singleton();}return instance;} }
由于加锁/解锁时开销较大,同时也会触发指令重排序问题
改进2:
通过使用volatile关键字和双重if判定
public class Singleton2 {
//加入volatile关键字
//1.确保每次读操作,读取的是内存
//2.关于变量的读取和修改,不会触发指令重排序问题
private static volatile Singleton instance=new Singleton();
private static Object lock=new Object();
private Singleton2(){
}
public static Singleton getInstance(){
//判断是否加锁
if (instance==null){
synchronized (lock){
//判断是否需要new一个对象
if(instance==null){
instance=new Singleton();
//可能会触发指令重排序问题
}
}
}
return instance;
}
}package JavaThread;public class Singleton2 {//加入volatile关键字//1.确保每次读操作,读取的是内存//2.关于变量的读取和修改,不会触发指令重排序问题private static volatile Singleton instance=new Singleton();private static Object lock=new Object();private Singleton2(){}public static Singleton getInstance(){//判断是否加锁if (instance==null){synchronized (lock){//判断是否需要new一个对象if(instance==null){instance=new Singleton();//可能会触发指令重排序问题}}}return instance;} }
2.阻塞队列
阻塞队列是一种特殊的队列,同时遵循“先进先出”的原则
阻塞特性:
1.当队列满时,如果继续入队列就会阻塞等待,直到有其他线程取走元素为止
2.当队列空时,如果继续出队列就会阻塞等待,直到有其他线程插入元素为止
阻塞队列典型的应用场景:生产者和消费者模型
生产者和消费者模型
优势:
1.削峰填谷 --》相当于一个缓冲区,平衡生产者和消费者之间的处理能力
2.解耦合 --》生产者和消费者之间(修改代码)
标准库中的阻塞队列
//BlockingQueue是一个接口 真正实现类的是 LinkBlockingQueue
BlockingQueue<String> queue=new LinkedBlockingQueue<>();
//put方法阻塞式入队列,take方法阻塞式出队列
//入队列
queue.put();
//出队列 如果没put就take就会阻塞
String elem=queue.take();
//BlockingQueue也有offer,poll,peek方法, 不带有阻塞特性
生产者和消费者模型:
package JavaThread;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class custom_produce{public static void main(String[] args) throws InterruptedException {BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>();Thread customer=new Thread(()->{while (true){try {int value= blockingQueue.take();System.out.println("消费元素"+value);} catch (InterruptedException e) {throw new RuntimeException(e);}}},"消费者");customer.start();Thread producter=new Thread(()->{Random random=new Random();int num= random.nextInt(1000);while (true){try {System.out.println("生成元素"+num);blockingQueue.put(num);Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}},"生产者");producter.start();customer.join();producter.join();}
}

如何自己实现一个阻塞队列
1.通过“循环队列”来实现
package JavaThread;
public class myBlockingQueue {
private String[] data=null;
//队首
private int head=0;
//队尾
private int tail=0;
//元素个数
private int size=0;
public myBlockingQueue(int capacity){
data=new String[capacity];
}
public void put(String elem){
data[tail]=elem;
//tail=(tail+1)%data.length
tail++;
if(tail>= data.length){
tail=0;
}
size++;
}
public String take(String ret){
ret=data[head];
//head=(head+1)% data.length
head++;
if (head>= data.length){
head=0;
}
size--;
return ret;
}
}
2.使用synchronized加锁控制保证线程安全问题
使用wait notifyAll进行队列阻塞特性
需要注意的是:使用wait时使用while循环,目的是为了进行二次验证,判断当前条件是否成立
(wait之前验证一次,wait唤醒之后再验证一次(再次确定队列是否为空/满))
package JavaThread;
public class myBlockingQueue {
private String[] data=null;
//队首
private int head=0;
//队尾
private int tail=0;
//元素个数
private int size=0;
public myBlockingQueue(int capacity){
data=new String[capacity];
}
public void put(String elem) throws InterruptedException {
synchronized (this) {
while (size >= data.length) {
wait();
}
data[tail] = elem;
//tail=(tail+1)%data.length
tail++;
if (tail >= data.length) {
tail = 0;
}
size++;
notifyAll();
}
}
public String take() throws InterruptedException {int ret=0;
synchronized (this) {
while (size == 0) {
wait();
}
ret = data[head];
//head=(head+1)% data.length
head++;
if (head >= data.length) {
head = 0;
}
size--;
notifyAll();
}
return ret;
}
}package JavaThread;public class myBlockingQueue {private String[] data=null;//队首private int head=0;//队尾private int tail=0;//元素个数private int size=0;public myBlockingQueue(int capacity){data=new String[capacity];}public void put(String elem) throws InterruptedException {synchronized (this) {while (size >= data.length) {wait();}data[tail] = elem;//tail=(tail+1)%data.lengthtail++;if (tail >= data.length) {tail = 0;}size++;notifyAll();}}public String take(String ret) throws InterruptedException {synchronized (this) {while (size == 0) {wait();}ret = data[head];//head=(head+1)% data.lengthhead++;if (head >= data.length) {head = 0;}size--;notifyAll();}return ret;}}
具体实现如下:
package JavaThread;import java.util.Random;public class myBlockingQueue {private int[] data=null;//队首private int head=0;//队尾private int tail=0;//元素个数private int size=0;public myBlockingQueue(int capacity){data=new int[capacity];}public void put(int elem) throws InterruptedException {synchronized (this) {while (size >= data.length) {this.wait();}data[tail] = elem;//tail=(tail+1)%data.lengthtail++;if (tail >= data.length) {tail = 0;}size++;this.notifyAll();}}public int take() throws InterruptedException {int ret=0;synchronized (this) {while (size == 0) {this.wait();}ret = data[head];//head=(head+1)% data.lengthhead++;if (head >= data.length) {head = 0;}size--;this.notifyAll();}return ret;}public synchronized int size(){return size;}public static void main(String[] args) throws InterruptedException {myBlockingQueue blockingQueue=new myBlockingQueue(1000);Random random=new Random();Thread customer=new Thread(()->{while (true){try {int value= blockingQueue.take();System.out.println("消费元素"+value);} catch (InterruptedException e) {throw new RuntimeException(e);}}},"消费者");customer.start();Thread producter=new Thread(()->{int num= random.nextInt(1000);while (true){try {System.out.println("生成元素"+num);blockingQueue.put(num);Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}},"生产者");producter.start();customer.join();producter.join();}
}
3.线程池
通俗的说,线程池把线程提前创建,放到一个地方(类似于一个数组),需要用时去取,用完还到池子中;优点就是减少每次启动和销毁线程的开销
为什么直接创建/销毁线程的开销比从池子中取还线程的开销大呢?
操作系统分为内核态和用户态 //一个操作系统=内核态+配套的应用程序(一个操作系统内核只有一份,但是这一份内核就是给所有的应用程序提供服务的)
所谓的线程池可以高效的创建/销毁线程,就是节省了应用程序切换到内核中运行这样的开销
Executors创建线程池的几种方式
1.newFixedThreadPool:创建固定线程数的线程池
2.newCachedThreadPool:创建动态增长线程数的线程池
3.newSingleThreadExecutor:创建只包含一个线程的线程池
4.newScheduledThreadPool:创建带有线程池的定时器(进阶版Timer)
Executors本质上是ThreadPoolExecutor类的封装
返回值类型ExecutorService
核心方法submit(Runnable):通过Runnable描述一个要执行的任务,通过submit把任务放到线程池中;此时线程池里的线程就会执行这样的任务//!!!使用要重写run方法
pool.submit(new Runnable(){
public void run(){
System.out.println("hool");
}
});
再谈ThreadPoolExecutor提供的参数

corePoolSize:核心线程数(至少有多少个线程(随线程池创建而创建,随线程池销毁而销毁))
maximumPoolSize:核心线程数+非核心线程数(最大线程数)(线程池中的线程数是自适应的)
keepAliveTime:非核心线程允许空闲的最大时间
Unit:枚举,是keepAliveTime的核心单位,可以是分,是秒,也可以是其他值
workQueue:传递任务的阻塞队列
threadFactory:工厂模式(是一种设计模式,和单例模式是并列的关系)
RejectedExecutionHandler:拒绝策略 ,由于任务量超出线程池的负荷(对于线程池而言,当入队列满的情况下不会真的出现“入队列阻塞”,而是执行拒绝策略)
1)AbortPolicy():超过负荷,直接抛出异常(直接导致线程池无法进行工作)
2)CallerRunsPolicy():让调用者(调用submit的线程)自动执行任务(先判断队列是否为满,再判断是否为CallerRunsPolicy()执行策略,两者但是的话,紧接着submit内部会调用Runnable.run()方法)
3)DiscardOldlestPolicy():丢弃队列中最老的任务
4)DiscardPolicy():丢弃队列中最新的任务(当前submit执行的任务)
线程池本质上也是一个生产者消费者模型:
调用submit就是生产任务,线程池里的线程就是消费任务
工厂模式
为了弥补构造方法的缺陷(核心是通过静态方法,把构造对象new的过程,给属性初始化的过程 进行封装)
给线程类提供工厂类,也是将线程池中的一组线程,进行初始化设置
package JavaThread;
//...类
class thread{
//字段,方法
}
//...的类的工厂
public class threadFactory {
//通过静态方法
public static thread way1(){
thread t=new thread();
return t;
}
public static thread way2(){
thread t=new thread();
return t;
}
}
实现一个线程池
这里是实现一个固定线程数量的线程池
package JavaThread;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class myThreadPool {//使用BlockingQueue组织所有的任务private BlockingQueue<Runnable> queue=new LinkedBlockingQueue<>();//核心方法就是submit,把任务放到线程池中public void submit(Runnable runnable) throws InterruptedException {queue.put(runnable);}//创建固定数量的线程池public myThreadPool(int n){for (int i = 0; i < n; i++) {Thread t=new Thread(()->{//每个线程不停地取任务while (true) {try {//通过Runnable取出任务,取出任务操作有返回值Runnable runnable=queue.take();//取任务后执行runnable.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}});t.start();}}
}
class demo{public static void main(String[] args) throws InterruptedException {myThreadPool my=new myThreadPool(5);for (int i = 0; i < 1000; i++) {my.submit(new Runnable() {@Overridepublic void run() {System.out.println("线程" + Thread.currentThread().getName());}});}}
}
拓展:
调用shutdown()方法:保证线程池里的线程全部关闭,但是不能保证线程池里的任务全部执行完毕
调用awaitTermination()方法:需要等待线程池里的任务全部执行完毕
4.定时器
通俗的讲是,时间到了,要去执行一些逻辑
标准库中的定时器
核心类Timer,核心方法Schedule(第一个参数是即将要执行的任务代码,第二个参数是多次时间之后执行(毫秒))
package JavaThread;import java.util.Timer;
import java.util.TimerTask;public class myTimer {public static void main(String[] args) {Timer timer=new Timer();timer.schedule(new TimerTask() {@Overridepublic void run() {System.out.println("holle");}},2000);timer.schedule(new TimerTask() {@Overridepublic void run() {System.out.println("holle");}},4000);timer.schedule(new TimerTask() {@Overridepublic void run() {System.out.println("holle");}},6000);}}
实现一个定时器
1.创建一个类,表示一个任务
class myTimerTask{private Runnable runnable;private long time;public myTimerTask(Runnable runnable,long time) {this.runnable=runnable;//记录当前任务的时刻this.time=time;}
}
2.定时器能够管理多个任务,必须使用一些集合类把多个任务管理起来(对象需要放到优先级队列,需要实现comparable接口)
class myTimerTask implements Comparable<myTimerTask>{private Runnable runnable;private long time;public myTimerTask(Runnable runnable,long time) {this.runnable=runnable;//记录当前任务的时刻this.time=time;}@Overridepublic int compareTo(myTimerTask o) {//作差return (int) (this.time-o.time);}
}
3.实现schedule方法(使用PriorityQueue来组织多个任务,将任务添加到队列中)
package JavaThread;import java.util.PriorityQueue;class myTimerTask implements Comparable<myTimerTask>{private Runnable runnable;private long time;public myTimerTask(Runnable runnable,long time) {this.runnable=runnable;//记录当前任务的时刻this.time=time;}@Overridepublic int compareTo(myTimerTask o) {//作差return (int) (this.time-o.time);}
}
//实现一个定时器
public class myTimer {private PriorityQueue<myTimerTask> queue=new PriorityQueue<>();//任务当前时刻为基准多长时间之后执行public void schedule(Runnable runnable,long time){//System.currentTimeMillis()获得当前时刻时间戳api,返回long,ms级别myTimerTask my=new myTimerTask(runnable, System.currentTimeMillis()+time);//从队尾插入queue.offer(my);}
}
4.额外创建一个线程,负责队列中的任务(与线程池不同,线程池只要队列不为空,就立即取任务并执行;定时器要看对首元素时间是否到了,如果到了,就能执行了)
package JavaThread;import java.util.PriorityQueue;class myTimerTask implements Comparable<myTimerTask>{private Runnable runnable;private long time;public myTimerTask(Runnable runnable,long time) {this.runnable=runnable;//记录当前任务的时刻this.time=time;}@Overridepublic int compareTo(myTimerTask o) {//作差return (int) (this.time-o.time);}public long getTime() {return time;}public void run() {run();}
}
//实现一个定时器
public class myTimer {private PriorityQueue<myTimerTask> queue=new PriorityQueue<>();//任务当前时刻为基准多长时间之后执行public void schedule(Runnable runnable,long time){//System.currentTimeMillis()获得当前时刻时间戳api,返回long,ms级别myTimerTask my=new myTimerTask(runnable, System.currentTimeMillis()+time);//从队尾插入queue.offer(my);}public myTimer(){//创建一个线程,执行队列中的任务Thread t=new Thread(()->{while (true){if (queue.isEmpty()){continue;}//获取队列头部的元素,但是不删除该元素myTimerTask task=queue.peek();long current_time=System.currentTimeMillis();if(current_time>=task.getTime()){//时间到了//获取队列头部的元素,并删除该元素queue.poll();task.run();}else {}}});}
}
考虑线程安全问题和阻塞问题,完善代码
package JavaThread;import java.util.PriorityQueue;class myTimerTask implements Comparable<myTimerTask>{private Runnable runnable;private long time;public myTimerTask(Runnable runnable,long time) {this.runnable=runnable;//记录当前任务的时刻this.time=time;}@Overridepublic int compareTo(myTimerTask o) {//作差return (int) (this.time-o.time);}public long getTime() {return time;}public void run() {run();}
}
//实现一个定时器
public class myTimer {private PriorityQueue<myTimerTask> queue=new PriorityQueue<>();private Object object=new Object();//任务当前时刻为基准多长时间之后执行public void schedule(Runnable runnable,long time){//使用synchronized会产出一些误差,所有使用notify提前进行唤醒synchronized (object) {//System.currentTimeMillis()获得当前时刻时间戳api,返回long,ms级别myTimerTask my = new myTimerTask(runnable, System.currentTimeMillis() + time);//从队尾插入queue.offer(my);object.notify();}}public myTimer(){//创建一个线程,执行队列中的任务Thread t=new Thread(()->{synchronized (object) {while (true) {if (queue.isEmpty()) {try {object.wait();} catch (InterruptedException e) {e.printStackTrace();}}//获取队列头部的元素,但是不删除该元素myTimerTask task = queue.peek();long current_time = System.currentTimeMillis();if (current_time >= task.getTime()) {//时间到了//获取队列头部的元素,并删除该元素queue.poll();task.run();} else {//时间还没到try {object.wait(task.getTime() - current_time);} catch (InterruptedException e) {e.printStackTrace();}}}}});}
}
我们还可以基于时间轮去实现定时器,这里就不做过多的讲述了!
这一部分还是有点难度的,希望同学们多加练习!!!
