Python滑动窗口中位数优化:双堆法解决TLE问题

Python滑动窗口中位数优化:双堆法解决TLE问题

本文深入探讨了使用双堆法解决滑动窗口中位数问题时遇到的时间超限(TLE)错误。通过分析传统双堆实现中移除操作的低效性,我们提出了一种基于“惰性删除”策略的优化方案。该方案利用元素索引和窗口边界来隐式标记过期元素,从而将移除操作的时间复杂度从O(K)降低到O(logK),显著提升了算法在大规模数据集上的性能。

问题描述:滑动窗口中位数

给定一个整数数组 nums 和一个整数 k,存在一个大小为 k 的滑动窗口,它从数组的最左侧移动到最右侧。每次滑动窗口向右移动一个位置。任务是返回每个窗口中的中位数组成的数组。

示例:nums = [1,3,-1,-3,5,3,6,7], k = 3窗口移动及中位数:[1,3,-1] -> 中位数 1[3,-1,-3] -> 中位数 -1[-1,-3,5] -> 中位数 -1[-3,5,3] -> 中位数 3[5,3,6] -> 中位数 5[3,6,7] -> 中位数 6返回 [1.0, -1.0, -1.0, 3.0, 5.0, 6.0]

传统双堆法及性能瓶颈分析

双堆法是解决中位数问题的常用策略。它维护两个堆:一个最大堆(small)存储较小的一半元素,一个最小堆(large)存储较大的一半元素。通过平衡这两个堆的大小,可以快速获取中位数。

传统实现思路:

添加元素 (addNum):将新元素添加到合适的堆,并进行堆平衡,确保 len(small) 和 len(large) 的差值不超过1。查找中位数 (findMedian):如果两个堆大小相等,中位数是两个堆顶元素的平均值;否则,中位数是元素较多的那个堆的堆顶元素。移除元素 (popNum):当窗口滑动时,需要移除窗口最左侧的元素。

问题中提供的初始代码示例展示了这种传统实现:

import heapqclass Solution(object):    def __init__(self):        self.small = [] # 最大堆 (存储负值模拟)        self.large = [] # 最小堆    def balance(self):        # ... 堆平衡逻辑 ...    def addNum(self, num):        # ... 添加元素逻辑 ...    def findMedian(self):        # ... 查找中位数逻辑 ...    def popNum(self, num):        # 性能瓶颈所在        if num > (self.small[0] * -1):            self.large.remove(num) # O(K) 查找并移除            heapq.heapify(self.large) # O(K) 重新堆化        else:            self.small.remove(num * -1) # O(K) 查找并移除            heapq.heapify(self.small) # O(K) 重新堆化        self.balance()    def medianSlidingWindow(self, nums, k):        # ... 主逻辑 ...

性能瓶颈:在上述代码中,popNum 方法是导致时间超限(TLE)的主要原因。Python 的 list.remove(value) 操作需要遍历列表来查找指定值,其时间复杂度为 O(K)(其中 K 是堆的大小)。移除元素后,为了恢复堆的性质,需要调用 heapq.heapify(),这同样是 O(K) 的操作。

对于一个包含 N 个元素、窗口大小为 k 的数组,滑动窗口总共会进行 N-k+1 次操作。每次操作包含添加、移除和查找中位数。如果移除操作是 O(K),则总时间复杂度将达到 O(NK)。当 N=100000, K=50000 时,`NK的量级为5 * 10^9`,这远远超出了可接受的时间限制。

立即学习“Python免费学习笔记(深入)”;

优化策略:惰性删除与索引标记

为了将移除操作的时间复杂度降低到 O(logK),我们需要避免直接在堆中搜索和物理移除元素。有两种常见的优化方法:

带索引映射的堆: 维护一个哈希表(字典),将每个值映射到其在堆列表中的索引。通过索引可以直接定位元素,然后用堆末尾元素替换,再进行 O(logK) 的上浮或下沉操作来恢复堆性质。这种方法需要更复杂的自定义堆实现。惰性删除(Lazy Deletion): 不立即物理移除元素,而是对其进行标记。当访问堆顶元素时,如果发现它是被标记为“已删除”的元素,则将其弹出并忽略,直到找到一个未被删除的有效元素。

针对滑动窗口问题,惰性删除策略尤为适用,因为它天然地可以通过窗口的移动来“标记”元素是否过期。为了处理数组中可能存在的重复值,我们需要将每个元素与其在原始数组中的索引绑定,形成 (值, 索引) 对。

惰性删除的实现细节:

存储 (值, 索引) 对: 堆中存储的不再是单纯的数值,而是 (value, index) 元组。这样即使值相同,也能通过索引唯一标识一个元素。lowindex 标记: 在每个堆中维护一个 lowindex 变量。当窗口向右滑动时,最左侧的元素 (old_val, old_idx) 离开窗口。此时,将 lowindex 更新为 old_idx + 1。任何元素的索引 item[1] 小于 lowindex,都意味着它已经不在当前窗口内,应被视为“已删除”。peek 和 pop 操作: 在 peek(查看堆顶)或 pop(弹出堆顶)时,不断检查堆顶元素 item 的 item[1] 是否小于 lowindex。如果是,则说明该元素已过期,将其从堆中弹出并继续检查下一个堆顶元素,直到找到一个有效的、未被删除的元素。

