import requests
import json
import os
import time
import threading
import logging
from typing import Dict, Any, List

from .token_limiter import get_token_limiter
from .prompt_cache import get_prompt_cache_manager
from models.model_costs import translate_model_to_requesty, translate_model_to_openrouter

# Configure logging
logger = logging.getLogger(__name__)


class RequestyKeyRotator:
    """Manages rotation of multiple Requesty API keys with auto-disable functionality"""

    def __init__(self):
        # Keys are loaded once from the environment; rotation state is guarded by self.lock.
        self.api_keys = self._load_api_keys()
        self.current_index = 0
        self.usage_stats = {key: 0 for key in self.api_keys}
        self.lock = threading.Lock()

        # Auto-disable functionality: keys move between active_keys and disabled_keys
        # based on failure_counts crossing failure_threshold, and may be reactivated
        # after cooldown_period seconds (see _try_reactivate_keys).
        self.active_keys = set(self.api_keys)
        self.disabled_keys = set()
        self.failure_counts = {key: 0 for key in self.api_keys}
        self.success_counts = {key: 0 for key in self.api_keys}
        self.last_failure_time = {}
        self.last_used_time = {}
        self.disable_reasons = {}
        self.failure_threshold = 10
        self.cooldown_period = 300  # 5 minutes

        # Log initialization info
        if self.api_keys:
            logger.info(f"RequestyKeyRotator initialized with {len(self.api_keys)} API keys")
        else:
            logger.warning("RequestyKeyRotator: No API keys found in REQUESTY_API_KEYS environment variable")

    def _load_api_keys(self) -> List[str]:
        """Load API keys from environment variable"""
        # REQUESTY_API_KEYS is a comma-separated list; blank entries are dropped.
        keys_str = os.environ.get('REQUESTY_API_KEYS', '')
        return [key.strip() for key in keys_str.split(',') if key.strip()]

    def get_next_key(self) -> str:
        """Get the next API key in rotation

        Raises:
            ValueError: if no keys are configured.
        """
        with self.lock:
            if not self.api_keys:
                raise ValueError("No Requesty API keys configured")

            key = self.api_keys[self.current_index]
            old_index = self.current_index
            self.current_index = (self.current_index + 1) % len(self.api_keys)
            self.usage_stats[key] += 1

            # Log key rotation for visibility (key is masked to avoid leaking secrets)
            masked_key = f"{key[:8]}...{key[-4:]}" if len(key) > 12 else "***"
            logger.info(f"Requesty API Key Rotation: Using key {old_index + 1}/{len(self.api_keys)} ({masked_key})")

            return key

    def get_current_key(self) -> str:
        """Get the current API key without rotating (for admin operations)"""
        with self.lock:
            if not self.api_keys:
                raise ValueError("No Requesty API keys configured")
            key = self.api_keys[self.current_index]
            return key

    def get_usage_stats(self) -> Dict[str, int]:
        """Get usage statistics for all keys"""
        return self.usage_stats.copy()

    def get_next_active_key(self) -> str:
        """Get the next active API key in rotation, skipping disabled ones

        Raises:
            ValueError: if no active keys remain even after the cooldown check.
        """
        with self.lock:
            if not self.active_keys:
                raise ValueError("No active Requesty API keys available")

            # Try to reactivate keys that have been in cooldown
            self._try_reactivate_keys()

            if not self.active_keys:
                raise ValueError("No active Requesty API keys available after cooldown check")

            # Find next active key starting from current index; at most one full
            # pass over the key list is needed.
            attempts = 0
            while attempts < len(self.api_keys):
                key = self.api_keys[self.current_index]
                self.current_index = (self.current_index + 1) % len(self.api_keys)

                if key in self.active_keys:
                    self.usage_stats[key] += 1
                    self.last_used_time[key] = time.time()

                    # Log key rotation for visibility
                    masked_key = f"{key[:8]}...{key[-4:]}" if len(key) > 12 else "***"
                    logger.info(f"Requesty Active Key: Using {masked_key} ({len(self.active_keys)} active keys)")
                    return key

                attempts += 1

            raise ValueError("No active Requesty API keys available")

    def record_failure(self, api_key: str, error_code: int = None):
        """Record a failure for an API key and auto-disable if threshold reached"""
        with self.lock:
            if api_key not in self.api_keys:
                return

            self.failure_counts[api_key] += 1
            self.last_failure_time[api_key] = time.time()

            masked_key = f"{api_key[:8]}...{api_key[-4:]}" if len(api_key) > 12 else "***"
            logger.warning(f"Requesty Key Failure: {masked_key} failed (count: {self.failure_counts[api_key]}, error: {error_code})")

            # Auto-disable if threshold reached (counts are consecutive: reset on success)
            if self.failure_counts[api_key] >= self.failure_threshold and api_key in self.active_keys:
                self.active_keys.remove(api_key)
                self.disabled_keys.add(api_key)
                self.disable_reasons[api_key] = f"Auto-disabled after {self.failure_threshold} consecutive failures"
                logger.error(f"Requesty Key Auto-Disabled: {masked_key} (failures: {self.failure_counts[api_key]})")

                # Alert if running low on active keys
                if len(self.active_keys) <= 1:
                    logger.critical(f"ALERT: Only {len(self.active_keys)} Requesty API keys remaining active!")

    def record_success(self, api_key: str):
        """Record a successful request for an API key, resetting failure count"""
        with self.lock:
            if api_key not in self.api_keys:
                return

            # Reset failure count on success
            old_failures = self.failure_counts[api_key]
            self.failure_counts[api_key] = 0
            self.success_counts[api_key] += 1

            if old_failures > 0:
                masked_key = f"{api_key[:8]}...{api_key[-4:]}" if len(api_key) > 12 else "***"
                logger.info(f"Requesty Key Recovered: {masked_key} success after {old_failures} failures")

    def _try_reactivate_keys(self):
        """Try to reactivate keys that have been in cooldown period

        Must be called with self.lock held.
        """
        current_time = time.time()
        reactivated_keys = []

        for key in list(self.disabled_keys):
            last_failure = self.last_failure_time.get(key, 0)
            if current_time - last_failure >= self.cooldown_period:
                # Only reactivate if it was auto-disabled (not manually disabled)
                if self.disable_reasons.get(key, "").startswith("Auto-disabled"):
                    self.disabled_keys.remove(key)
                    self.active_keys.add(key)
                    self.failure_counts[key] = 0  # Reset failure count
                    self.disable_reasons.pop(key, None)
                    reactivated_keys.append(key)

        if reactivated_keys:
            masked_keys = [f"{key[:8]}...{key[-4:]}" for key in reactivated_keys]
            logger.info(f"Requesty Keys Reactivated: {', '.join(masked_keys)} after cooldown period")

    def manually_disable_key(self, api_key: str, reason: str = "Manually disabled") -> bool:
        """Manually disable an API key

        Returns:
            True if the key was active and is now disabled, False otherwise.
        """
        with self.lock:
            if api_key not in self.api_keys:
                return False

            if api_key in self.active_keys:
                self.active_keys.remove(api_key)
                self.disabled_keys.add(api_key)
                self.disable_reasons[api_key] = reason

                masked_key = f"{api_key[:8]}...{api_key[-4:]}" if len(api_key) > 12 else "***"
                logger.info(f"Requesty Key Manually Disabled: {masked_key} - {reason}")
                return True
            return False

    def manually_enable_key(self, api_key: str) -> bool:
        """Manually enable a disabled API key

        Returns:
            True if the key was disabled and is now active, False otherwise.
        """
        with self.lock:
            if api_key not in self.api_keys:
                return False

            if api_key in self.disabled_keys:
                self.disabled_keys.remove(api_key)
                self.active_keys.add(api_key)
                self.failure_counts[api_key] = 0  # Reset failure count
                self.disable_reasons.pop(api_key, None)

                masked_key = f"{api_key[:8]}...{api_key[-4:]}" if len(api_key) > 12 else "***"
                logger.info(f"Requesty Key Manually Enabled: {masked_key}")
                return True
            return False

    def get_detailed_key_stats(self) -> Dict[str, Any]:
        """Get detailed statistics for all API keys"""
        with self.lock:
            keys_detail = []
            for key in self.api_keys:
                masked_key = f"{key[:8]}...{key[-4:]}" if len(key) > 12 else "***"

                # Determine status
                if key in self.active_keys:
                    if self.failure_counts[key] == 0:
                        status = "healthy"
                    elif self.failure_counts[key] < self.failure_threshold:
                        status = "warning"
                    else:
                        status = "active"  # Should not happen, but just in case
                else:
                    status = "disabled"

                keys_detail.append({
                    "key_id": masked_key,
                    # NOTE(review): exposes the unmasked secret in the stats payload;
                    # callers must not serialize this to clients.
                    "full_key": key,  # For internal operations only
                    "status": status,
                    "is_active": key in self.active_keys,
                    "failure_count": self.failure_counts[key],
                    "success_count": self.success_counts[key],
                    "total_usage": self.usage_stats[key],
                    "last_used": self.last_used_time.get(key),
                    "last_failure": self.last_failure_time.get(key),
                    "disable_reason": self.disable_reasons.get(key, ""),
                    "can_be_reactivated": (
                        key in self.disabled_keys
                        and self.disable_reasons.get(key, "").startswith("Auto-disabled")
                        and time.time() - self.last_failure_time.get(key, 0) >= self.cooldown_period
                    )
                })

            return {
                "total_keys": len(self.api_keys),
                "active_keys": len(self.active_keys),
                "disabled_keys": len(self.disabled_keys),
                "healthy_keys": len([k for k in self.active_keys if self.failure_counts[k] == 0]),
                "warning_keys": len([k for k in self.active_keys if 0 < self.failure_counts[k] < self.failure_threshold]),
                "failure_threshold": self.failure_threshold,
                "cooldown_period": self.cooldown_period,
                "keys_detail": keys_detail
            }

    def reset_stats(self):
        """Reset usage statistics"""
        with self.lock:
            self.usage_stats = {key: 0 for key in self.api_keys}
            self.failure_counts = {key: 0 for key in self.api_keys}
            self.success_counts = {key: 0 for key in self.api_keys}
            self.last_failure_time = {}
            self.last_used_time = {}


