Python asyncio:从任务生成器实现高效异步并发执行的原理与实践

Python asyncio:从任务生成器实现高效异步并发执行的原理与实践

本教程深入探讨如何在Python asyncio中,从任务生成器实现异步任务的无阻塞并发执行。针对在不 await 任务完成的情况下,持续创建并调度新任务的需求,文章详细阐述了 asyncio 协程协作的本质,并提供了两种核心解决方案:通过 await asyncio.sleep(0) 显式让出控制权,以及利用 Python 3.11+ 的 asyncio.TaskGroup 实现更结构化的并发管理,确保任务能够真正地并行运行。

引言

在开发高性能、高并发的应用程序时,python的 asyncio 库提供了一种强大的异步编程范式。特别是在处理需要持续生成和调度任务的场景,例如长轮询服务器、事件驱动系统或数据流处理,如何有效地将这些任务添加到事件循环并确保它们能够并发执行,是一个常见的挑战。本文将深入探讨如何从一个任务生成器中,以异步、非阻塞的方式创建并执行任务,避免因等待单个任务完成而阻塞整个事件循环。

理解异步任务生成的挑战

考虑以下场景:我们有一个任务生成器,它会不断地产生新的任务参数。我们希望为每个参数创建一个异步任务,并将其提交给事件循环,但又不希望主逻辑(即生成任务的部分)停下来等待这些任务完成。

最初的尝试可能如下:

import asyncio, random async def wrapper(word: str):    print(f"Executing task for: {word}")    await asyncio.sleep(1) # 模拟耗时操作    print(f"Finished task for: {word}")def generator():    abc = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'    while True:        yield random.choice(abc)async def manager():    loop = asyncio.get_event_loop()    for letter in generator():        loop.create_task(wrapper(letter)) # 创建任务,但不等待        # 问题在于:这里没有让出控制权,事件循环无法调度其他任务async def main():    await manager() # manager是一个无限循环,此处会阻塞if __name__ == '__main__':    # asyncio.run(manager()) # 这样调用会因为manager的无限循环而阻塞    # 需要一种方式让manager能够持续创建任务,同时让其他任务运行    pass # 暂时不运行,因为会阻塞

上述代码的问题在于,manager 协程内部的 for 循环会无限快速地运行,不断地调用 loop.create_task()。虽然 create_task 将 wrapper 协程包装成一个任务并提交给事件循环,但 manager 协身本身并没有任何 await 语句,这意味着它从不主动让出控制权给事件循环。结果是,事件循环没有机会去执行那些被创建的 wrapper 任务,因为 manager 始终占用着CPU。

核心概念回顾

要解决这个问题,我们需要理解 asyncio 的核心工作原理:

立即学习“Python免费学习笔记(深入)”;

事件循环 (Event Loop):asyncio 的核心,负责调度和执行协程。它是一个单线程的循环,通过轮询注册的协程,在协程 await 时暂停当前协程,并选择下一个准备好运行的协程执行。协程 (Coroutines) 与 任务 (Tasks):协程是可暂停和恢复的函数。asyncio.create_task() 将一个协程包装成一个 Task 对象,使其可以被事件循环调度。让出控制权 (Yielding):这是 asyncio 并发实现的关键。一个协程只有在遇到 await 表达式时,才会暂停自身并将控制权交还给事件循环。事件循环才能检查是否有其他任务准备就绪并执行它们。

解决方案一:显式让出控制权

最直接的解决方案是在 manager 协程的循环内部,显式地让出控制权。await asyncio.sleep(0) 是一个常用的技巧,它会立即暂停当前协程,并将控制权交还给事件循环。由于 sleep 的时间是0,事件循环会立即检查是否有其他任务准备就绪,并在下一个循环迭代中重新调度 manager 协程。

import asyncio, random async def wrapper(word: str):    """模拟一个耗时操作的异步任务"""    print(f"Executing task for: {word}")    await asyncio.sleep(1) # 任务模拟    print(f"Finished task for: {word}")def generator():    """一个无限生成随机字母的生成器"""    abc = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'    while True:        yield random.choice(abc)async def manager_with_yield():    """    负责从生成器获取任务并调度,通过await asyncio.sleep(0)显式让出控制权。    """    loop = asyncio.get_event_loop()    print("Manager started, generating tasks...")    for i, letter in enumerate(generator()):        loop.create_task(wrapper(letter))        print(f"Task {i+1} created for '{letter}'")        await asyncio.sleep(0) # 关键:让出控制权,允许其他任务运行        # 实际应用中,可以根据需要增加一个短暂的等待,例如 await asyncio.sleep(0.01)        # 或者在处理一定数量任务后才让出控制权,以平衡调度开销和响应性。        if i >= 10: # 示例:限制生成任务的数量,否则会无限运行            print("Generated 10 tasks, stopping manager.")            breakasync def main_with_yield():    """主入口点,运行带有显式让出控制权的manager"""    await manager_with_yield()    # 等待所有已创建的wrapper任务完成    print("Manager finished, waiting for remaining tasks...")    await asyncio.sleep(2) # 给剩余任务一些时间完成if __name__ == '__main__':    print("--- Running Solution 1: Explicit Yielding ---")    asyncio.run(main_with_yield())    print("--- Solution 1 Finished ---")

