| """请求统计模块 - 按小时/天统计请求数据""" |
|
|
| import time |
| import asyncio |
| import orjson |
| from datetime import datetime |
| from typing import Dict, Any |
| from pathlib import Path |
| from collections import defaultdict |
|
|
| from app.core.logger import logger |
|
|
|
|
class RequestStats:
    """Process-wide request statistics, bucketed per hour and per day.

    The class is a singleton: every ``RequestStats()`` call returns the same
    object and ``__init__`` only initializes it once. Counters are kept in
    memory and persisted as JSON to ``data/stats.json`` (resolved two
    directories above this file) after every recorded request.
    """

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # __init__ runs on every RequestStats() call; only set up state once.
        if hasattr(self, '_initialized'):
            return

        self.file_path = Path(__file__).parents[2] / "data" / "stats.json"

        # Bucket key -> {"total", "success", "failed"} counters.
        self._hourly: Dict[str, Dict[str, int]] = defaultdict(self._new_bucket)
        self._daily: Dict[str, Dict[str, int]] = defaultdict(self._new_bucket)
        # Model name -> number of requests.
        self._models: Dict[str, int] = defaultdict(int)

        # Maximum number of hourly / daily buckets retained by _cleanup().
        self._hourly_keep = 48
        self._daily_keep = 30

        self._lock = asyncio.Lock()
        # Strong references to in-flight background saves (see record_request).
        self._save_tasks: set = set()
        self._loaded = False
        self._initialized = True

    @staticmethod
    def _new_bucket() -> Dict[str, int]:
        """Return a fresh zeroed counter bucket."""
        return {"total": 0, "success": 0, "failed": 0}

    async def init(self):
        """Load persisted statistics from disk if that has not happened yet."""
        if not self._loaded:
            await self._load_data()

    async def _load_data(self):
        """Populate the in-memory counters from ``self.file_path``.

        A missing file is treated as "no data yet". Any failure while reading
        or parsing is logged and the instance is still marked as loaded, so the
        service keeps running with whatever counters are in memory.
        """
        if self._loaded:
            return

        if not self.file_path.exists():
            self._loaded = True
            return

        try:
            async with self._lock:
                # Re-check under the lock: a coroutine that queued here while
                # another one was loading must not re-read the file, otherwise
                # it would replace counters that were updated in the meantime.
                if self._loaded:
                    return

                content = await asyncio.to_thread(self.file_path.read_bytes)
                if content:
                    data = orjson.loads(content)

                    hourly = defaultdict(self._new_bucket)
                    hourly.update(data.get("hourly", {}))
                    self._hourly = hourly

                    daily = defaultdict(self._new_bucket)
                    daily.update(data.get("daily", {}))
                    self._daily = daily

                    models = defaultdict(int)
                    models.update(data.get("models", {}))
                    self._models = models

                self._loaded = True
                logger.debug("[Stats] 加载统计数据成功")
        except Exception as e:
            logger.error(f"[Stats] 加载数据失败: {e}")
            # Best effort: continue with the current in-memory counters.
            self._loaded = True

    async def _save_data(self):
        """Write the current counters to ``self.file_path`` as JSON.

        Does nothing until the initial load has completed. Errors are logged
        and swallowed; persistence is best effort.
        """
        if not self._loaded:
            return

        try:
            self.file_path.parent.mkdir(parents=True, exist_ok=True)

            async with self._lock:
                # Serialize before the first await inside the lock so the
                # written payload is a consistent snapshot of the counters.
                content = orjson.dumps({
                    "hourly": dict(self._hourly),
                    "daily": dict(self._daily),
                    "models": dict(self._models),
                })
                await asyncio.to_thread(self.file_path.write_bytes, content)
        except Exception as e:
            logger.error(f"[Stats] 保存数据失败: {e}")

    async def record_request(self, model: str, success: bool) -> None:
        """Record one request for ``model`` and schedule a background save.

        Args:
            model: Model name the request was made against.
            success: Whether the request succeeded.
        """
        if not self._loaded:
            await self.init()

        now = datetime.now()
        hour_key = now.strftime("%Y-%m-%dT%H")
        day_key = now.strftime("%Y-%m-%d")
        outcome = "success" if success else "failed"

        for bucket in (self._hourly[hour_key], self._daily[day_key]):
            bucket["total"] += 1
            bucket[outcome] += 1

        self._models[model] += 1

        self._cleanup()

        # The event loop only keeps weak references to tasks, so hold a strong
        # reference until the save finishes to keep it from being collected.
        task = asyncio.create_task(self._save_data())
        self._save_tasks.add(task)
        task.add_done_callback(self._save_tasks.discard)

    @staticmethod
    def _prune(buckets: Dict[str, Dict[str, int]], keep: int) -> None:
        """Delete the oldest entries of ``buckets`` so at most ``keep`` remain."""
        excess = len(buckets) - keep
        if excess > 0:
            # Keys are zero-padded date strings, so lexical order is
            # chronological order.
            for key in sorted(buckets)[:excess]:
                del buckets[key]

    def _cleanup(self) -> None:
        """Drop the oldest hourly and daily buckets beyond the keep limits.

        Per-model counters are not pruned.
        """
        self._prune(self._hourly, self._hourly_keep)
        self._prune(self._daily, self._daily_keep)

    def get_stats(self, hours: int = 24, days: int = 7) -> Dict[str, Any]:
        """Build a report of recent request statistics.

        Args:
            hours: Number of trailing hourly rows to return (oldest first,
                ending with the current hour).
            days: Number of trailing daily rows to return (oldest first,
                ending with today).

        Returns:
            A dict with:
            ``hourly``  - rows of ``hour``/``date`` plus the bucket counters;
            ``daily``   - rows of ``date`` plus the bucket counters;
            ``models``  - the ten most used models as ``model``/``count``;
            ``summary`` - totals and success rate (percent, one decimal)
            over every retained hourly bucket, not just the requested window.
        """
        # Function-scope import: the module itself only imports `datetime`.
        from datetime import timedelta

        now = datetime.now()

        hourly_data = []
        for offset in range(hours - 1, -1, -1):
            moment = now - timedelta(hours=offset)
            counters = self._hourly.get(
                moment.strftime("%Y-%m-%dT%H"), self._new_bucket()
            )
            hourly_data.append({
                "hour": moment.strftime("%H:00"),
                "date": moment.strftime("%m-%d"),
                **counters,
            })

        daily_data = []
        for offset in range(days - 1, -1, -1):
            moment = now - timedelta(days=offset)
            counters = self._daily.get(
                moment.strftime("%Y-%m-%d"), self._new_bucket()
            )
            daily_data.append({
                "date": moment.strftime("%m-%d"),
                **counters,
            })

        top_models = sorted(
            self._models.items(), key=lambda item: item[1], reverse=True
        )[:10]

        retained = list(self._hourly.values())
        total_requests = sum(b["total"] for b in retained)
        total_success = sum(b["success"] for b in retained)
        total_failed = sum(b["failed"] for b in retained)

        if total_requests > 0:
            success_rate = round(total_success / total_requests * 100, 1)
        else:
            success_rate = 0

        return {
            "hourly": hourly_data,
            "daily": daily_data,
            "models": [{"model": m, "count": c} for m, c in top_models],
            "summary": {
                "total": total_requests,
                "success": total_success,
                "failed": total_failed,
                "success_rate": success_rate,
            },
        }

    async def reset(self) -> None:
        """Clear every counter and persist the now-empty state."""
        self._hourly.clear()
        self._daily.clear()
        self._models.clear()
        await self._save_data()
|
|
|
|
| |
# Shared module-level instance. RequestStats.__new__ always returns the same
# object, so importing this name or calling RequestStats() is equivalent.
request_stats: RequestStats = RequestStats()
|
|