# movidik / utils / proxy_manager.py
# justadri23's picture
# Upload 25 files
# f8d7ba5 verified
import os
import time
import threading
from typing import Dict, Any, Optional
from .proxy import OpenRouterProxy
from .requesty_proxy import RequestyProxy
from models.model_costs import get_model_cost_for_provider
class ProxyManager:
    """Manage switching between the OpenRouter and Requesty proxies.

    A provider is available only when its credentials are present in the
    environment (``OPENROUTER_KEY`` / ``REQUESTY_API_KEYS``). One provider is
    active at a time; it can be changed manually or rotated automatically on
    a fixed interval.

    Attributes:
        openrouter_key: Value of ``OPENROUTER_KEY`` (or ``None``).
        requesty_keys: Value of ``REQUESTY_API_KEYS`` (or ``None``).
        openrouter_proxy: ``OpenRouterProxy`` instance, or ``None``.
        requesty_proxy: ``RequestyProxy`` instance, or ``None``.
        current_provider: ``"openrouter"`` or ``"requesty"``.
        auto_switch_enabled: Whether interval-based rotation is on.
        switch_interval: Seconds between automatic switches.
        last_switch_time: ``time.time()`` of the most recent switch.
        switch_scheduler: Pending ``threading.Timer``, or ``None``.
        scheduler_lock: Guards automatic switching and timer (re)scheduling.
    """

    def __init__(self):
        """Create a proxy for every provider that has credentials configured.

        Raises:
            ValueError: If no provider has credentials in the environment.
        """
        # Initialize proxies based on available API keys
        self.openrouter_key = os.environ.get('OPENROUTER_KEY')
        self.requesty_keys = os.environ.get('REQUESTY_API_KEYS')

        self.openrouter_proxy = OpenRouterProxy() if self.openrouter_key else None
        self.requesty_proxy = RequestyProxy() if self.requesty_keys else None

        # Default to OpenRouter if available, otherwise Requesty
        if self.openrouter_proxy:
            self.current_provider = "openrouter"
        elif self.requesty_proxy:
            self.current_provider = "requesty"
        else:
            raise ValueError("No API keys configured for any provider")

        # Auto-switching configuration
        self.auto_switch_enabled = False
        self.switch_interval = 300  # 5 minutes default
        self.last_switch_time = time.time()
        self.switch_scheduler = None
        self.scheduler_lock = threading.Lock()

    def get_current_proxy(self):
        """Return the active proxy instance.

        When auto-switching is enabled, an overdue switch is performed first.
        If the active provider has no proxy, falls back to OpenRouter and
        then Requesty, updating ``current_provider`` accordingly.

        Raises:
            ValueError: If no proxy is available at all.
        """
        if self.auto_switch_enabled:
            self._check_auto_switch()

        # Read the provider once so a concurrent switch cannot make both
        # comparisons below miss.
        provider = self.current_provider
        if provider == "requesty" and self.requesty_proxy:
            return self.requesty_proxy
        if provider == "openrouter" and self.openrouter_proxy:
            return self.openrouter_proxy

        # Fallback logic
        if self.openrouter_proxy:
            self.current_provider = "openrouter"
            return self.openrouter_proxy
        if self.requesty_proxy:
            self.current_provider = "requesty"
            return self.requesty_proxy
        raise ValueError("No proxy available")

    def forward_request(self, data: Dict[Any, Any], api_key: str, is_special_token: bool = False) -> Any:
        """Forward a request to the currently active proxy.

        Args:
            data: Request payload, passed through to the proxy unchanged.
            api_key: Accepted for interface compatibility; it is not
                forwarded. OpenRouter calls use the configured
                ``openrouter_key`` and Requesty calls receive ``None``.
            is_special_token: Passed through to the proxy unchanged.

        Returns:
            Whatever the selected proxy's ``forward_request`` returns.

        Raises:
            ValueError: If no proxy is available.
        """
        current_proxy = self.get_current_proxy()

        # Choose the credential from the proxy actually returned rather than
        # re-reading ``current_provider``: the timer thread may switch
        # providers between the two reads, which would pair a proxy with the
        # other provider's key argument.
        if current_proxy is self.openrouter_proxy:
            return current_proxy.forward_request(data, self.openrouter_key, is_special_token)

        # Requesty handles its own API key rotation internally
        # Don't pass the user's api_key as it's not needed for Requesty
        return current_proxy.forward_request(data, None, is_special_token)

    def switch_to_provider(self, provider: str) -> bool:
        """Manually switch to a specific provider.

        Args:
            provider: ``"requesty"`` or ``"openrouter"``.

        Returns:
            ``True`` if the provider is configured and is now active,
            ``False`` otherwise (the active provider is left unchanged).
        """
        if provider == "requesty" and self.requesty_proxy:
            self.current_provider = "requesty"
            self.last_switch_time = time.time()
            return True
        if provider == "openrouter" and self.openrouter_proxy:
            self.current_provider = "openrouter"
            self.last_switch_time = time.time()
            return True
        return False

    def configure_auto_switch(self, enabled: bool, interval_seconds: int):
        """Configure automatic switching between providers.

        Args:
            enabled: Turn interval-based switching on or off.
            interval_seconds: Seconds between switches. Must be positive
                when ``enabled`` is true.

        Raises:
            ValueError: If ``enabled`` is true and ``interval_seconds`` is
                not positive. The existing configuration is left untouched.
        """
        # A non-positive interval makes threading.Timer fire immediately and
        # re-arm itself forever, so reject it before changing any state.
        if enabled and interval_seconds <= 0:
            raise ValueError(
                "interval_seconds must be positive when auto-switching is enabled"
            )

        with self.scheduler_lock:
            self.auto_switch_enabled = enabled
            self.switch_interval = interval_seconds
            if enabled and self._has_multiple_providers():
                self._start_switch_scheduler()
            else:
                self._stop_switch_scheduler()

    def _has_multiple_providers(self) -> bool:
        """Return True when both providers are configured."""
        return bool(self.openrouter_proxy and self.requesty_proxy)

    def _check_auto_switch(self):
        """Perform an automatic switch if the interval has elapsed."""
        if not self.auto_switch_enabled or not self._has_multiple_providers():
            return
        # Cheap check without the lock; most requests stop here.
        if time.time() - self.last_switch_time < self.switch_interval:
            return
        with self.scheduler_lock:
            # Re-check under the lock: the timer thread or another request
            # may already have switched for this deadline, and toggling again
            # would flip straight back to the previous provider.
            if (
                self.auto_switch_enabled
                and time.time() - self.last_switch_time >= self.switch_interval
            ):
                self._switch_locked()

    def _start_switch_scheduler(self, delay: Optional[float] = None):
        """Arm the switch timer, replacing any pending one.

        Args:
            delay: Seconds until the timer fires; defaults to
                ``switch_interval``.
        """
        self._stop_switch_scheduler()  # Stop any existing scheduler
        if self._has_multiple_providers():
            self.switch_scheduler = threading.Timer(
                self.switch_interval if delay is None else delay,
                self._on_switch_timer,
            )
            # Daemon thread so a pending timer never blocks interpreter exit.
            self.switch_scheduler.daemon = True
            self.switch_scheduler.start()

    def _stop_switch_scheduler(self):
        """Cancel the pending switch timer, if any."""
        if self.switch_scheduler:
            self.switch_scheduler.cancel()
            self.switch_scheduler = None

    def _on_switch_timer(self):
        """Timer callback: switch if due, otherwise wait out the remainder."""
        with self.scheduler_lock:
            # cancel() cannot stop a timer that has already fired, so the
            # callback may still run after auto-switching was disabled.
            if not self.auto_switch_enabled:
                return
            remaining = self.switch_interval - (time.time() - self.last_switch_time)
            if remaining > 0:
                # A manual or request-triggered switch reset the clock after
                # this timer was armed; honour the new deadline so behaviour
                # matches get_time_until_next_switch().
                self._start_switch_scheduler(remaining)
            else:
                self._switch_locked()

    def _switch_locked(self):
        """Toggle the provider and re-arm the timer.

        The caller must hold ``scheduler_lock``.
        """
        # Switch to the other provider
        if self.current_provider == "openrouter":
            if self.requesty_proxy:
                self.current_provider = "requesty"
        elif self.openrouter_proxy:
            self.current_provider = "openrouter"
        self.last_switch_time = time.time()

        # Schedule the next switch if auto-switching is still enabled
        if self.auto_switch_enabled:
            self._start_switch_scheduler()

    def _perform_auto_switch(self):
        """Unconditionally switch to the other provider under the lock."""
        with self.scheduler_lock:
            self._switch_locked()

    def get_time_until_next_switch(self) -> Optional[int]:
        """Return whole seconds until the next automatic switch.

        Returns:
            ``None`` when auto-switching is disabled or only one provider is
            configured; otherwise a non-negative number of seconds.
        """
        if not self.auto_switch_enabled or not self._has_multiple_providers():
            return None
        elapsed = time.time() - self.last_switch_time
        remaining = self.switch_interval - elapsed
        return max(0, int(remaining))

    def get_available_providers(self) -> Dict[str, bool]:
        """Return a mapping of provider name to whether it is configured."""
        return {
            "openrouter": bool(self.openrouter_proxy),
            "requesty": bool(self.requesty_proxy),
        }

    def check_providers_health(self) -> Dict[str, Dict[str, Any]]:
        """Return a health report for every configured provider."""
        health = {}
        if self.openrouter_proxy:
            health['openrouter'] = self._test_provider_connection('openrouter')
        if self.requesty_proxy:
            health['requesty'] = self._test_provider_connection('requesty')
        return health

    def _test_provider_connection(self, provider: str) -> Dict[str, Any]:
        """Test the connection to one provider.

        Args:
            provider: ``'openrouter'`` or ``'requesty'``.

        Returns:
            A status dict. Any exception raised by the probe is reported as
            ``{"status": "unhealthy", "error": ...}`` instead of propagating.
        """
        try:
            if provider == 'openrouter' and self.openrouter_proxy:
                start_time = time.time()
                response = self.openrouter_proxy.get_models(self.openrouter_key)
                end_time = time.time()
                return {
                    "status": "healthy" if response.status_code == 200 else "unhealthy",
                    "response_time": end_time - start_time,
                    "status_code": response.status_code,
                }
            if provider == 'requesty' and self.requesty_proxy:
                # Use admin method to avoid key rotation during health checks
                return self.requesty_proxy.test_connection_admin()
            return {"status": "unavailable", "error": "Provider not configured"}
        except Exception as e:
            return {"status": "unhealthy", "error": str(e)}

    def get_provider_stats(self) -> Dict[str, Any]:
        """Return switching state plus per-provider statistics.

        A provider whose statistics cannot be loaded is reported with an
        ``{"error": ...}`` entry instead of failing the whole call.
        """
        stats = {
            "current_provider": self.current_provider,
            "auto_switch_enabled": self.auto_switch_enabled,
            "switch_interval": self.switch_interval,
            "time_until_next_switch": self.get_time_until_next_switch(),
            "available_providers": self.get_available_providers(),
        }

        # Add OpenRouter stats
        if self.openrouter_proxy:
            try:
                stats["openrouter_stats"] = self.openrouter_proxy.get_cost_optimization_stats()
            except Exception:
                # Exception rather than a bare except, so KeyboardInterrupt
                # and SystemExit still propagate.
                stats["openrouter_stats"] = {"error": "Could not load OpenRouter stats"}

        # Add Requesty stats (including cache from shared cache manager)
        if self.requesty_proxy:
            try:
                key_rotation_stats = self.requesty_proxy.get_key_rotation_stats()
                # Get cache stats from shared cache manager
                cache_stats = self.requesty_proxy.cache_manager.get_cache_statistics()
                stats["requesty_stats"] = {
                    "key_rotation": key_rotation_stats,
                    "cache_optimization": cache_stats,
                    "combined_metrics": {
                        "total_requests_processed": cache_stats.get('cache_hits', 0) + cache_stats.get('cache_misses', 0),
                        "cache_hit_ratio": cache_stats.get('hit_ratio', 0),
                        "cost_optimization_enabled": True,
                    },
                }
            except Exception as e:
                stats["requesty_stats"] = {"error": f"Could not load Requesty stats: {str(e)}"}

        return stats

    def reset_all_stats(self):
        """Reset statistics for all providers (best effort).

        A failure in one reset step is reported as a warning and does not
        prevent the remaining steps from running.
        """
        # Reset shared cache manager (used by both proxies)
        from .prompt_cache import get_prompt_cache_manager
        try:
            cache_manager = get_prompt_cache_manager()
            cache_manager.clear_cache()
        except Exception as e:
            print(f"Warning: Could not reset cache manager: {e}")

        if self.openrouter_proxy:
            try:
                self.openrouter_proxy.reset_optimization_stats()
            except Exception as e:
                print(f"Warning: Could not reset OpenRouter stats: {e}")

        if self.requesty_proxy:
            try:
                self.requesty_proxy.key_rotator.reset_stats()
            except Exception as e:
                print(f"Warning: Could not reset Requesty key rotation stats: {e}")

    def force_switch_now(self) -> bool:
        """Force an immediate switch to the other provider.

        Returns:
            ``True`` if the switch happened, ``False`` when only one
            provider is configured.
        """
        if not self._has_multiple_providers():
            return False
        # Switch to the other provider
        if self.current_provider == "openrouter":
            return self.switch_to_provider("requesty")
        return self.switch_to_provider("openrouter")

    def get_model_cost(self, model: str) -> int:
        """Return the cost of ``model`` for the currently active provider."""
        return get_model_cost_for_provider(model, self.current_provider)