Python asyncio:实现从生成器非阻塞地执行异步任务

Python asyncio:实现从生成器非阻塞地执行异步任务

本文探讨了如何在Python中使用asyncio从生成器高效、非阻塞地调度和执行异步任务。核心在于理解asyncio事件循环的运行机制,通过周期性地将控制权交还给事件循环(例如使用await asyncio.sleep(0)),确保已调度的任务能够获得执行机会。文章还介绍了Python 3.11+中asyncio.TaskGroup的使用,以实现更结构化的并发任务管理。

挑战:从生成器调度异步任务的非阻塞执行

在异步编程中,我们经常需要从一个数据源(例如一个生成器)持续获取数据,并为每个数据项启动一个独立的异步任务进行处理。一个常见的误解是,简单地在同步循环中使用asyncio.create_task()就能实现并发执行。然而,create_task()仅仅是将一个协程函数包装成一个任务并将其调度到事件循环中,它本身并不会立即执行该任务,也不会自动将控制权交还给事件循环。

考虑以下场景,我们有一个生成器持续产生数据,并希望为每个数据项启动一个模拟耗时操作的异步任务:

import asyncioimport randomasync def wrapper(word: str):    """模拟一个耗时的异步任务"""    print(f"开始处理: {word}")    await asyncio.sleep(random.uniform(0.5, 2)) # 模拟异步I/O或计算    print(f"完成处理: {word}")def generator():    """一个无限生成器,持续产生随机字母"""    abc = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'    while True:        yield random.choice(abc)        # 实际应用中可能从外部源(如队列、网络)获取数据

如果manager函数尝试通过同步循环来调度任务,可能会遇到任务无法并发执行的问题:

async def manager_initial_attempt():    loop = asyncio.get_event_loop()    print("开始调度任务 (初始尝试)")    for _ in range(10): # 仅为示例,实际生成器是无限的        letter = next(generator())        loop.create_task(wrapper(letter)) # 任务被调度,但事件循环未获得执行机会    print("所有任务已调度 (初始尝试)")    # 此时,如果manager_initial_attempt不await任何东西,    # 整个程序可能会在任务完成前退出,或者任务不会并发执行。    # 即使程序不退出,由于没有await,事件循环也无法切换上下文。

在上述manager_initial_attempt中,for循环会快速迭代,连续调用create_task。由于create_task是非阻塞的,manager函数本身并未暂停,因此事件循环没有机会运行这些新创建的任务。这导致任务看起来是同步执行的,或者根本不执行,因为它从未将控制权交还给事件循环。

核心解决方案:显式让出控制权

要解决这个问题,关键在于在每次调度任务后,显式地将控制权交还给asyncio事件循环,哪怕只是短暂的一瞬间。await关键字是实现这一点的核心机制。我们可以使用await asyncio.sleep(0)来达到目的。asyncio.sleep(0)会立即暂停当前协程的执行,并将控制权交还给事件循环。事件循环随后会检查是否有其他已就绪的任务可以运行(包括我们刚刚使用create_task调度的任务),然后再重新安排当前协程的执行。

立即学习“Python免费学习笔记(深入)”;

通过在每次create_task后添加await asyncio.sleep(0),manager函数就可以确保事件循环有机会执行其他并发任务:

async def manager_with_yield():    loop = asyncio.get_event_loop()    print("开始调度任务 (显式让出控制权)")    tasks = []    for _ in range(10): # 仅为示例,实际生成器是无限的        letter = next(generator())        task = loop.create_task(wrapper(letter))        tasks.append(task)        await asyncio.sleep(0) # 关键:让出控制权给事件循环    print("所有任务已调度 (显式让出控制权)")    await asyncio.gather(*tasks) # 等待所有任务完成

在这个改进后的manager_with_yield函数中,每次调度一个wrapper任务后,await asyncio.sleep(0)都会让当前manager协程暂停,并允许事件循环运行其他已就绪的任务,包括刚刚创建的wrapper任务。这样就实现了真正的并发调度和执行。

进阶解决方案:使用 asyncio.TaskGroup (Python 3.11+)

对于Python 3.11及更高版本,asyncio.TaskGroup提供了一种更现代、更结构化的方式来管理一组并发任务。它不仅简化了任务的创建和等待,还提供了更好的错误处理机制,确保在任务组退出时所有子任务都已完成或被妥善处理。

使用asyncio.TaskGroup,我们的manager函数可以变得更加简洁和健壮:

