# movidik / utils/requesty_proxy.py
# Author: justadri23 — commit b6f3322 ("Update utils/requesty_proxy.py", verified)
import requests
import json
import os
import time
import threading
import logging
from typing import Dict, Any, List
from .token_limiter import get_token_limiter
from .prompt_cache import get_prompt_cache_manager
from models.model_costs import translate_model_to_requesty, translate_model_to_openrouter
# Configure logging: module-level logger through which both key-rotation and
# proxy events in this module are reported.
logger = logging.getLogger(__name__)
class RequestyKeyRotator:
    """Manages rotation of multiple Requesty API keys with auto-disable functionality.

    Keys come from the comma-separated ``REQUESTY_API_KEYS`` environment
    variable. Every key tracks usage, success and failure counts. A key whose
    failure count reaches ``failure_threshold`` is auto-disabled and becomes
    eligible for reactivation once ``cooldown_period`` seconds have elapsed
    since its last recorded failure; keys disabled by hand are never
    reactivated automatically. Public methods serialize on ``self.lock``.
    """

    def __init__(self):
        self.api_keys = self._load_api_keys()
        self.current_index = 0
        self.usage_stats = {key: 0 for key in self.api_keys}
        self.lock = threading.Lock()
        # Auto-disable functionality
        self.active_keys = set(self.api_keys)
        self.disabled_keys = set()
        self.failure_counts = {key: 0 for key in self.api_keys}
        self.success_counts = {key: 0 for key in self.api_keys}
        self.last_failure_time = {}
        self.last_used_time = {}
        self.disable_reasons = {}
        self.failure_threshold = 10
        self.cooldown_period = 300  # seconds (5 minutes)
        # Log initialization info
        if self.api_keys:
            logger.info(f"RequestyKeyRotator initialized with {len(self.api_keys)} API keys")
        else:
            logger.warning("RequestyKeyRotator: No API keys found in REQUESTY_API_KEYS environment variable")

    @staticmethod
    def _mask_key(key: str) -> str:
        """Return a log-safe representation of *key*.

        Keys longer than 12 characters show only their first 8 and last 4
        characters; shorter keys are hidden entirely so they never leak.
        """
        return f"{key[:8]}...{key[-4:]}" if len(key) > 12 else "***"

    def _load_api_keys(self) -> List[str]:
        """Load API keys from the comma-separated REQUESTY_API_KEYS variable.

        Surrounding whitespace is stripped and empty entries are dropped.
        """
        keys_str = os.environ.get('REQUESTY_API_KEYS', '')
        return [key.strip() for key in keys_str.split(',') if key.strip()]

    def get_next_key(self) -> str:
        """Return the next key in plain round-robin order (ignores disabled state).

        Raises:
            ValueError: if no keys are configured.
        """
        with self.lock:
            if not self.api_keys:
                raise ValueError("No Requesty API keys configured")
            key = self.api_keys[self.current_index]
            old_index = self.current_index
            self.current_index = (self.current_index + 1) % len(self.api_keys)
            self.usage_stats[key] += 1
            # Log key rotation for visibility
            logger.info(
                f"Requesty API Key Rotation: Using key {old_index + 1}/{len(self.api_keys)} ({self._mask_key(key)})"
            )
            return key

    def get_current_key(self) -> str:
        """Return the key at the current index without rotating (for admin operations).

        Raises:
            ValueError: if no keys are configured.
        """
        with self.lock:
            if not self.api_keys:
                raise ValueError("No Requesty API keys configured")
            return self.api_keys[self.current_index]

    def get_usage_stats(self) -> Dict[str, int]:
        """Return a snapshot copy of per-key usage counts."""
        with self.lock:
            return self.usage_stats.copy()

    def get_next_active_key(self) -> str:
        """Return the next active key in rotation, skipping disabled ones.

        Cooled-down auto-disabled keys are reactivated first, so the rotator
        can recover even after every key has been auto-disabled.

        Raises:
            ValueError: if no key is active after the cooldown check.
        """
        with self.lock:
            # Reactivation must run BEFORE the availability check; checking first
            # would make an all-disabled rotator raise forever.
            self._try_reactivate_keys()
            if not self.active_keys:
                raise ValueError("No active Requesty API keys available")
            # Walk at most one full cycle starting from the current index.
            for _ in range(len(self.api_keys)):
                key = self.api_keys[self.current_index]
                self.current_index = (self.current_index + 1) % len(self.api_keys)
                if key in self.active_keys:
                    self.usage_stats[key] += 1
                    self.last_used_time[key] = time.time()
                    # Log key rotation for visibility
                    logger.info(
                        f"Requesty Active Key: Using {self._mask_key(key)} ({len(self.active_keys)} active keys)"
                    )
                    return key
            raise ValueError("No active Requesty API keys available")

    def record_failure(self, api_key: str, error_code: int = None):
        """Record a failure for *api_key*; auto-disable it once the threshold is reached.

        Unknown keys are ignored. *error_code* is only used for logging.
        """
        with self.lock:
            if api_key not in self.api_keys:
                return
            self.failure_counts[api_key] += 1
            self.last_failure_time[api_key] = time.time()
            masked_key = self._mask_key(api_key)
            logger.warning(
                f"Requesty Key Failure: {masked_key} failed (count: {self.failure_counts[api_key]}, error: {error_code})"
            )
            # Auto-disable if threshold reached
            if self.failure_counts[api_key] >= self.failure_threshold and api_key in self.active_keys:
                self.active_keys.remove(api_key)
                self.disabled_keys.add(api_key)
                # The "Auto-disabled" prefix is what makes the key eligible for
                # automatic reactivation in _try_reactivate_keys.
                self.disable_reasons[api_key] = f"Auto-disabled after {self.failure_threshold} consecutive failures"
                logger.error(f"Requesty Key Auto-Disabled: {masked_key} (failures: {self.failure_counts[api_key]})")
                # Alert if running low on active keys
                if len(self.active_keys) <= 1:
                    logger.critical(f"ALERT: Only {len(self.active_keys)} Requesty API keys remaining active!")

    def record_success(self, api_key: str):
        """Record a successful request for *api_key* and reset its failure count."""
        with self.lock:
            if api_key not in self.api_keys:
                return
            old_failures = self.failure_counts[api_key]
            self.failure_counts[api_key] = 0
            self.success_counts[api_key] += 1
            if old_failures > 0:
                logger.info(
                    f"Requesty Key Recovered: {self._mask_key(api_key)} success after {old_failures} failures"
                )

    def _try_reactivate_keys(self):
        """Move auto-disabled keys whose cooldown has elapsed back to the active set.

        Must be called with ``self.lock`` held; it does not acquire the lock itself.
        """
        current_time = time.time()
        reactivated_keys = []
        for key in list(self.disabled_keys):
            last_failure = self.last_failure_time.get(key, 0)
            if current_time - last_failure < self.cooldown_period:
                continue
            # Only reactivate if it was auto-disabled (not manually disabled)
            if self.disable_reasons.get(key, "").startswith("Auto-disabled"):
                self.disabled_keys.remove(key)
                self.active_keys.add(key)
                self.failure_counts[key] = 0  # Reset failure count
                self.disable_reasons.pop(key, None)
                reactivated_keys.append(key)
        if reactivated_keys:
            masked_keys = [self._mask_key(key) for key in reactivated_keys]
            logger.info(f"Requesty Keys Reactivated: {', '.join(masked_keys)} after cooldown period")

    def manually_disable_key(self, api_key: str, reason: str = "Manually disabled") -> bool:
        """Manually disable an active key. Returns True if its state changed."""
        with self.lock:
            if api_key not in self.api_keys or api_key not in self.active_keys:
                return False
            self.active_keys.remove(api_key)
            self.disabled_keys.add(api_key)
            self.disable_reasons[api_key] = reason
            logger.info(f"Requesty Key Manually Disabled: {self._mask_key(api_key)} - {reason}")
            return True

    def manually_enable_key(self, api_key: str) -> bool:
        """Manually re-enable a disabled key. Returns True if its state changed."""
        with self.lock:
            if api_key not in self.api_keys or api_key not in self.disabled_keys:
                return False
            self.disabled_keys.remove(api_key)
            self.active_keys.add(api_key)
            self.failure_counts[api_key] = 0  # Reset failure count
            self.disable_reasons.pop(api_key, None)
            logger.info(f"Requesty Key Manually Enabled: {self._mask_key(api_key)}")
            return True

    def get_detailed_key_stats(self) -> Dict[str, Any]:
        """Return aggregate counts plus a per-key detail list.

        NOTE(review): each detail entry includes ``full_key`` (the unmasked key);
        make sure callers never expose it to untrusted clients.
        """
        with self.lock:
            keys_detail = []
            for key in self.api_keys:
                # Determine status
                if key in self.active_keys:
                    if self.failure_counts[key] == 0:
                        status = "healthy"
                    elif self.failure_counts[key] < self.failure_threshold:
                        status = "warning"
                    else:
                        status = "active"  # Should not happen, but just in case
                else:
                    status = "disabled"
                keys_detail.append({
                    "key_id": self._mask_key(key),
                    "full_key": key,  # For internal operations only
                    "status": status,
                    "is_active": key in self.active_keys,
                    "failure_count": self.failure_counts[key],
                    "success_count": self.success_counts[key],
                    "total_usage": self.usage_stats[key],
                    "last_used": self.last_used_time.get(key),
                    "last_failure": self.last_failure_time.get(key),
                    "disable_reason": self.disable_reasons.get(key, ""),
                    "can_be_reactivated": (
                        key in self.disabled_keys and
                        self.disable_reasons.get(key, "").startswith("Auto-disabled") and
                        time.time() - self.last_failure_time.get(key, 0) >= self.cooldown_period
                    ),
                })
            return {
                "total_keys": len(self.api_keys),
                "active_keys": len(self.active_keys),
                "disabled_keys": len(self.disabled_keys),
                "healthy_keys": len([k for k in self.active_keys if self.failure_counts[k] == 0]),
                "warning_keys": len(
                    [k for k in self.active_keys if 0 < self.failure_counts[k] < self.failure_threshold]
                ),
                "failure_threshold": self.failure_threshold,
                "cooldown_period": self.cooldown_period,
                "keys_detail": keys_detail,
            }

    def reset_stats(self):
        """Reset usage, success and failure statistics (active/disabled sets are kept)."""
        with self.lock:
            self.usage_stats = {key: 0 for key in self.api_keys}
            self.failure_counts = {key: 0 for key in self.api_keys}
            self.success_counts = {key: 0 for key in self.api_keys}
            self.last_failure_time = {}
            self.last_used_time = {}