注意事项:

await asyncio.sleep(0) 是一种有效的让出控制权的方式,它确保事件循环有机会处理其他已调度的任务。在实际应用中,manager 协程通常不会是无限循环,或者会有一个退出条件。如果它是无限循环,并且没有其他机制(如 asyncio.run 的 timeout 参数或外部信号)来停止它,程序将持续运行。这种方法虽然有效,但在语义上 sleep(0) 可能感觉像是一个“技巧”。

解决方案二:使用 asyncio.TaskGroup (Python 3.11+)

Python 3.11 引入了 asyncio.TaskGroup,这是一种更现代、更结构化的并发管理方式。TaskGroup 提供了一个上下文管理器,可以在其中创建任务。它会自动管理这些任务的生命周期,并在退出上下文时等待所有在其内部创建的任务完成(或处理异常)。更重要的是,TaskGroup 在内部会自动处理任务的调度和让出控制权,使得代码更加简洁和健壮。

import asyncio, random async def wrapper(word: str):    """模拟一个耗时操作的异步任务"""    print(f"Executing task for: {word}")    await asyncio.sleep(1) # 任务模拟    print(f"Finished task for: {word}")def generator():    """一个无限生成随机字母的生成器"""    abc = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'    while True:        yield random.choice(abc)async def manager_with_taskgroup():    """    负责从生成器获取任务并调度,使用asyncio.TaskGroup进行结构化并发管理。    """    print("Manager started with TaskGroup, generating tasks...")    async with asyncio.TaskGroup() as tg: # 使用TaskGroup上下文管理器        for i, letter in enumerate(generator()):            tg.create_task(wrapper(letter)) # 在TaskGroup中创建任务            print(f"Task {i+1} created for '{letter}'")            # TaskGroup在内部会处理调度和让出控制权,通常无需额外的await asyncio.sleep(0)            # 但如果生成任务的速度极快,且任务本身耗时很短,            # 偶尔添加 await asyncio.sleep(0) 仍可能优化响应性。            if i >= 10: # 示例:限制生成任务的数量                print("Generated 10 tasks, stopping manager.")                break    print("TaskGroup exited. All tasks created within it should have completed or been cancelled.")async def main_with_taskgroup():    """主入口点,运行带有TaskGroup的manager"""    await manager_with_taskgroup()if __name__ == '__main__':    print("n--- Running Solution 2: Using asyncio.TaskGroup (Python 3.11+) ---")    # 确保Python版本 >= 3.11    if hasattr(asyncio, 'TaskGroup'):        asyncio.run(main_with_taskgroup())    else:        print("Warning: asyncio.TaskGroup requires Python 3.11 or later. Skipping this example.")    print("--- Solution 2 Finished ---")

TaskGroup 的优势:

结构化并发 (Structured Concurrency):所有在 TaskGroup 中创建的任务都在其作用域内管理。当 TaskGroup 退出时,它会等待所有子任务完成,或者在发生异常时优雅地取消它们。这极大地简化了错误处理和资源管理。隐式让出控制权:TaskGroup 的实现通常会确保事件循环得到足够的调度机会,减少了手动 await asyncio.sleep(0) 的必要性。代码清晰:通过上下文管理器,任务的生命周期和相互关系变得一目了然。

版本要求: asyncio.TaskGroup 需要 Python 3.11 或更高版本。对于旧版本Python,解决方案一仍然是可行的。

完整示例与最佳实践

结合上述两种方法,以下是一个更完整的示例,展示了如何从生成器高效地调度异步任务,并包含一些最佳实践的思考。我们优先推荐使用 TaskGroup。

