File size: 1,940 Bytes
1a9e2c2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
"""认证模块 - API令牌验证"""

from typing import Optional, Dict
from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from app.core.config import setting
from app.core.logger import logger
from app.services.api_keys import api_key_manager


# Bearer security scheme. auto_error=False is passed so that a request without
# credentials reaches AuthManager.verify (see its `if not credentials` branch)
# rather than being rejected by HTTPBearer itself.
security = HTTPBearer(auto_error=False)


def _build_error(message: str, code: str = "invalid_token") -> dict:
    """构建认证错误"""
    return {
        "error": {
            "message": message,
            "type": "authentication_error",
            "code": code
        }
    }


class AuthManager:
    """Authentication manager that validates API bearer tokens."""

    @staticmethod
    async def verify(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> Dict:
        """Validate the bearer token of a request and return its key info.

        Args:
            credentials: Bearer credentials resolved through the ``security``
                dependency; falsy when the request carried none.

        Returns:
            Whatever truthy value ``api_key_manager.validate_key`` yields for
            the token, or ``{"key": None, "name": "Anonymous"}`` when no
            credentials were sent and no key is configured at all.

        Raises:
            HTTPException: 401 with code ``missing_token`` when credentials
                are absent but a key is configured, or with code
                ``invalid_token`` when the supplied token is not accepted.
        """
        global_key = setting.grok_config.get("api_key")

        # Initialise the key manager if it has no `_keys` attribute yet.
        manager_ready = hasattr(api_key_manager, "_keys")
        if not manager_ready:
            await api_key_manager.init()

        if credentials:
            token = credentials.credentials

            # Multi-key aware validation is delegated to the key manager.
            matched = api_key_manager.validate_key(token)
            if matched:
                return matched

            raise HTTPException(
                status_code=401,
                detail=_build_error(f"令牌无效,长度: {len(token)}", "invalid_token"),
            )

        # No credentials: reject as soon as any key (global or managed) exists.
        if global_key or api_key_manager.get_all_keys():
            raise HTTPException(
                status_code=401,
                detail=_build_error("缺少认证令牌", "missing_token"),
            )

        # Nothing configured at all: skip verification (development mode).
        logger.debug("[Auth] 未设置API_KEY,跳过验证")
        return {"key": None, "name": "Anonymous"}


# Module-level AuthManager instance.
auth_manager = AuthManager()