优化后的双堆实现

以下是基于惰性删除策略的优化实现。为了更好地组织代码和复用逻辑,我们创建了自定义的堆类。

import heapq# 辅助函数:用于MaxHeap将值取反def negate(item):    """将 (value, index) 元组中的值取反,用于模拟最大堆。"""    return -item[0], item[1]class WindowHeap(object):    """    基础窗口堆类,支持惰性删除。    conv: 一个转换函数,用于处理元素(例如,MaxHeap需要将值取反)。    """    def __init__(self, conv=lambda x: x):        self.heap = []        self.conv = conv  # 元素转换函数 (例如,MaxHeap的取反操作)        self.lowindex = 0 # 窗口下限索引,用于识别已删除元素    def peek(self):        """        查看堆顶元素,跳过所有已删除的元素。        返回 (value, index) 元组或 None(如果堆为空)。        """        while self.heap:            item = self.conv(self.heap[0]) # 获取原始值和索引            if item[1] >= self.lowindex: # 如果索引在当前窗口内,则有效                return item            # 否则,该元素已过期,从堆中弹出并忽略            heapq.heappop(self.heap)        return None # 堆中没有有效元素    def push(self, item):        """向堆中添加一个 (value, index) 元组。"""        heapq.heappush(self.heap, self.conv(item))    def pop(self):        """        弹出堆顶元素,跳过所有已删除的元素。        返回弹出的 (value, index) 元组。        """        item = self.peek() # 确保获取的是有效元素        if item:            heapq.heappop(self.heap)        return itemclass MinWindowHeap(WindowHeap):    """最小窗口堆,直接存储 (value, index) 元组。"""    def __init__(self):        super(MinWindowHeap, self).__init__(lambda x: x) # 无需转换class MaxWindowHeap(WindowHeap):    """最大窗口堆,通过 negate 函数将值取反存储。"""    def __init__(self):        super(MaxWindowHeap, self).__init__(negate) # 使用 negate 函数class Solution(object):    def rebalance(self, add_val):        """        平衡两个堆的大小,并更新平衡因子。        add_val: 1 表示 large 堆增加,-1 表示 small 堆增加。        """        self.balance += add_val        if abs(self.balance)  1: # 意味着 small 堆比 large 堆多一个元素            self.small.push(self.large.pop()) # 注意:这里是 large.pop() 然后 push 到 small            # 实际上是:如果 small 比 large 多 2 个,需要从 small 移一个到 large            # 或者 large 比 small 多 2 个,需要从 large 移一个到 small            # 这里代码的 self.balance 含义与常规理解可能不同            # 假设 self.balance > 0 意味着 large 堆元素多, self.balance  1: # 意味 large 堆比 small 堆多 2 个或以上            #     self.small.push(self.large.pop())            # elif self.balance  len(self.large.heap) + 1:                self.large.push(self.small.pop())            # 如果 large 堆比 small 堆多两个或以上元素            elif self.large.peek() and self.small.peek() and len(self.large.heap) > len(self.small.heap) + 1:                self.small.push(self.large.pop())            # 重新计算平衡因子            self.balance = len(self.large.heap) - len(self.small.heap) # 假设 balance 是 large - small            # 简化平衡逻辑(根据原答案,balance 变量的更新是关键)            # 原答案的 rebalance 逻辑是基于 self.balance 的变化来判断的            # self.balance 初始为0,每次 insert/remove 改变其值            # 如果 self.balance > 1,表示 large 堆比 small 堆“多”了一个元素,需要从 large 移到 small            # 如果 self.balance  1: # large 堆有效元素比 small 堆多 2 个或以上                self.small.push(self.large.pop())                self.balance -= 2 # large 减少1,small 增加1,差值减少2            elif self.balance < -1: # small 堆有效元素比 large 堆多 2 个或以上                self.large.push(self.small.pop())                self.balance += 2 # small 减少1,large 增加1,差值增加2    def insert(self, item):        """向双堆结构中插入一个 (value, index) 元组。"""        # 确定插入到哪个堆        # 如果 large 堆为空,或当前 item 小于 large 堆顶,则插入 small 堆        if not self.large.peek() or item[0] = self.large.peek()[0]: # 元素在 large 堆中            self.balance -= 1 # large 堆有效元素减少,平衡因子减1        else: # 元素在 small 堆中            self.balance += 1 # small 堆有效元素减少,平衡因子加1        # 更新 lowindex,标记该元素及其之前的所有元素为已删除        # 注意:这里更新的是两个堆的 lowindex,确保它们只关注当前窗口内的元素        self.large.lowindex = self.small.lowindex = item[1] + 1        self.rebalance(0) # 移除后进行平衡    def getMedian(self):        """获取当前窗口的中位数。"""        # 如果平衡因子为 0,表示两个堆有效元素数量相等        if self.balance == 0:            # 中位数是两个堆顶的平均值            return (self.large.peek()[0] + self.small.peek()[0]) * 0.5        # 如果 large 堆有效元素多,中位数是 large 堆顶        elif self.balance > 0:            return float(self.large.peek()[0])        # 如果 small 堆有效元素多,中位数是 small 堆顶        else:            return float(self.small.peek()[0])    def medianSlidingWindow(self, nums, k):        """        计算滑动窗口中位数的主函数。        :type nums: List[int]        :type k: int        :rtype: List[float]        """        self.small = MaxWindowHeap() # 存储较小一半的元素        self.large = MinWindowHeap() # 存储较大一半的元素        self.balance = 0 # 平衡因子:large 堆有效元素数量 - small 堆有效元素数量        # 将原始数组转换为 (value, index) 对列表        items = [(val, i) for i, val in enumerate(nums)]        # 初始化第一个窗口        for item in items[:k]:            self.insert(item)        result = [self.getMedian()]        # 滑动窗口并计算后续中位数        # zip(items, items[k:]) 巧妙地生成 (旧元素, 新元素) 对        for olditem, newitem in zip(items, items[k:]):            self.remove(olditem) # 移除旧元素(惰性删除)            self.insert(newitem) # 插入新元素            result.append(self.getMedian())        return result

