线程 -- 阻塞队列
阻塞队列
阻塞队列,他其实就是一种更复杂的队列,也遵循队列的先进先出原则。它具有以下特性:
1.线程安全
2.阻塞特性
a)队列为空,尝试出队列,出队列操作就会阻塞,阻塞到其他线程添加元素为止。
b)队列为满,尝试入队列,入队列操作也会阻塞,阻塞到其他线程取走元素为止。
阻塞队列,一个最主要的应用场景,就是实现“生产者消费者模型”(多线程编程中,一种典型的编码技巧)。
生产者消费者模型
⽣产者消费者模型就是通过⼀个容器来解决⽣产者和消费者的强耦合问题。
⽣产者和消费者彼此之间不直接通讯,⽽通过阻塞队列来进⾏通讯,所以⽣产者⽣产完数据之后不⽤等待消费者处理,直接扔给阻塞队列,消费者不找⽣产者要数据,⽽是直接从阻塞队列⾥取.
举一个典型的生产者消费者模型:过年的时候,全家人包饺子,一个桌子上,一个人负责擀饺子皮,而其余的人则负责包饺子。那么在这个例子中,资源就是饺子皮,生产者就是负责擀饺子皮的人,其余的人就是消费者,而这个桌子就是生产消费的交易场所。此处的交易场所就是一个阻塞队列。(此处谈到的阻塞,是“极端情况下”生产者和消费者之间速度不协调的时候)
下来我们来聊聊生产者消费者模型的两个重要优势(日常开发中,生产者消费者模型,经常见到,并且消息队列这个东西也很常见。)(消息队列我们可以简单的理解为 假如把队列单独部署成一个服务,独立的服务的阻塞队列,称为“消息队列”,消息队列服务器,里面不只是一个队列,也可以有N个队列)
-
解耦合(不一定是两个线程之间,也可以是两个服务器之间)
如果是A直接访问B,此时A和B的耦合就更高,编写A的代码的时候,多多少少就会有一些和B相关的逻辑,编写B的代码的时候,也会有一些A的相关逻辑。
当我们加一个阻塞队列之后,A和队列交互,B和队列交互,A和B不再直接交互了。那么A的代码中就看不见B了,B的代码中也就看不见A了,A 的代码和B的代码中只能看见队列。(本来A和B耦合,现在成了A和队列耦合,B和队列耦合,那不是还有耦合吗?何来的降低耦合呢?降低耦合,是为了让后续修改的时候,成本低,而队列一般不会修改。之前A的代码需要修改的时候B也得同时改,加了队列之后,A改的时候,B就不用担心受到影响) -
阻塞队列就相当于⼀个缓冲区,平衡了⽣产者和消费者的处理能⼒. (削峰填⾕)
可以简单的理解成服务器收到的请求量的曲线图
每当A这边遇到一波流量激增,此时每个请求都会转发给B, B也会承担一样的压力,很容易就把B给搞挂了。
一般来说A这种上游的服务器,尤其是入口的服务器,干的活更简单,单个请求消耗的资源数少,但像B这种下游的服务器,通常承担更重的任务量,复杂的计算/存储工作,单个请求消耗的资源数更多。
服务器处理每个请求的时候,都是需要消耗一定的硬件资源(包括不限于CPU,内存,硬盘,网络带宽…)一旦消耗的总量,超出了机器硬件资源的上限,此时,对应的进程就可能会崩溃或者操作系统产生卡顿-》挂了,也就是 当提供的量小于消耗的量,就会挂。
队列服务器,针对单个请求,做的事也少(储存,转发),队列服务器往往是可以抗很高的请求量。B这边可以不关心队列中的数据量多少,就按照自己的节奏慢慢处理队列中的请求数据即可。趁着峰值过去了,B任然可以继续消费数据。利用波谷的时间,来赶紧消费之前积压的数据,从而达到 削峰填⾕的效果。
生产者消费者模型付出的代价:
1)引入队列之后,整体的结构会更复杂。此时,就需要更多的机器,进行部署,生产环境的结构会更复杂,管理起来更麻烦。
2)效率会有影响
标准库中的阻塞队列
在 Java 标准库中内置了阻塞队列. 如果我们需要在⼀些程序中使⽤阻塞队列, 直接使⽤标准库中的即可。
- BlockingQueue 是⼀个接⼝. 真正实现的类是 LinkedBlockingQueue.
- put ⽅法⽤于阻塞式的⼊队列, take ⽤于阻塞式的出队列.(put 和 take 才带有阻塞特性)
- BlockingQueue 也有 offer, poll, peek 等⽅法, 但是这些⽅法不带有阻塞特性
我们得抛出个异常不然会报错。
在出队列的时候没有put直接take会陷入阻塞
当然再实例化的阻塞队列 的时候也可以传参
这个参数表示容量,也就是最大能容纳多少元素。
如果不设置capacity ,它默认是一个非常大的数值(21亿左右)。在实际开发中,一般建议大家能够设置上你要求的最大值,否则你的队列可能变得非常大,导致把内存耗尽,产生超出内存范围这样的异常。
下来我们就可以通过阻塞队列来简单的实现生产者消费者模型
当我们运行程序的时候,我们会发现生产者和消费者线程的速度旗鼓相当,所以很难见到阻塞效果。
我们可以在生产者或消费者上面加入 sleep ,两个产生的效果不同。
没设置初始容量也没事,填满也没事,这个队列最多就是21亿个元素,每个元素是一个int(四个字节),8GB,极端情况,打满了,也就是消耗8GB内存。
一个JVM进程不一定能够利用机器所有的内存,是可以在运行JVM的时候通过一定的参数指定JVM最多消耗多少内存。当然,如果实际消耗的内存,超过了JVM运行时候的限制上线确实会挂。
(21亿*4个字节,是咋算出8GB的? ==》 可以记上几个单词 Thousand 千 =》 K Million 百万 =》 M Billion 十亿 =》 G)
自我模拟实现一个阻塞队列
- 1.通过“循环队列”的方式来实现
这里我用的是数组的形式来创建队列。大致思路就是:如果要插入元素,就在数组的 tail 位置插入该元素,当tail 超过数组长度的时候,重置为0。如果要取出元素,就在数组的head 位置插入该元素,同样当head超过数组长度的时候,重置为0。
-
2.在多线程情况中,使用synchronized进行加锁控制,产生阻塞等待效果。
-
put插入元素的时候,判定如果队列满了,就进行wait。
队列不满的时候,才要唤醒,也就是当其他线程执行成功 take 的时候。 -
take取出元素的时候,判定如果队列为空,就进行wait。
队列不空的时候,才要唤醒,也就是其他线程成功 put 的时候。
他们两个互相唤醒对方。
写到这里,阻塞队列其实已经写完了,但是这里面还有一个小问题。
这里的wait是用来确保接下来的操作是有意义的。
也就是,这一块代码要求逻辑,size 必须不能为0.
正常来说 wait 的唤醒就是通过另一个线程执行 put ,另一个线程 put 成功了,此处的size 肯定不是0 。但是 wait 不一定是被 notify 唤醒,还可能被interrupt这样的方法给中断。如果使用 if 作为 wait 的判定条件, 此时就存在 wait 被提前唤醒的风险。所以,我们可以通过 while 来规避这个问题。
这里循环的目的是为了“二次验证”,判定当前这里的条件是否成立。 wait 之前先判定一次, wai唤醒t也判定一次(再确认一次,队列是否不空)。
而且 wait 设计的时候本身就是搭配 while 用的。
操作系统原生api(linux 的 wait , 也是官方文档建议使用 while)
我们也可以用我们自己的阻塞队列来实现生产者消费者模型:
通过加入 sleep 使我们更好的观察。