Spaces:
Paused
Paused
Mirrowel committed on
Commit ·
4bbfff4
1
Parent(s): aa8035e
feat: Enhance asynchronous handling in RotatingClient and UsageManager for improved error management and usage tracking
Browse files- requirements.txt +4 -2
- src/rotator_library/client.py +23 -11
- src/rotator_library/error_handler.py +3 -7
- src/rotator_library/usage_manager.py +144 -132
requirements.txt
CHANGED
|
@@ -1,9 +1,7 @@
|
|
| 1 |
# FastAPI framework for building the proxy server
|
| 2 |
fastapi
|
| 3 |
-
|
| 4 |
# ASGI server for running the FastAPI application
|
| 5 |
uvicorn
|
| 6 |
-
|
| 7 |
# For loading environment variables from a .env file
|
| 8 |
python-dotenv
|
| 9 |
|
|
@@ -12,3 +10,7 @@ python-dotenv
|
|
| 12 |
|
| 13 |
# A library for calling LLM APIs with a consistent format
|
| 14 |
litellm
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
# FastAPI framework for building the proxy server
|
| 2 |
fastapi
|
|
|
|
| 3 |
# ASGI server for running the FastAPI application
|
| 4 |
uvicorn
|
|
|
|
| 5 |
# For loading environment variables from a .env file
|
| 6 |
python-dotenv
|
| 7 |
|
|
|
|
| 10 |
|
| 11 |
# A library for calling LLM APIs with a consistent format
|
| 12 |
litellm
|
| 13 |
+
|
| 14 |
+
filelock
|
| 15 |
+
httpx
|
| 16 |
+
aiofiles
|
src/rotator_library/client.py
CHANGED
|
@@ -58,7 +58,7 @@ class RotatingClient:
|
|
| 58 |
# Safely check for usage data in the chunk
|
| 59 |
if hasattr(chunk, 'usage') and chunk.usage:
|
| 60 |
lib_logger.info(f"Usage found in chunk for key ...{key[-4:]}: {chunk.usage}")
|
| 61 |
-
self.usage_manager.record_success(key, model, chunk)
|
| 62 |
|
| 63 |
finally:
|
| 64 |
# Signal the end of the stream
|
|
@@ -110,23 +110,35 @@ class RotatingClient:
|
|
| 110 |
if is_streaming:
|
| 111 |
return self._streaming_wrapper(response, current_key, model)
|
| 112 |
else:
|
| 113 |
-
self.usage_manager.record_success(current_key, model, response)
|
| 114 |
return response
|
| 115 |
|
| 116 |
except Exception as e:
|
| 117 |
log_failure(api_key=current_key, model=model, attempt=attempt + 1, error=e, request_data=kwargs)
|
| 118 |
-
|
| 119 |
-
if is_server_error(e) or (is_rate_limit_error(e) and attempt < self.max_retries - 1):
|
| 120 |
-
lib_logger.warning(f"Key ...{current_key[-4:]} failed with retriable error. Retrying...")
|
| 121 |
-
await asyncio.sleep(1 * (attempt + 1))
|
| 122 |
-
continue
|
| 123 |
-
|
| 124 |
if is_unrecoverable_error(e):
|
|
|
|
| 125 |
raise e
|
| 126 |
|
| 127 |
-
|
| 128 |
-
|
| 129 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 130 |
|
| 131 |
# If we exit the retry loop due to failure, release the key and try to get a new one.
|
| 132 |
await self.usage_manager.release_key(current_key)
|
|
|
|
| 58 |
# Safely check for usage data in the chunk
|
| 59 |
if hasattr(chunk, 'usage') and chunk.usage:
|
| 60 |
lib_logger.info(f"Usage found in chunk for key ...{key[-4:]}: {chunk.usage}")
|
| 61 |
+
await self.usage_manager.record_success(key, model, chunk)
|
| 62 |
|
| 63 |
finally:
|
| 64 |
# Signal the end of the stream
|
|
|
|
| 110 |
if is_streaming:
|
| 111 |
return self._streaming_wrapper(response, current_key, model)
|
| 112 |
else:
|
| 113 |
+
await self.usage_manager.record_success(current_key, model, response)
|
| 114 |
return response
|
| 115 |
|
| 116 |
except Exception as e:
|
| 117 |
log_failure(api_key=current_key, model=model, attempt=attempt + 1, error=e, request_data=kwargs)
|
| 118 |
+
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 119 |
if is_unrecoverable_error(e):
|
| 120 |
+
lib_logger.error(f"Key ...{current_key[-4:]} failed with unrecoverable error: {e}. Raising exception.")
|
| 121 |
raise e
|
| 122 |
|
| 123 |
+
if is_rate_limit_error(e):
|
| 124 |
+
lib_logger.warning(f"Key ...{current_key[-4:]} hit a rate limit for model {model}. Rotating key and setting cooldown.")
|
| 125 |
+
await self.usage_manager.record_rotation_error(current_key, model, e)
|
| 126 |
+
break # Break from retries to get a new key
|
| 127 |
+
|
| 128 |
+
if is_server_error(e):
|
| 129 |
+
if attempt < self.max_retries - 1:
|
| 130 |
+
lib_logger.warning(f"Key ...{current_key[-4:]} encountered a server error. Retrying (attempt {attempt + 2}/{self.max_retries})...")
|
| 131 |
+
await asyncio.sleep(1.5 * (attempt + 1))
|
| 132 |
+
continue
|
| 133 |
+
else:
|
| 134 |
+
lib_logger.error(f"Key ...{current_key[-4:]} failed after max retries on a server error. Rotating key.")
|
| 135 |
+
await self.usage_manager.record_rotation_error(current_key, model, e)
|
| 136 |
+
break
|
| 137 |
+
|
| 138 |
+
# Fallback for any other unexpected errors
|
| 139 |
+
lib_logger.error(f"Key ...{current_key[-4:]} failed with an unexpected error: {e}. Rotating key.")
|
| 140 |
+
await self.usage_manager.record_rotation_error(current_key, model, e)
|
| 141 |
+
break
|
| 142 |
|
| 143 |
# If we exit the retry loop due to failure, release the key and try to get a new one.
|
| 144 |
await self.usage_manager.release_key(current_key)
|
src/rotator_library/error_handler.py
CHANGED
|
@@ -1,9 +1,5 @@
|
|
| 1 |
from litellm.exceptions import APIConnectionError, RateLimitError, ServiceUnavailableError, AuthenticationError, InvalidRequestError
|
| 2 |
|
| 3 |
-
def is_authentication_error(e: Exception) -> bool:
|
| 4 |
-
"""Checks if the exception is related to authentication."""
|
| 5 |
-
return isinstance(e, AuthenticationError)
|
| 6 |
-
|
| 7 |
def is_rate_limit_error(e: Exception) -> bool:
|
| 8 |
"""Checks if the exception is a rate limit error."""
|
| 9 |
return isinstance(e, RateLimitError)
|
|
@@ -14,7 +10,7 @@ def is_server_error(e: Exception) -> bool:
|
|
| 14 |
|
| 15 |
def is_unrecoverable_error(e: Exception) -> bool:
|
| 16 |
"""
|
| 17 |
-
Checks if the exception is a non-retriable client-side error
|
| 18 |
-
|
| 19 |
"""
|
| 20 |
-
return isinstance(e, InvalidRequestError)
|
|
|
|
| 1 |
from litellm.exceptions import APIConnectionError, RateLimitError, ServiceUnavailableError, AuthenticationError, InvalidRequestError
|
| 2 |
|
|
|
|
|
|
|
|
|
|
|
|
|
| 3 |
def is_rate_limit_error(e: Exception) -> bool:
    """Return True when ``e`` is an instance of ``RateLimitError``."""
    rate_limited = isinstance(e, RateLimitError)
    return rate_limited
