【重学数据结构】队列 Queue

发布于:2025-07-23 ⋅ 阅读:(18) ⋅ 点赞:(0)

目录

什么是队列?

延迟队列介绍

说明

二叉堆结构

实现延迟队列 

实现说明

入队操作

为什么元素是队列头部就要唤醒正在等待的消费者线程? 

出队操作

优先级队列是如何实现的?

入队操作 

 出队操作

自测过程 


什么是队列?

队列是一种先进先出的线性数据结构,将元素添加到队列后的操作称为入队,从队列中移除元素的操作称为出队。队列还分为 单端队列(queue) 和 双端队列(deque) 。

  • 从理论上讲,队列的一个特征是它没有特定的容量。不管已经包含多少元素,总是可以再添加一个新元素。
  • 队列既可以是数组实现也可以是链表实现。所以当我们在 Java 中使用队列的时候,Deque 的实现类就是;LinkedList 和 ArrayDeque的实现类。
  • 队列不只是单端从一个口入另外一个口出,也可以是双端队列。例如在 Java 中 Queue 是单端队列接口、Deque 是双端队列接口,都有对应的实现类

延迟队列介绍

这里我们来扩展实现一个延迟队列,并在这个过程中会涉及到阻塞队列、优先队列的使用。通过这样的一个手写源码来学习队列的扩展使用。

说明

DelayQueue 是一个 BlockingQueue(无界阻塞)队列,它封装了一个使用完全二叉堆排序元素的 PriorityQueue(优先队列)。在添加元素时使用 Delay(延迟时间)作为排序条件,延迟最小的元素会优先放到队首。
这个延迟队列中用到的排序方式就是 PriorityQueue 优先队列,它的数据结构是数组实现的队列,但体现形式是一棵二叉堆树结构。在元素存放时,通过对存放元素的比较和替换形成二叉堆结构。 

二叉堆结构

二叉堆是一种特殊结构的堆,它的表现形态可以是一棵完整或近似二叉树的结构。我们要实现的延迟队列中的元素存放,使用的就是 PriorityQueue 实现的平衡二叉堆结构,数据以队列形式存放在基础数组中。 

父子节点索引关系:

  • 假如父节点为queue[n],那么左子节点为queue[2n+1],右子节点为queue[2n+2]
  • 任意孩子节点的父节点位置,都是 (n-1)>>>1 相当于减1后除2取整

节点间大小关系:

  • 父节点小于等于任意孩子节点
  • 同一层级的两个孩子节点大小不需要维护,它是在弹出元素的时候进行判断的 

实现延迟队列 

实现说明

  • 延迟队列的使用,是以在 DelayQueue 中存放实现了 Delayed 延迟接口的对象。因为只有实现这个对象,才能比较出当前元素与所需存放到对应位置的一个比对计算过程。
  • 另外这里的核心点包括:PriorityQueue —— 优先队列、ReentrantLock —— 可重入锁、Condition —— 信号量 

包含属性

public class DelayQueue<E extends Delayed> implements BlockingQueue<E>{
    // 可重入锁,用于保证线程安全
    private final ReentrantLock lock = new ReentrantLock();
    // 优先级队列
    private final PriorityQueue<E> pq = new PriorityQueue();
    // 条件变量,用于实现阻塞等待
    private final Condition available = lock.newCondition();
}

