多线程—应用案例
上篇文章
多线程—保证线程安全https://blog.csdn.net/sniper_fandc/article/details/146421342?fromshare=blogdetail&sharetype=blogdetail&sharerId=146421342&sharerefer=PC&sharesource=sniper_fandc&sharefrom=from_link
目录
1 单例模式
2 阻塞队列
(1)生产者消费者模型
(2)阻塞队列的底层实现
3 定时器
4 线程池
1 单例模式
单例模式中使用双重if和synchronized机制来保证线程安全,详细见文章:
详解单例模式https://blog.csdn.net/sniper_fandc/article/details/143407061?fromshare=blogdetail&sharetype=blogdetail&sharerId=143407061&sharerefer=PC&sharesource=sniper_fandc&sharefrom=from_link
2 阻塞队列
阻塞队列是一种线程安全的数据结构,也遵循“先进先出”的规则,特殊的是:当队列满时,如果继续入队,就会阻塞,直到队列不满,则解除阻塞继续入队;当队列空时,如果继续出队,就会阻塞,直到队列不空,则解除阻塞继续出队。
(1)生产者消费者模型
阻塞队列的典型运用场景是生产者消费者模型。生产者消费者模型是一种通过缓冲区(阻塞队列)解决生产者和消费者关系紧密(耦合)的方式,生产者只负责生产“产品”(发送数据),消费者只负责消费“产品”(接收数据),如果没有缓冲区,生产者直接和消费者通讯,一旦生产者出现问题,就会导致消费者也可能出错。通过缓冲区,也就是阻塞队列,就解决了上述问题,因此阻塞队列在生产者消费者模型中有以下作用:
1.实现了生产者和消费者解耦:生产者把产品直接放入阻塞队列,消费者从阻塞队列中取产品,从而避免了直接交互。
2.作为缓冲区进行“削峰填谷”:在“秒杀”场景下,服务器可能在某个很短的时间内收到大量的订单请求,服务器可能承受不住如此大流量从而崩溃。而使用阻塞队列,服务器把订单请求放到缓冲区中(阻塞队列),由消费者线程依次处理队列中的订单请求,从而缓解了服务器的压力。
public class BlockQueue {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
Thread customer = new Thread(() -> {
int i = 0;
while(true){
try {
i = queue.take();
System.out.println("消费者取元素:" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread producer = new Thread(() -> {
int i = 0;
while(true){
try {
System.out.println("生产者生产元素:" + i);
queue.put(i);
i++;
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
customer.start();
}
}
这是生产者消费者模型的实现,BlockingQueue是Java标准库的阻塞队列接口,LinkedBlockingQueue是具体实现类,只有put()(入队)和take()(出队)方法具有阻塞功能,其他方法比如offer()、poll()和peek()等方法不具有阻塞功能。
代码运行结果如下:
(2)阻塞队列的底层实现
public class MyBlockingQueue {
private int[] queue = new int[100];//数组模拟循环队列
private int head = 0;//队空
private int tail = 0;//队尾
private volatile int size = 0;//判断是否队满
public void put(int num) throws InterruptedException {
synchronized (this){
//循环判断,唤醒时可以多判断一次是否队满(应对复杂情况)
while(size == queue.length){
//队满入队阻塞等待
this.wait();
}
queue[tail++] = num;
tail = tail % queue.length;
size++;
//队不空,唤醒出队
this.notify();
}
}
public int take() throws InterruptedException {
int result;
synchronized (this){
//循环判断,唤醒时可以多判断一次是否队空(应对复杂情况)
while(size == 0){
//队空出队阻塞等待
this.wait();
}
result = queue[head++];
head = head % queue.length;
size--;
//队不满,唤醒入队
this.notify();
}
return result;
}
}
这里采用循环队列实现阻塞队列,使用数组来表示循环队列,采用size变量处理队满和队空的情况(还有一种方式:浪费一个空间作为标志位),锁对象是当前的队列对象this,在所有使用这个对象加锁的多线程环境下保证了线程安全。由于需要读写size变量,为了保证内存可见性,使用volatile修饰该变量。这里有两个技巧:1.while循环判断队空和队满,如果使用if在这个场景下也可以处理,但是在复杂场景下,为了保证线程安全,使用while当线程阻塞被唤醒时,会再次判断是否队满或队空,这样避免了线程抢到锁后由于多线程的复杂导致队还是满的或者空的,从而引发线程不安全。2.对于head和tail指针的移动,如果超过数组长度,就需要从头开始,可以使用取余和if判断两种方式,但是一般建议if判断(if(tail >= queue.length){tail = 0;}),这种方式可读性强。而取余运算,当数组长度时2的倍数时,此时会被优化成移位运算,性能更好;不是2的倍数,就效果一般。
注意:分清楚代码中的阻塞和唤醒位置与顺序。
验证阻塞队列:
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue();
Thread customer = new Thread(() -> {
int i = 0;
while(true){
try {
i = queue.take();
System.out.println("消费者取元素:" + i);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread producer = new Thread(() -> {
int i = 0;
while(true){
try {
System.out.println("生产者生产元素:" + i);
queue.put(i);
i++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
customer.start();
}
运行结果:
由于队列的容量是100,生产者生产速度快,消费者每消费1个元素就sleep(500)(500ms),因此阻塞队列很快就满了,此时只有消费者每消费1个元素队列才有容量继续入队,因此就会出现上图的结果。
3 定时器
当我们希望在一段时间后执行一个任务,这时候就可以用到定时器。在Java标准库中使用Timer类,即可使用定时器:
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("hello timer");
}
}, 5000);
schedule表示启动一个定时器,第一个参数是TimerTask对象,指要执行的任务。第二个参数是定时,单位为ms。即上述代码5s后执行run()方法的内容。
实现一个定时器需要:1.带优先级的阻塞队列2.队列元素是Task对象(待执行的任务)3.Task对象具有一个时间属性(最先要执行的在队首)4.worker线程不断扫描队首元素判断是否要执行。
class MyTimeTask implements Comparable<MyTimeTask>{
private Runnable runnable;//要执行的任务
private long afterTime;//定时
public MyTimeTask(Runnable runnable,long afterTime){
this.runnable = runnable;
this.afterTime = System.currentTimeMillis() + afterTime;
}
public void run(){
runnable.run();
}
public Long getAfterTime() {
return afterTime;
}
@Override
public int compareTo(MyTimeTask o) {
//比较规则:时间戳越小的越优先
return (int)(this.afterTime - o.afterTime);
}
}
class MyTimer {
//锁对象
private Object locker = new Object();
//优先级阻塞队列(要么传入比较器,要么比较对象实现Comparable接口)
private PriorityBlockingQueue<MyTimeTask> queue = new PriorityBlockingQueue<>();
public void schedule(Runnable runnable,long afterTime){
MyTimeTask myTimeTask = new MyTimeTask(runnable,afterTime);
synchronized (locker){
queue.put(myTimeTask);
//放入新任务时唤醒worker线程来看看是否有更优先执行的任务
locker.notify();
}
}
public MyTimer(){
Thread worker = new Thread(() -> {
while(true){
try {
synchronized (locker){
//必须有队空的判断,否则出现死锁问题(
// worker线程持有锁,其他线程向阻塞队列添加任务时无法获得锁被阻塞,
// 从而worker线程一直等待新任务,而新任务永远无法添加到队列中)
while(queue.isEmpty()){
locker.wait();
}
MyTimeTask timeTask = queue.take();
long currentTime = System.currentTimeMillis();
if(timeTask.getAfterTime() > currentTime){
//还未到执行任务的时间
queue.put(timeTask);
//让worker线程等待,防止CPU忙等
//不能用sleep死等,有可能在等待期间有更优先执行的任务到来,wait可以被随时唤醒
locker.wait(timeTask.getAfterTime() - currentTime);
}else{
//到时间了执行任务
timeTask.run();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
});
worker.start();
}
}
public class MyTimerDemo {
public static void main(String[] args) {
MyTimer myTimer = new MyTimer();
myTimer.schedule(new Runnable() {
@Override
public void run() {
System.out.println("3秒后执行任务1");
}
},3000);
myTimer.schedule(new Runnable() {
@Override
public void run() {
System.out.println("2秒后执行任务2");
}
},2000);
myTimer.schedule(new Runnable() {
@Override
public void run() {
System.out.println("1秒后执行任务3");
}
},1000);
}
}
这里注意synchronized的范围,如果只对wait和notify加锁(这是必须的),就会出现可能执行完queue.put()后,worker线程被调度下CPU,此时其他线程再添加任务到优先级阻塞队列中,其他线程执行的locker.notify()无事发生(因为worker线程还没来得及阻塞),当worker线程再次调度执行时,就会忽视刚刚新添加的任务直接进入阻塞状态,从而可能导致新添加的任务到时间并未执行。因此必须放大锁的范围,即必须把出队/入队操作和等待/唤醒操作打包成原子操作,防止之间有新任务到来。
在阻塞队列为空时,如果没有循环判空的操作,worker线程被阻塞到take()方法,但是worker线程持有locker锁,其他线程向阻塞队列添加任务时被阻塞到synchronized (locker),导致worker线程一直等待任务,其他线程又无法获得锁添加任务的现象,从而导致死锁的发生。运行结果如下:
4 线程池
当需要频繁创建和销毁线程时,由于创建和销毁线程需要OS内部用户态和内核态频繁切换(开销大),因此需要线程池实现线程的复用。
ExecutorService pool = Executors.newFixedThreadPool(10);
pool.submit(new Runnable() {
@Override
public void run() {
System.out.println("hello");
}
});
Executors创建线程池的有以下方式:1.newFixedThreadPool: 创建固定线程数的线程池2.newCachedThreadPool: 创建线程数目动态增长的线程池3.newSingleThreadExecutor: 创建只包含单个线程的线程池.4.newScheduledThreadPool: 设定延迟时间后执行命令,是线程池版的Timer。
线程池的多个线程并不一定全在工作,有些可能“摸鱼”,有些可能一直在工作。Executors本质上是ThreadPoolExecutor类的封装,通过静态方法来实现不同参数类型实例的创建,这其中蕴含着工厂方法模式的思想。
模拟实现线程池:1.阻塞队列,用该数据结构来存储需要执行的任务,同时保证线程安全2.生产者消费者模型,线程池是消费者,定义任务是生产者,阻塞队列是缓冲区。
class MyThreadPool {
//存储待执行的各个任务
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
//插入任务
public void submit(Runnable runnable){
try {
queue.put(runnable);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//创建固定数量线程的线程池
public MyThreadPool(int n){
//循环创建n个线程,每个线程都尝试从阻塞队列中取任务
//队列不空就消费任务,队列为空就阻塞等待
for(int i = 0;i < n;i++){
Thread t = new Thread(() -> {
while(!Thread.currentThread().isInterrupted()){
try {
Runnable runnable = queue.take();
runnable.run();
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
});
t.start();
}
}
}
public class ThreadPoolDemo {
public static void main(String[] args) {
MyThreadPool myThreadPool = new MyThreadPool(10);
for(int i = 0;i < 100;i++){
int finalI = i;
myThreadPool.submit(new Runnable() {
@Override
public void run() {
System.out.println("任务" + finalI + "执行完毕");
}
});
}
}
}
线程池实现的代码需要注意的已经用注释标注了,只是在运行时需要注意int finalI = i;必须有,因为如果没有,每个任务共享变量i的引用(任务还没提交时可能循环已经结束了,此时i的值就已经变成100,打印结果全部都是:“任务100执行完毕”),因此需要创建局部变量finalI来传递i的每次循环的值到任务中。运行结果如下:
下篇文章: