Python Asyncio:优雅地管理与终止长时间运行的任务

python asyncio:优雅地管理与终止长时间运行的任务

本文旨在探讨在Python asyncio异步编程中,如何有效管理和终止可能长时间阻塞的任务,以避免程序无限期等待。我们将重点介绍 asyncio.wait 和 asyncio.wait_for 这两个关键工具,它们提供了设置任务超时机制的能力。通过详细的代码示例和最佳实践,您将学会如何确保异步应用程序在预设时间内响应并关闭,即使某些I/O操作不活跃。

1. 问题背景:asyncio.gather 的局限性

在 asyncio 应用中,asyncio.gather(*coros) 是一个常用的工具,用于并发运行多个协程或任务,并等待它们全部完成。然而,当这些任务中包含长时间阻塞的I/O操作(例如,等待网络消息但消息不频繁)时,gather 会无限期地等待下去,即使您希望程序在一定时间后停止。

考虑以下场景:

import asynciostop = Falseasync def watch_task1(client):    while not stop:        # 假设 client.ws.get_data() 可能长时间没有数据返回        await client.ws.get_data()        print("Task 1 received data")async def watch_task2(client):    while not stop:        # 假设 client.ws.get_news() 也可能长时间没有数据返回        await client.ws.get_news()        print("Task 2 received news")async def stop_after(delay_seconds):    global stop    await asyncio.sleep(delay_seconds)    print(f"Stopping after {delay_seconds} seconds...")    stop = Trueclass MockClient:    async def get_data(self):        await asyncio.sleep(100) # 模拟长时间阻塞    async def get_news(self):        await asyncio.sleep(100) # 模拟长时间阻塞    async def sleep(self, delay):        await asyncio.sleep(delay)async def main_gather():    client = MockClient()    tasks = [        watch_task1(client),        watch_task2(client),        stop_after(5), # 尝试在5秒后停止    ]    try:        # 使用 gather,即使 stop 变为 True,阻塞的 get_data/get_news 仍会阻止 gather 完成        await asyncio.gather(*tasks, return_exceptions=True)    except Exception as e:        print(f"An exception occurred: {e}")    print("Main gather finished.")# 运行 main_gather() 会发现程序在5秒后并不会立即停止,而是会等待 get_data/get_news 结束

在这个例子中,即使 stop_after 在5秒后将 stop 标志设置为 True,watch_task1 和 watch_task2 中的 await client.ws.get_data() 或 await client.ws.get_news() 仍然可能处于阻塞状态,导致 asyncio.gather 无法按时完成。

2. 解决方案:使用 asyncio.wait 进行超时控制

为了解决上述问题,asyncio 提供了更灵活的等待机制:asyncio.wait。它允许您设置一个总体的超时时间,并在超时后返回已完成和未完成的任务集。

立即学习“Python免费学习笔记(深入)”;

2.1 asyncio.wait 概述

asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED) 函数用于并发运行 aws(一个可等待对象集合),并等待它们中的一部分或全部完成。

aws: 一个由协程、任务或 Future 组成的集合。timeout: 可选参数,指定等待的最大秒数。如果超时,函数会立即返回。return_when: 可选参数,定义何时返回。常用值包括:asyncio.ALL_COMPLETED (默认): 等待所有任务完成。asyncio.FIRST_COMPLETED: 只要有一个任务完成就返回。asyncio.FIRST_EXCEPTION: 只要有一个任务抛出异常就返回。

asyncio.wait 返回两个集合:done(已完成的任务)和 pending(未完成的任务)。

2.2 实现超时停止逻辑

