Spaces:
Sleeping
Sleeping
File size: 4,520 Bytes
"""
Real-time uptime monitoring with persistent heartbeat history.
"""
from collections import deque
from datetime import datetime, timezone, timedelta
from typing import Dict, List, Optional
import json
import os
from threading import Lock
# Beijing time zone (UTC+8); heartbeat and status timestamps are rendered in it.
BEIJING_TZ = timezone(timedelta(hours=8))

# Number of most-recent heartbeats retained per service.
MAX_HEARTBEATS = 60

# A successful request at or above this latency (in ms) is classified "warn".
SLOW_THRESHOLD_MS = 40000

# Status codes that are classified "warn" regardless of the success flag.
WARNING_STATUS_CODES = {429}

# Heartbeat persistence target; stays None until configure_storage() sets it.
_storage_path: Optional[str] = None

# Held while the persistence file is being read or written.
_storage_lock = Lock()

# Service registry: service id -> display name plus a bounded heartbeat buffer.
# Each service gets its own deque, capped at MAX_HEARTBEATS entries.
SERVICES = {
    service_id: {"name": display_name, "heartbeats": deque(maxlen=MAX_HEARTBEATS)}
    for service_id, display_name in (
        ("api_service", "API 服务"),
        ("account_pool", "服务资源"),
        ("gemini-2.5-flash", "Gemini 2.5 Flash"),
        ("gemini-2.5-pro", "Gemini 2.5 Pro"),
        ("gemini-3-flash-preview", "Gemini 3 Flash Preview"),
        ("gemini-3-pro-preview", "Gemini 3 Pro Preview"),
    )
}

# Model ids; each one also has an entry in SERVICES.
SUPPORTED_MODELS = [
    "gemini-2.5-flash",
    "gemini-2.5-pro",
    "gemini-3-flash-preview",
    "gemini-3-pro-preview",
]
def configure_storage(path: Optional[str]) -> None:
    """Set the file path used to persist heartbeat history.

    Args:
        path: Location of the JSON persistence file. A falsy value
            (``None`` or ``""``) makes saving and loading no-ops.
    """
    global _storage_path
    _storage_path = path
def _classify_level(success: bool, status_code: Optional[int], latency_ms: Optional[int]) -> str:
    """Map one request outcome to a heartbeat level.

    Args:
        success: Whether the request succeeded.
        status_code: Status code of the response, if known.
        latency_ms: Request latency in milliseconds, if known.

    Returns:
        ``"warn"`` when the status code is in ``WARNING_STATUS_CODES`` or a
        successful request took at least ``SLOW_THRESHOLD_MS``; otherwise
        ``"up"`` for a success and ``"down"`` for a failure.
    """
    # A warning status code wins over the success flag.
    if status_code in WARNING_STATUS_CODES:
        return "warn"
    if not success:
        return "down"
    # Only successful requests can be downgraded for being slow.
    too_slow = latency_ms is not None and latency_ms >= SLOW_THRESHOLD_MS
    return "warn" if too_slow else "up"
