# Resilience & API Key Management Library
A robust, asynchronous, and thread-safe Python library for managing a pool of API keys. It is designed to be integrated into applications (such as the Universal LLM API Proxy included in this project) to provide a powerful layer of resilience and high availability when interacting with multiple LLM providers.
## Key Features
- **Asynchronous by Design**: Built with `asyncio` and `httpx` for high-performance, non-blocking I/O.
- **Advanced Concurrency Control**: A single API key can be used for multiple concurrent requests. By default, it supports concurrent requests to *different* models. With configuration (`MAX_CONCURRENT_REQUESTS_PER_KEY_<PROVIDER>`), it can also support multiple concurrent requests to the *same* model using the same key.
- **Smart Key Management**: Selects the optimal key for each request using a tiered, model-aware locking strategy to distribute load evenly and maximize availability.
- **Configurable Rotation Strategy**: Choose between deterministic least-used selection (perfect balance) and the default weighted random selection (unpredictable, harder to fingerprint).
- **Deadline-Driven Requests**: A global timeout ensures that no request, including all retries and key selections, exceeds a specified time limit.
- **OAuth & API Key Support**: Built-in support for standard API keys and complex OAuth flows.
- **Gemini CLI**: Full OAuth 2.0 web flow with automatic project discovery, free-tier onboarding, and credential prioritization (paid vs free tier).
- **Antigravity**: Full OAuth 2.0 support for Gemini 3, Gemini 2.5, and Claude Sonnet 4.5 models. **First on the scene to provide full support for Gemini 3** via Antigravity, including advanced features like thought signature caching and tool hallucination prevention.
- **Qwen Code**: Device Code flow support.
- **iFlow**: Authorization Code flow with local callback handling.
- **Stateless Deployment Ready**: Can load complex OAuth credentials from environment variables, eliminating the need for physical credential files in containerized environments.
- **Intelligent Error Handling**:
  - **Escalating Per-Model Cooldowns**: Failed keys are placed on a temporary, escalating cooldown for specific models.
  - **Key-Level Lockouts**: Keys failing across multiple models are temporarily removed from rotation.
  - **Stream Recovery**: The client detects mid-stream errors (like quota limits) and gracefully handles them.
- **Credential Prioritization**: Automatic tier detection and priority-based credential selection (e.g., paid tier credentials used first for models that require them).
- **Advanced Model Requirements**: Support for model-tier restrictions (e.g., Gemini 3 requires paid-tier credentials).
- **Robust Streaming Support**: Includes a wrapper for streaming responses that reassembles fragmented JSON chunks.
- **Detailed Usage Tracking**: Tracks daily and global usage for each key, persisted to a JSON file.
- **Automatic Daily Resets**: Automatically resets cooldowns and archives stats daily.
- **Provider Agnostic**: Works with any provider supported by `litellm`.
- **Extensible**: Easily add support for new providers through a simple plugin-based architecture.
- **Temperature Override**: An optional global `temperature=0` override to help prevent tool hallucination at low temperature settings.
- **Shared OAuth Base**: Refactored OAuth implementation with reusable [`GoogleOAuthBase`](providers/google_oauth_base.py) for multiple providers.
## Installation
To install the library, you can install it directly from a local path. Using the `-e` flag installs it in "editable" mode, which is recommended for development.
```bash
pip install -e .
```
## `RotatingClient` Class
This is the main class for interacting with the library. It is designed to be a long-lived object that manages the state of your API key pool.
### Initialization
```python
import os
from dotenv import load_dotenv
from rotator_library import RotatingClient
# Load environment variables from .env file
load_dotenv()

# Dynamically load all provider API keys from environment variables
api_keys = {}
for key, value in os.environ.items():
    # This pattern finds keys like "GEMINI_API_KEY_1" or "OPENAI_API_KEY"
    if (key.endswith("_API_KEY") or "_API_KEY_" in key) and key != "PROXY_API_KEY":
        # Extracts "gemini" from "GEMINI_API_KEY_1"
        provider = key.split("_API_KEY")[0].lower()
        if provider not in api_keys:
            api_keys[provider] = []
        api_keys[provider].append(value)

# Initialize empty dictionary for OAuth credentials (or load from CredentialManager)
oauth_credentials = {}

client = RotatingClient(
    api_keys=api_keys,
    oauth_credentials=oauth_credentials,
    max_retries=2,
    usage_file_path="key_usage.json",
    configure_logging=True,
    global_timeout=30,
    abort_on_callback_error=True,
    litellm_provider_params={},
    ignore_models={},
    whitelist_models={},
    enable_request_logging=False,
    max_concurrent_requests_per_key={},
    rotation_tolerance=2.0,  # 0.0 = deterministic, 2.0 = recommended weighted random
)
```
#### Arguments
- `api_keys` (`Optional[Dict[str, List[str]]]`): A dictionary mapping provider names (e.g., "openai", "anthropic") to a list of API keys.
- `oauth_credentials` (`Optional[Dict[str, List[str]]]`): A dictionary mapping provider names (e.g., "gemini_cli", "qwen_code") to a list of file paths to OAuth credential JSON files.
- `max_retries` (`int`, default: `2`): The number of times to retry a request with the *same key* if a transient server error (e.g., 500, 503) occurs.
- `usage_file_path` (`str`, default: `"key_usage.json"`): The path to the JSON file where usage statistics (tokens, cost, success counts) are persisted.
- `configure_logging` (`bool`, default: `True`): If `True`, configures the library's logger to propagate logs to the root logger. Set to `False` if you want to handle logging configuration manually.
- `global_timeout` (`int`, default: `30`): A hard time limit (in seconds) for the entire request lifecycle. If the request (including all retries) takes longer than this, it is aborted.
- `abort_on_callback_error` (`bool`, default: `True`): If `True`, any exception raised by `pre_request_callback` will abort the request. If `False`, the error is logged and the request proceeds.
- `litellm_provider_params` (`Optional[Dict[str, Any]]`, default: `None`): A dictionary of extra parameters to pass to `litellm` for specific providers.
- `ignore_models` (`Optional[Dict[str, List[str]]]`, default: `None`): A dictionary where keys are provider names and values are lists of model names/patterns to exclude (blacklist). Supports wildcards (e.g., `"*-preview"`).
- `whitelist_models` (`Optional[Dict[str, List[str]]]`, default: `None`): A dictionary where keys are provider names and values are lists of model names/patterns to always include, overriding `ignore_models`.
- `enable_request_logging` (`bool`, default: `False`): If `True`, enables detailed per-request file logging (useful for debugging complex interactions).
- `max_concurrent_requests_per_key` (`Optional[Dict[str, int]]`, default: `None`): A dictionary defining the maximum number of concurrent requests allowed for a single API key for a specific provider. Defaults to 1 if not specified.
- `rotation_tolerance` (`float`, default: `2.0`): Controls the credential rotation strategy:
  - `0.0`: **Deterministic** - Always selects the least-used credential for perfect load balance.
  - `2.0` (default, recommended): **Weighted Random** - Randomly selects credentials with a bias toward less-used ones. Provides unpredictability (harder to fingerprint) while maintaining good balance.
  - `5.0+`: **High Randomness** - Even heavily-used credentials have a significant selection probability. Maximum unpredictability.
The weight formula is: `weight = (max_usage - credential_usage) + tolerance + 1`
**Use Cases:**
- `0.0`: When perfect load balance is critical
- `2.0`: When avoiding fingerprinting/rate limit detection is important
- `5.0+`: For stress testing or maximum unpredictability
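To make the formula concrete, here is a minimal illustrative sketch of weighted random selection (not the library's internal implementation; the key names and usage counts are invented):

```python
import random

def pick_credential(usage: dict, tolerance: float) -> str:
    """Pick a key using weight = (max_usage - credential_usage) + tolerance + 1."""
    if tolerance == 0.0:
        # Deterministic mode: always take the least-used credential.
        return min(usage, key=usage.get)
    max_usage = max(usage.values())
    creds = list(usage)
    weights = [(max_usage - usage[c]) + tolerance + 1 for c in creds]
    return random.choices(creds, weights=weights, k=1)[0]

# Hypothetical per-key usage counts:
usage = {"key_a": 10, "key_b": 4, "key_c": 0}
print(pick_credential(usage, tolerance=2.0))  # biased toward key_c
```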
### Concurrency and Resource Management
The `RotatingClient` is asynchronous and manages an `httpx.AsyncClient` internally. It's crucial to close the client properly to release resources. The recommended way is to use an `async with` block.
```python
import asyncio

from rotator_library import RotatingClient

async def main():
    async with RotatingClient(api_keys=api_keys) as client:
        # ... use the client ...
        response = await client.acompletion(
            model="gemini/gemini-1.5-flash",
            messages=[{"role": "user", "content": "Hello!"}]
        )
        print(response)

asyncio.run(main())
```
### Methods
#### `async def acompletion(self, **kwargs) -> Any:`
This is the primary method for making API calls. It's a wrapper around `litellm.acompletion` that adds the core logic for key acquisition, selection, and retries.
- **Parameters**: Accepts the same keyword arguments as `litellm.acompletion`. The `model` parameter is required and must be a string in the format `provider/model_name`.
- **Returns**:
- For non-streaming requests, it returns the `litellm` response object.
- For streaming requests, it returns an async generator that yields OpenAI-compatible Server-Sent Events (SSE). The wrapper ensures that key locks are released and usage is recorded only after the stream is fully consumed.
**Streaming Example:**
```python
async def stream_example():
    async with RotatingClient(api_keys=api_keys) as client:
        response_stream = await client.acompletion(
            model="gemini/gemini-1.5-flash",
            messages=[{"role": "user", "content": "Tell me a long story."}],
            stream=True
        )
        async for chunk in response_stream:
            print(chunk)

asyncio.run(stream_example())
```
#### `async def aembedding(self, **kwargs) -> Any:`
A wrapper around `litellm.aembedding` that provides the same key management and retry logic for embedding requests.
#### `def token_count(self, model: str, text: str = None, messages: List[Dict[str, str]] = None) -> int:`
Calculates the token count for a given text or list of messages using `litellm.token_counter`.
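For example (reusing the `client` from the initialization example above):

```python
n_tokens = client.token_count(
    model="gemini/gemini-1.5-flash",
    messages=[{"role": "user", "content": "Hello!"}],
)
print(f"Prompt size: {n_tokens} tokens")
```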
#### `async def get_available_models(self, provider: str) -> List[str]:`
Fetches a list of available models for a specific provider, applying any configured whitelists or blacklists. Results are cached in memory.
#### `async def get_all_available_models(self, grouped: bool = True) -> Union[Dict[str, List[str]], List[str]]:`
Fetches a dictionary of all available models, grouped by provider, or as a single flat list if `grouped=False`.
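A short usage sketch (assuming the `api_keys` dictionary from the initialization example):

```python
import asyncio

async def list_models():
    async with RotatingClient(api_keys=api_keys) as client:
        by_provider = await client.get_all_available_models(grouped=True)
        for provider, models in by_provider.items():
            print(f"{provider}: {len(models)} models")

asyncio.run(list_models())
```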
## Credential Tool
The library includes a utility to manage credentials easily:
```bash
python -m src.rotator_library.credential_tool
```
Use this tool to:
1. **Initialize OAuth**: Run the interactive login flows for Gemini, Qwen, and iFlow.
2. **Export Credentials**: Generate `.env` compatible configuration blocks from your saved OAuth JSON files. This is essential for setting up stateless deployments.
## Provider Specifics
### Qwen Code
- **Auth**: Uses OAuth 2.0 Device Flow. Requires manual entry of email/identifier if not returned by the provider.
- **Resilience**: Injects a dummy tool (`do_not_call_me`) into requests with no tools to prevent known stream corruption issues on the API.
- **Reasoning**: Parses `<think>` tags in the response and exposes them as `reasoning_content`.
- **Schema Cleaning**: Recursively removes `strict` and `additionalProperties` from all tool schemas. Qwen's API has stricter validation than OpenAI's, and these properties cause `400 Bad Request` errors.
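The cleaning step is conceptually similar to this sketch (illustrative only, not the library's internal code):

```python
def clean_schema(node):
    """Recursively drop 'strict' and 'additionalProperties' from a tool schema."""
    if isinstance(node, dict):
        return {
            key: clean_schema(value)
            for key, value in node.items()
            if key not in ("strict", "additionalProperties")
        }
    if isinstance(node, list):
        return [clean_schema(item) for item in node]
    return node
```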
### iFlow
- **Auth**: Uses Authorization Code Flow with a local callback server (port 11451).
- **Key Separation**: Distinguishes between the OAuth `access_token` (used to fetch user info) and the `api_key` (used for actual chat requests).
- **Resilience**: Similar to Qwen, injects a placeholder tool to stabilize streaming for empty tool lists.
- **Schema Cleaning**: Recursively removes `strict` and `additionalProperties` from all tool schemas to prevent API validation errors.
- **Custom Models**: Supports model definitions via `IFLOW_MODELS` environment variable (JSON array of model IDs or objects).
### NVIDIA NIM
- **Discovery**: Dynamically fetches available models from the NVIDIA API.
- **Thinking**: Automatically injects the `thinking` parameter into `extra_body` for DeepSeek models (`deepseek-v3.1`, etc.) when `reasoning_effort` is set to low/medium/high.
### Google Gemini (CLI)
- **Auth**: Simulates the Google Cloud CLI authentication flow.
- **Project Discovery**: Automatically discovers the default Google Cloud Project ID through an enhanced onboarding flow.
- **Credential Prioritization**: Automatic detection and prioritization of paid vs free tier credentials.
- **Model Tier Requirements**: Gemini 3 models automatically filtered to paid-tier credentials only.
- **Gemini 3 Support**: Full support for Gemini 3 models with:
  - `thinkingLevel` configuration (low/high)
  - Tool hallucination prevention via system instruction injection
  - `thoughtSignature` caching for multi-turn conversations
  - Parameter signature injection into tool descriptions
- **Rate Limits**: Implements smart fallback strategies (e.g., switching from `gemini-1.5-pro` to `gemini-1.5-pro-002`) when rate limits are hit.
### Antigravity
- **Auth**: Uses OAuth 2.0 flow similar to Gemini CLI, with Antigravity-specific credentials and scopes.
- **Credential Prioritization**: Automatic detection and prioritization of paid vs free tier credentials (paid tier resets every 5 hours, free tier resets weekly).
- **Models**: Supports Gemini 3 Pro, Gemini 2.5 Flash/Flash Lite, Claude Sonnet 4.5 (with/without thinking), Claude Opus 4.5 (thinking only), and GPT-OSS 120B via Google's internal Antigravity API.
- **Quota Groups**: Models that share quota are automatically grouped:
  - Claude/GPT-OSS: `claude-sonnet-4-5`, `claude-opus-4-5`, `gpt-oss-120b-medium`
  - Gemini 3 Pro: `gemini-3-pro-high`, `gemini-3-pro-low`, `gemini-3-pro-preview`
  - Gemini 2.5 Flash: `gemini-2.5-flash`, `gemini-2.5-flash-thinking`, `gemini-2.5-flash-lite`
  - All models in a group deplete the group's shared quota equally, so within the Claude group it is most economical to use only Opus and skip Sonnet and GPT-OSS.
- **Quota Baseline Tracking**: A background job fetches quota status from the API every 5 minutes to provide accurate remaining-quota estimates.
- **Thought Signature Caching**: Server-side caching of `thoughtSignature` data for multi-turn conversations with Gemini 3 models.
- **Tool Hallucination Prevention**: Automatic injection of system instructions and parameter signatures for Gemini 3 and Claude to prevent tool parameter hallucination.
- **Parallel Tool Usage Instruction**: Configurable instruction injection to encourage parallel tool calls (enabled by default for Claude).
- **Thinking Support**:
  - Gemini 3: Uses `thinkingLevel` (string: `"low"`/`"high"`)
  - Gemini 2.5 Flash: Uses the `-thinking` variant when `reasoning_effort` is provided
  - Claude Sonnet 4.5: Uses `thinkingBudget` (optional; supports both thinking and non-thinking modes)
  - Claude Opus 4.5: Uses `thinkingBudget` (always uses the thinking variant)
- **Base URL Fallback**: Automatic fallback between sandbox and production endpoints.
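For example, a thinking-enabled request might look like this sketch (assuming `antigravity` is the provider prefix; the model ID comes from the quota groups above):

```python
response = await client.acompletion(
    model="antigravity/gemini-2.5-flash",
    messages=[{"role": "user", "content": "Explain quota groups briefly."}],
    reasoning_effort="medium",  # routes to the -thinking variant, per the rules above
)
```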
## Error Handling and Cooldowns
The client uses a sophisticated error handling mechanism:
- **Error Classification**: All exceptions from `litellm` are passed through a `classify_error` function to determine their type (`rate_limit`, `authentication`, `server_error`, `quota`, `context_length`, etc.).
- **Server Errors**: The client will retry the request with the *same key* up to `max_retries` times, using an exponential backoff strategy.
- **Key-Specific Errors (Authentication, Quota, etc.)**: The client records the failure in the `UsageManager`, which applies an escalating cooldown to the key for that specific model. The client then immediately acquires a new key and continues its attempt to complete the request.
- **Escalating Cooldown Strategy**: Consecutive failures for a key on the same model result in increasing cooldown periods (sketched after this list):
  - 1st failure: 10 seconds
  - 2nd failure: 30 seconds
  - 3rd failure: 60 seconds
  - 4th+ failure: 120 seconds
- **Key-Level Lockouts**: If a key fails on multiple different models (3+ distinct models), the `UsageManager` applies a global 5-minute lockout for that key, removing it from rotation entirely.
- **Authentication Errors**: Immediate 5-minute global lockout (key is assumed revoked or invalid).
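A simplified sketch of the escalating cooldown schedule (illustrative; the real `UsageManager` tracks failures per key and per model):

```python
COOLDOWN_SCHEDULE = [10, 30, 60, 120]  # seconds per consecutive failure

def cooldown_for(consecutive_failures: int) -> int:
    """Return the cooldown applied after the Nth consecutive failure on a model."""
    index = min(consecutive_failures, len(COOLDOWN_SCHEDULE)) - 1
    return COOLDOWN_SCHEDULE[index]

assert cooldown_for(1) == 10
assert cooldown_for(4) == 120
assert cooldown_for(9) == 120  # 4th and later failures cap at 120 seconds
```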
### Global Timeout and Deadline-Driven Logic
To ensure predictable performance, the client operates on a strict time budget defined by the `global_timeout` parameter.
- **Deadline Enforcement**: When a request starts, a `deadline` is set. The entire process, including all key rotations and retries, must complete before this deadline.
- **Deadline-Aware Retries**: If a retry requires a wait time that would exceed the remaining budget, the wait is skipped, and the client immediately rotates to the next key.
- **Silent Internal Errors**: Intermittent failures like provider capacity limits or temporary server errors are logged internally but are **not raised** to the caller. The client will simply rotate to the next key.
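Conceptually, the check before each retry wait looks like this sketch (illustrative pseudologic, not the library's actual code):

```python
import time

def should_wait(backoff_seconds: float, deadline: float) -> bool:
    """Honor a backoff wait only if it fits within the remaining time budget."""
    remaining = deadline - time.monotonic()
    return backoff_seconds < remaining

deadline = time.monotonic() + 30  # global_timeout=30
if should_wait(5.0, deadline):
    time.sleep(5.0)  # safe to back off and retry the same key
else:
    ...              # skip the wait and rotate to the next key immediately
```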
## Extending with Provider Plugins
The library uses a dynamic plugin system. To add support for a new provider's model list, you only need to:
1. **Create a new provider file** in `src/rotator_library/providers/` (e.g., `my_provider.py`).
2. **Implement the `ProviderInterface`**: Inside your new file, create a class that inherits from `ProviderInterface` and implements the `get_models` method.
```python
# src/rotator_library/providers/my_provider.py
from typing import List

import httpx

from .provider_interface import ProviderInterface

class MyProvider(ProviderInterface):
    async def get_models(self, credential: str, client: httpx.AsyncClient) -> List[str]:
        # Logic to fetch and return a list of model names.
        # The credential argument allows using the key to fetch models.
        pass
The system will automatically discover and register your new provider.
## Detailed Documentation
For a more in-depth technical explanation of the library's architecture, including the `UsageManager`'s concurrency model and the error classification system, please refer to the [Technical Documentation](../../DOCUMENTATION.md).