Spaces:
Sleeping
Sleeping
| """ | |
| Uptime 实时监控与心跳历史持久化。 | |
| """ | |
import json
import logging
import os
from collections import deque
from datetime import datetime, timedelta, timezone
from threading import Lock
from typing import Dict, List, Optional
# Beijing time zone (UTC+8); used for every timestamp this module produces.
BEIJING_TZ = timezone(timedelta(hours=8))

# Number of most recent heartbeats retained per service.
MAX_HEARTBEATS = 60

# A successful request whose latency (ms) reaches this value is classified "warn".
SLOW_THRESHOLD_MS = 40000

# HTTP status codes that are classified "warn" by _classify_level.
WARNING_STATUS_CODES = {429}

# Path of the JSON persistence file (None disables persistence) and the lock
# guarding reads and writes of that file.
_storage_path: Optional[str] = None
_storage_lock = Lock()

# Service registry: service id -> display name plus a bounded heartbeat history.
# Each service gets its own deque so histories never share state.
SERVICES = {
    service_id: {"name": display_name, "heartbeats": deque(maxlen=MAX_HEARTBEATS)}
    for service_id, display_name in (
        ("api_service", "API 服务"),
        ("account_pool", "服务资源"),
        ("gemini-2.5-flash", "Gemini 2.5 Flash"),
        ("gemini-2.5-pro", "Gemini 2.5 Pro"),
        ("gemini-3-flash-preview", "Gemini 3 Flash Preview"),
        ("gemini-3-pro-preview", "Gemini 3 Pro Preview"),
    )
}

# Model ids that have an entry in the registry above.
SUPPORTED_MODELS = [
    "gemini-2.5-flash",
    "gemini-2.5-pro",
    "gemini-3-flash-preview",
    "gemini-3-pro-preview",
]
def configure_storage(path: Optional[str]) -> None:
    """Set the file path used to persist heartbeat history.

    Args:
        path: Location of the JSON file. A falsy value (``None`` or an empty
            string) makes the save and load routines return without touching
            the disk.
    """
    # Rebind the module-level setting; the save/load helpers read it lazily.
    global _storage_path
    _storage_path = path
def _classify_level(success: bool, status_code: Optional[int], latency_ms: Optional[int]) -> str:
    """Map one request outcome to a heartbeat level.

    Args:
        success: Whether the request succeeded.
        status_code: HTTP status code of the response, if known.
        latency_ms: Request latency in milliseconds, if known.

    Returns:
        ``"warn"`` when the status code is in ``WARNING_STATUS_CODES`` or when a
        successful request took at least ``SLOW_THRESHOLD_MS``; otherwise
        ``"up"`` for a success and ``"down"`` for a failure.
    """
    # The status-code check comes first so it wins regardless of ``success``.
    degraded = status_code in WARNING_STATUS_CODES or (
        success and latency_ms is not None and latency_ms >= SLOW_THRESHOLD_MS
    )
    if degraded:
        return "warn"
    if success:
        return "up"
    return "down"
