Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| from typing import List, Optional | |
| from dotenv import load_dotenv | |
| import logging | |
| import sys | |
| # Configure logging to stdout only | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.StreamHandler(sys.stdout) | |
| ] | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # Load environment variables | |
| load_dotenv() | |
| # API key management constants | |
| KEY_COOLDOWN = 60 # Seconds to wait before retrying a failed key | |
| class ApiKeyManager: | |
| """ | |
| Manages the rotation and cooldown of API keys to prevent rate limiting issues | |
| """ | |
| def __init__(self, api_keys: Optional[List[str]] = None): | |
| """ | |
| Initialize the API key manager with a list of API keys | |
| Args: | |
| api_keys: List of API keys. If None, keys will be loaded from environment variables | |
| """ | |
| if api_keys is None: | |
| # Load API keys from environment variable | |
| env_keys = os.environ.get('GEMINI_API_KEYS', '') | |
| if not env_keys: | |
| logger.error("No API keys found in environment variables") | |
| raise ValueError("No API keys found in environment variables") | |
| api_keys = [key.strip() for key in env_keys.split(',') if key.strip()] | |
| logger.info(f"Loaded {len(api_keys)} API keys from environment variables") | |
| self.api_keys = api_keys | |
| self.current_index = 0 | |
| self.failed_keys = {} # Maps key to timestamp of failure | |
| if not self.api_keys: | |
| logger.error("No valid API keys provided") | |
| raise ValueError("No API keys provided. Please add at least one valid API key.") | |
| # Log the first few characters of each key for debugging | |
| for i, key in enumerate(self.api_keys): | |
| masked_key = key[:8] + "..." if len(key) > 8 else "..." | |
| logger.info(f"API key {i+1}: {masked_key}") | |
| logger.info(f"ApiKeyManager initialized with {len(self.api_keys)} keys") | |
| def get_next_api_key(self) -> str: | |
| """ | |
| Get the next working API key in rotation | |
| Returns: | |
| str: The next available API key | |
| Raises: | |
| Exception: If no working API keys are available | |
| """ | |
| current_time = time.time() | |
| # Check for keys that have completed their cooldown period | |
| for key in list(self.failed_keys.keys()): | |
| if current_time - self.failed_keys[key] > KEY_COOLDOWN: | |
| masked_key = key[:8] + "..." if len(key) > 8 else "..." | |
| logger.info(f"API key {masked_key} has cooled down after {KEY_COOLDOWN} seconds and is being retried") | |
| del self.failed_keys[key] | |
| # If all keys are in cooldown | |
| if len(self.failed_keys) == len(self.api_keys): | |
| logger.warning("All API keys are currently in cooldown. Waiting for the first key to be available again...") | |
| # Find the key with the earliest cooldown | |
| earliest_key = min(self.failed_keys, key=self.failed_keys.get) | |
| wait_time = KEY_COOLDOWN - (current_time - self.failed_keys[earliest_key]) + 1 | |
| wait_time = max(1, int(wait_time)) | |
| logger.info(f"Waiting {wait_time} seconds for cooldown...") | |
| time.sleep(wait_time) | |
| # Remove this key from the failed keys to retry it | |
| masked_key = earliest_key[:8] + "..." if len(earliest_key) > 8 else "..." | |
| logger.info(f"Cooldown complete for {masked_key}, removing from failed keys") | |
| del self.failed_keys[earliest_key] | |
| return earliest_key | |
| # Find the next key not in cooldown | |
| start_index = self.current_index | |
| while True: | |
| api_key = self.api_keys[self.current_index] | |
| self.current_index = (self.current_index + 1) % len(self.api_keys) | |
| if api_key not in self.failed_keys: | |
| masked_key = api_key[:8] + "..." if len(api_key) > 8 else "..." | |
| logger.info(f"Using API key: {masked_key}") | |
| return api_key | |
| # If we've checked all keys and none are available | |
| if self.current_index == start_index: | |
| # This shouldn't happen because we've already checked above | |
| logger.error("Could not find an available API key, which is unexpected") | |
| raise ValueError("Could not find an available API key, which is unexpected") | |
| def mark_key_as_failed(self, api_key: str) -> None: | |
| """ | |
| Mark an API key as temporarily unavailable | |
| Args: | |
| api_key: The API key to mark as failed | |
| """ | |
| self.failed_keys[api_key] = time.time() | |
| masked_key = api_key[:8] + "..." if len(api_key) > 8 else "..." | |
| logger.warning(f"API key {masked_key} marked as temporarily unavailable") | |
| available_keys = len(self.api_keys) - len(self.failed_keys) | |
| logger.info(f"{available_keys} keys immediately available. {len(self.failed_keys)} keys in cooldown.") | |
| # If we're running out of keys, log a more urgent warning | |
| if available_keys <= 1: | |
| logger.warning(f"WARNING: Only {available_keys} API keys remaining. Consider adding more keys.") | |
| def has_working_keys(self) -> bool: | |
| """ | |
| Check if there are any working keys left | |
| Returns: | |
| bool: True if there are working keys, False otherwise | |
| """ | |
| available_keys = len(self.api_keys) - len(self.failed_keys) | |
| logger.info(f"Checking API key availability: {available_keys} keys available") | |
| return available_keys > 0 | |
| def get_api_keys_status(self) -> dict: | |
| """ | |
| Get status information about API keys | |
| Returns: | |
| dict: Status information about the API keys | |
| """ | |
| current_time = time.time() | |
| available_keys = [] | |
| cooldown_keys = [] | |
| for i, key in enumerate(self.api_keys): | |
| masked_key = key[:8] + "..." if len(key) > 8 else "..." | |
| if key in self.failed_keys: | |
| cooldown_remaining = max(0, KEY_COOLDOWN - (current_time - self.failed_keys[key])) | |
| cooldown_keys.append({ | |
| "key_id": i, | |
| "masked_key": masked_key, | |
| "cooldown_remaining_seconds": int(cooldown_remaining) | |
| }) | |
| else: | |
| available_keys.append({ | |
| "key_id": i, | |
| "masked_key": masked_key | |
| }) | |
| return { | |
| "total_keys": len(self.api_keys), | |
| "available_keys": len(available_keys), | |
| "cooldown_keys": len(cooldown_keys), | |
| "available_key_details": available_keys, | |
| "cooldown_key_details": cooldown_keys | |
| } |