class RequestyProxy:
    """Proxy for Requesty API requests with key rotation and auto-cache.

    Regular requests pass through the token limiter, the cost optimizer and the
    local prompt cache before being forwarded; "special token" requests skip
    all three. Upstream calls rotate over the rotator's active keys and retry
    with the next key on 401/403/429, 5xx and connection errors.
    """

    def __init__(self):
        self.base_url = "https://router.requesty.ai/v1"
        self.timeout = 30
        self.key_rotator = RequestyKeyRotator()
        self.token_limiter = get_token_limiter()
        self.cache_manager = get_prompt_cache_manager()
        self.site_url = os.environ.get('REQUESTY_SITE_URL', 'https://github.com/openrouter-proxy')
        self.site_name = os.environ.get('REQUESTY_SITE_NAME', 'OpenRouter Proxy')

    def forward_request(self, data: Dict[Any, Any], api_key: str = None, is_special_token: bool = False) -> requests.Response:
        """Forward a chat-completion request to Requesty with cache optimization.

        Args:
            data: Request payload (presumably OpenAI-style chat completion);
                this method reads ``model`` and ``stream``, and ``stop`` is
                sanitized before sending.
            api_key: Accepted for interface compatibility but unused; keys
                always come from the rotator.
            is_special_token: When true, bypass token limits, cost
                optimizations and the local cache.

        Returns:
            The upstream response on success or non-retryable client error, a
            synthesized response on cache hit, or a synthesized JSON error
            response (400 token limit, 500 no keys / all keys failed).
        """
        model = data.get('model', '')

        if not is_special_token:
            # 1. Apply token limits (same as OpenRouter)
            is_valid, limited_data, limit_reason = self.token_limiter.validate_and_limit_request(data, model)
            if not is_valid:
                return self._create_error_response(400, "token_limit_exceeded", limit_reason)
            # 2. Apply cost optimizations
            optimized_data = self.token_limiter.add_cost_optimization_to_request(limited_data, model)
            # 3. Mandatory local cache lookup (same logic as OpenRouter)
            cache_hit, cached_response, final_data, cache_key = self.cache_manager.process_request_with_cache(
                optimized_data, model
            )
            if cache_hit and cached_response:
                logger.info(f"✓ Requesty Cache HIT for model {model} (key: {cache_key[:16]}...)")
                return self._create_cached_response(cached_response)
            # 4. Requesty-specific payload tweaks and model translation
            final_data = self._add_requesty_optimizations(final_data)
        else:
            # Special token - bypass all limitations and cache
            final_data = self._add_requesty_optimizations(data, bypass_limitations=True)
            cache_key = "special_token_bypass"

        # 5. Strip blank stop sequences the upstream API would reject
        final_data = self._clean_stop_sequences(final_data)
        is_stream = bool(final_data.get('stream', False))

        # 6. Retry across the active API keys
        url = f"{self.base_url}/chat/completions"
        max_attempts = len(self.key_rotator.active_keys) if self.key_rotator.active_keys else 1
        attempts = 0
        last_error = None
        error_details = None  # details of the most recent retryable HTTP failure

        while attempts < max_attempts:
            try:
                current_key = self.key_rotator.get_next_active_key()
            except ValueError as e:
                # No active keys available
                return self._create_error_response(500, "no_active_keys", str(e))

            headers = self._prepare_headers(current_key)
            try:
                response = requests.post(
                    url,
                    json=final_data,
                    headers=headers,
                    timeout=self.timeout,
                    stream=is_stream,
                )
                status = response.status_code

                if status == 200:
                    self.key_rotator.record_success(current_key)
                    # Cache only complete (non-streamed) responses of regular tokens.
                    if not is_special_token and not is_stream:
                        try:
                            response_json = response.json()
                            self.cache_manager.store_in_cache(cache_key, response_json, model)
                            logger.info(f"✓ Requesty Cache STORED for model {model} (key: {cache_key[:16]}...)")
                        except Exception as e:
                            # Best-effort: a cache failure must not fail the request.
                            logger.warning(f"Failed to store Requesty response in cache: {e}")
                    return response

                if status in (401, 403, 429) or status >= 500:
                    # Key-related or server errors: record and rotate to the next key.
                    if status >= 500:
                        fallback = f'HTTP {status} - Server error'
                        retry_log = f"Requesty server error {status}, trying next key..."
                    else:
                        fallback = f'HTTP {status} - API key authentication failed'
                        retry_log = f"Requesty API key failed with {status}, trying next key..."
                    # Read the body before recording the failure so that a read error
                    # (handled below as a connection error) is not counted twice.
                    last_error, error_details = self._describe_failed_response(response, url, fallback)
                    self.key_rotator.record_failure(current_key, status)
                    attempts += 1
                    logger.warning(retry_log)
                    continue

                # Other HTTP errors (4xx client errors) - don't retry, return with detailed error
                logger.warning(f"Requesty client error {status}, not retrying")
                return self._enrich_client_error(response, url)

            except requests.exceptions.RequestException as e:
                # Network/connection errors - record failure and try next key
                self.key_rotator.record_failure(current_key, None)
                attempts += 1
                last_error = str(e)
                logger.warning(f"Requesty connection error: {e}, trying next key...")

        # If we get here, all keys failed
        logger.error(f"All Requesty API keys failed after {attempts} attempts")
        error_context = {
            "attempts_made": attempts,
            "max_attempts": max_attempts,
            "active_keys_count": len(self.key_rotator.active_keys),
            "last_error_details": error_details,
        }
        return self._create_error_response(
            500,
            "all_keys_failed",
            f"All API keys are currently unavailable. Last error: {last_error}",
            error_context,
        )

    @staticmethod
    def _describe_failed_response(response: requests.Response, url: str, fallback_message: str) -> tuple:
        """Summarize a retryable upstream failure as ``(last_error, error_details)``.

        When the body is a JSON object whose ``error`` field (default ``{}``)
        is an object, ``last_error`` is its ``message`` (or ``HTTP <status>``)
        and the parsed body is kept. Otherwise ``last_error`` is
        *fallback_message* and the raw text body is kept.
        """
        status = response.status_code
        try:
            error_data = response.json()
        except ValueError:  # not JSON; requests' JSONDecodeError subclasses ValueError
            error_data = None
        error_obj = error_data.get('error', {}) if isinstance(error_data, dict) else None

        if isinstance(error_obj, dict):
            last_error = error_obj.get('message', f'HTTP {status}')
            body = error_data
        else:
            last_error = fallback_message
            body = response.text
        return last_error, {
            "status_code": status,
            "response_body": body,
            "headers": dict(response.headers),
            "url": url,
        }

    def _enrich_client_error(self, response: requests.Response, url: str) -> requests.Response:
        """Attach provider context to a non-retryable upstream error response.

        A JSON object body with an ``error`` object gets ``provider``, ``url``
        and ``headers`` added and the response body rewritten. Other valid JSON
        bodies are returned untouched. Non-JSON bodies, or an ``error`` field
        that is not an object, are replaced by a synthesized ``client_error``.
        """
        try:
            error_data = response.json()
        except ValueError:  # not JSON
            error_data = None
            is_json = False
        else:
            is_json = True

        if is_json:
            if not (isinstance(error_data, dict) and 'error' in error_data):
                return response
            error_obj = error_data['error']
            if isinstance(error_obj, dict):
                error_obj['provider'] = 'requesty'
                error_obj['url'] = url
                error_obj['headers'] = dict(response.headers)
                # Update response content with enhanced error data
                response._content = json.dumps(error_data).encode('utf-8')
                return response

        # If we can't annotate the body, create our own error response
        status = response.status_code
        return self._create_error_response(
            status,
            "client_error",
            f"HTTP {status}: {response.text}",
            {
                "original_status_code": status,
                "response_body": response.text,
                "headers": dict(response.headers),
                "url": url,
            },
        )

    def _clean_stop_sequences(self, data: Dict[Any, Any]) -> Dict[Any, Any]:
        """Drop blank stop sequences to avoid 'stop_sequences must contain non-whitespace' errors.

        ``stop`` may be a list or a single string. Non-string and
        whitespace-only entries are removed from a list. The field is deleted
        when nothing valid remains or when it is a whitespace-only string.
        Returns a shallow copy; *data* is not modified.
        """
        cleaned_data = data.copy()
        stop = cleaned_data.get('stop')

        if isinstance(stop, list):
            valid_stops = [s for s in stop if isinstance(s, str) and s.strip()]
            if valid_stops:
                cleaned_data['stop'] = valid_stops
                return cleaned_data
        elif not (isinstance(stop, str) and not stop.strip()):
            # Absent, non-blank string, or some other type: leave untouched.
            return cleaned_data

        # If no valid stop sequences remain, remove the field entirely
        del cleaned_data['stop']
        logger.info("Removed empty stop sequences to prevent API error")
        return cleaned_data

    def _add_requesty_optimizations(self, data: Dict[Any, Any], bypass_limitations: bool = False) -> Dict[Any, Any]:
        """Return a shallow copy of *data* with the model translated and a ``requesty`` config block.

        ``auto_cache`` is always enabled; *bypass_limitations* additionally
        sets ``special_token`` and ``unlimited``.
        """
        optimized_data = data.copy()
        # Translate model name from OpenRouter format to Requesty format
        optimized_data['model'] = translate_model_to_requesty(optimized_data.get('model', ''))
        requesty_config = {"auto_cache": True}
        if bypass_limitations:
            requesty_config["special_token"] = True
            requesty_config["unlimited"] = True
        optimized_data["requesty"] = requesty_config
        return optimized_data

    def _prepare_headers(self, api_key: str) -> Dict[str, str]:
        """Build the Requesty request headers (bearer auth plus site attribution)."""
        return {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "HTTP-Referer": self.site_url,
            "X-Title": self.site_name,
        }

    def _create_error_response(self, status_code: int, error_type: str, message: str, context: Dict = None) -> requests.Response:
        """Build a synthetic JSON error response; *context* keys are merged into ``error``."""
        mock_response = requests.Response()
        mock_response.status_code = status_code
        error_data = {
            "error": {
                "message": message,
                "type": error_type,
                "code": status_code,
                "provider": "requesty",
            }
        }
        if context:
            error_data["error"].update(context)
        mock_response._content = json.dumps(error_data).encode('utf-8')
        mock_response.headers['Content-Type'] = 'application/json'
        return mock_response

    def _create_cached_response(self, cached_data: Dict) -> requests.Response:
        """Build a synthetic 200 response from cached data, tagged with cache metadata."""
        mock_response = requests.Response()
        mock_response.status_code = 200
        enhanced_response = cached_data.copy()
        enhanced_response['x_cache_info'] = {
            "cache_hit": True,
            "source": "requesty_local_cache",
            "cost_savings": True,
        }
        mock_response._content = json.dumps(enhanced_response).encode('utf-8')
        mock_response.headers['Content-Type'] = 'application/json'
        mock_response.headers['X-Cache-Status'] = 'HIT'
        mock_response.headers['X-Cost-Optimized'] = 'true'
        mock_response.headers['X-Provider'] = 'requesty'
        return mock_response

    def _fetch_models(self, get_key) -> requests.Response:
        """GET ``/models`` using the key returned by ``get_key()``.

        Any exception (including a missing key) becomes a synthesized 500.
        """
        try:
            headers = self._prepare_headers(get_key())
            return requests.get(f"{self.base_url}/models", headers=headers, timeout=self.timeout)
        except Exception as e:
            return self._create_error_response(
                500,
                "connection_error",
                f"Failed to get models from Requesty: {str(e)}",
            )

    def get_models(self) -> requests.Response:
        """Get available models from Requesty (with rotation)."""
        return self._fetch_models(self.key_rotator.get_next_key)

    def get_models_admin(self) -> requests.Response:
        """Get available models from Requesty (admin - no rotation)."""
        return self._fetch_models(self.key_rotator.get_current_key)

    def get_key_rotation_stats(self) -> Dict[str, Any]:
        """Return rotation statistics with masked keys."""
        usage_stats = self.key_rotator.get_usage_stats()
        masked_usage_stats = {
            (f"{key[:8]}...{key[-4:]}" if len(key) > 12 else "***"): usage
            for key, usage in usage_stats.items()
        }
        key_count = len(self.key_rotator.api_keys)
        return {
            "total_keys": key_count,
            "usage_per_key": masked_usage_stats,
            "current_index": self.key_rotator.current_index,
            "next_key_index": self.key_rotator.current_index % key_count if key_count else 0,
            "rotation_working": key_count > 1,
            "total_requests": sum(usage_stats.values()),
        }

    @staticmethod
    def _timed_health_check(fetch_models) -> Dict[str, Any]:
        """Time a single ``fetch_models()`` call and summarize it as a health report."""
        try:
            # perf_counter is monotonic, so wall-clock adjustments cannot skew the timing.
            start_time = time.perf_counter()
            response = fetch_models()
            elapsed = time.perf_counter() - start_time
            return {
                "status": "healthy" if response.status_code == 200 else "unhealthy",
                "response_time": elapsed,
                "status_code": response.status_code,
            }
        except Exception as e:
            return {
                "status": "unhealthy",
                "error": str(e),
                "response_time": None,
            }

    def test_connection(self) -> Dict[str, Any]:
        """Test connection to Requesty API (with rotation)."""
        return self._timed_health_check(self.get_models)

    def test_connection_admin(self) -> Dict[str, Any]:
        """Test connection to Requesty API (admin - no rotation)."""
        return self._timed_health_check(self.get_models_admin)

    def get_detailed_key_stats(self) -> Dict[str, Any]:
        """Get detailed statistics for all API keys."""
        return self.key_rotator.get_detailed_key_stats()

    def manually_disable_key(self, api_key: str, reason: str = "Manually disabled") -> bool:
        """Manually disable an API key."""
        return self.key_rotator.manually_disable_key(api_key, reason)

    def manually_enable_key(self, api_key: str) -> bool:
        """Manually enable a disabled API key."""
        return self.key_rotator.manually_enable_key(api_key)