def _save_heartbeats() -> None:
    """Persist every service's heartbeat history to the configured JSON file.

    Does nothing when no storage path is configured. Persistence is
    best-effort: any failure is logged as a warning and swallowed so that a
    disk problem never propagates to the caller recording a heartbeat.

    The payload is written to a sibling ``.tmp`` file and then moved over the
    target with ``os.replace``, so an interrupted write cannot leave a
    truncated JSON file at the storage path.
    """
    if not _storage_path:
        return
    try:
        payload = {
            service_id: list(service_data["heartbeats"])
            for service_id, service_data in SERVICES.items()
        }
        # A bare filename has an empty dirname, and os.makedirs("") raises;
        # only create the parent directory when there is one.
        parent_dir = os.path.dirname(_storage_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        tmp_path = f"{_storage_path}.tmp"
        with _storage_lock:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(payload, f, ensure_ascii=True, indent=2)
            os.replace(tmp_path, _storage_path)
    except Exception as exc:
        # Deliberately broad: this runs on every recorded request and must
        # never raise, but the failure should still be visible in the logs.
        logging.getLogger(__name__).warning(
            "Failed to save heartbeats to %s: %s", _storage_path, exc
        )
def load_heartbeats() -> None:
    """Restore persisted heartbeat history into the in-memory registry.

    Does nothing when no storage path is configured or the file does not
    exist. Only services present in ``SERVICES`` are restored, keeping at most
    the ``MAX_HEARTBEATS`` most recent entries for each.

    Each service's persisted section is validated before its in-memory history
    is replaced: a section that is not a list is skipped (the current history
    is kept), and entries that are not JSON objects are dropped. Loading is
    best-effort; read and parse failures are logged and swallowed.
    """
    if not _storage_path or not os.path.exists(_storage_path):
        return
    log = logging.getLogger(__name__)
    try:
        with _storage_lock, open(_storage_path, "r", encoding="utf-8") as f:
            payload = json.load(f)
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError.
        log.warning("Failed to load heartbeats from %s: %s", _storage_path, exc)
        return
    if not isinstance(payload, dict):
        log.warning("Ignoring heartbeat file %s: top level is not an object", _storage_path)
        return
    for service_id, heartbeats in payload.items():
        if service_id not in SERVICES:
            continue
        if not isinstance(heartbeats, list):
            # Keep the current history rather than wiping it for bad data.
            log.warning("Ignoring malformed heartbeat history for %s", service_id)
            continue
        # Consumers call .get() on each entry, so only dict entries are usable.
        valid = [beat for beat in heartbeats if isinstance(beat, dict)]
        history = SERVICES[service_id]["heartbeats"]
        history.clear()
        history.extend(valid[-MAX_HEARTBEATS:])
def record_request(
    service: str,
    success: bool,
    latency_ms: Optional[int] = None,
    status_code: Optional[int] = None
):
    """Append one heartbeat for ``service`` and persist the history.

    Calls for a service id that is not in ``SERVICES`` are ignored.

    Args:
        service: Registry key of the service the request belongs to.
        success: Whether the request succeeded.
        latency_ms: Request latency in milliseconds; stored only when given.
        status_code: HTTP status code; stored only when given.
    """
    entry = SERVICES.get(service)
    if entry is None:
        return

    level = _classify_level(success, status_code, latency_ms)
    beat = {
        "time": datetime.now(BEIJING_TZ).strftime("%H:%M:%S"),
        "success": success,
        "level": level,
    }
    # Optional fields are omitted entirely rather than stored as null.
    optional_fields = (("latency_ms", latency_ms), ("status_code", status_code))
    beat.update((key, value) for key, value in optional_fields if value is not None)

    entry["heartbeats"].append(beat)
    _save_heartbeats()
def get_realtime_status() -> Dict:
    """Build the realtime monitoring snapshot for every registered service.

    Returns:
        A dict with two keys. ``"services"`` maps each service id to its
        display name, current status, uptime percentage (rounded to one
        decimal), total and successful heartbeat counts, and the retained
        heartbeats. ``"updated_at"`` is the snapshot time in Beijing time,
        formatted ``YYYY-MM-DD HH:MM:SS``.
    """
    services_view = {}
    for sid, entry in SERVICES.items():
        beats = list(entry["heartbeats"])
        count = len(beats)
        ok_count = len([beat for beat in beats if beat.get("success")])

        if not beats:
            status = "unknown"
        else:
            newest = beats[-1]
            status = newest.get("level")
            if status not in {"up", "down", "warn"}:
                # Entries without a recognised level fall back to the
                # success flag.
                status = "up" if newest.get("success") else "down"

        # A service with no heartbeats yet is reported as 100% up.
        percent = (ok_count / count * 100) if count > 0 else 100.0

        services_view[sid] = {
            "name": entry["name"],
            "status": status,
            "uptime": round(percent, 1),
            "total": count,
            "success": ok_count,
            "heartbeats": beats[-MAX_HEARTBEATS:],
        }

    return {
        "services": services_view,
        "updated_at": datetime.now(BEIJING_TZ).strftime("%Y-%m-%d %H:%M:%S"),
    }
async def get_uptime_summary(days: int = 90) -> Dict:
    """Return the realtime status snapshot; kept for legacy callers.

    Args:
        days: Accepted for backward compatibility and not used; the result is
            always the current realtime snapshot.
    """
    snapshot = get_realtime_status()
    return snapshot