FastAPI启动事件中AsyncGenerator依赖注入的正确实践

FastAPI启动事件中AsyncGenerator依赖注入的正确实践

本文探讨了在FastAPI应用的startup事件中直接使用Depends()与AsyncGenerator进行资源(如Redis连接)初始化时遇到的问题,并指出Depends()不适用于此场景。核心内容是提供并详细解释了如何通过FastAPI的lifespan上下文管理器来正确、优雅地管理异步生成器依赖,确保应用启动时资源正确初始化,避免AttributeError。

问题背景:在应用启动时初始化异步资源

在构建基于fastapi的异步应用时,我们经常需要在应用启动时初始化一些全局资源,例如数据库连接池、消息队列客户端或缓存连接。fastapi提供了@app.on_event(“startup”)装饰器来处理这些启动任务。同时,为了更好地管理资源生命周期,我们通常会使用异步生成器(asyncgenerator)来创建和关闭这些资源,并结合fastapi的依赖注入系统depends()。然而,当尝试在startup事件中直接将asyncgenerator与depends()结合使用时,可能会遇到意料之外的错误。

考虑一个场景,我们需要在FastAPI应用启动时获取一个Redis异步客户端,并将其用于初始化一个全局的任务队列。我们可能尝试编写如下代码:

import uvicornfrom fastapi import FastAPI, Dependsimport redis.asyncio as redisfrom redis.asyncio import Redisfrom typing import AsyncGeneratorfrom rq import Queue # 假设rq是任务队列库# 配置Redis连接REDIS_HOST = "localhost"REDIS_PORT = 6379redis_pool = redis.ConnectionPool.from_url(f"redis://{REDIS_HOST}:{REDIS_PORT}")async def get_async_redis_client() -> AsyncGenerator[Redis, None]:    """    异步生成器,用于提供Redis客户端连接。    """    async with Redis.from_pool(redis_pool) as client:        yield clientdef process_data(data: str):    """    模拟一个处理数据的函数。    """    print(f"Processing data: {data}")def create_app():    app = FastAPI(docs_url='/')    task_queue: Queue = None # 声明为None,稍后初始化    @app.on_event("startup")    async def startup_event(redis_conn: redis.asyncio.Redis = Depends(get_async_redis_client)):        """        尝试在startup事件中使用Depends()注入Redis连接。        """        nonlocal task_queue        task_queue = Queue("task_queue", connection=redis_conn)        print("Redis connection initialized in startup event.")    @app.post("/add_data")    async def add_data(data: str):        """        添加数据到任务队列。        """        if task_queue:            task_queue.enqueue(process_data, data)            return {"message": "Book in processing"}        return {"message": "Task queue not initialized", "status": "error"}    @app.get("/get_data")    async def get_data():        """        示例接口。        """        return {"data": "kek"}    return appdef main():    uvicorn.run(        f"{__name__}:create_app",        host='0.0.0.0', port=8888,        reload=True    )if __name__ == '__main__':    main()

当运行上述代码并尝试向/add_data端点发送POST请求时,会收到一个AttributeError: ‘Depends’ object has no attribute ‘pipeline’的错误。这表明在startup_event函数中,redis_conn变量并没有被解析成实际的redis.asyncio.Redis对象,而仍然是一个Depends对象。

理解FastAPI的依赖注入与启动事件

FastAPI的Depends()机制主要设计用于请求处理函数中的依赖解析。在请求处理的生命周期中,FastAPI会负责调用依赖函数(包括异步生成器),获取其yield出的值,并在请求结束后执行生成器中yield之后的清理代码。

然而,@app.on_event(“startup”)装饰器下的函数,其执行时机在整个应用开始接受请求之前,并且它不属于标准的请求-响应循环。FastAPI的依赖注入系统并不会像处理路由函数那样,自动解析startup事件函数参数中的Depends()。因此,redis_conn变量接收到的不是get_async_redis_client生成器yield出的Redis客户端实例,而是Depends(get_async_redis_client)这个Depends对象本身。当rq库尝试对这个Depends对象调用pipeline()方法时,自然会抛出AttributeError。

正确实践:利用lifespan管理异步生成器依赖

为了在应用启动时正确地初始化和管理异步生成器提供的资源,FastAPI推荐使用lifespan上下文管理器。lifespan是FastAPI 0.65.0版本引入的一种更现代、更灵活的应用生命周期管理方式,它允许我们定义一个异步上下文管理器,在应用启动前执行设置代码,并在应用关闭前执行清理代码。

通过lifespan,我们可以手动调用异步生成器,获取其yield出的资源,并将其存储在应用实例或全局变量中,供其他部分使用。