|
|
|
|
| 10 |
|
| 11 |
def is_unrecoverable_error(e: Exception) -> bool:
    """
    Report whether ``e`` is a client-side failure that a retry cannot fix.

    An exception counts as unrecoverable when it is an instance of
    ``InvalidRequestError`` or ``AuthenticationError``; these errors will
    not resolve on their own.
    """
    unrecoverable_types = (InvalidRequestError, AuthenticationError)
    return isinstance(e, unrecoverable_types)
|
src/rotator_library/usage_manager.py
CHANGED
|
@@ -3,20 +3,22 @@ import os
|
|
| 3 |
import time
|
| 4 |
import logging
|
| 5 |
import asyncio
|
| 6 |
-
from datetime import date
|
| 7 |
-
from typing import Dict, List, Optional,
|
| 8 |
from filelock import FileLock
|
|
|
|
| 9 |
import litellm
|
| 10 |
import re
|
| 11 |
|
| 12 |
lib_logger = logging.getLogger('rotator_library')
|
| 13 |
-
lib_logger.propagate = False
|
| 14 |
if not lib_logger.handlers:
|
| 15 |
lib_logger.addHandler(logging.NullHandler())
|
| 16 |
|
| 17 |
class UsageManager:
|
| 18 |
"""
|
| 19 |
-
Manages usage statistics and cooldowns for API keys with asyncio-safe locking
|
|
|
|
| 20 |
"""
|
| 21 |
def __init__(self, file_path: str = "key_usage.json", wait_timeout: int = 5):
|
| 22 |
self.file_path = file_path
|
|
@@ -24,34 +26,62 @@ class UsageManager:
|
|
| 24 |
self.key_locks: Dict[str, asyncio.Lock] = {}
|
| 25 |
self.condition = asyncio.Condition()
|
| 26 |
self.wait_timeout = wait_timeout
|
| 27 |
-
|
| 28 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 29 |
|
| 30 |
-
def
|
| 31 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 32 |
if not os.path.exists(self.file_path):
|
| 33 |
-
|
|
|
|
| 34 |
try:
|
| 35 |
-
with open(self.file_path, 'r') as f:
|
| 36 |
-
|
| 37 |
-
|
| 38 |
-
|
| 39 |
-
|
| 40 |
-
|
| 41 |
-
|
| 42 |
-
|
| 43 |
-
|
| 44 |
-
|
| 45 |
-
|
| 46 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 47 |
today_str = date.today().isoformat()
|
| 48 |
needs_saving = False
|
| 49 |
-
for key, data in self.
|
| 50 |
daily_data = data.get("daily", {})
|
| 51 |
-
|
| 52 |
-
if last_date_str != today_str:
|
| 53 |
needs_saving = True
|
| 54 |
-
# Add yesterday's daily stats to global stats
|
| 55 |
global_data = data.setdefault("global", {"models": {}})
|
| 56 |
for model, stats in daily_data.get("models", {}).items():
|
| 57 |
global_model_stats = global_data["models"].setdefault(model, {"success_count": 0, "prompt_tokens": 0, "completion_tokens": 0, "approx_cost": 0.0})
|
|
@@ -59,12 +89,10 @@ class UsageManager:
|
|
| 59 |
global_model_stats["prompt_tokens"] += stats.get("prompt_tokens", 0)
|
| 60 |
global_model_stats["completion_tokens"] += stats.get("completion_tokens", 0)
|
| 61 |
global_model_stats["approx_cost"] += stats.get("approx_cost", 0.0)
|
| 62 |
-
|
| 63 |
-
# Reset daily stats
|
| 64 |
data["daily"] = {"date": today_str, "models": {}}
|
| 65 |
|
| 66 |
if needs_saving:
|
| 67 |
-
self._save_usage()
|
| 68 |
|
| 69 |
def _initialize_locks(self, keys: List[str]):
|
| 70 |
"""Initializes asyncio locks for all provided keys if not already present."""
|
|
@@ -74,31 +102,29 @@ class UsageManager:
|
|
| 74 |
|
| 75 |
async def acquire_key(self, available_keys: List[str], model: str) -> str:
|
| 76 |
"""
|
| 77 |
-
Acquires the best available key
|
| 78 |
-
released or times out and returns the best-ranked key anyway.
|
| 79 |
"""
|
|
|
|
| 80 |
self._initialize_locks(available_keys)
|
| 81 |
|
| 82 |
async with self.condition:
|
| 83 |
while True:
|
| 84 |
-
# Rank all keys that are not on cooldown
|
| 85 |
eligible_keys = []
|
| 86 |
-
|
| 87 |
-
|
| 88 |
-
|
| 89 |
-
|
| 90 |
-
|
| 91 |
-
|
|
|
|
| 92 |
|
| 93 |
if not eligible_keys:
|
| 94 |
lib_logger.warning("All keys are on cooldown. Waiting...")
|
| 95 |
await asyncio.sleep(5)
|
| 96 |
continue
|
| 97 |
|
| 98 |
-
# Sort by usage count (ascending)
|
| 99 |
eligible_keys.sort(key=lambda x: x[1])
|
| 100 |
|
| 101 |
-
# Try to acquire the lock for the first unlocked key in the ranked list
|
| 102 |
for key, _ in eligible_keys:
|
| 103 |
lock = self.key_locks[key]
|
| 104 |
if not lock.locked():
|
|
@@ -106,112 +132,98 @@ class UsageManager:
|
|
| 106 |
lib_logger.info(f"Acquired lock for available key: ...{key[-4:]}")
|
| 107 |
return key
|
| 108 |
|
| 109 |
-
|
| 110 |
-
best_locked_key = eligible_keys[0][0]
|
| 111 |
-
lib_logger.info(f"All eligible keys are locked. Waiting for a key to be released. Best candidate: ...{best_locked_key[-4:]}")
|
| 112 |
|
| 113 |
try:
|
| 114 |
await asyncio.wait_for(self.condition.wait(), timeout=self.wait_timeout)
|
| 115 |
-
# If wait() returns, it means we were notified, so we re-run the loop
|
| 116 |
lib_logger.info("Notified that a key was released. Re-evaluating...")
|
| 117 |
continue
|
| 118 |
except asyncio.TimeoutError:
|
| 119 |
-
|
| 120 |
-
|
| 121 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 122 |
|
| 123 |
async def release_key(self, key: str):
|
| 124 |
"""Releases the lock for a given key and notifies waiting tasks."""
|
| 125 |
async with self.condition:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 126 |
if key in self.key_locks and self.key_locks[key].locked():
|
| 127 |
self.key_locks[key].release()
|
| 128 |
lib_logger.info(f"Released lock for key ...{key[-4:]}")
|
| 129 |
-
self.condition.notify()
|
| 130 |
-
|
| 131 |
-
def record_success(self, key: str, model: str, completion_response: litellm.ModelResponse):
|
| 132 |
-
|
| 133 |
-
|
| 134 |
-
|
| 135 |
-
|
| 136 |
-
|
| 137 |
-
|
| 138 |
-
|
| 139 |
-
|
| 140 |
-
|
| 141 |
-
|
| 142 |
-
|
| 143 |
-
|
| 144 |
-
|
| 145 |
-
|
| 146 |
-
|
| 147 |
-
|
| 148 |
-
|
| 149 |
-
|
| 150 |
-
|
| 151 |
-
|
| 152 |
-
|
| 153 |
-
|
| 154 |
-
|
| 155 |
-
|
| 156 |
-
key_data["last_used_ts"] = time.time()
|
| 157 |
-
self._save_usage()
|
| 158 |
|
| 159 |
-
|
| 160 |
-
key_data = self.usage_data.setdefault(key, {"daily": {"date": date.today().isoformat(), "models": {}}, "global": {"models": {}}, "model_cooldowns": {}})
|
| 161 |
|
| 162 |
-
|
| 163 |
-
|
| 164 |
-
|
| 165 |
-
|
| 166 |
-
|
| 167 |
-
|
| 168 |
-
|
| 169 |
-
|
| 170 |
-
|
| 171 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 172 |
try:
|
| 173 |
-
|
| 174 |
-
|
| 175 |
-
|
| 176 |
-
|
| 177 |
-
|
| 178 |
-
|
| 179 |
-
|
| 180 |
-
|
| 181 |
-
|
| 182 |
-
|
| 183 |
-
|
| 184 |
-
|
| 185 |
-
|
| 186 |
-
|
| 187 |
-
|
| 188 |
-
try:
|
| 189 |
-
# Look for numbers followed by 's' or 'seconds' in retry/delay context
|
| 190 |
-
patterns = [
|
| 191 |
-
r'retry.*?(\d+)s',
|
| 192 |
-
r'delay.*?(\d+)s',
|
| 193 |
-
r'wait.*?(\d+)\s*seconds?'
|
| 194 |
-
]
|
| 195 |
-
for pattern in patterns:
|
| 196 |
-
match = re.search(pattern, error_str, re.IGNORECASE)
|
| 197 |
-
if match:
|
| 198 |
-
delay_str = match.group(1)
|
| 199 |
-
break
|
| 200 |
-
except Exception:
|
| 201 |
-
pass
|
| 202 |
-
|
| 203 |
-
if delay_str:
|
| 204 |
-
cooldown_seconds = int(delay_str)
|
| 205 |
-
cooldown_seconds = int(delay_str)
|
| 206 |
-
except (IndexError, ValueError):
|
| 207 |
-
pass
|
| 208 |
-
|
| 209 |
-
model_cooldowns = key_data.setdefault("model_cooldowns", {})
|
| 210 |
-
model_cooldowns[model] = time.time() + cooldown_seconds
|
| 211 |
-
|
| 212 |
-
key_data["last_rotation_error"] = {
|
| 213 |
-
"timestamp": time.time(),
|
| 214 |
-
"model": model,
|
| 215 |
-
"error": str(error)
|
| 216 |
-
}
|
| 217 |
-
self._save_usage()
|
|
|
|
| 3 |
import time
|
| 4 |
import logging
|
| 5 |
import asyncio
|
| 6 |
+
from datetime import date
|
| 7 |
+
from typing import Dict, List, Optional, Set
|
| 8 |
from filelock import FileLock
|
| 9 |
+
import aiofiles
|
| 10 |
import litellm
|
| 11 |
import re
|
| 12 |
|
| 13 |
lib_logger = logging.getLogger('rotator_library')
|
| 14 |
+
lib_logger.propagate = False
|
| 15 |
if not lib_logger.handlers:
|
| 16 |
lib_logger.addHandler(logging.NullHandler())
|
| 17 |
|
| 18 |
class UsageManager:
|
| 19 |
"""
|
| 20 |
+
Manages usage statistics and cooldowns for API keys with asyncio-safe locking,
|
| 21 |
+
asynchronous file I/O, and a lazy-loading mechanism for usage data.
|
| 22 |
"""
|
| 23 |
def __init__(self, file_path: str = "key_usage.json", wait_timeout: int = 5):
|
| 24 |
self.file_path = file_path
|
|
|
|
| 26 |
self.key_locks: Dict[str, asyncio.Lock] = {}
|
| 27 |
self.condition = asyncio.Condition()
|
| 28 |
self.wait_timeout = wait_timeout
|
| 29 |
+
|
| 30 |
+
# Data-related locks and state
|
| 31 |
+
self._data_lock = asyncio.Lock()
|
| 32 |
+
self._usage_data: Optional[Dict] = None
|
| 33 |
+
self._initialized = asyncio.Event()
|
| 34 |
+
self._init_lock = asyncio.Lock()
|
| 35 |
+
|
| 36 |
+
# For "fair timeout" logic
|
| 37 |
+
self._timeout_lock = asyncio.Lock()
|
| 38 |
+
self._claimed_on_timeout: Set[str] = set()
|
| 39 |
|
| 40 |
+
async def _lazy_init(self):
    """
    Load the usage data from disk the first time it is needed.

    The first call loads the file and applies the daily-stats reset, then
    sets the ``_initialized`` event. Every later call finds the event set
    and returns without doing any work.
    """
    async with self._init_lock:
        # The flag is tested while holding the init lock, so tasks that
        # arrive together cannot both run the load.
        if self._initialized.is_set():
            return
        await self._load_usage()
        await self._reset_daily_stats_if_needed()
        self._initialized.set()
