test-w / utils.py
letterm's picture
Upload 13 files
47d8ca8 verified
raw
history blame
4.51 kB
"""
工具函数模块
包含通用的工具函数和辅助方法
"""
import uuid
import time
import logging
from typing import Optional
from loguru import logger
from config import Config
class Utils:
"""工具函数集合"""
@staticmethod
def generate_request_id() -> str:
"""生成请求ID"""
return str(uuid.uuid4())
@staticmethod
def generate_token_id(refresh_token: str) -> str:
"""生成token的唯一标识符(用于日志显示)"""
# 使用token的前8位和后8位作为标识符
if len(refresh_token) >= 16:
return f"TOKEN_{refresh_token[:8]}...{refresh_token[-8:]}"
else:
return f"TOKEN_{refresh_token[:8]}..."
@staticmethod
def get_current_timestamp() -> int:
"""获取当前时间戳"""
return int(time.time())
@staticmethod
def validate_api_key(api_key: str) -> bool:
"""验证API密钥"""
config = Config()
return api_key in config.VALID_API_KEYS
@staticmethod
def extract_bearer_token(auth_header: str) -> Optional[str]:
"""从Authorization头中提取Bearer token"""
if not auth_header:
return None
parts = auth_header.split(" ")
if len(parts) != 2 or parts[0].lower() != "bearer":
return None
return parts[1]
@staticmethod
def setup_logging():
"""设置loguru日志配置"""
# 移除默认的日志处理器
logger.remove()
# 添加控制台日志输出,带颜色和格式
logger.add(
sink=lambda x: print(x, end=""),
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
level="INFO",
colorize=True
)
# 根据环境变量决定是否启用文件日志
if Config.enable_file_logging():
logger.add(
"warp_api_server.log",
rotation="10 MB",
retention="7 days",
level="DEBUG",
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
encoding="utf-8"
)
logger.info("📝 文件日志已启用: warp_api_server.log")
else:
logger.info("📝 文件日志已禁用 (可通过环境变量 ENABLE_FILE_LOGGING=true 启用)")
# 配置标准logging的loguru处理器
class InterceptHandler(logging.Handler):
def emit(self, record):
# 获取对应的loguru级别
try:
level = logger.level(record.levelname).name
except ValueError:
level = record.levelno
# 查找调用者位置
frame, depth = logging.currentframe(), 2
while frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
logger.opt(depth=depth, exception=record.exc_info).log(
level, record.getMessage()
)
# 替换所有标准logging处理器
logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logger.info("🚀 Loguru日志系统初始化完成")
@staticmethod
def deduplicate_tokens(tokens: list) -> list:
"""去重token列表,保持顺序"""
seen = set()
result = []
for token in tokens:
if token and token not in seen:
seen.add(token)
result.append(token)
return result
@staticmethod
def validate_refresh_token_format(token: str) -> bool:
"""验证refresh token格式是否有效"""
if not token or not isinstance(token, str):
return False
# 基本长度检查
if len(token) < 100:
return False
# 检查是否包含必要的字符
if not any(c.isalnum() for c in token):
return False
return True