import asyncioimport randomimport timeasync def process_item(item_id: int, data: str):    """模拟一个异步处理任务,打印处理信息并模拟耗时"""    start_time = time.time()    print(f"[{item_id}] Processing item: '{data}'...")    await asyncio.sleep(random.uniform(0.5, 2.0)) # 模拟随机耗时    end_time = time.time()    print(f"[{item_id}] Finished item: '{data}' in {end_time - start_time:.2f}s")def item_generator(max_items: int = 20):    """一个生成器,生成带ID的随机数据"""    abc = 'abcdefghijklmnopqrstuvwxyz'    for i in range(1, max_items + 1):        yield i, random.choice(abc) * random.randint(3, 8) # 生成随机长度的字符串async def task_dispatcher():    """    任务调度器,从生成器获取数据并创建异步任务。    优先使用TaskGroup,如果不可用则回退到显式让出控制权。    """    print("--- Task Dispatcher Started ---")    item_count = 0    if hasattr(asyncio, 'TaskGroup'):        print("Using asyncio.TaskGroup for task management.")        async with asyncio.TaskGroup() as tg:            for item_id, data in item_generator():                tg.create_task(process_item(item_id, data))                print(f"Dispatched task {item_id} for data '{data}'")                item_count += 1                # 即使使用TaskGroup,如果生成任务的速度远超任务执行速度,                # 也可以考虑在此处加入一个短暂的await,以避免内存中积压过多未开始的任务。                # 例如: if item_count % 5 == 0: await asyncio.sleep(0.01)        print(f"--- TaskGroup Finished. All {item_count} tasks completed or cancelled. ---")    else:        print("asyncio.TaskGroup not available (Python < 3.11). Falling back to explicit yield.")        loop = asyncio.get_event_loop()        for item_id, data in item_generator():            loop.create_task(process_item(item_id, data))            print(f"Dispatched task {item_id} for data '{data}'")            item_count += 1            await asyncio.sleep(0) # 显式让出控制权        print(f"--- Dispatcher Finished creating {item_count} tasks. Waiting for them to complete. ---")        # 由于是手动创建任务且没有TaskGroup等待,需要额外等待所有任务完成        await asyncio.sleep(3) # 粗略等待,实际应用中可能需要更精细的等待机制async def main():    """主程序入口"""    await task_dispatcher()    print("All dispatching and processing should be complete.")if __name__ == '__main__':    asyncio.run(main())

最佳实践:

选择合适的并发工具Python 3.11+:优先使用 asyncio.TaskGroup,它提供了结构化并发的优势,简化了任务管理、错误处理和资源清理。Python :使用 loop.create_task() 结合 await asyncio.sleep(0) 是实现非阻塞任务调度的有效方法。流量控制与背压:如果任务生成器产生任务的速度远超事件循环处理任务的速度,可能会导致内存占用过高或系统负载过大。在这种情况下,需要引入流量控制机制,例如:信号量 (Semaphore):限制同时运行的并发任务数量。队列 (Queue):将任务参数放入 asyncio.Queue,由固定数量的消费者协程从队列中取出并执行任务。批处理:一次性生成并调度一批任务,然后等待这批任务完成或达到某个阈值后再生成下一批。错误处理:在 TaskGroup 中,如果任何子任务抛出异常,TaskGroup 会捕获它并在退出上下文时重新抛出 ExceptionGroup(Python 3.11+)。这使得集中处理错误变得容易。对于手动 create_task 的情况,需要单独管理任务的异常,例如通过 task.add_done_callback() 或收集任务引用并在稍后 await 它们以捕获异常。任务生命周期管理:如果需要取消正在运行的任务,或者获取任务的结果,需要保留 Task 对象的引用。TaskGroup 在退出时会自动处理取消和等待,但在更复杂的场景中,可能仍需手动管理任务引用。

总结

在 asyncio 中从任务生成器实现高效异步并发执行的核心在于理解事件循环的协作式调度机制。仅仅通过 create_task() 创建任务不足以实现并发,关键在于主调度逻辑必须周期性地让出控制权给事件循环。

对于 Python 3.11 及更高版本,推荐使用 asyncio.TaskGroup,它提供了一种结构化、健壮且易于管理任务生命周期和错误处理的并发模式。对于 Python 3.10 及更低版本,await asyncio.sleep(0) 是一个有效的技巧,能够强制协程让出控制权,从而允许事件循环调度其他已创建的任务。

无论采用哪种方法,理解 await 的作用以及事件循环的工作原理,是构建高效、响应式 asyncio 应用程序的基础。在实际应用中,还需要结合流量控制、错误处理等机制,确保系统的稳定性和可扩展性。

以上就是Python asyncio:从任务生成器实现高效异步并发执行的原理与实践的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1369671.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年12月14日 09:51:50
下一篇 2025年12月14日 09:52:00