class RequestyProxy:
    """Proxy for Requesty API requests with key rotation and auto-cache"""

    def __init__(self):
        self.base_url = "https://router.requesty.ai/v1"
        self.timeout = 30
        self.key_rotator = RequestyKeyRotator()
        self.token_limiter = get_token_limiter()
        self.cache_manager = get_prompt_cache_manager()
        self.site_url = os.environ.get('REQUESTY_SITE_URL', 'https://github.com/openrouter-proxy')
        self.site_name = os.environ.get('REQUESTY_SITE_NAME', 'OpenRouter Proxy')

    @staticmethod
    def _extract_error_details(response: requests.Response, url: str, fallback_message: str):
        """Extract a (message, details) pair from a failed Requesty response.

        Tries to parse the JSON error body; falls back to fallback_message and
        the raw text body when the response is not valid JSON.
        """
        try:
            error_data = response.json()
            message = error_data.get('error', {}).get('message', f'HTTP {response.status_code}')
            body = error_data
        except ValueError:
            # Response.json() raises ValueError (json.JSONDecodeError) on non-JSON bodies
            message = fallback_message
            body = response.text
        details = {
            "status_code": response.status_code,
            "response_body": body,
            "headers": dict(response.headers),
            "url": url
        }
        return message, details

    def forward_request(self, data: Dict[Any, Any], api_key: str = None, is_special_token: bool = False) -> requests.Response:
        """Forward chat completion request to Requesty with cache optimization

        Applies token limiting, cost optimization and the mandatory prompt cache
        (unless is_special_token bypasses them), then retries the request across
        all active API keys on auth (401/403/429), server (5xx) and network errors.

        Returns:
            A requests.Response — either the upstream response, a synthesized
            cached response, or a synthesized error response.
        """
        model = data.get('model', '')

        if not is_special_token:
            # Apply normal limitations for regular tokens
            # 1. Apply token limits (same as OpenRouter)
            is_valid, limited_data, limit_reason = self.token_limiter.validate_and_limit_request(data, model)
            if not is_valid:
                return self._create_error_response(
                    400, "token_limit_exceeded", limit_reason
                )

            # 2. Apply cost optimizations
            optimized_data = self.token_limiter.add_cost_optimization_to_request(limited_data, model)

            # 3. PROCESS WITH MANDATORY CACHE (same logic as OpenRouter)
            cache_hit, cached_response, final_data, cache_key = self.cache_manager.process_request_with_cache(
                optimized_data, model
            )

            if cache_hit and cached_response:
                # Return cached response
                logger.info(f"✓ Requesty Cache HIT for model {model} (key: {cache_key[:16]}...)")
                return self._create_cached_response(cached_response)

            # 4. Add Requesty-specific optimizations
            final_data = self._add_requesty_optimizations(final_data)
        else:
            # Special token - bypass all limitations and cache
            final_data = self._add_requesty_optimizations(data, bypass_limitations=True)
            cache_key = "special_token_bypass"

        # 5. CLEAN STOP SEQUENCES TO PREVENT API ERRORS
        final_data = self._clean_stop_sequences(final_data)

        # 6. Retry logic with all active API keys
        url = f"{self.base_url}/chat/completions"
        max_attempts = len(self.key_rotator.active_keys) if self.key_rotator.active_keys else 1
        attempts = 0
        last_error = None
        error_details = None  # populated by the most recent failed attempt

        while attempts < max_attempts:
            try:
                # Get next active API key
                current_key = self.key_rotator.get_next_active_key()
            except ValueError as e:
                # No active keys available
                return self._create_error_response(500, "no_active_keys", str(e))

            # Prepare headers for current key
            headers = self._prepare_headers(current_key)

            try:
                # Send request to Requesty
                if final_data.get('stream', False):
                    response = requests.post(
                        url, json=final_data, headers=headers, timeout=self.timeout, stream=True
                    )
                else:
                    response = requests.post(
                        url, json=final_data, headers=headers, timeout=self.timeout
                    )

                # Check response status
                if response.status_code == 200:
                    # Success! Record it and return
                    self.key_rotator.record_success(current_key)

                    # Store in cache if successful and not streaming (only for regular tokens)
                    if not is_special_token and not final_data.get('stream', False):
                        try:
                            response_json = response.json()
                            self.cache_manager.store_in_cache(cache_key, response_json, model)
                            logger.info(f"✓ Requesty Cache STORED for model {model} (key: {cache_key[:16]}...)")
                        except Exception as e:
                            # Cache storage is best-effort; never fail the request over it
                            logger.warning(f"Failed to store Requesty response in cache: {e}")

                    return response

                elif response.status_code in [401, 403, 429]:
                    # API key related errors - record failure and try next key
                    self.key_rotator.record_failure(current_key, response.status_code)
                    attempts += 1
                    last_error, error_details = self._extract_error_details(
                        response, url,
                        f'HTTP {response.status_code} - API key authentication failed'
                    )
                    logger.warning(f"Requesty API key failed with {response.status_code}, trying next key...")
                    continue

                elif response.status_code >= 500:
                    # Server errors - record failure and try next key
                    self.key_rotator.record_failure(current_key, response.status_code)
                    attempts += 1
                    last_error, error_details = self._extract_error_details(
                        response, url,
                        f'HTTP {response.status_code} - Server error'
                    )
                    logger.warning(f"Requesty server error {response.status_code}, trying next key...")
                    continue

                else:
                    # Other HTTP errors (4xx client errors) - don't retry, return with detailed error
                    logger.warning(f"Requesty client error {response.status_code}, not retrying")
                    try:
                        error_data = response.json()
                        # Enhance the response with additional context
                        if 'error' in error_data:
                            error_data['error']['provider'] = 'requesty'
                            error_data['error']['url'] = url
                            error_data['error']['headers'] = dict(response.headers)
                        # Update response content with enhanced error data
                        response._content = json.dumps(error_data).encode('utf-8')
                    except ValueError:
                        # If we can't parse JSON, create our own error response
                        return self._create_error_response(
                            response.status_code,
                            "client_error",
                            f"HTTP {response.status_code}: {response.text}",
                            {
                                "original_status_code": response.status_code,
                                "response_body": response.text,
                                "headers": dict(response.headers),
                                "url": url
                            }
                        )
                    return response

            except requests.exceptions.RequestException as e:
                # Network/connection errors - record failure and try next key
                self.key_rotator.record_failure(current_key, None)
                attempts += 1
                last_error = str(e)
                logger.warning(f"Requesty connection error: {e}, trying next key...")
                continue

        # If we get here, all keys failed
        logger.error(f"All Requesty API keys failed after {attempts} attempts")

        # Include detailed error information if available
        error_context = {
            "attempts_made": attempts,
            "max_attempts": max_attempts,
            "active_keys_count": len(self.key_rotator.active_keys),
            "last_error_details": error_details
        }

        return self._create_error_response(
            500,
            "all_keys_failed",
            f"All API keys are currently unavailable. Last error: {last_error}",
            error_context
        )

    def _clean_stop_sequences(self, data: Dict[Any, Any]) -> Dict[Any, Any]:
        """Clean stop sequences to avoid 'stop_sequences must contain non-whitespace' errors"""
        cleaned_data = data.copy()

        if 'stop' in cleaned_data and isinstance(cleaned_data['stop'], list):
            # Filter valid stop sequences (non-empty and containing non-whitespace characters)
            valid_stops = []
            for stop in cleaned_data['stop']:
                if stop and isinstance(stop, str) and stop.strip():
                    valid_stops.append(stop)

            if valid_stops:
                cleaned_data['stop'] = valid_stops
            else:
                # If no valid stop sequences remain, remove the field entirely
                del cleaned_data['stop']
                logger.info("Removed empty stop sequences to prevent API error")

        return cleaned_data

    def _add_requesty_optimizations(self, data: Dict[Any, Any], bypass_limitations: bool = False) -> Dict[Any, Any]:
        """Add Requesty-specific optimizations and model translation"""
        optimized_data = data.copy()

        # Translate model name from OpenRouter format to Requesty format
        original_model = optimized_data.get('model', '')
        requesty_model = translate_model_to_requesty(original_model)
        optimized_data['model'] = requesty_model

        # Add auto_cache flag for Requesty (always enabled)
        requesty_config = {
            "auto_cache": True
        }

        # Add special token indication if bypassing limitations
        if bypass_limitations:
            requesty_config["special_token"] = True
            requesty_config["unlimited"] = True

        optimized_data["requesty"] = requesty_config

        return optimized_data

    def _prepare_headers(self, api_key: str) -> Dict[str, str]:
        """Prepare headers for Requesty API"""
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "HTTP-Referer": self.site_url,
            "X-Title": self.site_name
        }
        return headers

    def _create_error_response(self, status_code: int, error_type: str, message: str, context: Dict = None) -> requests.Response:
        """Create a mock error response with additional context"""
        mock_response = requests.Response()
        mock_response.status_code = status_code

        error_data = {
            "error": {
                "message": message,
                "type": error_type,
                "code": status_code,
                "provider": "requesty"
            }
        }

        # Add context information if provided
        if context:
            error_data["error"].update(context)

        mock_response._content = json.dumps(error_data).encode('utf-8')
        mock_response.headers['Content-Type'] = 'application/json'
        return mock_response

    def _create_cached_response(self, cached_data: Dict) -> requests.Response:
        """Create a mock response from cached data"""
        mock_response = requests.Response()
        mock_response.status_code = 200

        # Add cache metadata to the response
        enhanced_response = cached_data.copy()
        enhanced_response['x_cache_info'] = {
            "cache_hit": True,
            "source": "requesty_local_cache",
            "cost_savings": True
        }

        mock_response._content = json.dumps(enhanced_response).encode('utf-8')
        mock_response.headers['Content-Type'] = 'application/json'
        mock_response.headers['X-Cache-Status'] = 'HIT'
        mock_response.headers['X-Cost-Optimized'] = 'true'
        mock_response.headers['X-Provider'] = 'requesty'
        return mock_response

    def get_models(self) -> requests.Response:
        """Get available models from Requesty (with rotation)"""
        try:
            current_key = self.key_rotator.get_next_key()
            headers = self._prepare_headers(current_key)
            url = f"{self.base_url}/models"
            response = requests.get(url, headers=headers, timeout=self.timeout)
            return response
        except Exception as e:
            return self._create_error_response(
                500, "connection_error", f"Failed to get models from Requesty: {str(e)}"
            )

    def get_models_admin(self) -> requests.Response:
        """Get available models from Requesty (admin - no rotation)"""
        try:
            current_key = self.key_rotator.get_current_key()
            headers = self._prepare_headers(current_key)
            url = f"{self.base_url}/models"
            response = requests.get(url, headers=headers, timeout=self.timeout)
            return response
        except Exception as e:
            return self._create_error_response(
                500, "connection_error", f"Failed to get models from Requesty: {str(e)}"
            )

    def get_key_rotation_stats(self) -> Dict[str, Any]:
        """Get key rotation statistics"""
        usage_stats = self.key_rotator.get_usage_stats()

        # Create masked version of usage stats for display
        masked_usage_stats = {}
        for key, usage in usage_stats.items():
            masked_key = f"{key[:8]}...{key[-4:]}" if len(key) > 12 else "***"
            masked_usage_stats[masked_key] = usage

        return {
            "total_keys": len(self.key_rotator.api_keys),
            "usage_per_key": masked_usage_stats,
            "current_index": self.key_rotator.current_index,
            "next_key_index": (self.key_rotator.current_index) % len(self.key_rotator.api_keys) if self.key_rotator.api_keys else 0,
            "rotation_working": len(self.key_rotator.api_keys) > 1,
            "total_requests": sum(usage_stats.values())
        }

    def test_connection(self) -> Dict[str, Any]:
        """Test connection to Requesty API (with rotation)"""
        try:
            start_time = time.time()
            response = self.get_models()
            end_time = time.time()

            return {
                "status": "healthy" if response.status_code == 200 else "unhealthy",
                "response_time": end_time - start_time,
                "status_code": response.status_code
            }
        except Exception as e:
            return {
                "status": "unhealthy",
                "error": str(e),
                "response_time": None
            }

    def test_connection_admin(self) -> Dict[str, Any]:
        """Test connection to Requesty API (admin - no rotation)"""
        try:
            start_time = time.time()
            response = self.get_models_admin()
            end_time = time.time()

            return {
                "status": "healthy" if response.status_code == 200 else "unhealthy",
                "response_time": end_time - start_time,
                "status_code": response.status_code
            }
        except Exception as e:
            return {
                "status": "unhealthy",
                "error": str(e),
                "response_time": None
            }

    def get_detailed_key_stats(self) -> Dict[str, Any]:
        """Get detailed statistics for all API keys"""
        return self.key_rotator.get_detailed_key_stats()

    def manually_disable_key(self, api_key: str, reason: str = "Manually disabled") -> bool:
        """Manually disable an API key"""
        return self.key_rotator.manually_disable_key(api_key, reason)

    def manually_enable_key(self, api_key: str) -> bool:
        """Manually enable a disabled API key"""
        return self.key_rotator.manually_enable_key(api_key)