入队操作

    @Override
    public boolean add(E e) {
        offer(e);
        return true;
    }

    @Override
    public boolean offer(E e) {
        lock.lock();
        try {
            pq.offer(e);
            
            // 如果新添加的元素成为队列头部(延迟时间最短)
            // 需要唤醒可能正在等待的消费者线程
            if (pq.peek() == e) {
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

为什么元素是队列头部就要唤醒正在等待的消费者线程? 

考虑以下场景:

  • 队列中原有元素A的延迟时间是10秒
  • 消费者线程正在等待A到期
  • 此时插入新元素B,延迟时间只有2秒

如果不唤醒等待的消费者线程:

  • 消费者线程会继续等待10秒(基于元素A的延迟时间)
  • 但实际上2秒后元素B就可以被消费了
  • 这样就造成了8秒的不必要等待 

出队操作

    @Override
    public E poll() {
        lock.lock();
        try {
            E first = pq.peek();
            // 如果队列为空,或者头部元素延迟时间未到
            if (first == null || first.getDelay(NANOSECONDS) > 0) {
                return null;
            } else {
                return pq.poll();
            }
        } finally {
            lock.unlock();
        }
    }

为什么没有实现接口,first.getDelay也能正确获取到延迟时间?

优先级队列是如何实现的?

包含属性

public class PriorityQueue<E> implements Queue<E> {
    // 日志记录器,用于记录队列操作信息
    private Logger logger = LoggerFactory.getLogger(PriorityQueue.class);
    
    // 队列默认初始容量,使用堆结构的典型初始大小
    private static final int DEFAULT_INITIAL_CAPACITY = 11;
    
    // 使用Object数组存储队列元素,保存的是Comparable对象
    private Object[] queue;
    
    // 记录队列中实际元素的数量
    private int size;
}

入队操作 

    @Override
    public boolean offer(E e) {
        if (e == null) {
            throw new NullPointerException();
        }
        // 看是否需要扩容
        int i = size;
        if (i >= queue.length) {
            // i+1表示插入新元素后所需的最小容量
            grow(i + 1);
        }
        size = i + 1;
        // 插入元素
        if (i == 0) {
            queue[i] = e;
        } else {
            // 目标值上浮
            siftUp(i, e);
        }
        return true;
    }

 上浮过程

private void siftUp(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>) x;
    logger.info("【入队】元素:{} 当前队列:{}", JSON.toJSONString(key), JSON.toJSONString(queue));
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        logger.info("【入队】寻找当前节点的父节点位置。k:{} parent:{}", k, parent);
        Object e = queue[parent];
        // 如果当前位置元素,大于父节点元素,则退出循环
        if (key.compareTo((E) e) >= 0) {
            logger.info("【入队】值比对,父节点:{} 目标节点:{}", e, key);
            break;
        }
        // 相反父节点位置大于当前位置元素,则进行替换
        logger.info("【入队】替换过程,父子节点位置替换,继续循环,父节点:{}, 存放到位置:{}", JSON.toJSONString(e), k);
        queue[k] = e;
        k = parent;
    }
    // 此时的k为key上浮的最终位置
    queue[k] = key;
    logger.info("【入队】完成 Idx:{},Val:{}, 当前队列:{}", k, JSON.toJSONString(key), JSON.toJSONString(queue));
}

上浮过程画图举例说明 

 

 出队操作

    @Override
    public E poll() {
        if (size == 0) {
            return null;
        }
        E result = (E) queue[0];
        int s = --size;
        E x = (E) queue[s];
        queue[s] = null;
        if (s != 0) {
            //因为堆是一种完全二叉树结构,删除堆顶元素后,为了保持树的结构完整(即完全二叉树的特性),
            // 我们通常会将最后一个元素移动到堆顶,然后通过下沉操作找到合适的位置。
            siftDown(0, x);
        }
        return result;
    }

下沉过程

private void siftDown(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>) x;
    // 找到非叶子结点的位置(因为只有非叶子节点才有左右子节点)
    int notLeaf = size >>> 1;
    while (k < notLeaf) {
        int child = (k << 1) + 1;
        Object c = queue[child];
        int right = child + 1;
        // 左右子节点比对,取最小的节点
        if (right < size && ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0) {
            logger.info("【出队】左右子节点比对,获取最小值。left:{} right:{}", JSON.toJSONString(c), JSON.toJSONString(queue[right]));
            // 右子节点小,把right索引值赋给child,child此时就是代表最小的子节点索引值
            c = queue[child = right];
        }
        // 目标值与子节点比对,当目标值小于子节点值,退出循环。说明此时目标值所在位置适合,迁移完成。
        if (key.compareTo((E) c) <= 0) {
            break;
        }
        // 当目标值大于子节点值,位置替换,继续比较
        logger.info("【出队】替换过程,节点的值比对。上节点:{},下节点:{} 位置替换", JSON.toJSONString(queue[k]), JSON.toJSONString(c));
        queue[k] = c;
        k = child;
    }
    // 迁移完成,将目标值存放到k位置,k此时应该是一个叶子结点位置或者目标值所在位置适合
    logger.info("【出队】替换结果,最终更换位置。Idx:{} Val:{}", k, JSON.toJSONString(key));
    queue[k] = key;
}

