"""请求统计模块 - 按小时/天统计请求数据""" import time import asyncio import orjson from datetime import datetime from typing import Dict, Any from pathlib import Path from collections import defaultdict from app.core.logger import logger class RequestStats: """请求统计管理器(单例)""" _instance = None def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def __init__(self): if hasattr(self, '_initialized'): return self.file_path = Path(__file__).parents[2] / "data" / "stats.json" # 统计数据 self._hourly: Dict[str, Dict[str, int]] = defaultdict(lambda: {"total": 0, "success": 0, "failed": 0}) self._daily: Dict[str, Dict[str, int]] = defaultdict(lambda: {"total": 0, "success": 0, "failed": 0}) self._models: Dict[str, int] = defaultdict(int) # 保留策略 self._hourly_keep = 48 # 保留48小时 self._daily_keep = 30 # 保留30天 self._lock = asyncio.Lock() self._loaded = False self._initialized = True async def init(self): """初始化加载数据""" if not self._loaded: await self._load_data() async def _load_data(self): """从磁盘加载统计数据""" if self._loaded: return if not self.file_path.exists(): self._loaded = True return try: async with self._lock: content = await asyncio.to_thread(self.file_path.read_bytes) if content: data = orjson.loads(content) # 恢复 defaultdict 结构 self._hourly = defaultdict(lambda: {"total": 0, "success": 0, "failed": 0}) self._hourly.update(data.get("hourly", {})) self._daily = defaultdict(lambda: {"total": 0, "success": 0, "failed": 0}) self._daily.update(data.get("daily", {})) self._models = defaultdict(int) self._models.update(data.get("models", {})) self._loaded = True logger.debug(f"[Stats] 加载统计数据成功") except Exception as e: logger.error(f"[Stats] 加载数据失败: {e}") self._loaded = True # 防止覆盖 async def _save_data(self): """保存统计数据到磁盘""" if not self._loaded: return try: # 确保目录存在 self.file_path.parent.mkdir(parents=True, exist_ok=True) async with self._lock: data = { "hourly": dict(self._hourly), "daily": dict(self._daily), "models": dict(self._models) } content = orjson.dumps(data) await asyncio.to_thread(self.file_path.write_bytes, content) except Exception as e: logger.error(f"[Stats] 保存数据失败: {e}") async def record_request(self, model: str, success: bool) -> None: """记录一次请求""" if not self._loaded: await self.init() now = datetime.now() hour_key = now.strftime("%Y-%m-%dT%H") day_key = now.strftime("%Y-%m-%d") # 小时统计 self._hourly[hour_key]["total"] += 1 if success: self._hourly[hour_key]["success"] += 1 else: self._hourly[hour_key]["failed"] += 1 # 天统计 self._daily[day_key]["total"] += 1 if success: self._daily[day_key]["success"] += 1 else: self._daily[day_key]["failed"] += 1 # 模型统计 self._models[model] += 1 # 定期清理旧数据 self._cleanup() # 异步保存 asyncio.create_task(self._save_data()) def _cleanup(self) -> None: """清理过期数据""" now = datetime.now() # 清理小时数据 hour_keys = list(self._hourly.keys()) if len(hour_keys) > self._hourly_keep: for key in sorted(hour_keys)[:-self._hourly_keep]: del self._hourly[key] # 清理天数据 day_keys = list(self._daily.keys()) if len(day_keys) > self._daily_keep: for key in sorted(day_keys)[:-self._daily_keep]: del self._daily[key] def get_stats(self, hours: int = 24, days: int = 7) -> Dict[str, Any]: """获取统计数据""" now = datetime.now() # 获取最近N小时数据 hourly_data = [] for i in range(hours - 1, -1, -1): from datetime import timedelta dt = now - timedelta(hours=i) key = dt.strftime("%Y-%m-%dT%H") data = self._hourly.get(key, {"total": 0, "success": 0, "failed": 0}) hourly_data.append({ "hour": dt.strftime("%H:00"), "date": dt.strftime("%m-%d"), **data }) # 获取最近N天数据 daily_data = [] for i in range(days - 1, -1, -1): from datetime import timedelta dt = now - timedelta(days=i) key = dt.strftime("%Y-%m-%d") data = self._daily.get(key, {"total": 0, "success": 0, "failed": 0}) daily_data.append({ "date": dt.strftime("%m-%d"), **data }) # 模型统计(取 Top 10) model_data = sorted(self._models.items(), key=lambda x: x[1], reverse=True)[:10] # 总计 total_requests = sum(d["total"] for d in self._hourly.values()) total_success = sum(d["success"] for d in self._hourly.values()) total_failed = sum(d["failed"] for d in self._hourly.values()) return { "hourly": hourly_data, "daily": daily_data, "models": [{"model": m, "count": c} for m, c in model_data], "summary": { "total": total_requests, "success": total_success, "failed": total_failed, "success_rate": round(total_success / total_requests * 100, 1) if total_requests > 0 else 0 } } async def reset(self) -> None: """重置所有统计""" self._hourly.clear() self._daily.clear() self._models.clear() await self._save_data() # 全局实例 request_stats = RequestStats()