|
| 51 |
+
|
| 52 |
+
async def _load_usage(self):
    """
    Read the usage JSON file into ``self._usage_data``.

    The data becomes an empty dict when the file does not exist, cannot be
    read, or does not contain valid JSON.
    """
    async with self._data_lock:
        loaded = {}
        if os.path.exists(self.file_path):
            try:
                async with aiofiles.open(self.file_path, 'r') as handle:
                    raw_text = await handle.read()
                    loaded = json.loads(raw_text)
            # IOError is an alias of OSError and FileNotFoundError is a
            # subclass of it, so OSError covers both.
            except (json.JSONDecodeError, OSError):
                loaded = {}
        self._usage_data = loaded
|
| 64 |
+
|
| 65 |
+
async def _save_usage(self):
    """
    Write the in-memory usage data to ``self.file_path`` as indented JSON.

    Does nothing when no usage data has been loaded yet. The write runs
    while holding ``self._data_lock`` and ``self.file_lock``; because
    ``_data_lock`` is an ``asyncio.Lock`` (not reentrant), callers must not
    already hold it when awaiting this method.
    """
    if self._usage_data is None:
        return
    async with self._data_lock:
        # Serialize before the file is opened: mode 'w' truncates the file,
        # so a serialization error raised after opening would leave an
        # empty usage file behind.
        payload = json.dumps(self._usage_data, indent=2)
        # Use filelock to prevent multi-process race conditions.
        # NOTE(review): this is a synchronous `with` inside a coroutine, so
        # waiting for a contended file lock would block the event loop --
        # confirm this is acceptable for the expected number of processes.
        with self.file_lock:
            async with aiofiles.open(self.file_path, 'w') as f:
                await f.write(payload)
