Mirrowel committed on
Commit
bab8bf0
·
1 Parent(s): 44481e5

feat: Update key release logic in RotatingClient and UsageManager to support model-specific tracking

Browse files
src/rotator_library/client.py CHANGED
@@ -62,7 +62,7 @@ class RotatingClient:
62
  lib_logger.info(f"Recorded usage from final stream object for key ...{key[-4:]}")
63
 
64
  # 3. Release the key only after all attempts to record usage are complete
65
- await self.usage_manager.release_key(key)
66
  lib_logger.info(f"STREAM FINISHED and lock released for key ...{key[-4:]}.")
67
  yield "data: [DONE]\n\n"
68
 
@@ -127,7 +127,7 @@ class RotatingClient:
127
  else:
128
  # For non-streaming, record and release here.
129
  await self.usage_manager.record_success(current_key, model, response)
130
- await self.usage_manager.release_key(current_key)
131
  return response
132
 
133
  except Exception as e:
 
62
  lib_logger.info(f"Recorded usage from final stream object for key ...{key[-4:]}")
63
 
64
  # 3. Release the key only after all attempts to record usage are complete
65
+ await self.usage_manager.release_key(key, model)
66
  lib_logger.info(f"STREAM FINISHED and lock released for key ...{key[-4:]}.")
67
  yield "data: [DONE]\n\n"
68
 
 
127
  else:
128
  # For non-streaming, record and release here.
129
  await self.usage_manager.record_success(current_key, model, response)
130
+ await self.usage_manager.release_key(current_key, model)
131
  return response
132
 
133
  except Exception as e:
src/rotator_library/usage_manager.py CHANGED
@@ -21,11 +21,10 @@ class UsageManager:
21
  Manages usage statistics and cooldowns for API keys with asyncio-safe locking,
22
  asynchronous file I/O, and a lazy-loading mechanism for usage data.
23
  """
24
- def __init__(self, file_path: str = "key_usage.json", wait_timeout: int = 5, daily_reset_time_utc: Optional[str] = "00:00"):
25
  self.file_path = file_path
26
  self.file_lock = FileLock(f"{self.file_path}.lock")
27
- self.key_locks: Dict[str, asyncio.Lock] = {}
28
- self.condition = asyncio.Condition()
29
  self.wait_timeout = wait_timeout
30
 
31
  self._data_lock = asyncio.Lock()
@@ -120,80 +119,103 @@ class UsageManager:
120
  if needs_saving:
121
  await self._save_usage()
122
 
123
- def _initialize_locks(self, keys: List[str]):
124
- """Initializes asyncio locks for all provided keys if not already present."""
125
  for key in keys:
126
- if key not in self.key_locks:
127
- self.key_locks[key] = asyncio.Lock()
 
 
 
 
128
 
129
  async def acquire_key(self, available_keys: List[str], model: str) -> str:
130
- """Acquires the best available key with robust locking and a fair timeout mechanism."""
 
 
131
  await self._lazy_init()
132
- self._initialize_locks(available_keys)
133
-
134
- async with self.condition:
135
- while True:
136
- eligible_keys = []
137
- async with self._data_lock:
138
- now = time.time()
139
- for key in available_keys:
140
- key_data = self._usage_data.get(key, {})
141
-
142
- # Check for key-level lockout
143
- key_cooldown = key_data.get("key_cooldown_until")
144
- if key_cooldown and now < key_cooldown:
145
- continue
146
-
147
- # Check for model-specific cooldown
148
- model_cooldown = key_data.get("model_cooldowns", {}).get(model)
149
- if model_cooldown and now < model_cooldown:
150
- continue
151
-
152
- usage_count = key_data.get("daily", {}).get("models", {}).get(model, {}).get("success_count", 0)
153
- eligible_keys.append((key, usage_count))
154
-
155
- if not eligible_keys:
156
- lib_logger.warning("All keys are on cooldown. Waiting...")
157
- await asyncio.sleep(5)
158
- continue
 
 
 
 
 
 
 
159
 
160
- eligible_keys.sort(key=lambda x: x[1])
161
-
162
- for key, _ in eligible_keys:
163
- lock = self.key_locks[key]
164
- if not lock.locked():
165
- await lock.acquire()
166
- lib_logger.info(f"Acquired lock for available key: ...{key[-4:]}")
167
  return key
168
 
169
- lib_logger.info("All eligible keys are locked. Waiting for a key to be released.")
170
-
171
- try:
172
- await asyncio.wait_for(self.condition.wait(), timeout=self.wait_timeout)
173
- lib_logger.info("Notified that a key was released. Re-evaluating...")
174
- continue
175
- except asyncio.TimeoutError:
176
- lib_logger.warning("Wait timed out. Attempting to acquire a key via fair timeout logic.")
177
- async with self._timeout_lock:
178
- for key, _ in eligible_keys:
179
- if key not in self._claimed_on_timeout:
180
- self._claimed_on_timeout.add(key)
181
- lib_logger.info(f"Acquired key ...{key[-4:]} via timeout claim.")
182
- return key
183
- lib_logger.error("Timeout occurred, but all eligible keys were already claimed by other timed-out tasks.")
184
- await asyncio.sleep(1)
185
-
186
- async def release_key(self, key: str):
187
- """Releases the lock for a given key and notifies waiting tasks."""
188
- async with self.condition:
189
- async with self._timeout_lock:
190
- if key in self._claimed_on_timeout:
191
- self._claimed_on_timeout.remove(key)
192
-
193
- if key in self.key_locks and self.key_locks[key].locked():
194
- self.key_locks[key].release()
195
- lib_logger.info(f"Released lock for key ...{key[-4:]}")
196
- self.condition.notify()
 
 
 
 
 
 
 
 
 
 
197
 
198
  async def record_success(self, key: str, model: str, completion_response: Optional[litellm.ModelResponse] = None):
199
  """
 
