Commit 62ed41b by Mirrowel (parent: 83934f3)

feat(core): implement deadline-driven request execution and resilient error handling


Introduce a `global_timeout` parameter to `RotatingClient` and refactor the request lifecycle to operate within a strict time budget.

- All key acquisition, rotation, and retry mechanisms now adhere to this global deadline, preventing indefinite hangs.
- Redesign error propagation to handle transient provider failures (e.g., rate limits, 5xx errors) internally. These errors now trigger key rotation or deadline-aware retries without immediately raising exceptions to the caller.
- Provide a more stable client experience by shielding consumers from intermittent backend issues, returning a failure only when the global timeout is exceeded or all keys are exhausted.
- Update documentation to reflect the new `global_timeout` and improved error handling.

BREAKING CHANGE: The client's error propagation for transient failures has changed. `acompletion` and `aembedding` methods in `RotatingClient` no longer raise exceptions for intermittent issues like rate limits or server errors; instead, they handle them internally. Non-streaming requests will now return `None` upon final failure (after exhausting all keys or exceeding the global timeout), and streaming requests will yield a final `[DONE]` message with an error payload. Additionally, the `UsageManager.__init__` method no longer accepts the `wait_timeout` parameter, and `UsageManager.acquire_key` now requires a `deadline` argument.
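Under this new contract, callers of `acompletion` should treat a `None` return as the failure signal instead of catching transient exceptions. A minimal caller-side sketch (the `ask` helper and the model name are illustrative, not part of the library):

```python
import asyncio

# Caller-side sketch of the new failure contract. `ask` is a hypothetical
# helper; it assumes a configured RotatingClient instance whose
# acompletion() returns None on final failure, as described above.
async def ask(client, prompt: str):
    response = await client.acompletion(
        model="gemini/gemini-1.5-pro",  # placeholder model name
        messages=[{"role": "user", "content": prompt}],
    )
    # Transient errors (rate limits, 5xx) are absorbed internally; None
    # means the global timeout elapsed or every key was exhausted.
    if response is None:
        return "request failed: keys exhausted or global timeout exceeded"
    return response
```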

DOCUMENTATION.md CHANGED
@@ -21,39 +21,46 @@ This library is the heart of the project, containing all the logic for key rotat

The `RotatingClient` is the central class that orchestrates all operations. It is designed as a long-lived, async-native object.

+ #### Initialization
+
+ The client is initialized with your provider API keys, retry settings, and a new `global_timeout`.
+
+ ```python
+ client = RotatingClient(
+     api_keys=api_keys,
+     max_retries=2,
+     global_timeout=30  # in seconds
+ )
+ ```
+
+ - `global_timeout`: A crucial new parameter that sets a hard time limit for the entire request lifecycle, from the moment `acompletion` is called until a response is returned or the timeout is exceeded.
+
#### Core Responsibilities

* Managing a shared `httpx.AsyncClient` for all non-blocking HTTP requests.
* Interfacing with the `UsageManager` to acquire and release API keys.
* Dynamically loading and using provider-specific plugins from the `providers/` directory.
- * Executing API calls via `litellm` with a robust retry and rotation strategy.
+ * Executing API calls via `litellm` with a robust, **deadline-driven** retry and rotation strategy.
* Providing a safe, stateful wrapper for handling streaming responses.

- #### Request Lifecycle (`acompletion` & `aembedding`)
-
- When `acompletion` or `aembedding` is called, it follows a sophisticated, multi-layered process:
-
- 1. **Provider & Key Validation**: It extracts the provider from the `model` name (e.g., `"gemini/gemini-1.5-pro"` -> `"gemini"`) and ensures keys are configured for it.
-
- 2. **Key Acquisition Loop**: The client enters a `while` loop that attempts to find a valid key and complete the request. It iterates until one key succeeds or all have been tried.
-    a. **Acquire Best Key**: It calls `self.usage_manager.acquire_key()`. This is a crucial, potentially blocking call that waits until a suitable key is available, based on the manager's tiered locking strategy (see `UsageManager` section).
-    b. **Prepare Request**: It prepares the `litellm` keyword arguments. This includes applying provider-specific logic (e.g., remapping safety settings for Gemini, handling `api_base` for Chutes.ai) and sanitizing the payload to remove unsupported parameters.
-
- 3. **Retry Loop**: Once a key is acquired, it enters an inner `for` loop (`for attempt in range(self.max_retries)`):
-    a. **API Call**: It calls `litellm.acompletion` or `litellm.aembedding`.
-    b. **Success (Non-Streaming)**:
-       - It calls `self.usage_manager.record_success()` to update usage stats and clear any cooldowns.
-       - It calls `self.usage_manager.release_key()` to release the lock.
-       - It returns the response, and the process ends.
-    c. **Success (Streaming)**:
-       - It returns the `_safe_streaming_wrapper` async generator. This wrapper is critical:
-       - It yields SSE-formatted chunks to the consumer.
-       - It can reassemble fragmented JSON chunks and detect errors mid-stream.
-       - Its `finally` block ensures that `record_success()` and `release_key()` are called *only after the stream is fully consumed or closed*. This guarantees the key lock is held for the entire duration of the stream.
-    d. **Failure**: If an exception occurs:
-       - The exception is passed to `classify_error()` to get a structured `ClassifiedError` object.
-       - **Server Error**: If the error is temporary (e.g., 5xx), it waits with exponential backoff and retries the request with the *same key*.
-       - **Rotation Error (Rate Limit, Auth, etc.)**: For any other error, it's a trigger to rotate. `self.usage_manager.record_failure()` is called to apply a cooldown, and the lock is released. The inner `attempt` loop is broken, and the outer `while` loop continues, acquiring a new key.
+ #### Request Lifecycle: A Deadline-Driven Approach
+
+ The request lifecycle has been redesigned around a single, authoritative time budget to ensure predictable performance and prevent requests from hanging indefinitely.
+
+ 1. **Deadline Establishment**: The moment `acompletion` or `aembedding` is called, a `deadline` is calculated: `time.time() + self.global_timeout`. This `deadline` is the absolute point in time by which the entire operation must complete.
+
+ 2. **Deadline-Aware Key Rotation Loop**: The main `while` loop now has a critical secondary condition: `while len(tried_keys) < len(keys_for_provider) and time.time() < deadline:`. The loop will exit immediately if the `deadline` is reached, regardless of how many keys are left to try.
+
+ 3. **Deadline-Aware Key Acquisition**: The `self.usage_manager.acquire_key()` method now accepts the `deadline`. The `UsageManager` will not wait indefinitely for a key; if it cannot acquire one before the `deadline` is met, it will raise a `NoAvailableKeysError`, causing the request to fail fast with a "busy" error.
+
+ 4. **Deadline-Aware Retries**: When a transient error occurs, the client calculates the necessary `wait_time` for an exponential backoff. It then checks whether this wait time fits within the remaining budget (`deadline - time.time()`).
+    - **If it fits**: It waits (`asyncio.sleep`) and retries with the same key.
+    - **If it exceeds the budget**: It skips the wait entirely, logs a warning, and immediately rotates to the next key to avoid wasting time.
+
+ 5. **Refined Error Propagation**:
+    - **Fatal Errors**: Invalid requests or authentication errors are raised immediately to the client.
+    - **Intermittent Errors**: Rate limits, server errors, and other temporary issues are now handled internally. The error is logged and the key is rotated, but the exception is **not** propagated to the end client. This prevents the client from seeing disruptive, intermittent failures.
+    - **Final Failure**: A non-streaming request will only return `None` (indicating failure) if either a) the global `deadline` is exceeded, or b) all keys for the provider have been tried and have failed. A streaming request will yield a final `[DONE]` with an error message in the same scenarios.

### 2.2. `usage_manager.py` - Stateful Concurrency & Usage Management

@@ -66,14 +73,15 @@ This class is the stateful core of the library, managing concurrency, usage, and

#### Tiered Key Acquisition (`acquire_key`)

- This method implements the intelligent logic for selecting the best key for a job.
+ This method implements the intelligent logic for selecting the best key for a job, now with deadline awareness.

- 1. **Filtering**: It first filters out any keys that are on a global or model-specific cooldown.
- 2. **Tiering**: It categorizes the remaining, valid keys into two tiers:
+ 1. **Deadline Enforcement**: The entire acquisition process runs in a `while time.time() < deadline:` loop. If a key cannot be found before the deadline, the method raises `NoAvailableKeysError`.
+ 2. **Filtering**: It first filters out any keys that are on a global or model-specific cooldown.
+ 3. **Tiering**: It categorizes the remaining, valid keys into two tiers:
   - **Tier 1 (Ideal)**: Keys that are completely free (not being used by any model).
   - **Tier 2 (Acceptable)**: Keys that are currently in use, but for *different models* than the one being requested. This allows a single key to be used for concurrent calls to, for example, `gemini-1.5-pro` and `gemini-1.5-flash`.
- 3. **Selection**: It attempts to acquire a lock on a key, prioritizing Tier 1 over Tier 2. Within each tier, it prioritizes the key with the lowest usage count.
- 4. **Waiting**: If no keys in Tier 1 or Tier 2 can be locked, it means all eligible keys are currently handling requests for the *same model*. The method then `await`s on the `asyncio.Condition` of the best available key, waiting efficiently until it is notified that a key has been released.
+ 4. **Selection**: It attempts to acquire a lock on a key, prioritizing Tier 1 over Tier 2. Within each tier, it prioritizes the key with the lowest usage count.
+ 5. **Waiting**: If no keys in Tier 1 or Tier 2 can be locked, it means all eligible keys are currently handling requests for the *same model*. The method then `await`s on the `asyncio.Condition` of the best available key. Crucially, this wait is itself timed out by the remaining request budget, preventing indefinite waits.

#### Failure Handling & Cooldowns (`record_failure`)

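The deadline-driven lifecycle documented above can be reduced to a short standalone sketch. Everything here is a simplification: `call` stands in for the `litellm` invocation, plain iteration stands in for `UsageManager.acquire_key()`, and the backoff mirrors the documented `wait_time` logic:

```python
import asyncio
import time

# Standalone sketch of the deadline-driven loop described in the docs.
# `keys` and `call` are hypothetical stand-ins for UsageManager.acquire_key()
# and the litellm call; this is not the library's actual implementation.
async def complete_with_deadline(keys, call, global_timeout=30, max_retries=2):
    deadline = time.time() + global_timeout
    tried = set()
    # Rotation loop: untried keys remain AND the deadline has not passed.
    while len(tried) < len(keys) and time.time() < deadline:
        key = next(k for k in keys if k not in tried)  # simplified acquisition
        tried.add(key)
        for attempt in range(max_retries):
            try:
                return await call(key)
            except Exception:
                wait_time = 1 * (2 ** attempt)
                # Deadline-aware retry: if the wait does not fit in the
                # remaining budget, rotate to the next key immediately.
                if wait_time > deadline - time.time():
                    break
                await asyncio.sleep(wait_time)
    # Final failure: keys exhausted or global timeout exceeded.
    return None
```

The `return None` at the end matches the documented final-failure behavior for non-streaming requests.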
README.md CHANGED
@@ -22,10 +22,11 @@ This project provides a robust, self-hosted solution for managing and rotating A

## Features

+ - **Predictable Performance**: A new **global timeout** ensures that requests complete within a set time, preventing your application from hanging on slow or failing provider responses.
+ - **Resilient Error Handling**: The proxy now shields your application from transient backend errors. It handles rate limits and temporary provider issues internally by rotating keys, so your client only sees a failure if all options are exhausted or the timeout is hit.
- **Advanced Concurrency Control**: A single API key can handle multiple concurrent requests to different models, maximizing throughput.
- **Smart Key Rotation**: Intelligently selects the least-used, available API key to distribute request loads evenly.
- - **Escalating Per-Model Cooldowns**: If a key fails for a specific model (e.g., due to rate limits), it's placed on a temporary, escalating cooldown for that model, allowing it to be used with others.
- - **Automatic Retries**: Automatically retries requests on transient server errors (e.g., 5xx status codes) with exponential backoff.
+ - **Escalating Per-Model Cooldowns**: If a key fails for a specific model, it's placed on a temporary, escalating cooldown for that model, allowing it to be used with others.
- **Automatic Daily Resets**: Cooldowns and usage statistics are automatically reset daily, making the system self-maintaining.
- **Request Logging**: Optional logging of full request and response payloads for easy debugging.
- **Provider Agnostic**: Compatible with any provider supported by `litellm`.
src/rotator_library/README.md CHANGED
@@ -7,9 +7,10 @@ A robust, asynchronous, and thread-safe client that intelligently rotates and re
- **Asynchronous by Design**: Built with `asyncio` and `httpx` for high-performance, non-blocking I/O.
- **Advanced Concurrency Control**: A single API key can be used for multiple concurrent requests to *different* models, maximizing throughput while ensuring thread safety. Requests for the *same model* using the same key are queued, preventing conflicts.
- **Smart Key Rotation**: Acquires the least-used, available key using a tiered, model-aware locking strategy to distribute load evenly.
+ - **Deadline-Driven Requests**: A global timeout ensures that no request, including all retries and key rotations, exceeds a specified time limit, preventing indefinite hangs.
- **Intelligent Error Handling**:
  - **Escalating Per-Model Cooldowns**: If a key fails, it's placed on a temporary, escalating cooldown for that specific model, allowing it to continue being used for others.
- - **Automatic Retries**: Retries requests on transient server errors (e.g., 5xx) with exponential backoff.
+ - **Deadline-Aware Retries**: Retries requests on transient server errors with exponential backoff, but only if the wait time fits within the global request budget.
  - **Key-Level Lockouts**: If a key fails across multiple models, it's temporarily taken out of rotation entirely.
- **Robust Streaming Support**: The client includes a wrapper for streaming responses that can reassemble fragmented JSON chunks and intelligently detect and handle errors that occur mid-stream.
- **Detailed Usage Tracking**: Tracks daily and global usage for each key, including token counts and approximate cost, persisted to a JSON file.

@@ -56,13 +57,15 @@ if not api_keys:
client = RotatingClient(
    api_keys=api_keys,
    max_retries=2,
-     usage_file_path="key_usage.json"
+     usage_file_path="key_usage.json",
+     global_timeout=30  # Default is 30 seconds
)
```

- `api_keys`: A dictionary where keys are provider names (e.g., `"openai"`, `"gemini"`) and values are lists of API keys for that provider.
- `max_retries`: The number of times to retry a request with the *same key* if a transient server error occurs.
- `usage_file_path`: The path to the JSON file where key usage data will be stored.
+ - `global_timeout`: A hard time limit (in seconds) for the entire request lifecycle. If the total time exceeds this, the request will fail.

### Concurrency and Resource Management

@@ -135,6 +138,14 @@ The client uses a sophisticated error handling mechanism:
- **Rotation Errors (Rate Limit, Auth, etc.)**: The client records the failure in the `UsageManager`, which applies an escalating cooldown to the key for that specific model. The client then immediately acquires a new key and continues its attempt to complete the request.
- **Key-Level Lockouts**: If a key fails on multiple different models, the `UsageManager` can apply a key-level lockout, taking it out of rotation entirely for a short period.

+ ### Global Timeout and Deadline-Driven Logic
+
+ To ensure predictable performance, the client now operates on a strict time budget defined by the `global_timeout` parameter.
+
+ - **Deadline Enforcement**: When a request starts, a `deadline` is set. The entire process, including all key rotations and retries, must complete before this deadline.
+ - **Deadline-Aware Retries**: If a retry requires a wait time that would exceed the remaining budget, the wait is skipped, and the client immediately rotates to the next key.
+ - **Silent Internal Errors**: Intermittent failures like rate limits or temporary server errors are logged internally but are **not raised** to the caller. The client will simply rotate to the next key. A non-streaming request will only return `None` (or a streaming request will end) if the global timeout is exceeded or all keys have been exhausted. This creates a more stable experience for the end-user, as they are shielded from transient backend issues.
+
## Extending with Provider Plugins

The library uses a dynamic plugin system. To add support for a new provider's model list, you only need to:
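The deadline enforcement described above also bounds how long key acquisition may block: a wait for a released key must never outlive the remaining request budget. That pattern can be sketched with `asyncio.wait_for` around an `asyncio.Condition` wait (a hypothetical reduction; the real `UsageManager` additionally tracks per-key conditions, tiers, and usage counts):

```python
import asyncio
import time

class NoAvailableKeysError(Exception):
    """Raised when no key can be acquired before the deadline."""

# Hypothetical reduction of a deadline-aware key wait: block on an
# asyncio.Condition until `is_free()` is true, but never past the
# remaining request budget implied by `deadline`.
async def wait_for_key(cond: asyncio.Condition, is_free, deadline: float) -> None:
    async with cond:
        while not is_free():
            remaining = deadline - time.time()
            if remaining <= 0:
                raise NoAvailableKeysError("No key became available before the deadline.")
            try:
                # Bound the condition wait by the remaining budget.
                await asyncio.wait_for(cond.wait(), timeout=remaining)
            except asyncio.TimeoutError:
                raise NoAvailableKeysError("No key became available before the deadline.")
```

A releasing task would set its flag and call `cond.notify_all()` under the same condition's lock, waking any waiter before the budget runs out.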
src/rotator_library/client.py CHANGED
@@ -1,5 +1,6 @@
import asyncio
import json
+ import time
import os
import random
import httpx

@@ -33,7 +34,7 @@ class RotatingClient:
    A client that intelligently rotates and retries API keys using LiteLLM,
    with support for both streaming and non-streaming responses.
    """
-     def __init__(self, api_keys: Dict[str, List[str]], max_retries: int = 2, usage_file_path: str = "key_usage.json", configure_logging: bool = True):
+     def __init__(self, api_keys: Dict[str, List[str]], max_retries: int = 2, usage_file_path: str = "key_usage.json", configure_logging: bool = True, global_timeout: int = 30):
        os.environ["LITELLM_LOG"] = "ERROR"
        litellm.set_verbose = False
        litellm.drop_params = True

@@ -52,6 +53,7 @@ class RotatingClient:
            raise ValueError("API keys dictionary cannot be empty.")
        self.api_keys = api_keys
        self.max_retries = max_retries
+         self.global_timeout = global_timeout
        self.usage_manager = UsageManager(file_path=usage_file_path)
        self._model_list_cache = {}
        self._provider_plugins = PROVIDER_PLUGINS

@@ -227,25 +229,40 @@ class RotatingClient:
        if provider not in self.api_keys:
            raise ValueError(f"No API keys configured for provider: {provider}")

+         # Establish a global deadline for the entire request lifecycle.
+         deadline = time.time() + self.global_timeout
        keys_for_provider = self.api_keys[provider]
        tried_keys = set()
        last_exception = None
        kwargs = self._convert_model_params(**kwargs)

-         while len(tried_keys) < len(keys_for_provider):
+         # The main rotation loop. It continues as long as there are untried keys and the global deadline has not been exceeded.
+         while len(tried_keys) < len(keys_for_provider) and time.time() < deadline:
            current_key = None
            key_acquired = False
            try:
+                 # Check for a provider-wide cooldown first.
                if await self.cooldown_manager.is_cooling_down(provider):
-                     remaining_time = await self.cooldown_manager.get_cooldown_remaining(provider)
-                     lib_logger.warning(f"Provider {provider} is in cooldown. Waiting for {remaining_time:.2f} seconds.")
-                     await asyncio.sleep(remaining_time)
+                     remaining_cooldown = await self.cooldown_manager.get_cooldown_remaining(provider)
+                     remaining_budget = deadline - time.time()
+
+                     # If the cooldown is longer than the remaining time budget, fail fast.
+                     if remaining_cooldown > remaining_budget:
+                         lib_logger.warning(f"Provider {provider} cooldown ({remaining_cooldown:.2f}s) exceeds remaining request budget ({remaining_budget:.2f}s). Failing early.")
+                         break
+
+                     lib_logger.warning(f"Provider {provider} is in cooldown. Waiting for {remaining_cooldown:.2f} seconds.")
+                     await asyncio.sleep(remaining_cooldown)

                keys_to_try = [k for k in keys_for_provider if k not in tried_keys]
                if not keys_to_try:
                    break

-                 current_key = await self.usage_manager.acquire_key(available_keys=keys_to_try, model=model)
+                 current_key = await self.usage_manager.acquire_key(
+                     available_keys=keys_to_try,
+                     model=model,
+                     deadline=deadline
+                 )
                key_acquired = True
                tried_keys.add(current_key)

@@ -308,7 +325,15 @@ class RotatingClient:
                    lib_logger.warning(f"Key ...{current_key[-4:]} failed after {self.max_retries} retries with {classified_error.error_type} (Status: {classified_error.status_code}). Error: {error_message}. Rotating key.")
                    break  # Move to the next key

+                 # For temporary errors, wait before retrying with the same key.
                wait_time = classified_error.retry_after or (1 * (2 ** attempt)) + random.uniform(0, 1)
+                 remaining_budget = deadline - time.time()
+
+                 # If the required wait time exceeds the budget, don't wait; rotate to the next key immediately.
+                 if wait_time > remaining_budget:
+                     lib_logger.warning(f"Required retry wait time ({wait_time:.2f}s) exceeds remaining budget ({remaining_budget:.2f}s). Rotating key early.")
+                     break
+
                error_message = str(e).split('\n')[0]
                lib_logger.warning(f"Key ...{current_key[-4:]} failed with {classified_error.error_type} (Status: {classified_error.status_code}). Error: {error_message}. Retrying in {wait_time:.2f} seconds.")
                await asyncio.sleep(wait_time)

@@ -341,27 +366,35 @@ class RotatingClient:
                await self.usage_manager.release_key(current_key, model)

        if last_exception:
-             raise last_exception
+             # Log the final error but do not raise it, as per the new requirement.
+             # The client should not see intermittent failures.
+             lib_logger.error(f"Request failed after trying all keys or exceeding global timeout. Last error: {last_exception}")

-         raise Exception("Failed to complete the request: No available API keys for the provider or all keys failed.")
+         # Return None to indicate failure without propagating a disruptive exception.
+         return None

    async def _streaming_acompletion_with_retry(self, request: Optional[Any], **kwargs) -> AsyncGenerator[str, None]:
        """A dedicated generator for retrying streaming completions with full request preparation and per-key retries."""
        model = kwargs.get("model")
        provider = model.split('/')[0]
        keys_for_provider = self.api_keys[provider]
+         deadline = time.time() + self.global_timeout
        tried_keys = set()
        last_exception = None
        kwargs = self._convert_model_params(**kwargs)
        try:
-             while len(tried_keys) < len(keys_for_provider):
+             while len(tried_keys) < len(keys_for_provider) and time.time() < deadline:
                current_key = None
                key_acquired = False
                try:
                    if await self.cooldown_manager.is_cooling_down(provider):
-                         remaining_time = await self.cooldown_manager.get_cooldown_remaining(provider)
-                         lib_logger.warning(f"Provider {provider} is in a global cooldown. All requests to this provider will be paused for {remaining_time:.2f} seconds.")
-                         await asyncio.sleep(remaining_time)
+                         remaining_cooldown = await self.cooldown_manager.get_cooldown_remaining(provider)
+                         remaining_budget = deadline - time.time()
+                         if remaining_cooldown > remaining_budget:
+                             lib_logger.warning(f"Provider {provider} cooldown ({remaining_cooldown:.2f}s) exceeds remaining request budget ({remaining_budget:.2f}s). Failing early.")
+                             break
+                         lib_logger.warning(f"Provider {provider} is in a global cooldown. All requests to this provider will be paused for {remaining_cooldown:.2f} seconds.")
+                         await asyncio.sleep(remaining_cooldown)

                    keys_to_try = [k for k in keys_for_provider if k not in tried_keys]
                    if not keys_to_try:

@@ -369,7 +402,11 @@ class RotatingClient:
                        break

                    lib_logger.info(f"Acquiring key for model {model}. Tried keys: {len(tried_keys)}/{len(keys_for_provider)}")
-                     current_key = await self.usage_manager.acquire_key(available_keys=keys_to_try, model=model)
+                     current_key = await self.usage_manager.acquire_key(
+                         available_keys=keys_to_try,
+                         model=model,
+                         deadline=deadline
+                     )
                    key_acquired = True
                    tried_keys.add(current_key)

@@ -455,6 +492,11 @@ class RotatingClient:
                        break

                    wait_time = classified_error.retry_after or (1 * (2 ** attempt)) + random.uniform(0, 1)
+                     remaining_budget = deadline - time.time()
+                     if wait_time > remaining_budget:
+                         lib_logger.warning(f"Required retry wait time ({wait_time:.2f}s) exceeds remaining budget ({remaining_budget:.2f}s). Rotating key early.")
+                         break
+
                    lib_logger.warning(f"Key ...{current_key[-4:]} failed with {classified_error.error_type}. Retrying in {wait_time:.2f} seconds.")
                    await asyncio.sleep(wait_time)
                    continue

@@ -493,22 +535,23 @@ class RotatingClient:
            if key_acquired and current_key:
                await self.usage_manager.release_key(current_key, model)

+             final_error_message = "Failed to complete the streaming request: No available API keys after rotation or global timeout exceeded."
            if last_exception:
-                 error_data = {"error": {"message": f"Failed to complete the streaming request. Last error: {str(last_exception)}", "type": "proxy_error"}}
-                 yield f"data: {json.dumps(error_data)}\n\n"
-             else:
-                 error_data = {"error": {"message": "Failed to complete the streaming request: No available API keys after rotation.", "type": "proxy_error"}}
-                 yield f"data: {json.dumps(error_data)}\n\n"
-
+                 final_error_message = f"Failed to complete the streaming request. Last error: {str(last_exception)}"
+                 lib_logger.error(f"Streaming request failed after trying all keys. Last error: {last_exception}")
+
+             error_data = {"error": {"message": final_error_message, "type": "proxy_error"}}
+             yield f"data: {json.dumps(error_data)}\n\n"
            yield "data: [DONE]\n\n"

        except NoAvailableKeysError as e:
-             lib_logger.error(f"A streaming request failed because no keys were available: {e}")
+             lib_logger.error(f"A streaming request failed because no keys were available within the time budget: {e}")
            error_data = {"error": {"message": str(e), "type": "proxy_busy"}}
            yield f"data: {json.dumps(error_data)}\n\n"
            yield "data: [DONE]\n\n"
        except Exception as e:
-             lib_logger.error(f"An unhandled exception occurred in streaming retry logic: {e}")
+             # This will now only catch fatal errors that should be raised, like invalid requests.
+             lib_logger.error(f"An unhandled exception occurred in streaming retry logic: {e}", exc_info=True)
            error_data = {"error": {"message": f"An unexpected error occurred: {str(e)}", "type": "proxy_internal_error"}}
            yield f"data: {json.dumps(error_data)}\n\n"
            yield "data: [DONE]\n\n"
src/rotator_library/usage_manager.py CHANGED
@@ -20,10 +20,9 @@ class UsageManager:
     Manages usage statistics and cooldowns for API keys with asyncio-safe locking,
     asynchronous file I/O, and a lazy-loading mechanism for usage data.
     """
-    def __init__(self, file_path: str = "key_usage.json", wait_timeout: int = 13, daily_reset_time_utc: Optional[str] = "03:00"):
+    def __init__(self, file_path: str = "key_usage.json", daily_reset_time_utc: Optional[str] = "03:00"):
         self.file_path = file_path
         self.key_states: Dict[str, Dict[str, Any]] = {}
-        self.wait_timeout = wait_timeout

         self._data_lock = asyncio.Lock()
         self._usage_data: Optional[Dict] = None
@@ -129,18 +128,21 @@ class UsageManager:
             "models_in_use": set()
         }

-    async def acquire_key(self, available_keys: List[str], model: str) -> str:
+    async def acquire_key(self, available_keys: List[str], model: str, deadline: float) -> str:
         """
-        Acquires the best available key using a tiered, model-aware locking strategy.
+        Acquires the best available key using a tiered, model-aware locking strategy,
+        respecting a global deadline.
         """
         await self._lazy_init()
         self._initialize_key_states(available_keys)

-        start_time = time.time()
-        while time.time() - start_time < self.wait_timeout:
+        # This loop continues as long as the global deadline has not been met.
+        while time.time() < deadline:
             tier1_keys, tier2_keys = [], []
+            now = time.time()
+
+            # First, filter the list of available keys to exclude any on cooldown.
             async with self._data_lock:
-                now = time.time()
                 for key in available_keys:
                     key_data = self._usage_data.get(key, {})
@@ -148,17 +150,21 @@ class UsageManager:
                        (key_data.get("model_cooldowns", {}).get(model) or 0) > now:
                         continue

+                    # Prioritize keys based on their current usage to ensure load balancing.
                     usage_count = key_data.get("daily", {}).get("models", {}).get(model, {}).get("success_count", 0)
                     key_state = self.key_states[key]

+                    # Tier 1: Completely idle keys (preferred).
                     if not key_state["models_in_use"]:
                         tier1_keys.append((key, usage_count))
+                    # Tier 2: Keys busy with other models, but free for this one.
                     elif model not in key_state["models_in_use"]:
                         tier2_keys.append((key, usage_count))

             tier1_keys.sort(key=lambda x: x[1])
             tier2_keys.sort(key=lambda x: x[1])

+            # Attempt to acquire a key from Tier 1 first.
             for key, _ in tier1_keys:
                 state = self.key_states[key]
                 async with state["lock"]:
@@ -167,6 +173,7 @@ class UsageManager:
                     lib_logger.info(f"Acquired Tier 1 key ...{key[-4:]} for model {model}")
                     return key

+            # If no Tier 1 keys are available, try Tier 2.
             for key, _ in tier2_keys:
                 state = self.key_states[key]
                 async with state["lock"]:
@@ -175,6 +182,7 @@ class UsageManager:
                     lib_logger.info(f"Acquired Tier 2 key ...{key[-4:]} for model {model}")
                     return key

+            # If all eligible keys are locked, wait for a key to be released.
             lib_logger.info("All eligible keys are currently locked for this model. Waiting...")

             all_potential_keys = tier1_keys + tier2_keys
@@ -183,20 +191,24 @@ class UsageManager:
                 await asyncio.sleep(1)
                 continue

+            # Wait on the condition of the key with the lowest current usage.
             best_wait_key = min(all_potential_keys, key=lambda x: x[1])[0]
             wait_condition = self.key_states[best_wait_key]["condition"]

             try:
                 async with wait_condition:
-                    remaining_timeout = self.wait_timeout - (time.time() - start_time)
-                    if remaining_timeout <= 0:
-                        break
-                    await asyncio.wait_for(wait_condition.wait(), timeout=min(1, remaining_timeout))
+                    remaining_budget = deadline - time.time()
+                    if remaining_budget <= 0:
+                        break  # Exit if the budget has already been exceeded.
+                    # Wait for a notification, but no longer than the remaining budget or 1 second.
+                    await asyncio.wait_for(wait_condition.wait(), timeout=min(1, remaining_budget))
                 lib_logger.info("Notified that a key was released. Re-evaluating...")
             except asyncio.TimeoutError:
+                # This is not an error, just a timeout for the wait. The main loop will re-evaluate.
                 lib_logger.info("Wait timed out. Re-evaluating for any available key.")

-        raise NoAvailableKeysError(f"Could not acquire a key for model {model} within the {self.wait_timeout}s timeout.")
+        # If the loop exits, it means the deadline was exceeded.
+        raise NoAvailableKeysError(f"Could not acquire a key for model {model} within the global time budget.")


     async def release_key(self, key: str, model: str):
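The core pattern introduced by this commit — a single absolute deadline computed once (`time.time() + global_timeout`) and threaded through key acquisition, cooldown waits, and retry sleeps — can be illustrated with a minimal, self-contained sketch. All names below (`acquire_with_deadline`, the `keys` dict) are hypothetical illustrations, not part of the library's API:

```python
import asyncio
import time


class NoAvailableKeysError(Exception):
    """Raised when no key can be acquired before the deadline."""


async def acquire_with_deadline(keys: dict, deadline: float) -> str:
    """Poll for a free key until the absolute deadline passes.

    `keys` maps key strings to an in-use flag; a real implementation
    would use per-key locks and condition variables instead of polling.
    """
    while time.time() < deadline:
        for key, in_use in keys.items():
            if not in_use:
                keys[key] = True  # mark the key as acquired
                return key
        # Sleep briefly between re-checks, but never past the deadline.
        await asyncio.sleep(min(1, max(0, deadline - time.time())))
    # The loop only exits when the global time budget is exhausted.
    raise NoAvailableKeysError("Could not acquire a key within the global time budget.")


async def main():
    keys = {"sk-aaaa": False, "sk-bbbb": True}
    deadline = time.time() + 5  # five-second global budget for the whole request
    key = await acquire_with_deadline(keys, deadline)
    print(key)


if __name__ == "__main__":
    asyncio.run(main())
```

Because the deadline is absolute rather than a per-step timeout, every wait (cooldown, lock contention, retry backoff) is clamped to the remaining budget, which is why the client can guarantee an upper bound on total request time.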