Spaces:
Paused
Paused
| """请求统计模块 - 按小时/天统计请求数据""" | |
| import time | |
| import asyncio | |
| import orjson | |
| from datetime import datetime | |
| from typing import Dict, Any | |
| from pathlib import Path | |
| from collections import defaultdict | |
| from app.core.logger import logger | |
class RequestStats:
    """Request statistics manager (singleton).

    Counts requests per hour, per day and per model, persists the counters
    as JSON to ``data/stats.json`` under ``Path(__file__).parents[2]`` and
    restores them from that file on first use.
    """

    _instance = None

    def __new__(cls):
        # Always hand back the same object so all callers share one set of
        # counters.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # __init__ runs on every RequestStats() call; set up state only once.
        if hasattr(self, '_initialized'):
            return

        self.file_path = Path(__file__).parents[2] / "data" / "stats.json"

        # Counters: bucket key -> {"total", "success", "failed"}.
        self._hourly: Dict[str, Dict[str, int]] = defaultdict(self._new_counter)
        self._daily: Dict[str, Dict[str, int]] = defaultdict(self._new_counter)
        # Model name -> number of requests.
        self._models: Dict[str, int] = defaultdict(int)

        # Retention policy (number of buckets kept, see _cleanup).
        self._hourly_keep = 48
        self._daily_keep = 30

        self._lock = asyncio.Lock()
        self._loaded = False
        # Strong references to in-flight save tasks; the event loop itself
        # only keeps weak references, so an unreferenced task may be garbage
        # collected before it finishes.
        self._save_tasks = set()
        self._initialized = True

    @staticmethod
    def _new_counter() -> Dict[str, int]:
        """Return a fresh zeroed counter for one hourly/daily bucket."""
        return {"total": 0, "success": 0, "failed": 0}

    async def init(self):
        """Load persisted statistics if they have not been loaded yet."""
        if not self._loaded:
            await self._load_data()

    async def _load_data(self):
        """Restore the counters from ``self.file_path``.

        Errors are logged rather than raised. ``_loaded`` is set afterwards
        in every case (missing file, success or failure), so the load is
        attempted at most once.
        """
        if self._loaded:
            return

        async with self._lock:
            # Re-check under the lock: another coroutine may have completed
            # the load while this one was waiting. Loading again would
            # replace the in-memory counters and drop requests recorded
            # since the first load.
            if self._loaded:
                return

            if not self.file_path.exists():
                self._loaded = True
                return

            try:
                content = await asyncio.to_thread(self.file_path.read_bytes)
                if content:
                    data = orjson.loads(content)

                    # Rebuild the defaultdict wrappers around the plain
                    # dicts read from disk.
                    self._hourly = defaultdict(self._new_counter)
                    self._hourly.update(data.get("hourly", {}))

                    self._daily = defaultdict(self._new_counter)
                    self._daily.update(data.get("daily", {}))

                    self._models = defaultdict(int)
                    self._models.update(data.get("models", {}))

                self._loaded = True
                logger.debug("[Stats] 加载统计数据成功")
            except Exception as e:
                logger.error(f"[Stats] 加载数据失败: {e}")
                # Mark as loaded even on failure so the load is not retried
                # on every request.
                self._loaded = True

    async def _save_data(self):
        """Write the current counters to ``self.file_path`` as JSON.

        Does nothing until the initial load has completed. Errors are
        logged, not raised.
        """
        if not self._loaded:
            return

        try:
            # Make sure the target directory exists.
            self.file_path.parent.mkdir(parents=True, exist_ok=True)

            async with self._lock:
                content = orjson.dumps({
                    "hourly": dict(self._hourly),
                    "daily": dict(self._daily),
                    "models": dict(self._models),
                })
                await asyncio.to_thread(self.file_path.write_bytes, content)
        except Exception as e:
            logger.error(f"[Stats] 保存数据失败: {e}")

    async def record_request(self, model: str, success: bool) -> None:
        """Record one request and schedule a background save.

        Args:
            model: Name of the model the request was made for.
            success: Whether the request succeeded.
        """
        if not self._loaded:
            await self.init()

        now = datetime.now()
        hour_key = now.strftime("%Y-%m-%dT%H")
        day_key = now.strftime("%Y-%m-%d")
        outcome = "success" if success else "failed"

        # Update the hourly and daily buckets identically.
        for bucket in (self._hourly[hour_key], self._daily[day_key]):
            bucket["total"] += 1
            bucket[outcome] += 1

        # Per-model counter.
        self._models[model] += 1

        # Trim old buckets.
        self._cleanup()

        # Save in the background. Hold a reference until the task finishes
        # so it cannot be garbage collected mid-execution.
        task = asyncio.create_task(self._save_data())
        self._save_tasks.add(task)
        task.add_done_callback(self._save_tasks.discard)

    def _cleanup(self) -> None:
        """Drop the oldest buckets beyond the retention limits.

        Bucket keys are zero-padded timestamps, so sorting them as strings
        orders them chronologically. Retention is by bucket count: the
        newest ``_hourly_keep`` hourly and ``_daily_keep`` daily buckets are
        kept, however far apart in time they are.
        """
        for buckets, keep in (
            (self._hourly, self._hourly_keep),
            (self._daily, self._daily_keep),
        ):
            if len(buckets) > keep:
                for key in sorted(buckets)[:-keep]:
                    del buckets[key]

    def get_stats(self, hours: int = 24, days: int = 7) -> Dict[str, Any]:
        """Build a statistics report.

        Args:
            hours: Number of most recent hourly buckets to report.
            days: Number of most recent daily buckets to report.

        Returns:
            A dict with:
            ``hourly``  - one entry per hour, oldest first, zero-filled
                          where nothing was recorded;
            ``daily``   - one entry per day, oldest first, zero-filled;
            ``models``  - the 10 most requested models with their counts;
            ``summary`` - totals and success rate (percent, one decimal)
                          computed over all retained hourly buckets.
        """
        # Local import: the module itself only imports ``datetime``.
        from datetime import timedelta

        now = datetime.now()

        # Most recent N hours, oldest first.
        hourly_data = []
        for offset in range(hours - 1, -1, -1):
            moment = now - timedelta(hours=offset)
            counts = self._hourly.get(
                moment.strftime("%Y-%m-%dT%H"), self._new_counter()
            )
            hourly_data.append({
                "hour": moment.strftime("%H:00"),
                "date": moment.strftime("%m-%d"),
                **counts,
            })

        # Most recent N days, oldest first.
        daily_data = []
        for offset in range(days - 1, -1, -1):
            moment = now - timedelta(days=offset)
            counts = self._daily.get(
                moment.strftime("%Y-%m-%d"), self._new_counter()
            )
            daily_data.append({
                "date": moment.strftime("%m-%d"),
                **counts,
            })

        # Top 10 models by request count.
        model_data = sorted(
            self._models.items(), key=lambda item: item[1], reverse=True
        )[:10]

        # Totals over the retained hourly buckets (not the daily ones).
        retained = list(self._hourly.values())
        total_requests = sum(d["total"] for d in retained)
        total_success = sum(d["success"] for d in retained)
        total_failed = sum(d["failed"] for d in retained)

        return {
            "hourly": hourly_data,
            "daily": daily_data,
            "models": [{"model": m, "count": c} for m, c in model_data],
            "summary": {
                "total": total_requests,
                "success": total_success,
                "failed": total_failed,
                "success_rate": round(total_success / total_requests * 100, 1) if total_requests > 0 else 0,
            },
        }

    async def reset(self) -> None:
        """Clear all statistics and persist the empty state."""
        # Finish the initial load first. Otherwise _save_data is a no-op
        # (it requires _loaded) and a later load would bring the old
        # counters back from disk.
        if not self._loaded:
            await self.init()

        self._hourly.clear()
        self._daily.clear()
        self._models.clear()
        await self._save_data()
# Global instance. RequestStats.__new__ returns the same object on every
# call, so RequestStats() anywhere else yields this instance too.
request_stats = RequestStats()