File size: 4,507 Bytes
47d8ca8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""

工具函数模块

包含通用的工具函数和辅助方法

"""
import uuid
import time
import logging
from typing import Optional
from loguru import logger
from config import Config


class Utils:
    """工具函数集合"""
    
    @staticmethod
    def generate_request_id() -> str:
        """生成请求ID"""
        return str(uuid.uuid4())
    
    @staticmethod
    def generate_token_id(refresh_token: str) -> str:
        """生成token的唯一标识符(用于日志显示)"""
        # 使用token的前8位和后8位作为标识符
        if len(refresh_token) >= 16:
            return f"TOKEN_{refresh_token[:8]}...{refresh_token[-8:]}"
        else:
            return f"TOKEN_{refresh_token[:8]}..."
    
    @staticmethod
    def get_current_timestamp() -> int:
        """获取当前时间戳"""
        return int(time.time())
    
    @staticmethod
    def validate_api_key(api_key: str) -> bool:
        """验证API密钥"""
        config = Config()
        return api_key in config.VALID_API_KEYS
    
    @staticmethod
    def extract_bearer_token(auth_header: str) -> Optional[str]:
        """从Authorization头中提取Bearer token"""
        if not auth_header:
            return None
        
        parts = auth_header.split(" ")
        if len(parts) != 2 or parts[0].lower() != "bearer":
            return None
            
        return parts[1]
    
    @staticmethod
    def setup_logging():
        """设置loguru日志配置"""
        # 移除默认的日志处理器
        logger.remove()
        
        # 添加控制台日志输出,带颜色和格式
        logger.add(
            sink=lambda x: print(x, end=""),
            format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
            level="INFO",
            colorize=True
        )
        
        # 根据环境变量决定是否启用文件日志
        if Config.enable_file_logging():
            logger.add(
                "warp_api_server.log",
                rotation="10 MB",
                retention="7 days",
                level="DEBUG",
                format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
                encoding="utf-8"
            )
            logger.info("📝 文件日志已启用: warp_api_server.log")
        else:
            logger.info("📝 文件日志已禁用 (可通过环境变量 ENABLE_FILE_LOGGING=true 启用)")
        
        # 配置标准logging的loguru处理器
        class InterceptHandler(logging.Handler):
            def emit(self, record):
                # 获取对应的loguru级别
                try:
                    level = logger.level(record.levelname).name
                except ValueError:
                    level = record.levelno
                
                # 查找调用者位置
                frame, depth = logging.currentframe(), 2
                while frame.f_code.co_filename == logging.__file__:
                    frame = frame.f_back
                    depth += 1
                
                logger.opt(depth=depth, exception=record.exc_info).log(
                    level, record.getMessage()
                )
        
        # 替换所有标准logging处理器
        logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
        logging.getLogger("urllib3").setLevel(logging.WARNING)
        
        logger.info("🚀 Loguru日志系统初始化完成")
    
    @staticmethod
    def deduplicate_tokens(tokens: list) -> list:
        """去重token列表,保持顺序"""
        seen = set()
        result = []
        for token in tokens:
            if token and token not in seen:
                seen.add(token)
                result.append(token)
        return result
    
    @staticmethod
    def validate_refresh_token_format(token: str) -> bool:
        """验证refresh token格式是否有效"""
        if not token or not isinstance(token, str):
            return False
        
        # 基本长度检查
        if len(token) < 100:
            return False
            
        # 检查是否包含必要的字符
        if not any(c.isalnum() for c in token):
            return False
            
        return True