xiaoyukkkk's picture
Upload 9 files
e42f78b verified
raw
history blame
2.64 kB
"""
Uptime 实时监控追踪器
类似 Uptime Kuma 的心跳监控,显示最近请求状态
"""
from collections import deque
from datetime import datetime, timezone, timedelta
from typing import Dict, List
# 北京时区 UTC+8
BEIJING_TZ = timezone(timedelta(hours=8))
# 每个服务保留最近 60 条心跳记录
MAX_HEARTBEATS = 60
# 服务配置
SERVICES = {
"api_service": {"name": "API 服务", "heartbeats": deque(maxlen=MAX_HEARTBEATS)},
"account_pool": {"name": "服务资源", "heartbeats": deque(maxlen=MAX_HEARTBEATS)},
"gemini-2.5-flash": {"name": "Gemini 2.5 Flash", "heartbeats": deque(maxlen=MAX_HEARTBEATS)},
"gemini-2.5-pro": {"name": "Gemini 2.5 Pro", "heartbeats": deque(maxlen=MAX_HEARTBEATS)},
"gemini-3-flash-preview": {"name": "Gemini 3 Flash Preview", "heartbeats": deque(maxlen=MAX_HEARTBEATS)},
"gemini-3-pro-preview": {"name": "Gemini 3 Pro Preview", "heartbeats": deque(maxlen=MAX_HEARTBEATS)},
}
SUPPORTED_MODELS = ["gemini-2.5-flash", "gemini-2.5-pro", "gemini-3-flash-preview", "gemini-3-pro-preview"]
def record_request(service: str, success: bool):
"""记录请求心跳"""
if service not in SERVICES:
return
SERVICES[service]["heartbeats"].append({
"time": datetime.now(BEIJING_TZ).strftime("%H:%M:%S"),
"success": success
})
def get_realtime_status() -> Dict:
"""获取实时状态数据"""
result = {"services": {}}
for service_id, service_data in SERVICES.items():
heartbeats = list(service_data["heartbeats"])
total = len(heartbeats)
success = sum(1 for h in heartbeats if h["success"])
# 计算可用率
uptime = (success / total * 100) if total > 0 else 100.0
# 最近状态
last_status = "unknown"
if heartbeats:
last_status = "up" if heartbeats[-1]["success"] else "down"
result["services"][service_id] = {
"name": service_data["name"],
"status": last_status,
"uptime": round(uptime, 1),
"total": total,
"success": success,
"heartbeats": heartbeats[-MAX_HEARTBEATS:] # 最近的心跳
}
result["updated_at"] = datetime.now(BEIJING_TZ).strftime("%Y-%m-%d %H:%M:%S")
return result
# 兼容旧接口
async def get_uptime_summary(days: int = 90) -> Dict:
"""兼容旧接口,返回实时数据"""
return get_realtime_status()
async def uptime_aggregation_task():
"""后台任务(保留兼容性,实际不需要聚合)"""
pass