Python asyncio:从任务生成器实现高效异步并发执行的原理与实践

Python asyncio:从任务生成器实现高效异步并发执行的原理与实践

本教程深入探讨如何在Python asyncio中,从任务生成器实现异步任务的无阻塞并发执行。针对在不 await 任务完成的情况下,持续创建并调度新任务的需求,文章详细阐述了 asyncio 协程协作的本质,并提供了两种核心解决方案:通过 await asyncio.sleep(0) 显式让出控制权,以及利用 Python 3.11+ 的 asyncio.TaskGroup 实现更结构化的并发管理,确保任务能够真正地并行运行。

引言

在开发高性能、高并发的应用程序时,python的 asyncio 库提供了一种强大的异步编程范式。特别是在处理需要持续生成和调度任务的场景,例如长轮询服务器、事件驱动系统或数据流处理,如何有效地将这些任务添加到事件循环并确保它们能够并发执行,是一个常见的挑战。本文将深入探讨如何从一个任务生成器中,以异步、非阻塞的方式创建并执行任务,避免因等待单个任务完成而阻塞整个事件循环。

理解异步任务生成的挑战

考虑以下场景:我们有一个任务生成器,它会不断地产生新的任务参数。我们希望为每个参数创建一个异步任务,并将其提交给事件循环,但又不希望主逻辑(即生成任务的部分)停下来等待这些任务完成。

最初的尝试可能如下:

import asyncio, random async def wrapper(word: str):    print(f"Executing task for: {word}")    await asyncio.sleep(1) # 模拟耗时操作    print(f"Finished task for: {word}")def generator():    abc = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'    while True:        yield random.choice(abc)async def manager():    loop = asyncio.get_event_loop()    for letter in generator():        loop.create_task(wrapper(letter)) # 创建任务,但不等待        # 问题在于:这里没有让出控制权,事件循环无法调度其他任务async def main():    await manager() # manager是一个无限循环,此处会阻塞if __name__ == '__main__':    # asyncio.run(manager()) # 这样调用会因为manager的无限循环而阻塞    # 需要一种方式让manager能够持续创建任务,同时让其他任务运行    pass # 暂时不运行,因为会阻塞

上述代码的问题在于,manager 协程内部的 for 循环会无限快速地运行,不断地调用 loop.create_task()。虽然 create_task 将 wrapper 协程包装成一个任务并提交给事件循环,但 manager 协身本身并没有任何 await 语句,这意味着它从不主动让出控制权给事件循环。结果是,事件循环没有机会去执行那些被创建的 wrapper 任务,因为 manager 始终占用着CPU。

核心概念回顾

要解决这个问题,我们需要理解 asyncio 的核心工作原理:

立即学习“Python免费学习笔记(深入)”;

事件循环 (Event Loop):asyncio 的核心,负责调度和执行协程。它是一个单线程的循环,通过轮询注册的协程,在协程 await 时暂停当前协程,并选择下一个准备好运行的协程执行。协程 (Coroutines) 与 任务 (Tasks):协程是可暂停和恢复的函数。asyncio.create_task() 将一个协程包装成一个 Task 对象,使其可以被事件循环调度。让出控制权 (Yielding):这是 asyncio 并发实现的关键。一个协程只有在遇到 await 表达式时,才会暂停自身并将控制权交还给事件循环。事件循环才能检查是否有其他任务准备就绪并执行它们。

解决方案一:显式让出控制权

最直接的解决方案是在 manager 协程的循环内部,显式地让出控制权。await asyncio.sleep(0) 是一个常用的技巧,它会立即暂停当前协程,并将控制权交还给事件循环。由于 sleep 的时间是0,事件循环会立即检查是否有其他任务准备就绪,并在下一个循环迭代中重新调度 manager 协程。

import asyncio, random async def wrapper(word: str):    """模拟一个耗时操作的异步任务"""    print(f"Executing task for: {word}")    await asyncio.sleep(1) # 任务模拟    print(f"Finished task for: {word}")def generator():    """一个无限生成随机字母的生成器"""    abc = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'    while True:        yield random.choice(abc)async def manager_with_yield():    """    负责从生成器获取任务并调度,通过await asyncio.sleep(0)显式让出控制权。    """    loop = asyncio.get_event_loop()    print("Manager started, generating tasks...")    for i, letter in enumerate(generator()):        loop.create_task(wrapper(letter))        print(f"Task {i+1} created for '{letter}'")        await asyncio.sleep(0) # 关键:让出控制权,允许其他任务运行        # 实际应用中,可以根据需要增加一个短暂的等待,例如 await asyncio.sleep(0.01)        # 或者在处理一定数量任务后才让出控制权,以平衡调度开销和响应性。        if i >= 10: # 示例:限制生成任务的数量,否则会无限运行            print("Generated 10 tasks, stopping manager.")            breakasync def main_with_yield():    """主入口点,运行带有显式让出控制权的manager"""    await manager_with_yield()    # 等待所有已创建的wrapper任务完成    print("Manager finished, waiting for remaining tasks...")    await asyncio.sleep(2) # 给剩余任务一些时间完成if __name__ == '__main__':    print("--- Running Solution 1: Explicit Yielding ---")    asyncio.run(main_with_yield())    print("--- Solution 1 Finished ---")

