import os
import time
import threading
from typing import Dict, Any, Optional

from .proxy import OpenRouterProxy
from .requesty_proxy import RequestyProxy
from models.model_costs import get_model_cost_for_provider


class ProxyManager:
    """Manages switching between OpenRouter and Requesty proxies.

    A proxy is created for each provider whose API-key environment variable
    (``OPENROUTER_KEY`` / ``REQUESTY_API_KEYS``) is set. Exactly one provider
    is "current" at any time; it can be changed manually or, when
    auto-switching is enabled, alternated every ``switch_interval`` seconds.

    Every provider switch and every scheduler change happens while holding
    ``scheduler_lock`` so that request threads and the timer thread cannot
    both switch for the same interval.
    """

    def __init__(self):
        """Create the available proxies and pick the initial provider.

        Raises:
            ValueError: If neither provider has an API key configured.
        """
        # Initialize proxies based on available API keys
        self.openrouter_key = os.environ.get('OPENROUTER_KEY')
        self.requesty_keys = os.environ.get('REQUESTY_API_KEYS')

        self.openrouter_proxy = OpenRouterProxy() if self.openrouter_key else None
        self.requesty_proxy = RequestyProxy() if self.requesty_keys else None

        # Default to OpenRouter if available, otherwise Requesty
        if self.openrouter_proxy:
            self.current_provider = "openrouter"
        elif self.requesty_proxy:
            self.current_provider = "requesty"
        else:
            raise ValueError("No API keys configured for any provider")

        # Auto-switching configuration
        self.auto_switch_enabled = False
        self.switch_interval = 300  # 5 minutes default
        self.last_switch_time = time.time()
        self.switch_scheduler: Optional[threading.Timer] = None
        self.scheduler_lock = threading.Lock()

    def get_current_proxy(self):
        """Get the currently active proxy instance.

        When auto-switching is enabled this first performs any switch that
        is due. If the current provider has no proxy, falls back to
        OpenRouter, then Requesty, and records the fallback as current.

        Raises:
            ValueError: If no proxy is available at all.
        """
        if self.auto_switch_enabled:
            self._check_auto_switch()

        # Read the provider once: another thread may switch it between two
        # separate reads, which would wrongly send us into the fallback
        # branch and overwrite that switch.
        provider = self.current_provider
        if provider == "requesty" and self.requesty_proxy:
            return self.requesty_proxy
        if provider == "openrouter" and self.openrouter_proxy:
            return self.openrouter_proxy

        # Fallback logic
        if self.openrouter_proxy:
            self.current_provider = "openrouter"
            return self.openrouter_proxy
        if self.requesty_proxy:
            self.current_provider = "requesty"
            return self.requesty_proxy
        raise ValueError("No proxy available")

    def forward_request(self, data: Dict[Any, Any], api_key: str,
                        is_special_token: bool = False) -> Any:
        """Forward request to the current active proxy.

        Args:
            data: Request payload, passed through to the proxy unchanged.
            api_key: The caller's API key. Not forwarded to either provider;
                kept for interface compatibility.
            is_special_token: Passed through to the proxy unchanged.

        Returns:
            Whatever the selected proxy's ``forward_request`` returns.

        Raises:
            ValueError: If no proxy is available.
        """
        current_proxy = self.get_current_proxy()

        # Choose the key from the proxy object actually returned rather than
        # re-reading self.current_provider, which a concurrent switch may
        # have changed since the proxy was selected.
        if current_proxy is self.openrouter_proxy:
            return current_proxy.forward_request(
                data, self.openrouter_key, is_special_token
            )

        # Requesty handles its own API key rotation internally
        # Don't pass the user's api_key as it's not needed for Requesty
        return current_proxy.forward_request(data, None, is_special_token)

    def switch_to_provider(self, provider: str) -> bool:
        """Manually switch to a specific provider.

        A successful switch resets ``last_switch_time`` and, when
        auto-switching is enabled, restarts the scheduler so the next
        automatic switch is a full interval away.

        Args:
            provider: ``"requesty"`` or ``"openrouter"``.

        Returns:
            True if the switch was made; False if the provider name is
            unknown or that provider is not configured.
        """
        if provider == "requesty":
            target_proxy = self.requesty_proxy
        elif provider == "openrouter":
            target_proxy = self.openrouter_proxy
        else:
            return False
        if not target_proxy:
            return False

        with self.scheduler_lock:
            self.current_provider = provider
            self.last_switch_time = time.time()
            if self.auto_switch_enabled:
                # Restart the countdown; otherwise the pending timer would
                # fire before a full interval has passed since this switch.
                self._start_switch_scheduler()
        return True

    def configure_auto_switch(self, enabled: bool, interval_seconds: int):
        """Configure automatic switching between providers.

        Args:
            enabled: Whether automatic switching should be active.
            interval_seconds: Seconds between automatic switches.

        Raises:
            ValueError: If ``enabled`` is true and ``interval_seconds`` is
                not positive. Such an interval would make the timer re-arm
                itself immediately and flip providers in a tight loop.
        """
        if enabled and interval_seconds <= 0:
            raise ValueError(
                "interval_seconds must be positive when auto-switch is enabled"
            )

        with self.scheduler_lock:
            self.auto_switch_enabled = enabled
            self.switch_interval = interval_seconds

            if enabled and self._has_multiple_providers():
                self._start_switch_scheduler()
            else:
                self._stop_switch_scheduler()

    def _has_multiple_providers(self) -> bool:
        """Check if we have multiple providers available."""
        return bool(self.openrouter_proxy and self.requesty_proxy)

    def _check_auto_switch(self):
        """Perform an automatic switch if the interval has elapsed."""
        if not self.auto_switch_enabled or not self._has_multiple_providers():
            return

        # Cheap unlocked check first so most requests never take the lock.
        if time.time() - self.last_switch_time < self.switch_interval:
            return

        with self.scheduler_lock:
            # Re-check under the lock: the timer thread or another request
            # may have switched (or auto-switch may have been disabled)
            # while this thread was waiting.
            if not self.auto_switch_enabled:
                return
            if time.time() - self.last_switch_time >= self.switch_interval:
                self._rotate_provider_locked()

    def _start_switch_scheduler(self):
        """Start the automatic switching scheduler.

        Callers are expected to hold ``scheduler_lock``.
        """
        self._stop_switch_scheduler()  # Stop any existing scheduler

        if self._has_multiple_providers():
            timer = threading.Timer(
                self.switch_interval,
                self._perform_auto_switch
            )
            timer.daemon = True
            self.switch_scheduler = timer
            timer.start()

    def _stop_switch_scheduler(self):
        """Stop the automatic switching scheduler.

        Callers are expected to hold ``scheduler_lock``.
        """
        if self.switch_scheduler:
            self.switch_scheduler.cancel()
            self.switch_scheduler = None

    def _perform_auto_switch(self):
        """Perform the automatic switch between providers.

        This is the scheduler timer's callback. It does nothing if
        auto-switching has been disabled, or if the calling timer has been
        replaced or stopped since it was started.
        """
        with self.scheduler_lock:
            if not self.auto_switch_enabled:
                return

            # cancel() cannot stop a timer that has already fired and is
            # waiting on the lock. threading.Timer is a Thread subclass and
            # runs this callback in its own thread, so current_thread() is
            # the timer itself; skip it if it is no longer the active one.
            caller = threading.current_thread()
            if (isinstance(caller, threading.Timer)
                    and caller is not self.switch_scheduler):
                return

            self._rotate_provider_locked()

    def _rotate_provider_locked(self):
        """Switch to the other provider; caller must hold ``scheduler_lock``."""
        # Switch to the other provider
        if self.current_provider == "openrouter":
            if self.requesty_proxy:
                self.current_provider = "requesty"
        elif self.openrouter_proxy:
            self.current_provider = "openrouter"

        self.last_switch_time = time.time()

        # Schedule the next switch if auto-switching is still enabled
        if self.auto_switch_enabled:
            self._start_switch_scheduler()

    def get_time_until_next_switch(self) -> Optional[int]:
        """Get seconds until next automatic switch.

        Returns:
            Whole seconds remaining (never negative), or None when
            auto-switching is disabled or fewer than two providers exist.
        """
        if not self.auto_switch_enabled or not self._has_multiple_providers():
            return None

        elapsed = time.time() - self.last_switch_time
        remaining = self.switch_interval - elapsed
        return max(0, int(remaining))

    def get_available_providers(self) -> Dict[str, bool]:
        """Get a mapping of provider name to whether it is configured."""
        return {
            "openrouter": bool(self.openrouter_proxy),
            "requesty": bool(self.requesty_proxy),
        }

    def check_providers_health(self) -> Dict[str, Dict[str, Any]]:
        """Check health status of all configured providers."""
        health = {}

        if self.openrouter_proxy:
            health['openrouter'] = self._test_provider_connection('openrouter')

        if self.requesty_proxy:
            health['requesty'] = self._test_provider_connection('requesty')

        return health

    def _test_provider_connection(self, provider: str) -> Dict[str, Any]:
        """Test connection to a specific provider.

        Args:
            provider: ``'openrouter'`` or ``'requesty'``.

        Returns:
            A status dict. Any exception raised by the check is reported as
            ``{"status": "unhealthy", "error": ...}`` rather than propagated.
        """
        try:
            if provider == 'openrouter' and self.openrouter_proxy:
                start_time = time.time()
                response = self.openrouter_proxy.get_models(self.openrouter_key)
                end_time = time.time()

                return {
                    "status": "healthy" if response.status_code == 200 else "unhealthy",
                    "response_time": end_time - start_time,
                    "status_code": response.status_code,
                }
            elif provider == 'requesty' and self.requesty_proxy:
                # Use admin method to avoid key rotation during health checks
                return self.requesty_proxy.test_connection_admin()
            else:
                return {"status": "unavailable", "error": "Provider not configured"}
        except Exception as e:
            return {"status": "unhealthy", "error": str(e)}

    def get_provider_stats(self) -> Dict[str, Any]:
        """Get statistics for all providers.

        A failure while loading one provider's statistics is reported as an
        ``{"error": ...}`` entry for that provider instead of being raised.
        """
        stats = {
            "current_provider": self.current_provider,
            "auto_switch_enabled": self.auto_switch_enabled,
            "switch_interval": self.switch_interval,
            "time_until_next_switch": self.get_time_until_next_switch(),
            "available_providers": self.get_available_providers(),
        }

        # Add OpenRouter stats
        if self.openrouter_proxy:
            try:
                stats["openrouter_stats"] = self.openrouter_proxy.get_cost_optimization_stats()
            except Exception:
                # Not a bare except: KeyboardInterrupt/SystemExit must propagate.
                stats["openrouter_stats"] = {"error": "Could not load OpenRouter stats"}

        # Add Requesty stats (including cache from shared cache manager)
        if self.requesty_proxy:
            try:
                key_rotation_stats = self.requesty_proxy.get_key_rotation_stats()
                # Get cache stats from shared cache manager
                cache_stats = self.requesty_proxy.cache_manager.get_cache_statistics()

                stats["requesty_stats"] = {
                    "key_rotation": key_rotation_stats,
                    "cache_optimization": cache_stats,
                    "combined_metrics": {
                        "total_requests_processed": cache_stats.get('cache_hits', 0) + cache_stats.get('cache_misses', 0),
                        "cache_hit_ratio": cache_stats.get('hit_ratio', 0),
                        "cost_optimization_enabled": True,
                    },
                }
            except Exception as e:
                stats["requesty_stats"] = {"error": f"Could not load Requesty stats: {str(e)}"}

        return stats

    def reset_all_stats(self):
        """Reset statistics for all providers.

        Each reset is best-effort: a failure prints a warning and the
        remaining resets still run.
        """
        # Reset shared cache manager (used by both proxies)
        from .prompt_cache import get_prompt_cache_manager
        try:
            cache_manager = get_prompt_cache_manager()
            cache_manager.clear_cache()
        except Exception as e:
            print(f"Warning: Could not reset cache manager: {e}")

        if self.openrouter_proxy:
            try:
                self.openrouter_proxy.reset_optimization_stats()
            except Exception as e:
                print(f"Warning: Could not reset OpenRouter stats: {e}")

        if self.requesty_proxy:
            try:
                self.requesty_proxy.key_rotator.reset_stats()
            except Exception as e:
                print(f"Warning: Could not reset Requesty stats: {e}")

    def force_switch_now(self) -> bool:
        """Force an immediate switch to the other provider.

        Returns:
            True if the switch was made; False if fewer than two providers
            are configured.
        """
        if not self._has_multiple_providers():
            return False

        # Switch to the other provider
        if self.current_provider == "openrouter":
            return self.switch_to_provider("requesty")
        return self.switch_to_provider("openrouter")

    def get_model_cost(self, model: str) -> int:
        """Get the model cost using the provider-aware cost function."""
        return get_model_cost_for_provider(model, self.current_provider)