python标准库--heapq - 堆队列算法(优先队列)在算法比赛的应用

发布于:2025-05-13 ⋅ 阅读:(11) ⋅ 点赞:(0)

目录

一、基本操作

1.构造堆

2.访问堆顶元素(返回堆顶元素)

3.删除堆顶元素(返回堆顶元素)

4.插入新元素,时间复杂度为 O (log n)

5.  插入并删除元素(高效操作)

6. 高级操作- 合并多个有序序列

 7.高级操作-获取最大 / 最小的 K 个元素

8.高级操作-实现最大堆 

 9.自定义对象的堆

 10.时间复杂度

二、实例

1.优先队列

2. Top-K 问题

 3. 合并有序序列

4. 动态维护中位数

5. 区间调度问题


一、基本操作

1.构造堆

import heapq
# 方法1
heap = []
heapq.heappush(heap, 3)
heapq.heappush(heap, 1)
heapq.heappush(heap, 2)
print(heap)  # 输出: [1, 3, 2](最小堆的内部结构)

# 方法2
arr = [3, 1, 2]
heapq.heapify(arr)
print(arr)  # 输出: [1, 3, 2]

2.访问堆顶元素(返回堆顶元素)

arr[0]

3.删除堆顶元素(返回堆顶元素)

heapq.heappop(arr)

4.插入新元素,时间复杂度为 O (log n)

heapq.heappush(arr, 5)

5.  插入并删除元素(高效操作)

使用 heappushpop() 或 heapreplace()

  • heappushpop(heap, item):先插入 item,再删除并返回堆顶元素。
  • heapreplace(heap, item):先删除并返回堆顶元素,再插入 item
arr = [2, 3, 5]
print(heapq.heappushpop(arr, 0))  # 输出: 0(插入0后弹出最小值0)
print(arr)                         # 输出: [2, 3, 5]

print(heapq.heapreplace(arr, 1))   # 输出: 2(弹出原堆顶2,插入1)
print(arr)                         # 输出: [1, 3, 5]

6. 高级操作- 合并多个有序序列

a = [1, 4, 7]
b = [2, 5, 8]
merged = heapq.merge(a, b)
print(list(merged))  # 输出: [1, 2, 4, 5, 7, 8]

 7.高级操作-获取最大 / 最小的 K 个元素

  • nlargest(k, iterable, key=None):返回可迭代对象中最大的 K 个元素。
  • nsmallest(k, iterable, key=None):返回可迭代对象中最小的 K 个元素。
arr = [3, 1, 4, 1, 5, 9, 2, 6, 5]
print(heapq.nlargest(3, arr))     # 输出: [9, 6, 5]
print(heapq.nsmallest(3, arr))    # 输出: [1, 1, 2]

# 结合 key 参数(例如对字典按值排序)
students = [
    {'name': 'Alice', 'score': 85},
    {'name': 'Bob', 'score': 92},
    {'name': 'Charlie', 'score': 78}
]
print(heapq.nlargest(2, students, key=lambda x: x['score']))
# 输出: [{'name': 'Bob', 'score': 92}, {'name': 'Alice', 'score': 85}]

8.高级操作-实现最大堆 

heapq 默认为最小堆。若需最大堆,可将元素取负后存储,取出时再取负还原。

arr = [3, 1, 4]
max_heap = [-x for x in arr]
heapq.heapify(max_heap)

# 获取最大值
max_val = -heapq.heappop(max_heap)
print(max_val)  # 输出: 4

 9.自定义对象的堆

通过实现 __lt__ 方法定义对象的比较规则

class Item:
    def __init__(self, value, priority):
        self.value = value
        self.priority = priority
    
    def __lt__(self, other):
        # 定义优先级比较规则(优先级高的元素更小)
        return self.priority < other.priority

heap = []
heapq.heappush(heap, Item("task1", 3))
heapq.heappush(heap, Item("task2", 1))
heapq.heappush(heap, Item("task3", 2))

# 按优先级出队
print(heapq.heappop(heap).value)  # 输出: "task2"(优先级1最高)

 10.时间复杂度

操作 代码示例 时间复杂度
插入元素 heapq.heappush(heap, item) O(log n)
删除堆顶元素 heapq.heappop(heap) O(log n)
获取堆顶元素 heap[0] O(1)
列表转换为堆 heapq.heapify(arr) O(n)
获取最大 / 最小 K 元素 heapq.nlargest(k, arr) O(n log k)

二、实例

1.优先队列

典型问题

  • Dijkstra 算法(单源最短路径)
  • 哈夫曼编码(最优前缀编码)
  • 任务调度(按优先级执行任务)

