一,队列基本信息
1 实现类
2 队列分类
2.1 阻塞队列BlockingQueue
阻塞队列(Blocking Queue)提供了可阻塞的 put 和 take 方法,它们与可定时的 offer 和 poll 是等价的。如果队列满了 put 方法会被阻塞等到有空间可用再将元素插入;如果队列是空的,那么 take 方法也会阻塞,直到有元素可用。当队列永远不会被充满时,put 方法和 take 方法就永远不会阻塞。在java包"java.util.concurrent"中,提供六个实现了"BlockingQueue"接口的阻塞队列:
ArrayBlockingQueue 用数组实现的有界阻塞队列;
LinkedBlockingQueue 基于链表实现的有界阻塞队列
PriorityBlockingQueue是一个带优先级的队列,基于堆数据结构的;
DelayQueue是在PriorityQueue基础上实现的,底层也是数组构造方法,是一个存放Delayed 元素的无界阻塞队列;
SynchronousQueue 一个没有容量的队列 ,不会存储数据;
LinkedBlockingDeque 是双向链表实现的双向并发阻塞队列;
2.2 非阻塞队列
所有无Blocking Queue的都是非阻塞,并且它不会包含 put 和 take 方法。
2.3 有界队列和无界队列
2.3.1 有界队列
是指有固定大小的队列,比如设定了固定大小的 ArrayBlockingQueue,又或者大小为 0 的 SynchronousQueue。
2.3.2 无界队列
指的是没有设置固定大小的队列,但其实如果没有设置固定大小也是有默认值的,只不过默认值是 Integer.MAX_VALUE
2.4 双端队列
Deque是一个双端队列接口,继承自Queue接口,Deque的实现类是LinkedList、ArrayDeque、LinkedBlockingDeque,其中LinkedList是最常用的。
2.5 优先队列
优先队列(PriorityQueue)是一种特殊的队列,它并不是先进先出的,而是优先级高的元素先出队。 优先队列是根据二叉堆实现的。最大堆和最小堆。
2.6 延迟队列
延迟队列(DelayQueue)是基于优先队列 PriorityQueue 实现的,它可以看作是一种以时间为度量单位的优先的队列,当入队的元素到达指定的延迟时间之后方可出队。
二,队列的使用场景
最典型的就是线程池,不同的线程池都是基于不同的队列来实现多任务等待的。
1.LinkedBlockingQueue使用场景:
在项目的一些核心业务且生产和消费速度相似的场景中:订单完成的邮件/短信提醒。 订单系统中当用户下单成功后,将信息放入ArrayBlockingQueue中,由消息推送系统取出数据进行消息推送提示用户下单成功。如果订单的成交量非常大,那么使用ArrayBlockingQueue就会有一些问题,固定数组很容易被使用完,此时调用的线程会进入阻塞,那么可能无法及时将消息推送出去,所以使用LinkedBlockingQueue比较合适,但是要注意消费速度不能太低,不然很容易内存被使用完。
2.PriorityBlockingQueue使用场景:
在项目上存在优先级的业务:VIP排队购票 用户购票的时候,根据用户不同的等级,优先放到队伍的前面,当存在票源的时候,根据优先级分配
3.DelayQueue使用场景 :
由于是基于优先级队列实现,但是它比较的是时间,我们可以根据需要去倒叙或者正序排列(一般都是倒叙,用于倒计时)。所以适用于:
订单超时取消功能、网站刷题倒计时 用户下订单未支付开始倒计时,超时则释放订单中的资源,如果取消或者完成支付,我们再将队列中的数据移除掉。
4.SynchronousQueue使用场景:
参考线程池newCachedThreadPool()。 如果我们不确定每一个来自生产者请求数量但是需要很快的处理掉,那么配合SynchronousQueue为每个生产者请求分配一个消费线程是最简洁的办法。
cking Queue)提供了可阻塞的 put 和 take 方法,它们与可定时的 offer 和 poll 是等价的。如果队列满了 put 方法会被阻塞等到有空间可用再将元素插入;如果队列是空的,那么 take 方法也会阻塞,直到有元素可用。当队列永远不会被充满时,put 方法和 take 方法就永远不会阻塞。在java包"java.util.concurrent"中,提供六个实现了
三,代码实现
ArrayBlockingQueue
package org.jeecg.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* 一,ArrayBlockingQueue是一个阻塞的队列,继承了AbstractBlockingQueue,
* 间接的实现了Queue接口和Collection接口。底层以数组的形式保存数据,所以它是基于数组的阻塞队列。
* ArrayBlockingQueue是有边界值的,在创建ArrayBlockingQueue时就要确定好该队列的大小,
* 一旦创建,该队列大小不可更改。内部的全局锁是使用的ReentrantLock。
* 二、使用场景
* 先进先出(队首是最旧的元素;队尾是新添加的元素)
* 有界队列(即初始化时指定的容量,就是队列最大的容量,不会出现扩容,容量满,则阻塞进队操作;容量空,则阻塞出队操作)
* 队列不支持空元素
*三,常用方法
* 添加:add方法(在队列的尾部添加元素,返回true。当队列满的时候会抛出IllegalStateException异常)
* offer方法(在队列的尾部添加元素,如果添加成功返回true,否则返回false。)
* put方法(在队列的尾部添加元素,如果队列已满,则会阻塞住,等待队列有空闲位置,该方法可被打断。)
* 获取:take方法(获取队列中的元素,如果没有则会被阻塞住。)
* poll方法(获取队列中队首的元素,获取后该元素就会移除该队列)
* peek方法(读取队列中队首的元素,不会删除该元素。)
* 删除:remove方法(删除队列中指定元素。)
*
* 其它:contains方法(查询队列中是否存在某个元素)
* remainingCapacity方法(返回队列中剩余空闲位置)
* drainTo方法(将队列中的元素排到指定集合中去)
*
*/
public class ArrayBlockingQueueTest {
public static void main(String[] args) {
// 创建一个容量为10的阻塞队列
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
// 启动消费者线程
Thread consumer = new Thread(new Consumer(queue), "Consumer");
consumer.start();
// 启动生产者线程
Thread producer = new Thread(new Producer(queue), "Producer");
producer.start();
}
static class Producer implements Runnable {
private final BlockingQueue<Integer> queue;
Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; true; i++) {
System.out.println("Producing: " + i);
queue.put(i); // 如果队列满了,这里会阻塞
Thread.sleep(1000); // 模拟生产间隔
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
static class Consumer implements Runnable {
private final BlockingQueue<Integer> queue;
Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Integer item = queue.take(); // 如果队列空了,这里会阻塞
System.out.println("Consuming: " + item);
Thread.sleep(2000); // 模拟消费间隔
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
LinkedBlockingQueue
package org.jeecg.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 一,LinkedBlockingQueue中的锁是分离的,使用两个ReentrantLock来保证线程安全:入列前需要获取到入列锁(putLock),出列前需要获取到出列锁(takeLock),实现了入列锁和出列锁的分离。
* 二,使用场景
* 任务队列:在实现线程池时,如 ThreadPoolExecutor 默认使用的就是 LinkedBlockingQueue 作为工作队列,用来暂存待执行的任务。
* 异步处理管道:在需要解耦生产者和消费者的场景下,可以使用 LinkedBlockingQueue 作为消息队列,生产者将消息放入队列,消费者从队列中取出并处理消息,实现高效的异步处理。
* 批量处理:当需要收集一定数量的数据后再进行统一处理时,可以利用 LinkedBlockingQueue 的阻塞特性,当队列达到预设的“满”条件时再进行批量处理,提高处理效率。
* 流式处理:在实时数据处理系统中,LinkedBlockingQueue 可以作为缓冲区,平衡数据生产和消费的速度差异,保证数据处理的平滑进行
* 流量控制:在高并发系统中,如果后端处理速度跟不上前端生成数据的速度,可能会导致系统崩溃,通过使用LinkedBlockingQueue,可以设置一个有限的队列容量,当队列满时,生产者会被阻塞,从而实现了对流量的控制。
*三,常用方法
* 添加:add方法(在队列的尾部添加元素,返回true。当队列满的时候会抛出IllegalStateException异常)
* offer方法(在队列的尾部添加元素,如果添加成功返回true,否则返回false。)
* put方法(在队列的尾部添加元素,如果队列已满,则会阻塞住,等待队列有空闲位置,该方法可被打断。)
* 获取:take方法(获取队列中的元素,如果没有则会被阻塞住。)
* poll方法(获取队列中队首的元素,获取后该元素就会移除该队列)
* peek方法(读取队列中队首的元素,不会删除该元素。)
* 删除:remove方法(删除队列中指定元素。)
*
* element:检索队首元素。队首无数据时,抛出NoSuchElementException异常。
*
*/
public class LinkedBlockingQueueTest {
public static void main(String[] args) {
// 创建一个默认容量的LinkedBlockingQueue(如果不指定大小,则为Integer.MAX_VALUE)
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 启动消费者线程
Thread consumer = new Thread(new Consumer(queue), "Consumer");
consumer.start();
// 启动生产者线程
Thread producer = new Thread(new Producer(queue), "Producer");
producer.start();
}
static class Producer implements Runnable {
private final LinkedBlockingQueue<String> queue;
Producer(LinkedBlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; true; i++) {
String item = "Message-" + i;
System.out.println("Producing: " + item);
queue.put(item); // 放入队列,如果队列满将阻塞
TimeUnit.SECONDS.sleep(1); // 模拟生产间隔
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
static class Consumer implements Runnable {
private final LinkedBlockingQueue<String> queue;
Consumer(LinkedBlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
String item = queue.take(); // 从队列中取出,如果队列空将阻塞
System.out.println("Consuming: " + item);
TimeUnit.SECONDS.sleep(2); // 模拟消费间隔
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
PriorityBlockingQueue
package org.jeecg.Queue;
import java.util.concurrent.PriorityBlockingQueue;
/**
* 一,PriorityBlockingQueue 是 Java 并发包 (java.util.concurrent) 中的一个线程安全的数据结构,它实现了 BlockingQueue 接口,
* 并具备优先级排序功能。这个队列的特点是无界(除非系统资源耗尽)、线程安全,并且在插入或移除元素时可能会阻塞。
*
* 二、使用场景
* 任务调度:在多任务处理系统中,可以根据任务的优先级来调度执行。例如,高优先级的任务(如紧急的系统维护任务)可以优先得到处理。
* 资源分配:如在网络通信、数据库访问等场景下,可以根据用户的优先级(如VIP用户)来决定谁先获得资源。
* 事件处理系统:在处理不同类型的事件时,可以根据事件的重要程度设置优先级,确保重要的事件先被处理。
* 工作流引擎:在工作流管理系统中,任务根据其依赖性和优先级被安排执行顺序。
*三,常用方法
* 添加:add方法(在队列的尾部添加元素,返回true。当队列满的时候会抛出IllegalStateException异常)
* offer方法(在队列的尾部添加元素,如果添加成功返回true,否则返回false。)
* put方法(在队列的尾部添加元素,如果队列已满,则会阻塞住,等待队列有空闲位置,该方法可被打断。)
* 获取:take方法(获取队列中的元素,如果没有则会被阻塞住。)
* poll方法(获取队列中队首的元素,获取后该元素就会移除该队列)
* peek方法(读取队列中队首的元素,不会删除该元素。)
* 删除:remove方法(删除队列中指定元素。)
*/
public class PriorityBlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
// 创建一个优先级阻塞队列,使用PriorityTask类的自然顺序进行排序
PriorityBlockingQueue<PriorityTask> queue = new PriorityBlockingQueue<>();
// 添加任务到队列
queue.put(new PriorityTask(3, "Task 3"));
queue.put(new PriorityTask(1, "Task 1"));
queue.put(new PriorityTask(2, "Task 2"));
// 从队列中取出并打印任务,优先级高的先出队
while (!queue.isEmpty()) {
PriorityTask task = queue.take();
System.out.println("Processing: " + task);
}
}
// 定义一个具有优先级的任务类
static class PriorityTask implements Comparable<PriorityTask> {
private int priority;
private String name;
public PriorityTask(int priority, String name) {
this.priority = priority;
this.name = name;
}
@Override
public int compareTo(PriorityTask o) {
if (this.priority < o.priority) {
return -1;
} else if (this.priority > o.priority) {
return 1;
} else {
return 0;
}
}
@Override
public String toString() {
return "PriorityTask{" +
"priority=" + priority +
", name='" + name + '\'' +
'}';
}
}
}
DelayQueue
package org.jeecg.Queue;
import lombok.Data;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* 一,PriorityBlockingQueue 是 Java 并发包 (java.util.concurrent) 中的一个线程安全的数据结构,它实现了 BlockingQueue 接口,
* 并具备优先级排序功能。这个队列的特点是无界(除非系统资源耗尽)、线程安全,并且在插入或移除元素时可能会阻塞。
*
* 二、使用场景
* 缓存系统设计:使用DelayQueue保存缓存元素的有效期,用一个线程循环查询DelayQueue,一旦从DelayQueue中取出元素,就表示有元素到期。
* 定时任务调度:使用DelayQueue保存当天要执行的任务和执行的时间,一旦从DelayQueue中获取到任务,就开始执行,比如Timer,就是基于DelayQueue实现的。
*三,常用方法
* 添加:add方法(在队列的尾部添加元素,返回true。当队列满的时候会抛出IllegalStateException异常)
* offer方法(在队列的尾部添加元素,如果添加成功返回true,否则返回false。)
* put方法(在队列的尾部添加元素,如果队列已满,则会阻塞住,等待队列有空闲位置,该方法可被打断。)
* 获取:take方法(获取队列中的元素,如果没有则会被阻塞住。)
* poll方法(获取队列中队首的元素,获取后该元素就会移除该队列)
* peek方法(读取队列中队首的元素,不会删除该元素。)
* 删除:remove方法(删除队列中指定元素。)
*
* compareTo()主要用来对队列中的元素进行排序
* getDelay()主要用来判断存入队列的元素是否超时,超时则可以取出数据
*/
public class DelayQueueTest {
public static void main(String[] args) throws InterruptedException {
Random random = new Random();
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
DelayQueue<Order> orders = new DelayQueue<>();
System.out.println(df.format(new Date()) + "订单A加入队列");
//生成订单A并加入队列
Order order_A = new Order("ALiangX","testA_" +random.nextInt(100),10,TimeUnit.SECONDS);
orders.offer(order_A);
Thread.sleep(5000);
//等待5s生成订单B并加入队列
System.out.println(df.format(new Date()) + "订单B加入队列");
Order order_B = new Order("Jack Ma","testB_" +random.nextInt(100),10,TimeUnit.SECONDS);
orders.offer(order_B);
Thread.sleep(5000);
//再等待5s生成订单C并加入队列
System.out.println(df.format(new Date()) + "订单C加入队列");
Order order_C = new Order("Pony","testC_" +random.nextInt(100),10,TimeUnit.SECONDS);
orders.offer(order_C);
while(!orders.isEmpty()){
//若没有超时的元素,take()会阻塞该线程
Order order = orders.take();
System.out.println("订单因为未支付:" +order.toString() + " 在 " + df.format(new Date()) + "被取出!!" );
}
}
@Data
public static class Order implements Delayed {
private String user;
private String order_id;
private long order_time;
public Order(String user, String order_id, long order_time, TimeUnit timeUnit){
this.user = user;
this.order_id = order_id;
this.order_time = System.currentTimeMillis() + (order_time > 0 ? timeUnit.toMillis(order_time) : 0);
}
@Override
public long getDelay(TimeUnit unit) {
return order_time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
Order order = (Order) o;
long to = this.order_time - order.getOrder_time();
return to <= 0 ? -1 : 1;
}
@Override
public String toString() {
return "Order{" +
"user='" + user + '\'' +
", order_id='" + order_id + '\'' +
", order_time=" + order_time +
'}';
}
}
}
SynchronousQueue
package org.jeecg.Queue;
import java.util.concurrent.SynchronousQueue;
/**
* 一,SynchronousQueue 的特性,生产者线程在添加下一个元素之前必须等待消费者线程取走当前元素,反之亦然,从而实现了严格的同步。
* 每个 put 操作必须等待一个对应的 take 操作
* 二、使用场景
* 任务传递:在线程池设计中,可以作为工作队列使用,特别是当你希望任务立即被工作者线程接管处理,而不是等待在队列中时。这有助于实现严格的生产者-消费者模式,保持并发度的精确控制。
* 事件驱动架构:在事件驱动的系统中,事件可以被立即传递给事件处理器,无需中间缓存,保证了低延迟和高效处理。
* 数据管道:在需要将数据从一个线程直接传递到另一个线程的场景中,比如在并行处理流水线中,它可以作为连接不同处理阶段的桥梁。
* 同步点:可以作为线程间同步的机制,当一个线程需要等待另一个线程的信号(通过插入一个元素)才能继续执行时。
*三,常用方法
* 添加:add方法(在队列的尾部添加元素,返回true。当队列满的时候会抛出IllegalStateException异常)
* offer方法(在队列的尾部添加元素,如果添加成功返回true,否则返回false。)
* put方法(在队列的尾部添加元素,如果队列已满,则会阻塞住,等待队列有空闲位置,该方法可被打断。)
* 获取:take方法(获取队列中的元素,如果没有则会被阻塞住。)
* poll方法(获取队列中队首的元素,获取后该元素就会移除该队列)
* peek方法(读取队列中队首的元素,不会删除该元素。)
* 删除:remove方法(删除队列中指定元素。)
*/
public class SynchronousQueueTest {
public static void main(String[] args) {
// 创建一个SynchronousQueue实例
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
// 启动生产者线程
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
System.out.println("Producing: " + i);
queue.put(i); // put方法会阻塞,直到有消费者取走元素
Thread.sleep(100); // 模拟生产间隔
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 启动消费者线程
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
Integer item = queue.take(); // take方法会阻塞,直到有元素可取
System.out.println("Consuming: " + item);
Thread.sleep(200); // 模拟消费间隔,大于生产间隔以观察效果
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}