"""
Gemi2Api Server admin panel backend.

Provides status monitoring, configuration management, log viewing and related features.
"""
import hashlib
import importlib.metadata
import os
import subprocess
import sys
import time
from collections import deque
from datetime import datetime
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
def _get_version() -> str:
try:
return importlib.metadata.version("gemi2api-server")
except importlib.metadata.PackageNotFoundError:
return "unknown"
# Router that mounts every endpoint of this module under the /admin prefix.
router = APIRouter(prefix="/admin", tags=["admin"])
# Module-level state tracking.
_start_time = time.time()  # timestamp taken when this module is first executed
_request_log = deque(maxlen=100)  # keeps only the 100 most recent log entries
def mask_cookie(value: str) -> str:
    """Mask a cookie value for display: first 4 chars + ``***`` + last 4 chars.

    Args:
        value: The raw cookie value.

    Returns:
        An empty string for a falsy value, the value itself when it is 8
        characters long or shorter, otherwise the masked form.
    """
    if not value:
        return ""
    if len(value) <= 8:
        return value
    head, tail = value[:4], value[-4:]
    return "***".join((head, tail))
# Aggregate request counters; log_request() updates them on every call.
_stats = {
    "total_requests": 0,
    "error_count": 0,
    "total_response_time": 0.0,
}
# Admin panel session store: { token: expire_timestamp }
_admin_sessions = {}
SESSION_EXPIRE_HOURS = 12
# Path of the .env file, located in the same directory as this module.
ENV_FILE = Path(__file__).parent / ".env"
class ConfigUpdate(BaseModel):
    """Configuration update request.

    Every field is optional and defaults to ``None``.
    """

    host: Optional[str] = None
    port: Optional[int] = None
    api_key: Optional[str] = None
    # Name of a feature switch; update_config() reads it together with ``enabled``.
    feature: Optional[str] = None
    enabled: Optional[bool] = None
def log_request(method: str, path: str, status: int, response_time: float = 0):
    """Record one request in the running counters and the recent-request log.

    Args:
        method: HTTP method of the request.
        path: Request path.
        status: Response status code; 400 and above counts as an error.
        response_time: Elapsed time of the request. It is added unchanged to
            the running total and stored in the log entry multiplied by 1000
            and rounded to two decimals.
    """
    _stats["total_requests"] += 1
    _stats["total_response_time"] += response_time
    _stats["error_count"] += 1 if status >= 400 else 0

    logged_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    elapsed_scaled = round(response_time * 1000, 2)
    entry = {
        "time": logged_at,
        "method": method,
        "path": path,
        "status": status,
        "response_time": elapsed_scaled,
    }
    # Newest entries go to the front of the log.
    _request_log.appendleft(entry)