import asynciostop = False # 这是一个共享状态,用于控制协程的内部循环async def watch_task1(client):    try:        while not stop:            print("Task 1: Waiting for data...")            await client.ws.get_data() # 可能会阻塞            print("Task 1: Data received.")    except asyncio.CancelledError:        print("Task 1: Cancelled.")    finally:        print("Task 1: Exiting.")async def watch_task2(client):    try:        while not stop:            print("Task 2: Waiting for news...")            await client.ws.get_news() # 可能会阻塞            print("Task 2: News received.")    except asyncio.CancelledError:        print("Task 2: Cancelled.")    finally:        print("Task 2: Exiting.")# MockClient 保持不变class MockClient:    async def get_data(self):        # 模拟长时间阻塞,但为了演示,将其缩短        await asyncio.sleep(5)    async def get_news(self):        # 模拟长时间阻塞        await asyncio.sleep(5)    async def sleep(self, delay):        await asyncio.sleep(delay)async def main_wait_timeout():    client = MockClient()    tasks_to_run = [        asyncio.create_task(watch_task1(client)), # 显式创建任务        asyncio.create_task(watch_task2(client)),    ]    print("Starting tasks with a 3-second timeout...")    # 设置一个全局超时,例如3秒    done, pending = await asyncio.wait(tasks_to_run, timeout=3, return_when=asyncio.ALL_COMPLETED)    print("n--- After asyncio.wait ---")    print(f"Completed tasks ({len(done)}):")    for task in done:        try:            # 获取任务结果,如果任务抛出异常,这里会重新抛出            result = task.result()            print(f"  Task finished successfully: {task.get_name()}, Result: {result}")        except asyncio.CancelledError:            print(f"  Task {task.get_name()} was cancelled (expected for pending tasks).")        except Exception as e:            print(f"  Task {task.get_name()} raised an exception: {e}")    print(f"nPending tasks ({len(pending)}):")    for task in pending:        print(f"  Task {task.get_name()} is still pending. Cancelling...")        task.cancel() # 取消未完成的任务        try:            await task # 等待任务真正结束,以便其处理 CancelledError        except asyncio.CancelledError:            print(f"  Task {task.get_name()} successfully cancelled and cleaned up.")        except Exception as e:            print(f"  Task {task.get_name()} raised an exception during cancellation cleanup: {e}")    print("Main wait_timeout finished.")# 运行主函数if __name__ == "__main__":    asyncio.run(main_wait_timeout())

代码解释:

显式创建任务: 在 main_wait_timeout 中,我们使用 asyncio.create_task() 将协程包装成任务。这是推荐的做法,因为 asyncio.wait 接受任务或 Future 对象。设置超时: await asyncio.wait(tasks_to_run, timeout=3) 会在3秒后返回,无论 watch_task 是否完成其内部的 await client.ws.get_data()。处理 done 集合: 遍历 done 集合中的任务,通过 task.result() 获取其结果或捕获可能抛出的异常。处理 pending 集合: 遍历 pending 集合中的任务。这些任务在超时时仍未完成。为了确保资源被释放,必须对它们调用 task.cancel()。协程中的 CancelledError: 当一个任务被 cancel() 时,它会向任务内部抛出一个 asyncio.CancelledError。任务内部的协程应该捕获这个异常,并执行必要的清理工作(例如关闭文件句柄、网络连接等)。在上述 watch_task 示例中,我们添加了 try…except asyncio.CancelledError…finally 块来演示这一点。等待任务真正结束: 在 task.cancel() 之后,最好 await task 一下,确保任务有时间处理 CancelledError 并完成其清理逻辑。

3. 替代方案:asyncio.wait_for

如果只需要为单个协程设置超时,可以使用 asyncio.wait_for(aw, timeout)。

async def example_wait_for():    client = MockClient()    try:        print("Attempting to get data with a 2-second timeout...")        # watch_task1 内部的循环会因为超时而中断        await asyncio.wait_for(watch_task1(client), timeout=2)        print("watch_task1 finished within timeout.")    except asyncio.TimeoutError:        print("watch_task1 timed out!")    except Exception as e:        print(f"watch_task1 raised an unexpected exception: {e}")    print("Example wait_for finished.")# 如果在 main_wait_timeout 之后运行# asyncio.run(example_wait_for())

asyncio.wait_for 会在超时时抛出 asyncio.TimeoutError。如果被包装的协程内部没有处理 CancelledError,那么在 TimeoutError 抛出后,被包装的协程实际上可能还在后台运行,直到其下一个 await 点。因此,在使用 wait_for 时,被包装的协程也应该能够响应取消。

4. 注意事项与最佳实践

任务取消的响应: 这是 asyncio 中非常重要的一点。当一个任务被 cancel() 时,它不会立即停止,而是在其下一个 await 点抛出 asyncio.CancelledError。任务的编写者必须在协程内部捕获 CancelledError,并执行必要的清理工作。如果协程不处理 CancelledError,则在取消后可能会继续运行,直到自然结束或遇到下一个 await 点。资源清理: 在 finally 块中进行资源清理是良好的实践,无论任务是正常完成还是被取消。选择 wait 还是 wait_for:asyncio.wait_for 适用于为单个协程设置超时,并在超时时抛出 TimeoutError。asyncio.wait 适用于管理一组协程/任务,并提供更细粒度的控制,如 return_when 参数,以及返回已完成和未完成任务的集合,方便后续处理(如取消未完成的任务)。return_exceptions=True 与 task.result():asyncio.gather 的 return_exceptions=True 会将协程中抛出的异常作为结果返回,而不是直接抛出。对于 asyncio.wait 返回的 done 任务,调用 task.result() 会重新抛出任务内部发生的任何异常,或者返回其结果。这是检查任务是否成功完成的推荐方式。

5. 总结