相关推荐

  • 使用TaskGroup实现异步任务生成器的任务执行

    本文介绍了如何使用异步任务生成器和 asyncio 库在 Python 中实现异步任务执行。核心思想是利用 asyncio.TaskGroup (Python 3.11+) 创建任务组,并使用 create_task 方法将生成器产生的任务添加到任务组中,同时通过 await asyncio.sle…

    好文分享 2025年12月14日
    000
  • PyTorch CNN训练后只输出单一结果的解决方法

    问题背景与摘要 正如摘要中所述,在训练图像分类的CNN模型时,可能会遇到模型在训练过程中输出结果单一的问题,即使损失函数看起来正常下降。这种现象通常表明模型陷入了局部最优解,或者数据存在某些问题导致模型无法有效地学习到不同类别之间的区分性特征。本文将深入探讨这一问题,并提供相应的解决方案。 常见原因…

    2025年12月14日
    000
  • PyTorch CNN训练中模型预测单一类别的调试与优化

    本文旨在解决PyTorch CNN模型在训练过程中出现预测结果单一化、模型收敛异常但损失函数平滑下降的问题。通过分析常见的训练陷阱,如梯度累积、数据归一化缺失及类别不平衡,提供了详细的解决方案和代码示例,包括正确使用optimizer.zero_grad()、实现数据标准化以及利用CrossEntr…

    2025年12月14日
    000
  • 将包含CST时区的字符串转换为datetime对象

    本文介绍如何将包含CST(中国标准时间)时区信息的字符串转换为Python的datetime对象。通过使用pandas库的to_datetime()函数,并结合时区映射,可以有效地处理这类时间字符串的转换,从而方便后续的时间操作和分析。 在处理时间数据时,经常会遇到包含时区信息的字符串。例如,&#8…

    2025年12月14日
    000
  • PyTorch CNN训练输出异常:单一预测与解决方案

    本文探讨PyTorch CNN在训练过程中输出结果趋于单一类别的问题,即使损失函数平稳下降。核心解决方案在于对输入数据进行适当的归一化处理,并针对数据不平衡问题采用加权交叉熵损失函数,以提升模型预测的多样性和准确性,从而避免模型偏向于预测某一特定类别。 问题现象分析 在卷积神经网络(cnn)图像分类…

    2025年12月14日
    000
  • Python slice 对象的高级用法:优雅地实现切片至序列末尾

    本教程探讨了Python slice() 函数在创建切片对象时,如何优雅地处理切片至序列末尾的场景。尽管 slice() 构造器要求 stop 参数,但通过将 None 作为 stop 参数传入,开发者可以灵活地定义等同于 [start:] 的切片行为,从而实现更通用的数据处理和代码复用。 理解 s…

    2025年12月14日
    000
  • Python 类与方法:交易策略模拟实现

    本文旨在解决Python类中实例属性和类属性混淆导致的方法调用问题。通过一个交易策略模拟的例子,详细讲解如何正确定义和使用实例属性,以及如何在方法中修改实例属性的值。本文将提供清晰的代码示例,并解释常见的错误用法,帮助读者更好地理解Python面向对象编程中的关键概念。 理解实例属性与类属性 在Py…

    2025年12月14日
    000
  • Python类与方法:交易员行为模拟

    本文旨在帮助初学者理解Python类和方法的正确使用,特别是实例属性和类属性的区别。通过一个交易员行为模拟的例子,我们将详细讲解如何定义类、初始化实例属性,以及编写能够根据价格采取买入、卖出或持有操作的方法,并更新相应的状态变量。我们将重点关注__init__方法的作用,以及如何使用self关键字来…

    2025年12月14日
    000
  • Python 类与方法:实例属性与类属性的区别及应用

    本文旨在帮助初学者理解Python中类和方法的正确使用,特别是实例属性和类属性的区别。我们将通过一个交易员(trader)类的例子,详细讲解如何定义和使用实例属性,以及如何根据价格采取相应的买卖操作,并更新交易数量。通过学习本文,你将能够避免常见的错误,编写出更加健壮和易于维护的Python代码。 …

    2025年12月14日
    000
  • Python 类与对象:实例属性的正确管理与 self 的应用

    本文深入探讨Python面向对象编程中实例属性与类属性的正确使用。通过一个“交易者”类的实际案例,详细阐述了如何在__init__方法中初始化实例属性,以及如何通过self关键字在类方法中正确访问和修改它们,从而避免因混淆类变量与实例变量而导致的状态管理错误。 在python的面向对象编程中,理解和…

    2025年12月14日
    000
  • Python类与对象:深入理解实例属性和方法的正确使用

    本文深入探讨Python类中实例属性与类属性的正确使用。通过一个交易者类示例,揭示了将可变数据类型作为类属性及未正确使用self访问实例属性的常见错误。文章详细阐述了在__init__方法中初始化实例属性的重要性,并指导如何通过self关键字在方法中正确操作这些属性,以确保每个对象拥有独立的状态,避…

    2025年12月14日
    000
  • Python 统计 CSV 文件中数字个数的实用指南

    这段代码展示了一种统计 CSV 文件中数字个数的有效方法。它通过逐行读取文件,使用逗号分隔每行,并累加分割后的数字数量,最终输出 CSV 文件中所有数字的总数。 file_path = ‘path_to_your_file.csv’count = 0# 打开文件并逐行读取with open(file…

    2025年12月14日
    000
  • Python统计CSV文件中数字数量的教程

    本文将介绍如何使用Python统计CSV文件中数字的个数。我们将逐行读取CSV文件,使用逗号分隔每行数据,并将分隔后的字符串转换为整数,最后统计数字的总数。通过本文的学习,你将掌握处理CSV文件和统计数据的基本技巧。 统计CSV文件中数字数量的步骤 要统计CSV文件中数字的数量,可以按照以下步骤进行…

    2025年12月14日
    000
  • Transformer模型处理长文本:stride参数的正确应用与实践

    本文深入探讨了在Transformer模型中处理长文本时,如何正确使用stride和truncation等参数,以避免预测中断的问题。我们详细阐述了这些参数在AutoTokenizer.__call__方法和pipeline初始化中的正确配置方式,并提供了具体的代码示例,帮助开发者实现对长文档的无缝…

    2025年12月14日
    000
  • Discord Bot集成指南:通过OAuth2授权将机器人添加到服务器

    本教程详细阐述了将Discord机器人添加到服务器的正确方法。与用户“加入”服务器不同,机器人必须由服务器管理员通过Discord OAuth2授权流程进行添加,而非通过代码主动“加入”邀请链接。文章将指导你构建正确的授权URL,并解释其工作原理及授权后的回调处理。 机器人与服务器的交互机制:核心概…

    2025年12月14日
    000
  • Python CSV文件中的数字元素计数教程

    本教程详细介绍了如何使用Python高效准确地统计CSV文件中独立数字元素的总数。文章通过分步解析文件读取、行内容处理、字符串分割及有效数字过滤等核心步骤,提供了一段优化后的Python代码示例,并讨论了处理空行、空字符串等常见场景的注意事项,旨在帮助用户精确统计CSV数据中的数字。 引言 在数据分…

    2025年12月14日
    000
  • Python统计CSV文件中独立数字个数的高效方法

    本教程详细介绍了如何使用Python准确统计CSV文件中独立数字的个数。针对CSV文件中数字可能分布在单行、多行,并以逗号分隔的复杂情况,文章提供了一种逐行读取、智能分割并过滤无效条目的解决方案,确保统计结果的精确性。 理解CSV数字计数的挑战 在处理csv文件时,我们经常需要统计其中特定类型的数据…

    2025年12月14日
    000
  • 针对SQLModel与SQLite应用的测试策略:使用临时数据库的实践指南

    本教程详细阐述了在测试使用SQLModel和SQLite数据库的CLI应用时,如何有效配置和管理临时数据库。核心内容包括解决sqlite3连接字符串与SQLModel引擎初始化时机不匹配的问题,确保测试环境的隔离性与一致性,并通过代码示例展示如何在pytest中使用tmp_path实现数据库的动态替…

    2025年12月14日
    000
  • 在SQLModel CLI应用中实现SQLite临时数据库测试的策略

    本教程旨在解决使用SQLModel和SQLite开发CLI应用时,在测试环节如何有效利用临时数据库的问题。我们将深入探讨在sqlite3模块和SQLModel中正确配置数据库连接字符串,并重点讲解如何动态地重新配置SQLModel的数据库引擎,以确保测试操作在独立的临时数据库上执行,从而避免测试间的…

    2025年12月14日
    000
  • 使用 PyLaTeX 生成带目录的 PDF 时目录为空的解决方案

    在使用 PyLaTeX 创建包含目录的 PDF 文档时,有时会遇到目录页仅显示 “Contents” 字样,而没有实际的章节和页码信息。这通常是因为 LaTeX 需要进行多次编译才能正确生成目录。第一次编译会提取文档中的章节信息并保存到中间文件中,第二次编译才会读取这些信息并…

    2025年12月14日
    000

发表回复

登录后才能评论
关注微信