|
| 73 |
+
|
| 74 |
+
async def _reset_daily_stats_if_needed(self):
|
| 75 |
+
"""Checks if daily stats need to be reset for any key (async version)."""
|
| 76 |
+
if self._usage_data is None:
|
| 77 |
+
return
|
| 78 |
+
|
| 79 |
today_str = date.today().isoformat()
|
| 80 |
needs_saving = False
|
| 81 |
+
for key, data in self._usage_data.items():
|
| 82 |
daily_data = data.get("daily", {})
|
| 83 |
+
if daily_data.get("date") != today_str:
|
|
|
|
| 84 |
needs_saving = True
|
|
|
|
| 85 |
global_data = data.setdefault("global", {"models": {}})
|
| 86 |
for model, stats in daily_data.get("models", {}).items():
|
| 87 |
global_model_stats = global_data["models"].setdefault(model, {"success_count": 0, "prompt_tokens": 0, "completion_tokens": 0, "approx_cost": 0.0})
|
|
|
|
| 89 |
global_model_stats["prompt_tokens"] += stats.get("prompt_tokens", 0)
|
| 90 |
global_model_stats["completion_tokens"] += stats.get("completion_tokens", 0)
|
| 91 |
global_model_stats["approx_cost"] += stats.get("approx_cost", 0.0)
|
|
|
|
|
|
|
| 92 |
data["daily"] = {"date": today_str, "models": {}}
|
| 93 |
|
| 94 |
if needs_saving:
|
| 95 |
+
await self._save_usage()
|
| 96 |
|
| 97 |
def _initialize_locks(self, keys: List[str]):
|
| 98 |
"""Initializes asyncio locks for all provided keys if not already present."""
|
|
|
|
| 102 |
|
| 103 |
async def acquire_key(self, available_keys: List[str], model: str) -> str:
|
| 104 |
"""
|
| 105 |
+
Acquires the best available key with robust locking and a fair timeout mechanism.
|
|
|
|
| 106 |
"""
|
| 107 |
+
await self._lazy_init()
|
| 108 |
self._initialize_locks(available_keys)
|
| 109 |
|
| 110 |
async with self.condition:
|
| 111 |
while True:
|
|
|
|
| 112 |
eligible_keys = []
|
| 113 |
+
async with self._data_lock:
|
| 114 |
+
for key in available_keys:
|
| 115 |
+
key_data = self._usage_data.get(key, {})
|
| 116 |
+
cooldown_until = key_data.get("model_cooldowns", {}).get(model)
|
| 117 |
+
if not cooldown_until or time.time() > cooldown_until:
|
| 118 |
+
usage_count = key_data.get("daily", {}).get("models", {}).get(model, {}).get("success_count", 0)
|
| 119 |
+
eligible_keys.append((key, usage_count))
|
| 120 |
|
| 121 |
if not eligible_keys:
|
| 122 |
lib_logger.warning("All keys are on cooldown. Waiting...")
|
| 123 |
await asyncio.sleep(5)
|
| 124 |
continue
|
| 125 |
|
|
|
|
| 126 |
eligible_keys.sort(key=lambda x: x[1])
|
| 127 |
|
|
|
|
| 128 |
for key, _ in eligible_keys:
|
| 129 |
lock = self.key_locks[key]
|
| 130 |
if not lock.locked():
|
|
|
|
| 132 |
lib_logger.info(f"Acquired lock for available key: ...{key[-4:]}")
|
| 133 |
return key
|
| 134 |
|
| 135 |
+
lib_logger.info("All eligible keys are locked. Waiting for a key to be released.")
|
|
|
|
|
|
|
| 136 |
|
| 137 |
try:
|
| 138 |
await asyncio.wait_for(self.condition.wait(), timeout=self.wait_timeout)
|
|
|
|
| 139 |
lib_logger.info("Notified that a key was released. Re-evaluating...")
|
| 140 |
continue
|
| 141 |
except asyncio.TimeoutError:
|
| 142 |
+
lib_logger.warning("Wait timed out. Attempting to acquire a key via fair timeout logic.")
|
| 143 |
+
async with self._timeout_lock:
|
| 144 |
+
for key, _ in eligible_keys:
|
| 145 |
+
if key not in self._claimed_on_timeout:
|
| 146 |
+
self._claimed_on_timeout.add(key)
|
| 147 |
+
lib_logger.info(f"Acquired key ...{key[-4:]} via timeout claim.")
|
| 148 |
+
return key
|
| 149 |
+
lib_logger.error("Timeout occurred, but all eligible keys were already claimed by other timed-out tasks.")
|
| 150 |
+
# Fallback to waiting again if all keys were claimed
|
| 151 |
+
await asyncio.sleep(1)
|
| 152 |
+
|
| 153 |
|
| 154 |
async def release_key(self, key: str):
    """Release the lock held for ``key`` and wake one task waiting for a key."""
    async with self.condition:
        # Drop any timeout claim recorded for this key.
        async with self._timeout_lock:
            self._claimed_on_timeout.discard(key)

        key_lock = self.key_locks.get(key)
        if key_lock is None or not key_lock.locked():
            return

        key_lock.release()
        lib_logger.info(f"Released lock for key ...{key[-4:]}")
        self.condition.notify()