下沉过程画图举例说明 

自测过程 

package queue.test;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import queue.DelayQueue;
import queue.Delayed;
import queue.Queue;

import java.util.concurrent.TimeUnit;

public class QueueTest {

    private static final Logger logger = LoggerFactory.getLogger(QueueTest.class);

    public static void main(String[] args) throws InterruptedException {
        Queue<Job> queue = new DelayQueue<>();

        queue.add(new Job("1号", 1000L));
        queue.add(new Job("3号", 3000L));
        queue.add(new Job("5号", 5000L));
        queue.add(new Job("11号", 11000L));
        queue.add(new Job("4号", 4000L));
        queue.add(new Job("6号", 6000L));
        queue.add(new Job("7号", 7000L));
        queue.add(new Job("12号", 12000L));
        queue.add(new Job("15号", 15000L));
        queue.add(new Job("10号", 10000L));
        queue.add(new Job("9号", 9000L));
        queue.add(new Job("8号", 8000L));

        while (true) {
            Job poll = queue.poll();
            if (null == poll) {
                Thread.sleep(10);
                continue;
            }
            logger.info(poll.getName());
        }
    }

    static class Job implements Delayed {

        private final String name;
        private final Long begin;
        private final Long delayTime;

        public Job(String name, Long delayTime) {
            this.name = name;
            this.begin = System.currentTimeMillis();
            this.delayTime = delayTime;//延时时长
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(begin + delayTime - System.currentTimeMillis(), TimeUnit.MICROSECONDS);
        }

        public String getName() {
            return name;
        }

        @Override
        public int compareTo(Delayed o) {
            Job job = (Job) o;
            return (int) (this.getDelay(TimeUnit.MICROSECONDS) - job.getDelay(TimeUnit.MICROSECONDS));
        }
    }

}

测试结果如图:

image.png

常见问题

单端队列和双端队列,分别对应的实现类是哪个?

单端队列的接口为Queue:对应实现类有LinkedList、ArrayDeque、PriorityQueue

双端队列的接口为Deque:对应实现类有LinkedList、ArrayDeque  

简述延迟队列/优先队列的实现方式 

延迟队列是基于优先级队列实现的(依赖PriorityQueue),根据延迟队列的延迟时间在优先级队列中排序,除此之外,为了保证入队和出队的线程安全,引入可重入锁和Condition来唤醒等待消费的线程。 

优先队列根据实现了Comparable接口的类进行排序,它的数据结构是数组实现的队列,但体现形式是一棵二叉堆树结构。 

二叉堆插入/弹出元素的过程 

  • 插入过程:首先将元素插入到末尾,然后进行上浮的过程(如果元素大于其父节点元素说明位置合适,否则跟父节点位置进行替换,然后继续循环,直到当前位置的元素大于父节点元素或当前位置索引走到了0就退出for循环。此时的k就是上浮的最终位置,再赋值就好)。
  • 弹出过程:首先弹出堆顶元素,然后将最后一个元素移动到堆顶,通过下沉操作找到合适的位置。下沉操作的流程是当前位置索引小于非叶子节点索引就进入while循环,因为只有非叶子节点才有子节点,循环里需要进行左右节点比对选出最小的子节点,然后父节点和子节点比较,大于就交换位置。走出while循环代表迁移完成,此时的k就是下沉的最终位置,再赋值就好。 

延迟队列的使用场景

延迟队列可用于定时调度某个时间点执行的任务。例如,定时发送邮件、定时发布社交媒体帖子等。 

延迟队列为什么添加信号量 

信号量用于线程的等待和唤醒,比如说上文所说的入队操作时就要唤醒线程,因为可以减少线程不必要的等待时间。 


网站公告

今日签到

点亮在社区的每一天
去签到