Python asyncio:从任务生成器实现高效异步并发执行的原理与实践

Python asyncio:从任务生成器实现高效异步并发执行的原理与实践

本教程深入探讨如何在Python asyncio中,从任务生成器实现异步任务的无阻塞并发执行。针对在不 await 任务完成的情况下,持续创建并调度新任务的需求,文章详细阐述了 asyncio 协程协作的本质,并提供了两种核心解决方案:通过 await asyncio.sleep(0) 显式让出控制权,以及利用 Python 3.11+ 的 asyncio.TaskGroup 实现更结构化的并发管理,确保任务能够真正地并行运行。

引言

在开发高性能、高并发的应用程序时,python的 asyncio 库提供了一种强大的异步编程范式。特别是在处理需要持续生成和调度任务的场景,例如长轮询服务器、事件驱动系统或数据流处理,如何有效地将这些任务添加到事件循环并确保它们能够并发执行,是一个常见的挑战。本文将深入探讨如何从一个任务生成器中,以异步、非阻塞的方式创建并执行任务,避免因等待单个任务完成而阻塞整个事件循环。

理解异步任务生成的挑战

考虑以下场景:我们有一个任务生成器,它会不断地产生新的任务参数。我们希望为每个参数创建一个异步任务,并将其提交给事件循环,但又不希望主逻辑(即生成任务的部分)停下来等待这些任务完成。

最初的尝试可能如下:

import asyncio, random async def wrapper(word: str):    print(f"Executing task for: {word}")    await asyncio.sleep(1) # 模拟耗时操作    print(f"Finished task for: {word}")def generator():    abc = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'    while True:        yield random.choice(abc)async def manager():    loop = asyncio.get_event_loop()    for letter in generator():        loop.create_task(wrapper(letter)) # 创建任务,但不等待        # 问题在于:这里没有让出控制权,事件循环无法调度其他任务async def main():    await manager() # manager是一个无限循环,此处会阻塞if __name__ == '__main__':    # asyncio.run(manager()) # 这样调用会因为manager的无限循环而阻塞    # 需要一种方式让manager能够持续创建任务,同时让其他任务运行    pass # 暂时不运行,因为会阻塞

上述代码的问题在于,manager 协程内部的 for 循环会无限快速地运行,不断地调用 loop.create_task()。虽然 create_task 将 wrapper 协程包装成一个任务并提交给事件循环,但 manager 协身本身并没有任何 await 语句,这意味着它从不主动让出控制权给事件循环。结果是,事件循环没有机会去执行那些被创建的 wrapper 任务,因为 manager 始终占用着CPU。

核心概念回顾

要解决这个问题,我们需要理解 asyncio 的核心工作原理:

立即学习“Python免费学习笔记(深入)”;

事件循环 (Event Loop):asyncio 的核心,负责调度和执行协程。它是一个单线程的循环,通过轮询注册的协程,在协程 await 时暂停当前协程,并选择下一个准备好运行的协程执行。协程 (Coroutines) 与 任务 (Tasks):协程是可暂停和恢复的函数。asyncio.create_task() 将一个协程包装成一个 Task 对象,使其可以被事件循环调度。让出控制权 (Yielding):这是 asyncio 并发实现的关键。一个协程只有在遇到 await 表达式时,才会暂停自身并将控制权交还给事件循环。事件循环才能检查是否有其他任务准备就绪并执行它们。

解决方案一:显式让出控制权

最直接的解决方案是在 manager 协程的循环内部,显式地让出控制权。await asyncio.sleep(0) 是一个常用的技巧,它会立即暂停当前协程,并将控制权交还给事件循环。由于 sleep 的时间是0,事件循环会立即检查是否有其他任务准备就绪,并在下一个循环迭代中重新调度 manager 协程。

