File size: 4,168 Bytes
1a9e2c2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
"""请求日志审计 - 记录近期请求"""

import time
import asyncio
import orjson
from typing import List, Dict, Deque
from collections import deque
from dataclasses import dataclass, asdict
from pathlib import Path

from app.core.logger import logger

@dataclass
class RequestLog:
    id: str
    time: str
    timestamp: float
    ip: str
    model: str
    duration: float
    status: int
    key_name: str
    token_suffix: str
    error: str = ""

class RequestLogger:
    """请求日志记录器"""
    
    _instance = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance
    
    def __init__(self, max_len: int = 1000):
        if hasattr(self, '_initialized'):
            return
            
        self.file_path = Path(__file__).parents[2] / "data" / "logs.json"
        self._logs: Deque[Dict] = deque(maxlen=max_len)
        self._lock = asyncio.Lock()
        self._loaded = False
        
        self._initialized = True

    async def init(self):
        """初始化加载数据"""
        if not self._loaded:
            await self._load_data()

    async def _load_data(self):
        """从磁盘加载日志数据"""
        if self._loaded:
            return

        if not self.file_path.exists():
            self._loaded = True
            return

        try:
            async with self._lock:
                content = await asyncio.to_thread(self.file_path.read_bytes)
                if content:
                    data = orjson.loads(content)
                    if isinstance(data, list):
                        self._logs.clear()
                        self._logs.extend(data)
                    self._loaded = True
                    logger.debug(f"[Logger] 加载日志成功: {len(self._logs)} 条")
        except Exception as e:
            logger.error(f"[Logger] 加载日志失败: {e}")
            self._loaded = True

    async def _save_data(self):
        """保存日志数据到磁盘"""
        if not self._loaded:
            return

        try:
            # 确保目录存在
            self.file_path.parent.mkdir(parents=True, exist_ok=True)
            
            async with self._lock:
                # 转换为列表保存
                content = orjson.dumps(list(self._logs))
                await asyncio.to_thread(self.file_path.write_bytes, content)
        except Exception as e:
            logger.error(f"[Logger] 保存日志失败: {e}")

    async def add_log(self, 
                     ip: str, 
                     model: str, 
                     duration: float, 
                     status: int, 
                     key_name: str, 
                     token_suffix: str = "",
                     error: str = ""):
        """添加日志"""
        if not self._loaded:
            await self.init()
            
        try:
            now = time.time()
            # 格式化时间
            time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now))
            
            log = {
                "id": str(int(now * 1000)),
                "time": time_str,
                "timestamp": now,
                "ip": ip,
                "model": model,
                "duration": round(duration, 2),
                "status": status,
                "key_name": key_name,
                "token_suffix": token_suffix,
                "error": error
            }
            
            async with self._lock:
                self._logs.appendleft(log) # 最新的在前
                
            # 异步保存
            asyncio.create_task(self._save_data())
                
        except Exception as e:
            logger.error(f"[Logger] 记录日志失败: {e}")

    async def get_logs(self, limit: int = 1000) -> List[Dict]:
        """获取日志"""
        async with self._lock:
            return list(self._logs)[:limit]
    
    async def clear_logs(self):
        """清空日志"""
        async with self._lock:
            self._logs.clear()
        await self._save_data()


# 全局实例
request_logger = RequestLogger()