# movidik/utils/proxy.py
import requests
import json
import logging
from typing import Any, Dict, Optional
from .token_limiter import get_token_limiter
from .prompt_cache import get_prompt_cache_manager
# Configure logging
logger = logging.getLogger(__name__)
class OpenRouterProxy:
    """Proxy for OpenRouter API requests with cost optimization."""

    def __init__(self):
        self.base_url = "https://openrouter.ai/api/v1"
        self.timeout = 30
        self.token_limiter = get_token_limiter()
        self.cache_manager = get_prompt_cache_manager()
    def forward_request(self, data: Dict[Any, Any], api_key: str, is_special_token: bool = False) -> requests.Response:
        """Forward a chat completion request to OpenRouter with cost optimization."""
        model = data.get('model', '')

        if not is_special_token:
            # Apply the normal limitations for regular tokens.
            # 1. ENFORCE THE MANDATORY TOKEN LIMIT
            is_valid, limited_data, limit_reason = self.token_limiter.validate_and_limit_request(data, model)
            if not is_valid:
                # Build an error response for the token-limit violation
                return self._create_error_response(
                    400,
                    "token_limit_exceeded",
                    limit_reason
                )

            # 2. APPLY COST OPTIMIZATIONS
            optimized_data = self.token_limiter.add_cost_optimization_to_request(limited_data, model)

            # 3. RUN THE REQUEST THROUGH THE MANDATORY CACHE
            cache_hit, cached_response, final_data, cache_key = self.cache_manager.process_request_with_cache(
                optimized_data, model
            )
            if cache_hit and cached_response:
                # Serve the response from cache
                logger.info(f"✓ OpenRouter Cache HIT for model {model} (key: {cache_key[:16]}...)")
                return self._create_cached_response(cached_response)
        else:
            # Special token - bypass all limitations
            final_data = data.copy()
            cache_key = "special_token_bypass"

        # 4. CLEAN STOP SEQUENCES TO PREVENT API ERRORS
        final_data = self._clean_stop_sequences(final_data)

        # 5. PREPARE OPTIMIZED HEADERS
        headers = self._prepare_optimized_headers(api_key, cache_key)

        # 6. SEND THE REQUEST TO OPENROUTER
        url = f"{self.base_url}/chat/completions"
        try:
            if final_data.get('stream', False):
                # Handle streaming requests
                response = requests.post(
                    url,
                    json=final_data,
                    headers=headers,
                    timeout=self.timeout,
                    stream=True
                )
            else:
                # Handle non-streaming requests
                response = requests.post(
                    url,
                    json=final_data,
                    headers=headers,
                    timeout=self.timeout
                )

            # 7. STORE SUCCESSFUL RESPONSES IN THE CACHE (only for regular tokens)
            if not is_special_token and response.status_code == 200 and not final_data.get('stream', False):
                try:
                    response_json = response.json()
                    self.cache_manager.store_in_cache(cache_key, response_json, model)
                    logger.info(f"✓ OpenRouter Cache STORED for model {model} (key: {cache_key[:16]}...)")
                except Exception as e:
                    logger.warning(f"Failed to store OpenRouter response in cache: {e}")
            return response
        except requests.exceptions.RequestException as e:
            # Return a mock response for connection errors
            return self._create_error_response(
                500,
                "connection_error",
                f"Failed to connect to OpenRouter: {str(e)}",
                {"provider": "openrouter", "model": model}
            )
    def _clean_stop_sequences(self, data: Dict[Any, Any]) -> Dict[Any, Any]:
        """Clean stop sequences to avoid 'stop_sequences must contain non-whitespace' errors."""
        cleaned_data = data.copy()
        if 'stop' in cleaned_data and isinstance(cleaned_data['stop'], list):
            # Keep only valid stop sequences (non-empty strings containing non-whitespace characters)
            valid_stops = [
                stop for stop in cleaned_data['stop']
                if stop and isinstance(stop, str) and stop.strip()
            ]
            if valid_stops:
                cleaned_data['stop'] = valid_stops
            else:
                # If no valid stop sequences remain, remove the field entirely
                del cleaned_data['stop']
                logger.info("Removed empty stop sequences to prevent API error")
        return cleaned_data
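
    # Illustrative examples (hypothetical payloads) of how _clean_stop_sequences behaves:
    #   {"stop": ["\n\n", "", "   "]}  ->  {"stop": ["\n\n"]}
    #   {"stop": ["", "   "]}          ->  the "stop" key is removed entirely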
    def _prepare_optimized_headers(self, api_key: str, cache_key: str) -> Dict[str, str]:
        """Prepare optimized headers for OpenRouter, including cache configuration."""
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "HTTP-Referer": "https://github.com/openrouter-proxy",
            "X-Title": "Cost-Optimized OpenRouter Proxy"
        }
        # Add the proxy's cache-specific headers
        cache_headers = {
            "X-OpenRouter-Cache": "true",
            "X-OpenRouter-Cache-Key": cache_key[:16],
            "X-OpenRouter-Cost-Optimization": "enabled"
        }
        headers.update(cache_headers)
        return headers
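
    # Note: HTTP-Referer and X-Title are OpenRouter's documented attribution headers;
    # the X-OpenRouter-* entries above appear to be proxy-specific annotations rather
    # than documented OpenRouter API headers.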
    def _create_error_response(self, status_code: int, error_type: str, message: str, context: Optional[Dict] = None) -> requests.Response:
        """Create a mock error response with optional extra context."""
        mock_response = requests.Response()
        mock_response.status_code = status_code
        error_data = {
            "error": {
                "message": message,
                "type": error_type,
                "code": status_code
            }
        }
        # Add context information if provided
        if context:
            error_data["error"].update(context)
        mock_response._content = json.dumps(error_data).encode('utf-8')
        mock_response.headers['Content-Type'] = 'application/json'
        return mock_response
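
    # Illustrative shape of the resulting error body (context keys merged into "error"):
    #   {"error": {"message": "...", "type": "connection_error", "code": 500,
    #              "provider": "openrouter", "model": "..."}}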
    def _create_cached_response(self, cached_data: Dict) -> requests.Response:
        """Create a mock response from cached data."""
        mock_response = requests.Response()
        mock_response.status_code = 200
        # Attach cache metadata to the response body
        enhanced_response = cached_data.copy()
        enhanced_response['x_cache_info'] = {
            "cache_hit": True,
            "source": "local_cache",
            "cost_savings": True
        }
        mock_response._content = json.dumps(enhanced_response).encode('utf-8')
        mock_response.headers['Content-Type'] = 'application/json'
        mock_response.headers['X-Cache-Status'] = 'HIT'
        mock_response.headers['X-Cost-Optimized'] = 'true'
        return mock_response
    def get_models(self, api_key: str) -> requests.Response:
        """Get available models from OpenRouter."""
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        url = f"{self.base_url}/models"
        try:
            response = requests.get(url, headers=headers, timeout=self.timeout)
            return response
        except requests.exceptions.RequestException as e:
            return self._create_error_response(
                500,
                "connection_error",
                f"Failed to connect to OpenRouter: {str(e)}"
            )
    def get_cost_optimization_stats(self) -> Dict[str, Any]:
        """Return combined cost-optimization statistics."""
        token_stats = self.token_limiter.get_statistics()
        cache_stats = self.cache_manager.get_cache_statistics()
        # Compute combined savings
        total_requests = cache_stats.get('cache_hits', 0) + cache_stats.get('cache_misses', 0)
        return {
            "token_optimization": token_stats,
            "cache_optimization": cache_stats,
            "combined_metrics": {
                "total_requests_processed": total_requests,
                "cache_hit_ratio": cache_stats.get('hit_ratio', 0),
                "tokens_saved": token_stats.get('total_tokens_saved', 0),
                "requests_rejected": token_stats.get('total_rejected_requests', 0),
                "cost_optimization_enabled": True
            }
        }
    def reset_optimization_stats(self):
        """Reset the optimization statistics."""
        # Note: this re-fetches the shared limiter/cache instances; counters are only
        # reset if get_token_limiter()/get_prompt_cache_manager() return fresh state.
        self.token_limiter = get_token_limiter()
        self.cache_manager = get_prompt_cache_manager()
        self.cache_manager.clear_cache()
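

# Minimal usage sketch (illustrative only). It assumes an OPENROUTER_API_KEY
# environment variable and a model name; neither appears in this module, and the
# payload below is a hypothetical example, not part of the proxy's API surface.
if __name__ == "__main__":
    import os

    proxy = OpenRouterProxy()
    payload = {
        "model": "openai/gpt-4o-mini",  # hypothetical model choice
        "messages": [{"role": "user", "content": "Hello!"}],
        "max_tokens": 64,
    }
    resp = proxy.forward_request(payload, api_key=os.environ.get("OPENROUTER_API_KEY", ""))
    # Cached replies carry X-Cache-Status: HIT; live replies will not have the header.
    print(resp.status_code, resp.headers.get("X-Cache-Status", "MISS"))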