# semantic_database.py import sqlite3 import hashlib import json import logging import time from datetime import datetime, timedelta from typing import Optional, Dict, List, Any, Tuple import threading # Configurar logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class SemanticDatabase: """ 💾 SISTEMA DE BASE DE DATOS SEMÁNTICA — ¡AHORA EN USO ACTIVO! Gestiona: - Caché de prompts de alta calidad (¡priorizados por el agente!) - Memoria episódica de sesiones - Estadísticas del sistema - Optimización y limpieza automática """ def __init__(self, db_path: str = "semantic_cache.db"): self.db_path = db_path self.lock = threading.Lock() self._initialize_database() logger.info(f"📊 Base de datos semántica inicializada: {db_path}") def _initialize_database(self): """🏗️ Crea las tablas necesarias si no existen""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Tabla de caché de prompts — ¡AHORA USADA ACTIVAMENTE! cursor.execute(""" CREATE TABLE IF NOT EXISTS prompt_cache ( prompt_hash TEXT PRIMARY KEY, original_prompt TEXT NOT NULL, enhanced_prompt TEXT NOT NULL, category TEXT NOT NULL, similarity_score REAL NOT NULL, source_field TEXT NOT NULL, hit_count INTEGER DEFAULT 1, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # Tabla de memoria episódica cursor.execute(""" CREATE TABLE IF NOT EXISTS episodic_memory ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT NOT NULL, prompt TEXT NOT NULL, category TEXT NOT NULL, strategy TEXT NOT NULL, similarity_score REAL NOT NULL, processing_time REAL NOT NULL, models_used TEXT NOT NULL, success BOOLEAN NOT NULL, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # Tabla de estadísticas diarias cursor.execute(""" CREATE TABLE IF NOT EXISTS daily_stats ( date TEXT PRIMARY KEY, total_searches INTEGER DEFAULT 0, cache_hits INTEGER DEFAULT 0, unique_prompts INTEGER DEFAULT 0, avg_processing_time REAL DEFAULT 0.0, successful_enhancements INTEGER DEFAULT 0, failed_enhancements INTEGER DEFAULT 0 ) """) # Tabla de configuración del sistema cursor.execute(""" CREATE TABLE IF NOT EXISTS system_config ( key TEXT PRIMARY KEY, value TEXT NOT NULL, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # Índices para optimización — ¡CRÍTICOS PARA PERFORMANCE! cursor.execute("CREATE INDEX IF NOT EXISTS idx_prompt_hash ON prompt_cache(prompt_hash)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_category_score ON prompt_cache(category, similarity_score)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_hit_count ON prompt_cache(hit_count)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_session_id ON episodic_memory(session_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_timestamp ON episodic_memory(timestamp)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_last_accessed ON prompt_cache(last_accessed)") conn.commit() logger.info("✅ Tablas de base de datos inicializadas correctamente") except Exception as e: logger.error(f"❌ Error inicializando base de datos: {e}") raise def _get_prompt_hash(self, prompt: str) -> str: """🔐 Genera hash único para un prompt""" return hashlib.md5(prompt.strip().lower().encode()).hexdigest() def store_cache_result(self, original_prompt: str, enhanced_prompt: str, category: str, similarity_score: float, source_field: str) -> bool: """💾 Almacena resultado en caché — ¡USADO POR EL AGENTE!""" try: prompt_hash = self._get_prompt_hash(original_prompt) with self.lock: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Verificar si ya existe cursor.execute( "SELECT hit_count FROM prompt_cache WHERE prompt_hash = ?", (prompt_hash,) ) existing = cursor.fetchone() if existing: # Actualizar hit count y última vez accedido cursor.execute(""" UPDATE prompt_cache SET hit_count = hit_count + 1, last_accessed = CURRENT_TIMESTAMP, enhanced_prompt = ?, similarity_score = ? WHERE prompt_hash = ? """, (enhanced_prompt, similarity_score, prompt_hash)) else: # Insertar nuevo registro cursor.execute(""" INSERT INTO prompt_cache (prompt_hash, original_prompt, enhanced_prompt, category, similarity_score, source_field) VALUES (?, ?, ?, ?, ?, ?) """, (prompt_hash, original_prompt, enhanced_prompt, category, similarity_score, source_field)) conn.commit() return True except Exception as e: logger.error(f"❌ Error almacenando en caché: {e}") return False def get_cached_result(self, prompt: str) -> Optional[Dict]: """🔍 Busca resultado en caché""" try: prompt_hash = self._get_prompt_hash(prompt) with self.lock: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" SELECT enhanced_prompt, category, similarity_score, source_field, hit_count, created_at, last_accessed FROM prompt_cache WHERE prompt_hash = ? """, (prompt_hash,)) result = cursor.fetchone() if result: # Actualizar hit_count y last_accessed cursor.execute(""" UPDATE prompt_cache SET hit_count = hit_count + 1, last_accessed = CURRENT_TIMESTAMP WHERE prompt_hash = ? """, (prompt_hash,)) # Volver a leer para obtener valores actualizados cursor.execute(""" SELECT enhanced_prompt, category, similarity_score, source_field, hit_count, created_at, last_accessed FROM prompt_cache WHERE prompt_hash = ? """, (prompt_hash,)) updated_result = cursor.fetchone() conn.commit() if updated_result: return { 'enhanced_prompt': updated_result[0], 'category': updated_result[1], 'similarity_score': updated_result[2], 'source_field': updated_result[3], 'hit_count': updated_result[4], 'created_at': updated_result[5], 'last_accessed': updated_result[6] } return None except Exception as e: logger.error(f"❌ Error buscando en caché: {e}") return None def get_top_cached_prompts(self, prompt: str, category: str, top_k: int = 5) -> List[Dict]: """🌟 OBTIENE LOS MEJORES PROMPTS CURADOS — ¡NUEVO MÉTODO CLAVE! Busca prompts de alta calidad (score > 0.8) en la misma categoría, ordenados por relevancia y uso.""" try: with self.lock: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" SELECT enhanced_prompt, category, similarity_score, source_field, hit_count FROM prompt_cache WHERE category = ? AND similarity_score > 0.8 ORDER BY hit_count DESC, similarity_score DESC LIMIT ? """, (category, top_k)) results = cursor.fetchall() cached_prompts = [] for row in results: cached_prompts.append({ 'enhanced_prompt': row[0], 'category': row[1], 'similarity_score': row[2], 'source_field': row[3], 'hit_count': row[4] }) if cached_prompts: logger.info(f"🧠 Encontrados {len(cached_prompts)} prompts curados de alta calidad para categoría '{category}'") return cached_prompts except Exception as e: logger.error(f"❌ Error obteniendo prompts curados: {e}") return [] def store_episodic_memory(self, session_id: str, prompt: str, category: str, strategy: str, similarity_score: float, processing_time: float, models_used: List[str], success: bool) -> bool: """🧠 Almacena memoria episódica""" try: models_json = json.dumps(models_used) with self.lock: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO episodic_memory (session_id, prompt, category, strategy, similarity_score, processing_time, models_used, success) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, (session_id, prompt, category, strategy, similarity_score, processing_time, models_json, success)) conn.commit() return True except Exception as e: logger.error(f"❌ Error almacenando memoria episódica: {e}") return False def get_session_history(self, session_id: str, limit: int = 50) -> List[Dict]: """📚 Obtiene historial de una sesión""" try: with self.lock: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" SELECT prompt, category, strategy, similarity_score, processing_time, models_used, success, timestamp FROM episodic_memory WHERE session_id = ? ORDER BY timestamp DESC LIMIT ? """, (session_id, limit)) results = cursor.fetchall() history = [] for row in results: history.append({ 'prompt': row[0], 'category': row[1], 'strategy': row[2], 'similarity_score': row[3], 'processing_time': row[4], 'models_used': json.loads(row[5]), 'success': bool(row[6]), 'timestamp': row[7] }) return history except Exception as e: logger.error(f"❌ Error obteniendo historial: {e}") return [] def update_daily_stats(self, searches: int = 0, cache_hits: int = 0, processing_time: float = 0.0, success: bool = True) -> bool: """📊 Actualiza estadísticas diarias""" try: today = datetime.now().strftime('%Y-%m-%d') with self.lock: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute( "SELECT total_searches, cache_hits, avg_processing_time, successful_enhancements, failed_enhancements FROM daily_stats WHERE date = ?", (today,) ) existing = cursor.fetchone() if existing: new_total = existing[0] + searches new_cache_hits = existing[1] + cache_hits new_successful = existing[3] + (1 if success else 0) new_failed = existing[4] + (0 if success else 1) if new_total > 0: new_avg_time = ((existing[2] * existing[0]) + processing_time) / new_total else: new_avg_time = 0.0 cursor.execute(""" UPDATE daily_stats SET total_searches = ?, cache_hits = ?, avg_processing_time = ?, successful_enhancements = ?, failed_enhancements = ? WHERE date = ? """, (new_total, new_cache_hits, new_avg_time, new_successful, new_failed, today)) else: cursor.execute(""" INSERT INTO daily_stats (date, total_searches, cache_hits, avg_processing_time, successful_enhancements, failed_enhancements) VALUES (?, ?, ?, ?, ?, ?) """, (today, searches, cache_hits, processing_time, 1 if success else 0, 0 if success else 1)) conn.commit() return True except Exception as e: logger.error(f"❌ Error actualizando estadísticas: {e}") return False def get_system_stats(self) -> Dict[str, Any]: """📈 Obtiene estadísticas del sistema""" try: with self.lock: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() stats = {} # Estadísticas de caché cursor.execute("SELECT COUNT(*), AVG(hit_count), AVG(similarity_score) FROM prompt_cache") cache_stats = cursor.fetchone() stats['caché'] = { 'total_entradas': cache_stats[0] or 0, 'promedio_usos': round(cache_stats[1] or 0, 2), 'promedio_similitud': round(cache_stats[2] or 0, 3) } # Estadísticas de memoria episódica cursor.execute(""" SELECT COUNT(*), COUNT(DISTINCT session_id), AVG(similarity_score), AVG(processing_time), SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) * 100.0 / COUNT(*) FROM episodic_memory """) memory_stats = cursor.fetchone() stats['episódica'] = { 'búsquedas_totales': memory_stats[0] or 0, 'sesiones_únicas': memory_stats[1] or 0, 'promedio_similitud': round(memory_stats[2] or 0, 3), 'tiempo_promedio': round(memory_stats[3] or 0, 2), 'tasa_éxito': round(memory_stats[4] or 0, 1) } # Estadísticas de últimos 7 días cursor.execute(""" SELECT SUM(total_searches), SUM(cache_hits), AVG(avg_processing_time) FROM daily_stats WHERE date >= date('now', '-7 days') """) weekly_stats = cursor.fetchone() stats['semanal'] = { 'búsquedas_totales': weekly_stats[0] or 0, 'aciertos_caché': weekly_stats[1] or 0, 'tiempo_promedio': round(weekly_stats[2] or 0, 2) } # Información de la base de datos cursor.execute("SELECT COUNT(*) FROM sqlite_master WHERE type='table'") table_count = cursor.fetchone()[0] stats['base_de_datos'] = { 'tablas': table_count, 'tamaño_mb': self._get_db_size_mb(), 'última_limpieza': self._get_config_value('last_cleanup'), 'versión': '2.0' } return stats except Exception as e: logger.error(f"❌ Error obteniendo estadísticas: {e}") return {} def cleanup_cache(self, max_age_days: int = 30, max_entries: int = 1000) -> Tuple[int, int]: """🧹 Limpieza de caché antiguo""" try: with self.lock: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Eliminar entradas muy antiguas cutoff_date = (datetime.now() - timedelta(days=max_age_days)).isoformat() cursor.execute( "DELETE FROM prompt_cache WHERE last_accessed < ?", (cutoff_date,) ) old_deleted = cursor.rowcount # Eliminar entradas menos usadas si hay demasiadas cursor.execute("SELECT COUNT(*) FROM prompt_cache") total_count = cursor.fetchone()[0] excess_deleted = 0 if total_count > max_entries: excess_count = total_count - max_entries cursor.execute(""" DELETE FROM prompt_cache WHERE prompt_hash IN ( SELECT prompt_hash FROM prompt_cache ORDER BY hit_count ASC, last_accessed ASC LIMIT ? ) """, (excess_count,)) excess_deleted = cursor.rowcount # Limpiar memoria episódica antigua (más de 90 días) old_memory_cutoff = (datetime.now() - timedelta(days=90)).isoformat() cursor.execute( "DELETE FROM episodic_memory WHERE timestamp < ?", (old_memory_cutoff,) ) # Actualizar configuración self._set_config_value('last_cleanup', datetime.now().isoformat()) conn.commit() total_deleted = old_deleted + excess_deleted logger.info(f"🧹 Limpieza completada: {total_deleted} entradas eliminadas") return old_deleted, excess_deleted except Exception as e: logger.error(f"❌ Error en limpieza: {e}") return 0, 0 def optimize_databases(self) -> bool: """⚡ Optimiza la base de datos""" try: with self.lock: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute("VACUUM") cursor.execute("ANALYZE") conn.commit() logger.info("⚡ Base de datos optimizada") return True except Exception as e: logger.error(f"❌ Error optimizando base de datos: {e}") return False def _get_db_size_mb(self) -> float: """📏 Obtiene tamaño de la base de datos en MB""" try: import os size_bytes = os.path.getsize(self.db_path) return round(size_bytes / (1024 * 1024), 2) except: return 0.0 def _get_config_value(self, key: str) -> Optional[str]: """⚙️ Obtiene valor de configuración""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute("SELECT value FROM system_config WHERE key = ?", (key,)) result = cursor.fetchone() return result[0] if result else None except: return None def _set_config_value(self, key: str, value: str) -> bool: """⚙️ Establece valor de configuración""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" INSERT OR REPLACE INTO system_config (key, value) VALUES (?, ?) """, (key, value)) conn.commit() return True except: return False def get_cache_size(self) -> int: """📊 Obtiene número de entradas en caché""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM prompt_cache") return cursor.fetchone()[0] except: return 0 def close(self): """🔒 Cierra conexiones (cleanup)""" logger.info("📊 Base de datos cerrada correctamente")