File size: 2,638 Bytes
e42f78b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
"""

Uptime 实时监控追踪器

类似 Uptime Kuma 的心跳监控,显示最近请求状态

"""

from collections import deque
from datetime import datetime, timezone, timedelta
from typing import Dict, List

# 北京时区 UTC+8
BEIJING_TZ = timezone(timedelta(hours=8))

# 每个服务保留最近 60 条心跳记录
MAX_HEARTBEATS = 60

# 服务配置
SERVICES = {
    "api_service": {"name": "API 服务", "heartbeats": deque(maxlen=MAX_HEARTBEATS)},
    "account_pool": {"name": "服务资源", "heartbeats": deque(maxlen=MAX_HEARTBEATS)},
    "gemini-2.5-flash": {"name": "Gemini 2.5 Flash", "heartbeats": deque(maxlen=MAX_HEARTBEATS)},
    "gemini-2.5-pro": {"name": "Gemini 2.5 Pro", "heartbeats": deque(maxlen=MAX_HEARTBEATS)},
    "gemini-3-flash-preview": {"name": "Gemini 3 Flash Preview", "heartbeats": deque(maxlen=MAX_HEARTBEATS)},
    "gemini-3-pro-preview": {"name": "Gemini 3 Pro Preview", "heartbeats": deque(maxlen=MAX_HEARTBEATS)},
}

SUPPORTED_MODELS = ["gemini-2.5-flash", "gemini-2.5-pro", "gemini-3-flash-preview", "gemini-3-pro-preview"]


def record_request(service: str, success: bool):
    """记录请求心跳"""
    if service not in SERVICES:
        return

    SERVICES[service]["heartbeats"].append({
        "time": datetime.now(BEIJING_TZ).strftime("%H:%M:%S"),
        "success": success
    })


def get_realtime_status() -> Dict:
    """获取实时状态数据"""
    result = {"services": {}}

    for service_id, service_data in SERVICES.items():
        heartbeats = list(service_data["heartbeats"])
        total = len(heartbeats)
        success = sum(1 for h in heartbeats if h["success"])

        # 计算可用率
        uptime = (success / total * 100) if total > 0 else 100.0

        # 最近状态
        last_status = "unknown"
        if heartbeats:
            last_status = "up" if heartbeats[-1]["success"] else "down"

        result["services"][service_id] = {
            "name": service_data["name"],
            "status": last_status,
            "uptime": round(uptime, 1),
            "total": total,
            "success": success,
            "heartbeats": heartbeats[-MAX_HEARTBEATS:]  # 最近的心跳
        }

    result["updated_at"] = datetime.now(BEIJING_TZ).strftime("%Y-%m-%d %H:%M:%S")
    return result


# 兼容旧接口
async def get_uptime_summary(days: int = 90) -> Dict:
    """兼容旧接口,返回实时数据"""
    return get_realtime_status()


async def uptime_aggregation_task():
    """后台任务(保留兼容性,实际不需要聚合)"""
    pass