示例:Dijkstra 算法

import heapq

def dijkstra(graph, start):
    n = len(graph)
    dist = [float('inf')] * n
    dist[start] = 0
    heap = [(0, start)]  # (距离, 节点)
    
    while heap:
        d, u = heapq.heappop(heap)
        if d > dist[u]:
            continue
        for v, w in graph[u]:
            if dist[v] > d + w:
                dist[v] = d + w
                heapq.heappush(heap, (dist[v], v))
    return dist

# 示例图的邻接表表示
graph = [
    [(1, 4), (2, 1)],  # 节点0的邻接节点及边权
    [(3, 2)],          # 节点1的邻接节点及边权
    [(1, 2), (3, 5)],  # 节点2的邻接节点及边权
    []                 # 节点3的邻接节点及边权
]

print(dijkstra(graph, 0))  # 输出: [0, 3, 1, 5]

2. Top-K 问题

求数组中最大 / 最小的 K 个元素,时间复杂度为 O (N log K)。

示例:最小的 K 个数

import heapq

def smallest_k(arr, k):
    if k == 0:
        return []
    heap = []
    for num in arr:
        # 维护一个大小为K的最大堆(用负数模拟)
        if len(heap) < k:
            heapq.heappush(heap, -num)
        else:
            if -num > heap[0]:  # 当前数比堆顶的数更小
                heapq.heappop(heap)
                heapq.heappush(heap, -num)
    return [-x for x in heap]

arr = [3, 2, 1, 5, 6, 4]
print(smallest_k(arr, 2))  # 输出: [1, 2]

 3. 合并有序序列

高效合并多个有序数组 / 链表,时间复杂度为 O (N log M)(M 为序列数)。

示例:合并 K 个有序链表

import heapq
from typing import List, Optional

class ListNode:
    def __init__(self, val=0, next=None):
        self.val = val
        self.next = next

def merge_k_lists(lists: List[Optional[ListNode]]) -> Optional[ListNode]:
    dummy = ListNode(0)
    curr = dummy
    heap = []
    
    # 初始化堆:将每个链表的头节点加入堆
    for i, node in enumerate(lists):
        if node:
            heapq.heappush(heap, (node.val, i, node))
    
    while heap:
        val, idx, node = heapq.heappop(heap)
        curr.next = node
        curr = curr.next
        if node.next:
            heapq.heappush(heap, (node.next.val, idx, node.next))
    
    return dummy.next

4. 动态维护中位数

使用两个堆(最大堆 + 最小堆)动态维护数据流的中位数。

示例:数据流的中位数

import heapq

class MedianFinder:
    def __init__(self):
        self.max_heap = []  # 存储较小的一半数(取负)
        self.min_heap = []  # 存储较大的一半数
    
    def addNum(self, num: int) -> None:
        if not self.max_heap or num <= -self.max_heap[0]:
            heapq.heappush(self.max_heap, -num)
        else:
            heapq.heappush(self.min_heap, num)
        
        # 平衡两个堆的大小
        if len(self.max_heap) > len(self.min_heap) + 1:
            heapq.heappush(self.min_heap, -heapq.heappop(self.max_heap))
        elif len(self.min_heap) > len(self.max_heap):
            heapq.heappush(self.max_heap, -heapq.heappop(self.min_heap))
    
    def findMedian(self) -> float:
        if len(self.max_heap) == len(self.min_heap):
            return (-self.max_heap[0] + self.min_heap[0]) / 2
        else:
            return -self.max_heap[0]

# 示例用法
median_finder = MedianFinder()
median_finder.addNum(1)
median_finder.addNum(2)
print(median_finder.findMedian())  # 输出: 1.5

 

5. 区间调度问题

例如会议室预订、活动安排等,需高效判断区间重叠。

示例:会议室预订

import heapq

def min_meeting_rooms(intervals):
    if not intervals:
        return 0
    
    # 按开始时间排序
    intervals.sort(key=lambda x: x[0])
    heap = []  # 存储会议室的结束时间
    
    for start, end in intervals:
        if heap and start >= heap[0]:
            # 最早结束的会议室可用,更新该会议室的结束时间
            heapq.heappop(heap)
        # 分配新会议室或更新已分配会议室的结束时间
        heapq.heappush(heap, end)
    
    return len(heap)

intervals = [[0, 30], [5, 10], [15, 20]]
print(min_meeting_rooms(intervals))  # 输出: 2

 


网站公告

今日签到

点亮在社区的每一天
去签到