def format_uptime(seconds: float) -> str:
    """Format an uptime given in seconds as a short Chinese-language string.

    Args:
        seconds: Elapsed time in seconds.

    Returns:
        Days and hours when at least one full day has passed, hours and
        minutes when at least one full hour has passed, otherwise minutes.
    """
    day_count = int(seconds // 86400)
    hour_count = int(seconds % 86400 // 3600)
    minute_count = int(seconds % 3600 // 60)

    if day_count > 0:
        return f"{day_count}天 {hour_count}小时"
    if hour_count > 0:
        return f"{hour_count}小时 {minute_count}分钟"
    return f"{minute_count}分钟"
def read_env() -> dict:
    """Read the ``.env`` file at ``ENV_FILE`` into a dictionary.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Every other line is split at its first ``=``: the key is
    stripped of surrounding whitespace, the value of surrounding whitespace
    and then of leading/trailing double and single quote characters.

    Returns:
        Mapping of variable name to value; empty when the file does not exist.
    """
    env_vars = {}
    try:
        # Decode explicitly as UTF-8 so the result does not depend on the
        # locale encoding of the machine running the server.
        with open(ENV_FILE, "r", encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                key, _, value = line.partition("=")
                env_vars[key.strip()] = value.strip().strip('"').strip("'")
    except FileNotFoundError:
        # A missing file is treated like an empty one. Opening directly avoids
        # the gap between a separate existence check and the open call.
        pass
    return env_vars
def write_env(updates: dict):
    """Merge ``updates`` into the ``.env`` file at ``ENV_FILE`` and rewrite it.

    The file is rebuilt from the key/value pairs returned by ``read_env()``
    plus ``updates``, one ``KEY="value"`` line each, so comments and any other
    line ``read_env()`` skips are not carried over.

    Args:
        updates: Variable names mapped to their new values.

    Raises:
        ValueError: If a key or value contains a carriage return or line
            feed. Such a value would spill onto a new line of the file and be
            parsed back as a separate entry.
    """
    env_vars = read_env()
    env_vars.update(updates)

    # Validate everything before opening the file for writing: opening with
    # "w" truncates it, so a late failure would leave a partial file behind.
    for key, value in env_vars.items():
        if any(ch in f"{key}{value}" for ch in "\r\n"):
            raise ValueError(f"line break not allowed in .env entry {key!r}")

    lines = [f'{key}="{value}"\n' for key, value in env_vars.items()]
    # Write explicitly as UTF-8 so the file does not depend on the locale.
    with open(ENV_FILE, "w", encoding="utf-8") as f:
        f.writelines(lines)
@router.get("/", response_class=HTMLResponse)
async def admin_page():
    """Serve the admin panel page.

    Returns:
        The UTF-8 contents of ``templates/admin.html`` (looked up next to
        this module) as an HTML response, or a 404 HTML response when that
        file does not exist.
    """
    template_file = Path(__file__).parent.joinpath("templates", "admin.html")
    if not template_file.exists():
        return HTMLResponse(content="<h1>管理面板文件未找到</h1>", status_code=404)
    page_html = template_file.read_text(encoding="utf-8")
    return HTMLResponse(content=page_html)
class LoginRequest(BaseModel):
    """Login request."""

    # Key submitted by the client; admin_login() checks it against main.API_KEY.
    api_key: str
def _generate_token(api_key: str) -> str:
"""生成会话 token"""
raw = f"{api_key}:{time.time()}:{os.urandom(16).hex()}"
return hashlib.sha256(raw.encode()).hexdigest()
def _clean_expired_sessions():
    """Remove every session whose expiry timestamp is earlier than now."""
    cutoff = time.time()
    # Iterate over a snapshot so entries can be removed while looping.
    for session_token, expires_at in list(_admin_sessions.items()):
        if expires_at < cutoff:
            _admin_sessions.pop(session_token)
@router.post("/api/login")
async def admin_login(req: LoginRequest):
    """Validate the submitted API key and open an admin panel session.

    Args:
        req: Login payload carrying the API key entered by the user.

    Returns:
        A dict with ``success``, the new session ``token``, ``expires_in``
        (session lifetime in seconds) and a ``message``.

    Raises:
        HTTPException: 400 when ``main.API_KEY`` is not configured, 401 when
            the submitted key does not match it.
    """
    import hmac

    from main import API_KEY

    # Login is refused while no API_KEY is configured.
    if not API_KEY:
        raise HTTPException(status_code=400, detail="未配置 API_KEY,管理面板不可用。请在 .env 中设置 API_KEY 后重启服务。")

    # Compare in constant time so that response timing does not reveal how
    # much of the submitted key matched. Both sides are encoded to bytes
    # because compare_digest() only accepts str arguments that are pure ASCII.
    submitted = req.api_key.encode("utf-8")
    expected = API_KEY.encode("utf-8")
    if not hmac.compare_digest(submitted, expected):
        raise HTTPException(status_code=401, detail="API_KEY 无效")

    # Issue a token valid for SESSION_EXPIRE_HOURS hours.
    _clean_expired_sessions()
    token = _generate_token(req.api_key)
    lifetime_seconds = SESSION_EXPIRE_HOURS * 3600
    _admin_sessions[token] = time.time() + lifetime_seconds
    return {
        "success": True,
        "token": token,
        "expires_in": lifetime_seconds,
        "message": f"登录成功,会话有效期 {SESSION_EXPIRE_HOURS} 小时",
    }
@router.get("/api/check")
async def admin_check(token: str):
    """Report whether a session token is still valid and how long it has left.

    Args:
        token: The session token to check.

    Returns:
        A dict with ``valid``, ``remaining_seconds`` and ``remaining_hours``
        (rounded to one decimal).

    Raises:
        HTTPException: 401 when the token is unknown or has expired.
    """
    _clean_expired_sessions()
    expires_at = _admin_sessions.get(token)
    if expires_at is None:
        raise HTTPException(status_code=401, detail="会话无效或已过期")
    seconds_left = int(expires_at - time.time())
    hours_left = round(seconds_left / 3600, 1)
    return {"valid": True, "remaining_seconds": seconds_left, "remaining_hours": hours_left}
async def verify_admin_token(request: Request):
    """Dependency that validates the admin panel token of a request.

    The token is read from the ``X-Admin-Token`` header, falling back to the
    ``token`` query parameter when the header is absent or empty.

    Args:
        request: The incoming request.

    Returns:
        The validated token.

    Raises:
        HTTPException: 401 when no token is supplied, or when the token is
            unknown or has expired.
    """
    supplied = request.headers.get("X-Admin-Token")
    if not supplied:
        supplied = request.query_params.get("token")
    if not supplied:
        raise HTTPException(status_code=401, detail="缺少管理面板 token")

    _clean_expired_sessions()
    if supplied in _admin_sessions:
        return supplied
    raise HTTPException(status_code=401, detail="会话无效或已过期")
@router.get("/api/status")
async def get_status(token: str = Depends(verify_admin_token)):
    """Return a snapshot of the service status.

    Args:
        token: Admin session token, validated by ``verify_admin_token``.

    Returns:
        A dict with uptime, request counters, the average response time
        (running total divided by request count, times 1000, two decimals),
        the error rate in percent (one decimal), the network settings, masked
        cookie values, the feature switches, the package version and the
        start time.
    """
    from main import (
        API_KEY,
        AUTO_DELETE_CHAT,
        ENABLE_THINKING,
        HOST,
        PORT,
        SECURE_1PSID,
        SECURE_1PSIDTS,
        TEMPORARY_CHAT,
    )

    # Both averages stay at 0 until at least one request has been counted.
    request_count = _stats["total_requests"]
    avg_response_time = 0
    error_rate = 0
    if request_count > 0:
        avg_response_time = round(_stats["total_response_time"] / request_count * 1000, 2)
        error_rate = round(_stats["error_count"] / request_count * 100, 1)

    # Simple presence check only: both cookie values must be non-empty.
    cookie_valid = bool(SECURE_1PSID and SECURE_1PSIDTS)

    uptime_text = format_uptime(time.time() - _start_time)
    started_text = datetime.fromtimestamp(_start_time).strftime("%Y-%m-%d %H:%M:%S")
    return {
        "running": True,
        "uptime": uptime_text,
        "total_requests": request_count,
        "avg_response_time": avg_response_time,
        "error_rate": error_rate,
        "host": HOST,
        "port": PORT,
        "api_key_enabled": bool(API_KEY),
        "cookie_valid": cookie_valid,
        "secure_1psid_masked": mask_cookie(SECURE_1PSID),
        "secure_1psidts_masked": mask_cookie(SECURE_1PSIDTS),
        "thinking_enabled": ENABLE_THINKING,
        "temporary_chat": TEMPORARY_CHAT,
        "auto_delete_chat": AUTO_DELETE_CHAT,
        "version": _get_version(),
        "start_time": started_text,
    }
@router.get("/api/logs")
async def get_logs(token: str = Depends(verify_admin_token)):
    """Return the recent request log as a list under the ``logs`` key.

    Args:
        token: Admin session token, validated by ``verify_admin_token``.
    """
    recent_entries = [*_request_log]
    return {"logs": recent_entries}
@router.post("/api/config")
async def update_config(config: ConfigUpdate, token: str = Depends(verify_admin_token)):
    """Apply one configuration change.

    The request is handled by the first case that matches:

    1. ``feature`` names a known switch and ``enabled`` is set: the switch is
       written to ``.env`` and the matching attribute on ``main`` is updated.
    2. ``host`` and/or ``port`` is given: they are written to ``.env``,
       together with ``api_key`` when that is given too.
    3. Only ``api_key`` is given: it is written to ``.env``.

    Args:
        config: The requested change.
        token: Admin session token, validated by ``verify_admin_token``.

    Returns:
        A dict with ``success`` and a ``message``.

    Raises:
        HTTPException: 400 when none of the cases above matches.
    """
    # Feature name -> setting name. The same name is used for the .env key
    # and for the attribute on the ``main`` module.
    feature_settings = {
        "thinking": "ENABLE_THINKING",
        "temporary": "TEMPORARY_CHAT",
        "autoDelete": "AUTO_DELETE_CHAT",
    }

    # Feature switches.
    if config.feature and config.enabled is not None:
        setting_name = feature_settings.get(config.feature)
        # Success is only reported for a switch that was actually applied;
        # an unknown feature name falls through to the checks below.
        if setting_name:
            write_env({setting_name: str(config.enabled).lower()})
            # Update the runtime variable as well.
            import main

            setattr(main, setting_name, config.enabled)
            return {"success": True, "message": f"功能 {config.feature} 已{'启用' if config.enabled else '禁用'}"}

    # Network settings.
    if config.host or config.port:
        updates = {}
        if config.host:
            updates["HOST"] = config.host
        if config.port:
            updates["PORT"] = str(config.port)
        if config.api_key is not None:
            updates["API_KEY"] = config.api_key
        write_env(updates)
        return {"success": True, "message": "配置已保存,重启服务后生效"}

    # API_KEY on its own.
    if config.api_key is not None:
        write_env({"API_KEY": config.api_key})
        return {"success": True, "message": "API_KEY 已更新"}

    raise HTTPException(status_code=400, detail="无效的配置请求")
@router.post("/api/config-save-restart")
async def save_config_and_restart(config: ConfigUpdate, token: str = Depends(verify_admin_token)):
    """Save configuration and restart the service in one step.

    Each of ``host``, ``port`` and ``api_key`` that is not ``None`` is written
    to ``.env``. Then ``main.py`` from this module's directory is started in a
    new process with the current interpreter, and the current process is
    terminated with ``os._exit(0)``, so no response is returned on success.

    Args:
        config: Settings to persist before restarting.
        token: Admin session token, validated by ``verify_admin_token``.

    Raises:
        HTTPException: 500 when the new process cannot be started.
    """
    env_updates = {}
    if config.host is not None:
        env_updates["HOST"] = config.host
    if config.port is not None:
        env_updates["PORT"] = str(config.port)
    if config.api_key is not None:
        env_updates["API_KEY"] = config.api_key
    if env_updates:
        write_env(env_updates)

    # Restart the service.
    try:
        # Swap only the final path component. A plain string replace of
        # "admin.py" would also rewrite a parent directory containing that text.
        script_path = Path(os.path.abspath(__file__)).with_name("main.py")
        subprocess.Popen(
            [sys.executable, str(script_path)],
            cwd=str(script_path.parent),
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"重启失败: {str(e)}") from e

    # Terminate the current process immediately; this call does not return.
    os._exit(0)
@router.post("/api/restart")
async def restart_service(token: str = Depends(verify_admin_token)):
    """Restart the service.

    ``main.py`` from this module's directory is started in a new process with
    the current interpreter, and the current process is terminated with
    ``os._exit(0)``, so no response is returned on success.

    Args:
        token: Admin session token, validated by ``verify_admin_token``.

    Raises:
        HTTPException: 500 when the new process cannot be started.
    """
    try:
        # Swap only the final path component. A plain string replace of
        # "admin.py" would also rewrite a parent directory containing that text.
        script_path = Path(os.path.abspath(__file__)).with_name("main.py")
        # Start the new process.
        subprocess.Popen(
            [sys.executable, str(script_path)],
            cwd=str(script_path.parent),
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"重启失败: {str(e)}") from e

    # Terminate the current process immediately; this call does not return.
    os._exit(0)
class CookieUpdate(BaseModel):
    """Cookie update request."""

    # Both values are required; the cookie endpoints reject empty strings.
    secure_1psid: str
    secure_1psidts: str
@router.post("/api/cookies")
async def update_cookies(cookies: CookieUpdate, token: str = Depends(verify_admin_token)):
    """Update the Gemini cookies.

    The two values are written to ``.env`` and assigned to
    ``main.SECURE_1PSID`` / ``main.SECURE_1PSIDTS``.

    Args:
        cookies: The new cookie values.
        token: Admin session token, validated by ``verify_admin_token``.

    Returns:
        A dict with ``success`` and a ``message``.

    Raises:
        HTTPException: 400 when either cookie value is empty.
    """
    psid, psidts = cookies.secure_1psid, cookies.secure_1psidts
    if not (psid and psidts):
        raise HTTPException(status_code=400, detail="Cookie 值不能为空")

    # Persist to the .env file.
    write_env({"SECURE_1PSID": psid, "SECURE_1PSIDTS": psidts})

    # Update the runtime variables.
    import main

    main.SECURE_1PSID = psid
    main.SECURE_1PSIDTS = psidts
    return {"success": True, "message": "Cookie 已保存并生效"}
@router.post("/api/cookies-save-reinit")
async def save_cookies_and_reinit(cookies: CookieUpdate, token: str = Depends(verify_admin_token)):
    """Save the cookies and reconnect to Gemini in one step.

    The values are written to ``.env`` and assigned to ``main``. The existing
    client (if any) is closed and cleared while holding
    ``main.gemini_client_lock``, and a new one is requested from
    ``main.get_gemini_client()``.

    Args:
        cookies: The new cookie values.
        token: Admin session token, validated by ``verify_admin_token``.

    Returns:
        A dict with ``success`` and a ``message``. Reconnect failures are
        reported with ``success`` set to ``False`` rather than raised.

    Raises:
        HTTPException: 400 when either cookie value is empty.
    """
    psid, psidts = cookies.secure_1psid, cookies.secure_1psidts
    if not (psid and psidts):
        raise HTTPException(status_code=400, detail="Cookie 值不能为空")

    # Persist to the .env file.
    write_env({"SECURE_1PSID": psid, "SECURE_1PSIDTS": psidts})

    # Update the runtime variables.
    import main

    main.SECURE_1PSID = psid
    main.SECURE_1PSIDTS = psidts

    # Drop the current client while holding the lock.
    async with main.gemini_client_lock:
        if main.gemini_client is not None:
            try:
                await main.gemini_client.close()
            except Exception:
                # Closing is best effort; the client is discarded either way.
                pass
            main.gemini_client = None

    # Request a new client outside the lock.
    try:
        connected = bool(await main.get_gemini_client())
    except Exception as e:
        return {"success": False, "message": f"Cookie 已保存,但 Gemini 重连失败: {str(e)}"}
    if connected:
        return {"success": True, "message": "Cookie 已保存,Gemini 重新连接成功"}
    return {"success": False, "message": "Cookie 已保存,但 Gemini 连接失败"}
@router.post("/api/reinit")
async def reinit_client(token: str = Depends(verify_admin_token)):
    """Re-initialize the Gemini client.

    The existing client (if any) is closed and cleared while holding
    ``main.gemini_client_lock``, then a new one is requested from
    ``main.get_gemini_client()``.

    Args:
        token: Admin session token, validated by ``verify_admin_token``.

    Returns:
        A dict with ``success`` and a ``message``.

    Raises:
        HTTPException: 500 when ``get_gemini_client()`` raises, or when it
            returns an empty value.
    """
    import main

    # Close the old client inside the lock to avoid a race.
    async with main.gemini_client_lock:
        if main.gemini_client is not None:
            try:
                await main.gemini_client.close()
            except Exception:
                # Closing is best effort; the client is discarded either way.
                pass
            main.gemini_client = None

    # Re-initialize outside the lock: get_gemini_client() acquires it itself.
    try:
        client = await main.get_gemini_client()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"重新连接失败: {str(e)}") from e

    # Raised outside the try block so the handler above cannot catch this
    # exception and re-wrap its detail text.
    if not client:
        raise HTTPException(status_code=500, detail="客户端初始化返回空值")
    return {"success": True, "message": "Gemini 客户端重新连接成功"}
def setup_middleware(app):
    """Install the request-logging middleware on ``app``.

    Requests whose path starts with ``/admin`` or ``/static`` are passed
    through unchanged. For every other request the response is awaited, and
    when the path starts with ``/v1/`` the method, path, status code and
    elapsed time are handed to ``log_request``.

    Args:
        app: The application object; only its ``add_middleware`` method is used.
    """
    from starlette.middleware.base import BaseHTTPMiddleware
    from starlette.requests import Request

    class RequestLoggingMiddleware(BaseHTTPMiddleware):
        """Middleware that records ``/v1/`` API requests via ``log_request``."""

        async def dispatch(self, request: Request, call_next):
            began = time.time()
            url_path = request.url.path

            # Skip static assets and the admin panel's own requests.
            if url_path.startswith(("/admin", "/static")):
                return await call_next(request)

            response = await call_next(request)
            if not url_path.startswith("/v1/"):
                return response

            # Record the API request.
            elapsed = time.time() - began
            log_request(request.method, url_path, response.status_code, elapsed)
            return response

    app.add_middleware(RequestLoggingMiddleware)
|