多线程代码案例 - 2
阻塞队列
阻塞队列,我们熟悉的概念是队列,即一种先进先出的数据结构。阻塞队列,就是基于普通队列做出的扩展。
特点
1. 线程安全的
2. 具有阻塞特性
(a)如果针对一个已经满了的队列进行入队列,此时入队操作就会阻塞,一直阻塞到队列不满(即有其他线程进行出队操作)之后
(b)如果针对一个已经空了的队列进行出队列,此时出队操作就会阻塞,一直阻塞到队列不空(即有其他线程进行入队操作)之后
阻塞队列的作用非常大,因为基于阻塞队列,就可以实现“生产者消费者模型”!!!‘
什么是生产者消费者模型???
举个栗子:
包饺子的流程:
1.和面(一般都是一个人负责,没办法多线程完成)
2. 擀饺子皮
3. 包饺子 (第二步和第三步,这两步就可以多线程完成了)
现在有 A B C 三位大兄弟,共同完成上面包饺子的步骤,擀面杖,一般一个家庭中,只有一个擀面杖,所以会发生,三个线程都去竞争这个擀面杖,A 大兄弟,拿到擀面杖擀皮了,B C 就需要阻塞等待,所以,很明显,包饺子的方式适合用多线程的方式来实现,即A 是和面的,B C 负责擀皮和包饺子。每次都是 B C 的一位大兄弟擀一个皮,然后另一个大兄弟包一个饺子,再擀一个皮,再包一个饺子...
于是就可以分工协作:和面之后,三位大兄弟就要研究 擀皮 和 包饺子了:
这里的分工协作,就构成了生产者消费者模型,擀饺子皮的线程就是生产者(生产饺子皮),擀完一个饺子皮,饺子数目 +1,另外两个包饺子的线程,就是消费者(消费饺子皮),包完一个饺子,饺子皮的数目 -1。
而中间的桌子,就起到了“传递饺子皮”的效果。这个桌子的角色就相当于“阻塞队列”。
假设:擀饺子皮的非常快,包饺子的人包的很慢。就会导致桌子上的饺子皮越来越多,一直这样下去,桌子上的饺子皮就会满了。此时擀饺子皮的人就得停下来等一等,等这俩包饺子的人,消费一波之后,再接着擀...
又或许:擀饺子皮的非常满,包饺子的人包的非常快,就会导致桌子上的饺子皮,越来越少,一直这样下去,桌子上的饺子皮就会没有了。此时包饺子的人就得停下来等一等,等擀饺子皮的人,再擀出来一波,再接着包...
上述的栗子,大概就是模拟了生产者消费者模型。
意义
这个生产者消费者模型,在实际开发中,非常有意义。
1. 解耦合
1. 引入生产着消费者模型,就可以更好的做到“解耦合”。
(耦合程度:指的是代码中不同模块,类,函数之间相互依赖,相互关联的紧密程度,耦合度低:模块之间的依赖关系就少,相互影响就小。一个模块的修改不容易影响到其他模块,各个模块之间可以相对独立的进行开发...耦合度高:模块之间存在很强的依赖关系,一个模块的修改往往会导致其他多个模块也需要相应修改,代码的维护和扩展难度比较大...)(而我们一般是期望我们的代码耦合度低一些,即使用这个消费者生产者模型可以降低代码的耦合程度)
实际开发中,经常会涉及到“分布式系统”,即服务器整个功能不是由一个服务器全部完成的,而是每个服务器负责一部分功能,通过服务器之间的网络通信,最终完成整个功能。
上述模型中:A 和 B,A 和 C 之间的耦合性是比较强的!!!A 的代码中就需要设计到一些和 B 相关的操作,B 的代码中也涉及到一些和 A 的操作。同样的,A 的代码中也需要设计和 C 的操作,C 的代码也涉及到和 A 的操作。另外,如果 B 或者 C “挂了”,此时对于 A 的影响就很大,A 也可能就跟着 “挂” 了。
引入生产者消费者模型,就可以降低上述耦合度:
A 和 B,A 和 C 之间都不是直接交互了,而是通过队列在中间进行传话。此时,A 的代码中,只需要和队列交互就可以了,A 是并不知道 B 和 C 的存在的,同样的,B C 的代码,也只需要和队列进行交互,他们也是不知道 A 的存在的。
如果 B C “挂了”,对于 A 的影响是微乎其微的...假设后续如果要增加一个 D,A 的代码也是不用发生任何变化的。
引入生产者消费者模型,降低耦合度之后,也是需要付出一些代价的 ==》 需要加机器,即需要引入更多的硬件资源。
1. 上述描述的阻塞队列,并非是简单的数据结构,而是基于这个数据结构实现的服务器程序,又被部署到单独的主机上了。我们称这种未“消息队列(message queue)”
2. 整个系统的结构更复杂了。即我们要维护的服务器更多了。
3. 效率问题。引入了中间商“阻塞队列”,是存在差价的。请求从 A 发出来到 B 收到,这个过程中就需要经历队列的转发,这个过程中是存在一定开销的...
2. 削峰填谷
在讲代码之前,让我们先用一个栗子,来引入削峰填谷:
三峡水坝,大家应该都直到,是一个非常牛 x 的工程。
它的其中一项工作,就是可以使得上流的水流,按照固定的速率往下流去放水。
如下图所示:
如果上游的降雨量突然增大,那上游的洪水,就会以一个极其快的速度冲向下游,对中下游,造成很大的冲击,从而引起洪灾。三峡工程呢,就是在中间,建立了一个水库。
有了这个三峡水库之后,即使上游的水,非常的湍急,但在冲向下游的途中被三峡水库给拦住了,三峡大坝本身就是一个水库,可以存储很多的水,然后,我们就可以进行调控,使得三峡按照一定的速率,往下游放水。
即:上游降雨骤增,三峡大坝就可以关闸蓄水。
上游降雨骤减,三峡大坝就可以开闸放水。
上面的栗子就是对削峰填谷的大概比喻。(此处所谓的 峰 和 谷,都不是长时间持续的,而是短时间内所出现的...)
回到代码中,以我们的工作举栗子:
上面是一个分布式系统的大致模型,但我们要考虑到的是,当外网的请求突然增多时,即入口服务器 A 接收到的请求数量增加很多,A 的压力就会变大,但因为 A 做的工作一般比较简单,每个请求消耗的资源是比较少的,但是 B 和 C 服务器就不一定了,他们的压力同样会很大,且假设:B 是用户服务器,需要从数据库中找到对应的用户信息,C 是商品服务器,也需要从数据库找到对应的商品,还需要一些规则进行匹配,过滤等等...
A 的抗压能力比较强,B C 的抗压能力比较弱(他们需要完成的工作可能更加复杂,每个请求消耗的资源多...) ==》 一旦外界的请求出现突发的峰值,就会直接到导致 B C 服务器挂了...
那为什么,当请求多的时候,服务器就会挂了呢???
服务器处理每个请求,都是需要消耗硬件资源的!!!(包括但不限于 cpu 内存 硬盘 网络带宽等等...)即使一个请求消耗的资源比较少,但也无法承受住,同时会有很多的请求,加到一起来,这样消耗的总资源就多了。 ==》 上述任何一种硬件资源达到瓶颈,服务器都会挂(即客户端给服务器发出请求,但服务器不会再进行相应返回了)....
外界客服端发起的请求的数量,并不是固定的,有多少请求,是属于‘客户的请问“。有多少的请求,都是属于”客户的行为“...
我们就可以使用阻塞队列 / 消息队列了(阻塞队列:是以数据结构的视角命名的。消息队列:是基于阻塞队列实现服务器程序的视角命名的)...
当在 A 与 B C 之间添加一个阻塞队列之后,因为阻塞队列的特性,即使外界的请求出现峰值,也是由队列来承担峰值的请求,B 和 C(下游)仍然可以按照之前的速度来获得请求,这样就可以有效的防止 B 和 C 被高峰值的冲击导致服务器”挂了“。
补充:当请求太多的时候,接收请求的服务器也会挂的。请求一直往上增加,A 肯定也会有顶不住的时候,也可以给 A 前面再加一个阻塞队列,但当请求进一步的增加,队列也是可能挂的...(引入更多的硬件资源,避免上述情况...)
阻塞队列对应的数据结构
BlockingQueue 的使用
Java 标准库中提供了线程的阻塞队列的数据结果:
BlockingQueue 是一个总的 interface(接口),下面有三个具体的实现类:ArrayBlockingQueue LinkedBlockingQueue PriorityBlockingQueue
代码示例如下:
注意:使用 put 和 offer 一样都是入队列,但是 put 是带有阻塞功能的,offer 是没有阻塞功能的(队列满了之后就会返回 false),take 方法是用来出队列的,也是带有阻塞功能的。
但在阻塞队列中,并没有提供带有阻塞功能的,获取队首元素的方法。
实现一个 MyBlockingQueue
我们可以基于数组来实现其数据结构(环形队列)
环形队列:有两个指向头尾的引用 head 和 tail
每次插入数据的时候,将数据插入 tail 的位置,然后 tail 向后走
一直这样走
直到数组满了之后
因为我们要实现的是环形队列,所以要判断是否为满:
1. 浪费一个格子,tail 最多走到 head 的前一个位置。
2. 引入 size 变量
代码实现:(put 方法中,使用 size 来判断队列是否为满)
在 put 方法中的判断是否为满中,是由两种写法的,第一中就是我们上述所示:if(tail >= elems.length) 第二种是 tail = tail % elems.length,即(如果 tail < length,此时求余的量,就是 tail 原来的值,如果 tail == length,求余的值就是 0)
上述两种方法都能满足我们的目标,那如何评价某个代码段好还是不好呢?
1. 开发效率(代码是否容易被理解)
2. 运行效率(代码执行速度快不快)
让我们分析上面两种代码,if 代码,只要是个程序员,就认识 if 条件(大学生都认识...),但不理解 % 的,还是可能的,尤其是,在不同编程语言中,% 的作用可能还不一样...
而且,if 是条件跳转语句(执行速度非常快),大多情况下,并不会触发方法体中的赋值。但 % 本质上是除法运算指令,除法运算,是属于比较低效的指令(CPU 更加擅长计算 + -,计算 * / 的速度要比 + - 逊色一些),而且,第二种代码,是会百分百触发赋值操作的,运行效率会更低一些...
引入锁,解决线程安全问题
在 put 方法中,使得队列阻塞的代码先不提,就后面的代码:均是写操作,这几个代码都必须用锁包裹起来。
上述直接这样加锁,是线程安全的吗?
如下图为两个线程,如果随机调度成这样的情况
并且,此时这个 put 正好是添加最后一个元素,就会出现下面的情况:
所以我们的 synchronized 是需要加在最外面的,锁加到这里和加到方法上,本质上就都是一样的
阻塞部分的代码:
提起阻塞,我们就要想到使用 wait 来进行阻塞。
把 wait 加入到 if 的函数体中,巧了,正好这个 if 在 synchronized 的内部!!!
光有 wait 还不够,还需要有其他线程来对 wait 进行唤醒操作(队列如果没有满,就可以进行唤醒操作了)。这里有个问题是,什么叫做”队列不满“呢?什么情况下,是队列不满呢? ==》 出队成功,就是队列不满!对于满了的队列,就是在出队列成功之后唤醒。同样的,队列空了,再出队列,同样也需要阻塞(take 方法),同样是在另一个入列成功后的线程中唤醒...
put 代码
take 代码
这样看起来,wok,好像线程太安全了,锁也上了,操蛋的情况也排除了,我们的期望是,take 操作中的唤醒操作,将 wait 方法中的 wait 成功唤醒,wait 方法中的 notify 将 take 中的 wait 唤醒。但是!
可能会出现下图的情况:
不同线程之间的,put 和 take 方法中的 notify 可能会不正确的将错误的 wait 给唤醒。
或者出现如下情况:入队列的唤醒操作,把其他线程的入队列的 wait 唤醒了。在第一步中,两个 wait 都执行到 put 了(注意:wait 之后会有三步操作,第一步就是释放锁,所以可以出现两个 wait 都执行到 put 了),第二步,有一个 take 方法执行到了 notify,将其中一个 put 的 wait 唤醒了,然后这个 put 操作向下执行代码,执行到 notify 之后,将另一个 put 方法的 wait 唤醒了...
如上两种,又是不符合我们预期的两种 bug,并且似乎还很麻烦,锁的对象又必须是 locker 这一个对象,如果我们定义两个 locker1 和 locker2,那又无法实现锁竞争 ==》 线程不安全了...如何解决呢?
其实解决方案很简单,但是问题是为什么,一定要想明白。
在进行阻塞的时候,我们是都只是用了 if 来进行条件判断,在 put 方法中,使用 if(size >= elems.length) 判断,在 take 方法中,使用 if(size == 0) 来判断,if 是 “一锤子买卖”,只判定一次条件,一旦程序进入阻塞之后,再被唤醒,这中间隔的时间,就是沧海桑田了,阻塞状态的过程中,发生的时候,会导致出现很多变数。有了这些变数之后,就很难以保证,()中的条件是否仍然满足了,入队列的条件是否仍然具备了...
我们可以将 if 改为 while,改为 while 之后,意味着 wait 唤醒之后,还需要再判定一次条件。即,wiat 之前判定一次,唤醒之后,再判定一次(相当于多做了一步确认操作!!!)
如果再次判定条件,发现队列还是满的,即是在 wait 等待过程中,出现了变数,此时就应该继续等待!
Java 标准库也是推荐,wait 要 搭配 while 进行使用,多 N 次确认操作!!!
上面英文的大概意思是: wait 可能被提前唤醒,即明明条件还没满足,就被唤醒了,所以经常是一个循环,所以我们可以使用 while 进行确认操作!
基于阻塞队列,写一个简单的生产者消费者模型
前言:在实际的开发中,生产者消费者模型,往往是多个生产者和多个消费者。这里的生产者和消费者往往不仅仅是一个线程,也可能是一个独立的服务器程序,甚至是一组服务器程序...但最核心的仍然是阻塞队列,使用 synchronized 和 wiat / notify 达到线程安全 and 阻塞
如下图,在 t2 中有 Thread.sleep(500) ==》 这对应的是生产者非常快,消费者非常慢的情况,即生产者生产 1000 个之后,消费者消费一个,生产者生产一个...
运行如下: