import requests
import json
import logging
from typing import Dict, Any, Optional

from .token_limiter import get_token_limiter
from .prompt_cache import get_prompt_cache_manager

# Configure logging
logger = logging.getLogger(__name__)


class OpenRouterProxy:
    """Proxy for OpenRouter API requests with cost optimization"""

    def __init__(self):
        self.base_url = "https://openrouter.ai/api/v1"
        self.timeout = 30
        self.token_limiter = get_token_limiter()
        self.cache_manager = get_prompt_cache_manager()

    def forward_request(self, data: Dict[Any, Any], api_key: str,
                        is_special_token: bool = False) -> requests.Response:
        """Forward a chat completion request to OpenRouter with cost optimization"""
        model = data.get('model', '')

        if not is_special_token:
            # Apply normal limitations for regular tokens

            # 1. Enforce the mandatory token limit
            is_valid, limited_data, limit_reason = self.token_limiter.validate_and_limit_request(data, model)

            if not is_valid:
                # Build an error response for the token-limit violation
                return self._create_error_response(
                    400,
                    "token_limit_exceeded",
                    limit_reason
                )

            # 2. Apply cost optimizations
            optimized_data = self.token_limiter.add_cost_optimization_to_request(limited_data, model)

            # 3. Run the request through the mandatory cache
            cache_hit, cached_response, final_data, cache_key = self.cache_manager.process_request_with_cache(
                optimized_data, model
            )

            if cache_hit and cached_response:
                # Serve the response straight from the cache
                logger.info(f"✓ OpenRouter Cache HIT for model {model} (key: {cache_key[:16]}...)")
                return self._create_cached_response(cached_response)
        else:
            # Special token - bypass all limitations
            final_data = data.copy()
            cache_key = "special_token_bypass"

        # 4. Clean stop sequences to prevent API errors
        final_data = self._clean_stop_sequences(final_data)

        # 5. Prepare optimized headers
        headers = self._prepare_optimized_headers(api_key, cache_key)

        # 6. Send the request to OpenRouter
        url = f"{self.base_url}/chat/completions"

        try:
            if final_data.get('stream', False):
                # Handle streaming requests
                response = requests.post(
                    url,
                    json=final_data,
                    headers=headers,
                    timeout=self.timeout,
                    stream=True
                )
            else:
                # Handle non-streaming requests
                response = requests.post(
                    url,
                    json=final_data,
                    headers=headers,
                    timeout=self.timeout
                )
            # 7. Store the response in the cache on success (regular tokens only)
            if not is_special_token and response.status_code == 200 and not final_data.get('stream', False):
                try:
                    response_json = response.json()
                    self.cache_manager.store_in_cache(cache_key, response_json, model)
                    logger.info(f"✓ OpenRouter Cache STORED for model {model} (key: {cache_key[:16]}...)")
                except Exception as e:
                    logger.warning(f"Failed to store OpenRouter response in cache: {e}")

            return response

        except requests.exceptions.RequestException as e:
            # Create a mock response for connection errors
            return self._create_error_response(
                500,
                "connection_error",
                f"Failed to connect to OpenRouter: {str(e)}",
                {"provider": "openrouter", "model": model}
            )

    def _clean_stop_sequences(self, data: Dict[Any, Any]) -> Dict[Any, Any]:
        """Clean stop sequences to avoid 'stop_sequences must contain non-whitespace' errors"""
        cleaned_data = data.copy()

        if 'stop' in cleaned_data and isinstance(cleaned_data['stop'], list):
            # Keep only valid stop sequences (non-empty strings with non-whitespace characters)
            valid_stops = [
                stop for stop in cleaned_data['stop']
                if stop and isinstance(stop, str) and stop.strip()
            ]

            if valid_stops:
                cleaned_data['stop'] = valid_stops
            else:
                # If no valid stop sequences remain, remove the field entirely
                del cleaned_data['stop']
                logger.info("Removed empty stop sequences to prevent API error")

        return cleaned_data

    def _prepare_optimized_headers(self, api_key: str, cache_key: str) -> Dict[str, str]:
        """Prepare optimized headers for OpenRouter, including cache configuration"""
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "HTTP-Referer": "https://github.com/openrouter-proxy",
            "X-Title": "Cost-Optimized OpenRouter Proxy"
        }

        # Add the cache-specific headers
        headers.update({
            "X-OpenRouter-Cache": "true",
            "X-OpenRouter-Cache-Key": cache_key[:16],
            "X-OpenRouter-Cost-Optimization": "enabled"
        })

        return headers

    def _create_error_response(self, status_code: int, error_type: str,
                               message: str, context: Optional[Dict] = None) -> requests.Response:
        """Create a mock error response with optional extra context"""
        mock_response = requests.Response()
        mock_response.status_code = status_code

        error_data = {
            "error": {
                "message": message,
                "type": error_type,
                "code": status_code
            }
        }

        # Add context information if provided
        if context:
            error_data["error"].update(context)

        mock_response._content = json.dumps(error_data).encode('utf-8')
        mock_response.headers['Content-Type'] = 'application/json'

        return mock_response

    def _create_cached_response(self, cached_data: Dict) -> requests.Response:
        """Create a mock response from cached data"""
        mock_response = requests.Response()
        mock_response.status_code = 200

        # Attach cache metadata to the response body
        enhanced_response = cached_data.copy()
        enhanced_response['x_cache_info'] = {
            "cache_hit": True,
            "source": "local_cache",
            "cost_savings": True
        }

        mock_response._content = json.dumps(enhanced_response).encode('utf-8')
        mock_response.headers['Content-Type'] = 'application/json'
        mock_response.headers['X-Cache-Status'] = 'HIT'
        mock_response.headers['X-Cost-Optimized'] = 'true'

        return mock_response

    def get_models(self, api_key: str) -> requests.Response:
        """Get available models from OpenRouter"""
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

        url = f"{self.base_url}/models"

        try:
            response = requests.get(url, headers=headers, timeout=self.timeout)
            return response
        except requests.exceptions.RequestException as e:
            return self._create_error_response(
                500,
                "connection_error",
                f"Failed to connect to OpenRouter: {str(e)}"
            )

    def get_cost_optimization_stats(self) -> Dict[str, Any]:
        """Return combined cost-optimization statistics"""
        token_stats = self.token_limiter.get_statistics()
        cache_stats = self.cache_manager.get_cache_statistics()

        # Compute combined savings across both optimizers
        total_requests = cache_stats.get('cache_hits', 0) + cache_stats.get('cache_misses', 0)

        return {
            "token_optimization": token_stats,
            "cache_optimization": cache_stats,
            "combined_metrics": {
                "total_requests_processed": total_requests,
                "cache_hit_ratio": cache_stats.get('hit_ratio', 0),
                "tokens_saved": token_stats.get('total_tokens_saved', 0),
                "requests_rejected": token_stats.get('total_rejected_requests', 0),
                "cost_optimization_enabled": True
            }
        }

    def reset_optimization_stats(self):
        """Reset the optimization statistics"""
        # Re-acquire the shared limiter/cache instances and clear the cache
        self.token_limiter = get_token_limiter()
        self.cache_manager = get_prompt_cache_manager()
        self.cache_manager.clear_cache()
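

if __name__ == "__main__":
    # Minimal usage sketch, not part of the proxy itself. Assumptions: an
    # OPENROUTER_API_KEY environment variable is set, the module is run inside
    # its package (e.g. `python -m <package>.<module>`) so the relative imports
    # of .token_limiter and .prompt_cache resolve, and the model id below is
    # illustrative rather than prescribed by this proxy.
    import os

    proxy = OpenRouterProxy()

    request_data = {
        "model": "openai/gpt-4o-mini",  # hypothetical model id for illustration
        "messages": [{"role": "user", "content": "Hello!"}],
        "max_tokens": 64,
    }

    # Regular token: limits, optimizations, and caching are all applied
    response = proxy.forward_request(request_data, os.environ["OPENROUTER_API_KEY"])
    print(response.status_code)
    print(response.json())

    # Inspect the combined token/cache statistics gathered by the proxy
    print(json.dumps(proxy.get_cost_optimization_stats(), indent=2))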