| """
|
| Uptime 实时监控与心跳历史持久化。
|
| """
|
|
|
| from collections import deque
|
| from datetime import datetime, timezone, timedelta
|
| from typing import Dict, List, Optional
|
| import json
|
| import os
|
| from threading import Lock
|
|
|
|
|
# Fixed UTC+8 offset used for every timestamp this module produces.
BEIJING_TZ = timezone(timedelta(hours=8))

# Number of most recent heartbeats retained per service.
MAX_HEARTBEATS = 60
# A successful request at or above this latency (in ms) is classified "warn".
SLOW_THRESHOLD_MS = 40000
# Status codes classified "warn" regardless of the success flag.
WARNING_STATUS_CODES = {429}

# Heartbeat persistence file; stays None until configure_storage() sets it.
_storage_path: Optional[str] = None
# Guards reads and writes of the persistence file.
_storage_lock = Lock()

# Service id -> display name plus a bounded heartbeat history.
# Every service gets its own deque instance.
SERVICES = {
    service_id: {"name": display_name, "heartbeats": deque(maxlen=MAX_HEARTBEATS)}
    for service_id, display_name in (
        ("api_service", "API 服务"),
        ("account_pool", "服务资源"),
        ("gemini-2.5-flash", "Gemini 2.5 Flash"),
        ("gemini-2.5-pro", "Gemini 2.5 Pro"),
        ("gemini-3-flash-preview", "Gemini 3 Flash Preview"),
        ("gemini-3-pro-preview", "Gemini 3 Pro Preview"),
    )
}

# Model ids; each one also appears as a key of SERVICES.
SUPPORTED_MODELS = [
    "gemini-2.5-flash",
    "gemini-2.5-pro",
    "gemini-3-flash-preview",
    "gemini-3-pro-preview",
]
|
|
|
|
|
def configure_storage(path: Optional[str]) -> None:
    """Set the file path used to persist heartbeat history.

    Args:
        path: Location of the JSON file, or ``None`` to leave persistence
            unconfigured.
    """
    # Rebind the module-level setting rather than creating a local.
    global _storage_path
    _storage_path = path
|
|
|
|
|
def _classify_level(success: bool, status_code: Optional[int], latency_ms: Optional[int]) -> str:
    """Map a request outcome to a heartbeat level.

    Args:
        success: Whether the request succeeded.
        status_code: Status code of the response, if known.
        latency_ms: Request latency in milliseconds, if known.

    Returns:
        ``"warn"`` when the status code is in ``WARNING_STATUS_CODES`` or a
        successful request took at least ``SLOW_THRESHOLD_MS``; otherwise
        ``"up"`` for a success and ``"down"`` for a failure.
    """
    # A warning status code wins over the success flag.
    is_warning_code = status_code in WARNING_STATUS_CODES
    if is_warning_code:
        return "warn"

    if not success:
        return "down"

    # Only successful requests can be downgraded for being slow.
    is_slow = latency_ms is not None and latency_ms >= SLOW_THRESHOLD_MS
    return "warn" if is_slow else "up"