以下是使用lifespan解决上述问题的正确方法:

import uvicornfrom fastapi import FastAPIimport redis.asyncio as redisfrom redis.asyncio import Redisfrom typing import AsyncGeneratorfrom rq import Queue # 假设rq是任务队列库from contextlib import asynccontextmanager# 配置Redis连接REDIS_HOST = "localhost"REDIS_PORT = 6379redis_pool = redis.ConnectionPool.from_url(f"redis://{REDIS_HOST}:{REDIS_PORT}")async def get_async_redis_client() -> AsyncGenerator[Redis, None]:    """    异步生成器,用于提供Redis客户端连接。    """    print("Opening Redis connection...")    async with Redis.from_pool(redis_pool) as client:        yield client    print("Closing Redis connection...") # 应用关闭时执行def process_data(data: str):    """    模拟一个处理数据的函数。    """    print(f"Processing data: {data}")# 定义一个全局变量来存储任务队列task_queue: Queue = None@asynccontextmanagerasync def lifespan(app: FastAPI):    """    FastAPI应用生命周期管理器。    在应用启动时初始化资源,在应用关闭时清理资源。    """    global task_queue # 声明使用全局变量    # 手动调用异步生成器以获取Redis连接    # 注意:这里直接调用get_async_redis_client(),并迭代它    # app.dependency_overrides.get(get_async_redis_client, get_async_redis_client)    # 这一步是为了兼容可能存在的依赖覆盖,确保获取到的是最终的依赖函数    redis_generator_func = app.dependency_overrides.get(get_async_redis_client, get_async_redis_client)    async for redis_conn in redis_generator_func():        # 在这里,redis_conn已经是实际的Redis客户端对象        task_queue = Queue("task_queue", connection=redis_conn)        print("Redis connection and Task Queue initialized via lifespan.")        yield # 应用在此处启动并处理请求    # 应用关闭时,生成器会继续执行,清理Redis连接    print("Application shutdown: Resources released.")def create_app():    app = FastAPI(        docs_url='/',        lifespan=lifespan # 将lifespan上下文管理器传递给FastAPI    )    @app.post("/add_data")    async def add_data(data: str):        """        添加数据到任务队列。        """        if task_queue:            task_queue.enqueue(process_data, data)            return {"message": "Book in processing"}        return {"message": "Task queue not initialized", "status": "error"}    @app.get("/get_data")    async def get_data():        """        示例接口。        """        return {"data": "kek"}    return appdef main():    uvicorn.run(        f"{__name__}:create_app",        host='0.0.0.0', port=8888,        reload=True    )if __name__ == '__main__':    main()

在这个修正后的代码中:

@asynccontextmanager装饰器: 我们使用contextlib.asynccontextmanager装饰器将lifespan函数转换为一个异步上下文管理器。lifespan函数: 这个函数现在负责整个应用的生命周期。在yield之前,它执行应用启动时的初始化逻辑。我们在这里手动调用get_async_redis_client()异步生成器,并通过async for循环获取yield出的redis_conn对象。app.dependency_overrides.get(get_async_redis_client, get_async_redis_client)这一步是为了确保即使get_async_redis_client被覆盖(例如在测试环境中),lifespan也能获取到正确的依赖函数。task_queue被正确地用解析后的redis_conn对象初始化。yield语句将控制权交还给FastAPI,此时应用开始处理请求。当应用关闭时(例如通过发送中断信号),yield之后的代码会被执行,从而触发get_async_redis_client生成器中的清理逻辑(async with Redis.from_pool(redis_pool) as client:块的退出)。FastAPI(lifespan=lifespan): 在创建FastAPI应用实例时,通过lifespan参数注册我们定义的生命周期管理器。

注意事项

全局变量管理: 在lifespan函数中修改全局变量(如task_queue)时,务必使用global关键字来指示您正在修改全局作用域的变量,而不是创建局部变量。依赖覆盖兼容性: app.dependency_overrides.get(get_async_redis_client, get_async_redis_client)是一个健壮的做法。它首先检查get_async_redis_client是否被app.dependency_overrides覆盖。如果没有,它就使用原始的get_async_redis_client函数。这对于测试和更复杂的应用场景非常有用。资源清理: 使用AsyncGenerator结合async with语句是管理异步资源生命周期的推荐方式。lifespan上下文管理器确保了AsyncGenerator的清理部分在应用关闭时被正确执行。startup事件与lifespan: 尽管@app.on_event(“startup”)仍然可用,但对于需要复杂初始化和清理逻辑的资源,或者需要与依赖注入系统交互的场景,lifespan提供了更强大、更清晰的机制。

总结