注意事项:

await asyncio.sleep(0) 是一种有效的让出控制权的方式,它确保事件循环有机会处理其他已调度的任务。在实际应用中,manager 协程通常不会是无限循环,或者会有一个退出条件。如果它是无限循环,并且没有其他机制(如 asyncio.run 的 timeout 参数或外部信号)来停止它,程序将持续运行。这种方法虽然有效,但在语义上 sleep(0) 可能感觉像是一个“技巧”。

解决方案二:使用 asyncio.TaskGroup (Python 3.11+)

Python 3.11 引入了 asyncio.TaskGroup,这是一种更现代、更结构化的并发管理方式。TaskGroup 提供了一个上下文管理器,可以在其中创建任务。它会自动管理这些任务的生命周期,并在退出上下文时等待所有在其内部创建的任务完成(或处理异常)。更重要的是,TaskGroup 在内部会自动处理任务的调度和让出控制权,使得代码更加简洁和健壮。

import asyncio, random async def wrapper(word: str):    """模拟一个耗时操作的异步任务"""    print(f"Executing task for: {word}")    await asyncio.sleep(1) # 任务模拟    print(f"Finished task for: {word}")def generator():    """一个无限生成随机字母的生成器"""    abc = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'    while True:        yield random.choice(abc)async def manager_with_taskgroup():    """    负责从生成器获取任务并调度,使用asyncio.TaskGroup进行结构化并发管理。    """    print("Manager started with TaskGroup, generating tasks...")    async with asyncio.TaskGroup() as tg: # 使用TaskGroup上下文管理器        for i, letter in enumerate(generator()):            tg.create_task(wrapper(letter)) # 在TaskGroup中创建任务            print(f"Task {i+1} created for '{letter}'")            # TaskGroup在内部会处理调度和让出控制权,通常无需额外的await asyncio.sleep(0)            # 但如果生成任务的速度极快,且任务本身耗时很短,            # 偶尔添加 await asyncio.sleep(0) 仍可能优化响应性。            if i >= 10: # 示例:限制生成任务的数量                print("Generated 10 tasks, stopping manager.")                break    print("TaskGroup exited. All tasks created within it should have completed or been cancelled.")async def main_with_taskgroup():    """主入口点,运行带有TaskGroup的manager"""    await manager_with_taskgroup()if __name__ == '__main__':    print("n--- Running Solution 2: Using asyncio.TaskGroup (Python 3.11+) ---")    # 确保Python版本 >= 3.11    if hasattr(asyncio, 'TaskGroup'):        asyncio.run(main_with_taskgroup())    else:        print("Warning: asyncio.TaskGroup requires Python 3.11 or later. Skipping this example.")    print("--- Solution 2 Finished ---")

TaskGroup 的优势:

结构化并发 (Structured Concurrency):所有在 TaskGroup 中创建的任务都在其作用域内管理。当 TaskGroup 退出时,它会等待所有子任务完成,或者在发生异常时优雅地取消它们。这极大地简化了错误处理和资源管理。隐式让出控制权:TaskGroup 的实现通常会确保事件循环得到足够的调度机会,减少了手动 await asyncio.sleep(0) 的必要性。代码清晰:通过上下文管理器,任务的生命周期和相互关系变得一目了然。

版本要求: asyncio.TaskGroup 需要 Python 3.11 或更高版本。对于旧版本Python,解决方案一仍然是可行的。

完整示例与最佳实践

结合上述两种方法,以下是一个更完整的示例,展示了如何从生成器高效地调度异步任务,并包含一些最佳实践的思考。我们优先推荐使用 TaskGroup。

