Spaces:
Paused
Paused
| import os | |
| import time | |
| import threading | |
| from typing import Dict, Any, Optional | |
| from .proxy import OpenRouterProxy | |
| from .requesty_proxy import RequestyProxy | |
| from models.model_costs import get_model_cost_for_provider | |
| class ProxyManager: | |
| """Manages switching between OpenRouter and Requesty proxies""" | |
| def __init__(self): | |
| # Initialize proxies based on available API keys | |
| self.openrouter_key = os.environ.get('OPENROUTER_KEY') | |
| self.requesty_keys = os.environ.get('REQUESTY_API_KEYS') | |
| self.openrouter_proxy = OpenRouterProxy() if self.openrouter_key else None | |
| self.requesty_proxy = RequestyProxy() if self.requesty_keys else None | |
| # Default to OpenRouter if available, otherwise Requesty | |
| if self.openrouter_proxy: | |
| self.current_provider = "openrouter" | |
| elif self.requesty_proxy: | |
| self.current_provider = "requesty" | |
| else: | |
| raise ValueError("No API keys configured for any provider") | |
| # Auto-switching configuration | |
| self.auto_switch_enabled = False | |
| self.switch_interval = 300 # 5 minutes default | |
| self.last_switch_time = time.time() | |
| self.switch_scheduler = None | |
| self.scheduler_lock = threading.Lock() | |
| def get_current_proxy(self): | |
| """Get the currently active proxy instance""" | |
| if self.auto_switch_enabled: | |
| self._check_auto_switch() | |
| if self.current_provider == "requesty" and self.requesty_proxy: | |
| return self.requesty_proxy | |
| elif self.current_provider == "openrouter" and self.openrouter_proxy: | |
| return self.openrouter_proxy | |
| else: | |
| # Fallback logic | |
| if self.openrouter_proxy: | |
| self.current_provider = "openrouter" | |
| return self.openrouter_proxy | |
| elif self.requesty_proxy: | |
| self.current_provider = "requesty" | |
| return self.requesty_proxy | |
| else: | |
| raise ValueError("No proxy available") | |
| def forward_request(self, data: Dict[Any, Any], api_key: str, is_special_token: bool = False) -> Any: | |
| """Forward request to the current active proxy""" | |
| current_proxy = self.get_current_proxy() | |
| if self.current_provider == "openrouter": | |
| return current_proxy.forward_request(data, self.openrouter_key, is_special_token) | |
| else: | |
| # Requesty handles its own API key rotation internally | |
| # Don't pass the user's api_key as it's not needed for Requesty | |
| return current_proxy.forward_request(data, None, is_special_token) | |
| def switch_to_provider(self, provider: str) -> bool: | |
| """Manually switch to a specific provider""" | |
| if provider == "requesty" and self.requesty_proxy: | |
| self.current_provider = "requesty" | |
| self.last_switch_time = time.time() | |
| return True | |
| elif provider == "openrouter" and self.openrouter_proxy: | |
| self.current_provider = "openrouter" | |
| self.last_switch_time = time.time() | |
| return True | |
| else: | |
| return False | |
| def configure_auto_switch(self, enabled: bool, interval_seconds: int): | |
| """Configure automatic switching between providers""" | |
| with self.scheduler_lock: | |
| self.auto_switch_enabled = enabled | |
| self.switch_interval = interval_seconds | |
| if enabled and self._has_multiple_providers(): | |
| self._start_switch_scheduler() | |
| else: | |
| self._stop_switch_scheduler() | |
| def _has_multiple_providers(self) -> bool: | |
| """Check if we have multiple providers available""" | |
| return bool(self.openrouter_proxy and self.requesty_proxy) | |
| def _check_auto_switch(self): | |
| """Check if it's time for an automatic switch""" | |
| if not self.auto_switch_enabled or not self._has_multiple_providers(): | |
| return | |
| current_time = time.time() | |
| if current_time - self.last_switch_time >= self.switch_interval: | |
| self._perform_auto_switch() | |
| def _start_switch_scheduler(self): | |
| """Start the automatic switching scheduler""" | |
| self._stop_switch_scheduler() # Stop any existing scheduler | |
| if self._has_multiple_providers(): | |
| self.switch_scheduler = threading.Timer( | |
| self.switch_interval, | |
| self._perform_auto_switch | |
| ) | |
| self.switch_scheduler.daemon = True | |
| self.switch_scheduler.start() | |
| def _stop_switch_scheduler(self): | |
| """Stop the automatic switching scheduler""" | |
| if self.switch_scheduler: | |
| self.switch_scheduler.cancel() | |
| self.switch_scheduler = None | |
| def _perform_auto_switch(self): | |
| """Perform the automatic switch between providers""" | |
| with self.scheduler_lock: | |
| # Switch to the other provider | |
| if self.current_provider == "openrouter": | |
| if self.requesty_proxy: | |
| self.current_provider = "requesty" | |
| else: | |
| if self.openrouter_proxy: | |
| self.current_provider = "openrouter" | |
| self.last_switch_time = time.time() | |
| # Schedule the next switch if auto-switching is still enabled | |
| if self.auto_switch_enabled: | |
| self._start_switch_scheduler() | |
| def get_time_until_next_switch(self) -> Optional[int]: | |
| """Get seconds until next automatic switch""" | |
| if not self.auto_switch_enabled or not self._has_multiple_providers(): | |
| return None | |
| elapsed = time.time() - self.last_switch_time | |
| remaining = self.switch_interval - elapsed | |
| return max(0, int(remaining)) | |
| def get_available_providers(self) -> Dict[str, bool]: | |
| """Get list of available providers""" | |
| return { | |
| "openrouter": bool(self.openrouter_proxy), | |
| "requesty": bool(self.requesty_proxy) | |
| } | |
| def check_providers_health(self) -> Dict[str, Dict[str, Any]]: | |
| """Check health status of all providers""" | |
| health = {} | |
| if self.openrouter_proxy: | |
| health['openrouter'] = self._test_provider_connection('openrouter') | |
| if self.requesty_proxy: | |
| health['requesty'] = self._test_provider_connection('requesty') | |
| return health | |
| def _test_provider_connection(self, provider: str) -> Dict[str, Any]: | |
| """Test connection to a specific provider""" | |
| try: | |
| if provider == 'openrouter' and self.openrouter_proxy: | |
| start_time = time.time() | |
| response = self.openrouter_proxy.get_models(self.openrouter_key) | |
| end_time = time.time() | |
| return { | |
| "status": "healthy" if response.status_code == 200 else "unhealthy", | |
| "response_time": end_time - start_time, | |
| "status_code": response.status_code | |
| } | |
| elif provider == 'requesty' and self.requesty_proxy: | |
| # Use admin method to avoid key rotation during health checks | |
| return self.requesty_proxy.test_connection_admin() | |
| else: | |
| return {"status": "unavailable", "error": "Provider not configured"} | |
| except Exception as e: | |
| return {"status": "unhealthy", "error": str(e)} | |
| def get_provider_stats(self) -> Dict[str, Any]: | |
| """Get statistics for all providers""" | |
| stats = { | |
| "current_provider": self.current_provider, | |
| "auto_switch_enabled": self.auto_switch_enabled, | |
| "switch_interval": self.switch_interval, | |
| "time_until_next_switch": self.get_time_until_next_switch(), | |
| "available_providers": self.get_available_providers() | |
| } | |
| # Add OpenRouter stats | |
| if self.openrouter_proxy: | |
| try: | |
| stats["openrouter_stats"] = self.openrouter_proxy.get_cost_optimization_stats() | |
| except: | |
| stats["openrouter_stats"] = {"error": "Could not load OpenRouter stats"} | |
| # Add Requesty stats (including cache from shared cache manager) | |
| if self.requesty_proxy: | |
| try: | |
| key_rotation_stats = self.requesty_proxy.get_key_rotation_stats() | |
| # Get cache stats from shared cache manager | |
| cache_stats = self.requesty_proxy.cache_manager.get_cache_statistics() | |
| stats["requesty_stats"] = { | |
| "key_rotation": key_rotation_stats, | |
| "cache_optimization": cache_stats, | |
| "combined_metrics": { | |
| "total_requests_processed": cache_stats.get('cache_hits', 0) + cache_stats.get('cache_misses', 0), | |
| "cache_hit_ratio": cache_stats.get('hit_ratio', 0), | |
| "cost_optimization_enabled": True | |
| } | |
| } | |
| except Exception as e: | |
| stats["requesty_stats"] = {"error": f"Could not load Requesty stats: {str(e)}"} | |
| return stats | |
| def reset_all_stats(self): | |
| """Reset statistics for all providers""" | |
| # Reset shared cache manager (used by both proxies) | |
| from .prompt_cache import get_prompt_cache_manager | |
| try: | |
| cache_manager = get_prompt_cache_manager() | |
| cache_manager.clear_cache() | |
| except Exception as e: | |
| print(f"Warning: Could not reset cache manager: {e}") | |
| if self.openrouter_proxy: | |
| try: | |
| self.openrouter_proxy.reset_optimization_stats() | |
| except: | |
| pass | |
| if self.requesty_proxy: | |
| try: | |
| self.requesty_proxy.key_rotator.reset_stats() | |
| except: | |
| pass | |
| def force_switch_now(self) -> bool: | |
| """Force an immediate switch to the other provider""" | |
| if not self._has_multiple_providers(): | |
| return False | |
| # Switch to the other provider | |
| if self.current_provider == "openrouter": | |
| return self.switch_to_provider("requesty") | |
| else: | |
| return self.switch_to_provider("openrouter") | |
| def get_model_cost(self, model: str) -> int: | |
| """Get the model cost using the provider-aware cost function""" | |
| return get_model_cost_for_provider(model, self.current_provider) | |