
本教程详细阐述了如何在 fastapi 中实现多种认证机制(如 basic auth 和 jwt auth),并允许客户端任选其一进行认证。核心方法是修改各个认证依赖项,使其在认证失败时返回 `none` 而非立即抛出异常,从而使一个组合认证依赖能够基于“或”逻辑判断任一认证是否成功,最终实现灵活的多重认证支持。
在构建现代 Web API 时,支持多种认证方式以满足不同客户端或场景的需求是常见的实践。例如,您可能希望同时支持传统的 HTTP Basic 认证和基于令牌的 JWT 认证。然而,在 FastAPI 中直接将多个认证依赖项组合使用时,默认行为可能会导致问题:FastAPI 会强制所有依赖项都通过验证,如果其中一个依赖项在解析阶段抛出 HTTPException,那么后续的依赖项将不会被执行,从而无法实现“任选其一”的逻辑。
理解 FastAPI 依赖注入的默认行为
FastAPI 的依赖注入系统在处理多个 Depends 依赖时,遵循严格的顺序和错误处理机制。当一个依赖项(例如,一个认证函数)内部抛出 HTTPException 时,FastAPI 会立即捕获该异常并返回相应的 HTTP 响应,而不会继续执行后续的依赖项或路由处理函数。这意味着,如果您尝试通过一个自定义的组合依赖函数来包装多个认证依赖,并期望它们能像逻辑“或”一样工作,那么默认情况下是行不通的。
例如,以下尝试实现“或”逻辑的组合认证依赖:
# 假设 jwt_logged_user 和 basic_logged_user 在认证失败时会抛出 HTTPExceptiondef auth_user(jwt_auth: Any = Depends(jwt_logged_user), basic_auth: Any = Depends(basic_logged_user)): if jwt_auth or basic_auth: return jwt_auth or basic_auth raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid Credentials')
如果 jwt_logged_user 认证失败并抛出异常,那么 basic_auth 依赖将永远不会被解析,auth_user 函数体也永远不会执行,从而无法检查 Basic Auth 是否有效。
核心解决方案:优雅地处理认证失败
要实现“任选其一”的认证逻辑,关键在于修改各个独立的认证依赖项,使其在认证失败时不再立即抛出 HTTPException,而是返回一个指示失败的值(例如 None)。这样,组合认证依赖就可以接收到所有独立认证的结果,并根据这些结果进行逻辑判断。
1. 修改 HTTP Basic 认证依赖
对于 HTTP Basic 认证,FastAPI 提供了 HTTPBasic 类。通过设置 auto_error=False,可以禁用其在认证失败时自动抛出 HTTPException 的行为。
首先,更新 HTTPBasic 实例:
from fastapi.security import HTTPBasic, HTTPBasicCredentialsfrom typing import Annotated, Optionalimport secrets# ... 其他导入 ...# 将 auto_error 设置为 Falsesecurity = HTTPBasic(auto_error=False)def basic_logged_user(credentials: Annotated[Optional[HTTPBasicCredentials], Depends(security)]): """ 处理 HTTP Basic 认证。 如果认证失败,返回 None 而非抛出异常。 """ if credentials is None: # 没有提供 Basic 认证凭据,或者凭据格式不正确 (由 auto_error=False 处理) return None current_username_bytes = credentials.username.encode("utf8") correct_username_bytes = settings.SESSION_LOGIN_USER.encode("utf8") is_correct_username = secrets.compare_digest( current_username_bytes, correct_username_bytes ) current_password_bytes = credentials.password.encode("utf8") correct_password_bytes = settings.SESSION_LOGIN_PASS.encode("utf8") is_correct_password = secrets.compare_digest( current_password_bytes, correct_password_bytes ) if not (is_correct_username and is_correct_password): # 凭据不匹配,返回 None return None # 认证成功,返回用户名 return credentials.username
关键点:
security = HTTPBasic(auto_error=False):禁用 HTTPBasic 在凭据缺失或格式错误时自动抛出 401 异常。credentials: Annotated[Optional[HTTPBasicCredentials], Depends(security)]:声明 credentials 为 Optional 类型,因为 security 在 auto_error=False 时可能返回 None。if credentials is None::检查是否提供了凭据。if not (is_correct_username and is_correct_password): return None:在自定义逻辑中,如果凭据无效,也返回 None。
2. 修改 JWT 认证依赖
对于 JWT 认证,通常会使用 OAuth2PasswordBearer。同样,通过设置 auto_error=False 来禁用其自动错误处理。此外,您的令牌验证函数 (utils.verify_token) 也应该被修改,以在验证失败时返回 None 或被 try-except 块包裹。
假设 utils.OAuth2_scheme 是 OAuth2PasswordBearer 的实例,并且 utils.verify_token 函数负责验证 JWT。
from fastapi.security import OAuth2PasswordBearerfrom sqlalchemy.orm import Sessionfrom typing import Annotated, Optionalfrom jose import JWTError, jwt # 假设您使用 jose 库from pydantic import BaseModel# ... 其他导入 ...# 假设 utils 模块包含 OAuth2_scheme 和 verify_token# utils.OAuth2_scheme 应该被初始化为 auto_error=False# 例如:# class TokenData(BaseModel):# username: Optional[str] = None# class Utils:# OAuth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)# SECRET_KEY = "your-secret-key" # 替换为实际的密钥# ALGORITHM = "HS256"# def verify_token(self, token: str) -> dict:# try:# payload = jwt.decode(token, self.SECRET_KEY, algorithms=[self.ALGORITHM])# username: str = payload.get("sub")# if username is None:# raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token payload")# return {"username": username}# except JWTError:# raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials")# utils = Utils()def jwt_logged_user(token: Annotated[Optional[str], Depends(utils.OAuth2_scheme)], db: Session = Depends(db_session)): """ 处理 JWT 认证。 如果认证失败,返回 None 而非抛出异常。 """ if token is None: # 没有提供 JWT 令牌,或者令牌格式不正确 (由 auto_error=False 处理) return None try: # 尝试验证令牌,并从 payload 中获取用户名 # 假设 utils.verify_token 在验证失败时会抛出 HTTPException 或 JWTError payload = utils.verify_token(token) # 假设返回一个包含用户信息的字典 username = payload.get("username") # 根据实际 payload 结构调整 if username is None: return None # 令牌有效但没有找到用户名信息 # 查询数据库以验证用户是否存在 user = db.query(User).filter(User.username == username).first() if user is None: return None # 用户不存在 # 认证成功,返回用户名 return user.username except HTTPException: # 捕获 verify_token 内部抛出的 HTTPException return None except JWTError: # 捕获 jose 库的 JWT 错误 return None except Exception: # 捕获其他未知错误 return None
关键点:
utils.OAuth2_scheme = OAuth2PasswordBearer(tokenUrl=”token”, auto_error=False):确保 OAuth2PasswordBearer 实例设置了 auto_error=False。token: Annotated[Optional[str], Depends(utils.OAuth2_scheme)]:声明 token 为 Optional 类型。if token is None::检查是否提供了令牌。try…except 块:包裹 utils.verify_token 调用,捕获可能抛出的异常,并在捕获到异常时返回 None。return user.username:认证成功后返回用户名,与 Basic 认证的返回类型保持一致。
3. 实现组合认证逻辑
现在,当 basic_logged_user 和 jwt_logged_user 认证失败时都返回 None,我们可以创建一个新的组合依赖项来检查任一认证是否成功。
from fastapi import HTTPException, status, Dependsfrom typing import Annotated, Optional, Union# ... 其他导入 ...def auth_user(jwt_username: Annotated[Optional[str], Depends(jwt_logged_user)], basic_username: Annotated[Optional[str], Depends(basic_logged_user)]) -> str: """ 组合认证依赖,允许通过 JWT 或 Basic Auth 中的任一种进行认证。 如果任一认证成功,返回对应的用户名。 如果两种认证都失败,则抛出 401 异常。 """ if jwt_username: # JWT 认证成功 return jwt_username if basic_username: # Basic Auth 认证成功 return basic_username # 如果两种认证都失败,则抛出未授权异常 raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid Credentials', headers={"WWW-Authenticate": "Basic, Bearer"}, # 提示支持的认证方式 )# 路由使用组合认证依赖@router.get("/users/")async def get_users(db: Session = Depends(db_session), logged_username: Annotated[str, Depends(auth_user)]): """ 获取用户列表,需要通过 JWT 或 Basic Auth 认证。 """ # 假设您需要根据用户名进行某些操作,例如权限检查 print(f"Authenticated user: {logged_username}") query_users = db.query(User).all() return query_users
关键点:
jwt_username: Annotated[Optional[str], Depends(jwt_logged_user)] 和 basic_username: Annotated[Optional[str], Depends(basic_logged_user)]:接收两个子认证依赖的结果,它们现在可以是 None。if jwt_username: return jwt_username:如果 JWT 认证成功(返回了用户名),则直接返回该用户名。if basic_username: return basic_username:如果 JWT 认证失败但 Basic Auth 成功,则返回 Basic Auth 的用户名。raise HTTPException(…):只有当所有子认证都失败时,才由 auth_user 统一抛出 401 异常。这样确保了只有在没有任何有效凭据的情况下才拒绝访问。headers={“WWW-Authenticate”: “Basic, Bearer”}:在 401 响应头中明确告知客户端支持的认证方式。
完整示例代码结构
为了更好地理解,以下是一个整合了上述修改的 FastAPI 应用片段:
from fastapi import APIRouter, Depends, HTTPException, statusfrom fastapi.security import HTTPBasic, HTTPBasicCredentials, OAuth2PasswordBearerfrom sqlalchemy.orm import Sessionfrom typing import Annotated, Optional, Unionimport secretsfrom jose import JWTError, jwt # 假设使用 jose 库# 假设这些是您的配置和模型class Settings: SESSION_LOGIN_USER = "admin" SESSION_LOGIN_PASS = "securepassword" JWT_SECRET_KEY = "your-super-secret-jwt-key" # 替换为实际的密钥 JWT_ALGORITHM = "HS256"settings = Settings()# 假设 db_session 是您的数据库会话依赖# 假设 User 是您的 SQLAlchemy 用户模型class User: # 简化示例 def __init__(self, username): self.username = username def __repr__(self): return f""class MockDBSession: # 模拟数据库会话 def query(self, model): return self def filter(self, *args, **kwargs): return self def first(self): return User("testuser") # 模拟返回一个用户 def all(self): return [User("user1"), User("user2")]def get_db(): db = MockDBSession() try: yield db finally: pass # 实际应用中会关闭会话db_session = Depends(get_db)# JWT 相关的工具类class Utils: OAuth2_scheme = OAuth2PasswordBearer(tokenUrl="/token", auto_error=False) SECRET_KEY = settings.JWT_SECRET_KEY ALGORITHM = settings.JWT_ALGORITHM def verify_token(self, token: str) -> dict: try: payload = jwt.decode(token, self.SECRET_KEY, algorithms=[self.ALGORITHM]) username: str = payload.get("sub") if username is None: raise JWTError("Invalid token payload: no subject") return {"username": username} except JWTError as e: # 在这里可以记录错误,但不再抛出 HTTPException raise e # 重新抛出 JWTError,由 jwt_logged_user 捕获 except Exception as e: raise eutils = Utils()router = APIRouter()# --- 独立认证依赖项 ---# Basic Authsecurity = HTTPBasic(auto_error=False)def basic_logged_user(credentials: Annotated[Optional[HTTPBasicCredentials], Depends(security)]) -> Optional[str]: if credentials is None: return None current_username_bytes = credentials.username.encode("utf8") correct_username_bytes = settings.SESSION_LOGIN_USER.encode("utf8") is_correct_username = secrets.compare_digest( current_username_bytes, correct_username_bytes ) current_password_bytes = credentials.password.encode("utf8") correct_password_bytes = settings.SESSION_LOGIN_PASS.encode("utf8") is_correct_password = secrets.compare_digest( current_password_bytes, correct_password_bytes ) if not (is_correct_username and is_correct_password): return None return credentials.username# JWT Authdef jwt_logged_user(token: Annotated[Optional[str], Depends(utils.OAuth2_scheme)], db: Session = Depends(db_session)) -> Optional[str]: if token is None: return None try: payload = utils.verify_token(token) username = payload.get("username") if username is None: return None user = db.query(User).filter(User.username == username).first() if user is None: return None return user.username except JWTError: return None except Exception: return None# --- 组合认证依赖项 ---def auth_user(jwt_username: Annotated[Optional[str], Depends(jwt_logged_user)], basic_username: Annotated[Optional[str], Depends(basic_logged_user)]) -> str: if jwt_username: return jwt_username if basic_username: return basic_username raise HTTPException( status_code=status.
以上就是FastAPI 多种认证方式(任选其一)实现指南的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1382815.html
微信扫一扫
支付宝扫一扫