# Resilience & API Key Management Library
A robust, asynchronous, and thread-safe Python library for managing a pool of API keys. It is designed to be integrated into applications (such as the Universal LLM API Proxy included in this project) to provide a powerful layer of resilience and high availability when interacting with multiple LLM providers.
## Key Features
- Asynchronous by Design: Built with `asyncio` and `httpx` for high-performance, non-blocking I/O.
- Advanced Concurrency Control: A single API key can serve multiple concurrent requests. By default, it supports concurrent requests to different models; with configuration (`MAX_CONCURRENT_REQUESTS_PER_KEY_<PROVIDER>`), it can also serve multiple concurrent requests to the same model using the same key.
- Smart Key Management: Selects the optimal key for each request using a tiered, model-aware locking strategy to distribute load evenly and maximize availability.
- Configurable Rotation Strategy: Choose between deterministic least-used selection (perfect balance) and the default weighted random selection (unpredictable, harder to fingerprint).
- Deadline-Driven Requests: A global timeout ensures that no request, including all retries and key selections, exceeds a specified time limit.
- OAuth & API Key Support: Built-in support for standard API keys and complex OAuth flows.
  - Gemini CLI: Full OAuth 2.0 web flow with automatic project discovery, free-tier onboarding, and credential prioritization (paid vs. free tier).
  - Antigravity: Full OAuth 2.0 support for Gemini 3, Gemini 2.5, and Claude Sonnet 4.5 models, with advanced features such as thought signature caching and tool hallucination prevention.
  - Qwen Code: Device Code flow support.
  - iFlow: Authorization Code flow with local callback handling.
- Stateless Deployment Ready: Can load complex OAuth credentials from environment variables, eliminating the need for physical credential files in containerized environments.
- Intelligent Error Handling:
  - Escalating Per-Model Cooldowns: Failed keys are placed on a temporary, escalating cooldown for specific models.
  - Key-Level Lockouts: Keys failing across multiple models are temporarily removed from rotation.
  - Stream Recovery: The client detects mid-stream errors (such as quota limits) and gracefully handles them.
- Credential Prioritization: Automatic tier detection and priority-based credential selection (e.g., paid tier credentials used first for models that require them).
- Advanced Model Requirements: Support for model-tier restrictions (e.g., Gemini 3 requires paid-tier credentials).
- Robust Streaming Support: Includes a wrapper for streaming responses that reassembles fragmented JSON chunks.
- Detailed Usage Tracking: Tracks daily and global usage for each key, persisted to a JSON file.
- Automatic Daily Resets: Automatically resets cooldowns and archives stats daily.
- Provider Agnostic: Works with any provider supported by `litellm`.
- Extensible: Easily add support for new providers through a simple plugin-based architecture.
- Temperature Override: Global `temperature=0` override to prevent tool hallucination with low-temperature settings.
- Shared OAuth Base: Refactored OAuth implementation with a reusable `GoogleOAuthBase` for multiple providers.
## Installation

Install the library directly from a local path. The `-e` flag installs it in "editable" mode, which is recommended for development.

```bash
pip install -e .
```
## `RotatingClient` Class
This is the main class for interacting with the library. It is designed to be a long-lived object that manages the state of your API key pool.
### Initialization
```python
import os
from dotenv import load_dotenv
from rotator_library import RotatingClient

# Load environment variables from .env file
load_dotenv()

# Dynamically load all provider API keys from environment variables
api_keys = {}
for key, value in os.environ.items():
    # This pattern finds keys like "GEMINI_API_KEY_1" or "OPENAI_API_KEY"
    if (key.endswith("_API_KEY") or "_API_KEY_" in key) and key != "PROXY_API_KEY":
        # Extracts "gemini" from "GEMINI_API_KEY_1"
        provider = key.split("_API_KEY")[0].lower()
        if provider not in api_keys:
            api_keys[provider] = []
        api_keys[provider].append(value)

# Initialize empty dictionary for OAuth credentials (or load from CredentialManager)
oauth_credentials = {}

client = RotatingClient(
    api_keys=api_keys,
    oauth_credentials=oauth_credentials,
    max_retries=2,
    usage_file_path="key_usage.json",
    configure_logging=True,
    global_timeout=30,
    abort_on_callback_error=True,
    litellm_provider_params={},
    ignore_models={},
    whitelist_models={},
    enable_request_logging=False,
    max_concurrent_requests_per_key={},
    rotation_tolerance=2.0  # 0.0 = deterministic, 2.0 = recommended weighted random
)
```
### Arguments
- `api_keys` (Optional[Dict[str, List[str]]]): A dictionary mapping provider names (e.g., `"openai"`, `"anthropic"`) to a list of API keys.
- `oauth_credentials` (Optional[Dict[str, List[str]]]): A dictionary mapping provider names (e.g., `"gemini_cli"`, `"qwen_code"`) to a list of file paths to OAuth credential JSON files.
- `max_retries` (int, default: `2`): The number of times to retry a request with the same key if a transient server error (e.g., 500, 503) occurs.
- `usage_file_path` (str, default: `"key_usage.json"`): The path to the JSON file where usage statistics (tokens, cost, success counts) are persisted.
- `configure_logging` (bool, default: `True`): If `True`, configures the library's logger to propagate logs to the root logger. Set to `False` to handle logging configuration manually.
- `global_timeout` (int, default: `30`): A hard time limit (in seconds) for the entire request lifecycle. If the request (including all retries) takes longer than this, it is aborted.
- `abort_on_callback_error` (bool, default: `True`): If `True`, any exception raised by `pre_request_callback` aborts the request. If `False`, the error is logged and the request proceeds.
- `litellm_provider_params` (Optional[Dict[str, Any]], default: `None`): A dictionary of extra parameters to pass to `litellm` for specific providers.
- `ignore_models` (Optional[Dict[str, List[str]]], default: `None`): A dictionary mapping provider names to lists of model names/patterns to exclude (blacklist). Supports wildcards (e.g., `"*-preview"`).
- `whitelist_models` (Optional[Dict[str, List[str]]], default: `None`): A dictionary mapping provider names to lists of model names/patterns to always include, overriding `ignore_models`.
- `enable_request_logging` (bool, default: `False`): If `True`, enables detailed per-request file logging (useful for debugging complex interactions).
- `max_concurrent_requests_per_key` (Optional[Dict[str, int]], default: `None`): A dictionary defining the maximum number of concurrent requests allowed for a single API key for a specific provider. Defaults to 1 per key if not specified.
- `rotation_tolerance` (float, default: `2.0`): Controls the credential rotation strategy:
  - `0.0`: Deterministic. Always selects the least-used credential for perfect load balance.
  - `2.0` (default, recommended): Weighted random. Randomly selects credentials with a bias toward less-used ones, providing unpredictability (harder to fingerprint) while maintaining good balance.
  - `5.0+`: High randomness. Even heavily used credentials have a significant selection probability; maximum unpredictability.

  The weight formula is: `weight = (max_usage - credential_usage) + tolerance + 1`

  Use cases:
  - `0.0`: When perfect load balance is critical.
  - `2.0`: When avoiding fingerprinting/rate-limit detection is important.
  - `5.0+`: For stress testing or maximum unpredictability.
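For intuition, the weight formula can be sketched as a standalone function. This is an illustrative model of the documented rule, not the library's actual implementation; the `pick_credential` name and the deterministic special case for `0.0` are assumptions.

```python
import random

def pick_credential(usage: dict[str, int], tolerance: float) -> str:
    """Sketch of the documented rotation rule (hypothetical helper).

    weight = (max_usage - credential_usage) + tolerance + 1
    """
    if tolerance == 0.0:
        # Deterministic mode: assumed to bypass the weighted draw entirely
        # and always return a least-used credential.
        return min(usage, key=usage.get)
    max_usage = max(usage.values())
    weights = [(max_usage - count) + tolerance + 1 for count in usage.values()]
    return random.choices(list(usage), weights=weights, k=1)[0]
```

With usage `{"a": 5, "b": 1, "c": 5}` and `tolerance=2.0`, the weights are 3, 7, and 3, so `"b"` is favored but not guaranteed; at `0.0` it is always chosen.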
## Concurrency and Resource Management

The `RotatingClient` is asynchronous and manages an `httpx.AsyncClient` internally. It is crucial to close the client properly to release resources; the recommended way is an `async with` block.
```python
import asyncio

async def main():
    async with RotatingClient(api_keys=api_keys) as client:
        # ... use the client ...
        response = await client.acompletion(
            model="gemini/gemini-1.5-flash",
            messages=[{"role": "user", "content": "Hello!"}]
        )
        print(response)

asyncio.run(main())
```
## Methods

```python
async def acompletion(self, **kwargs) -> Any:
```

This is the primary method for making API calls. It is a wrapper around `litellm.acompletion` that adds the core logic for key acquisition, selection, and retries.
- Parameters: Accepts the same keyword arguments as `litellm.acompletion`. The `model` parameter is required and must be a string in the format `provider/model_name`.
- Returns:
  - For non-streaming requests, the `litellm` response object.
  - For streaming requests, an async generator that yields OpenAI-compatible Server-Sent Events (SSE). The wrapper ensures that key locks are released and usage is recorded only after the stream is fully consumed.
Streaming Example:
```python
async def stream_example():
    async with RotatingClient(api_keys=api_keys) as client:
        response_stream = await client.acompletion(
            model="gemini/gemini-1.5-flash",
            messages=[{"role": "user", "content": "Tell me a long story."}],
            stream=True
        )
        async for chunk in response_stream:
            print(chunk)

asyncio.run(stream_example())
```
```python
async def aembedding(self, **kwargs) -> Any:
```

A wrapper around `litellm.aembedding` that provides the same key management and retry logic for embedding requests.
```python
def token_count(self, model: str, text: str = None, messages: List[Dict[str, str]] = None) -> int:
```

Calculates the token count for a given text or list of messages using `litellm.token_counter`.
```python
async def get_available_models(self, provider: str) -> List[str]:
```

Fetches a list of available models for a specific provider, applying any configured whitelists or blacklists. Results are cached in memory.
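Since `ignore_models` supports wildcards and `whitelist_models` overrides it, the filtering behavior can be approximated with `fnmatch`. This is a hedged sketch of the general technique; `filter_models` is a hypothetical name, not part of the library's API.

```python
from fnmatch import fnmatch

def filter_models(models, ignore=None, whitelist=None):
    """Apply blacklist patterns, letting whitelist patterns override them.

    Patterns use shell-style wildcards, e.g. "*-preview".
    """
    ignore = ignore or []
    whitelist = whitelist or []
    kept = []
    for model in models:
        if any(fnmatch(model, pat) for pat in whitelist):
            kept.append(model)  # whitelist overrides any blacklist hit
        elif any(fnmatch(model, pat) for pat in ignore):
            continue            # blacklisted
        else:
            kept.append(model)
    return kept
```

For example, with `ignore=["*-preview"]` and `whitelist=["o1-*"]`, a model named `o1-preview` survives the blacklist because the whitelist wins.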
```python
async def get_all_available_models(self, grouped: bool = True) -> Union[Dict[str, List[str]], List[str]]:
```

Fetches a dictionary of all available models, grouped by provider, or a single flat list if `grouped=False`.
## Credential Tool

The library includes a utility to manage credentials easily:

```bash
python -m src.rotator_library.credential_tool
```
Use this tool to:
- Initialize OAuth: Run the interactive login flows for Gemini, Qwen, and iFlow.
- Export Credentials: Generate `.env`-compatible configuration blocks from your saved OAuth JSON files. This is essential for setting up stateless deployments.
## Provider Specifics

### Qwen Code
- Auth: Uses OAuth 2.0 Device Flow. Requires manual entry of email/identifier if not returned by the provider.
- Resilience: Injects a dummy tool (`do_not_call_me`) into requests with no tools to prevent known stream-corruption issues on the API.
- Reasoning: Parses `<think>` tags in the response and exposes them as `reasoning_content`.
- Schema Cleaning: Recursively removes `strict` and `additionalProperties` from all tool schemas. Qwen's API has stricter validation than OpenAI's, and these properties cause `400 Bad Request` errors.
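The schema-cleaning step can be illustrated with a small recursive function. This is a sketch of the general technique, not the library's internal helper; `clean_schema` is a hypothetical name.

```python
def clean_schema(node):
    """Recursively strip `strict` and `additionalProperties` from a tool schema.

    Returns a cleaned copy; nested dicts and lists are traversed in full.
    """
    if isinstance(node, dict):
        return {
            key: clean_schema(value)
            for key, value in node.items()
            if key not in ("strict", "additionalProperties")
        }
    if isinstance(node, list):
        return [clean_schema(item) for item in node]
    return node
```

Applied to an OpenAI-style tool definition, this removes the offending keys at every nesting level while leaving the rest of the schema untouched.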
### iFlow
- Auth: Uses Authorization Code Flow with a local callback server (port 11451).
- Key Separation: Distinguishes between the OAuth `access_token` (used to fetch user info) and the `api_key` (used for actual chat requests).
- Resilience: Similar to Qwen, injects a placeholder tool to stabilize streaming for empty tool lists.
- Schema Cleaning: Recursively removes `strict` and `additionalProperties` from all tool schemas to prevent API validation errors.
- Custom Models: Supports model definitions via the `IFLOW_MODELS` environment variable (a JSON array of model IDs or objects).
### NVIDIA NIM
- Discovery: Dynamically fetches available models from the NVIDIA API.
- Thinking: Automatically injects the `thinking` parameter into `extra_body` for DeepSeek models (`deepseek-v3.1`, etc.) when `reasoning_effort` is set to low/medium/high.
### Google Gemini (CLI)
- Auth: Simulates the Google Cloud CLI authentication flow.
- Project Discovery: Automatically discovers the default Google Cloud Project ID with enhanced onboarding flow.
- Credential Prioritization: Automatic detection and prioritization of paid vs free tier credentials.
- Model Tier Requirements: Gemini 3 models automatically filtered to paid-tier credentials only.
- Gemini 3 Support: Full support for Gemini 3 models with:
  - `thinkingLevel` configuration (low/high)
  - Tool hallucination prevention via system instruction injection
  - Thought signature caching for multi-turn conversations
  - Parameter signature injection into tool descriptions
- Rate Limits: Implements smart fallback strategies (e.g., switching from `gemini-1.5-pro` to `gemini-1.5-pro-002`) when rate limits are hit.
### Antigravity
- Auth: Uses OAuth 2.0 flow similar to Gemini CLI, with Antigravity-specific credentials and scopes.
- Credential Prioritization: Automatic detection and prioritization of paid vs free tier credentials (paid tier resets every 5 hours, free tier resets weekly).
- Models: Supports Gemini 3 Pro, Gemini 2.5 Flash/Flash Lite, Claude Sonnet 4.5 (with/without thinking), Claude Opus 4.5 (thinking only), and GPT-OSS 120B via Google's internal Antigravity API.
- Quota Groups: Models that share quota are automatically grouped:
  - Claude/GPT-OSS: `claude-sonnet-4-5`, `claude-opus-4-5`, `gpt-oss-120b-medium`
  - Gemini 3 Pro: `gemini-3-pro-high`, `gemini-3-pro-low`, `gemini-3-pro-preview`
  - Gemini 2.5 Flash: `gemini-2.5-flash`, `gemini-2.5-flash-thinking`, `gemini-2.5-flash-lite`
  - All models in a group deplete the group's quota equally, so within the Claude group it is most economical to use only Opus and skip Sonnet and GPT-OSS.
- Quota Baseline Tracking: Background job fetches quota status from API every 5 minutes to provide accurate remaining quota estimates.
- Thought Signature Caching: Server-side caching of `thoughtSignature` data for multi-turn conversations with Gemini 3 models.
- Tool Hallucination Prevention: Automatic injection of system instructions and parameter signatures for Gemini 3 and Claude to prevent tool parameter hallucination.
- Parallel Tool Usage Instruction: Configurable instruction injection to encourage parallel tool calls (enabled by default for Claude).
- Thinking Support:
  - Gemini 3: Uses `thinkingLevel` (string: `"low"`/`"high"`)
  - Gemini 2.5 Flash: Uses the `-thinking` variant when `reasoning_effort` is provided
  - Claude Sonnet 4.5: Uses `thinkingBudget` (optional; supports both thinking and non-thinking modes)
  - Claude Opus 4.5: Uses `thinkingBudget` (always uses the thinking variant)
- Base URL Fallback: Automatic fallback between sandbox and production endpoints.
## Error Handling and Cooldowns
The client uses a sophisticated error handling mechanism:
- Error Classification: All exceptions from `litellm` are passed through a `classify_error` function to determine their type (`rate_limit`, `authentication`, `server_error`, `quota`, `context_length`, etc.).
- Server Errors: The client retries the request with the same key up to `max_retries` times, using an exponential backoff strategy.
- Key-Specific Errors (Authentication, Quota, etc.): The client records the failure in the `UsageManager`, which applies an escalating cooldown to the key for that specific model. The client then immediately acquires a new key and continues its attempt to complete the request.
- Escalating Cooldown Strategy: Consecutive failures for a key on the same model result in increasing cooldown periods:
  - 1st failure: 10 seconds
  - 2nd failure: 30 seconds
  - 3rd failure: 60 seconds
  - 4th+ failure: 120 seconds
- Key-Level Lockouts: If a key fails on multiple different models (3+ distinct models), the `UsageManager` applies a global 5-minute lockout for that key, removing it from rotation entirely.
- Authentication Errors: Immediate 5-minute global lockout (the key is assumed revoked or invalid).
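The escalating schedule above amounts to a small lookup. This is an illustrative sketch only; `cooldown_seconds` is a hypothetical name, not the `UsageManager`'s actual method.

```python
def cooldown_seconds(consecutive_failures: int) -> int:
    """Map consecutive per-model failures to the documented cooldown schedule.

    1st -> 10s, 2nd -> 30s, 3rd -> 60s, 4th and beyond -> 120s.
    """
    schedule = {1: 10, 2: 30, 3: 60}
    return schedule.get(consecutive_failures, 120)
```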
## Global Timeout and Deadline-Driven Logic

To ensure predictable performance, the client operates on a strict time budget defined by the `global_timeout` parameter.

- Deadline Enforcement: When a request starts, a deadline is set. The entire process, including all key rotations and retries, must complete before this deadline.
- Deadline-Aware Retries: If a retry requires a wait that would exceed the remaining budget, the wait is skipped and the client immediately rotates to the next key.
- Silent Internal Errors: Intermittent failures such as provider capacity limits or temporary server errors are logged internally but not raised to the caller; the client simply rotates to the next key.
## Extending with Provider Plugins

The library uses a dynamic plugin system. To add support for a new provider's model list, you only need to:

1. Create a new provider file in `src/rotator_library/providers/` (e.g., `my_provider.py`).
2. Implement the `ProviderInterface`: Inside your new file, create a class that inherits from `ProviderInterface` and implements the `get_models` method.
```python
# src/rotator_library/providers/my_provider.py
from .provider_interface import ProviderInterface
from typing import List
import httpx

class MyProvider(ProviderInterface):
    async def get_models(self, credential: str, client: httpx.AsyncClient) -> List[str]:
        # Logic to fetch and return a list of model names.
        # The credential argument allows using the key to fetch models.
        ...
```
The system will automatically discover and register your new provider.
## Detailed Documentation

For a more in-depth technical explanation of the library's architecture, including the `UsageManager`'s concurrency model and the error classification system, please refer to the Technical Documentation.