Yongkun Li Copilot commited on
Commit
93d79eb
·
unverified ·
1 Parent(s): 9d88248

:sparkles: Implement Gemini client pool (#7)

Browse files

* 📝 docs: clarify client id usage

* :sparkles: Add logging and checking

* 🔧 fix env override for list items

* :bug: Fix config reading and singleton of client pool

* :loud_sound: Add more logs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

README.md CHANGED
@@ -48,11 +48,13 @@ pip install -e .
48
 
49
  ### Configuration
50
 
51
- Edit `config/config.yaml` and set your Gemini credentials:
52
  ```yaml
53
  gemini:
54
- secure_1psid: "YOUR_SECURE_1PSID_HERE"
55
- secure_1psidts: "YOUR_SECURE_1PSIDTS_HERE"
 
 
56
  ```
57
 
58
  > [!NOTE]
@@ -79,8 +81,9 @@ docker run -p 8000:8000 \
79
  -v $(pwd)/config:/app/config \
80
  -v $(pwd)/data:/app/data \
81
  -e CONFIG_SERVER__API_KEY="your-api-key-here" \
82
- -e CONFIG_GEMINI__SECURE_1PSID="your-secure-1psid" \
83
- -e CONFIG_GEMINI__SECURE_1PSIDTS="your-secure-1psidts" \
 
84
  ghcr.io/nativu5/gemini-fastapi
85
  ```
86
 
@@ -102,8 +105,9 @@ services:
102
  - CONFIG_SERVER__HOST=0.0.0.0
103
  - CONFIG_SERVER__PORT=8000
104
  - CONFIG_SERVER__API_KEY=${API_KEY}
105
- - CONFIG_GEMINI__SECURE_1PSID=${SECURE_1PSID}
106
- - CONFIG_GEMINI__SECURE_1PSIDTS=${SECURE_1PSIDTS}
 
107
  restart: on-failure:3 # Avoid retrying too many times
108
  ```
109
 
@@ -134,14 +138,21 @@ You can override any configuration option using environment variables with the `
134
  # Override server settings
135
  export CONFIG_SERVER__API_KEY="your-secure-api-key"
136
 
137
- # Override Gemini credentials
138
- export CONFIG_GEMINI__SECURE_1PSID="your-secure-1psid"
139
- export CONFIG_GEMINI__SECURE_1PSIDTS="your-secure-1psidts"
 
140
 
141
  # Override conversation storage size limit
142
  export CONFIG_STORAGE__MAX_SIZE=268435456 # 256 MB
143
  ```
144
 
 
 
 
 
 
 
145
  ### Gemini Credentials
146
 
147
  > [!WARNING]
 
48
 
49
  ### Configuration
50
 
51
+ Edit `config/config.yaml` and provide at least one credential pair:
52
  ```yaml
53
  gemini:
54
+ clients:
55
+ - id: "client-a"
56
+ secure_1psid: "YOUR_SECURE_1PSID_HERE"
57
+ secure_1psidts: "YOUR_SECURE_1PSIDTS_HERE"
58
  ```
59
 
60
  > [!NOTE]
 
81
  -v $(pwd)/config:/app/config \
82
  -v $(pwd)/data:/app/data \
83
  -e CONFIG_SERVER__API_KEY="your-api-key-here" \
84
+ -e CONFIG_GEMINI__CLIENTS__0__ID="client-a" \
85
+ -e CONFIG_GEMINI__CLIENTS__0__SECURE_1PSID="your-secure-1psid" \
86
+ -e CONFIG_GEMINI__CLIENTS__0__SECURE_1PSIDTS="your-secure-1psidts" \
87
  ghcr.io/nativu5/gemini-fastapi
88
  ```
89
 
 
105
  - CONFIG_SERVER__HOST=0.0.0.0
106
  - CONFIG_SERVER__PORT=8000
107
  - CONFIG_SERVER__API_KEY=${API_KEY}
108
+ - CONFIG_GEMINI__CLIENTS__0__ID=client-a
109
+ - CONFIG_GEMINI__CLIENTS__0__SECURE_1PSID=${SECURE_1PSID}
110
+ - CONFIG_GEMINI__CLIENTS__0__SECURE_1PSIDTS=${SECURE_1PSIDTS}
111
  restart: on-failure:3 # Avoid retrying too many times
112
  ```
113
 
 
138
  # Override server settings
139
  export CONFIG_SERVER__API_KEY="your-secure-api-key"
140
 
141
+ # Override Gemini credentials (first client)
142
+ export CONFIG_GEMINI__CLIENTS__0__ID="client-a"
143
+ export CONFIG_GEMINI__CLIENTS__0__SECURE_1PSID="your-secure-1psid"
144
+ export CONFIG_GEMINI__CLIENTS__0__SECURE_1PSIDTS="your-secure-1psidts"
145
 
146
  # Override conversation storage size limit
147
  export CONFIG_STORAGE__MAX_SIZE=268435456 # 256 MB
148
  ```
149
 
150
+ ### Client IDs and Conversation Reuse
151
+
152
+ Conversations are stored with the ID of the client that generated them.
153
+ Keep these identifiers stable in your configuration so that sessions remain valid
154
+ when you update the cookie list.
155
+
156
  ### Gemini Credentials
157
 
158
  > [!WARNING]
README.zh.md CHANGED
@@ -49,11 +49,13 @@ pip install -e .
49
 
50
  ### 配置
51
 
52
- 编辑 `config/config.yaml` 并填写 Gemini 凭证:
53
  ```yaml
54
  gemini:
55
- secure_1psid: "YOUR_SECURE_1PSID_HERE"
56
- secure_1psidts: "YOUR_SECURE_1PSIDTS_HERE"
 
 
57
  ```
58
 
59
  > [!NOTE]
@@ -80,8 +82,9 @@ docker run -p 8000:8000 \
80
  -v $(pwd)/config:/app/config \
81
  -v $(pwd)/data:/app/data \
82
  -e CONFIG_SERVER__API_KEY="your-api-key-here" \
83
- -e CONFIG_GEMINI__SECURE_1PSID="your-secure-1psid" \
84
- -e CONFIG_GEMINI__SECURE_1PSIDTS="your-secure-1psidts" \
 
85
  ghcr.io/nativu5/gemini-fastapi
86
  ```
87
 
@@ -103,8 +106,9 @@ services:
103
  - CONFIG_SERVER__HOST=0.0.0.0
104
  - CONFIG_SERVER__PORT=8000
105
  - CONFIG_SERVER__API_KEY=${API_KEY}
106
- - CONFIG_GEMINI__SECURE_1PSID=${SECURE_1PSID}
107
- - CONFIG_GEMINI__SECURE_1PSIDTS=${SECURE_1PSIDTS}
 
108
  restart: on-failure:3 # 避免过多重试
109
  ```
110
 
@@ -136,13 +140,19 @@ docker compose up -d
136
  export CONFIG_SERVER__API_KEY="your-secure-api-key"
137
 
138
  # 覆盖 Gemini Cookie
139
- export CONFIG_GEMINI__SECURE_1PSID="your-secure-1psid"
140
- export CONFIG_GEMINI__SECURE_1PSIDTS="your-secure-1psidts"
 
141
 
142
  # 覆盖对话存储大小限制
143
  export CONFIG_STORAGE__MAX_SIZE=268435456 # 256 MB
144
  ```
145
 
 
 
 
 
 
146
  ### Gemini 凭据
147
 
148
  > [!WARNING]
 
49
 
50
  ### 配置
51
 
52
+ 编辑 `config/config.yaml` 并提供至少一组凭证:
53
  ```yaml
54
  gemini:
55
+ clients:
56
+ - id: "client-a"
57
+ secure_1psid: "YOUR_SECURE_1PSID_HERE"
58
+ secure_1psidts: "YOUR_SECURE_1PSIDTS_HERE"
59
  ```
60
 
61
  > [!NOTE]
 
82
  -v $(pwd)/config:/app/config \
83
  -v $(pwd)/data:/app/data \
84
  -e CONFIG_SERVER__API_KEY="your-api-key-here" \
85
+ -e CONFIG_GEMINI__CLIENTS__0__ID="client-a" \
86
+ -e CONFIG_GEMINI__CLIENTS__0__SECURE_1PSID="your-secure-1psid" \
87
+ -e CONFIG_GEMINI__CLIENTS__0__SECURE_1PSIDTS="your-secure-1psidts" \
88
  ghcr.io/nativu5/gemini-fastapi
89
  ```
90
 
 
106
  - CONFIG_SERVER__HOST=0.0.0.0
107
  - CONFIG_SERVER__PORT=8000
108
  - CONFIG_SERVER__API_KEY=${API_KEY}
109
+ - CONFIG_GEMINI__CLIENTS__0__ID=client-a
110
+ - CONFIG_GEMINI__CLIENTS__0__SECURE_1PSID=${SECURE_1PSID}
111
+ - CONFIG_GEMINI__CLIENTS__0__SECURE_1PSIDTS=${SECURE_1PSIDTS}
112
  restart: on-failure:3 # 避免过多重试
113
  ```
114
 
 
140
  export CONFIG_SERVER__API_KEY="your-secure-api-key"
141
 
142
  # 覆盖 Gemini Cookie
143
+ export CONFIG_GEMINI__CLIENTS__0__ID="client-a"
144
+ export CONFIG_GEMINI__CLIENTS__0__SECURE_1PSID="your-secure-1psid"
145
+ export CONFIG_GEMINI__CLIENTS__0__SECURE_1PSIDTS="your-secure-1psidts"
146
 
147
  # 覆盖对话存储大小限制
148
  export CONFIG_STORAGE__MAX_SIZE=268435456 # 256 MB
149
  ```
150
 
151
+ ### 客户端 ID 与会话重用
152
+
153
+ 会话在保存时会绑定创建它的客户端 ID。请在配置中保持这些 `id` 值稳定,
154
+ 这样在更新 Cookie 列表时依然可以复用旧会话。
155
+
156
  ### Gemini 凭据
157
 
158
  > [!WARNING]
app/main.py CHANGED
@@ -6,20 +6,20 @@ from loguru import logger
6
  from .server.chat import router as chat_router
7
  from .server.health import router as health_router
8
  from .server.middleware import add_cors_middleware, add_exception_handler
9
- from .services.client import SingletonGeminiClient
10
 
11
 
12
  @asynccontextmanager
13
  async def lifespan(app: FastAPI):
14
  try:
15
- client = SingletonGeminiClient()
16
- await client.init()
17
  except Exception as e:
18
- logger.exception(f"Failed to initialize Gemini client: {e}")
19
  raise
20
 
21
- logger.info("Gemini client initialized on server startup.")
22
- logger.info("Gemini API Server ready to serve requests.")
23
  yield
24
 
25
 
 
6
  from .server.chat import router as chat_router
7
  from .server.health import router as health_router
8
  from .server.middleware import add_cors_middleware, add_exception_handler
9
+ from .services.pool import GeminiClientPool
10
 
11
 
12
  @asynccontextmanager
13
  async def lifespan(app: FastAPI):
14
  try:
15
+ pool = GeminiClientPool()
16
+ await pool.init()
17
  except Exception as e:
18
+ logger.exception(f"Failed to initialize Gemini clients: {e}")
19
  raise
20
 
21
+ logger.success(f"Gemini clients initialized: {[c.id for c in pool.clients]}.")
22
+ logger.success("Gemini API Server ready to serve requests.")
23
  yield
24
 
25
 
app/models/models.py CHANGED
@@ -84,6 +84,7 @@ class HealthCheckResponse(BaseModel):
84
 
85
  ok: bool
86
  storage: Optional[Dict[str, str | int]] = None
 
87
  error: Optional[str] = None
88
 
89
 
@@ -95,6 +96,7 @@ class ConversationInStore(BaseModel):
95
 
96
  # NOTE: Gemini Web API do not support changing models once a conversation is created.
97
  model: str = Field(..., description="Model used for the conversation")
 
98
  metadata: list[str | None] = Field(
99
  ..., description="Metadata for Gemini API to locate the conversation"
100
  )
 
84
 
85
  ok: bool
86
  storage: Optional[Dict[str, str | int]] = None
87
+ clients: Optional[Dict[str, bool]] = None
88
  error: Optional[str] = None
89
 
90
 
 
96
 
97
  # NOTE: Gemini Web API do not support changing models once a conversation is created.
98
  model: str = Field(..., description="Model used for the conversation")
99
+ client_id: str = Field(..., description="Identifier of the Gemini client")
100
  metadata: list[str | None] = Field(
101
  ..., description="Metadata for Gemini API to locate the conversation"
102
  )
app/server/chat.py CHANGED
@@ -15,7 +15,11 @@ from ..models import (
15
  ModelData,
16
  ModelListResponse,
17
  )
18
- from ..services import LMDBConversationStore, SingletonGeminiClient
 
 
 
 
19
  from ..utils.helper import estimate_tokens
20
  from .middleware import get_temp_dir, verify_api_key
21
 
@@ -49,7 +53,7 @@ async def create_chat_completion(
49
  api_key: str = Depends(verify_api_key),
50
  tmp_dir: Path = Depends(get_temp_dir),
51
  ):
52
- client = SingletonGeminiClient()
53
  db = LMDBConversationStore()
54
  model = Model.from_name(request.model)
55
 
@@ -61,10 +65,12 @@ async def create_chat_completion(
61
 
62
  # Check if conversation is reusable
63
  session = None
 
64
  if _check_reusable(request.messages):
65
  try:
66
  # Exclude the last message from user
67
  if old_conv := db.find(model.model_name, request.messages[:-1]):
 
68
  session = client.start_chat(metadata=old_conv.metadata, model=model)
69
  except Exception as e:
70
  session = None
@@ -72,15 +78,18 @@ async def create_chat_completion(
72
 
73
  if session:
74
  # Just send the last message to the existing session
75
- model_input, files = await client.process_message(
76
  request.messages[-1], tmp_dir, tagged=False
77
  )
78
  logger.debug(f"Found reusable session: {session.metadata}")
79
  else:
80
  # Start a new session and concat messages into a single string
81
- session = client.start_chat(model=model)
82
  try:
83
- model_input, files = await client.process_conversation(request.messages, tmp_dir)
 
 
 
 
84
  except ValueError as e:
85
  raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
86
  except Exception as e:
@@ -90,21 +99,25 @@ async def create_chat_completion(
90
 
91
  # Generate response
92
  try:
93
- logger.debug(f"Input length: {len(model_input)}, files count: {len(files)}")
 
 
 
94
  response = await session.send_message(model_input, files=files)
95
  except Exception as e:
96
  logger.exception(f"Error generating content from Gemini API: {e}")
97
  raise
98
 
99
  # Format and clean the output
100
- model_output = client.extract_output(response)
101
- stored_output = client.extract_output(response, include_thoughts=False)
102
 
103
  # After cleaning, persist the conversation
104
  try:
105
  last_message = Message(role="assistant", content=stored_output)
106
  conv = ConversationInStore(
107
  model=model.model_name,
 
108
  metadata=session.metadata,
109
  messages=[*request.messages, last_message],
110
  )
 
15
  ModelData,
16
  ModelListResponse,
17
  )
18
+ from ..services import (
19
+ GeminiClientPool,
20
+ GeminiClientWrapper,
21
+ LMDBConversationStore,
22
+ )
23
  from ..utils.helper import estimate_tokens
24
  from .middleware import get_temp_dir, verify_api_key
25
 
 
53
  api_key: str = Depends(verify_api_key),
54
  tmp_dir: Path = Depends(get_temp_dir),
55
  ):
56
+ pool = GeminiClientPool()
57
  db = LMDBConversationStore()
58
  model = Model.from_name(request.model)
59
 
 
65
 
66
  # Check if conversation is reusable
67
  session = None
68
+ client = None
69
  if _check_reusable(request.messages):
70
  try:
71
  # Exclude the last message from user
72
  if old_conv := db.find(model.model_name, request.messages[:-1]):
73
+ client = pool.acquire(old_conv.client_id)
74
  session = client.start_chat(metadata=old_conv.metadata, model=model)
75
  except Exception as e:
76
  session = None
 
78
 
79
  if session:
80
  # Just send the last message to the existing session
81
+ model_input, files = await GeminiClientWrapper.process_message(
82
  request.messages[-1], tmp_dir, tagged=False
83
  )
84
  logger.debug(f"Found reusable session: {session.metadata}")
85
  else:
86
  # Start a new session and concat messages into a single string
 
87
  try:
88
+ client = pool.acquire()
89
+ session = client.start_chat(model=model)
90
+ model_input, files = await GeminiClientWrapper.process_conversation(
91
+ request.messages, tmp_dir
92
+ )
93
  except ValueError as e:
94
  raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
95
  except Exception as e:
 
99
 
100
  # Generate response
101
  try:
102
+ assert session and client, "Session and client not available"
103
+ logger.debug(
104
+ f"Client ID: {client.id}, Input length: {len(model_input)}, files count: {len(files)}"
105
+ )
106
  response = await session.send_message(model_input, files=files)
107
  except Exception as e:
108
  logger.exception(f"Error generating content from Gemini API: {e}")
109
  raise
110
 
111
  # Format and clean the output
112
+ model_output = GeminiClientWrapper.extract_output(response)
113
+ stored_output = GeminiClientWrapper.extract_output(response, include_thoughts=False)
114
 
115
  # After cleaning, persist the conversation
116
  try:
117
  last_message = Message(role="assistant", content=stored_output)
118
  conv = ConversationInStore(
119
  model=model.model_name,
120
+ client_id=client.id,
121
  metadata=session.metadata,
122
  messages=[*request.messages, last_message],
123
  )
app/server/health.py CHANGED
@@ -2,26 +2,32 @@ from fastapi import APIRouter
2
  from loguru import logger
3
 
4
  from ..models import HealthCheckResponse
5
- from ..services import LMDBConversationStore, SingletonGeminiClient
6
 
7
  router = APIRouter()
8
 
9
 
10
  @router.get("/health", response_model=HealthCheckResponse)
11
  async def health_check():
12
- client = SingletonGeminiClient()
13
  db = LMDBConversationStore()
14
 
15
- if not client.running:
16
- try:
17
- await client.init()
18
- except Exception as e:
19
- logger.error(f"Failed to initialize Gemini client: {e}")
20
- return HealthCheckResponse(ok=False, error=str(e))
 
 
 
 
21
 
22
  stat = db.stats()
23
  if not stat:
24
  logger.error("Failed to retrieve LMDB conversation store stats")
25
- return HealthCheckResponse(ok=False, error="LMDB conversation store unavailable")
 
 
26
 
27
- return HealthCheckResponse(ok=True, storage=stat)
 
2
  from loguru import logger
3
 
4
  from ..models import HealthCheckResponse
5
+ from ..services import GeminiClientPool, LMDBConversationStore
6
 
7
  router = APIRouter()
8
 
9
 
10
  @router.get("/health", response_model=HealthCheckResponse)
11
  async def health_check():
12
+ pool = GeminiClientPool()
13
  db = LMDBConversationStore()
14
 
15
+ try:
16
+ await pool.init()
17
+ except Exception as e:
18
+ logger.error(f"Failed to initialize Gemini clients: {e}")
19
+ return HealthCheckResponse(ok=False, error=str(e))
20
+
21
+ client_status = pool.status()
22
+
23
+ if not all(client_status.values()):
24
+ logger.warning("One or more Gemini clients not running")
25
 
26
  stat = db.stats()
27
  if not stat:
28
  logger.error("Failed to retrieve LMDB conversation store stats")
29
+ return HealthCheckResponse(
30
+ ok=False, error="LMDB conversation store unavailable", clients=client_status
31
+ )
32
 
33
+ return HealthCheckResponse(ok=all(client_status.values()), storage=stat, clients=client_status)
app/services/__init__.py CHANGED
@@ -1,7 +1,9 @@
1
- from .client import SingletonGeminiClient
2
  from .lmdb import LMDBConversationStore
 
3
 
4
  __all__ = [
 
 
5
  "LMDBConversationStore",
6
- "SingletonGeminiClient",
7
  ]
 
1
+ from .client import GeminiClientWrapper
2
  from .lmdb import LMDBConversationStore
3
+ from .pool import GeminiClientPool
4
 
5
  __all__ = [
6
+ "GeminiClientPool",
7
+ "GeminiClientWrapper",
8
  "LMDBConversationStore",
 
9
  ]
app/services/client.py CHANGED
@@ -6,15 +6,14 @@ from gemini_webapi import GeminiClient, ModelOutput
6
  from ..models import Message
7
  from ..utils import g_config
8
  from ..utils.helper import add_tag, save_file_to_tempfile, save_url_to_tempfile
9
- from ..utils.singleton import Singleton
10
 
11
 
12
- class SingletonGeminiClient(GeminiClient, metaclass=Singleton):
13
- def __init__(self, **kwargs):
14
- kwargs.setdefault("secure_1psid", g_config.gemini.secure_1psid)
15
- kwargs.setdefault("secure_1psidts", g_config.gemini.secure_1psidts)
16
 
 
17
  super().__init__(**kwargs)
 
18
 
19
  async def init(self, **kwargs):
20
  # Inject default configuration values
@@ -77,7 +76,7 @@ class SingletonGeminiClient(GeminiClient, metaclass=Singleton):
77
  files: list[Path | str] = []
78
 
79
  for msg in messages:
80
- input_part, files_part = await SingletonGeminiClient.process_message(msg, tempdir)
81
  conversation.append(input_part)
82
  files.extend(files_part)
83
 
 
6
  from ..models import Message
7
  from ..utils import g_config
8
  from ..utils.helper import add_tag, save_file_to_tempfile, save_url_to_tempfile
 
9
 
10
 
11
+ class GeminiClientWrapper(GeminiClient):
12
+ """Gemini client with helper methods."""
 
 
13
 
14
+ def __init__(self, client_id: str, **kwargs):
15
  super().__init__(**kwargs)
16
+ self.id = client_id
17
 
18
  async def init(self, **kwargs):
19
  # Inject default configuration values
 
76
  files: list[Path | str] = []
77
 
78
  for msg in messages:
79
+ input_part, files_part = await GeminiClientWrapper.process_message(msg, tempdir)
80
  conversation.append(input_part)
81
  files.extend(files_part)
82
 
app/services/lmdb.py CHANGED
@@ -21,10 +21,11 @@ def hash_message(message: Message) -> str:
21
  return hashlib.sha256(message_bytes).hexdigest()
22
 
23
 
24
- def hash_conversation(model: str, messages: List[Message]) -> str:
25
- """Generate a hash for a list of messages."""
26
  # Create a combined hash from all individual message hashes
27
  combined_hash = hashlib.sha256()
 
28
  combined_hash.update(model.encode("utf-8"))
29
  for message in messages:
30
  message_hash = hash_message(message)
@@ -35,7 +36,7 @@ def hash_conversation(model: str, messages: List[Message]) -> str:
35
  class LMDBConversationStore(metaclass=Singleton):
36
  """LMDB-based storage for Message lists with hash-based key-value operations."""
37
 
38
- CUSTOM_KEY_BINDING_PREFIX = "hash:"
39
 
40
  def __init__(self, db_path: Optional[str] = None, max_db_size: Optional[int] = None):
41
  """
@@ -115,7 +116,7 @@ class LMDBConversationStore(metaclass=Singleton):
115
  raise ValueError("Messages list cannot be empty")
116
 
117
  # Generate hash for the message list
118
- message_hash = hash_conversation(conv.model, conv.messages)
119
  storage_key = custom_key or message_hash
120
 
121
  # Prepare data for storage
@@ -132,11 +133,10 @@ class LMDBConversationStore(metaclass=Singleton):
132
  txn.put(storage_key.encode("utf-8"), value, overwrite=True)
133
 
134
  # Store hash -> key mapping for reverse lookup
135
- if custom_key:
136
- txn.put(
137
- f"{self.CUSTOM_KEY_BINDING_PREFIX}{message_hash}".encode("utf-8"),
138
- custom_key.encode("utf-8"),
139
- )
140
 
141
  logger.debug(f"Stored {len(conv.messages)} messages with key: {storage_key}")
142
  return storage_key
@@ -185,21 +185,21 @@ class LMDBConversationStore(metaclass=Singleton):
185
  if not messages:
186
  return None
187
 
188
- message_hash = hash_conversation(model, messages)
189
- key = f"{self.CUSTOM_KEY_BINDING_PREFIX}{message_hash}"
190
-
191
- try:
192
- with self._get_transaction(write=False) as txn:
193
- # Try custom key binding first
194
- key = txn.get(key.encode("utf-8"), default=None)
195
- key = key.decode("utf-8") if key else message_hash # type: ignore
196
-
197
- # Fallback to hash if no custom key found
198
- return self.get(key)
199
-
200
- except Exception as e:
201
- logger.error(f"Failed to retrieve messages by message list: {e}")
202
- return None
203
 
204
  def exists(self, key: str) -> bool:
205
  """
@@ -237,16 +237,16 @@ class LMDBConversationStore(metaclass=Singleton):
237
 
238
  storage_data = orjson.loads(data) # type: ignore
239
  conv = ConversationInStore.model_validate(storage_data)
240
- message_hash = hash_conversation(conv.model, conv.messages)
241
 
242
  # Delete main data
243
  txn.delete(key.encode("utf-8"))
244
 
245
  # Clean up hash mapping if it exists
246
  if message_hash and key != message_hash:
247
- txn.delete(f"{self.CUSTOM_KEY_BINDING_PREFIX}{message_hash}".encode("utf-8"))
248
 
249
- logger.info(f"Deleted messages with key: {key}")
250
  return conv
251
 
252
  except Exception as e:
@@ -274,7 +274,7 @@ class LMDBConversationStore(metaclass=Singleton):
274
  for key, _ in cursor:
275
  key_str = key.decode("utf-8")
276
  # Skip internal hash mappings
277
- if key_str.startswith(self.CUSTOM_KEY_BINDING_PREFIX):
278
  continue
279
 
280
  if not prefix or key_str.startswith(prefix):
 
21
  return hashlib.sha256(message_bytes).hexdigest()
22
 
23
 
24
+ def hash_conversation(client_id: str, model: str, messages: List[Message]) -> str:
25
+ """Generate a hash for a list of messages and client id."""
26
  # Create a combined hash from all individual message hashes
27
  combined_hash = hashlib.sha256()
28
+ combined_hash.update(client_id.encode("utf-8"))
29
  combined_hash.update(model.encode("utf-8"))
30
  for message in messages:
31
  message_hash = hash_message(message)
 
36
  class LMDBConversationStore(metaclass=Singleton):
37
  """LMDB-based storage for Message lists with hash-based key-value operations."""
38
 
39
+ HASH_LOOKUP_PREFIX = "hash:"
40
 
41
  def __init__(self, db_path: Optional[str] = None, max_db_size: Optional[int] = None):
42
  """
 
116
  raise ValueError("Messages list cannot be empty")
117
 
118
  # Generate hash for the message list
119
+ message_hash = hash_conversation(conv.client_id, conv.model, conv.messages)
120
  storage_key = custom_key or message_hash
121
 
122
  # Prepare data for storage
 
133
  txn.put(storage_key.encode("utf-8"), value, overwrite=True)
134
 
135
  # Store hash -> key mapping for reverse lookup
136
+ txn.put(
137
+ f"{self.HASH_LOOKUP_PREFIX}{message_hash}".encode("utf-8"),
138
+ storage_key.encode("utf-8"),
139
+ )
 
140
 
141
  logger.debug(f"Stored {len(conv.messages)} messages with key: {storage_key}")
142
  return storage_key
 
185
  if not messages:
186
  return None
187
 
188
+ for c in g_config.gemini.clients:
189
+ message_hash = hash_conversation(c.id, model, messages)
190
+ key = f"{self.HASH_LOOKUP_PREFIX}{message_hash}"
191
+ try:
192
+ with self._get_transaction(write=False) as txn:
193
+ mapped = txn.get(key.encode("utf-8"))
194
+ if mapped:
195
+ return self.get(mapped.decode("utf-8")) # type: ignore
196
+ except Exception as e:
197
+ logger.error(f"Failed to retrieve messages by message list for client {c.id}: {e}")
198
+ continue
199
+
200
+ if conv := self.get(message_hash):
201
+ return conv
202
+ return None
203
 
204
  def exists(self, key: str) -> bool:
205
  """
 
237
 
238
  storage_data = orjson.loads(data) # type: ignore
239
  conv = ConversationInStore.model_validate(storage_data)
240
+ message_hash = hash_conversation(conv.client_id, conv.model, conv.messages)
241
 
242
  # Delete main data
243
  txn.delete(key.encode("utf-8"))
244
 
245
  # Clean up hash mapping if it exists
246
  if message_hash and key != message_hash:
247
+ txn.delete(f"{self.HASH_LOOKUP_PREFIX}{message_hash}".encode("utf-8"))
248
 
249
+ logger.debug(f"Deleted messages with key: {key}")
250
  return conv
251
 
252
  except Exception as e:
 
274
  for key, _ in cursor:
275
  key_str = key.decode("utf-8")
276
  # Skip internal hash mappings
277
+ if key_str.startswith(self.HASH_LOOKUP_PREFIX):
278
  continue
279
 
280
  if not prefix or key_str.startswith(prefix):
app/services/pool.py ADDED
@@ -0,0 +1,60 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from collections import deque
2
+ from typing import Dict, List, Optional
3
+
4
+ from ..utils import g_config
5
+ from ..utils.singleton import Singleton
6
+ from .client import GeminiClientWrapper
7
+
8
+
9
+ class GeminiClientPool(metaclass=Singleton):
10
+ """Pool of GeminiClient instances identified by unique ids."""
11
+
12
+ def __init__(self) -> None:
13
+ self._clients: List[GeminiClientWrapper] = []
14
+ self._id_map: Dict[str, GeminiClientWrapper] = {}
15
+ self._round_robin: deque[GeminiClientWrapper] = deque()
16
+
17
+ if len(g_config.gemini.clients) == 0:
18
+ raise ValueError("No Gemini clients configured")
19
+
20
+ for c in g_config.gemini.clients:
21
+ client = GeminiClientWrapper(
22
+ client_id=c.id,
23
+ secure_1psid=c.secure_1psid,
24
+ secure_1psidts=c.secure_1psidts,
25
+ )
26
+ self._clients.append(client)
27
+ self._id_map[c.id] = client
28
+ self._round_robin.append(client)
29
+
30
+ async def init(self) -> None:
31
+ """Initialize all clients in the pool."""
32
+ for client in self._clients:
33
+ if not client.running:
34
+ await client.init(
35
+ timeout=g_config.gemini.timeout,
36
+ auto_refresh=g_config.gemini.auto_refresh,
37
+ verbose=g_config.gemini.verbose,
38
+ refresh_interval=g_config.gemini.refresh_interval,
39
+ )
40
+
41
+ def acquire(self, client_id: Optional[str] = None) -> GeminiClientWrapper:
42
+ """Return a client by id or using round-robin."""
43
+ if client_id:
44
+ client = self._id_map.get(client_id)
45
+ if not client:
46
+ raise ValueError(f"Client id {client_id} not found")
47
+ return client
48
+
49
+ client = self._round_robin[0]
50
+ self._round_robin.rotate(-1)
51
+ return client
52
+
53
+ @property
54
+ def clients(self) -> List[GeminiClientWrapper]:
55
+ """Return managed clients."""
56
+ return self._clients
57
+
58
+ def status(self) -> Dict[str, bool]:
59
+ """Return running status for each client."""
60
+ return {client.id: client.running for client in self._clients}
app/utils/config.py CHANGED
@@ -4,7 +4,11 @@ from typing import Literal, Optional
4
 
5
  from loguru import logger
6
  from pydantic import BaseModel, Field, ValidationError
7
- from pydantic_settings import BaseSettings, SettingsConfigDict, YamlConfigSettingsSource
 
 
 
 
8
 
9
  CONFIG_PATH = "config/config.yaml"
10
 
@@ -20,11 +24,20 @@ class ServerConfig(BaseModel):
20
  )
21
 
22
 
23
- class GeminiConfig(BaseModel):
24
- """Gemini API configuration"""
25
 
 
26
  secure_1psid: str = Field(..., description="Gemini Secure 1PSID")
27
  secure_1psidts: str = Field(..., description="Gemini Secure 1PSIDTS")
 
 
 
 
 
 
 
 
28
  timeout: int = Field(default=60, ge=1, description="Init timeout")
29
  auto_refresh: bool = Field(True, description="Enable auto-refresh for Gemini cookies")
30
  refresh_interval: int = Field(
@@ -50,8 +63,10 @@ class CORSConfig(BaseModel):
50
 
51
 
52
  class StorageConfig(BaseModel):
 
 
53
  path: str = Field(
54
- default="data/msg.lmdb",
55
  description="Path to the storage directory where data will be saved",
56
  )
57
  max_size: int = Field(
@@ -62,6 +77,8 @@ class StorageConfig(BaseModel):
62
 
63
 
64
  class LoggingConfig(BaseModel):
 
 
65
  level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = Field(
66
  default="DEBUG",
67
  description="Logging level",
@@ -100,6 +117,7 @@ class Config(BaseSettings):
100
  model_config = SettingsConfigDict(
101
  env_prefix="CONFIG_",
102
  env_nested_delimiter="__",
 
103
  yaml_file=os.getenv("CONFIG_PATH", CONFIG_PATH),
104
  )
105
 
@@ -113,7 +131,55 @@ class Config(BaseSettings):
113
  file_secret_settings,
114
  ):
115
  """Read settings: env -> yaml -> default"""
116
- return (env_settings, YamlConfigSettingsSource(settings_cls))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
117
 
118
 
119
  def initialize_config() -> Config:
@@ -124,8 +190,18 @@ def initialize_config() -> Config:
124
  Config: Configuration object
125
  """
126
  try:
127
- # Using environment variables and YAML file for configuration
128
- return Config() # type: ignore
 
 
 
 
 
 
 
 
 
 
129
  except ValidationError as e:
130
  logger.error(f"Configuration validation failed: {e!s}")
131
  sys.exit(1)
 
4
 
5
  from loguru import logger
6
  from pydantic import BaseModel, Field, ValidationError
7
+ from pydantic_settings import (
8
+ BaseSettings,
9
+ SettingsConfigDict,
10
+ YamlConfigSettingsSource,
11
+ )
12
 
13
  CONFIG_PATH = "config/config.yaml"
14
 
 
24
  )
25
 
26
 
27
+ class GeminiClientSettings(BaseModel):
28
+ """Credential set for one Gemini client."""
29
 
30
+ id: str = Field(..., description="Unique identifier for the client")
31
  secure_1psid: str = Field(..., description="Gemini Secure 1PSID")
32
  secure_1psidts: str = Field(..., description="Gemini Secure 1PSIDTS")
33
+
34
+
35
+ class GeminiConfig(BaseModel):
36
+ """Gemini API configuration"""
37
+
38
+ clients: list[GeminiClientSettings] = Field(
39
+ ..., description="List of Gemini client credential pairs"
40
+ )
41
  timeout: int = Field(default=60, ge=1, description="Init timeout")
42
  auto_refresh: bool = Field(True, description="Enable auto-refresh for Gemini cookies")
43
  refresh_interval: int = Field(
 
63
 
64
 
65
  class StorageConfig(BaseModel):
66
+ """LMDB Storage configuration"""
67
+
68
  path: str = Field(
69
+ default="data/lmdb",
70
  description="Path to the storage directory where data will be saved",
71
  )
72
  max_size: int = Field(
 
77
 
78
 
79
  class LoggingConfig(BaseModel):
80
+ """Logging configuration"""
81
+
82
  level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = Field(
83
  default="DEBUG",
84
  description="Logging level",
 
117
  model_config = SettingsConfigDict(
118
  env_prefix="CONFIG_",
119
  env_nested_delimiter="__",
120
+ nested_model_default_partial_update=True,
121
  yaml_file=os.getenv("CONFIG_PATH", CONFIG_PATH),
122
  )
123
 
 
131
  file_secret_settings,
132
  ):
133
  """Read settings: env -> yaml -> default"""
134
+ return (
135
+ env_settings,
136
+ YamlConfigSettingsSource(settings_cls),
137
+ )
138
+
139
+
140
+ def extract_gemini_clients_env() -> dict[int, dict[str, str]]:
141
+ """Extract and remove all Gemini clients related environment variables, return a mapping from index to field dict."""
142
+ prefix = "CONFIG_GEMINI__CLIENTS__"
143
+ env_overrides: dict[int, dict[str, str]] = {}
144
+ to_delete = []
145
+ for k, v in os.environ.items():
146
+ if k.startswith(prefix):
147
+ parts = k.split("__")
148
+ if len(parts) < 4:
149
+ continue
150
+ index_str, field = parts[2], parts[3].lower()
151
+ if not index_str.isdigit():
152
+ continue
153
+ idx = int(index_str)
154
+ env_overrides.setdefault(idx, {})[field] = v
155
+ to_delete.append(k)
156
+ # Remove these environment variables to avoid Pydantic parsing errors
157
+ for k in to_delete:
158
+ del os.environ[k]
159
+ return env_overrides
160
+
161
+
162
+ def _merge_clients_with_env(
163
+ base_clients: list[GeminiClientSettings] | None, env_overrides: dict[int, dict[str, str]]
164
+ ):
165
+ """Override base_clients with env_overrides, return the new clients list."""
166
+ if not env_overrides:
167
+ return base_clients
168
+ result_clients: list[GeminiClientSettings] = []
169
+ if base_clients:
170
+ result_clients = [client.model_copy() for client in base_clients]
171
+ for idx in sorted(env_overrides):
172
+ overrides = env_overrides[idx]
173
+ if idx < len(result_clients):
174
+ client_dict = result_clients[idx].model_dump()
175
+ client_dict.update(overrides)
176
+ result_clients[idx] = GeminiClientSettings(**client_dict)
177
+ elif idx == len(result_clients):
178
+ new_client = GeminiClientSettings(**overrides)
179
+ result_clients.append(new_client)
180
+ else:
181
+ raise IndexError(f"Client index {idx} in env is out of range.")
182
+ return result_clients if result_clients else base_clients
183
 
184
 
185
  def initialize_config() -> Config:
 
190
  Config: Configuration object
191
  """
192
  try:
193
+ # First, extract and remove Gemini clients related environment variables
194
+ env_clients_overrides = extract_gemini_clients_env()
195
+
196
+ # Then, initialize Config with pydantic_settings
197
+ config = Config() # type: ignore
198
+
199
+ # Synthesize clients
200
+ config.gemini.clients = _merge_clients_with_env(
201
+ config.gemini.clients, env_clients_overrides
202
+ ) # type: ignore
203
+
204
+ return config
205
  except ValidationError as e:
206
  logger.error(f"Configuration validation failed: {e!s}")
207
  sys.exit(1)
app/utils/logging.py CHANGED
@@ -37,7 +37,7 @@ def setup_logging(
37
  # Setup standard logging library interceptor
38
  _setup_logging_intercept()
39
 
40
- logger.info("Logger initialized.")
41
 
42
 
43
  def _setup_logging_intercept() -> None:
 
37
  # Setup standard logging library interceptor
38
  _setup_logging_intercept()
39
 
40
+ logger.debug("Logger initialized.")
41
 
42
 
43
  def _setup_logging_intercept() -> None:
config/config.yaml CHANGED
@@ -13,8 +13,10 @@ cors:
13
  allow_headers: ["*"]
14
 
15
  gemini:
16
- secure_1psid: "YOUR_SECURE_1PSID_HERE" # Required: Gemini session cookie
17
- secure_1psidts: "YOUR_SECURE_1PSIDTS_HERE" # Required: Gemini timestamp cookie
 
 
18
  timeout: 60 # Init timeout in seconds
19
  auto_refresh: true # Auto-refresh session cookies
20
  refresh_interval: 540 # Refresh interval in seconds
 
13
  allow_headers: ["*"]
14
 
15
  gemini:
16
+ clients:
17
+ - id: "example-id-1" # Arbitrary client ID
18
+ secure_1psid: "YOUR_SECURE_1PSID_HERE"
19
+ secure_1psidts: "YOUR_SECURE_1PSIDTS_HERE"
20
  timeout: 60 # Init timeout in seconds
21
  auto_refresh: true # Auto-refresh session cookies
22
  refresh_interval: 540 # Refresh interval in seconds