代码解释:

negate 函数: 辅助函数,用于将 (value, index) 元组中的 value 取反,以模拟最大堆的行为(Python 的 heapq 默认是最小堆)。WindowHeap 类:__init__:初始化堆列表、转换函数 conv 和 lowindex。peek:在返回堆顶元素之前,会循环弹出所有 item[1] push:将转换后的元素推入堆。pop:先调用 peek 确保找到有效元素,然后弹出并返回。MinWindowHeap 和 MaxWindowHeap: 分别继承 WindowHeap,并传入不同的 conv 函数。Solution 类:rebalance(add_val):负责调整两个堆的大小,确保它们的有效元素数量差值不超过1。self.balance 记录了 large 堆和 small 堆的有效元素数量差。insert(item):根据 item 的值将其插入 small 堆或 large 堆,并更新 self.balance,然后调用 rebalance。remove(item):这是核心优化所在。它不直接从堆中移除 item。而是根据 item 属于哪个堆来更新 self.balance,然后将 self.large.lowindex 和 self.small.lowindex 都更新为 item[1] + 1。这意味着所有索引小于 item[1] + 1 的元素都已过期。最后调用 rebalance。getMedian():根据 self.balance 和两个堆的 peek() 结果计算中位数。medianSlidingWindow(nums, k):主逻辑。首先将 nums 转换为 (value, index) 对列表。然后初始化第一个窗口,并循环滑动窗口,每次调用 remove 和 insert 来更新双堆,并获取中位数。

性能分析

添加元素 (insert): O(logK),因为 heapq.heappush 是 O(logK),并且 rebalance 操作也是 O(logK)。移除元素 (remove): O(logK),因为 lowindex 的更新是 O(1),rebalance 操作是 O(logK)。peek 和 pop 操作虽然可能循环多次弹出过期元素,但在整个滑动窗口过程中,每个元素最多被弹出一次,因此分摊到每次操作上仍然是 O(logK)。获取中位数 (getMedian): O(1),因为 peek 操作是 O(logK)(但在分摊意义上),获取堆顶元素是 O(1)。

因此,优化后的算法总时间复杂度为 O(N log K)。对于 N=100000, K=50000 的测试用例,100000 * log(50000) 大约是 100000 * 16,即 1.6 * 10^6 级别,这在可接受的时间范围内。

总结与注意事项

惰性删除是关键: 通过将物理移除替换为逻辑标记,并延迟处理过期元素,显著提升了滑动窗口中位数问题的性能。lowindex 的作用: 它作为窗口的左边界,巧妙地利用元素索引来判断其是否过期。balance 变量: 准确维护两个堆的有效元素数量差是正确计算中位数和进行堆平衡的关键。处理重复值: 使用 (value, index) 元组是处理输入数组中重复值的必要手段,确保每个元素都能被唯一标识和追踪。Python heapq 模块: 默认实现最小堆。模拟最大堆需要存储元素的负值。Python 2/3 super() 兼容性: 原始答案提到了 super 调用在 Python 2 中需要参数,Python 3 中可以省略。在现代 Python 开发中,通常使用 Python 3 语法,即 super().__init__()。

通过采用这种惰性删除和索引标记的优化策略,我们可以高效地解决滑动窗口中位数问题,避免在大规模数据集上出现时间超限错误。

以上就是Python滑动窗口中位数优化:双堆法解决TLE问题的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1375026.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年12月14日 14:39:25
下一篇 2025年12月14日 14:39:33

相关推荐

发表回复

登录后才能评论
关注微信