|
| 166 |
+
|
| 167 |
+
async def record_success(self, key: str, model: str, completion_response: litellm.ModelResponse):
    """
    Record a successful API call for ``key`` and ``model``.

    Clears any cooldown stored for the model, increments the key's daily
    success count, adds the response's prompt/completion token counts and
    approximate cost to the daily totals, stamps ``last_used_ts`` and saves
    the usage file.

    Args:
        key: The API key that served the request.
        model: The model name the request was made for.
        completion_response: The response (or stream chunk) whose ``usage``
            is read. A missing ``usage`` or missing token counts are
            counted as zero tokens.
    """
    await self._lazy_init()
    today_str = date.today().isoformat()
    stat_fields = ("success_count", "prompt_tokens", "completion_tokens", "approx_cost")

    async with self._data_lock:
        key_data = self._usage_data.setdefault(key, {"daily": {"date": today_str, "models": {}}, "global": {"models": {}}, "model_cooldowns": {}})

        # A successful call means the model is usable again with this key.
        key_data.get("model_cooldowns", {}).pop(model, None)

        daily = key_data.setdefault("daily", {"date": today_str, "models": {}})
        if daily.get("date") != today_str:
            # The full reset only runs once, from _lazy_init. When the day
            # changes while the process is running, fold the finished day's
            # numbers into the global totals here before starting a fresh
            # day, so they are not lost.
            global_models = key_data.setdefault("global", {"models": {}}).setdefault("models", {})
            for past_model, stats in daily.get("models", {}).items():
                totals = global_models.setdefault(past_model, {"success_count": 0, "prompt_tokens": 0, "completion_tokens": 0, "approx_cost": 0.0})
                for field in stat_fields:
                    totals[field] = totals.get(field, 0) + stats.get(field, 0)
            daily = key_data["daily"] = {"date": today_str, "models": {}}

        daily_model_data = daily.setdefault("models", {}).setdefault(model, {"success_count": 0, "prompt_tokens": 0, "completion_tokens": 0, "approx_cost": 0.0})

        usage = getattr(completion_response, "usage", None)
        daily_model_data["success_count"] += 1
        daily_model_data["prompt_tokens"] += getattr(usage, "prompt_tokens", 0) or 0
        daily_model_data["completion_tokens"] += getattr(usage, "completion_tokens", 0) or 0

        # Cost is best-effort: a failure to price the response must not
        # prevent the usage from being recorded.
        try:
            cost = litellm.completion_cost(completion_response=completion_response)
            daily_model_data["approx_cost"] += cost
        except Exception as e:
            lib_logger.warning(f"Could not calculate cost for model {model}: {e}")

        key_data["last_used_ts"] = time.time()

    # _save_usage acquires _data_lock itself, so it is awaited only after
    # the lock has been released above.
    await self._save_usage()
