【Python】FastAPI:Token认证

FastAPI:Token认证

本教程通过 FastAPI 实现用户登录和基于 JWT(JSON Web Token) 的认证与授权,适合初学者到进阶用户。教程特别关注 DependsOAuth2PasswordBearer 等非基础操作的详细讲解,帮助你全面掌握相关技术。

环境准备

首先,确保安装必要的依赖库:

pip install fastapi uvicorn PyJWT

什么是 JWT

JWT (JSON Web Token) 是一种轻量级的认证机制,常用于客户端与服务器之间的通信。它主要包含三部分:

  • Header:算法与类型。
  • Payload:包含用户信息及声明。
  • Signature:用于校验数据完整性。
  • JWT 示例:

    eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9
    .eyJzdWIiOiJ1c2VyMSIsImV4cCI6MTY5MjMyMzAwMH0
    .Wu6HB7pCDHgVvmD3_a8Ev9fGzY1Kc0FnVmCvO1Wl1qM
    

    用户登录的基础实现

    创建一个简单的用户登录接口,验证用户名和密码。首先实现模拟的用户数据库和基本的登录验证:

    from fastapi import FastAPI, HTTPException
    from fastapi.security import OAuth2PasswordRequestForm
    
    app = FastAPI()
    
    # 模拟用户数据库
    USERS_DB = {
        "admin": "password123",
        "user": "mypassword"
    }
    
    @app.post("/login")
    async def login(form_data: OAuth2PasswordRequestForm):
        """
        登录接口,验证用户名和密码是否匹配。
        """
        username = form_data.username
        password = form_data.password
    
        if username not in USERS_DB or USERS_DB[username] != password:
            raise HTTPException(status_code=401, detail="用户名或密码错误")
    
        return {"message": f"欢迎回来,{username}!"}
    

    启动服务:

    uvicorn main:app --reload
    

    访问 http://127.0.0.1:8000/docs,测试 /login 接口。

    JWT 的生成与验证

    引入 PyJWT 生成令牌。我们将为成功登录的用户生成一个包含身份信息和过期时间的 JWT。

    import jwt
    import datetime
    
    SECRET_KEY = "your_secret_key"  # 替换为你的密钥
    ALGORITHM = "HS256"            # 签名算法
    TOKEN_EXPIRE_HOURS = 1         # 令牌有效时间(小时)
    
    def create_jwt(username: str) -> str:
        """
        生成 JWT。
        """
        payload = {
            "sub": username,  # 用户名
            "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=TOKEN_EXPIRE_HOURS)  # 过期时间
        }
        return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
    
    @app.post("/login")
    async def login(form_data: OAuth2PasswordRequestForm):
        """
        登录接口:验证用户名密码并返回 JWT。
        """
        username = form_data.username
        password = form_data.password
    
        if username not in USERS_DB or USERS_DB[username] != password:
            raise HTTPException(status_code=401, detail="用户名或密码错误")
    
        token = create_jwt(username)
        return {"access_token": token, "token_type": "bearer"}
    

    FastAPI 的认证工具

    什么是 Depends

    Depends 是 FastAPI 中的依赖注入工具,用于在路由中动态引入逻辑。例如,可以通过它获取登录用户的 Token,或者验证 Token 的合法性。

    什么是 OAuth2PasswordBearer

    OAuth2PasswordBearer 是 FastAPI 提供的一个工具,帮助我们解析 Bearer Token。它会从请求头的 Authorization 字段中提取 Token。

    使用示例

    from fastapi.security import OAuth2PasswordBearer
    
    oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")  # 指定获取 Token 的登录端点
    
  • 当我们在路由中依赖 oauth2_scheme 时,它会自动解析 Token,并作为参数传递给路由函数。
  • 实现受保护路由和 Token 验证

    我们在这里验证 Token 的有效性,并返回用户信息。

    from fastapi import Depends
    
    def decode_jwt(token: str):
        """
        验证并解码 JWT。
        """
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            return payload
        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=401, detail="令牌已过期")
        except jwt.InvalidTokenError:
            raise HTTPException(status_code=401, detail="令牌无效")
    
    @app.get("/protected")
    async def protected_route(token: str = Depends(oauth2_scheme)):
        """
        受保护的路由,需提供有效 JWT。
        """
        payload = decode_jwt(token)
        username = payload.get("sub")
        if not username:
            raise HTTPException(status_code=401, detail="令牌无效")
    
        return {"message": f"欢迎回来,{username}!这是一个受保护的路由。"}
    
    1. 使用 /login 接口获取 Token。
    2. 访问 /protected,并在 Authorization 请求头中添加 Bearer <your_token>

    优化代码结构(解耦与扩展)

    为了更好的维护和扩展,将用户管理和 JWT 逻辑拆分到独立模块中。

    # services/user_service.py
    class UserService:
        """
        用户服务类:处理用户验证。
        """
        def __init__(self):
            self.users_db = {
                "admin": "password123",
                "user": "mypassword"
            }
    
        def authenticate(self, username: str, password: str) -> bool:
            """
            验证用户名和密码是否匹配。
            """
            return self.users_db.get(username) == password
    
    import jwt
    import datetime
    from fastapi import HTTPException
    
    class JWTHandler:
        """
        JWT 工具类:负责生成和验证 JWT。
        """
        SECRET_KEY = "your_secret_key"
        ALGORITHM = "HS256"
    
        @staticmethod
        def create_token(username: str) -> str:
            payload = {
                "sub": username,
                "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1)
            }
            return jwt.encode(payload, JWTHandler.SECRET_KEY, algorithm=JWTHandler.ALGORITHM)
    
        @staticmethod
        def decode_token(token: str):
            try:
                return jwt.decode(token, JWTHandler.SECRET_KEY, algorithms=[JWTHandler.ALGORITHM])
            except jwt.ExpiredSignatureError:
                raise HTTPException(status_code=401, detail="令牌已过期")
            except jwt.InvalidTokenError:
                raise HTTPException(status_code=401, detail="令牌无效")
    
    from fastapi import FastAPI, Depends, HTTPException
    from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
    from user_service import UserService
    from jwt_handler import JWTHandler
    
    app = FastAPI()
    user_service = UserService()
    jwt_handler = JWTHandler()
    
    oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
    
    @app.post("/login")
    async def login(form_data:OAuth2PasswordRequestForm = Depends()):
        if not user_service.verify_user(form_data.username, form_data.password):
            raise HTTPException(status_code=401, detail="用户名或密码错误")
        token = jwt_handler.create_token(form_data.username)
        return {"access_token": token, "token_type": "bearer"}
    
    @app.get("/protected")
    async def protected_route(token: str = Depends(oauth2_scheme)):
        payload = jwt_handler.decode_token(token)
        username = payload.get("sub")
        return {"message": f"欢迎回来,{username}!"}
    
    if __name__ == "__main__":
        import uvicorn
        uvicorn.run(app, host="127.0.0.1", port=8000)
    

    总结

    通过本教程,你学会了:

    1. 基于 FastAPI 实现用户登录和 JWT Token 认证。
    2. 使用 DependsOAuth2PasswordBearer 实现认证逻辑。
    3. 解耦代码结构,提升可扩展性。

    可以进一步扩展功能,比如引入数据库存储用户信息或添加刷新 Token 的机制。

    作者:T0uken

    物联沃分享整理
    物联沃-IOTWORD物联网 » 【Python】FastAPI:Token认证

    发表回复