| """ | |
| Prompt Cache Manager - Sistema de cache obligatorio para ahorrar costos en OpenRouter | |
| """ | |
import hashlib
import json
import threading
import time
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime, timezone


class PromptCacheManager:
    """Prompt cache manager for reducing API costs."""

    def __init__(self):
        self.cache = {}  # {hash: {data, timestamp, hit_count, model}}
        self.cache_hits = 0
        self.cache_misses = 0
        self.total_savings = 0

        # Cache configuration
        self.cache_ttl = 3600  # default TTL: 1 hour
        self.max_cache_entries = 1000
        self.min_prompt_length = 50  # minimum characters before caching

        # Prompt patterns that should always be cached
        self.always_cache_patterns = [
            "system",
            "You are",
            "Please analyze",
            "Explain the following",
            "Generate a",
            "Create a",
            "Write a",
        ]

    def generate_cache_key(self, messages: List[Dict], model: str, params: Dict) -> str:
        """Generate a unique cache key from the prompt and request parameters."""
        # Extract only the content that is relevant for caching
        cache_content = {
            "messages": self._normalize_messages(messages),
            "model": model,
            # Only include parameters that affect the response
            "temperature": params.get("temperature", 0.7),
            "top_p": params.get("top_p", 1.0),
            "max_tokens": params.get("max_tokens", 4000),
        }
        # Hash the normalized request with SHA-256
        content_str = json.dumps(cache_content, sort_keys=True)
        return hashlib.sha256(content_str.encode()).hexdigest()
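
    # Illustrative note (not from the original source): because messages are
    # normalized before hashing, payloads that differ only in surrounding
    # whitespace map to the same key, e.g.
    #   generate_cache_key([{"role": "user", "content": " hi "}], "m", {})
    #   == generate_cache_key([{"role": "user", "content": "hi"}], "m", {})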

    def _normalize_messages(self, messages: List[Dict]) -> List[Dict]:
        """Normalize messages for caching, removing minor variations."""
        normalized = []
        for msg in messages:
            normalized_msg = {
                "role": msg.get("role", "user"),
                "content": msg.get("content", "").strip()
            }
            normalized.append(normalized_msg)
        return normalized

    def should_use_cache(self, messages: List[Dict], model: str) -> bool:
        """Decide whether the prompt must be cached.

        Caching is currently mandatory for every request (note the final
        ``return True``); the length and pattern checks are kept so the
        policy can be relaxed later without rewriting the heuristics.
        """
        for msg in messages:
            content = msg.get("content", "")
            # Content long enough to be worth caching
            if len(content) >= self.min_prompt_length:
                return True
            # Content matching a pattern that should always be cached
            for pattern in self.always_cache_patterns:
                if pattern.lower() in content.lower():
                    return True
        return True  # mandatory caching for everything

    def get_from_cache(self, cache_key: str) -> Optional[Dict]:
        """Return a cached response if it exists and has not expired."""
        if cache_key not in self.cache:
            return None
        cached_item = self.cache[cache_key]

        # Evict the entry if its TTL has elapsed
        if time.time() - cached_item["timestamp"] > self.cache_ttl:
            del self.cache[cache_key]
            return None

        # Record the hit
        cached_item["hit_count"] += 1
        self.cache_hits += 1
        return cached_item["data"]

    def store_in_cache(self, cache_key: str, response_data: Dict, model: str):
        """Store a response in the cache."""
        # Evict entries first if the cache is full
        if len(self.cache) >= self.max_cache_entries:
            self._cleanup_cache()

        self.cache[cache_key] = {
            "data": response_data,
            "timestamp": time.time(),
            "hit_count": 0,
            "model": model,
            "created_at": datetime.now(timezone.utc).isoformat()
        }
        self.cache_misses += 1

    def _cleanup_cache(self):
        """Evict the oldest and least-used cache entries."""
        # Sort ascending by hit_count, then timestamp, so the least-used,
        # oldest entries come first
        sorted_items = sorted(
            self.cache.items(),
            key=lambda x: (x[1]["hit_count"], x[1]["timestamp"])
        )
        # Keep only the most relevant 70% of the maximum capacity
        keep_count = int(self.max_cache_entries * 0.7)
        items_to_keep = sorted_items[-keep_count:]
        self.cache = dict(items_to_keep)
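
    # Illustrative example (not from the original source): with entries
    #   A(hit_count=0, timestamp=100), B(hit_count=3, timestamp=50),
    #   C(hit_count=0, timestamp=200)
    # the ascending sort yields [A, C, B], so the [-keep_count:] slice retains
    # the most frequently hit entries and, among equally-hit ones, the newest.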

    def add_openrouter_cache_headers(self, data: Dict[Any, Any], cache_key: str) -> Dict[Any, Any]:
        """Add OpenRouter-specific headers for prompt caching."""
        modified_data = data.copy()

        # OpenRouter-specific caching headers
        if "extra_headers" not in modified_data:
            modified_data["extra_headers"] = {}
        modified_data["extra_headers"].update({
            "OpenRouter-Cache": "true",
            "OpenRouter-Cache-TTL": str(self.cache_ttl),
            "OpenRouter-Cache-Key": cache_key[:16],  # first 16 chars only
        })

        # Enable the automatic caching transform when available
        if "transforms" not in modified_data:
            modified_data["transforms"] = []
        cache_transform = "openrouter:cache"
        if cache_transform not in modified_data["transforms"]:
            modified_data["transforms"].append(cache_transform)

        return modified_data

    def process_request_with_cache(self, data: Dict[Any, Any], model: str) -> Tuple[bool, Optional[Dict], Dict[Any, Any], str]:
        """Process a request through the mandatory cache.

        Returns:
            (cache_hit, cached_response, modified_request_data, cache_key)
        """
        messages = data.get("messages", [])

        # Generate the cache key
        cache_key = self.generate_cache_key(messages, model, data)

        # Check the mandatory-cache policy
        if self.should_use_cache(messages, model):
            # Look up the cache
            cached_response = self.get_from_cache(cache_key)
            if cached_response:
                return True, cached_response, data, cache_key

        # No cache hit: annotate the outgoing request with cache headers
        modified_data = self.add_openrouter_cache_headers(data, cache_key)
        return False, None, modified_data, cache_key

    def estimate_cache_savings(self, model: str) -> Dict[str, Any]:
        """Estimate the savings from cache usage."""
        from models.model_costs import MODEL_COSTS

        total_requests = self.cache_hits + self.cache_misses
        if total_requests == 0:
            return {"cache_hit_ratio": 0, "estimated_savings": 0}

        cache_hit_ratio = self.cache_hits / total_requests
        model_cost = MODEL_COSTS.get(model, 1)
        # Estimate: each cache hit saves the full cost of a request
        estimated_savings = self.cache_hits * model_cost

        return {
            "cache_hit_ratio": cache_hit_ratio,
            "cache_hits": self.cache_hits,
            "cache_misses": self.cache_misses,
            "total_requests": total_requests,
            "estimated_savings": estimated_savings,
            "model_cost_factor": model_cost
        }

    def get_cache_statistics(self) -> Dict[str, Any]:
        """Return full cache statistics."""
        # Per-model statistics
        model_stats = {}
        for cache_key, cached_item in self.cache.items():
            model = cached_item["model"]
            if model not in model_stats:
                model_stats[model] = {"entries": 0, "total_hits": 0}
            model_stats[model]["entries"] += 1
            model_stats[model]["total_hits"] += cached_item["hit_count"]

        total_requests = self.cache_hits + self.cache_misses
        return {
            "total_cache_entries": len(self.cache),
            "cache_hits": self.cache_hits,
            "cache_misses": self.cache_misses,
            "hit_ratio": self.cache_hits / total_requests if total_requests > 0 else 0,
            "model_statistics": model_stats,
            "cache_config": {
                "ttl_seconds": self.cache_ttl,
                "max_entries": self.max_cache_entries,
                "min_prompt_length": self.min_prompt_length
            }
        }

    def clear_cache(self):
        """Clear the entire cache and reset counters."""
        self.cache.clear()
        self.cache_hits = 0
        self.cache_misses = 0


# Global cache-manager instance
_cache_manager: Optional[PromptCacheManager] = None
# Created at import time so two threads cannot race on the lock's own creation
_cache_lock = threading.Lock()


def get_prompt_cache_manager() -> PromptCacheManager:
    """Return the global cache manager instance (thread-safe singleton)."""
    global _cache_manager
    if _cache_manager is None:
        # Double-checked locking pattern
        with _cache_lock:
            if _cache_manager is None:
                _cache_manager = PromptCacheManager()
                print("✓ PromptCacheManager singleton initialized")
    return _cache_manager
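

# Minimal usage sketch (illustrative, not part of the original module): the
# request payload and model id below are hypothetical, and the stored
# "response" stands in for a real OpenRouter completion.
if __name__ == "__main__":
    manager = get_prompt_cache_manager()
    request_data = {
        "messages": [{"role": "user", "content": "Explain the following code: print('hello')"}],
        "temperature": 0.2,
    }
    model = "openai/gpt-4o-mini"  # hypothetical model id

    # First pass: a miss; the outgoing request gains the cache headers
    hit, cached, outgoing, key = manager.process_request_with_cache(request_data, model)
    assert not hit and "extra_headers" in outgoing

    # After the (hypothetical) API call, store the response under the key
    manager.store_in_cache(key, {"choices": [{"message": {"content": "cached answer"}}]}, model)

    # Second pass with an identical payload: served from the local cache
    hit, cached, _, _ = manager.process_request_with_cache(request_data, model)
    assert hit and cached is not None
    print(manager.get_cache_statistics())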