|
| 196 |
+
|
| 197 |
+
async def record_rotation_error(self, key: str, model: str, error: Exception):
    """
    Record an error that caused ``key`` to be rotated and put it on cooldown.

    The cooldown for ``model`` defaults to 86400 seconds. If the error text
    contains a retry hint (``retry_delay ... N``, ``retryDelay ... Ns`` or
    ``wait ... N seconds``), that delay is used instead; fractional delays
    are rounded up to the next whole second. The error is also stored under
    ``last_rotation_error`` and the usage file is saved.

    Args:
        key: The API key that failed.
        model: The model name the failed request was made for.
        error: The exception that triggered the rotation; its string form
            is searched for a retry hint and stored.
    """
    import math

    await self._lazy_init()
    async with self._data_lock:
        key_data = self._usage_data.setdefault(key, {"daily": {"date": date.today().isoformat(), "models": {}}, "global": {"models": {}}, "model_cooldowns": {}})

        cooldown_seconds = 86400
        # Lower-cased once, so the patterns below are written in lower case
        # and need no case-insensitive flag.
        error_str = str(error).lower()

        # Integer or decimal seconds. Matching only the digits directly in
        # front of the unit would read "37.5s" as 5.
        seconds = r'(\d+(?:\.\d+)?)'
        patterns = [
            r'retry_delay.*?' + seconds,
            r'retrydelay.*?' + seconds + r's',
            r'wait.*?' + seconds + r'\s*seconds?',
        ]
        for pattern in patterns:
            match = re.search(pattern, error_str)
            if not match:
                continue
            try:
                cooldown_seconds = math.ceil(float(match.group(1)))
            except (ValueError, OverflowError):
                # Absurdly long digit runs overflow float; try the next hint.
                continue
            break

        model_cooldowns = key_data.setdefault("model_cooldowns", {})
        model_cooldowns[model] = time.time() + cooldown_seconds

        key_data["last_rotation_error"] = {
            "timestamp": time.time(),
            "model": model,
            "error": str(error)
        }

    # _save_usage acquires _data_lock itself, so it is awaited only after
    # the lock has been released above.
    await self._save_usage()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|