asyncio通过协程实现单线程并发,适用于I/O密集型任务。使用async/await定义和调用协程,通过事件循环调度执行。可用asyncio.run()启动主协程,create_task()并发运行多个协程,gather()等待所有协程完成。异常处理需在await时捕获,未处理异常会存储于Task中。避免阻塞事件循环:使用异步I/O、将CPU密集型任务放入线程或进程池、用wait_for()设置超时、定期调用sleep(0)让出控制权。相比线程和进程,asyncio轻量高效,适合I/O密集场景;CPU密集任务应选进程或多线程;混合任务可结合使用三种方式。

Asyncio允许你编写并发代码,而无需线程或进程的复杂性。它本质上是一个单线程、事件循环的并发框架,通过协程(coroutines)实现非阻塞操作,从而提升I/O密集型任务的效率。
使用asyncio进行异步编程,简单来说,就是用
async
和
await
关键字定义和调用协程,并将它们注册到事件循环中执行。
解决方案:
定义协程: 使用
async def
定义一个协程函数。协程可以暂停执行,等待I/O操作完成,而不会阻塞整个程序。
import asyncioasync def my_coroutine(delay): print(f"协程开始,等待 {delay} 秒") await asyncio.sleep(delay) print("协程结束") return f"等待了 {delay} 秒的结果"
创建事件循环: asyncio的核心是事件循环,它负责调度和执行协程。
loop = asyncio.get_event_loop()
运行协程: 可以使用
loop.run_until_complete()
或
asyncio.run()
来运行协程。
asyncio.run()
简化了事件循环的创建和关闭,更适合简单的脚本。
async def main(): result = await my_coroutine(2) print(result)asyncio.run(main())
使用
await
关键字:
await
关键字用于等待一个
awaitable
对象(例如另一个协程、
Future
或
Task
)完成。
await
只能在
async
函数中使用。
async def another_coroutine(): await asyncio.sleep(1) return "另一个协程的结果"async def main(): result = await another_coroutine() print(result)asyncio.run(main())
创建Tasks:
asyncio.create_task()
函数可以创建一个
Task
对象,它代表一个正在运行的协程。 这允许你并发地运行多个协程。
async def main(): task1 = asyncio.create_task(my_coroutine(1)) task2 = asyncio.create_task(my_coroutine(2)) await task1 await task2 print("所有任务完成")asyncio.run(main())
并发执行多个协程: 使用
asyncio.gather()
可以并发地运行多个协程,并等待它们全部完成。
async def main(): results = await asyncio.gather( my_coroutine(1), my_coroutine(2), another_coroutine() ) print(results)asyncio.run(main())
如何处理asyncio中的异常?
在asyncio中处理异常与在常规Python代码中类似,但需要注意协程的特性。可以使用
try...except
块来捕获协程中抛出的异常。
import asyncioasync def might_fail(): await asyncio.sleep(0.5) raise ValueError("Something went wrong!")async def main(): try: await might_fail() except ValueError as e: print(f"Caught an error: {e}")asyncio.run(main())
如果一个任务(Task)内部抛出了未捕获的异常,该异常会被存储在Task对象中。当使用
await
等待该Task完成时,异常会被重新抛出。如果Task永远没有被
await
,则异常会被事件循环记录下来,但不会中断程序。
import asyncioasync def task_that_fails(): await asyncio.sleep(0.5) raise ValueError("Task failed!")async def main(): task = asyncio.create_task(task_that_fails()) await asyncio.sleep(1) # 让任务有时间完成 try: await task # 重新抛出异常 except ValueError as e: print(f"Caught error from task: {e}")asyncio.run(main())
可以使用
task.result()
方法来获取Task的结果或异常。如果Task已经完成并且没有抛出异常,则返回结果。如果Task抛出了异常,则该方法会重新抛出异常。
import asyncioasync def task_that_fails(): await asyncio.sleep(0.5) raise ValueError("Task failed!")async def main(): task = asyncio.create_task(task_that_fails()) await asyncio.sleep(1) try: task.result() # 重新抛出异常 except ValueError as e: print(f"Caught error from task: {e}")asyncio.run(main())
asyncio中如何避免阻塞事件循环?
避免阻塞asyncio事件循环是保证程序响应性的关键。事件循环是单线程的,如果一个协程执行了耗时的同步操作,它会阻塞整个事件循环,导致其他协程无法运行。
使用异步I/O操作: 这是最基本也是最重要的原则。使用asyncio提供的异步I/O操作,例如
asyncio.sleep()
、
asyncio.open_connection()
、
aiohttp
(异步HTTP客户端)等。这些操作会在等待I/O完成时释放事件循环的控制权,允许其他协程运行。
避免CPU密集型任务: 如果需要执行CPU密集型任务(例如图像处理、加密解密等),应该将这些任务放到单独的进程或线程中执行,避免阻塞事件循环。可以使用
asyncio.to_thread()
或
concurrent.futures
模块来实现。
import asyncioimport concurrent.futuresimport timedef cpu_bound_task(n): time.sleep(n) # 模拟耗时操作 return f"CPU密集型任务完成,耗时 {n} 秒"async def main(): loop = asyncio.get_running_loop() with concurrent.futures.ProcessPoolExecutor() as pool: result = await loop.run_in_executor(pool, cpu_bound_task, 2) # 放在进程池中运行 print(result)asyncio.run(main())
限制协程的运行时间: 使用
asyncio.wait_for()
可以限制一个协程的运行时间。如果协程在指定时间内没有完成,
asyncio.wait_for()
会抛出一个
asyncio.TimeoutError
异常,可以捕获并处理该异常。
import asyncioasync def long_running_task(): await asyncio.sleep(5) return "任务完成"async def main(): try: result = await asyncio.wait_for(long_running_task(), timeout=2) print(result) except asyncio.TimeoutError: print("任务超时")asyncio.run(main())
使用异步库: 尽量使用异步库来替代同步库。例如,使用
aiohttp
替代
requests
,使用
asyncpg
或
aiosqlite
替代
psycopg2
或
sqlite3
。
定期释放控制权: 如果一个协程需要执行大量的计算或I/O操作,可以定期使用
await asyncio.sleep(0)
来释放事件循环的控制权,让其他协程有机会运行。
import asyncioasync def large_task(): for i in range(100000): # 模拟大量计算 _ = i * i if i % 1000 == 0: await asyncio.sleep(0) # 释放控制权 return "任务完成"async def main(): result = await large_task() print(result)asyncio.run(main())
如何选择合适的并发策略:asyncio vs 线程 vs 进程?
选择合适的并发策略取决于任务的性质和程序的需求。Asyncio、线程和进程各有优缺点,适用于不同的场景。
Asyncio:
优点:轻量级: 基于协程,切换开销小,资源占用少。高效I/O: 擅长处理I/O密集型任务,例如网络请求、文件读写等。单线程: 避免了线程锁的复杂性,减少了死锁和竞争条件。缺点:CPU密集型: 不适合CPU密集型任务,因为单线程无法充分利用多核CPU。阻塞操作: 如果有阻塞操作,会阻塞整个事件循环,影响程序的响应性。适用场景:Web服务器、网络爬虫、聊天服务器、实时数据处理等I/O密集型应用。
线程:
优点:并发执行: 可以并发执行CPU密集型任务,充分利用多核CPU。共享内存: 线程之间可以共享内存,方便数据交换。缺点:线程锁: 需要使用线程锁来保护共享资源,增加了编程复杂性。GIL限制: 在CPython中,由于全局解释器锁(GIL)的存在,同一时刻只能有一个线程执行Python字节码,限制了CPU密集型任务的并发性能。上下文切换: 线程切换开销比协程大。适用场景:GUI程序、多线程下载器、需要并发执行CPU密集型任务的应用。
进程:
优点:真正的并行: 可以利用多核CPU实现真正的并行,不受GIL限制。隔离性: 进程之间相互隔离,一个进程崩溃不会影响其他进程。缺点:资源占用: 进程创建和销毁开销大,资源占用多。通信开销: 进程间通信(IPC)开销比线程大。适用场景:科学计算、图像处理、视频编码等CPU密集型应用,需要高可靠性和隔离性的应用。
总结:
I/O密集型任务: 优先选择asyncio,如果asyncio遇到阻塞操作,可以考虑使用线程池或进程池来执行阻塞操作。CPU密集型任务: 优先选择进程,如果需要共享内存,可以考虑使用线程。混合型任务: 可以结合使用asyncio、线程和进程,例如使用asyncio处理I/O,使用线程或进程处理CPU密集型任务。
以上就是如何使用asyncio进行异步编程?的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1369746.html
微信扫一扫
支付宝扫一扫