File size: 4,520 Bytes
2974aa2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""
Uptime 实时监控与心跳历史持久化。
"""

from collections import deque
from datetime import datetime, timezone, timedelta
from typing import Dict, List, Optional
import json
import os
from threading import Lock

# 北京时区 UTC+8
BEIJING_TZ = timezone(timedelta(hours=8))

# 每个服务保留最近 60 条心跳
MAX_HEARTBEATS = 60
SLOW_THRESHOLD_MS = 40000
WARNING_STATUS_CODES = {429}

_storage_path: Optional[str] = None
_storage_lock = Lock()

# 服务注册表
SERVICES = {
    "api_service": {"name": "API 服务", "heartbeats": deque(maxlen=MAX_HEARTBEATS)},
    "account_pool": {"name": "服务资源", "heartbeats": deque(maxlen=MAX_HEARTBEATS)},
    "gemini-2.5-flash": {"name": "Gemini 2.5 Flash", "heartbeats": deque(maxlen=MAX_HEARTBEATS)},
    "gemini-2.5-pro": {"name": "Gemini 2.5 Pro", "heartbeats": deque(maxlen=MAX_HEARTBEATS)},
    "gemini-3-flash-preview": {"name": "Gemini 3 Flash Preview", "heartbeats": deque(maxlen=MAX_HEARTBEATS)},
    "gemini-3-pro-preview": {"name": "Gemini 3 Pro Preview", "heartbeats": deque(maxlen=MAX_HEARTBEATS)},
}

SUPPORTED_MODELS = ["gemini-2.5-flash", "gemini-2.5-pro", "gemini-3-flash-preview", "gemini-3-pro-preview"]


def configure_storage(path: Optional[str]) -> None:
    """配置心跳持久化路径。"""
    global _storage_path
    _storage_path = path


def _classify_level(success: bool, status_code: Optional[int], latency_ms: Optional[int]) -> str:
    if status_code in WARNING_STATUS_CODES:
        return "warn"
    if success and latency_ms is not None and latency_ms >= SLOW_THRESHOLD_MS:
        return "warn"
    return "up" if success else "down"


def _save_heartbeats() -> None:
    if not _storage_path:
        return
    try:
        payload = {}
        for service_id, service_data in SERVICES.items():
            payload[service_id] = list(service_data["heartbeats"])
        os.makedirs(os.path.dirname(_storage_path), exist_ok=True)
        with _storage_lock, open(_storage_path, "w", encoding="utf-8") as f:
            json.dump(payload, f, ensure_ascii=True, indent=2)
    except Exception:
        return


def load_heartbeats() -> None:
    if not _storage_path or not os.path.exists(_storage_path):
        return
    try:
        with _storage_lock, open(_storage_path, "r", encoding="utf-8") as f:
            payload = json.load(f)
        for service_id, heartbeats in payload.items():
            if service_id not in SERVICES:
                continue
            SERVICES[service_id]["heartbeats"].clear()
            for beat in heartbeats[-MAX_HEARTBEATS:]:
                SERVICES[service_id]["heartbeats"].append(beat)
    except Exception:
        return


def record_request(
    service: str,
    success: bool,
    latency_ms: Optional[int] = None,
    status_code: Optional[int] = None
):
    """记录一次心跳。"""
    if service not in SERVICES:
        return

    level = _classify_level(success, status_code, latency_ms)
    heartbeat = {
        "time": datetime.now(BEIJING_TZ).strftime("%H:%M:%S"),
        "success": success,
        "level": level,
    }
    if latency_ms is not None:
        heartbeat["latency_ms"] = latency_ms
    if status_code is not None:
        heartbeat["status_code"] = status_code

    SERVICES[service]["heartbeats"].append(heartbeat)
    _save_heartbeats()


def get_realtime_status() -> Dict:
    """返回实时监控数据。"""
    result = {"services": {}}

    for service_id, service_data in SERVICES.items():
        heartbeats = list(service_data["heartbeats"])
        total = len(heartbeats)
        success = sum(1 for h in heartbeats if h.get("success"))

        uptime = (success / total * 100) if total > 0 else 100.0

        last_status = "unknown"
        if heartbeats:
            last_level = heartbeats[-1].get("level")
            if last_level in {"up", "down", "warn"}:
                last_status = last_level
            else:
                last_status = "up" if heartbeats[-1].get("success") else "down"

        result["services"][service_id] = {
            "name": service_data["name"],
            "status": last_status,
            "uptime": round(uptime, 1),
            "total": total,
            "success": success,
            "heartbeats": heartbeats[-MAX_HEARTBEATS:],
        }

    result["updated_at"] = datetime.now(BEIJING_TZ).strftime("%Y-%m-%d %H:%M:%S")
    return result


async def get_uptime_summary(days: int = 90) -> Dict:
    """兼容旧接口。"""
    return get_realtime_status()