|
|
|
|
|
def _save_heartbeats() -> None:
    """Write every service's heartbeat history to the configured JSON file.

    The file maps each service id to the list of its retained heartbeats.
    This is best-effort: nothing happens when no storage path is configured,
    and any failure is logged instead of raised so that recording a heartbeat
    never breaks the caller.
    """
    if not _storage_path:
        return
    try:
        payload = {
            service_id: list(service_data["heartbeats"])
            for service_id, service_data in SERVICES.items()
        }
        # A bare file name has an empty dirname and os.makedirs("") raises
        # FileNotFoundError, so only create the parent when there is one.
        parent_dir = os.path.dirname(_storage_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with _storage_lock, open(_storage_path, "w", encoding="utf-8") as f:
            json.dump(payload, f, ensure_ascii=True, indent=2)
    except Exception:
        # Imported here so the failure path does not depend on a module-level import.
        import logging

        logging.getLogger(__name__).warning(
            "Failed to persist heartbeats to %s", _storage_path, exc_info=True
        )
|
|
|
|
|
def load_heartbeats() -> None:
    """Restore heartbeat history from the configured JSON file.

    Does nothing when no storage path is configured or the file does not
    exist. The file is expected to map service ids to lists of heartbeat
    dicts. Entries for unknown services, values that are not lists, and
    heartbeat items that are not dicts are skipped, so one malformed entry
    cannot discard the remaining history or leave non-dict items for readers
    of the history to trip over. Read and parse failures are logged, not
    raised.
    """
    if not _storage_path or not os.path.exists(_storage_path):
        return
    try:
        with _storage_lock, open(_storage_path, "r", encoding="utf-8") as f:
            payload = json.load(f)
    except Exception:
        # Imported here so the failure path does not depend on a module-level import.
        import logging

        logging.getLogger(__name__).warning(
            "Failed to load heartbeats from %s", _storage_path, exc_info=True
        )
        return

    if not isinstance(payload, dict):
        return

    for service_id, heartbeats in payload.items():
        if service_id not in SERVICES or not isinstance(heartbeats, list):
            continue
        valid_beats = [beat for beat in heartbeats if isinstance(beat, dict)]
        history = SERVICES[service_id]["heartbeats"]
        # Replace the in-memory history only once the stored data is validated.
        history.clear()
        history.extend(valid_beats[-MAX_HEARTBEATS:])
|
|
|
|
|
def record_request(
    service: str,
    success: bool,
    latency_ms: Optional[int] = None,
    status_code: Optional[int] = None,
):
    """Record one heartbeat for a service and persist the history.

    Calls for a service id that is not a key of ``SERVICES`` are ignored.

    Args:
        service: Service id to record the heartbeat under.
        success: Whether the request succeeded.
        latency_ms: Request latency in milliseconds; stored only when given.
        status_code: Response status code; stored only when given.
    """
    service_entry = SERVICES.get(service)
    if service_entry is None:
        return

    # Optional measurements are added after the fixed keys, and only when
    # they were actually supplied.
    optional_fields = {"latency_ms": latency_ms, "status_code": status_code}
    heartbeat = {
        "time": datetime.now(BEIJING_TZ).strftime("%H:%M:%S"),
        "success": success,
        "level": _classify_level(success, status_code, latency_ms),
        **{key: value for key, value in optional_fields.items() if value is not None},
    }

    service_entry["heartbeats"].append(heartbeat)
    _save_heartbeats()
|
|
|
|
|
def get_realtime_status() -> Dict:
    """Build a snapshot of the current monitoring data.

    Returns:
        A dict with two keys:

        * ``"services"``: one entry per service id holding its display name,
          current status, uptime percentage (rounded to one decimal), the
          total and successful heartbeat counts, and the heartbeat list.
        * ``"updated_at"``: the time the snapshot was built, in
          ``BEIJING_TZ``, formatted as ``YYYY-MM-DD HH:MM:SS``.
    """
    known_levels = {"up", "down", "warn"}
    services_view = {}

    for service_id, service_data in SERVICES.items():
        beats = list(service_data["heartbeats"])
        beat_count = len(beats)
        ok_count = len([beat for beat in beats if beat.get("success")])

        # With no data yet, report full uptime rather than dividing by zero.
        if beat_count > 0:
            uptime_pct = ok_count / beat_count * 100
        else:
            uptime_pct = 100.0

        if not beats:
            status = "unknown"
        else:
            latest = beats[-1]
            status = latest.get("level")
            if status not in known_levels:
                # No usable level on the latest beat: fall back to its
                # success flag.
                status = "up" if latest.get("success") else "down"

        services_view[service_id] = {
            "name": service_data["name"],
            "status": status,
            "uptime": round(uptime_pct, 1),
            "total": beat_count,
            "success": ok_count,
            "heartbeats": beats[-MAX_HEARTBEATS:],
        }

    return {
        "services": services_view,
        "updated_at": datetime.now(BEIJING_TZ).strftime("%Y-%m-%d %H:%M:%S"),
    }
|
|
|
|
|
async def get_uptime_summary(days: int = 90) -> Dict:
    """Return the realtime status; kept for compatibility with the old API.

    Args:
        days: Accepted but not used by this function.

    Returns:
        Whatever ``get_realtime_status()`` returns.
    """
    snapshot = get_realtime_status()
    return snapshot
|
|
|