阻塞队列
阻塞队列也是一种队列,先进的先出
阻塞队列就是对普通的队列做出的拓展
阻塞队列的特性
1.线程安全的,我们普通的队列值线程不安全的
2.具有阻塞的特性:
- a》如果针对一个已经满了的队列进行如队列操作的话,入队列操作就会阻塞,一直到阻塞队列不满了为止(有其他的线程出队列)
- b》如果针对一个已经空了的队列进行出队列操作的话,出队列操作就会阻塞,一直到阻塞队列不空了为止(有其他的线程如队列)
基于阻塞队列,就可以实现“生产者消费者模型”
举一个例子,包饺子
包饺子的流程:
- 和面(一般是一个人负责,没有办法多线程)
- 擀饺子皮
- 包饺子
(第二步和第三步这两环节就可以使用多线程了)
这三个滑稽开始同时完成擀饺子皮和包饺子这两个步骤,这三个滑稽,都是先擀一个饺子皮,再包一个饺子,之后再擀一个饺子皮,再包一个饺子,很明显这种多线程的包饺子的方式,要比单线程包饺子要快,但是我们只有一个擀面杖,我们三个滑稽就要竞争这个擀面杖,如果1号滑稽拿到了,2号和3号滑稽就得阻塞等待
为了解决这个问题,就可以分工协作(如下图)
此时这个专门负责擀面皮的滑稽就可以一个人独享擀面杖了,没有人跟他抢,此时就不会有锁竞争了
这里的分工协作就构成了生产者消费者模型,擀饺子皮的线程就是生产者(生产饺子皮),另外两个线程就是消费者(消费饺子皮)
这个桌子就充当起了传递饺子皮的作用,这个桌子的角色就相当于是阻塞队列
- 1》假设,擀饺子皮的人的速度非常快,但是包饺子的人的速度很慢,于是桌子上的饺子皮会越来越多,一直到了桌子满了为止,这时擀饺子皮的人就需要等一等,等包饺子的人消耗一波饺子皮之后再开始擀饺子皮
- 2》假设,擀饺子皮的人的速度很慢,但是包饺子的人的速度很快,就会导致桌子上的饺子皮越来越少,一直到桌子上没有了饺子皮为止,这时包饺子的人就需要等一等,等擀饺子皮的人擀下饺子皮的时候再开始包饺子
生产者消费者模型,在实际的开发中非常意义:
- 1.引入生产者消费者模型可以更好的“解耦合”(把代码的耦合程度从高减到低,就称为解耦合)
- 2.削峰填谷
解耦合
我们实际开发的过程中,通常会设计到“分布式系统”,整个服务器的功能不是由一个服务器完成,而是有多个服务器,每个服务器完成一部分的功能,通过服务器之间的网络通信,进而完成服务器的整个功能
如上图中,我们的电商网站客户端向我们的服务器发送了获取主页信息的请求,在我们的分布式系统里面,有一个入口服务器专门用来接收来自外部的请求,当我们的入口服务器接收到请求以后,就会调用相关的服务器,主页的信息包含用户的信息和商品的信息,于是就会调用用户服务器和商品服务器,这两个服务器会将信息返回给入口服务器,入口服务器就会将完整的主页信息返回给电商客户端
上述的模型中,A和B,A和C之间的耦合性是比较强的,由于A和B是直接交互的,A的代码里会涉及到关于B的操作,B的代码里面也会涉及关于A的操作,A和C也是同理,此时要是B或者C挂了,由于它们和A的耦合性强,就会影响到A,A也可能就挂了
引入生产者消费者模型,就可以降低耦合度
此时A和B,A和C都不是直接交互,而是通过阻塞队列进行传话,此时A里面的代码只需要跟阻塞队列进行交互,不知道B和C的存在,B和C也不知道A的存在,即使是B和C挂了,对A的影响也是微乎其微,后续如果是要再添加一个D服务器的话,A里面的代码几乎也不用变化
加入阻塞队列的代价是什么呢?
- 最明显的代价就是引入机器,需要更多的硬件资源,上面的阻塞队列并不是单纯的数据结构,而是实现了这个数据结构的服务器的程序,又被部署到单独的服务器主机上了,所以加入阻塞队列意味着需要更多的机器
我们将阻塞队列或者部署了实现该数据结构的程序的服务器成为消息队列- 系统的结构便复杂了,需要维护的服务器变多了
- 效率,引入中间商,还是有差价的,请求从A发出到B,中间要历经阻塞队列的转发,这个转发的过程也是有一定开销的
削峰填谷
举一个三峡大坝的例子
外界客户端发起的请求时固定不变的吗,并不是,这是属于“用户行为”,如果发生一些突发情况,就会使请求剧增,比如滴滴挂了就会使其竞品的请求量增加,还有双十一就会使购物类软件的需求剧增
当需求剧增了以后,A和B和C的压力就会变大,就有可能会导致服务器给挂掉,并且A的抗压能力更强,B和C的抗压能力更弱,这是为什么呢?
- 1》为什么当请求多了以后,服务器就会挂掉呢,服务器处理每个请求都是要消耗硬件资源的(包括但不限于CPU,内存,硬盘,网络带宽),虽然每个请求消耗的硬件资源很少,但是架不住请求多呀,所有的请求加到一起的话,总的消耗的资源就多了,只要软件资源到达瓶颈,服务器就挂了,最直观的感受就是你跟服务器发请求,他不会返回响应了
- 2》为什么A的抗压能力要比B和C的抗压能力更强,A处理的请求比较简单,每个请求消耗的硬件资源少,B和C处理的请求比较复杂(比如获取商品的信息,先要从数据库里面获取到对应的信息,还要根据一些条件进行匹配过滤筛选),每个请求消耗的硬件资源就多
一旦外界的请求数量出现突发的峰值,这个突发的峰值就会使B和C挂掉
我们现在引入阻塞队列(消息队列)
我们的阻塞队列没有什么复杂的业务逻辑,只是存储传递数据,所以阻塞队列的抗压性很好
所以当我们的外界请求量剧增的时候,也是阻塞队列来承担压力,B和C按照之前的速度来处理请求就可以,引入阻塞队列可以有效的防止B和C挂掉
请求太多,那接收的服务器会挂掉吗?
如果请求量一直往上增的话,即使是抗压能力很强的服务器也会挂掉,所以大厂的系统往往会有非常充裕的冗余量(引入更多的硬件资源,避免上述问题), 如果一台机器的CPU/内存的占用率超过50%的话,就需要机器进行扩容,保证机器的冗余量在50%,即使请求翻倍了,我们的机器也是能抗住的
可以往A服务器前面加一个阻塞队列吗
也不是不可以,但是如果请求一直增加的话,队列也可能会挂掉
Java标准库里提供了现成的阻塞队列数据结构
BlockingQueue就是Java标准库里面提供的,它是一个接口,以下是关于这个接口的实现
- Array Blocking Queue
- LinkedBlockingQueue
- PriorityBlockingQueue
- 我们使用put和offer都是一样的入队列,但是put是带有阻塞功能的,offer没有阻塞的功能(队列满了会返回结果)
- take方法出队列,也是带有阻塞功能的
- 但是阻塞队列没有提供带有阻塞功能获取队列首元素的方法
public class Test12 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue=new ArrayBlockingQueue<>(100);
queue.put("aaa");
String elem=queue.take();
System.out.println("elem是"+elem);
elem=queue.take();
System.out.println("elem是"+elem);
}
}
上述代码的结果就是只输出一次“elem是aaa”,之后线程就进入了WAITING的阻塞状态(如下图)
阻塞队列的实现
关于阻塞队列的使用是简单的,上述关于阻塞队列的介绍只是序幕,接下来是正题,我们要自己实现一个阻塞队列
实现一个阻塞队列,我们的步骤如下:
- 先实现一个普通的队列
- 再加上线程安全
- 再加上阻塞功能
阻塞队列我们基于数组来实现(环形数组)
如何判断一个队列是空还是满呢?
- 浪费一个格子,tail最多走到head前面的一个格子
- 引入size变量
环形数组
上图是我们实现的一个普通的队列,注意看我们红色方框标注的部分,他是我们环形数组中最重要的部分,这个代码能够保证我们的数组下标到达末尾之后又可以回0下标的位置
看上图的写法,是我们上面红框部分的另一种写法
- 当tail小于array.length时,tail最终的值就是tail本身
- 当tail等于array.length时,tail最终的结果就是0,又回到0下标的位置
上述的两种写法又什么区别呢?那种方法会更好?
我们从两方面来讲
- 开发效率(代码是否容易理解)
- 运行效率(代码执行的速度快不快)
开发效率
- 我们先看下图的代码
这个代码里面主要是条件跳转语句,条件跳转语句的执行速度非常快,而且tail=0的这个赋值操作,并不是每次都要执行,只有条件成立的时候才会执行,当条件不成立的时候,就不会执行,直接跳过- 我们来看另一种的写法
这种写法里面有取余操作,%本质上是除法,除法是比较低效的指令,我们的CPU擅长于加减,计算加减的速度要比计算乘除的速度要快,除了取余操作,还有赋值操作,这个赋值操作是100%会触发的,不像我们第一种写法,根据条件来判断是否要执行赋值操作,并不是每次都要赋值这样对比下来的话还是我们第一种的写法的运行效率高
开发效率
- 第一种写法还是很通俗易懂的
无非就是一个if加一个赋值的操作,只要是个程序员就可以看懂- 第二种写法里面有“%”
这个百分号虽然也简单,但是在不同的编程语言中%的作用可能还有一些不太一样,在代码的理解性上来说的话,虽然这种写法不难理解,但是对比第一种还是略微逊色一些所以在开发效率方面来说还是第一种写法略胜一筹
所以无论是开发效率还是运行效率来说,第一种写法是最优解
加锁
接下来我们加入锁,解决线程安全问题
我们知道多线程代码里面写操作时会有线程安全问题的,比如上图中红框里面的操作,但是我们上图中绿框的操作是否要加锁呢?
看上图的情况,如果只给写操作加锁的话,是会出现线程问题的如何加锁,锁放到哪里合适,加了锁是否还有问题,这是多线程里面的难点,我们需要确保所有顺序的情况下,程序的执行结果是正确的
以上就是我们加了锁之后的代码
加入阻塞
上图中我们恰好吧wait加到if里面,恰好if刚好在synchronized里面了
如果我们此时数组满了的话,就会执行wait,线程就会进入阻塞,此时就需要另一个线程来notify这个阻塞的线程,当队列不满时,也就是有线程出队列了以后,就可以将阻塞的线程唤醒了
于是我们就在put方法里面出队列之后,调用notify唤醒阻塞的插入线程
同理,出队列的时候队列如果为空的话,也需要阻塞等待插入的线程插入元素之后被唤醒,完整的代码如下
这里的唤醒操作我们不要使用notifyall,只要随机唤醒一个即可
假如我们现在有多个线程的插入操作都阻塞了,此时我们有一个线程出队列成功,即使我们将所有的入队列线程都唤醒的话,因为只空出来一个空,只有一个线程能成功插入,其他剩下的线程只能继续阻塞等待
这个代码还有一个问题
假设我们现在的队列已经满了
此时我们的put方法就会在wait阻塞,并且释放锁
此时锁又被另一个插入线程获取到,但是这个线程也是执行到wait阻塞并且释放锁了
之后锁被一个出队列线程获取到了,于是开始出队列
出队列的notify就会唤醒一个入队列线程,如上图的1步骤
put方法执行完以后,这个被唤醒的线程获取到锁,执行到locker.notify的时候,就有可能吧另一个阻塞的入队列线程唤醒,如上图的3步骤
但是我们的put方法里面的notify初心是要唤醒阻塞的出队列线程的,现在却把入队列线程唤醒了,出现了bug,也就是线程安全问题该怎么解决这个问题呢?
- 我们的wait外面是if的条件判断,if其实是"一锤子买卖",if判断之后,wait有可能进入阻塞,一旦阻塞了,下次被唤醒之后间隔的时间就是沧海桑田,这个过程中if里面的条件就有可能发生改变
- 我们只需要将if改成while即可,意味着唤醒之后还要再判断一次,wait之前判断一次,唤醒之后判断一次,双重保险,相当于是又确认了一次,如果再判断一次还是满着的话,就再wait进入阻塞
我们Java标准库里面也是推荐wait和while搭配使用
完整的代码
class MyArrayBlockingQueue{
private String[] array=null;
public MyArrayBlockingQueue(int size){
array=new String[size];
}
private Object locker=new Object();
private int head;
private int tail;
private int size;
public void put(String elem) throws InterruptedException {
synchronized (locker){
if(size>=array.length){
locker.wait();
}
array[tail]=elem;
tail++;
if(tail>=array.length){
tail=0;
}
size++;
locker.notify();
}
}
public String take() throws InterruptedException {
String elem=null;
synchronized (locker){
if(size<=0){
locker.wait();
}
elem=array[head];
head++;
if(head>=array.length){
head=0;
}
size--;
locker.notify();
}
return elem;
}
}
实际开发中,生产者开发者模型中,不仅仅只有一个生产者和消费者,这里的生产者和消费者不仅仅是一个线程,而是一个服务器程序,或者一组服务器程序,但是核心还是阻塞队列,使用synchronized和wait/notify达到线程安全和阻塞