File size: 4,681 Bytes
ce9e9da
93d79eb
 
 
ce9e9da
 
93d79eb
 
 
 
 
 
 
 
 
 
 
 
ce9e9da
ac10cac
93d79eb
 
 
 
 
 
 
 
 
ce9e9da
93d79eb
 
 
 
7db4283
93d79eb
 
 
ff54322
93d79eb
5cb387e
ff54322
 
 
 
 
 
 
 
 
 
5cb387e
ff54322
 
 
 
93d79eb
ce9e9da
 
 
 
 
93d79eb
 
 
 
ce9e9da
 
 
 
 
 
ac10cac
 
 
 
 
 
 
 
 
 
 
 
 
ce9e9da
 
 
 
 
 
 
5cb387e
ce9e9da
 
 
 
7db4283
93d79eb
ce9e9da
5cb387e
ce9e9da
 
 
 
 
 
 
 
 
 
 
ff54322
 
ce9e9da
93d79eb
 
 
 
 
 
 
 
5cb387e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import asyncio
from collections import deque
from typing import Dict, List, Optional

from loguru import logger

from ..utils import g_config
from ..utils.singleton import Singleton
from .client import GeminiClientWrapper


class GeminiClientPool(metaclass=Singleton):
    """Pool of GeminiClient instances identified by unique ids.

    One ``GeminiClientWrapper`` is created per entry in
    ``g_config.gemini.clients``. Clients are handed out either by explicit id
    or in round-robin order; a client that is not running is restarted on
    demand under a per-client lock.
    """

    def __init__(self) -> None:
        """Build a wrapper and a restart lock for every configured client.

        Raises:
            ValueError: If no clients are configured.
        """
        # Validate first so a misconfigured pool fails before any state is built.
        if not g_config.gemini.clients:
            raise ValueError("No Gemini clients configured")

        self._clients: List[GeminiClientWrapper] = []
        self._id_map: Dict[str, GeminiClientWrapper] = {}
        self._round_robin: deque[GeminiClientWrapper] = deque()
        self._restart_locks: Dict[str, asyncio.Lock] = {}
        # Serializes rotation of the round-robin deque between coroutines.
        # NOTE: asyncio.Lock coordinates coroutines only; it is not thread-safe.
        self._round_robin_lock = asyncio.Lock()

        for c in g_config.gemini.clients:
            client = GeminiClientWrapper(
                client_id=c.id,
                secure_1psid=c.secure_1psid,
                secure_1psidts=c.secure_1psidts,
                proxy=c.proxy,
            )
            self._clients.append(client)
            self._id_map[c.id] = client
            self._round_robin.append(client)
            self._restart_locks[c.id] = asyncio.Lock()

    async def _start_client(self, client: GeminiClientWrapper) -> None:
        """Call ``client.init`` with the settings from ``g_config.gemini``."""
        await client.init(
            timeout=g_config.gemini.timeout,
            auto_refresh=g_config.gemini.auto_refresh,
            verbose=g_config.gemini.verbose,
            refresh_interval=g_config.gemini.refresh_interval,
        )

    async def init(self) -> None:
        """Initialize all clients in the pool.

        Clients are started one after another. A failure to start one client
        is logged and does not prevent the remaining clients from starting.

        Raises:
            RuntimeError: If no client is running after all attempts.
        """
        success_count = 0
        for client in self._clients:
            if not client.running():
                try:
                    await self._start_client(client)
                except Exception:
                    logger.exception(f"Failed to initialize client {client.id}")

            # ``running()`` is the success criterion, not the absence of an error.
            if client.running():
                success_count += 1

        if success_count == 0:
            raise RuntimeError("Failed to initialize any Gemini clients")

    async def acquire(self, client_id: Optional[str] = None) -> GeminiClientWrapper:
        """Return a healthy client by id or using round-robin.

        Args:
            client_id: Id of the client to return. When omitted (or empty), the
                next ready client in round-robin order is returned.

        Returns:
            A client for which ``_ensure_client_ready`` reported success.

        Raises:
            ValueError: If ``client_id`` is given but unknown.
            RuntimeError: If the pool is empty, the requested client could not
                be restarted, or no client in the pool is available.
        """
        if not self._round_robin:
            raise RuntimeError("No Gemini clients configured")

        if client_id:
            client = self._id_map.get(client_id)
            if client is None:
                raise ValueError(f"Client id {client_id} not found")
            if await self._ensure_client_ready(client):
                return client
            raise RuntimeError(
                f"Gemini client {client_id} is not running and could not be restarted"
            )

        # Round-robin: give every client at most one chance per call.
        tried_clients = set()
        while True:
            client = await self._next_untried(tried_clients)
            if client is None:
                break

            # Check readiness outside the lock to avoid blocking other requests.
            if await self._ensure_client_ready(client):
                return client

        raise RuntimeError("No Gemini clients are currently available")

    async def _next_untried(self, tried_clients: set) -> Optional[GeminiClientWrapper]:
        """Pop the next round-robin client that is not in ``tried_clients``.

        The chosen client is added to ``tried_clients``. Returns ``None`` once
        every client in the pool has been tried by this caller.

        Other ``acquire`` calls may rotate the shared deque while this caller
        awaits a readiness check, so the client at the head can be one that
        was already tried even though untried clients remain. Scanning up to
        one full rotation skips those instead of giving up early.
        """
        async with self._round_robin_lock:
            for _ in range(len(self._round_robin)):
                candidate = self._round_robin[0]
                self._round_robin.rotate(-1)
                if candidate not in tried_clients:
                    tried_clients.add(candidate)
                    return candidate
        return None

    async def _ensure_client_ready(self, client: GeminiClientWrapper) -> bool:
        """Make sure the client is running, attempting a restart if needed.

        Returns:
            True if the client is running (possibly after a restart), False if
            it has no restart lock registered or the restart did not succeed.
        """
        if client.running():
            return True

        lock = self._restart_locks.get(client.id)
        if lock is None:
            return False

        async with lock:
            # Another coroutine may have restarted the client while we waited.
            if client.running():
                return True

            try:
                await self._start_client(client)
            except Exception:
                logger.exception(f"Failed to restart Gemini client {client.id}")
                return False

            # Same success criterion as ``init``: trust ``running()``, not the
            # mere absence of an exception.
            if not client.running():
                logger.warning(
                    f"Gemini client {client.id} is still not running after restart."
                )
                return False

            logger.info(f"Restarted Gemini client {client.id} after it stopped.")
            return True

    @property
    def clients(self) -> List[GeminiClientWrapper]:
        """Return managed clients (the pool's own list, not a copy)."""
        return self._clients

    def status(self) -> Dict[str, bool]:
        """Return running status for each client, keyed by client id."""
        return {client.id: client.running() for client in self._clients}