
本文旨在探讨在Python asyncio异步编程中,如何有效管理和终止可能长时间阻塞的任务,以避免程序无限期等待。我们将重点介绍 asyncio.wait 和 asyncio.wait_for 这两个关键工具,它们提供了设置任务超时机制的能力。通过详细的代码示例和最佳实践,您将学会如何确保异步应用程序在预设时间内响应并关闭,即使某些I/O操作不活跃。
1. 问题背景:asyncio.gather 的局限性
在 asyncio 应用中,asyncio.gather(*coros) 是一个常用的工具,用于并发运行多个协程或任务,并等待它们全部完成。然而,当这些任务中包含长时间阻塞的I/O操作(例如,等待网络消息但消息不频繁)时,gather 会无限期地等待下去,即使您希望程序在一定时间后停止。
考虑以下场景:
import asynciostop = Falseasync def watch_task1(client): while not stop: # 假设 client.ws.get_data() 可能长时间没有数据返回 await client.ws.get_data() print("Task 1 received data")async def watch_task2(client): while not stop: # 假设 client.ws.get_news() 也可能长时间没有数据返回 await client.ws.get_news() print("Task 2 received news")async def stop_after(delay_seconds): global stop await asyncio.sleep(delay_seconds) print(f"Stopping after {delay_seconds} seconds...") stop = Trueclass MockClient: async def get_data(self): await asyncio.sleep(100) # 模拟长时间阻塞 async def get_news(self): await asyncio.sleep(100) # 模拟长时间阻塞 async def sleep(self, delay): await asyncio.sleep(delay)async def main_gather(): client = MockClient() tasks = [ watch_task1(client), watch_task2(client), stop_after(5), # 尝试在5秒后停止 ] try: # 使用 gather,即使 stop 变为 True,阻塞的 get_data/get_news 仍会阻止 gather 完成 await asyncio.gather(*tasks, return_exceptions=True) except Exception as e: print(f"An exception occurred: {e}") print("Main gather finished.")# 运行 main_gather() 会发现程序在5秒后并不会立即停止,而是会等待 get_data/get_news 结束
在这个例子中,即使 stop_after 在5秒后将 stop 标志设置为 True,watch_task1 和 watch_task2 中的 await client.ws.get_data() 或 await client.ws.get_news() 仍然可能处于阻塞状态,导致 asyncio.gather 无法按时完成。
2. 解决方案:使用 asyncio.wait 进行超时控制
为了解决上述问题,asyncio 提供了更灵活的等待机制:asyncio.wait。它允许您设置一个总体的超时时间,并在超时后返回已完成和未完成的任务集。
立即学习“Python免费学习笔记(深入)”;
2.1 asyncio.wait 概述
asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED) 函数用于并发运行 aws(一个可等待对象集合),并等待它们中的一部分或全部完成。
aws: 一个由协程、任务或 Future 组成的集合。timeout: 可选参数,指定等待的最大秒数。如果超时,函数会立即返回。return_when: 可选参数,定义何时返回。常用值包括:asyncio.ALL_COMPLETED (默认): 等待所有任务完成。asyncio.FIRST_COMPLETED: 只要有一个任务完成就返回。asyncio.FIRST_EXCEPTION: 只要有一个任务抛出异常就返回。
asyncio.wait 返回两个集合:done(已完成的任务)和 pending(未完成的任务)。
2.2 实现超时停止逻辑
import asynciostop = False # 这是一个共享状态,用于控制协程的内部循环async def watch_task1(client): try: while not stop: print("Task 1: Waiting for data...") await client.ws.get_data() # 可能会阻塞 print("Task 1: Data received.") except asyncio.CancelledError: print("Task 1: Cancelled.") finally: print("Task 1: Exiting.")async def watch_task2(client): try: while not stop: print("Task 2: Waiting for news...") await client.ws.get_news() # 可能会阻塞 print("Task 2: News received.") except asyncio.CancelledError: print("Task 2: Cancelled.") finally: print("Task 2: Exiting.")# MockClient 保持不变class MockClient: async def get_data(self): # 模拟长时间阻塞,但为了演示,将其缩短 await asyncio.sleep(5) async def get_news(self): # 模拟长时间阻塞 await asyncio.sleep(5) async def sleep(self, delay): await asyncio.sleep(delay)async def main_wait_timeout(): client = MockClient() tasks_to_run = [ asyncio.create_task(watch_task1(client)), # 显式创建任务 asyncio.create_task(watch_task2(client)), ] print("Starting tasks with a 3-second timeout...") # 设置一个全局超时,例如3秒 done, pending = await asyncio.wait(tasks_to_run, timeout=3, return_when=asyncio.ALL_COMPLETED) print("n--- After asyncio.wait ---") print(f"Completed tasks ({len(done)}):") for task in done: try: # 获取任务结果,如果任务抛出异常,这里会重新抛出 result = task.result() print(f" Task finished successfully: {task.get_name()}, Result: {result}") except asyncio.CancelledError: print(f" Task {task.get_name()} was cancelled (expected for pending tasks).") except Exception as e: print(f" Task {task.get_name()} raised an exception: {e}") print(f"nPending tasks ({len(pending)}):") for task in pending: print(f" Task {task.get_name()} is still pending. Cancelling...") task.cancel() # 取消未完成的任务 try: await task # 等待任务真正结束,以便其处理 CancelledError except asyncio.CancelledError: print(f" Task {task.get_name()} successfully cancelled and cleaned up.") except Exception as e: print(f" Task {task.get_name()} raised an exception during cancellation cleanup: {e}") print("Main wait_timeout finished.")# 运行主函数if __name__ == "__main__": asyncio.run(main_wait_timeout())
代码解释:
显式创建任务: 在 main_wait_timeout 中,我们使用 asyncio.create_task() 将协程包装成任务。这是推荐的做法,因为 asyncio.wait 接受任务或 Future 对象。设置超时: await asyncio.wait(tasks_to_run, timeout=3) 会在3秒后返回,无论 watch_task 是否完成其内部的 await client.ws.get_data()。处理 done 集合: 遍历 done 集合中的任务,通过 task.result() 获取其结果或捕获可能抛出的异常。处理 pending 集合: 遍历 pending 集合中的任务。这些任务在超时时仍未完成。为了确保资源被释放,必须对它们调用 task.cancel()。协程中的 CancelledError: 当一个任务被 cancel() 时,它会向任务内部抛出一个 asyncio.CancelledError。任务内部的协程应该捕获这个异常,并执行必要的清理工作(例如关闭文件句柄、网络连接等)。在上述 watch_task 示例中,我们添加了 try…except asyncio.CancelledError…finally 块来演示这一点。等待任务真正结束: 在 task.cancel() 之后,最好 await task 一下,确保任务有时间处理 CancelledError 并完成其清理逻辑。
3. 替代方案:asyncio.wait_for
如果只需要为单个协程设置超时,可以使用 asyncio.wait_for(aw, timeout)。
async def example_wait_for(): client = MockClient() try: print("Attempting to get data with a 2-second timeout...") # watch_task1 内部的循环会因为超时而中断 await asyncio.wait_for(watch_task1(client), timeout=2) print("watch_task1 finished within timeout.") except asyncio.TimeoutError: print("watch_task1 timed out!") except Exception as e: print(f"watch_task1 raised an unexpected exception: {e}") print("Example wait_for finished.")# 如果在 main_wait_timeout 之后运行# asyncio.run(example_wait_for())
asyncio.wait_for 会在超时时抛出 asyncio.TimeoutError。如果被包装的协程内部没有处理 CancelledError,那么在 TimeoutError 抛出后,被包装的协程实际上可能还在后台运行,直到其下一个 await 点。因此,在使用 wait_for 时,被包装的协程也应该能够响应取消。
4. 注意事项与最佳实践
任务取消的响应: 这是 asyncio 中非常重要的一点。当一个任务被 cancel() 时,它不会立即停止,而是在其下一个 await 点抛出 asyncio.CancelledError。任务的编写者必须在协程内部捕获 CancelledError,并执行必要的清理工作。如果协程不处理 CancelledError,则在取消后可能会继续运行,直到自然结束或遇到下一个 await 点。资源清理: 在 finally 块中进行资源清理是良好的实践,无论任务是正常完成还是被取消。选择 wait 还是 wait_for:asyncio.wait_for 适用于为单个协程设置超时,并在超时时抛出 TimeoutError。asyncio.wait 适用于管理一组协程/任务,并提供更细粒度的控制,如 return_when 参数,以及返回已完成和未完成任务的集合,方便后续处理(如取消未完成的任务)。return_exceptions=True 与 task.result():asyncio.gather 的 return_exceptions=True 会将协程中抛出的异常作为结果返回,而不是直接抛出。对于 asyncio.wait 返回的 done 任务,调用 task.result() 会重新抛出任务内部发生的任何异常,或者返回其结果。这是检查任务是否成功完成的推荐方式。
5. 总结
在 asyncio 应用程序中,有效管理和终止长时间运行或可能阻塞的任务至关重要。asyncio.wait 提供了一个强大的机制,允许您在指定超时后获取已完成和未完成的任务,并通过 task.cancel() 优雅地终止未完成的任务。结合任务内部对 asyncio.CancelledError 的处理,您可以构建出更加健壮、响应迅速的异步应用程序。记住,正确的取消处理和资源清理是编写高质量 asyncio 代码的关键。
以上就是Python Asyncio:优雅地管理与终止长时间运行的任务的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1366350.html
微信扫一扫
支付宝扫一扫