Spaces:
Paused
Paused
import hmac
from typing import Optional

from fastapi import Header, HTTPException

from app.config.config import settings
from app.log.logger import get_security_logger
| logger = get_security_logger() | |
def verify_auth_token(token: str) -> bool:
    """Return True when ``token`` equals ``settings.AUTH_TOKEN``.

    The comparison is done with ``hmac.compare_digest`` so that the time it
    takes does not depend on how many leading characters of the supplied
    token are correct.

    Args:
        token: The credential supplied by the caller.

    Returns:
        ``True`` only if both ``token`` and ``settings.AUTH_TOKEN`` are
        non-empty strings with identical contents; ``False`` otherwise.
    """
    expected = settings.AUTH_TOKEN
    # A missing/empty credential must never authenticate, even when the
    # configured token is itself unset or empty.
    if not isinstance(token, str) or not isinstance(expected, str):
        return False
    if not token or not expected:
        return False
    # compare_digest rejects non-ASCII str, so compare the UTF-8 bytes.
    return hmac.compare_digest(token.encode("utf-8"), expected.encode("utf-8"))
class SecurityService:
    """Credential checks for incoming requests.

    Each ``verify_*`` coroutine returns the accepted credential on success
    and raises ``HTTPException`` with status 401 otherwise. Header-based
    parameters are declared with FastAPI's ``Header`` so the methods can be
    used as request dependencies.

    A credential is accepted when it equals ``settings.AUTH_TOKEN`` or one of
    the entries of ``settings.allowed_tokens_list`` (``verify_auth_token``
    accepts ``settings.AUTH_TOKEN`` only).
    """

    # Scheme prefix stripped from the Authorization header value.
    _BEARER_PREFIX = "Bearer "

    @staticmethod
    def _matches(candidate, expected) -> bool:
        """Compare two credentials in constant time.

        Non-string or empty values never match, so a missing credential
        cannot authenticate against an unset/empty configured token.
        """
        if not isinstance(candidate, str) or not isinstance(expected, str):
            return False
        if not candidate or not expected:
            return False
        # compare_digest rejects non-ASCII str, so compare the UTF-8 bytes.
        return hmac.compare_digest(
            candidate.encode("utf-8"), expected.encode("utf-8")
        )

    @classmethod
    def _is_allowed(cls, candidate) -> bool:
        """Return True if ``candidate`` is AUTH_TOKEN or an allowed token."""
        matched = cls._matches(candidate, settings.AUTH_TOKEN)
        # Deliberately no early exit: every entry is compared so the timing
        # does not reveal which entry (if any) matched.
        for allowed in settings.allowed_tokens_list:
            if cls._matches(candidate, allowed):
                matched = True
        return matched

    @classmethod
    def _strip_bearer(cls, authorization: str) -> str:
        """Remove a single leading ``"Bearer "`` prefix, if present.

        Only the prefix is removed; any later occurrence of the same text
        inside the token is left intact.
        """
        if authorization.startswith(cls._BEARER_PREFIX):
            return authorization[len(cls._BEARER_PREFIX):]
        return authorization

    async def verify_key(self, key: str):
        """Validate ``key`` against the allowed tokens and AUTH_TOKEN.

        Args:
            key: The credential to check.

        Returns:
            ``key`` unchanged when it is accepted.

        Raises:
            HTTPException: 401 when ``key`` is not an accepted credential.
        """
        if not self._is_allowed(key):
            logger.error("Invalid key")
            raise HTTPException(status_code=401, detail="Invalid key")
        return key

    async def verify_authorization(
        self, authorization: Optional[str] = Header(None)
    ) -> str:
        """Validate a ``Bearer`` token from the Authorization header.

        Args:
            authorization: Raw Authorization header value.

        Returns:
            The token with the ``"Bearer "`` prefix removed.

        Raises:
            HTTPException: 401 when the header is missing, does not start
                with ``"Bearer "``, or carries a token that is not accepted.
        """
        if not authorization:
            logger.error("Missing Authorization header")
            raise HTTPException(status_code=401, detail="Missing Authorization header")
        if not authorization.startswith(self._BEARER_PREFIX):
            logger.error("Invalid Authorization header format")
            raise HTTPException(
                status_code=401, detail="Invalid Authorization header format"
            )
        token = self._strip_bearer(authorization)
        if not self._is_allowed(token):
            logger.error("Invalid token")
            raise HTTPException(status_code=401, detail="Invalid token")
        return token

    async def verify_goog_api_key(
        self, x_goog_api_key: Optional[str] = Header(None)
    ) -> str:
        """Validate the Google API key from the ``x-goog-api-key`` header.

        Args:
            x_goog_api_key: Raw header value.

        Returns:
            The header value unchanged when it is accepted.

        Raises:
            HTTPException: 401 when the header is missing or not accepted.
        """
        if not x_goog_api_key:
            logger.error("Missing x-goog-api-key header")
            raise HTTPException(status_code=401, detail="Missing x-goog-api-key header")
        if not self._is_allowed(x_goog_api_key):
            logger.error("Invalid x-goog-api-key")
            raise HTTPException(status_code=401, detail="Invalid x-goog-api-key")
        return x_goog_api_key

    async def verify_auth_token(
        self, authorization: Optional[str] = Header(None)
    ) -> str:
        """Validate the Authorization header against AUTH_TOKEN only.

        The value is accepted with or without a leading ``"Bearer "``
        prefix; entries of ``settings.allowed_tokens_list`` are NOT accepted
        here.

        Args:
            authorization: Raw Authorization header value.

        Returns:
            The token with any leading ``"Bearer "`` prefix removed.

        Raises:
            HTTPException: 401 when the header is missing or the token does
                not equal ``settings.AUTH_TOKEN``.
        """
        if not authorization:
            logger.error("Missing auth_token header")
            raise HTTPException(status_code=401, detail="Missing auth_token header")
        token = self._strip_bearer(authorization)
        if not self._matches(token, settings.AUTH_TOKEN):
            logger.error("Invalid auth_token")
            raise HTTPException(status_code=401, detail="Invalid auth_token")
        return token

    async def verify_key_or_goog_api_key(
        self, key: Optional[str] = None, x_goog_api_key: Optional[str] = Header(None)
    ) -> str:
        """Validate the key from the URL or the ``x-goog-api-key`` header.

        Args:
            key: Credential taken from the URL, if any.
            x_goog_api_key: Raw ``x-goog-api-key`` header value, if any.

        Returns:
            ``key`` when it is accepted; otherwise the accepted header value.

        Raises:
            HTTPException: 401 when ``key`` is not accepted and the header is
                missing or not accepted.
        """
        # If the key from the URL is valid, return it directly.
        if self._is_allowed(key):
            return key
        # Otherwise fall back to the x-goog-api-key request header.
        if not x_goog_api_key:
            logger.error("Invalid key and missing x-goog-api-key header")
            raise HTTPException(status_code=401, detail="Invalid key and missing x-goog-api-key header")
        if not self._is_allowed(x_goog_api_key):
            logger.error("Invalid key and invalid x-goog-api-key")
            raise HTTPException(status_code=401, detail="Invalid key and invalid x-goog-api-key")
        return x_goog_api_key