import asyncio, random async def wrapper(word: str):    """模拟一个耗时操作的异步任务"""    print(f"Executing task for: {word}")    await asyncio.sleep(1) # 任务模拟    print(f"Finished task for: {word}")def generator():    """一个无限生成随机字母的生成器"""    abc = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'    while True:        yield random.choice(abc)async def manager_with_yield():    """    负责从生成器获取任务并调度,通过await asyncio.sleep(0)显式让出控制权。    """    loop = asyncio.get_event_loop()    print("Manager started, generating tasks...")    for i, letter in enumerate(generator()):        loop.create_task(wrapper(letter))        print(f"Task {i+1} created for '{letter}'")        await asyncio.sleep(0) # 关键:让出控制权,允许其他任务运行        # 实际应用中,可以根据需要增加一个短暂的等待,例如 await asyncio.sleep(0.01)        # 或者在处理一定数量任务后才让出控制权,以平衡调度开销和响应性。        if i >= 10: # 示例:限制生成任务的数量,否则会无限运行            print("Generated 10 tasks, stopping manager.")            breakasync def main_with_yield():    """主入口点,运行带有显式让出控制权的manager"""    await manager_with_yield()    # 等待所有已创建的wrapper任务完成    print("Manager finished, waiting for remaining tasks...")    await asyncio.sleep(2) # 给剩余任务一些时间完成if __name__ == '__main__':    print("--- Running Solution 1: Explicit Yielding ---")    asyncio.run(main_with_yield())    print("--- Solution 1 Finished ---")

注意事项:

await asyncio.sleep(0) 是一种有效的让出控制权的方式,它确保事件循环有机会处理其他已调度的任务。在实际应用中,manager 协程通常不会是无限循环,或者会有一个退出条件。如果它是无限循环,并且没有其他机制(如 asyncio.run 的 timeout 参数或外部信号)来停止它,程序将持续运行。这种方法虽然有效,但在语义上 sleep(0) 可能感觉像是一个“技巧”。

解决方案二:使用 asyncio.TaskGroup (Python 3.11+)

Python 3.11 引入了 asyncio.TaskGroup,这是一种更现代、更结构化的并发管理方式。TaskGroup 提供了一个上下文管理器,可以在其中创建任务。它会自动管理这些任务的生命周期,并在退出上下文时等待所有在其内部创建的任务完成(或处理异常)。更重要的是,TaskGroup 在内部会自动处理任务的调度和让出控制权,使得代码更加简洁和健壮。

import asyncio, random async def wrapper(word: str):    """模拟一个耗时操作的异步任务"""    print(f"Executing task for: {word}")    await asyncio.sleep(1) # 任务模拟    print(f"Finished task for: {word}")def generator():    """一个无限生成随机字母的生成器"""    abc = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'    while True:        yield random.choice(abc)async def manager_with_taskgroup():    """    负责从生成器获取任务并调度,使用asyncio.TaskGroup进行结构化并发管理。    """    print("Manager started with TaskGroup, generating tasks...")    async with asyncio.TaskGroup() as tg: # 使用TaskGroup上下文管理器        for i, letter in enumerate(generator()):            tg.create_task(wrapper(letter)) # 在TaskGroup中创建任务            print(f"Task {i+1} created for '{letter}'")            # TaskGroup在内部会处理调度和让出控制权,通常无需额外的await asyncio.sleep(0)            # 但如果生成任务的速度极快,且任务本身耗时很短,            # 偶尔添加 await asyncio.sleep(0) 仍可能优化响应性。            if i >= 10: # 示例:限制生成任务的数量                print("Generated 10 tasks, stopping manager.")                break    print("TaskGroup exited. All tasks created within it should have completed or been cancelled.")async def main_with_taskgroup():    """主入口点,运行带有TaskGroup的manager"""    await manager_with_taskgroup()if __name__ == '__main__':    print("n--- Running Solution 2: Using asyncio.TaskGroup (Python 3.11+) ---")    # 确保Python版本 >= 3.11    if hasattr(asyncio, 'TaskGroup'):        asyncio.run(main_with_taskgroup())    else:        print("Warning: asyncio.TaskGroup requires Python 3.11 or later. Skipping this example.")    print("--- Solution 2 Finished ---")