在FastAPI中,Depends()装饰器是为请求处理函数设计的依赖注入机制,不适用于@app.on_event(“startup”)事件。当需要在应用启动时利用AsyncGenerator初始化全局资源时,正确的做法是使用FastAPI的lifespan上下文管理器。通过手动调用异步生成器并将其结果存储在全局变量中,我们可以确保资源在应用启动时被正确初始化,并在应用关闭时被优雅地清理,从而避免因依赖解析不当导致的AttributeError。这种方法不仅解决了特定问题,也体现了FastAPI在应用生命周期管理上的灵活性和强大功能。

以上就是FastAPI启动事件中AsyncGenerator依赖注入的正确实践的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1374492.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年12月14日 14:11:58
下一篇 2025年12月14日 14:12:13

相关推荐

  • Python多线程如何实现条件变量 Python多线程复杂同步机制详解

    条件变量用于协调多线程执行,解决互斥锁无法处理的等待与通知问题。它结合锁和等待队列,支持线程在条件不满足时挂起并由其他线程唤醒,适用于生产者-消费者等场景。通过 threading.Condition 实现,推荐使用 with 语句管理锁,调用 wait() 前需持有锁,且应使用 while 循环检…

    2025年12月14日
    000
  • pyO3中从Rust检查Python自定义类实例类型的方法

    本文旨在解决在rust中使用pyo3库时,如何准确判断一个`pyany`对象是否为python中定义的自定义类实例的问题。针对用户在尝试使用`pytypeinfo`时遇到的困惑,文章将介绍一种更简洁、安全且推荐的方法:通过动态获取python类类型对象,并结合`pyany::is_instance(…

    2025年12月14日
    000
  • RichHandler与Rich Progress集成:解决显示冲突的教程

    在使用rich库的`richhandler`进行日志输出并同时使用`progress`组件时,可能会遇到显示错乱或溢出问题。这通常是由于为`richhandler`和`progress`分别创建了独立的`console`实例导致的。解决方案是确保日志处理器和进度条组件共享同一个`console`实例…

    2025年12月14日
    000
  • python模块的搜索路径和顺序

    Python导入模块时按顺序搜索路径:先当前脚本目录,再PYTHONPATH环境变量指定的目录,最后是安装默认路径如标准库和site-packages。可通过sys.path查看当前搜索路径列表,其顺序决定模块查找优先级。使用sys.path.insert(0, ‘path’…

    2025年12月14日
    000
  • Python有哪些命令行参数解析模块?

    推荐使用argparse解析命令行参数,它功能完整且用户友好,支持位置与可选参数、子命令、类型检查及自动生成帮助;getopt适用于简单场景或旧代码兼容;optparse已弃用;第三方库click采用装饰器风格,适合复杂CLI应用;fire由Google开发,可快速将函数或类转为命令行接口,适合原型…

    2025年12月14日
    000
  • Python异步中loop抛出异常的解决

    事件循环异常主因是生命周期管理不当和未捕获错误。1. 避免在子线程直接调用get_event_loop(),应使用asyncio.run()自动管理;2. 协程内需用try/except处理异常,gather设return_exceptions=True防中断;3. 禁止重复运行或过早关闭循环,确保…

    2025年12月14日
    000
  • python进程池的使用注意

    答案:使用Python进程池需在if name == ‘__main__’:中创建,合理设置进程数,及时关闭并回收资源,避免传递不可序列化的对象。 使用Python进程池时,关键在于合理管理资源和避免常见陷阱。进程池适合处理CPU密集型任务,但若使用不当,可能导致性能下降甚至…

    2025年12月14日
    000
  • Python中优雅处理函数调用中的冗余关键字参数:以模拟场景为例

    在python中,当函数调用方使用关键字参数,而函数定义方(尤其是模拟对象)不需要这些参数时,会遇到函数签名不匹配的问题。本文将介绍如何利用python的`**kwargs`语法,以一种简洁且符合pythonic的方式,捕获并忽略这些冗余的关键字参数,从而避免linter警告并保持代码的灵活性,尤其…

    2025年12月14日
    000
  • 使用OR-Tools CP-SAT加速大规模指派问题求解

    本文旨在解决使用`ortools.linear_solver`处理大规模指派问题时遇到的性能瓶颈,特别是当问题规模(n)超过40-50时。针对包含复杂定制约束(如特定id分配、id分组及id和限制)以及最小化最高与最低成本差值的目标函数,我们推荐并详细演示如何通过迁移至or-tools的cp-sat…

    2025年12月14日
    000
  • Python中高效合并嵌套字典的策略

    本文将深入探讨在python中高效合并两个或多个可能包含嵌套结构的字典的方法。针对键不完全重叠且需保留所有数据的场景,文章将详细介绍如何利用`setdefault()`和`update()`组合实现深度合并,确保数据完整性,并兼顾大型字典的性能需求,提供清晰的代码示例和原理分析。 理解字典合并的挑战…

    2025年12月14日
    000
  • 解决Windows 7上Python rtmidi库安装错误

    本文旨在帮助解决在Windows 7系统上安装Python rtmidi库时遇到的”Microsoft Visual C++ 14.0 or greater is required”错误。通过升级Python版本到3.11并使用pip安装rtmidi,可以有效解决此问题,从而…

    2025年12月14日
    000
  • 使用 pylintrc 文件为 “unused-argument” 指定参数列表

    本文介绍了如何使用 pylintrc 配置文件,通过 `ignored-argument-names` 选项,为 pylint 的 “unused-argument” 检查器指定需要忽略的参数名称列表,从而避免不必要的警告信息,提高代码检查的效率和准确性。 在 Python …

    2025年12月14日
    000
  • 使用 Snowpark 循环处理数据时避免覆盖先前结果

    本文旨在解决在使用 Snowpark 循环处理数据时,如何避免后续循环元素覆盖先前结果的问题。通过示例代码,展示了如何使用列表聚合的方式,将每次循环的结果添加到结果列表中,最终得到所有结果的并集,避免了结果被覆盖的情况。同时,也提供了使用 `append` 方法在 Pandas DataFrame …

    2025年12月14日
    000
  • 使用Docplex Python API识别和分析模型不可行约束

    本文旨在指导用户如何利用Docplex Python API中的`ConflictRefiner`工具,精确识别优化模型中导致不可行性的具体约束。我们将深入探讨如何从模型求解状态中检测不可行性,并通过`ConflictRefiner`的`display()`和`iter_conflicts()`方法…

    2025年12月14日
    000
  • 从Tkinter用户输入筛选Pandas DataFrame数据

    本文档旨在提供一个清晰、简洁的教程,讲解如何利用Tkinter获取用户输入,并以此为条件筛选Pandas DataFrame中的数据。通过示例代码和详细解释,帮助读者理解如何将用户界面与数据处理相结合,实现动态数据筛选功能。 使用Tkinter获取用户输入并筛选DataFrame 本教程将指导你如何…

    2025年12月14日
    000
  • 解决Pytest与Moto测试中DynamoDB上下文隔离的常见陷阱

    本文旨在探讨在Pytest测试框架中结合Moto库模拟DynamoDB服务时,因不当使用mock_dynamodb()上下文管理器而导致的资源不可见问题。核心内容是揭示Moto上下文的隔离性,并提供正确的实践方法,确保在Pytest fixture中创建的模拟资源能在测试函数中正确访问,从而避免因重…

    2025年12月14日
    000
  • 解决Gemini Pro API内容安全策略阻断回复的正确姿势

    本文旨在解决Gemini Pro API在使用`safety_settings`时仍遭遇内容阻断的问题。核心在于,许多开发者错误地使用字典配置安全设置,而API实际期望的是一个`SafetySetting`对象列表。本教程将详细指导如何正确导入相关类并构建符合API要求的安全设置,确保即使是敏感内容…

    2025年12月14日
    000
  • Django视图中基于用户过滤查询集的最佳实践

    本文旨在探讨在django应用中,如何高效且规范地实现基于当前登录用户的查询过滤。我们将明确django管理器(manager)与请求上下文的职责边界,指出在管理器中直接访问请求数据的弊端。核心解决方案是利用django的类视图mixin机制,创建可复用的逻辑来在视图层处理用户相关的查询过滤,从而避…

    2025年12月14日
    000
  • 合并具有不同字段的数组结构列

    本文档旨在指导读者如何在Spark DataFrame中合并两个具有不同字段的数组结构列。通过使用`transform`和`filter`函数,我们可以高效地将两个数组中的结构体进行匹配和合并,最终生成包含所有所需字段的新数组结构列。本文将提供详细的代码示例和解释,帮助读者理解和应用这一技术。 在处…

    2025年12月14日
    000
  • Python中对复杂JSON数据结构中嵌套对象数组进行日期字段排序的实战指南

    本教程详细讲解如何在python中对复杂json数据结构中嵌套的对象数组进行排序。针对包含特定日期字段(如`startdate`)的数组,我们将通过递归函数遍历json,精确识别并利用`datetime`模块将字符串日期转换为可比较的日期对象,实现从最新到最旧的倒序排列,从而高效地管理和组织深度嵌套…

    2025年12月14日
    000

发表回复

登录后才能评论
关注微信