"""
Utility function module.

Contains general-purpose utility functions and helper methods.
"""
|
|
|
import inspect
import logging
import time
import uuid
from typing import Optional

from loguru import logger

from config import Config
|
|
|
|
|
|
|
|
|
class Utils:
    """Collection of stateless helper functions.

    Every member is a static method; the class only serves as a namespace.
    """

    @staticmethod
    def generate_request_id() -> str:
        """Return a new random request ID (a UUID4 in canonical string form)."""
        return str(uuid.uuid4())

    @staticmethod
    def generate_token_id(refresh_token: str) -> str:
        """Return an abbreviated identifier for a token, for log display.

        Args:
            refresh_token: The token to abbreviate.

        Returns:
            ``TOKEN_<first 8>...<last 8>`` when the token has at least 16
            characters, otherwise ``TOKEN_<first 8>...``. An empty or ``None``
            token yields ``TOKEN_...``.
        """
        # Guard against None so a log helper never raises on a missing token;
        # the result matches what an empty string produces.
        if not refresh_token:
            return "TOKEN_..."

        # NOTE(review): tokens of 8 characters or fewer appear in full here.
        if len(refresh_token) >= 16:
            return f"TOKEN_{refresh_token[:8]}...{refresh_token[-8:]}"
        return f"TOKEN_{refresh_token[:8]}..."

    @staticmethod
    def get_current_timestamp() -> int:
        """Return the current Unix time in whole seconds."""
        return int(time.time())

    @staticmethod
    def validate_api_key(api_key: str) -> bool:
        """Return whether ``api_key`` is contained in ``Config().VALID_API_KEYS``.

        A fresh ``Config`` instance is created on every call.
        """
        config = Config()
        return api_key in config.VALID_API_KEYS

    @staticmethod
    def extract_bearer_token(auth_header: str) -> Optional[str]:
        """Extract the Bearer token from an ``Authorization`` header value.

        Args:
            auth_header: Raw header value, e.g. ``"Bearer abc123"``.

        Returns:
            The token, or ``None`` when the header is empty, does not use the
            ``Bearer`` scheme (matched case-insensitively), or does not consist
            of exactly a scheme and one token.
        """
        if not auth_header:
            return None

        # Split on any run of whitespace: the scheme and the credentials may be
        # separated by more than one space, and surrounding whitespace must
        # not matter. This also makes "Bearer " (no token) yield None instead
        # of an empty-string token.
        parts = auth_header.split()
        if len(parts) != 2 or parts[0].lower() != "bearer":
            return None
        return parts[1]

    @staticmethod
    def setup_logging() -> None:
        """Configure loguru sinks and route standard ``logging`` through loguru.

        Removes the existing loguru handlers, adds a colorized INFO-level
        console sink, optionally adds a rotating DEBUG-level file sink
        (when ``Config.enable_file_logging()`` is truthy), and installs a
        root ``logging`` handler that forwards stdlib records to loguru.
        """
        logger.remove()

        # Console sink: print each formatted message without an extra newline.
        logger.add(
            sink=lambda x: print(x, end=""),
            format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
            level="INFO",
            colorize=True,
        )

        if Config.enable_file_logging():
            logger.add(
                "warp_api_server.log",
                rotation="10 MB",
                retention="7 days",
                level="DEBUG",
                format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
                encoding="utf-8",
            )
            logger.info("📝 文件日志已启用: warp_api_server.log")
        else:
            logger.info("📝 文件日志已禁用 (可通过环境变量 ENABLE_FILE_LOGGING=true 启用)")

        class InterceptHandler(logging.Handler):
            """Forward standard ``logging`` records to loguru."""

            def emit(self, record: logging.LogRecord) -> None:
                # Use the loguru level with the same name when it exists,
                # otherwise fall back to the numeric level.
                try:
                    level = logger.level(record.levelname).name
                except ValueError:
                    level = record.levelno

                # Walk up from this frame past every frame that belongs to the
                # logging module, so loguru attributes the message to the
                # code that issued the log call. Starting from
                # inspect.currentframe() with depth 0 does not depend on what
                # logging.currentframe() returns (which changed in Python
                # 3.11), and the `frame` check prevents dereferencing None
                # when the stack runs out.
                frame, depth = inspect.currentframe(), 0
                while frame and (
                    depth == 0 or frame.f_code.co_filename == logging.__file__
                ):
                    frame = frame.f_back
                    depth += 1

                logger.opt(depth=depth, exception=record.exc_info).log(
                    level, record.getMessage()
                )

        # level=0 lets every stdlib record reach the handler; force=True
        # replaces handlers already attached to the root logger.
        logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
        logging.getLogger("urllib3").setLevel(logging.WARNING)

        logger.info("🚀 Loguru日志系统初始化完成")

    @staticmethod
    def deduplicate_tokens(tokens: list) -> list:
        """Return ``tokens`` without duplicates or falsy entries, order kept.

        The first occurrence of each token wins. Tokens must be hashable.
        """
        # dict preserves insertion order, so fromkeys() is an ordered de-dup.
        return list(dict.fromkeys(token for token in tokens if token))

    @staticmethod
    def validate_refresh_token_format(token: str) -> bool:
        """Return whether ``token`` looks like a plausible refresh token.

        A token is accepted when it is a non-empty string of at least 100
        characters that contains at least one alphanumeric character.
        """
        if not isinstance(token, str) or not token:
            return False
        return len(token) >= 100 and any(ch.isalnum() for ch in token)