# Resilience & API Key Management Library

A robust, asynchronous, and thread-safe Python library for managing a pool of API keys. It is designed to be integrated into applications (such as the Universal LLM API Proxy included in this project) to provide a powerful layer of resilience and high availability when interacting with multiple LLM providers.
## Key Features

- **Asynchronous by Design**: Built with `asyncio` and `httpx` for high-performance, non-blocking I/O.
- **Advanced Concurrency Control**: A single API key can be used for multiple concurrent requests. By default, it supports concurrent requests to *different* models. With configuration (`MAX_CONCURRENT_REQUESTS_PER_KEY_<PROVIDER>`), it can also support multiple concurrent requests to the *same* model using the same key (see the sketch after this list).
- **Smart Key Management**: Selects the optimal key for each request using a tiered, model-aware locking strategy to distribute load evenly and maximize availability.
- **Configurable Rotation Strategy**: Choose between deterministic least-used selection (perfect balance) and weighted random selection (unpredictable, harder to fingerprint).
- **Deadline-Driven Requests**: A global timeout ensures that no request, including all retries and key selections, exceeds a specified time limit.
- **OAuth & API Key Support**: Built-in support for standard API keys and complex OAuth flows.
  - **Gemini CLI**: Full OAuth 2.0 web flow with automatic project discovery, free-tier onboarding, and credential prioritization (paid vs. free tier).
  - **Antigravity**: Full OAuth 2.0 support for Gemini 3, Gemini 2.5, and Claude Sonnet 4.5 models, with thought signature caching and tool hallucination prevention; first on the scene to provide full Gemini 3 support via Antigravity.
  - **Qwen Code**: Device Code flow support.
  - **iFlow**: Authorization Code flow with local callback handling.
- **Stateless Deployment Ready**: Can load complex OAuth credentials from environment variables, eliminating the need for physical credential files in containerized environments.
- **Intelligent Error Handling**:
  - **Escalating Per-Model Cooldowns**: Failed keys are placed on a temporary, escalating cooldown for specific models.
  - **Key-Level Lockouts**: Keys failing across multiple models are temporarily removed from rotation.
  - **Stream Recovery**: The client detects mid-stream errors (like quota limits) and gracefully handles them.
- **Credential Prioritization**: Automatic tier detection and priority-based credential selection (e.g., paid-tier credentials are used first for models that require them).
- **Advanced Model Requirements**: Support for model-tier restrictions (e.g., Gemini 3 requires paid-tier credentials).
- **Robust Streaming Support**: Includes a wrapper for streaming responses that reassembles fragmented JSON chunks.
- **Detailed Usage Tracking**: Tracks daily and global usage for each key, persisted to a JSON file.
- **Automatic Daily Resets**: Automatically resets cooldowns and archives stats daily.
- **Provider Agnostic**: Works with any provider supported by `litellm`.
- **Extensible**: Easily add support for new providers through a simple plugin-based architecture.
- **Temperature Override**: Optional global `temperature=0` override to prevent tool hallucination with low-temperature settings.
- **Shared OAuth Base**: Refactored OAuth implementation with a reusable [`GoogleOAuthBase`](providers/google_oauth_base.py) shared by multiple providers.
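The per-key concurrency limit can be raised in two ways; below is a minimal sketch, assuming a provider named `gemini` (the environment-variable form follows the `MAX_CONCURRENT_REQUESTS_PER_KEY_<PROVIDER>` pattern noted above, while the constructor argument is documented under Initialization):

```python
import os

# Environment-variable form, read at startup (the value "4" is illustrative):
os.environ["MAX_CONCURRENT_REQUESTS_PER_KEY_GEMINI"] = "4"

# Constructor form, passing the same limit directly:
# client = RotatingClient(api_keys=api_keys, max_concurrent_requests_per_key={"gemini": 4})
```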
## Installation

The library can be installed directly from a local path. Using the `-e` flag installs it in "editable" mode, which is recommended for development.

```bash
pip install -e .
```
## `RotatingClient` Class

This is the main class for interacting with the library. It is designed to be a long-lived object that manages the state of your API key pool.

### Initialization
```python
import os
from dotenv import load_dotenv
from rotator_library import RotatingClient

# Load environment variables from .env file
load_dotenv()

# Dynamically load all provider API keys from environment variables
api_keys = {}
for key, value in os.environ.items():
    # This pattern finds keys like "GEMINI_API_KEY_1" or "OPENAI_API_KEY"
    if (key.endswith("_API_KEY") or "_API_KEY_" in key) and key != "PROXY_API_KEY":
        # Extracts "gemini" from "GEMINI_API_KEY_1"
        provider = key.split("_API_KEY")[0].lower()
        if provider not in api_keys:
            api_keys[provider] = []
        api_keys[provider].append(value)

# Initialize empty dictionary for OAuth credentials (or load from CredentialManager)
oauth_credentials = {}

client = RotatingClient(
    api_keys=api_keys,
    oauth_credentials=oauth_credentials,
    max_retries=2,
    usage_file_path="key_usage.json",
    configure_logging=True,
    global_timeout=30,
    abort_on_callback_error=True,
    litellm_provider_params={},
    ignore_models={},
    whitelist_models={},
    enable_request_logging=False,
    max_concurrent_requests_per_key={},
    rotation_tolerance=2.0  # 0.0=deterministic, 2.0=recommended random
)
```
#### Arguments

- `api_keys` (`Optional[Dict[str, List[str]]]`): A dictionary mapping provider names (e.g., "openai", "anthropic") to a list of API keys.
- `oauth_credentials` (`Optional[Dict[str, List[str]]]`): A dictionary mapping provider names (e.g., "gemini_cli", "qwen_code") to a list of file paths to OAuth credential JSON files.
- `max_retries` (`int`, default: `2`): The number of times to retry a request with the *same key* if a transient server error (e.g., 500, 503) occurs.
- `usage_file_path` (`str`, default: `"key_usage.json"`): The path to the JSON file where usage statistics (tokens, cost, success counts) are persisted.
- `configure_logging` (`bool`, default: `True`): If `True`, configures the library's logger to propagate logs to the root logger. Set to `False` if you want to handle logging configuration manually.
- `global_timeout` (`int`, default: `30`): A hard time limit (in seconds) for the entire request lifecycle. If the request (including all retries) takes longer than this, it is aborted.
- `abort_on_callback_error` (`bool`, default: `True`): If `True`, any exception raised by `pre_request_callback` will abort the request. If `False`, the error is logged and the request proceeds.
- `litellm_provider_params` (`Optional[Dict[str, Any]]`, default: `None`): A dictionary of extra parameters to pass to `litellm` for specific providers.
- `ignore_models` (`Optional[Dict[str, List[str]]]`, default: `None`): A dictionary where keys are provider names and values are lists of model names/patterns to exclude (blacklist). Supports wildcards (e.g., `"*-preview"`).
- `whitelist_models` (`Optional[Dict[str, List[str]]]`, default: `None`): A dictionary where keys are provider names and values are lists of model names/patterns to always include, overriding `ignore_models`.
- `enable_request_logging` (`bool`, default: `False`): If `True`, enables detailed per-request file logging (useful for debugging complex interactions).
- `max_concurrent_requests_per_key` (`Optional[Dict[str, int]]`, default: `None`): A dictionary defining the maximum number of concurrent requests allowed for a single API key for a specific provider. Defaults to 1 per key if not specified.
- `rotation_tolerance` (`float`, default: `0.0`): Controls the credential rotation strategy:
  - `0.0` (default): **Deterministic**. Always selects the least-used credential for perfect load balance.
  - `2.0` (recommended): **Weighted Random**. Randomly selects credentials with a bias toward less-used ones. Provides unpredictability (harder to fingerprint) while maintaining good balance.
  - `5.0+`: **High Randomness**. Even heavily-used credentials have a significant selection probability. Maximum unpredictability.

  The weight formula is `weight = (max_usage - credential_usage) + tolerance + 1` (a selection sketch follows this list).

  **Use cases:**
  - `0.0`: When perfect load balance is critical.
  - `2.0`: When avoiding fingerprinting/rate-limit detection is important.
  - `5.0+`: For stress testing or maximum unpredictability.
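To make the formula concrete, here is a minimal sketch of weighted selection under that formula; the function and variable names are illustrative, not the library's internals:

```python
import random

def pick_credential(usage: dict, tolerance: float) -> str:
    """Pick a credential given a mapping of credential -> usage count."""
    max_usage = max(usage.values())
    if tolerance == 0.0:
        # Deterministic mode: always the least-used credential.
        return min(usage, key=usage.get)
    # Weighted random mode: weight = (max_usage - credential_usage) + tolerance + 1
    creds = list(usage)
    weights = [(max_usage - usage[c]) + tolerance + 1 for c in creds]
    return random.choices(creds, weights=weights, k=1)[0]

# At tolerance=2.0 the lightly used key is favored but not guaranteed.
print(pick_credential({"key_a": 10, "key_b": 2}, tolerance=2.0))
```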
### Concurrency and Resource Management

The `RotatingClient` is asynchronous and manages an `httpx.AsyncClient` internally. It's crucial to close the client properly to release resources. The recommended way is to use an `async with` block.
```python
import asyncio

from rotator_library import RotatingClient

async def main():
    async with RotatingClient(api_keys=api_keys) as client:
        # ... use the client ...
        response = await client.acompletion(
            model="gemini/gemini-1.5-flash",
            messages=[{"role": "user", "content": "Hello!"}]
        )
        print(response)

asyncio.run(main())
```
### Methods

#### `async def acompletion(self, **kwargs) -> Any:`

This is the primary method for making API calls. It's a wrapper around `litellm.acompletion` that adds the core logic for key acquisition, selection, and retries.

- **Parameters**: Accepts the same keyword arguments as `litellm.acompletion`. The `model` parameter is required and must be a string in the format `provider/model_name`.
- **Returns**:
  - For non-streaming requests, it returns the `litellm` response object.
  - For streaming requests, it returns an async generator that yields OpenAI-compatible Server-Sent Events (SSE). The wrapper ensures that key locks are released and usage is recorded only after the stream is fully consumed.
**Streaming Example:**

```python
async def stream_example():
    async with RotatingClient(api_keys=api_keys) as client:
        response_stream = await client.acompletion(
            model="gemini/gemini-1.5-flash",
            messages=[{"role": "user", "content": "Tell me a long story."}],
            stream=True
        )
        async for chunk in response_stream:
            print(chunk)

asyncio.run(stream_example())
```
#### `async def aembedding(self, **kwargs) -> Any:`

A wrapper around `litellm.aembedding` that provides the same key management and retry logic for embedding requests.

#### `def token_count(self, model: str, text: str = None, messages: List[Dict[str, str]] = None) -> int:`

Calculates the token count for a given text or list of messages using `litellm.token_counter`.
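For example, a brief usage sketch (reusing the `client` from the examples above):

```python
# Count the tokens a message list will consume for a given model.
n_tokens = client.token_count(
    model="gemini/gemini-1.5-flash",
    messages=[{"role": "user", "content": "Hello!"}]
)
print(f"Prompt uses {n_tokens} tokens")
```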
#### `async def get_available_models(self, provider: str) -> List[str]:`

Fetches a list of available models for a specific provider, applying any configured whitelists or blacklists. Results are cached in memory.

#### `async def get_all_available_models(self, grouped: bool = True) -> Union[Dict[str, List[str]], List[str]]:`

Fetches a dictionary of all available models, grouped by provider, or as a single flat list if `grouped=False`.
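A short usage sketch of the two call forms (the exact shape of the returned names is determined by the library):

```python
# Dictionary keyed by provider name, e.g. {"gemini": [...], "openai": [...]}
by_provider = await client.get_all_available_models()

# Single flat list of model names
flat = await client.get_all_available_models(grouped=False)
```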
## Credential Tool

The library includes a utility to manage credentials easily:

```bash
python -m src.rotator_library.credential_tool
```

Use this tool to:

1. **Initialize OAuth**: Run the interactive login flows for Gemini, Qwen, and iFlow.
2. **Export Credentials**: Generate `.env`-compatible configuration blocks from your saved OAuth JSON files. This is essential for setting up stateless deployments.
## Provider Specifics

### Qwen Code

- **Auth**: Uses OAuth 2.0 Device Flow. Requires manual entry of an email/identifier if one is not returned by the provider.
- **Resilience**: Injects a dummy tool (`do_not_call_me`) into requests with no tools to prevent known stream corruption issues on the API.
- **Reasoning**: Parses `<think>` tags in the response and exposes them as `reasoning_content`.
- **Schema Cleaning**: Recursively removes `strict` and `additionalProperties` from all tool schemas. Qwen's API has stricter validation than OpenAI's, and these properties cause `400 Bad Request` errors. A sketch of this cleaning step follows the list.
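A minimal sketch of the recursive cleaning described above, assuming tool schemas are plain nested dicts and lists; it mirrors the documented behavior rather than the library's actual implementation:

```python
def clean_schema(node):
    """Recursively drop `strict` and `additionalProperties` from a schema."""
    if isinstance(node, dict):
        return {
            key: clean_schema(value)
            for key, value in node.items()
            if key not in ("strict", "additionalProperties")
        }
    if isinstance(node, list):
        return [clean_schema(item) for item in node]
    return node
```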
### iFlow

- **Auth**: Uses Authorization Code Flow with a local callback server (port 11451).
- **Key Separation**: Distinguishes between the OAuth `access_token` (used to fetch user info) and the `api_key` (used for actual chat requests).
- **Resilience**: Similar to Qwen, injects a placeholder tool to stabilize streaming for empty tool lists.
- **Schema Cleaning**: Recursively removes `strict` and `additionalProperties` from all tool schemas to prevent API validation errors.
- **Custom Models**: Supports model definitions via the `IFLOW_MODELS` environment variable (a JSON array of model IDs or objects), as sketched below.
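A hedged illustration of setting that variable; the model names are placeholders, and the exact fields accepted in the object form are an assumption:

```python
import os

# Plain IDs and object entries can be mixed in the JSON array.
os.environ["IFLOW_MODELS"] = '["model-a", {"id": "model-b"}]'
```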
### NVIDIA NIM

- **Discovery**: Dynamically fetches available models from the NVIDIA API.
- **Thinking**: Automatically injects the `thinking` parameter into `extra_body` for DeepSeek models (`deepseek-v3.1`, etc.) when `reasoning_effort` is set to low/medium/high.
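In practice, a plain `acompletion` call with `reasoning_effort` set is enough to trigger this. A hedged sketch (the exact model path on NIM may differ):

```python
# The client injects `thinking` into extra_body for DeepSeek models
# when reasoning_effort is low/medium/high.
response = await client.acompletion(
    model="nvidia_nim/deepseek-v3.1",
    messages=[{"role": "user", "content": "Explain backpropagation briefly."}],
    reasoning_effort="high",
)
```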
### Google Gemini (CLI)

- **Auth**: Simulates the Google Cloud CLI authentication flow.
- **Project Discovery**: Automatically discovers the default Google Cloud Project ID with an enhanced onboarding flow.
- **Credential Prioritization**: Automatic detection and prioritization of paid vs. free tier credentials.
- **Model Tier Requirements**: Gemini 3 models are automatically filtered to paid-tier credentials only.
- **Gemini 3 Support**: Full support for Gemini 3 models with:
  - `thinkingLevel` configuration (low/high)
  - Tool hallucination prevention via system instruction injection
  - ThoughtSignature caching for multi-turn conversations
  - Parameter signature injection into tool descriptions
- **Rate Limits**: Implements smart fallback strategies (e.g., switching from `gemini-1.5-pro` to `gemini-1.5-pro-002`) when rate limits are hit.
### Antigravity

- **Auth**: Uses an OAuth 2.0 flow similar to Gemini CLI, with Antigravity-specific credentials and scopes.
- **Credential Prioritization**: Automatic detection and prioritization of paid vs. free tier credentials (paid-tier quota resets every 5 hours, free-tier quota resets weekly).
- **Models**: Supports Gemini 3 Pro, Gemini 2.5 Flash/Flash Lite, Claude Sonnet 4.5 (with/without thinking), Claude Opus 4.5 (thinking only), and GPT-OSS 120B via Google's internal Antigravity API.
- **Quota Groups**: Models that share quota are automatically grouped:
  - Claude/GPT-OSS: `claude-sonnet-4-5`, `claude-opus-4-5`, `gpt-oss-120b-medium`
  - Gemini 3 Pro: `gemini-3-pro-high`, `gemini-3-pro-low`, `gemini-3-pro-preview`
  - Gemini 2.5 Flash: `gemini-2.5-flash`, `gemini-2.5-flash-thinking`, `gemini-2.5-flash-lite`
  - Every model in a group depletes the group's shared quota equally, so within the Claude group it is generally most economical to use only Opus and skip Sonnet and GPT-OSS.
- **Quota Baseline Tracking**: A background job fetches quota status from the API every 5 minutes to provide accurate remaining-quota estimates.
- **Thought Signature Caching**: Server-side caching of `thoughtSignature` data for multi-turn conversations with Gemini 3 models.
- **Tool Hallucination Prevention**: Automatic injection of system instructions and parameter signatures for Gemini 3 and Claude to prevent tool parameter hallucination.
- **Parallel Tool Usage Instruction**: Configurable instruction injection to encourage parallel tool calls (enabled by default for Claude).
- **Thinking Support**:
  - Gemini 3: Uses `thinkingLevel` (string: "low"/"high")
  - Gemini 2.5 Flash: Uses the `-thinking` variant when `reasoning_effort` is provided
  - Claude Sonnet 4.5: Uses `thinkingBudget` (optional; supports both thinking and non-thinking modes)
  - Claude Opus 4.5: Uses `thinkingBudget` (always uses the thinking variant)
- **Base URL Fallback**: Automatic fallback between sandbox and production endpoints.
## Error Handling and Cooldowns

The client uses a sophisticated error handling mechanism:

- **Error Classification**: All exceptions from `litellm` are passed through a `classify_error` function to determine their type (`rate_limit`, `authentication`, `server_error`, `quota`, `context_length`, etc.).
- **Server Errors**: The client will retry the request with the *same key* up to `max_retries` times, using an exponential backoff strategy.
- **Key-Specific Errors (Authentication, Quota, etc.)**: The client records the failure in the `UsageManager`, which applies an escalating cooldown to the key for that specific model. The client then immediately acquires a new key and continues its attempt to complete the request.
- **Escalating Cooldown Strategy**: Consecutive failures for a key on the same model result in increasing cooldown periods (see the sketch after this list):
  - 1st failure: 10 seconds
  - 2nd failure: 30 seconds
  - 3rd failure: 60 seconds
  - 4th+ failure: 120 seconds
- **Key-Level Lockouts**: If a key fails on multiple different models (3+ distinct models), the `UsageManager` applies a global 5-minute lockout for that key, removing it from rotation entirely.
- **Authentication Errors**: Immediate 5-minute global lockout (the key is assumed revoked or invalid).
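A minimal sketch of the escalating schedule described above; the helper name is illustrative, not the library's API:

```python
# Cooldown (seconds) for the 1st, 2nd, 3rd, and 4th-or-later failure.
COOLDOWNS = [10, 30, 60, 120]

def cooldown_for(consecutive_failures: int) -> int:
    """Map a consecutive-failure count (1-based) to a cooldown in seconds."""
    index = min(consecutive_failures, len(COOLDOWNS)) - 1
    return COOLDOWNS[index]

assert cooldown_for(1) == 10
assert cooldown_for(4) == 120
assert cooldown_for(9) == 120
```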
### Global Timeout and Deadline-Driven Logic

To ensure predictable performance, the client operates on a strict time budget defined by the `global_timeout` parameter.

- **Deadline Enforcement**: When a request starts, a `deadline` is set. The entire process, including all key rotations and retries, must complete before this deadline.
- **Deadline-Aware Retries**: If a retry requires a wait time that would exceed the remaining budget, the wait is skipped and the client immediately rotates to the next key.
- **Silent Internal Errors**: Intermittent failures like provider capacity limits or temporary server errors are logged internally but are **not raised** to the caller. The client simply rotates to the next key.
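A hedged sketch of the deadline-aware loop, illustrating the behavior described above with standard `asyncio` primitives; the names and structure are not the library's internals:

```python
import asyncio
import time

async def run_with_deadline(try_once, keys, global_timeout: float, backoff: float = 1.0):
    """Try each key until one succeeds or the global deadline is exceeded."""
    deadline = time.monotonic() + global_timeout
    for key in keys:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError("global_timeout exceeded")
        try:
            # Cap the attempt itself to the remaining budget.
            return await asyncio.wait_for(try_once(key), timeout=remaining)
        except Exception:
            # Only wait out the backoff if the budget allows it;
            # otherwise rotate straight to the next key.
            if backoff < deadline - time.monotonic():
                await asyncio.sleep(backoff)
    raise TimeoutError("all keys exhausted before the deadline")
```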
## Extending with Provider Plugins

The library uses a dynamic plugin system. To add support for a new provider's model list, you only need to:

1. **Create a new provider file** in `src/rotator_library/providers/` (e.g., `my_provider.py`).
2. **Implement the `ProviderInterface`**: Inside your new file, create a class that inherits from `ProviderInterface` and implements the `get_models` method.

```python
# src/rotator_library/providers/my_provider.py
from typing import List

import httpx

from .provider_interface import ProviderInterface

class MyProvider(ProviderInterface):
    async def get_models(self, credential: str, client: httpx.AsyncClient) -> List[str]:
        # Fetch and return a list of model names; the endpoint URL and
        # response shape here are illustrative placeholders.
        response = await client.get(
            "https://api.my-provider.example/v1/models",
            headers={"Authorization": f"Bearer {credential}"},
        )
        response.raise_for_status()
        return [model["id"] for model in response.json()["data"]]
```

The system will automatically discover and register your new provider.
## Detailed Documentation

For a more in-depth technical explanation of the library's architecture, including the `UsageManager`'s concurrency model and the error classification system, please refer to the [Technical Documentation](../../DOCUMENTATION.md).