目录
问题描述
中位数是有序整数列表中的中间值。如果列表的大小是偶数,则没有中间值,中位数是两个中间值的平均值。
例如,
[2,3,4]
的中位数是3
[2,3]
的中位数是(2 + 3) / 2 = 2.5
设计一个支持以下两种操作的数据结构:
void addNum(int num)
- 从数据流中添加一个整数到数据结构中。double findMedian()
- 返回目前所有元素的中位数。
示例 1:
输入:
["MedianFinder","addNum","addNum","findMedian","addNum","findMedian"]
[[],[1],[2],[],[3],[]]
输出:[null,null,null,1.50000,null,2.00000]
示例 2:
输入:
["MedianFinder","addNum","findMedian","addNum","findMedian"]
[[],[2],[],[3],[]]
输出:[null,null,2.00000,null,2.50000]
提示:
- 最多会对
addNum
、findMedian
进行 50000 次调用。 0 <= num <= 10000
原题链接:
示例
示例 1:
输入:
["MedianFinder","addNum","addNum","findMedian","addNum","findMedian"]
[[],[1],[2],[],[3],[]]
输出:
[null,null,null,1.50000,null,2.00000]
解释:
MedianFinder mf = new MedianFinder();
mf.addNum(1); // 当前数据流: [1]
mf.addNum(2); // 当前数据流: [1, 2]
mf.findMedian(); // 返回 1.5
mf.addNum(3); // 当前数据流: [1, 2, 3]
mf.findMedian(); // 返回 2.0
思路解析
本题要求设计一个数据结构,能够高效地从数据流中添加数字并随时返回中位数。由于数据流可能非常大(最多50000次操作),我们需要一种高效的方法来维护数据的中位数。
方法一:使用两个堆(最大堆和最小堆)
核心思路
使用两个堆来维护数据流的前半部分和后半部分:
- 最大堆(Max-Heap):用于存储数据流中较小的一半元素。堆顶是这部分的最大值。
- 最小堆(Min-Heap):用于存储数据流中较大的一半元素。堆顶是这部分的最小值。
通过这样的划分,能够保证:
- 当总元素个数为奇数时,最大堆中的元素比最小堆多一个。
- 当总元素个数为偶数时,两个堆中的元素个数相等。
详细步骤
初始化:
- 创建一个最大堆(使用负数模拟,因为 Python 的
heapq
模块只支持最小堆)。 - 创建一个最小堆。
- 创建一个最大堆(使用负数模拟,因为 Python 的
添加数字 (
addNum
):- 将新数字添加到最大堆(较小的一半)。
- 将最大堆的堆顶元素(即较小的一半中的最大值)移动到最小堆中,以保持平衡。
- 如果最小堆的元素多于最大堆,则将最小堆的堆顶元素(即较大的一半中的最小值)移动回最大堆中。
查找中位数 (
findMedian
):- 如果两个堆的大小相同,中位数是两个堆顶元素的平均值。
- 如果堆的大小不同,中位数是较大堆的堆顶元素。
示例分析
以示例1为例:
["MedianFinder","addNum","addNum","findMedian","addNum","findMedian"]
[[],[1],[2],[],[3],[]]
操作1:
addNum(1)
- 最大堆:
[-1]
- 最小堆:
[]
- 最大堆:
操作2:
addNum(2)
- 最大堆:
[-1, -2]
- 最小堆:
[2]
(将最大堆顶1
移到最小堆)
- 最大堆:
操作3:
findMedian()
- 中位数 = (1 + 2) / 2 = 1.5
操作4:
addNum(3)
- 最大堆:
[-2, -1]
- 最小堆:
[2, 3]
- 最大堆:
操作5:
findMedian()
- 中位数 = 2
优势
时间复杂度:
addNum
: O(log n) 由于堆的插入和删除操作。findMedian
: O(1) 直接访问堆顶元素。
空间复杂度:
- O(n),用于存储所有元素在两个堆中。
适用场景
- 需要实时地获取中位数,且数据流量较大。
代码实现
Python 实现(方法一:使用两个堆)
import heapq
class MedianFinder:
def __init__(self):
"""
初始化两个堆:
- 小顶堆(用于存储较大的一半元素)
- 大顶堆(用于存储较小的一半元素,通过取负数实现)
"""
self.min_heap = [] # 小顶堆
self.max_heap = [] # 大顶堆(存储负数)
def addNum(self, num: int) -> None:
"""
添加一个数字到数据结构中。
"""
# 将数添加到大顶堆(通过取负数实现最大堆)
heapq.heappush(self.max_heap, -num)
# 将大顶堆的堆顶元素移动到小顶堆
if self.max_heap:
top = -heapq.heappop(self.max_heap)
heapq.heappush(self.min_heap, top)
# 平衡两个堆的大小
if len(self.min_heap) > len(self.max_heap):
top = heapq.heappop(self.min_heap)
heapq.heappush(self.max_heap, -top)
def findMedian(self) -> float:
"""
返回当前所有元素的中位数。
"""
if len(self.max_heap) > len(self.min_heap):
return -self.max_heap[0]
elif len(self.max_heap) < len(self.min_heap):
return self.min_heap[0]
else:
return (-self.max_heap[0] + self.min_heap[0]) / 2.0
测试代码
以下是针对上述方法的测试代码,使用 unittest
框架进行验证。
import unittest
class TestMedianFinder(unittest.TestCase):
def test_example1(self):
mf = MedianFinder()
mf.addNum(1)
mf.addNum(2)
self.assertEqual(mf.findMedian(), 1.5, "示例1第一次findMedian失败")
mf.addNum(3)
self.assertEqual(mf.findMedian(), 2.0, "示例1第二次findMedian失败")
def test_example2(self):
mf = MedianFinder()
mf.addNum(2)
self.assertEqual(mf.findMedian(), 2.0, "示例2第一次findMedian失败")
mf.addNum(3)
self.assertEqual(mf.findMedian(), 2.5, "示例2第二次findMedian失败")
def test_single_element(self):
mf = MedianFinder()
mf.addNum(5)
self.assertEqual(mf.findMedian(), 5.0, "单元素测试失败")
def test_two_elements(self):
mf = MedianFinder()
mf.addNum(1)
mf.addNum(2)
self.assertEqual(mf.findMedian(), 1.5, "两元素测试失败")
def test_multiple_elements(self):
mf = MedianFinder()
elements = [5, 15, 1, 3]
medians = [5.0, 10.0, 5.0, 4.0]
for num, expected in zip(elements, medians):
mf.addNum(num)
self.assertEqual(mf.findMedian(), expected, f"多元素测试失败,添加{num}后中位数应为{expected}")
def test_duplicates(self):
mf = MedianFinder()
mf.addNum(2)
mf.addNum(2)
mf.addNum(2)
self.assertEqual(mf.findMedian(), 2.0, "重复元素测试失败")
def test_negative_numbers(self):
mf = MedianFinder()
mf.addNum(-1)
mf.addNum(-2)
self.assertEqual(mf.findMedian(), -1.5, "负数测试失败")
def test_large_input(self):
mf = MedianFinder()
for i in range(1, 1001):
mf.addNum(i)
# 中位数逐渐增加
if i % 2 == 1:
expected = (i + 1) / 2
else:
expected = (i // 2) + 0.5
self.assertEqual(mf.findMedian(), expected, f"大输入测试失败,添加{i}后中位数应为{expected}")
def test_interleaved_operations(self):
mf = MedianFinder()
operations = ["addNum", "findMedian", "addNum", "findMedian", "addNum", "findMedian", "addNum", "findMedian"]
inputs = [1, None, 2, None, 3, None, 4, None]
expected_outputs = [None, 1.0, None, 1.5, None, 2.0, None, 2.5]
for op, inp, expected in zip(operations, inputs, expected_outputs):
if op == "addNum":
mf.addNum(inp)
self.assertIsNone(expected, "addNum 操作应返回 None")
elif op == "findMedian":
median = mf.findMedian()
self.assertEqual(median, expected, f"interleaved_operations 测试失败,预期中位数为{expected},实际为{median}")
def test_no_elements(self):
mf = MedianFinder()
with self.assertRaises(IndexError, msg="没有元素时findMedian应引发错误"):
mf.findMedian()
if __name__ == "__main__":
unittest.main(argv=[''], exit=False)
说明:
test_no_elements
: 在没有添加任何元素时调用findMedian
会引发IndexError
,因为堆为空。这一行为根据具体实现可能有所不同,视具体需求可以进行调整,例如返回None
或抛出自定义异常。
输出:
...........
----------------------------------------------------------------------
Ran 10 tests in 0.XXXs
OK
复杂度分析
方法一:使用两个堆
时间复杂度
addNum
操作:每次插入和调整堆的操作都是O(log n)
,其中n
是当前堆的大小。findMedian
操作:O(1)
,直接访问堆顶元素。
空间复杂度
O(n)
:需要存储所有添加的元素,分布在两个堆中。
结论
通过采用 使用两个堆(最大堆和最小堆) 的方法,我们能够高效地设计一个支持实时添加数字和查找中位数的数据结构。关键在于:
- 堆的平衡:通过维护两个堆的大小差异不超过1,确保能够快速获取中位数。
- 高效性:
addNum
操作的时间复杂度为O(log n)
,findMedian
操作的时间复杂度为O(1)
,适用于高频次的操作需求。
该方法在实际应用中广泛使用,尤其适用于需要实时统计中位数的数据流场景,如在线统计、实时监控等。