async def manager_with_taskgroup():    print("开始调度任务 (使用 TaskGroup)")    # TaskGroup 提供了一个上下文管理器,在其退出时会自动等待所有子任务完成    async with asyncio.TaskGroup() as tg:        for _ in range(10): # 仅为示例,实际生成器是无限的            letter = next(generator())            tg.create_task(wrapper(letter)) # 使用 TaskGroup 创建任务            await asyncio.sleep(0) # 仍然需要让出控制权    print("所有任务已调度并完成 (使用 TaskGroup)")    # TaskGroup 退出时会自动等待所有任务完成,无需额外的 asyncio.gather

重要提示: 即使使用asyncio.TaskGroup,在for循环内部从生成器获取数据并创建任务时,仍然需要await asyncio.sleep(0)来周期性地让出控制权。TaskGroup负责任务的生命周期管理和等待,但它并不会改变事件循环的调度机制——即只有当当前运行的协程await时,事件循环才有机会切换到其他任务。

完整示例代码

结合上述概念,以下是一个完整的示例,演示如何从一个生成器非阻塞地调度和执行异步任务,并使用asyncio.TaskGroup进行结构化管理:

import asyncioimport randomasync def wrapper(word: str):    """    模拟一个耗时的异步任务。    打印开始和完成信息,并模拟随机延迟。    """    delay = random.uniform(0.5, 2)    print(f"[{asyncio.current_task().get_name()}] 开始处理: {word} (预计 {delay:.2f}s)")    await asyncio.sleep(delay)    print(f"[{asyncio.current_task().get_name()}] 完成处理: {word}")def generator():    """    一个无限生成器,持续产生随机字母。    在实际应用中,这可能是一个从外部源(如消息队列、API)获取数据的协程。    """    abc = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'    while True:        yield random.choice(abc)        # 在实际的异步生成器中,这里可能有一个 await 操作来获取下一个数据        # 例如: await queue.get()async def manager():    """    主管理器协程,负责从生成器获取数据并调度异步任务。    使用 asyncio.TaskGroup 实现结构化并发。    """    print("--- 启动任务管理器 ---")    # 使用 TaskGroup 确保所有子任务在管理器退出前完成    async with asyncio.TaskGroup() as tg:        print("TaskGroup 已启动,开始调度任务...")        # 循环从生成器获取数据并创建任务        for i in range(15): # 模拟处理15个事件            letter = next(generator()) # 从同步生成器获取数据            # 创建一个异步任务并添加到 TaskGroup            # 为任务指定名称,方便调试            tg.create_task(wrapper(letter), name=f"Task-{i:02d}-{letter}")            # 关键步骤:让出控制权给事件循环            # 允许其他已调度的任务(包括刚刚创建的 wrapper 任务)运行            await asyncio.sleep(0)             # 如果需要,可以在这里添加一些日志或条件判断            # print(f"已调度任务 {i+1} for letter '{letter}'")    print("--- 所有任务已调度并完成 ---")if __name__ == "__main__":    # 运行主管理器协程    # asyncio.run() 会启动事件循环并运行顶层协程    asyncio.run(manager())

运行上述代码,你将看到任务的开始和完成信息是交错出现的,这证明了它们是并发执行的。

注意事项与最佳实践

await的重要性: 在asyncio中,只有当一个协程await另一个协程或一个可等待对象时,事件循环才有机会切换到其他已就绪的任务。这是实现并发的关键。asyncio.sleep(0)的用途: 它是将控制权交还给事件循环的最简单方式,即使没有实际的延迟需求,也能确保事件循环有机会处理其他任务。在某些情况下,如果循环体内部已经有其他await操作(例如await queue.get()),那么可能就不需要额外的await asyncio.sleep(0)了。asyncio.TaskGroup:结构化并发: 强烈推荐在Python 3.11+中使用TaskGroup来管理一组相关的并发任务。它提供了清晰的生命周期管理,并在退出时自动等待所有子任务完成。错误处理: TaskGroup还能更好地处理子任务中的异常,当一个子任务失败时,它会取消其他任务并重新抛出异常,从而避免资源泄露。替代方案: 在Python 3.11之前,可以使用asyncio.gather()来等待一组任务,但需要手动收集任务列表。生成器与异步:如果你的生成器本身是异步的(例如,它内部有await操作来获取数据),那么它应该是一个异步生成器(async def),并且你需要使用async for来迭代它。在这种情况下,async for循环内部的await操作自然会把控制权交还给事件循环。如果生成器是同步的(如示例中的generator()),那么在async函数中可以直接使用for … in …迭代,但需要手动处理await asyncio.sleep(0)来确保并发。任务生命周期管理: 对于长时间运行或可能失败的任务,考虑更复杂的任务管理策略,例如设置超时、取消任务或实现重试逻辑。

总结

