Flink 网络消息队列 PrioritizedDeque

发布于:2025-09-05 ⋅ 阅读:(21) ⋅ 点赞:(0)

PrioritizedDeque 是一个支持元素优先级双端队列(Deque)

它的核心应用场景是在 SingleInputGate 中,用于调度有可用数据的 InputChannel。在 Flink 的数据流中,除了普通的业务数据(Buffer),还存在一些高优先级的控制事件,最典型的就是 CheckpointBarrier。为了保证 Checkpoint 机制的低延迟和正确性,CheckpointBarrier 必须被尽快处理,不能被大量的普通数据阻塞。

PrioritizedDeque 就是为了解决这个问题而设计的。它允许将某些元素(即包含高优先级事件的 InputChannel)标记为 “优先” ,这些优先元素会被排在队列的前面,从而被 SingleInputGate 优先取出和处理。

@Internal
public final class PrioritizedDeque<T> implements Iterable<T> {
    private final Deque<T> deque = new ArrayDeque<>();
    private int numPriorityElements;
    // ...
}
  • @Internal: 表明这是 Flink 内部使用的类,不属于公共 API,可能会在未来版本中发生变化。
  • 内部实现: 它内部包装了一个标准的 java.util.ArrayDeque 来存储所有元素,并通过一个整型变量 numPriorityElements 来记录当前队列头部有多少个是优先元素。

数据结构与核心思想

PrioritizedDeque 的巧妙之处在于它没有使用复杂的堆或者多个队列来实现优先级,而是利用了 ArrayDeque 的特性,在一个队列中维护了两个逻辑分区:

              <-- poll() from here
+------------------------------------+------------------------------------+
| P1 | P2 | ... | Pn                  | E1 | E2 | ... | Em                  |
+------------------------------------+------------------------------------+
|      Priority Elements             |      Non-Priority Elements         |
|      (numPriorityElements 个)      |                                    |
+------------------------------------+------------------------------------+
              addFirst() for priority      addLast() for non-priority
  • 优先元素区: 位于队列的头部。所有被标记为优先的元素都在这里。
  • 普通元素区: 位于队列的尾部。
  • numPriorityElements: 这个计数器是区分这两个区域的边界

当调用 poll() 方法时,它总是从 deque 的头部取元素。由于优先元素被放在头部,它们自然会被先取走。

添加元素

  • public void add(T element): 添加一个普通(非优先) 元素。

    public void add(T element) {
        deque.add(element); // 等价于 deque.addLast(element)
    }
    

    这个操作非常高效,直接在底层 ArrayDeque 的尾部添加元素,时间复杂度为 O(1)。

  • public void addPriorityElement(T element): 添加一个优先元素。

    public void addPriorityElement(T element) {
        // 优先元素很少,这是最常见的优化路径
        if (numPriorityElements == 0) {
            deque.addFirst(element);
        } else if (numPriorityElements == deque.size()) {
            // 队列里全是优先元素
            deque.add(element); // 直接加到队尾即可
        } else {
            // 这是最坏的情况:队列中既有优先元素,又有普通元素
            // 1. 把所有现有的优先元素(头部n个)取出来,暂存
            final ArrayDeque<T> priorPriority = new ArrayDeque<>(numPriorityElements);
            for (int index = 0; index < numPriorityElements; index++) {
                priorPriority.addFirst(deque.poll());
            }
            // 2. 把新元素加到队头
            deque.addFirst(element);
            // 3. 把暂存的旧优先元素再按原顺序加回到队头
            for (final T priorityEvent : priorPriority) {
                deque.addFirst(priorityEvent);
            }
        }
        numPriorityElements++;
    }
    

    这个方法的实现体现了其设计哲学:为最常见的场景优化

    • 最佳情况 (O(1)): 当队列中没有优先元素时,直接 addFirst。这是 Flink 运行时期望的常态。
    • 最坏情况 (O(N_priority)): 当队列中已经存在优先元素时,需要先把它们全部移出,再把新元素和旧元素一起放回去。这个操作的成本与已有优先元素的数量成正比。设计者认为优先元素是稀少且短暂存在的,所以这个开销可以接受。

提升优先级

  • public void prioritize(T element): 将一个已经存在于队列中的元素提升为优先元素。
    public void prioritize(T element) {
        final Iterator<T> iterator = deque.iterator();
        // 1. 检查是否已经是优先元素,如果是,直接返回
        for (int i = 0; i < numPriorityElements && iterator.hasNext(); i++) {
            if (iterator.next() == element) {
                return;
            }
        }
        // 2. 优化:如果它正好是第一个非优先元素,只需增加计数器即可
        if (iterator.hasNext() && iterator.next() == element) {
            numPriorityElements++;
            return;
        }
        // 3. 常规路径:从队列中找到并移除它
        while (iterator.hasNext()) {
            if (iterator.next() == element) {
                iterator.remove();
                break;
            }
        }
        // 4. 作为优先元素重新添加
        addPriorityElement(element);
    }
    
    这个方法用在当一个普通的 InputChannel 突然收到了一个 CheckpointBarrier 时。SingleInputGate 需要将这个 Channel 的优先级提升,确保它能被优先处理。这个方法的实现同样包含了对常见情况的优化。

取出元素

  • public T poll(): 从队列头部取出一个元素。

    @Nullable
    public T poll() {
        final T polled = deque.poll(); // 等价于 deque.pollFirst()
        if (polled != null && numPriorityElements > 0) {
            numPriorityElements--;
        }
        return polled;
    }
    

    这个操作非常简单直接。它总是从 deque 的头部取元素。如果取出的元素是一个优先元素(即 numPriorityElements > 0),则将计数器减一。这个操作的时间复杂度是 O(1)。

  • public T peek(): 查看队头元素但不移除。

    @Nullable
    public T peek() {
        return deque.peek(); // 等价于 deque.peekFirst()
    }
    

    同样是 O(1) 操作。

总结

PrioritizedDeque 是一个为特定场景(Flink 网络输入调度)高度优化的数据结构。

  • 设计目标: 在一个队列中同时管理优先和非优先元素,并保证优先元素总是被先处理。
  • 核心思想: 使用单个 ArrayDeque 和一个计数器 numPriorityElements 来划分出两个逻辑区域,避免了管理多个队列的复杂性。
  • 性能特点:
    • 非优先元素的操作(添加、轮询)非常高效,均为 O(1)。
    • 优先元素的操作在大多数情况下(没有其他优先元素时)也是 O(1)。
    • 在最坏情况下(需要移动多个已存在的优先元素),性能会下降,但这种场景被认为是稀有的。
  • 应用场景: 它是 SingleInputGate 实现 CheckpointBarrier 等控制事件优先处理的关键。当一个 InputChannel 收到 CheckpointBarrier 时,SingleInputGate 会调用 prioritize 或 addPriorityElement 将该 Channel 放入 PrioritizedDeque 的优先区,从而保证 Task 能及时响应 Checkpoint,避免因数据积压导致 Checkpoint 超时。

总而言之,PrioritizedDeque 是 Flink 系统设计中“为常见路径优化”思想的一个绝佳范例。


网站公告

今日签到

点亮在社区的每一天
去签到