def _save_heartbeats() -> None:
    """Write every service's heartbeat buffer to the configured JSON file.

    Does nothing when no storage path is configured. Persistence is best
    effort: any failure is logged and swallowed, so the caller never sees an
    exception caused by storage problems.
    """
    path = _storage_path
    if not path:
        return
    try:
        # os.path.dirname() returns "" for a bare file name and
        # os.makedirs("") raises, so only create a directory when one is named.
        directory = os.path.dirname(path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        with _storage_lock:
            # Snapshot under the lock so that concurrent saves cannot write an
            # older snapshot over a newer one.
            payload = {
                service_id: list(service_data["heartbeats"])
                for service_id, service_data in SERVICES.items()
            }
            with open(path, "w", encoding="utf-8") as f:
                json.dump(payload, f, ensure_ascii=True, indent=2)
    except Exception as exc:
        # Deliberately broad: a storage failure must not break the caller,
        # but it should be visible instead of vanishing.
        import logging

        logging.getLogger(__name__).warning(
            "Failed to persist heartbeats to %s: %s", path, exc
        )
def load_heartbeats() -> None:
    """Restore heartbeat history from the configured JSON file.

    Does nothing when no storage path is configured or the file does not
    exist. Loading is best effort: an unreadable or malformed file is logged
    and ignored. Entries for unknown service ids, entries that are not lists,
    and individual heartbeats that are not dicts are skipped, so one bad entry
    neither wipes an in-memory buffer nor prevents the remaining services
    from loading.
    """
    path = _storage_path
    if not path:
        return
    import logging

    logger = logging.getLogger(__name__)
    try:
        with _storage_lock, open(path, "r", encoding="utf-8") as f:
            payload = json.load(f)
    except FileNotFoundError:
        # Nothing has been persisted yet.
        return
    except Exception as exc:
        logger.warning("Failed to load heartbeats from %s: %s", path, exc)
        return

    if not isinstance(payload, dict):
        logger.warning("Ignoring heartbeat file %s: top level is not an object", path)
        return

    for service_id, heartbeats in payload.items():
        service = SERVICES.get(service_id)
        # Validate before touching the buffer so a bad entry cannot clear it.
        if service is None or not isinstance(heartbeats, list):
            continue
        # Status computation calls .get() on each beat, so keep only dicts.
        beats = [beat for beat in heartbeats if isinstance(beat, dict)]
        buffer = service["heartbeats"]
        buffer.clear()
        buffer.extend(beats[-MAX_HEARTBEATS:])
def record_request(
    service: str,
    success: bool,
    latency_ms: Optional[int] = None,
    status_code: Optional[int] = None
):
    """Record one heartbeat for a service and persist the updated history.

    Args:
        service: Service id; ids that are not in ``SERVICES`` are ignored.
        success: Whether the request succeeded.
        latency_ms: Request latency in milliseconds; stored only when given.
        status_code: Status code of the response; stored only when given.
    """
    entry = SERVICES.get(service)
    if entry is None:
        return

    level = _classify_level(success, status_code, latency_ms)
    # The timestamp is the Beijing wall-clock time of day (no date part).
    stamp = datetime.now(BEIJING_TZ).strftime("%H:%M:%S")
    heartbeat = {"time": stamp, "success": success, "level": level}

    # Optional fields are added only when the caller supplied them.
    for key, value in (("latency_ms", latency_ms), ("status_code", status_code)):
        if value is not None:
            heartbeat[key] = value

    entry["heartbeats"].append(heartbeat)
    _save_heartbeats()
def get_realtime_status() -> Dict:
    """Build a snapshot of the current state of every registered service.

    Returns:
        A dict with two keys:

        * ``"services"``: service id -> ``name``, ``status`` (``"up"``,
          ``"down"``, ``"warn"``, or ``"unknown"`` when there are no
          heartbeats), ``uptime`` (percentage of successful heartbeats rounded
          to one decimal, 100.0 when there are none), ``total``, ``success``
          and the retained ``heartbeats``.
        * ``"updated_at"``: current Beijing time as ``YYYY-MM-DD HH:MM:SS``.
    """
    services = {}
    for service_id, service_data in SERVICES.items():
        beats = list(service_data["heartbeats"])
        total = len(beats)
        success = len([beat for beat in beats if beat.get("success")])

        if total > 0:
            uptime = success / total * 100
        else:
            uptime = 100.0

        if not beats:
            status = "unknown"
        else:
            latest = beats[-1]
            status = latest.get("level")
            # Beats without a recognised level fall back to the success flag.
            if status not in {"up", "down", "warn"}:
                status = "up" if latest.get("success") else "down"

        services[service_id] = {
            "name": service_data["name"],
            "status": status,
            "uptime": round(uptime, 1),
            "total": total,
            "success": success,
            "heartbeats": beats[-MAX_HEARTBEATS:],
        }

    return {
        "services": services,
        "updated_at": datetime.now(BEIJING_TZ).strftime("%Y-%m-%d %H:%M:%S"),
    }
async def get_uptime_summary(days: int = 90) -> Dict:
    """Return the real-time status snapshot (kept for the legacy interface).

    Args:
        days: Accepted for backward compatibility; the value is not used.

    Returns:
        The same dict that ``get_realtime_status()`` produces.
    """
    snapshot = get_realtime_status()
    return snapshot
|