TaskGroup 的优势:

结构化并发 (Structured Concurrency):所有在 TaskGroup 中创建的任务都在其作用域内管理。当 TaskGroup 退出时,它会等待所有子任务完成,或者在发生异常时优雅地取消它们。这极大地简化了错误处理和资源管理。隐式让出控制权:TaskGroup 的实现通常会确保事件循环得到足够的调度机会,减少了手动 await asyncio.sleep(0) 的必要性。代码清晰:通过上下文管理器,任务的生命周期和相互关系变得一目了然。

版本要求: asyncio.TaskGroup 需要 Python 3.11 或更高版本。对于旧版本Python,解决方案一仍然是可行的。

完整示例与最佳实践

结合上述两种方法,以下是一个更完整的示例,展示了如何从生成器高效地调度异步任务,并包含一些最佳实践的思考。我们优先推荐使用 TaskGroup。

import asyncioimport randomimport timeasync def process_item(item_id: int, data: str):    """模拟一个异步处理任务,打印处理信息并模拟耗时"""    start_time = time.time()    print(f"[{item_id}] Processing item: '{data}'...")    await asyncio.sleep(random.uniform(0.5, 2.0)) # 模拟随机耗时    end_time = time.time()    print(f"[{item_id}] Finished item: '{data}' in {end_time - start_time:.2f}s")def item_generator(max_items: int = 20):    """一个生成器,生成带ID的随机数据"""    abc = 'abcdefghijklmnopqrstuvwxyz'    for i in range(1, max_items + 1):        yield i, random.choice(abc) * random.randint(3, 8) # 生成随机长度的字符串async def task_dispatcher():    """    任务调度器,从生成器获取数据并创建异步任务。    优先使用TaskGroup,如果不可用则回退到显式让出控制权。    """    print("--- Task Dispatcher Started ---")    item_count = 0    if hasattr(asyncio, 'TaskGroup'):        print("Using asyncio.TaskGroup for task management.")        async with asyncio.TaskGroup() as tg:            for item_id, data in item_generator():                tg.create_task(process_item(item_id, data))                print(f"Dispatched task {item_id} for data '{data}'")                item_count += 1                # 即使使用TaskGroup,如果生成任务的速度远超任务执行速度,                # 也可以考虑在此处加入一个短暂的await,以避免内存中积压过多未开始的任务。                # 例如: if item_count % 5 == 0: await asyncio.sleep(0.01)        print(f"--- TaskGroup Finished. All {item_count} tasks completed or cancelled. ---")    else:        print("asyncio.TaskGroup not available (Python < 3.11). Falling back to explicit yield.")        loop = asyncio.get_event_loop()        for item_id, data in item_generator():            loop.create_task(process_item(item_id, data))            print(f"Dispatched task {item_id} for data '{data}'")            item_count += 1            await asyncio.sleep(0) # 显式让出控制权        print(f"--- Dispatcher Finished creating {item_count} tasks. Waiting for them to complete. ---")        # 由于是手动创建任务且没有TaskGroup等待,需要额外等待所有任务完成        await asyncio.sleep(3) # 粗略等待,实际应用中可能需要更精细的等待机制async def main():    """主程序入口"""    await task_dispatcher()    print("All dispatching and processing should be complete.")if __name__ == '__main__':    asyncio.run(main())

最佳实践:

