目录
什么是队列?
队列是一种先进先出的线性数据结构,将元素添加到队列后的操作称为入队,从队列中移除元素的操作称为出队。队列还分为 单端队列(queue) 和 双端队列(deque) 。
- 从理论上讲,队列的一个特征是它没有特定的容量。不管已经包含多少元素,总是可以再添加一个新元素。
- 队列既可以是数组实现也可以是链表实现。所以当我们在 Java 中使用队列的时候,Deque 的实现类就是;LinkedList 和 ArrayDeque的实现类。
- 队列不只是单端从一个口入另外一个口出,也可以是双端队列。例如在 Java 中 Queue 是单端队列接口、Deque 是双端队列接口,都有对应的实现类
延迟队列介绍
这里我们来扩展实现一个延迟队列,并在这个过程中会涉及到阻塞队列、优先队列的使用。通过这样的一个手写源码来学习队列的扩展使用。
说明
DelayQueue 是一个 BlockingQueue(无界阻塞)队列,它封装了一个使用完全二叉堆排序元素的 PriorityQueue(优先队列)。在添加元素时使用 Delay(延迟时间)作为排序条件,延迟最小的元素会优先放到队首。
这个延迟队列中用到的排序方式就是 PriorityQueue 优先队列,它的数据结构是数组实现的队列,但体现形式是一棵二叉堆树结构。在元素存放时,通过对存放元素的比较和替换形成二叉堆结构。
二叉堆结构
二叉堆是一种特殊结构的堆,它的表现形态可以是一棵完整或近似二叉树的结构。我们要实现的延迟队列中的元素存放,使用的就是 PriorityQueue 实现的平衡二叉堆结构,数据以队列形式存放在基础数组中。
父子节点索引关系:
- 假如父节点为queue[n],那么左子节点为queue[2n+1],右子节点为queue[2n+2]
- 任意孩子节点的父节点位置,都是 (n-1)>>>1 相当于减1后除2取整
节点间大小关系:
- 父节点小于等于任意孩子节点
- 同一层级的两个孩子节点大小不需要维护,它是在弹出元素的时候进行判断的
实现延迟队列
实现说明
- 延迟队列的使用,是以在 DelayQueue 中存放实现了 Delayed 延迟接口的对象。因为只有实现这个对象,才能比较出当前元素与所需存放到对应位置的一个比对计算过程。
- 另外这里的核心点包括:PriorityQueue —— 优先队列、ReentrantLock —— 可重入锁、Condition —— 信号量
包含属性
public class DelayQueue<E extends Delayed> implements BlockingQueue<E>{
// 可重入锁,用于保证线程安全
private final ReentrantLock lock = new ReentrantLock();
// 优先级队列
private final PriorityQueue<E> pq = new PriorityQueue();
// 条件变量,用于实现阻塞等待
private final Condition available = lock.newCondition();
}
入队操作
@Override
public boolean add(E e) {
offer(e);
return true;
}
@Override
public boolean offer(E e) {
lock.lock();
try {
pq.offer(e);
// 如果新添加的元素成为队列头部(延迟时间最短)
// 需要唤醒可能正在等待的消费者线程
if (pq.peek() == e) {
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
为什么元素是队列头部就要唤醒正在等待的消费者线程?
考虑以下场景:
- 队列中原有元素A的延迟时间是10秒
- 消费者线程正在等待A到期
- 此时插入新元素B,延迟时间只有2秒
如果不唤醒等待的消费者线程:
- 消费者线程会继续等待10秒(基于元素A的延迟时间)
- 但实际上2秒后元素B就可以被消费了
- 这样就造成了8秒的不必要等待
出队操作
@Override
public E poll() {
lock.lock();
try {
E first = pq.peek();
// 如果队列为空,或者头部元素延迟时间未到
if (first == null || first.getDelay(NANOSECONDS) > 0) {
return null;
} else {
return pq.poll();
}
} finally {
lock.unlock();
}
}
为什么没有实现接口,first.getDelay也能正确获取到延迟时间?
优先级队列是如何实现的?
包含属性
public class PriorityQueue<E> implements Queue<E> {
// 日志记录器,用于记录队列操作信息
private Logger logger = LoggerFactory.getLogger(PriorityQueue.class);
// 队列默认初始容量,使用堆结构的典型初始大小
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 使用Object数组存储队列元素,保存的是Comparable对象
private Object[] queue;
// 记录队列中实际元素的数量
private int size;
}
入队操作
@Override
public boolean offer(E e) {
if (e == null) {
throw new NullPointerException();
}
// 看是否需要扩容
int i = size;
if (i >= queue.length) {
// i+1表示插入新元素后所需的最小容量
grow(i + 1);
}
size = i + 1;
// 插入元素
if (i == 0) {
queue[i] = e;
} else {
// 目标值上浮
siftUp(i, e);
}
return true;
}
上浮过程
private void siftUp(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>) x;
logger.info("【入队】元素:{} 当前队列:{}", JSON.toJSONString(key), JSON.toJSONString(queue));
while (k > 0) {
int parent = (k - 1) >>> 1;
logger.info("【入队】寻找当前节点的父节点位置。k:{} parent:{}", k, parent);
Object e = queue[parent];
// 如果当前位置元素,大于父节点元素,则退出循环
if (key.compareTo((E) e) >= 0) {
logger.info("【入队】值比对,父节点:{} 目标节点:{}", e, key);
break;
}
// 相反父节点位置大于当前位置元素,则进行替换
logger.info("【入队】替换过程,父子节点位置替换,继续循环,父节点:{}, 存放到位置:{}", JSON.toJSONString(e), k);
queue[k] = e;
k = parent;
}
// 此时的k为key上浮的最终位置
queue[k] = key;
logger.info("【入队】完成 Idx:{},Val:{}, 当前队列:{}", k, JSON.toJSONString(key), JSON.toJSONString(queue));
}
上浮过程画图举例说明
出队操作
@Override
public E poll() {
if (size == 0) {
return null;
}
E result = (E) queue[0];
int s = --size;
E x = (E) queue[s];
queue[s] = null;
if (s != 0) {
//因为堆是一种完全二叉树结构,删除堆顶元素后,为了保持树的结构完整(即完全二叉树的特性),
// 我们通常会将最后一个元素移动到堆顶,然后通过下沉操作找到合适的位置。
siftDown(0, x);
}
return result;
}
下沉过程
private void siftDown(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>) x;
// 找到非叶子结点的位置(因为只有非叶子节点才有左右子节点)
int notLeaf = size >>> 1;
while (k < notLeaf) {
int child = (k << 1) + 1;
Object c = queue[child];
int right = child + 1;
// 左右子节点比对,取最小的节点
if (right < size && ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0) {
logger.info("【出队】左右子节点比对,获取最小值。left:{} right:{}", JSON.toJSONString(c), JSON.toJSONString(queue[right]));
// 右子节点小,把right索引值赋给child,child此时就是代表最小的子节点索引值
c = queue[child = right];
}
// 目标值与子节点比对,当目标值小于子节点值,退出循环。说明此时目标值所在位置适合,迁移完成。
if (key.compareTo((E) c) <= 0) {
break;
}
// 当目标值大于子节点值,位置替换,继续比较
logger.info("【出队】替换过程,节点的值比对。上节点:{},下节点:{} 位置替换", JSON.toJSONString(queue[k]), JSON.toJSONString(c));
queue[k] = c;
k = child;
}
// 迁移完成,将目标值存放到k位置,k此时应该是一个叶子结点位置或者目标值所在位置适合
logger.info("【出队】替换结果,最终更换位置。Idx:{} Val:{}", k, JSON.toJSONString(key));
queue[k] = key;
}
下沉过程画图举例说明
自测过程
package queue.test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import queue.DelayQueue;
import queue.Delayed;
import queue.Queue;
import java.util.concurrent.TimeUnit;
public class QueueTest {
private static final Logger logger = LoggerFactory.getLogger(QueueTest.class);
public static void main(String[] args) throws InterruptedException {
Queue<Job> queue = new DelayQueue<>();
queue.add(new Job("1号", 1000L));
queue.add(new Job("3号", 3000L));
queue.add(new Job("5号", 5000L));
queue.add(new Job("11号", 11000L));
queue.add(new Job("4号", 4000L));
queue.add(new Job("6号", 6000L));
queue.add(new Job("7号", 7000L));
queue.add(new Job("12号", 12000L));
queue.add(new Job("15号", 15000L));
queue.add(new Job("10号", 10000L));
queue.add(new Job("9号", 9000L));
queue.add(new Job("8号", 8000L));
while (true) {
Job poll = queue.poll();
if (null == poll) {
Thread.sleep(10);
continue;
}
logger.info(poll.getName());
}
}
static class Job implements Delayed {
private final String name;
private final Long begin;
private final Long delayTime;
public Job(String name, Long delayTime) {
this.name = name;
this.begin = System.currentTimeMillis();
this.delayTime = delayTime;//延时时长
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(begin + delayTime - System.currentTimeMillis(), TimeUnit.MICROSECONDS);
}
public String getName() {
return name;
}
@Override
public int compareTo(Delayed o) {
Job job = (Job) o;
return (int) (this.getDelay(TimeUnit.MICROSECONDS) - job.getDelay(TimeUnit.MICROSECONDS));
}
}
}
测试结果如图:
常见问题
单端队列和双端队列,分别对应的实现类是哪个?
单端队列的接口为Queue:对应实现类有LinkedList、ArrayDeque、PriorityQueue
双端队列的接口为Deque:对应实现类有LinkedList、ArrayDeque
简述延迟队列/优先队列的实现方式
延迟队列是基于优先级队列实现的(依赖PriorityQueue),根据延迟队列的延迟时间在优先级队列中排序,除此之外,为了保证入队和出队的线程安全,引入可重入锁和Condition来唤醒等待消费的线程。
优先队列根据实现了Comparable接口的类进行排序,它的数据结构是数组实现的队列,但体现形式是一棵二叉堆树结构。
二叉堆插入/弹出元素的过程
- 插入过程:首先将元素插入到末尾,然后进行上浮的过程(如果元素大于其父节点元素说明位置合适,否则跟父节点位置进行替换,然后继续循环,直到当前位置的元素大于父节点元素或当前位置索引走到了0就退出for循环。此时的k就是上浮的最终位置,再赋值就好)。
- 弹出过程:首先弹出堆顶元素,然后将最后一个元素移动到堆顶,通过下沉操作找到合适的位置。下沉操作的流程是当前位置索引小于非叶子节点索引就进入while循环,因为只有非叶子节点才有子节点,循环里需要进行左右节点比对选出最小的子节点,然后父节点和子节点比较,大于就交换位置。走出while循环代表迁移完成,此时的k就是下沉的最终位置,再赋值就好。
延迟队列的使用场景
延迟队列可用于定时调度某个时间点执行的任务。例如,定时发送邮件、定时发布社交媒体帖子等。
延迟队列为什么添加信号量
信号量用于线程的等待和唤醒,比如说上文所说的入队操作时就要唤醒线程,因为可以减少线程不必要的等待时间。