import asyncioimport randomimport timeasync def process_item(item_id: int, data: str):    """模拟一个异步处理任务,打印处理信息并模拟耗时"""    start_time = time.time()    print(f"[{item_id}] Processing item: '{data}'...")    await asyncio.sleep(random.uniform(0.5, 2.0)) # 模拟随机耗时    end_time = time.time()    print(f"[{item_id}] Finished item: '{data}' in {end_time - start_time:.2f}s")def item_generator(max_items: int = 20):    """一个生成器,生成带ID的随机数据"""    abc = 'abcdefghijklmnopqrstuvwxyz'    for i in range(1, max_items + 1):        yield i, random.choice(abc) * random.randint(3, 8) # 生成随机长度的字符串async def task_dispatcher():    """    任务调度器,从生成器获取数据并创建异步任务。    优先使用TaskGroup,如果不可用则回退到显式让出控制权。    """    print("--- Task Dispatcher Started ---")    item_count = 0    if hasattr(asyncio, 'TaskGroup'):        print("Using asyncio.TaskGroup for task management.")        async with asyncio.TaskGroup() as tg:            for item_id, data in item_generator():                tg.create_task(process_item(item_id, data))                print(f"Dispatched task {item_id} for data '{data}'")                item_count += 1                # 即使使用TaskGroup,如果生成任务的速度远超任务执行速度,                # 也可以考虑在此处加入一个短暂的await,以避免内存中积压过多未开始的任务。                # 例如: if item_count % 5 == 0: await asyncio.sleep(0.01)        print(f"--- TaskGroup Finished. All {item_count} tasks completed or cancelled. ---")    else:        print("asyncio.TaskGroup not available (Python < 3.11). Falling back to explicit yield.")        loop = asyncio.get_event_loop()        for item_id, data in item_generator():            loop.create_task(process_item(item_id, data))            print(f"Dispatched task {item_id} for data '{data}'")            item_count += 1            await asyncio.sleep(0) # 显式让出控制权        print(f"--- Dispatcher Finished creating {item_count} tasks. Waiting for them to complete. ---")        # 由于是手动创建任务且没有TaskGroup等待,需要额外等待所有任务完成        await asyncio.sleep(3) # 粗略等待,实际应用中可能需要更精细的等待机制async def main():    """主程序入口"""    await task_dispatcher()    print("All dispatching and processing should be complete.")if __name__ == '__main__':    asyncio.run(main())

最佳实践:

选择合适的并发工具Python 3.11+:优先使用 asyncio.TaskGroup,它提供了结构化并发的优势,简化了任务管理、错误处理和资源清理。Python :使用 loop.create_task() 结合 await asyncio.sleep(0) 是实现非阻塞任务调度的有效方法。流量控制与背压:如果任务生成器产生任务的速度远超事件循环处理任务的速度,可能会导致内存占用过高或系统负载过大。在这种情况下,需要引入流量控制机制,例如:信号量 (Semaphore):限制同时运行的并发任务数量。队列 (Queue):将任务参数放入 asyncio.Queue,由固定数量的消费者协程从队列中取出并执行任务。批处理:一次性生成并调度一批任务,然后等待这批任务完成或达到某个阈值后再生成下一批。错误处理:在 TaskGroup 中,如果任何子任务抛出异常,TaskGroup 会捕获它并在退出上下文时重新抛出 ExceptionGroup(Python 3.11+)。这使得集中处理错误变得容易。对于手动 create_task 的情况,需要单独管理任务的异常,例如通过 task.add_done_callback() 或收集任务引用并在稍后 await 它们以捕获异常。任务生命周期管理:如果需要取消正在运行的任务,或者获取任务的结果,需要保留 Task 对象的引用。TaskGroup 在退出时会自动处理取消和等待,但在更复杂的场景中,可能仍需手动管理任务引用。

总结

在 asyncio 中从任务生成器实现高效异步并发执行的核心在于理解事件循环的协作式调度机制。仅仅通过 create_task() 创建任务不足以实现并发,关键在于主调度逻辑必须周期性地让出控制权给事件循环。

对于 Python 3.11 及更高版本,推荐使用 asyncio.TaskGroup,它提供了一种结构化、健壮且易于管理任务生命周期和错误处理的并发模式。对于 Python 3.10 及更低版本,await asyncio.sleep(0) 是一个有效的技巧,能够强制协程让出控制权,从而允许事件循环调度其他已创建的任务。

无论采用哪种方法,理解 await 的作用以及事件循环的工作原理,是构建高效、响应式 asyncio 应用程序的基础。在实际应用中,还需要结合流量控制、错误处理等机制,确保系统的稳定性和可扩展性。

