| """
|
| 工具函数模块
|
| 包含通用的工具函数和辅助方法
|
| """
|
| import uuid
|
| import time
|
| import logging
|
| from typing import Optional
|
| from loguru import logger
|
| from config import Config
|
|
|
|
|
class Utils:
    """Collection of stateless helper functions (IDs, auth parsing, logging)."""

    @staticmethod
    def generate_request_id() -> str:
        """Return a fresh random request ID (a UUID4 rendered as a string)."""
        return str(uuid.uuid4())

    @staticmethod
    def generate_token_id(refresh_token: str) -> str:
        """Build a short identifier for a refresh token, meant for log output.

        Args:
            refresh_token: The token to abbreviate.

        Returns:
            ``TOKEN_<first 8>...<last 8>`` for tokens of 16+ characters,
            otherwise ``TOKEN_<first 8>...``.
        """
        if len(refresh_token) >= 16:
            return f"TOKEN_{refresh_token[:8]}...{refresh_token[-8:]}"
        # Below 16 characters the head and tail slices would overlap, so only
        # the head is shown.
        # NOTE(review): a token of 8 characters or fewer appears in full here.
        return f"TOKEN_{refresh_token[:8]}..."

    @staticmethod
    def get_current_timestamp() -> int:
        """Return the current time as whole seconds (``time.time()`` truncated)."""
        return int(time.time())

    @staticmethod
    def validate_api_key(api_key: str) -> bool:
        """Return whether ``api_key`` is contained in ``Config().VALID_API_KEYS``.

        NOTE(review): the container type of ``VALID_API_KEYS`` is not visible
        here; this relies on its ``in`` semantics - confirm against Config.
        """
        config = Config()
        return api_key in config.VALID_API_KEYS

    @staticmethod
    def extract_bearer_token(auth_header: str) -> Optional[str]:
        """Extract the token from an ``Authorization: Bearer <token>`` header value.

        The scheme name is matched case-insensitively. Any run of whitespace
        between the scheme and the token, as well as leading/trailing
        whitespace, is tolerated.

        Args:
            auth_header: Raw value of the Authorization header (may be empty
                or ``None``).

        Returns:
            The token string, or ``None`` when the header is missing, uses a
            different scheme, has no token, or has extra fields.
        """
        if not auth_header:
            return None

        # split() without a separator collapses whitespace runs and never
        # yields empty fields, so "Bearer  abc" parses and "Bearer " does not
        # produce an empty-string token.
        parts = auth_header.split()
        if len(parts) != 2 or parts[0].lower() != "bearer":
            return None

        return parts[1]

    @staticmethod
    def setup_logging():
        """Configure loguru sinks and route stdlib ``logging`` through loguru.

        Installs a colorized console sink at INFO level, optionally a rotating
        file sink at DEBUG level (when ``Config.enable_file_logging()`` is
        truthy), and replaces the root ``logging`` handlers with one that
        forwards records to loguru.
        """
        # Start from a clean slate: drop handlers already registered on loguru.
        logger.remove()

        # Console sink: messages are written through print() without an extra
        # newline, since the formatted message already ends with one.
        logger.add(
            sink=lambda x: print(x, end=""),
            format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
            level="INFO",
            colorize=True
        )

        # Optional file sink, controlled by configuration.
        if Config.enable_file_logging():
            logger.add(
                "warp_api_server.log",
                rotation="10 MB",
                retention="7 days",
                level="DEBUG",
                format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
                encoding="utf-8"
            )
            logger.info("📝 文件日志已启用: warp_api_server.log")
        else:
            logger.info("📝 文件日志已禁用 (可通过环境变量 ENABLE_FILE_LOGGING=true 启用)")

        class InterceptHandler(logging.Handler):
            """stdlib logging handler that re-emits every record via loguru."""

            def emit(self, record):
                # Map the stdlib level name onto a loguru level; fall back to
                # the numeric level when loguru does not know the name.
                try:
                    level = logger.level(record.levelname).name
                except ValueError:
                    level = record.levelno

                # Walk back past frames that live in the logging module so the
                # message is attributed to the code that issued the log call.
                frame, depth = logging.currentframe(), 2
                while frame.f_code.co_filename == logging.__file__:
                    frame = frame.f_back
                    depth += 1

                logger.opt(depth=depth, exception=record.exc_info).log(
                    level, record.getMessage()
                )

        # force=True replaces any existing root handlers; level=0 lets every
        # record reach the intercept handler (loguru sinks do the filtering).
        logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
        # Only records at WARNING and above from the "urllib3" logger pass.
        logging.getLogger("urllib3").setLevel(logging.WARNING)

        logger.info("🚀 Loguru日志系统初始化完成")

    @staticmethod
    def deduplicate_tokens(tokens: list) -> list:
        """Return ``tokens`` without duplicates or falsy entries, order preserved.

        Args:
            tokens: Iterable of hashable token values; empty/``None`` entries
                are dropped.

        Returns:
            A new list holding the first occurrence of each truthy token.
        """
        # dict preserves insertion order, so fromkeys keeps first occurrences.
        return list(dict.fromkeys(token for token in tokens if token))

    @staticmethod
    def validate_refresh_token_format(token: str) -> bool:
        """Check whether ``token`` is plausibly a refresh token.

        A token passes when it is a string of at least 100 characters that
        contains at least one alphanumeric character.

        Args:
            token: Candidate value (non-string values are rejected).

        Returns:
            ``True`` if the value passes all checks, ``False`` otherwise.
        """
        # Type check first so non-string values are rejected before len() is
        # called; an empty string is rejected by the length check.
        if not isinstance(token, str) or len(token) < 100:
            return False

        return any(c.isalnum() for c in token)