gcli2api / src /panel /utils.py
a3216's picture
sync: github -> hf space
c50496f
"""
共享工具模块 - 包含WebSocket连接管理、工具函数等
"""
import os
import time
from collections import deque
from typing import Set
from fastapi import HTTPException, WebSocket
from starlette.websockets import WebSocketState
import config
from log import log
# =============================================================================
# WebSocket连接管理
# =============================================================================
class ConnectionManager:
def __init__(self, max_connections: int = 3): # 进一步降低最大连接数
# 使用双端队列严格限制内存使用
self.active_connections: deque = deque(maxlen=max_connections)
self.max_connections = max_connections
self._last_cleanup = 0
self._cleanup_interval = 120 # 120秒清理一次死连接
async def connect(self, websocket: WebSocket):
# 自动清理死连接
self._auto_cleanup()
# 限制最大连接数,防止内存无限增长
if len(self.active_connections) >= self.max_connections:
await websocket.close(code=1008, reason="Too many connections")
return False
await websocket.accept()
self.active_connections.append(websocket)
log.debug(f"WebSocket连接建立,当前连接数: {len(self.active_connections)}")
return True
def disconnect(self, websocket: WebSocket):
# 使用更高效的方式移除连接
try:
self.active_connections.remove(websocket)
except ValueError:
pass # 连接已不存在
log.debug(f"WebSocket连接断开,当前连接数: {len(self.active_connections)}")
async def send_personal_message(self, message: str, websocket: WebSocket):
try:
await websocket.send_text(message)
except Exception:
self.disconnect(websocket)
async def broadcast(self, message: str):
# 使用更高效的方式处理广播,避免索引操作
dead_connections = []
for conn in self.active_connections:
try:
await conn.send_text(message)
except Exception:
dead_connections.append(conn)
# 批量移除死连接
for dead_conn in dead_connections:
self.disconnect(dead_conn)
def _auto_cleanup(self):
"""自动清理死连接"""
current_time = time.time()
if current_time - self._last_cleanup > self._cleanup_interval:
self.cleanup_dead_connections()
self._last_cleanup = current_time
def cleanup_dead_connections(self):
"""清理已断开的连接"""
original_count = len(self.active_connections)
# 使用列表推导式过滤活跃连接,更高效
alive_connections = deque(
[
conn
for conn in self.active_connections
if hasattr(conn, "client_state")
and conn.client_state != WebSocketState.DISCONNECTED
],
maxlen=self.max_connections,
)
self.active_connections = alive_connections
cleaned = original_count - len(self.active_connections)
if cleaned > 0:
log.debug(f"清理了 {cleaned} 个死连接,剩余连接数: {len(self.active_connections)}")
# =============================================================================
# 工具函数
# =============================================================================
def is_mobile_user_agent(user_agent: str) -> bool:
"""检测是否为移动设备用户代理"""
if not user_agent:
return False
user_agent_lower = user_agent.lower()
mobile_keywords = [
"mobile",
"android",
"iphone",
"ipad",
"ipod",
"blackberry",
"windows phone",
"samsung",
"htc",
"motorola",
"nokia",
"palm",
"webos",
"opera mini",
"opera mobi",
"fennec",
"minimo",
"symbian",
"psp",
"nintendo",
"tablet",
]
return any(keyword in user_agent_lower for keyword in mobile_keywords)
def validate_mode(mode: str = "geminicli") -> str:
"""
验证 mode 参数
Args:
mode: 模式字符串 ("geminicli" 或 "antigravity")
Returns:
str: 验证后的 mode 字符串
Raises:
HTTPException: 如果 mode 参数无效
"""
if mode not in ["geminicli", "antigravity"]:
raise HTTPException(
status_code=400,
detail=f"无效的 mode 参数: {mode},只支持 'geminicli' 或 'antigravity'"
)
return mode
def get_env_locked_keys() -> Set:
"""获取被环境变量锁定的配置键集合"""
env_locked_keys = set()
# 使用 config.py 中统一维护的映射表
for env_key, config_key in config.ENV_MAPPINGS.items():
if os.getenv(env_key):
env_locked_keys.add(config_key)
return env_locked_keys