阻塞队列:线程安全与生产者消费者模型解析
一、阻塞队列
阻塞队列就是基于普通队列做出扩展
1.线程安全的
如果针对一个已经满了的队列进行入队列,此时入队列操作就会阻塞,一直阻塞到队列不满(其他线程出队列元素)之后
如果针对一个已经空了的队列进行出队列,此时出队列操作就会阻塞,一直阻塞到队列不空(其他线程入队列元素)之后
包饺子
A和B和C在过年期间要包饺子,为了快一点三个人决定一起包饺子
但包饺子得先擀饺子皮,现在就只有一个擀面杖,所以A擀饺子皮是BC就陷入了阻塞,显然这就会出现竞争擀面杖问题(锁竞争)。
我们加入了一个桌子(阻塞队列),A负责擀饺子皮,BC通过桌子来包饺子,
如果A干的特别快,BC赶不上他的速度,很快桌子就会被放满(队列满了),这下A就会停下来(阻塞)
如果BC干的特别快,A赶不上他俩的速度,桌子为空(队列为空),BC就得等待(阻塞)
生产者消费者模型
1.引入生产者消费者模型,就可以更好的做到“解耦合”
上述过程中,A和B,A和C的耦合性较强,如果B或者C挂了,对于A的影响是很大的
加了阻塞队列A和B,C几乎几乎就没有交集了,B,C就算是挂了,对于A的影响也是微乎其微
阻塞队列(消息队列)
1.上述的阻塞队列,是基于这个数据结构实现的服务器程序,又被部署到单独的主机上
2.整个系统的结构更复杂了,你要维护的服务器更多了
3.效率,请求从A发过来B受到,这个过程经过转发要有一定开销
2.削峰填谷
当外网的请求数量就像洪水一样,急剧增加,这时A的请求数量就会增加很多,即使工作一般比较简单,每个请求消耗的资源少,但架不住多。B和C完成的工作更复杂,消耗的资源更多,这就可能导致BC挂了。
我们加入了阻塞队列之后
队列就像抗洪用的水库,只是用来存储数据的,消耗资源很少,抗压能力很强,在数据多的时候,存下来保持稳定的速度,少的是否就会放出数据,同样保持稳定请求速度,需要注意的是这个队列只是防止BC挂掉了。
Java标准库提供了现成的阻塞队列数据结构
使用put 和 offer一样都是入队列
但是put是带有阻塞功能,offer 没带阻塞(队列满了会返回结果)
take 方法用来出队列,同样带有阻塞功能
阻塞队列没有提供带有阻塞功能的获取队首元素的方法
尝试实现一个阻塞队列
既然涉及到了出队列,那数据结构就得用到循环队列,保证空间利用率
class MyBlockingQueue{private String[] elems = null;private int head = 0;private int tail = 0;private int size = 0;//准备锁对象,使用this也可以private Object locker = new Object();public MyBlockingQueue(int capacity){elems= new String[capacity];}public void put(String elem) throws InterruptedException {while(size >= elems.length){}//新的元素要放到tail指向的位置上elems[tail] = elem;tail++;if(tail >= elems.length){tail = 0;}size++;}public String take() throws InterruptedException {String elem = null;while(size == 0){}//取出 head 位置的元素返回elem = elems[head];head++;if(head >= elems.length){head = 0;}size--;return elem;}}
现在我们就相当于实现了一个循环队列,我们需要加上阻塞效果,我们知道只有当队列满和队列空才会阻塞,而解除阻塞则需要队列中有一个元素出去和队列中有一个元素进来。
所以我们可以在put结束时加上notify,并在判断队列满了的时候进行阻塞,至于在判断条件加上while循环,是为了防止在多线程情况下被不小心解锁,毕竟,每次put,take之后都对解一次锁,为了保证代码的原子性,我们为整个方法都加上锁。试想一下如果没有这么加锁,就会引发数组越界等一系列问题。
take方法也是同理
至此我们可以来验证一下阻塞队列是否成功,我们创建了两个线程分别是t1,t2,t1线程为生产元素的线程并且要比t2线程消费元素快的多。
Thread t1 = new Thread(() -> {int n = 1;while(true){try {queue.put(n + "");System.out.println("生产元素" + n);n++;} catch (InterruptedException e) {throw new RuntimeException(e);}}});//消费者Thread t2 = new Thread(() -> {while (true){try {String n = queue.take();System.out.println("消费元素" + n);Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}}});t2.start();t1.start();
我们发现在一开始运行时,t1线程十分快都到了一千多了,但t2线程才到2,这就导致了t1线程要想继续添加元素,就得等t2线程,这就出现了消费一个元素,就生产一个元素。
定时器
public class ThreadDemo30 {public static void main(String[] args) {Timer timer = new Timer();timer.schedule(new TimerTask() {@Overridepublic void run() {//时间到了之后要执行的代码System.out.println("hello timer 3000");}},3000);timer.schedule(new TimerTask() {@Overridepublic void run() {System.out.println("hello timer 2000");}}, 2000);timer.schedule(new TimerTask() {@Overridepublic void run() {System.out.println("hello timer 1000");}},1000);System.out.println("hello main");}
}
我们发现进程并没有结束,Timer 里内置了前台线程
实现一个简单timer
要想实现定时器,得先剖析它,怎么剖析呢,我们发现定时器就像一个容器一样将线程存储起来,等线程的时间到了就调度,时间短的限制性,这个容器我们就可以用
优先级队列
而优先级队列存储的是啥呢?优先级队列比较器基准是啥呢?
存储的这个元素得有时间和进程,这时没问题的,我们主要的是根据时间来做文章,首先便是初始化问题,时间得是相对时间所以我们发现
在构造方法上我们是现在的时间加上等待的时间、
其次就是比较器
我们比较的基准是时间,时间少的往前走反之往后走,
比较的顺序往往是试出来的,而不是去猜出来的
回到优先级队列那里,我们要有一个方法创建任务对象,并且将任务放入队列中
接下来是进行任务的时刻
我们将代码放到构造方法中
我们得有一个线程来做遍历队列中任务的工作
我们之所以加上了锁
是因为防止线程安全问题出现,试着想一下没有锁会咋样,可能队列中都没有元素了它还进行访问,可能队列中的这个元素都出去来,还去执行
为了保持原子性还是得加上锁的,在漫长的等待后,队列进入元素,locker重新获得锁,需要注意的是我们每次去往队列添加一个元素都会解锁。之后我们得获取当前时间和任务规定的时间进行计较,如果没达到约定时间就要进行等待。
至此我们就完成了定时器的设计了,我们发现做完后好像云里雾里啊,我们来重新画图梳理
首先
因为定时是时间越短越先执行,所以我们用优先级队列来存储线程
我们存储的元素不仅要有线程还要有时间,并且还要有比较器
我们元素构建好了得把元素放到队列中就衍生出了schdule方法,它负责创建对象,并将对象放入队列
接下来我们就要执行进程,我们选在了构造方法里
一进来就先加锁,防止出现线程安全问题,然后判断是否为空,如果为空就要等待,等schdule方法放入对象,并notify。
然后比较当前时间和任务预期执行时间,没到预期时间就去等待相应时间、
到达时间出队列,并且运行线程。
结束!!!