选择合适的并发工具Python 3.11+:优先使用 asyncio.TaskGroup,它提供了结构化并发的优势,简化了任务管理、错误处理和资源清理。Python :使用 loop.create_task() 结合 await asyncio.sleep(0) 是实现非阻塞任务调度的有效方法。流量控制与背压:如果任务生成器产生任务的速度远超事件循环处理任务的速度,可能会导致内存占用过高或系统负载过大。在这种情况下,需要引入流量控制机制,例如:信号量 (Semaphore):限制同时运行的并发任务数量。队列 (Queue):将任务参数放入 asyncio.Queue,由固定数量的消费者协程从队列中取出并执行任务。批处理:一次性生成并调度一批任务,然后等待这批任务完成或达到某个阈值后再生成下一批。错误处理:在 TaskGroup 中,如果任何子任务抛出异常,TaskGroup 会捕获它并在退出上下文时重新抛出 ExceptionGroup(Python 3.11+)。这使得集中处理错误变得容易。对于手动 create_task 的情况,需要单独管理任务的异常,例如通过 task.add_done_callback() 或收集任务引用并在稍后 await 它们以捕获异常。任务生命周期管理:如果需要取消正在运行的任务,或者获取任务的结果,需要保留 Task 对象的引用。TaskGroup 在退出时会自动处理取消和等待,但在更复杂的场景中,可能仍需手动管理任务引用。

总结

在 asyncio 中从任务生成器实现高效异步并发执行的核心在于理解事件循环的协作式调度机制。仅仅通过 create_task() 创建任务不足以实现并发,关键在于主调度逻辑必须周期性地让出控制权给事件循环。

对于 Python 3.11 及更高版本,推荐使用 asyncio.TaskGroup,它提供了一种结构化、健壮且易于管理任务生命周期和错误处理的并发模式。对于 Python 3.10 及更低版本,await asyncio.sleep(0) 是一个有效的技巧,能够强制协程让出控制权,从而允许事件循环调度其他已创建的任务。

无论采用哪种方法,理解 await 的作用以及事件循环的工作原理,是构建高效、响应式 asyncio 应用程序的基础。在实际应用中,还需要结合流量控制、错误处理等机制,确保系统的稳定性和可扩展性。

以上就是Python asyncio:从任务生成器实现高效异步并发执行的原理与实践的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1369671.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
PyTorch CNN训练后只输出单一结果的解决方法
上一篇 2025年12月14日 09:51:50
使用TaskGroup实现异步任务生成器的任务执行
下一篇 2025年12月14日 09:52:00