在 asyncio 应用程序中,有效管理和终止长时间运行或可能阻塞的任务至关重要。asyncio.wait 提供了一个强大的机制,允许您在指定超时后获取已完成和未完成的任务,并通过 task.cancel() 优雅地终止未完成的任务。结合任务内部对 asyncio.CancelledError 的处理,您可以构建出更加健壮、响应迅速的异步应用程序。记住,正确的取消处理和资源清理是编写高质量 asyncio 代码的关键。

以上就是Python Asyncio:优雅地管理与终止长时间运行的任务的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1366350.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年12月14日 05:07:37
下一篇 2025年12月14日 05:07:47

相关推荐

  • SLURM 并行处理:在多个文件上运行相同的脚本

    本文旨在指导用户如何使用 SLURM(Simple Linux Utility for Resource Management)在多个输入文件上并行运行同一个 Python 脚本。文章详细解释了 SLURM 脚本的编写,包括资源申请、任务分配以及如何利用 srun 命令实现并行处理。同时,还介绍了 …

    2025年12月14日
    000
  • Python 多进程 Pool 冻结问题排查与解决:一份实用指南

    本文旨在解决 Python 多进程 multiprocessing.Pool 在使用 pool.map 或 pool.map_async 等方法时出现程序冻结或 TypeError: ‘MapResult’ object is not iterable 错误的问题。通过分析常…

    2025年12月14日
    000
  • 整数与有序列表比较:高效查找前一个匹配或相等的值

    本文详细介绍了如何在有序整数列表中查找一个给定整数的“前一个匹配值”或“相等值”。通过分析常见编程陷阱,并提供一个鲁棒的Python函数实现,该函数能有效处理精确匹配、区间查找以及列表边界条件(如小于最小值或大于最大值)等多种场景,确保输出结果的准确性和稳定性。 问题背景与挑战 在实际编程中,我们经…

    2025年12月14日
    000
  • 从字符串到DataFrame:Pandas数据转换指南

    本文旨在指导读者如何将字符串形式的数据转换为Pandas DataFrame。我们将探讨使用eval函数(需谨慎使用)以及更安全、更推荐的方法来实现数据转换,并提供详细的代码示例和注意事项,帮助读者高效地处理字符串数据并将其转换为结构化的DataFrame对象。 使用 eval 函数转换字符串到 D…

    2025年12月14日
    000
  • CodeHS 中检测键盘输入:超越方向键的指南

    本文档旨在解决 CodeHS 环境下检测除方向键之外的其他键盘输入的问题。由于 CodeHS 的特殊库环境,传统的键盘输入检测方法可能不适用。本文将介绍如何利用 keyboard 库在 CodeHS 中实现对任意按键的检测,并提供示例代码和注意事项,帮助开发者克服这一挑战。 在 CodeHS 中,直…

    2025年12月14日
    000
  • VS Code Python项目中的环境变量管理:深入理解与实践

    本教程详细探讨了在VS Code中管理Python项目环境变量的多种方法。我们将分析.env文件在不同运行模式下的加载行为,并提供使用python-dotenv库进行显式加载的实用指南,同时介绍调试配置(launch.json)在环境变量设置中的作用,旨在帮助开发者构建更健壮、可移植的Python应…

    2025年12月14日
    000
  • 使用装饰器实现函数结果缓存:避免 setdefault 的陷阱

    在 Python 中,我们经常需要对一些计算密集型的函数进行优化,避免重复计算相同参数的结果。一种常见的做法是使用缓存,将函数的结果保存下来,下次使用相同的参数调用时直接返回缓存的结果。装饰器是一种优雅的实现缓存的方式,但如果不小心,可能会掉入一些陷阱。 setdefault 的误用 一个常见的误用…

    2025年12月14日
    000
  • 在VS Code中管理Python环境变量:理解.env文件加载机制与最佳实践

    本文详细探讨了在VS Code中为Python项目设置环境变量的方法,重点关注.env文件的加载行为。通过分析不同的代码执行模式(如终端运行、交互式窗口、调试模式),文章揭示了VS Code处理环境变量的差异,并提供了相应的解决方案,包括利用内置功能和python-dotenv库,确保开发环境的稳定…

    2025年12月14日
    000
  • Python 子进程异常的捕获与传递

    子进程异常无法被父进程直接捕获,因进程间内存和调用栈隔离。需通过IPC机制如Queue或ProcessPoolExecutor传递异常信息。使用Queue时,子进程捕获异常并序列化发送,父进程从队列读取并处理;而ProcessPoolExecutor在调用future.result()时自动重新抛出…

    2025年12月14日
    000
  • Python中UTF-8到UTF-7编码的特殊处理与实践

    本文深入探讨了Python中UTF-8字符串转换为UTF-7编码时,尤其对于“可选直接字符”如的处理机制。揭示了Python内置encode(“utf-7”)默认采用直接ASCII编码而非Unicode移位编码的原因,并提供了一种通过bytes.replace()方法手动替换…

    2025年12月14日
    000
  • Python pandas apply vs vectorized 操作

    向量化操作性能优于apply,因底层用C实现,如df[‘A’] + df[‘B’]比apply快;apply适合复杂逻辑但慢,建议优先使用向量化方法。 在使用 Python 的 pandas 处理数据时,apply 和 向量化(vectorized)操…

    2025年12月14日
    000
  • Python 异常处理与测试驱动开发(TDD)

    将异常处理融入TDD,能提升代码健壮性与可维护性。首先明确功能的失败场景及应抛出的异常类型,再编写测试用例验证异常行为,如使用pytest.raises断言特定异常;接着编写最小实现使测试通过,并补全成功路径测试;最后重构优化。异常处理成为功能契约的一部分,通过自定义异常、精准捕获、资源管理等实践,…

    2025年12月14日
    000
  • 在 WSL Ubuntu 终端中连续执行多个命令

    本文旨在指导开发者如何在 Windows Subsystem for Linux (WSL) Ubuntu 终端中,通过 Python 脚本连续执行多个命令。文章将介绍如何利用 os 和 subprocess 模块,实现目录切换和 Python 脚本的执行,并提供详细的代码示例和步骤说明,帮助读者理…

    2025年12月14日
    000
  • Selenium Edge WebDriver 自动化:有效禁用弹窗通知的策略

    本文旨在解决使用Selenium Edge WebDriver时遇到的弹窗通知干扰自动化脚本的问题。我们将探讨如何通过配置Edge浏览器选项来禁用“功能和工作流推荐”等通知,并提供处理Cookie同意弹窗的策略,确保自动化流程顺畅无阻。 在使用Selenium进行Web自动化测试时,Microsof…

    2025年12月14日
    000
  • 如何优雅地在 VS Code 中为 Python 项目设置环境变量

    本文旨在深入探讨在 VS Code 中为 Python 项目设置环境变量的多种方法,重点关注 .env 文件的使用及其在不同运行/调试模式下的行为差异。我们将详细分析 VS Code 提供的内置机制,并介绍如何通过外部库 python-dotenv 实现更灵活、一致的环境变量管理,确保项目在各种执行…

    2025年12月14日
    000
  • 使用装饰器和字典缓存函数结果:避免 setdefault 的陷阱

    本文旨在帮助读者理解如何使用 Python 装饰器实现函数结果缓存,提高代码执行效率。我们将深入探讨使用 dict.setdefault 方法的潜在问题,并提供一种更健壮的缓存实现方案,包括处理可变参数和关键字参数,以及如何避免全局缓存带来的问题。 装饰器与函数缓存 装饰器是 Python 中一种强…

    2025年12月14日
    000
  • Taipy file_selector 组件行为详解与最佳实践

    本文深入探讨了Taipy file_selector 组件的工作原理,解释了文件上传后路径指向临时目录及文件名递增的机制,并强调了其在服务器部署中的必要性。同时,文章提供了正确获取上传文件路径的方法,并指出当前版本无法禁用自动上传成功通知的限制。 理解 Taipy file_selector 的文件…

    2025年12月14日
    000
  • Stanza Lemmatizer:仅提取 Lemma 的方法

    本文介绍了如何使用 Stanza 库进行西班牙语文本的词形还原,并提取所需的 Lemma 信息,避免处理冗余的字典结构。通过解析 Stanza pipeline 的输出结构,展示了如何以简洁高效的方式获取 Lemma 列表,并提供示例代码进行演示。本文适用于需要使用 Stanza 进行词形还原,但仅…

    2025年12月14日
    000
  • 在PySpark中利用数组列与列表交集进行DataFrame过滤的正确姿势

    本文详细介绍了如何在PySpark中高效地过滤DataFrame,当需要根据数组列与一个给定Python列表的交集来筛选数据时。核心解决方案是利用pyspark.sql.functions.arrays_overlap函数,并结合lit函数将Python列表中的元素转换为Spark字面量表达式,从而…

    2025年12月14日
    000
  • 解决Scapy在Windows上“无法将硬件过滤器设置为混杂模式”错误的教程

    本文旨在解决Scapy在Windows 11环境下发送数据包时遇到的“无法将硬件过滤器设置为混杂模式”错误。该问题通常源于过时的Npcap驱动版本或硬件/驱动对混杂模式支持不足。教程提供了两种主要解决方案:升级Npcap驱动至1.74或更高版本,或在Scapy配置中禁用混杂模式,确保用户能够顺利进行…

    2025年12月14日
    000

发表回复

登录后才能评论
关注微信