PrioritizedDeque
是一个支持元素优先级的双端队列(Deque)。
它的核心应用场景是在 SingleInputGate
中,用于调度有可用数据的 InputChannel
。在 Flink 的数据流中,除了普通的业务数据(Buffer
),还存在一些高优先级的控制事件,最典型的就是 CheckpointBarrier
。为了保证 Checkpoint 机制的低延迟和正确性,CheckpointBarrier
必须被尽快处理,不能被大量的普通数据阻塞。
PrioritizedDeque
就是为了解决这个问题而设计的。它允许将某些元素(即包含高优先级事件的 InputChannel
)标记为 “优先” ,这些优先元素会被排在队列的前面,从而被 SingleInputGate
优先取出和处理。
@Internal
public final class PrioritizedDeque<T> implements Iterable<T> {
private final Deque<T> deque = new ArrayDeque<>();
private int numPriorityElements;
// ...
}
@Internal
: 表明这是 Flink 内部使用的类,不属于公共 API,可能会在未来版本中发生变化。- 内部实现: 它内部包装了一个标准的
java.util.ArrayDeque
来存储所有元素,并通过一个整型变量numPriorityElements
来记录当前队列头部有多少个是优先元素。
数据结构与核心思想
PrioritizedDeque
的巧妙之处在于它没有使用复杂的堆或者多个队列来实现优先级,而是利用了 ArrayDeque
的特性,在一个队列中维护了两个逻辑分区:
<-- poll() from here
+------------------------------------+------------------------------------+
| P1 | P2 | ... | Pn | E1 | E2 | ... | Em |
+------------------------------------+------------------------------------+
| Priority Elements | Non-Priority Elements |
| (numPriorityElements 个) | |
+------------------------------------+------------------------------------+
addFirst() for priority addLast() for non-priority
- 优先元素区: 位于队列的头部。所有被标记为优先的元素都在这里。
- 普通元素区: 位于队列的尾部。
numPriorityElements
: 这个计数器是区分这两个区域的边界。
当调用 poll()
方法时,它总是从 deque
的头部取元素。由于优先元素被放在头部,它们自然会被先取走。
添加元素
public void add(T element)
: 添加一个普通(非优先) 元素。public void add(T element) { deque.add(element); // 等价于 deque.addLast(element) }
这个操作非常高效,直接在底层
ArrayDeque
的尾部添加元素,时间复杂度为 O(1)。public void addPriorityElement(T element)
: 添加一个优先元素。public void addPriorityElement(T element) { // 优先元素很少,这是最常见的优化路径 if (numPriorityElements == 0) { deque.addFirst(element); } else if (numPriorityElements == deque.size()) { // 队列里全是优先元素 deque.add(element); // 直接加到队尾即可 } else { // 这是最坏的情况:队列中既有优先元素,又有普通元素 // 1. 把所有现有的优先元素(头部n个)取出来,暂存 final ArrayDeque<T> priorPriority = new ArrayDeque<>(numPriorityElements); for (int index = 0; index < numPriorityElements; index++) { priorPriority.addFirst(deque.poll()); } // 2. 把新元素加到队头 deque.addFirst(element); // 3. 把暂存的旧优先元素再按原顺序加回到队头 for (final T priorityEvent : priorPriority) { deque.addFirst(priorityEvent); } } numPriorityElements++; }
这个方法的实现体现了其设计哲学:为最常见的场景优化。
- 最佳情况 (O(1)): 当队列中没有优先元素时,直接
addFirst
。这是 Flink 运行时期望的常态。 - 最坏情况 (O(N_priority)): 当队列中已经存在优先元素时,需要先把它们全部移出,再把新元素和旧元素一起放回去。这个操作的成本与已有优先元素的数量成正比。设计者认为优先元素是稀少且短暂存在的,所以这个开销可以接受。
- 最佳情况 (O(1)): 当队列中没有优先元素时,直接
提升优先级
public void prioritize(T element)
: 将一个已经存在于队列中的元素提升为优先元素。
这个方法用在当一个普通的public void prioritize(T element) { final Iterator<T> iterator = deque.iterator(); // 1. 检查是否已经是优先元素,如果是,直接返回 for (int i = 0; i < numPriorityElements && iterator.hasNext(); i++) { if (iterator.next() == element) { return; } } // 2. 优化:如果它正好是第一个非优先元素,只需增加计数器即可 if (iterator.hasNext() && iterator.next() == element) { numPriorityElements++; return; } // 3. 常规路径:从队列中找到并移除它 while (iterator.hasNext()) { if (iterator.next() == element) { iterator.remove(); break; } } // 4. 作为优先元素重新添加 addPriorityElement(element); }
InputChannel
突然收到了一个CheckpointBarrier
时。SingleInputGate
需要将这个 Channel 的优先级提升,确保它能被优先处理。这个方法的实现同样包含了对常见情况的优化。
取出元素
public T poll()
: 从队列头部取出一个元素。@Nullable public T poll() { final T polled = deque.poll(); // 等价于 deque.pollFirst() if (polled != null && numPriorityElements > 0) { numPriorityElements--; } return polled; }
这个操作非常简单直接。它总是从
deque
的头部取元素。如果取出的元素是一个优先元素(即numPriorityElements > 0
),则将计数器减一。这个操作的时间复杂度是 O(1)。public T peek()
: 查看队头元素但不移除。@Nullable public T peek() { return deque.peek(); // 等价于 deque.peekFirst() }
同样是 O(1) 操作。
总结
PrioritizedDeque
是一个为特定场景(Flink 网络输入调度)高度优化的数据结构。
- 设计目标: 在一个队列中同时管理优先和非优先元素,并保证优先元素总是被先处理。
- 核心思想: 使用单个
ArrayDeque
和一个计数器numPriorityElements
来划分出两个逻辑区域,避免了管理多个队列的复杂性。 - 性能特点:
- 对非优先元素的操作(添加、轮询)非常高效,均为 O(1)。
- 对优先元素的操作在大多数情况下(没有其他优先元素时)也是 O(1)。
- 在最坏情况下(需要移动多个已存在的优先元素),性能会下降,但这种场景被认为是稀有的。
- 应用场景: 它是
SingleInputGate
实现CheckpointBarrier
等控制事件优先处理的关键。当一个InputChannel
收到CheckpointBarrier
时,SingleInputGate
会调用prioritize
或addPriorityElement
将该 Channel 放入PrioritizedDeque
的优先区,从而保证 Task 能及时响应 Checkpoint,避免因数据积压导致 Checkpoint 超时。
总而言之,PrioritizedDeque
是 Flink 系统设计中“为常见路径优化”思想的一个绝佳范例。