从生成器非阻塞地调度异步任务是asyncio应用中的一个常见模式。关键在于理解事件循环的工作原理:create_task()仅仅是调度,真正的并发执行需要通过await操作来显式地让出控制权。无论是使用await asyncio.sleep(0)配合asyncio.create_task(),还是采用Python 3.11+中更强大的asyncio.TaskGroup,核心思想都是确保事件循环有机会在任务调度之间切换上下文,从而实现高效的并发处理。掌握这一机制,将能更有效地构建响应迅速、高吞吐量的异步应用。

以上就是Python asyncio:实现从生成器非阻塞地执行异步任务的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1369617.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年12月14日 09:49:11
下一篇 2025年12月14日 09:49:23

相关推荐

  • 基于字符偏移的文本解码技术:使用While循环实现动态索引

    本文详细介绍了如何利用Python的while循环和字符的ASCII值实现一种动态索引的文本解码技术。通过定义一个findNext函数来根据当前字符类型计算下一个字符的偏移量,然后在一个主解码函数中循环迭代处理编码文本,逐步构建出原始消息。这种方法避免了使用with open语句,并展示了在不规则文…

    2025年12月14日
    000
  • Playwright 教程:高效处理浏览器新窗口与弹出页

    本教程详细介绍了如何使用 Playwright 捕获并操作浏览器新打开的窗口或弹出页。核心在于利用 page.expect_popup() 上下文管理器,确保在触发弹出事件前做好监听准备,并在弹出后获取其页面对象,进而进行元素定位与交互,确保自动化流程的顺畅执行。 捕获新窗口与弹出页的核心机制 在进…

    2025年12月14日
    000
  • 解决PyTorch CNN训练中批次大小不匹配错误的实用指南

    本文旨在解决PyTorch卷积神经网络(CNN)训练过程中常见的“批次大小不匹配”错误。核心问题通常源于模型架构中全连接层输入尺寸的计算错误以及特征图展平方式不当。通过修正ConvNet模型中全连接层的输入维度、采用动态批次展平方法X.view(X.size(0), -1),并优化损失函数计算lab…

    2025年12月14日
    000
  • PyTorch CNN训练中批次大小不匹配与维度错误:诊断与解决方案

    本文旨在解决PyTorch卷积神经网络(CNN)训练过程中常见的维度不匹配问题,特别是由于模型架构中全连接层输入尺寸计算错误、特征图展平方式不当以及损失函数目标张量形状不符所导致的RuntimeError。文章将详细分析这些问题,并提供经过优化的代码示例与调试技巧,确保模型训练流程的稳定与正确性。 …

    2025年12月14日
    000
  • Playwright自动化测试中如何高效处理新窗口与弹窗

    本文详细讲解了在Playwright自动化测试中如何高效、准确地处理新窗口(Popup)的场景。通过利用page.expect_popup()上下文管理器,可以捕获并控制由用户操作触发的新浏览器窗口。教程将提供具体的代码示例,指导读者如何在新窗口中定位元素、执行操作,并强调了在实际应用中处理弹窗的注…

    2025年12月14日
    000
  • PyTorch CNN训练中的批次大小不匹配错误:深度解析与修复

    本教程详细探讨了PyTorch卷积神经网络(CNN)训练中常见的“批次大小不匹配”错误,并提供了全面的解决方案。我们将重点关注模型架构中的全连接层输入维度计算、数据扁平化策略、损失函数标签处理以及训练与验证循环中的指标统计,旨在帮助开发者构建更健壮、高效的PyTorch模型。在PyTorch中训练深…

    2025年12月14日
    000
  • sympy.solve 在解方程组时的变量指定策略与常见陷阱

    sympy.solve 在处理多元方程组时,其 symbols 参数的指定方式对求解结果至关重要。本文通过拉格朗日乘数法的实际案例,揭示了当 symbols 参数未完全包含所有自由变量时可能导致空解的现象,并提供了正确指定变量或省略变量参数以获取预期解的有效方法,帮助用户避免求解器误用。 1. sy…

    2025年12月14日
    000
  • PyTorch CNN训练批次大小不匹配错误:诊断与修复

    本教程详细阐述了PyTorch卷积神经网络训练中常见的“批次大小不匹配”错误及其解决方案。通过修正模型全连接层输入维度、优化数据展平操作、调整交叉熵损失函数调用方式,并规范验证阶段指标统计,旨在帮助开发者构建稳定高效的深度学习训练流程,避免因维度不匹配导致的运行时错误。 在pytorch中训练卷积神…

    2025年12月14日
    000
  • 重构Python嵌套字典:实现“轴向”层级交换

    本文旨在解决Python中嵌套字典的层级重构问题,特别是如何像numpy.rollaxis一样交换内部和外部键的顺序。我们将通过一个具体的示例,详细讲解如何通过引用赋值和清理操作,将model -> epoch -> dataset的结构转换为model -> dataset -&…

    2025年12月14日
    000
  • Python 跨模块异常处理与自定义异常实践指南

    本文深入探讨了Python中跨模块异常处理的机制与实践。我们将学习如何定义和正确地在不同模块中引发自定义异常,并确保这些异常能在主程序中被捕获和处理。同时,文章还将讨论模块导入的最佳实践,帮助开发者构建结构清晰、健壮的Python应用。 Python 异常的跨模块传播机制 python的异常处理机制…

    2025年12月14日
    000
  • Python 跨模块异常处理:自定义异常的定义与捕获实践

    Python 允许在不同模块间有效地引发和捕获异常,这对于构建健壮、可维护的应用程序至关重要。本教程将深入探讨如何在 Python 中定义自定义异常、跨模块引发异常并进行捕获处理,以及在导入和使用自定义异常时的最佳实践,旨在帮助开发者实现更精细的错误管理和更清晰的代码结构。 理解 Python 异常…

    2025年12月14日
    000
  • 理解 Python 赋值语句的语法结构

    赋值语句是任何编程语言的基础,Python 也不例外。为了理解 Python 赋值语句的底层语法结构,我们需要深入研究其 Backus-Naur 范式(BNF)定义。很多人在初次接触 Python 语法定义时,可能会对复杂的 BNF 表达式感到困惑,尤其是当试图将一个简单的赋值语句,例如 a = 9…

    2025年12月14日
    000
  • Python跨模块异常处理与自定义异常实践

    本文深入探讨了Python中跨模块处理异常的机制,特别是如何有效捕获和处理在不同模块中抛出的自定义异常。文章详细解释了try…except块的正确使用方式,强调了自定义异常的定义与导入策略,并提供了清晰的代码示例,旨在帮助开发者构建更健壮、可维护的Python应用。 在python编程中…

    2025年12月14日
    000
  • 深入理解Python赋值语句的BNF结构

    本文旨在深入解析Python赋值语句的巴科斯-诺尔范式(BNF)结构,特别是针对初学者常遇到的困惑:一个简单的数字字面量(如9)如何符合复杂的右侧表达式语法。通过详细追溯从starred_expression到literal的完整解析路径,并强调BNF中可选语法元素的设计,揭示Python语法解析的…

    2025年12月14日
    000
  • 深入理解Python赋值语句的BNF语法解析

    本文深入探讨Python赋值语句的BNF(巴科斯-瑙尔范式)语法结构,重点解析了简单赋值操作如a=9中,右侧数值9是如何通过starred_expression递归匹配到expression,并最终解析为literal中的integer类型。通过逐层剖析Python表达式的BNF定义,揭示了许多语法…

    2025年12月14日
    000
  • 深入理解Python赋值语句的BNF语法结构

    Python赋值语句的BNF语法初看复杂,尤其是像a=9这样的简单赋值,其右侧的数字字面量9如何匹配starred_expression或yield_expression。核心在于starred_expression可直接是expression,而expression通过一系列递归定义最终涵盖了li…

    2025年12月14日
    000
  • # 使用 Setuptools 注册多个 Pluggy 插件

    本文介绍了如何使用 Setuptools 正确注册多个 Pluggy 插件,以便它们可以协同工作。核心在于理解 Pluggy 插件的命名规则,以及如何通过 Entry Points 将插件正确地注册到 PluginManager 中。通过修改 `pyproject.toml` 文件中的 Entry …

    2025年12月14日
    000
  • Pluggy多插件管理:Setuptools入口点配置深度解析

    本文深入探讨了如何使用Setuptools正确注册和管理多个Pluggy插件。针对常见问题,即仅最后一个注册插件生效,教程详细阐述了Setuptools入口点名称与Pluggy插件名称的对应关系,并提供了正确的配置示例,确保所有实现同一钩子规范的插件都能被Pluggy管理器发现并按序执行,从而构建健…

    2025年12月14日
    000
  • 掌握pluggy与setuptools多插件注册机制

    本文深入探讨了如何利用pluggy和setuptools正确注册和管理多个Python插件。核心在于理解pluggy中插件名称与钩子名称的区别,并确保每个插件通过setuptools入口点以独有的名称进行注册。通过修改pyproject.toml配置和在插件管理器中添加钩子规范,可以实现多个插件对同…

    2025年12月14日
    000
  • 如何使用 Setuptools 为 Pluggy 注册多个插件

    本文旨在解决使用 Setuptools entry points 注册多个 Pluggy 插件时遇到的常见冲突问题。核心在于理解 Pluggy 如何通过 entry point 名称识别插件,并指出当多个插件尝试使用相同的 entry point 名称时,只有最后一个注册的插件会生效。教程将详细阐述…

    2025年12月14日
    000

发表回复

登录后才能评论
关注微信