# Source: G_AI/app/services/pool.py
# Last commit ac10cac (superxu520): update chat service, client connection pool, and helper utilities
import asyncio
from collections import deque
from typing import Dict, List, Optional
from loguru import logger
from ..utils import g_config
from ..utils.singleton import Singleton
from .client import GeminiClientWrapper
class GeminiClientPool(metaclass=Singleton):
    """Pool of GeminiClientWrapper instances identified by unique ids.

    One wrapper is created per entry in ``g_config.gemini.clients``. A client
    can be requested by id, or the pool hands clients out in round-robin
    order. A client that is not running gets one restart attempt before it is
    reported as unavailable.
    """

    def __init__(self) -> None:
        """Create a wrapper, id mapping and restart lock for each configured client.

        Raises:
            ValueError: If no Gemini clients are configured.
        """
        self._clients: List[GeminiClientWrapper] = []
        self._id_map: Dict[str, GeminiClientWrapper] = {}
        self._round_robin: deque[GeminiClientWrapper] = deque()
        self._restart_locks: Dict[str, asyncio.Lock] = {}
        # Serializes rotation of the round-robin deque between coroutines.
        # NOTE: asyncio.Lock coordinates coroutines only; it is not thread-safe.
        self._round_robin_lock = asyncio.Lock()

        if not g_config.gemini.clients:
            raise ValueError("No Gemini clients configured")

        for c in g_config.gemini.clients:
            client = GeminiClientWrapper(
                client_id=c.id,
                secure_1psid=c.secure_1psid,
                secure_1psidts=c.secure_1psidts,
                proxy=c.proxy,
            )
            self._clients.append(client)
            self._id_map[c.id] = client
            self._round_robin.append(client)
            self._restart_locks[c.id] = asyncio.Lock()

    async def _init_client(self, client: GeminiClientWrapper) -> None:
        """Call ``client.init`` with the shared settings from ``g_config.gemini``."""
        await client.init(
            timeout=g_config.gemini.timeout,
            auto_refresh=g_config.gemini.auto_refresh,
            verbose=g_config.gemini.verbose,
            refresh_interval=g_config.gemini.refresh_interval,
        )

    async def init(self) -> None:
        """Initialize every client in the pool that is not already running.

        A failure to initialize one client is logged and does not stop the
        remaining clients from being initialized.

        Raises:
            RuntimeError: If no client is running after all attempts.
        """
        success_count = 0
        for client in self._clients:
            if not client.running():
                try:
                    await self._init_client(client)
                except Exception:
                    logger.exception(f"Failed to initialize client {client.id}")
            # Counts clients that were already running as well as fresh starts.
            if client.running():
                success_count += 1

        if success_count == 0:
            raise RuntimeError("Failed to initialize any Gemini clients")

    async def acquire(self, client_id: Optional[str] = None) -> GeminiClientWrapper:
        """Return a ready client, selected by id or by round-robin.

        Args:
            client_id: Id of the client to return. When omitted (or empty),
                the next ready client in round-robin order is returned.

        Returns:
            A client that is running, possibly after being restarted.

        Raises:
            ValueError: If ``client_id`` is given but unknown.
            RuntimeError: If the pool is empty, the requested client could not
                be made ready, or no client in the pool could be made ready.
        """
        if not self._round_robin:
            raise RuntimeError("No Gemini clients configured")

        if client_id:
            client = self._id_map.get(client_id)
            if not client:
                raise ValueError(f"Client id {client_id} not found")
            if await self._ensure_client_ready(client):
                return client
            raise RuntimeError(
                f"Gemini client {client_id} is not running and could not be restarted"
            )

        # Round-robin: give every client exactly one chance per call.
        total = len(self._round_robin)
        tried_clients = set()
        while len(tried_clients) < total:
            candidate = None
            async with self._round_robin_lock:
                # Other coroutines may rotate the deque while this call awaits
                # a readiness check, so the head can be a client this call has
                # already tried. Skip such clients instead of giving up, so a
                # healthy untried client is never missed.
                for _ in range(total):
                    head = self._round_robin[0]
                    self._round_robin.rotate(-1)
                    if head not in tried_clients:
                        candidate = head
                        break
            if candidate is None:
                # A full rotation found nothing new: every client was tried.
                break
            tried_clients.add(candidate)
            # Check readiness outside the lock to avoid blocking other requests.
            if await self._ensure_client_ready(candidate):
                return candidate

        raise RuntimeError("No Gemini clients are currently available")

    async def _ensure_client_ready(self, client: GeminiClientWrapper) -> bool:
        """Make sure the client is running, attempting a restart if needed.

        Returns:
            True if the client is running or its restart call completed
            without raising; False if no restart lock exists for the client
            or the restart raised.
        """
        if client.running():
            return True

        lock = self._restart_locks.get(client.id)
        if lock is None:
            return False

        async with lock:
            # Re-check: another coroutine may have restarted the client while
            # this one was waiting for the lock.
            if client.running():
                return True
            try:
                await self._init_client(client)
                logger.info(f"Restarted Gemini client {client.id} after it stopped.")
                return True
            except Exception:
                logger.exception(f"Failed to restart Gemini client {client.id}")
                return False

    @property
    def clients(self) -> List[GeminiClientWrapper]:
        """Return the list of managed clients (the internal list, not a copy)."""
        return self._clients

    def status(self) -> Dict[str, bool]:
        """Return a mapping of client id to its current running state."""
        return {client.id: client.running() for client in self._clients}