以上就是Python asyncio:从任务生成器实现高效异步并发执行的原理与实践的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1369671.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年12月14日 09:51:50
下一篇 2025年12月14日 09:52:00

相关推荐

  • Uniapp 中如何不拉伸不裁剪地展示图片?

    灵活展示图片:如何不拉伸不裁剪 在界面设计中,常常需要以原尺寸展示用户上传的图片。本文将介绍一种在 uniapp 框架中实现该功能的简单方法。 对于不同尺寸的图片,可以采用以下处理方式: 极端宽高比:撑满屏幕宽度或高度,再等比缩放居中。非极端宽高比:居中显示,若能撑满则撑满。 然而,如果需要不拉伸不…

    2025年12月24日
    400
  • 如何让小说网站控制台显示乱码,同时网页内容正常显示?

    如何在不影响用户界面的情况下实现控制台乱码? 当在小说网站上下载小说时,大家可能会遇到一个问题:网站上的文本在网页内正常显示,但是在控制台中却是乱码。如何实现此类操作,从而在不影响用户界面(UI)的情况下保持控制台乱码呢? 答案在于使用自定义字体。网站可以通过在服务器端配置自定义字体,并通过在客户端…

    2025年12月24日
    800
  • 如何在地图上轻松创建气泡信息框?

    地图上气泡信息框的巧妙生成 地图上气泡信息框是一种常用的交互功能,它简便易用,能够为用户提供额外信息。本文将探讨如何借助地图库的功能轻松创建这一功能。 利用地图库的原生功能 大多数地图库,如高德地图,都提供了现成的信息窗体和右键菜单功能。这些功能可以通过以下途径实现: 高德地图 JS API 参考文…

    2025年12月24日
    400
  • 如何使用 scroll-behavior 属性实现元素scrollLeft变化时的平滑动画?

    如何实现元素scrollleft变化时的平滑动画效果? 在许多网页应用中,滚动容器的水平滚动条(scrollleft)需要频繁使用。为了让滚动动作更加自然,你希望给scrollleft的变化添加动画效果。 解决方案:scroll-behavior 属性 要实现scrollleft变化时的平滑动画效果…

    2025年12月24日
    000
  • 如何为滚动元素添加平滑过渡,使滚动条滑动时更自然流畅?

    给滚动元素平滑过渡 如何在滚动条属性(scrollleft)发生改变时为元素添加平滑的过渡效果? 解决方案:scroll-behavior 属性 为滚动容器设置 scroll-behavior 属性可以实现平滑滚动。 html 代码: click the button to slide right!…

    2025年12月24日
    500
  • 如何选择元素个数不固定的指定类名子元素?

    灵活选择元素个数不固定的指定类名子元素 在网页布局中,有时需要选择特定类名的子元素,但这些元素的数量并不固定。例如,下面这段 html 代码中,activebar 和 item 元素的数量均不固定: *n *n 如果需要选择第一个 item元素,可以使用 css 选择器 :nth-child()。该…

    2025年12月24日
    200
  • 使用 SVG 如何实现自定义宽度、间距和半径的虚线边框?

    使用 svg 实现自定义虚线边框 如何实现一个具有自定义宽度、间距和半径的虚线边框是一个常见的前端开发问题。传统的解决方案通常涉及使用 border-image 引入切片图片,但是这种方法存在引入外部资源、性能低下的缺点。 为了避免上述问题,可以使用 svg(可缩放矢量图形)来创建纯代码实现。一种方…

    2025年12月24日
    100
  • 如何解决本地图片在使用 mask JS 库时出现的跨域错误?

    如何跨越localhost使用本地图片? 问题: 在本地使用mask js库时,引入本地图片会报跨域错误。 解决方案: 要解决此问题,需要使用本地服务器启动文件,以http或https协议访问图片,而不是使用file://协议。例如: python -m http.server 8000 然后,可以…

    2025年12月24日
    200
  • 如何让“元素跟随文本高度,而不是撑高父容器?

    如何让 元素跟随文本高度,而不是撑高父容器 在页面布局中,经常遇到父容器高度被子元素撑开的问题。在图例所示的案例中,父容器被较高的图片撑开,而文本的高度没有被考虑。本问答将提供纯css解决方案,让图片跟随文本高度,确保父容器的高度不会被图片影响。 解决方法 为了解决这个问题,需要将图片从文档流中脱离…

    2025年12月24日
    000
  • 为什么 CSS mask 属性未请求指定图片?

    解决 css mask 属性未请求图片的问题 在使用 css mask 属性时,指定了图片地址,但网络面板显示未请求获取该图片,这可能是由于浏览器兼容性问题造成的。 问题 如下代码所示: 立即学习“前端免费学习笔记(深入)”; icon [data-icon=”cloud”] { –icon-cl…

    2025年12月24日
    200
  • 如何利用 CSS 选中激活标签并影响相邻元素的样式?

    如何利用 css 选中激活标签并影响相邻元素? 为了实现激活标签影响相邻元素的样式需求,可以通过 :has 选择器来实现。以下是如何具体操作: 对于激活标签相邻后的元素,可以在 css 中使用以下代码进行设置: li:has(+li.active) { border-radius: 0 0 10px…

    2025年12月24日
    100
  • 如何模拟Windows 10 设置界面中的鼠标悬浮放大效果?

    win10设置界面的鼠标移动显示周边的样式(探照灯效果)的实现方式 在windows设置界面的鼠标悬浮效果中,光标周围会显示一个放大区域。在前端开发中,可以通过多种方式实现类似的效果。 使用css 使用css的transform和box-shadow属性。通过将transform: scale(1.…

    2025年12月24日
    200
  • 为什么我的 Safari 自定义样式表在百度页面上失效了?

    为什么在 Safari 中自定义样式表未能正常工作? 在 Safari 的偏好设置中设置自定义样式表后,您对其进行测试却发现效果不同。在您自己的网页中,样式有效,而在百度页面中却失效。 造成这种情况的原因是,第一个访问的项目使用了文件协议,可以访问本地目录中的图片文件。而第二个访问的百度使用了 ht…

    2025年12月24日
    000
  • 如何用前端实现 Windows 10 设置界面的鼠标移动探照灯效果?

    如何在前端实现 Windows 10 设置界面中的鼠标移动探照灯效果 想要在前端开发中实现 Windows 10 设置界面中类似的鼠标移动探照灯效果,可以通过以下途径: CSS 解决方案 DEMO 1: Windows 10 网格悬停效果:https://codepen.io/tr4553r7/pe…

    2025年12月24日
    000
  • 使用CSS mask属性指定图片URL时,为什么浏览器无法加载图片?

    css mask属性未能加载图片的解决方法 使用css mask属性指定图片url时,如示例中所示: mask: url(“https://api.iconify.design/mdi:apple-icloud.svg”) center / contain no-repeat; 但是,在网络面板中却…

    2025年12月24日
    000
  • 如何用CSS Paint API为网页元素添加时尚的斑马线边框?

    为元素添加时尚的斑马线边框 在网页设计中,有时我们需要添加时尚的边框来提升元素的视觉效果。其中,斑马线边框是一种既醒目又别致的设计元素。 实现斜向斑马线边框 要实现斜向斑马线间隔圆环,我们可以使用css paint api。该api提供了强大的功能,可以让我们在元素上绘制复杂的图形。 立即学习“前端…

    2025年12月24日
    000
  • 图片如何不撑高父容器?

    如何让图片不撑高父容器? 当父容器包含不同高度的子元素时,父容器的高度通常会被最高元素撑开。如果你希望父容器的高度由文本内容撑开,避免图片对其产生影响,可以通过以下 css 解决方法: 绝对定位元素: .child-image { position: absolute; top: 0; left: …

    2025年12月24日
    000
  • 使用 Mask 导入本地图片时,如何解决跨域问题?

    跨域疑难:如何解决 mask 引入本地图片产生的跨域问题? 在使用 mask 导入本地图片时,你可能会遇到令人沮丧的跨域错误。为什么会出现跨域问题呢?让我们深入了解一下: mask 框架假设你以 http(s) 协议加载你的 html 文件,而当使用 file:// 协议打开本地文件时,就会产生跨域…

    2025年12月24日
    200
  • CSS 帮助

    我正在尝试将文本附加到棕色框的左侧。我不能。我不知道代码有什么问题。请帮助我。 css .hero { position: relative; bottom: 80px; display: flex; justify-content: left; align-items: start; color:…

    2025年12月24日 好文分享
    200
  • 前端代码辅助工具:如何选择最可靠的AI工具?

    前端代码辅助工具:可靠性探讨 对于前端工程师来说,在HTML、CSS和JavaScript开发中借助AI工具是司空见惯的事情。然而,并非所有工具都能提供同等的可靠性。 个性化需求 关于哪个AI工具最可靠,这个问题没有一刀切的答案。每个人的使用习惯和项目需求各不相同。以下是一些影响选择的重要因素: 立…

    2025年12月24日
    300

发表回复

登录后才能评论
关注微信