21
  Manages usage statistics and cooldowns for API keys with asyncio-safe locking,
22
  asynchronous file I/O, and a lazy-loading mechanism for usage data.
23
  """
24
+ def __init__(self, file_path: str = "key_usage.json", wait_timeout: int = 13, daily_reset_time_utc: Optional[str] = "03:00"):
25
  self.file_path = file_path
26
  self.file_lock = FileLock(f"{self.file_path}.lock")
27
+ self.key_states: Dict[str, Dict[str, Any]] = {}
 
28
  self.wait_timeout = wait_timeout
29
 
30
  self._data_lock = asyncio.Lock()
 
119
  if needs_saving:
120
  await self._save_usage()
121
 
122
+ def _initialize_key_states(self, keys: List[str]):
123
+ """Initializes state tracking for all provided keys if not already present."""
124
  for key in keys:
125
+ if key not in self.key_states:
126
+ self.key_states[key] = {
127
+ "lock": asyncio.Lock(),
128
+ "condition": asyncio.Condition(),
129
+ "models_in_use": set()
130
+ }
131
 
132
  async def acquire_key(self, available_keys: List[str], model: str) -> str:
133
+ """
134
+ Acquires the best available key using a tiered, model-aware locking strategy.
135
+ """
136
  await self._lazy_init()
137
+ self._initialize_key_states(available_keys)
138
+
139
+ while True:
140
+ tier1_keys, tier2_keys = [], []
141
+ async with self._data_lock:
142
+ now = time.time()
143
+ for key in available_keys:
144
+ key_data = self._usage_data.get(key, {})
145
+
146
+ # Skip keys on global or model-specific cooldown
147
+ if key_data.get("key_cooldown_until", 0) > now or \
148
+ key_data.get("model_cooldowns", {}).get(model, 0) > now:
149
+ continue
150
+
151
+ usage_count = key_data.get("daily", {}).get("models", {}).get(model, {}).get("success_count", 0)
152
+ key_state = self.key_states[key]
153
+
154
+ if not key_state["models_in_use"]:
155
+ tier1_keys.append((key, usage_count))
156
+ elif model not in key_state["models_in_use"]:
157
+ tier2_keys.append((key, usage_count))
158
+
159
+ # Sort keys by usage count (ascending)
160
+ tier1_keys.sort(key=lambda x: x[1])
161
+ tier2_keys.sort(key=lambda x: x[1])
162
+
163
+ # Attempt to acquire from Tier 1 (completely free)
164
+ for key, _ in tier1_keys:
165
+ state = self.key_states[key]
166
+ async with state["lock"]:
167
+ if not state["models_in_use"]:
168
+ state["models_in_use"].add(model)
169
+ lib_logger.info(f"Acquired Tier 1 key ...{key[-4:]} for model {model}")
170
+ return key
171
 
172
+ # Attempt to acquire from Tier 2 (in use by other models)
173
+ for key, _ in tier2_keys:
174
+ state = self.key_states[key]
175
+ async with state["lock"]:
176
+ if model not in state["models_in_use"]:
177
+ state["models_in_use"].add(model)
178
+ lib_logger.info(f"Acquired Tier 2 key ...{key[-4:]} for model {model}")
179
  return key
180
 
181
+ # If no key is available, wait for one to be released
182
+ lib_logger.info("All eligible keys are currently locked for this model. Waiting...")
183
+
184
+ # Create a combined list of all potentially usable keys to wait on
185
+ all_potential_keys = tier1_keys + tier2_keys
186
+ if not all_potential_keys:
187
+ lib_logger.warning("No keys are eligible at all (all on cooldown). Waiting before re-evaluating.")
188
+ await asyncio.sleep(5)
189
+ continue
190
+
191
+ # Wait on the condition of the best available key
192
+ best_wait_key = min(all_potential_keys, key=lambda x: x[1])[0]
193
+ wait_condition = self.key_states[best_wait_key]["condition"]
194
+
195
+ try:
196
+ async with wait_condition:
197
+ await asyncio.wait_for(wait_condition.wait(), timeout=self.wait_timeout)
198
+ lib_logger.info("Notified that a key was released. Re-evaluating...")
199
+ except asyncio.TimeoutError:
200
+ lib_logger.warning("Wait timed out. Re-evaluating for any available key.")
201
+
202
+
203
+ async def release_key(self, key: str, model: str):
204
+ """Releases a key's lock for a specific model and notifies waiting tasks."""
205
+ if key not in self.key_states:
206
+ return
207
+
208
+ state = self.key_states[key]
209
+ async with state["lock"]:
210
+ if model in state["models_in_use"]:
211
+ state["models_in_use"].remove(model)
212
+ lib_logger.info(f"Released key ...{key[-4:]} from model {model}")
213
+ else:
214
+ lib_logger.warning(f"Attempted to release key ...{key[-4:]} for model {model}, but it was not in use.")
215
+
216
+ # Notify all tasks waiting on this key's condition
217
+ async with state["condition"]:
218
+ state["condition"].notify_all()
219
 
220
  async def record_success(self, key: str, model: str, completion_response: Optional[litellm.ModelResponse] = None):
221
  """