Spaces:
Paused
Paused
File size: 4,168 Bytes
"""Request log auditing - records recent requests."""
import time
import asyncio
import orjson
from typing import List, Dict, Deque
from collections import deque
from dataclasses import dataclass, asdict
from pathlib import Path
from app.core.logger import logger
@dataclass
class RequestLog:
    """Record describing a single handled request.

    The field names match the keys of the dict that
    ``RequestLogger.add_log`` builds for every entry.
    """
    id: str  # add_log uses the millisecond epoch timestamp rendered as a string
    time: str  # add_log formats this as local time, "%Y-%m-%d %H:%M:%S"
    timestamp: float  # add_log stores the raw time.time() value here
    ip: str  # presumably the client address - confirm against the caller
    model: str  # presumably the requested model name - confirm against the caller
    duration: float  # add_log rounds this to 2 decimals; unit is not shown here
    status: int  # presumably an HTTP status code - TODO confirm
    key_name: str  # presumably the name of the API key used - TODO confirm
    token_suffix: str  # presumably the trailing characters of a token - TODO confirm
    error: str = ""  # error text; empty string by default
class RequestLogger:
    """Singleton recorder that keeps the most recent request logs.

    Entries are plain dicts held newest-first in a bounded deque and are
    written to ``data/logs.json`` (resolved two directories above this
    module) after every change.
    """

    _instance = None

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use.

        Constructor arguments are accepted and ignored here: Python passes
        them to ``__new__`` as well as ``__init__``, so without ``*args`` /
        ``**kwargs`` a call such as ``RequestLogger(max_len=500)`` raises
        ``TypeError``.
        """
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, max_len: int = 1000):
        """Set up state once; later constructions of the singleton are no-ops.

        Args:
            max_len: Maximum number of log entries kept in memory.
        """
        if hasattr(self, '_initialized'):
            return

        self.file_path = Path(__file__).parents[2] / "data" / "logs.json"
        self._logs: Deque[Dict] = deque(maxlen=max_len)
        self._lock = asyncio.Lock()
        self._loaded = False
        # Strong references to in-flight save tasks. The event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._save_tasks: set = set()
        self._initialized = True

    async def init(self) -> None:
        """Load persisted logs from disk if that has not happened yet."""
        if not self._loaded:
            await self._load_data()

    async def _load_data(self) -> None:
        """Load log entries from the JSON file into memory.

        Failures are logged and otherwise ignored; the logger is marked as
        loaded either way so that later saves are not blocked.
        """
        if self._loaded:
            return

        if not self.file_path.exists():
            self._loaded = True
            return

        async with self._lock:
            # Re-check under the lock: another coroutine may have finished
            # loading while this one was waiting, and reloading would
            # discard entries added since.
            if self._loaded:
                return
            try:
                content = await asyncio.to_thread(self.file_path.read_bytes)
                if content:
                    data = orjson.loads(content)
                    if isinstance(data, list):
                        self._logs.clear()
                        # Entries are stored newest-first. Slice before
                        # extending so an oversized file drops the oldest
                        # entries instead of the newest ones.
                        self._logs.extend(data[:self._logs.maxlen])
                logger.debug(f"[Logger] 加载日志成功: {len(self._logs)} 条")
            except Exception as e:
                logger.error(f"[Logger] 加载日志失败: {e}")
            self._loaded = True

    async def _save_data(self) -> None:
        """Write the current in-memory logs to disk.

        Does nothing until the logger has been loaded, so a save can never
        overwrite a file that has not been read yet. Failures are logged.
        """
        if not self._loaded:
            return

        try:
            # Make sure the target directory exists.
            self.file_path.parent.mkdir(parents=True, exist_ok=True)

            async with self._lock:
                # Snapshot and write under the lock so concurrent saves
                # cannot interleave or land out of order.
                content = orjson.dumps(list(self._logs))
                await asyncio.to_thread(self.file_path.write_bytes, content)
        except Exception as e:
            logger.error(f"[Logger] 保存日志失败: {e}")

    async def add_log(self,
                      ip: str,
                      model: str,
                      duration: float,
                      status: int,
                      key_name: str,
                      token_suffix: str = "",
                      error: str = "") -> None:
        """Record one request and schedule a background save.

        Args:
            ip: Value stored under the "ip" key.
            model: Value stored under the "model" key.
            duration: Stored rounded to 2 decimal places.
            status: Value stored under the "status" key.
            key_name: Value stored under the "key_name" key.
            token_suffix: Value stored under the "token_suffix" key.
            error: Value stored under the "error" key.

        Errors while recording are logged and not propagated.
        """
        if not self._loaded:
            await self.init()

        try:
            now = time.time()
            # Human-readable local time alongside the raw timestamp.
            time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now))

            log = {
                "id": str(int(now * 1000)),
                "time": time_str,
                "timestamp": now,
                "ip": ip,
                "model": model,
                "duration": round(duration, 2),
                "status": status,
                "key_name": key_name,
                "token_suffix": token_suffix,
                "error": error
            }

            async with self._lock:
                self._logs.appendleft(log)  # newest first

            # Save in the background, keeping a reference until it is done.
            task = asyncio.create_task(self._save_data())
            self._save_tasks.add(task)
            task.add_done_callback(self._save_tasks.discard)
        except Exception as e:
            logger.error(f"[Logger] 记录日志失败: {e}")

    async def get_logs(self, limit: int = 1000) -> List[Dict]:
        """Return up to ``limit`` log entries, newest first.

        Args:
            limit: Upper bound applied as a slice on the stored entries.

        Returns:
            A new list of log dicts.
        """
        # Same lazy load as add_log, so persisted logs are visible even when
        # this is the first call made on the logger.
        if not self._loaded:
            await self.init()

        async with self._lock:
            return list(self._logs)[:limit]

    async def clear_logs(self) -> None:
        """Remove all log entries from memory and from disk."""
        # Load first: _save_data is a no-op until loaded, which would leave
        # the old file in place and bring the cleared logs back later.
        if not self._loaded:
            await self.init()

        async with self._lock:
            self._logs.clear()
        await self._save_data()
# Module-level shared instance
request_logger = RequestLogger()
|