相关推荐

  • composer require-dev和require有什么不同_Composer Require与Require-Dev区别解析

    require用于声明项目运行必需的依赖,如框架、数据库组件和第三方SDK,这些包会随项目部署到生产环境;2. require-dev用于声明仅在开发和测试阶段需要的工具,如PHPUnit、PHPStan、Faker等,不会默认部署到生产环境;3. 安装时composer install根据环境决定…

    2026年5月10日
    1000
  • Matplotlib 地图中多类型图例的创建与优化

    Matplotlib 地图中多类型图例的创建与优化Matplotlib 地图中多类型图例的创建与优化Matplotlib 地图中多类型图例的创建与优化Matplotlib 地图中多类型图例的创建与优化

    本教程旨在解决matplotlib地图可视化中,如何在一个图例中同时展示颜色块(如区域分类)和自定义标记(如特定兴趣点)的问题。文章详细介绍了当传统`patch`对象无法正确显示标记时,如何利用`matplotlib.lines.line2d`创建标记图例句柄,并将其与颜色块图例句柄合并,从而生成一…

    2026年5月10日 用户投稿
    100
  • Golang JSON序列化:控制敏感字段暴露的最佳实践

    本教程探讨golang中如何高效控制结构体字段在json序列化时的可见性。当需要将包含敏感信息的结构体数组转换为json响应时,通过利用`encoding/json`包提供的结构体标签,特别是`json:”-“`,可以轻松实现对特定字段的忽略,从而避免敏感数据泄露,确保api…

    2026年5月10日
    000
  • 利用海象运算符简化条件赋值:Python教程与最佳实践

    本文旨在探讨Python中海象运算符(:=)在条件赋值场景下的应用。通过对比传统if/else语句与海象运算符,以及条件表达式,分析海象运算符在简化代码、提高可读性方面的优势与局限性。并通过具体示例,展示如何在列表推导式等场景下合理使用海象运算符,同时强调其潜在的复杂性及替代方案,帮助开发者更好地掌…

    2026年5月10日
    100
  • Debian syslog性能优化技巧有哪些

    提升Debian系统syslog (通常基于rsyslog)性能,关键在于精简配置和高效处理日志。以下策略能有效优化日志管理,提升系统整体性能: 精简配置,高效加载: 在rsyslog配置文件中,仅加载必要的输入、输出和解析模块。 使用全局指令设置日志级别和格式,避免不必要的处理。 自定义模板: 创…

    2026年5月10日
    000
  • 比特币新手教程 比特币交易平台有哪些

    比特币是一种去中心化的数字货币,基于区块链技术实现点对点交易,具有匿名性、有限发行和不可篡改等特点;新手可通过交易所购买,P2P交易获得比特币,常用平台包括Binance、OKX和Huobi;交易流程包括注册账户、实名认证、绑定支付方式、充值法币并下单购买,可选择市价单或限价单;比特币存储方式有交易…

    2026年5月10日
    000
  • c++中的SFINAE技术是什么_c++模板编程中的SFINAE原理与应用

    SFINAE 是“替换失败不是错误”的原则,指模板实例化时若参数替换导致错误,只要存在其他合法候选,编译器不报错而是继续重载决议。它用于条件启用模板、类型检测等场景,如通过 decltype 或 enable_if 控制函数重载,实现类型特征判断。尽管 C++20 引入 Concepts 简化了部分…

    2026年5月10日
    000
  • Go语言mgo查询构建:深入理解bson.M与日期范围查询的正确实践

    本文旨在解决go语言mgo库中构建复杂查询时,特别是涉及嵌套`bson.m`和日期范围筛选的常见错误。我们将深入剖析`bson.m`的类型特性,解释为何直接索引`interface{}`会导致“invalid operation”错误,并提供一种推荐的、结构清晰的代码重构方案,以确保查询条件能够正确…

    2026年5月10日
    100
  • RichHandler与Rich Progress集成:解决显示冲突的教程

    在使用rich库的`richhandler`进行日志输出并同时使用`progress`组件时,可能会遇到显示错乱或溢出问题。这通常是由于为`richhandler`和`progress`分别创建了独立的`console`实例导致的。解决方案是确保日志处理器和进度条组件共享同一个`console`实例…

    2026年5月10日
    000
  • Golang goroutine与channel调试技巧

    使用go run -race检测数据竞争,结合runtime.NumGoroutine监控协程数量,通过pprof分析阻塞调用栈,利用select超时避免永久阻塞,有效排查goroutine泄漏、死锁和数据竞争问题。 Go语言的goroutine和channel是并发编程的核心,但它们也带来了调试上…

    2026年5月10日
    000
  • 使用 Jupyter Notebook 进行探索性数据分析

    Jupyter Notebook通过单元格实现代码与Markdown结合,支持数据导入(pandas)、清洗(fillna)、探索(matplotlib/seaborn可视化)、统计分析(describe/corr)和特征工程,便于记录与分享分析过程。 Jupyter Notebook 是进行探索性…

    2026年5月10日
    000
  • 《魔兽世界》将于6月11日开启国服回归技术测试

    《魔兽世界》将于6月11日开启国服回归技术测试《魔兽世界》将于6月11日开启国服回归技术测试《魔兽世界》将于6月11日开启国服回归技术测试《魔兽世界》将于6月11日开启国服回归技术测试

    《%ign%ignore_a_1%re_a_1%》官方宣布,将于6月11日开启国服回归技术测试,时间为7天,并称可以在6月内正式开服,玩家们可以访问官网下载战网客户端并预下载“巫妖王之怒”客户端,技术测试详情见下图。 WordAi WordAI是一个AI驱动的内容重写平台 53 查看详情 以上就是《…

    2026年5月10日 用户投稿
    200
  • php常量怎么用_PHP常量(define/const)定义与使用方法

    PHP中可通过define函数和const关键字定义常量,用于存储不可变值。define适用于全局作用域,支持动态名称和条件定义,如define(‘SITE_NAME’, ‘MyWebsite’);const在编译时生效,语法简洁但限制多,只能在类或全…

    2026年5月10日
    000
  • 如何在HTML中插入表单元素_HTML表单控件与输入类型使用指南

    HTML表单通过标签构建,包含action和method属性定义数据提交目标与方式,常用input类型如text、password、email等适配不同输入需求,配合label、required、placeholder提升可用性,结合textarea、select、button等控件实现完整交互,是…

    2026年5月10日
    000
  • 网站标题关键词更新后,搜索引擎为何仍显示旧标题?

    网站标题更新后,搜索引擎为何显示旧标题? 网站SEO优化中,站长常修改网站标题关键词,期望搜索结果显示自定义标题。然而,即使更新标签、meta keywords、meta description和结构化数据中的name属性后,搜索结果仍显示旧标题,这令人费解。本文将对此进行解释。 问题:站长修改了网…

    2026年5月10日
    100
  • 创建指定大小并填充特定数据的Golang文件教程

    本文将介绍如何使用Golang创建一个指定大小的文件,并用特定数据填充它。我们将使用 `os` 包提供的函数来创建和截断文件,从而实现快速生成大文件的目的。示例代码展示了如何创建一个10MB的文件,并将其填充为全零数据。掌握这些方法,可以方便地在例如日志系统或磁盘队列等场景中,预先创建测试文件或初始…

    2026年5月10日
    000
  • Python命令怎样使用profile分析脚本性能 Python命令性能分析的基础教程

    使用Python的cProfile模块分析脚本性能最直接的方式是通过命令行执行python -m cProfile your_script.py,它会输出每个函数的调用次数、总耗时、累积耗时等关键指标,帮助定位性能瓶颈;为进一步分析,可将结果保存为文件python -m cProfile -o ou…

    2026年5月10日
    000
  • 使用 WebCodecs VideoDecoder 实现精确逐帧回退

    本文档旨在解决在使用 WebCodecs VideoDecoder 进行视频解码时,实现精确逐帧回退的问题。通过比较帧的时间戳与目标帧的时间戳,可以避免渲染中间帧,从而提高用户体验。本文将提供详细的解决方案和示例代码,帮助开发者实现精确的视频帧控制。 在使用 WebCodecs VideoDecod…

    2026年5月10日
    000
  • 如何插入查询结果数据_SQL插入Select查询结果方法

    如何插入查询结果数据_SQL插入Select查询结果方法如何插入查询结果数据_SQL插入Select查询结果方法如何插入查询结果数据_SQL插入Select查询结果方法如何插入查询结果数据_SQL插入Select查询结果方法

    使用INSERT INTO…SELECT语句可高效插入数据,通过NOT EXISTS、LEFT JOIN、MERGE语句或唯一约束避免重复;表结构不一致时可通过别名、类型转换、默认值或计算字段处理;结合存储过程可提升可维护性,支持参数化与动态SQL。 将查询结果数据插入到另一个表中,可以…

    2026年5月10日 用户投稿
    000
  • Discord.py 交互按钮超时与持久化解决方案

    本教程旨在解决Discord.py中交互按钮在一段时间后出现“This Interaction Failed”错误的问题。我们将深入探讨视图(View)的超时机制,并提供通过正确设置timeout参数以及利用bot.add_view()方法实现按钮持久化的具体方案,确保您的机器人交互功能稳定可靠,即…

    2026年5月10日
    000

发表回复

登录后才能评论
关注微信