File size: 3,798 Bytes
60016b2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
from fastapi import APIRouter
from datetime import datetime, timedelta
from app.utils import (
log_manager,
ResponseCacheManager,
ActiveRequestsManager,
clean_expired_stats
)
from app.config.settings import (
api_call_stats,
client_request_history,
API_KEY_DAILY_LIMIT
)
from app.services import GeminiClient
# Router that groups the dashboard endpoints under the "/api" prefix.
dashboard_router = APIRouter(tags=["dashboard"], prefix="/api")

# Module-level manager references. They stay None until
# init_dashboard_router() assigns the real instances.
key_manager = response_cache_manager = active_requests_manager = None
def init_dashboard_router(key_mgr, cache_mgr, active_req_mgr):
    """Wire the dashboard router to the managers it reads from.

    Stores the three arguments in the module-level ``key_manager``,
    ``response_cache_manager`` and ``active_requests_manager`` globals,
    which the dashboard endpoint uses on every request.

    Args:
        key_mgr: Object stored as ``key_manager``.
        cache_mgr: Object stored as ``response_cache_manager``.
        active_req_mgr: Object stored as ``active_requests_manager``.

    Returns:
        The module-level ``dashboard_router``.
    """
    global key_manager, response_cache_manager, active_requests_manager
    key_manager, response_cache_manager, active_requests_manager = (
        key_mgr,
        cache_mgr,
        active_req_mgr,
    )
    return dashboard_router
def _sum_calls_since(buckets, key_format, cutoff):
    """Sum the counts of the time buckets that start at or after *cutoff*.

    Args:
        buckets: Mapping of timestamp-string keys to call counts.
        key_format: ``datetime.strptime`` format used to parse each key.
        cutoff: ``datetime`` lower bound; a bucket is counted only when its
            parsed time is ``>= cutoff``.

    Returns:
        The sum of the counts of all buckets that pass the cutoff.
    """
    total = 0
    for bucket_key, count in buckets.items():
        try:
            bucket_time = datetime.strptime(bucket_key, key_format)
        except ValueError:
            # Keys that do not match the expected format are skipped.
            continue
        if bucket_time >= cutoff:
            total += count
    return total


@dashboard_router.get("/dashboard-data")
async def get_dashboard_data():
    """Return the dashboard data used for dynamic refresh.

    Expired statistics, cache entries and completed active requests are
    cleaned up first so the figures reflect the current state.

    Returns:
        A dict with the key count, model count, retry count, call totals
        for the last 24 hours / hour / minute, the current time formatted
        as ``HH:MM:SS``, the 50 most recent log entries, and per-key usage
        statistics sorted by usage percentage in descending order.
    """
    # Purge stale data first so the statistics below are up to date.
    clean_expired_stats(api_call_stats)
    response_cache_manager.clean_expired()
    active_requests_manager.clean_completed()

    # One timestamp for the whole request, so the window cutoffs and the
    # reported "current_time" describe the same instant.
    now = datetime.now()

    last_24h_calls = sum(api_call_stats['last_24h']['total'].values())

    # NOTE(review): bucket keys are parsed to the start of their hour or
    # minute, so a bucket is counted only if it *starts* inside the window.
    # Confirm this is the intended meaning of "last hour" / "last minute".
    hourly_calls = _sum_calls_since(
        api_call_stats['hourly']['total'],
        '%Y-%m-%d %H:00',
        now - timedelta(hours=1),
    )
    minute_calls = _sum_calls_since(
        api_call_stats['minute']['total'],
        '%Y-%m-%d %H:%M',
        now - timedelta(minutes=1),
    )

    # Per-key usage over the last 24 hours. Missing 'by_endpoint' data or a
    # key without recorded calls both yield a count of 0.
    calls_by_key = api_call_stats['last_24h'].get('by_endpoint', {})
    api_key_stats = []
    for api_key in key_manager.api_keys:
        calls_24h = sum(calls_by_key.get(api_key, {}).values())
        # Guard against a zero (or negative) limit to avoid dividing by zero.
        usage_percent = (calls_24h / API_KEY_DAILY_LIMIT) * 100 if API_KEY_DAILY_LIMIT > 0 else 0
        api_key_stats.append({
            # Only the first 8 characters of the key are used as its label.
            'api_key': api_key[:8],
            'calls_24h': calls_24h,
            'limit': API_KEY_DAILY_LIMIT,
            'usage_percent': round(usage_percent, 2),
        })

    # Most heavily used keys first.
    api_key_stats.sort(key=lambda entry: entry['usage_percent'], reverse=True)

    recent_logs = log_manager.get_recent_logs(50)

    key_count = len(key_manager.api_keys)
    return {
        "key_count": key_count,
        "model_count": len(GeminiClient.AVAILABLE_MODELS),
        # The retry count is reported as the number of configured keys.
        "retry_count": key_count,
        "last_24h_calls": last_24h_calls,
        "hourly_calls": hourly_calls,
        "minute_calls": minute_calls,
        "current_time": now.strftime('%H:%M:%S'),
        "logs": recent_logs,
        "api_key_stats": api_key_stats,
    }