Spaces:
Paused
Paused
File size: 4,681 Bytes
ce9e9da 93d79eb ce9e9da 93d79eb ce9e9da ac10cac 93d79eb ce9e9da 93d79eb 7db4283 93d79eb ff54322 93d79eb 5cb387e ff54322 5cb387e ff54322 93d79eb ce9e9da 93d79eb ce9e9da ac10cac ce9e9da 5cb387e ce9e9da 7db4283 93d79eb ce9e9da 5cb387e ce9e9da ff54322 ce9e9da 93d79eb 5cb387e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 | import asyncio
from collections import deque
from typing import Dict, List, Optional
from loguru import logger
from ..utils import g_config
from ..utils.singleton import Singleton
from .client import GeminiClientWrapper
class GeminiClientPool(metaclass=Singleton):
"""Pool of GeminiClient instances identified by unique ids."""
def __init__(self) -> None:
self._clients: List[GeminiClientWrapper] = []
self._id_map: Dict[str, GeminiClientWrapper] = {}
self._round_robin: deque[GeminiClientWrapper] = deque()
self._restart_locks: Dict[str, asyncio.Lock] = {}
self._round_robin_lock = asyncio.Lock() # Lock for thread-safe round-robin
if len(g_config.gemini.clients) == 0:
raise ValueError("No Gemini clients configured")
for c in g_config.gemini.clients:
client = GeminiClientWrapper(
client_id=c.id,
secure_1psid=c.secure_1psid,
secure_1psidts=c.secure_1psidts,
proxy=c.proxy,
)
self._clients.append(client)
self._id_map[c.id] = client
self._round_robin.append(client)
self._restart_locks[c.id] = asyncio.Lock()
async def init(self) -> None:
"""Initialize all clients in the pool."""
success_count = 0
for client in self._clients:
if not client.running():
try:
await client.init(
timeout=g_config.gemini.timeout,
auto_refresh=g_config.gemini.auto_refresh,
verbose=g_config.gemini.verbose,
refresh_interval=g_config.gemini.refresh_interval,
)
except Exception:
logger.exception(f"Failed to initialize client {client.id}")
if client.running():
success_count += 1
if success_count == 0:
raise RuntimeError("Failed to initialize any Gemini clients")
async def acquire(self, client_id: Optional[str] = None) -> GeminiClientWrapper:
"""Return a healthy client by id or using round-robin."""
if not self._round_robin:
raise RuntimeError("No Gemini clients configured")
if client_id:
client = self._id_map.get(client_id)
if not client:
raise ValueError(f"Client id {client_id} not found")
if await self._ensure_client_ready(client):
return client
raise RuntimeError(
f"Gemini client {client_id} is not running and could not be restarted"
)
# Thread-safe round-robin: try each client once
tried_clients = set()
while len(tried_clients) < len(self._round_robin):
# Atomically get next client
async with self._round_robin_lock:
client = self._round_robin[0]
self._round_robin.rotate(-1)
if client in tried_clients:
# Already tried all clients
break
tried_clients.add(client)
# Check readiness outside lock to avoid blocking other requests
if await self._ensure_client_ready(client):
return client
raise RuntimeError("No Gemini clients are currently available")
async def _ensure_client_ready(self, client: GeminiClientWrapper) -> bool:
"""Make sure the client is running, attempting a restart if needed."""
if client.running():
return True
lock = self._restart_locks.get(client.id)
if lock is None:
return False
async with lock:
if client.running():
return True
try:
await client.init(
timeout=g_config.gemini.timeout,
auto_refresh=g_config.gemini.auto_refresh,
verbose=g_config.gemini.verbose,
refresh_interval=g_config.gemini.refresh_interval,
)
logger.info(f"Restarted Gemini client {client.id} after it stopped.")
return True
except Exception:
logger.exception(f"Failed to restart Gemini client {client.id}")
return False
@property
def clients(self) -> List[GeminiClientWrapper]:
"""Return managed clients."""
return self._clients
def status(self) -> Dict[str, bool]:
"""Return running status for each client."""
return {client.id: client.running() for client in self._clients}
|