File size: 6,855 Bytes
1a9e2c2 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 | """请求统计模块 - 按小时/天统计请求数据"""
import time
import asyncio
import orjson
from datetime import datetime
from typing import Dict, Any
from pathlib import Path
from collections import defaultdict
from app.core.logger import logger
class RequestStats:
"""请求统计管理器(单例)"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if hasattr(self, '_initialized'):
return
self.file_path = Path(__file__).parents[2] / "data" / "stats.json"
# 统计数据
self._hourly: Dict[str, Dict[str, int]] = defaultdict(lambda: {"total": 0, "success": 0, "failed": 0})
self._daily: Dict[str, Dict[str, int]] = defaultdict(lambda: {"total": 0, "success": 0, "failed": 0})
self._models: Dict[str, int] = defaultdict(int)
# 保留策略
self._hourly_keep = 48 # 保留48小时
self._daily_keep = 30 # 保留30天
self._lock = asyncio.Lock()
self._loaded = False
self._initialized = True
async def init(self):
"""初始化加载数据"""
if not self._loaded:
await self._load_data()
async def _load_data(self):
"""从磁盘加载统计数据"""
if self._loaded:
return
if not self.file_path.exists():
self._loaded = True
return
try:
async with self._lock:
content = await asyncio.to_thread(self.file_path.read_bytes)
if content:
data = orjson.loads(content)
# 恢复 defaultdict 结构
self._hourly = defaultdict(lambda: {"total": 0, "success": 0, "failed": 0})
self._hourly.update(data.get("hourly", {}))
self._daily = defaultdict(lambda: {"total": 0, "success": 0, "failed": 0})
self._daily.update(data.get("daily", {}))
self._models = defaultdict(int)
self._models.update(data.get("models", {}))
self._loaded = True
logger.debug(f"[Stats] 加载统计数据成功")
except Exception as e:
logger.error(f"[Stats] 加载数据失败: {e}")
self._loaded = True # 防止覆盖
async def _save_data(self):
"""保存统计数据到磁盘"""
if not self._loaded:
return
try:
# 确保目录存在
self.file_path.parent.mkdir(parents=True, exist_ok=True)
async with self._lock:
data = {
"hourly": dict(self._hourly),
"daily": dict(self._daily),
"models": dict(self._models)
}
content = orjson.dumps(data)
await asyncio.to_thread(self.file_path.write_bytes, content)
except Exception as e:
logger.error(f"[Stats] 保存数据失败: {e}")
async def record_request(self, model: str, success: bool) -> None:
"""记录一次请求"""
if not self._loaded:
await self.init()
now = datetime.now()
hour_key = now.strftime("%Y-%m-%dT%H")
day_key = now.strftime("%Y-%m-%d")
# 小时统计
self._hourly[hour_key]["total"] += 1
if success:
self._hourly[hour_key]["success"] += 1
else:
self._hourly[hour_key]["failed"] += 1
# 天统计
self._daily[day_key]["total"] += 1
if success:
self._daily[day_key]["success"] += 1
else:
self._daily[day_key]["failed"] += 1
# 模型统计
self._models[model] += 1
# 定期清理旧数据
self._cleanup()
# 异步保存
asyncio.create_task(self._save_data())
def _cleanup(self) -> None:
"""清理过期数据"""
now = datetime.now()
# 清理小时数据
hour_keys = list(self._hourly.keys())
if len(hour_keys) > self._hourly_keep:
for key in sorted(hour_keys)[:-self._hourly_keep]:
del self._hourly[key]
# 清理天数据
day_keys = list(self._daily.keys())
if len(day_keys) > self._daily_keep:
for key in sorted(day_keys)[:-self._daily_keep]:
del self._daily[key]
def get_stats(self, hours: int = 24, days: int = 7) -> Dict[str, Any]:
"""获取统计数据"""
now = datetime.now()
# 获取最近N小时数据
hourly_data = []
for i in range(hours - 1, -1, -1):
from datetime import timedelta
dt = now - timedelta(hours=i)
key = dt.strftime("%Y-%m-%dT%H")
data = self._hourly.get(key, {"total": 0, "success": 0, "failed": 0})
hourly_data.append({
"hour": dt.strftime("%H:00"),
"date": dt.strftime("%m-%d"),
**data
})
# 获取最近N天数据
daily_data = []
for i in range(days - 1, -1, -1):
from datetime import timedelta
dt = now - timedelta(days=i)
key = dt.strftime("%Y-%m-%d")
data = self._daily.get(key, {"total": 0, "success": 0, "failed": 0})
daily_data.append({
"date": dt.strftime("%m-%d"),
**data
})
# 模型统计(取 Top 10)
model_data = sorted(self._models.items(), key=lambda x: x[1], reverse=True)[:10]
# 总计
total_requests = sum(d["total"] for d in self._hourly.values())
total_success = sum(d["success"] for d in self._hourly.values())
total_failed = sum(d["failed"] for d in self._hourly.values())
return {
"hourly": hourly_data,
"daily": daily_data,
"models": [{"model": m, "count": c} for m, c in model_data],
"summary": {
"total": total_requests,
"success": total_success,
"failed": total_failed,
"success_rate": round(total_success / total_requests * 100, 1) if total_requests > 0 else 0
}
}
async def reset(self) -> None:
"""重置所有统计"""
self._hourly.clear()
self._daily.clear()
self._models.clear()
await self._save_data()
# 全局实例
request_stats = RequestStats()
|