# movidik/utils/prompt_cache.py
"""
Prompt Cache Manager - Sistema de cache obligatorio para ahorrar costos en OpenRouter
"""
import hashlib
import json
import time
from typing import Dict, Any, Optional, List, Tuple
import threading
from datetime import datetime, timezone
class PromptCacheManager:
"""Manager de cache de prompts para reducir costos de API"""
def __init__(self):
self.cache = {} # {hash: {data, timestamp, hit_count, model}}
self.cache_hits = 0
self.cache_misses = 0
self.total_savings = 0
        # Cache configuration
        self.cache_ttl = 3600  # default TTL: one hour
        self.max_cache_entries = 1000
        self.min_prompt_length = 50  # minimum prompt length (in characters) worth caching
        # Prompt patterns that should always be cached
self.always_cache_patterns = [
"system",
"You are",
"Please analyze",
"Explain the following",
"Generate a",
"Create a",
"Write a"
]
    def generate_cache_key(self, messages: List[Dict], model: str, params: Dict) -> str:
        """Generate a unique cache key from the prompt and request parameters"""
        # Extract only the content that is relevant to the cache key
        cache_content = {
            "messages": self._normalize_messages(messages),
            "model": model,
            # Only include parameters that affect the response
            "temperature": params.get("temperature", 0.7),
            "top_p": params.get("top_p", 1.0),
            "max_tokens": params.get("max_tokens", 4000)
        }
        # Hash the canonical JSON representation with SHA-256
        content_str = json.dumps(cache_content, sort_keys=True)
        return hashlib.sha256(content_str.encode()).hexdigest()
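    # Illustrative note (not part of the API): keys depend only on the
    # normalized messages, the model, and the three sampling parameters
    # above, so requests differing merely in surrounding whitespace or in
    # unrelated fields such as "stream" map to the same key, e.g.:
    #   generate_cache_key([{"role": "user", "content": "Hi "}], "m", {})
    #   == generate_cache_key([{"role": "user", "content": "Hi"}], "m", {"stream": True})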
def _normalize_messages(self, messages: List[Dict]) -> List[Dict]:
"""Normaliza los mensajes para el cache, removiendo variaciones menores"""
normalized = []
for msg in messages:
normalized_msg = {
"role": msg.get("role", "user"),
"content": msg.get("content", "").strip()
}
normalized.append(normalized_msg)
return normalized
    def should_use_cache(self, messages: List[Dict], model: str) -> bool:
        """Decide whether a prompt must go through the cache.

        The current policy caches every request; the length and pattern
        heuristics below are kept for a future switch to selective caching.
        """
        for msg in messages:
            content = msg.get("content", "")
            # Prompts long enough to be worth caching
            if len(content) >= self.min_prompt_length:
                return True
            # Prompts matching the always-cache patterns
            for pattern in self.always_cache_patterns:
                if pattern.lower() in content.lower():
                    return True
        return True  # Mandatory caching for everything
    def get_from_cache(self, cache_key: str) -> Optional[Dict]:
        """Return a cached response if it exists and has not expired"""
        if cache_key not in self.cache:
            return None
        cached_item = self.cache[cache_key]
        # Evict the entry if its TTL has elapsed
        if time.time() - cached_item["timestamp"] > self.cache_ttl:
            del self.cache[cache_key]
            return None
        # Record the hit
        cached_item["hit_count"] += 1
        self.cache_hits += 1
        return cached_item["data"]
    def store_in_cache(self, cache_key: str, response_data: Dict, model: str):
        """Store a response in the cache"""
        # Evict entries first if the cache is full
        if len(self.cache) >= self.max_cache_entries:
            self._cleanup_cache()
        self.cache[cache_key] = {
            "data": response_data,
            "timestamp": time.time(),
            "hit_count": 0,
            "model": model,
            "created_at": datetime.now(timezone.utc).isoformat()
        }
        # Every store follows a cache miss, so misses are counted here
        self.cache_misses += 1
    def _cleanup_cache(self):
        """Evict the oldest, least-used entries from the cache"""
        # Sort ascending by hit_count, then by timestamp
        sorted_items = sorted(
            self.cache.items(),
            key=lambda x: (x[1]["hit_count"], x[1]["timestamp"])
        )
        # Keep only the most relevant 70% of the maximum capacity
        keep_count = int(self.max_cache_entries * 0.7)
        items_to_keep = sorted_items[-keep_count:]
        self.cache = dict(items_to_keep)
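    # e.g. with max_cache_entries = 1000, a cleanup pass keeps the 700 entries
    # with the highest (hit_count, timestamp) and evicts the remaining 300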
    def add_openrouter_cache_headers(self, data: Dict[Any, Any], cache_key: str) -> Dict[Any, Any]:
        """Add OpenRouter-specific prompt-caching headers to a request body"""
        modified_data = data.copy()
        # OpenRouter-specific caching headers
        if "extra_headers" not in modified_data:
            modified_data["extra_headers"] = {}
        # OpenRouter cache configuration
        modified_data["extra_headers"].update({
            "OpenRouter-Cache": "true",
            "OpenRouter-Cache-TTL": str(self.cache_ttl),
            "OpenRouter-Cache-Key": cache_key[:16],  # first 16 chars only
        })
        # Configure transforms for automatic caching when available
        if "transforms" not in modified_data:
            modified_data["transforms"] = []
        # Add the cache transform if it is not already present
        cache_transform = "openrouter:cache"
        if cache_transform not in modified_data["transforms"]:
            modified_data["transforms"].append(cache_transform)
        return modified_data
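    # Illustrative result (note: the header and transform names above are this
    # module's own convention; verify them against current OpenRouter docs):
    #   add_openrouter_cache_headers({"messages": [...]}, "abcdef0123456789...")
    #   -> {"messages": [...],
    #       "extra_headers": {"OpenRouter-Cache": "true",
    #                         "OpenRouter-Cache-TTL": "3600",
    #                         "OpenRouter-Cache-Key": "abcdef0123456789"},
    #       "transforms": ["openrouter:cache"]}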
    def process_request_with_cache(self, data: Dict[Any, Any], model: str) -> Tuple[bool, Optional[Dict], Dict[Any, Any], str]:
        """
        Process a request through the mandatory cache
        Returns:
            (cache_hit, cached_response, modified_request_data, cache_key)
        """
        messages = data.get("messages", [])
        # Generate the cache key
        cache_key = self.generate_cache_key(messages, model, data)
        # Mandatory cache check
        if self.should_use_cache(messages, model):
            # Look up the cache
            cached_response = self.get_from_cache(cache_key)
            if cached_response:
                return True, cached_response, data, cache_key
        # Cache miss: augment the outgoing request with caching headers
        modified_data = self.add_openrouter_cache_headers(data, cache_key)
        return False, None, modified_data, cache_key
def estimate_cache_savings(self, model: str) -> Dict[str, Any]:
"""Estima el ahorro por uso de cache"""
from models.model_costs import MODEL_COSTS
total_requests = self.cache_hits + self.cache_misses
if total_requests == 0:
return {"cache_hit_ratio": 0, "estimated_savings": 0}
cache_hit_ratio = self.cache_hits / total_requests
model_cost = MODEL_COSTS.get(model, 1)
        # Estimate: each cache hit saves the full cost of one request
estimated_savings = self.cache_hits * model_cost
return {
"cache_hit_ratio": cache_hit_ratio,
"cache_hits": self.cache_hits,
"cache_misses": self.cache_misses,
"total_requests": total_requests,
"estimated_savings": estimated_savings,
"model_cost_factor": model_cost
}
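    # Worked example for estimate_cache_savings: 30 hits and 70 misses on a
    # model with cost factor 2 yield cache_hit_ratio = 30 / 100 = 0.3 and
    # estimated_savings = 30 * 2 = 60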
def get_cache_statistics(self) -> Dict[str, Any]:
"""Obtiene estadísticas completas del cache"""
# Estadísticas por modelo
model_stats = {}
for cache_key, cached_item in self.cache.items():
model = cached_item["model"]
if model not in model_stats:
model_stats[model] = {"entries": 0, "total_hits": 0}
model_stats[model]["entries"] += 1
model_stats[model]["total_hits"] += cached_item["hit_count"]
return {
"total_cache_entries": len(self.cache),
"cache_hits": self.cache_hits,
"cache_misses": self.cache_misses,
"hit_ratio": self.cache_hits / (self.cache_hits + self.cache_misses) if (self.cache_hits + self.cache_misses) > 0 else 0,
"model_statistics": model_stats,
"cache_config": {
"ttl_seconds": self.cache_ttl,
"max_entries": self.max_cache_entries,
"min_prompt_length": self.min_prompt_length
}
}
def clear_cache(self):
"""Limpia todo el cache"""
self.cache.clear()
self.cache_hits = 0
self.cache_misses = 0
# Global cache manager instance (thread-safe singleton)
_cache_manager = None
_cache_lock = threading.Lock()

def get_prompt_cache_manager() -> PromptCacheManager:
    """Return the global cache manager instance (thread-safe singleton)"""
    global _cache_manager
    if _cache_manager is None:
        # The lock is created at import time, so the double-checked locking
        # below cannot race on initializing the lock itself
        with _cache_lock:
            if _cache_manager is None:
                _cache_manager = PromptCacheManager()
                print("✓ PromptCacheManager singleton initialized")
    return _cache_manager
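# Minimal usage sketch (illustrative only; the request body and model id are
# hypothetical): exercises the miss -> store -> hit round trip.
if __name__ == "__main__":
    manager = get_prompt_cache_manager()
    request = {
        "messages": [{"role": "user", "content": "Explain the following code: print('hi')"}],
        "temperature": 0.2,
    }
    model = "openai/gpt-4o-mini"  # hypothetical model id
    # First pass: a miss; the outgoing request gains the caching headers
    hit, cached, outgoing, key = manager.process_request_with_cache(request, model)
    assert not hit and "extra_headers" in outgoing
    # Simulate caching the upstream response under the returned key
    manager.store_in_cache(key, {"choices": [{"message": {"content": "cached reply"}}]}, model)
    # Second pass: the identical request now hits the cache
    hit, cached, _, _ = manager.process_request_with_cache(request, model)
    assert hit and cached is not None
    print(manager.get_cache_statistics())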