Spaces:
Running
Running
| """ | |
| Módulo principal del sistema RAG (Retrieval-Augmented Generation) | |
| """ | |
| import logging | |
| from typing import Tuple, Dict, Any, List | |
| import json | |
| import os | |
| import random | |
| import re | |
| from config.settings import settings | |
| from .embeddings import EmbeddingModel | |
| from .retriever import VectorStoreFAISS | |
| from .generator import TinyLlamaGenerator | |
| from .gemma_generator import GemmaGenerator | |
| from .optimized_retriever import OptimizedRetriever | |
# Latency logging is optional: the evaluation package may not be installed
# alongside this module. When it is missing, expose a no-op stand-in with
# the same name so call sites need no conditional import logic.
try:
    from evaluation.performance_logger import log_latency
except ImportError:
    _LOGGING_ENABLED = False

    def log_latency(*args, **kwargs):
        """No-op replacement used when the performance logger is unavailable."""
        return None
else:
    _LOGGING_ENABLED = True

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
| class RAGSystem: | |
| def __init__(self): | |
| self.embedder = EmbeddingModel() | |
| self.vector_store = VectorStoreFAISS() | |
| self.generator = GemmaGenerator() | |
| self.optimized_retriever = OptimizedRetriever(self.vector_store) | |
| self.intents_loaded = False | |
| self.top_k = settings.TOP_K_RESULTS | |
| self.similarity_threshold = settings.SIMILARITY_THRESHOLD | |
| logger.info("RAG System initialized con OptimizedRetriever") | |
| def load_intents(self, intents_file: str = "data/vector_store/intents.json"): | |
| """Carga intents al sistema""" | |
| try: | |
| self.vector_store.store_intents(intents_file) | |
| self.intents_loaded = True | |
| logger.info("Intents loaded into FAISS vector store") | |
| except Exception as e: | |
| logger.error(f"Error loading intents: {e}") | |
| # Crear archivo básico si no existe | |
| if not os.path.exists(intents_file): | |
| basic_intents = { | |
| "intents": [ | |
| { | |
| "tag": "saludo", | |
| "patterns": ["hola", "buenos días", "buenas tardes"], | |
| "responses": ["¡Hola! ¿En qué puedo ayudarte?"], | |
| "context": "welcome" | |
| } | |
| ] | |
| } | |
| os.makedirs(os.path.dirname(intents_file), exist_ok=True) | |
| with open(intents_file, 'w', encoding='utf-8') as f: | |
| json.dump(basic_intents, f, ensure_ascii=False, indent=2) | |
| logger.info("Created basic intents file") | |
| def _clean_query(self, query: str) -> str: | |
| """ | |
| Limpiar consulta para mejor matching con intents. | |
| """ | |
| # Quitar signos de puntuación y convertir a minúsculas | |
| clean = re.sub(r'[^\w\sáéíóúüñÁÉÍÓÚÜÑ¿?]', '', query.lower()) | |
| return clean.strip() | |
| def _classify_query_type(self, query: str) -> str: | |
| """ | |
| Clasificar el tipo de consulta para decidir prioridad. | |
| """ | |
| query_lower = query.lower() | |
| # Palabras clave para intents (saludos, despedidas, etc.) | |
| intent_keywords = { | |
| 'saludo': ['hola', 'buen día', 'buenas', 'saludos', 'qué tal', 'cómo estás'], | |
| 'despedida': ['adiós', 'hasta luego', 'chao', 'bye', 'nos vemos'], | |
| 'gracias': ['gracias', 'agradecido', 'agradezco'], | |
| 'ayuda_general': ['ayuda', 'ayúdame', 'asistencia', 'soporte'] | |
| } | |
| # Verificar si es un intent básico | |
| for intent_type, keywords in intent_keywords.items(): | |
| if any(keyword in query_lower for keyword in keywords): | |
| return intent_type | |
| # Preguntas técnicas/complejas van a RAG | |
| question_words = ['cómo', 'dónde', 'cuándo', 'qué', 'por qué', 'cuál', 'cuánto'] | |
| if any(query_lower.startswith(word) for word in question_words): | |
| return 'rag_preferido' | |
| return 'neutral' | |
| def _should_use_intent(self, query: str, intent_results: Dict, intent_priority: str) -> bool: | |
| """ | |
| Decidir si usar intent basado en múltiples criterios. | |
| """ | |
| # 1. Si es saludo/despedida, SIEMPRE usar intent | |
| if intent_priority in ['saludo', 'despedida', 'gracias']: | |
| return True | |
| # 2. Si no hay resultados de intent, usar RAG | |
| if not intent_results.get('metadatas') or not intent_results['metadatas'][0]: | |
| return False | |
| # 3. Verificar calidad del match | |
| best_intent = intent_results['metadatas'][0][0] if intent_results['metadatas'][0] else None | |
| best_distance = intent_results['distances'][0][0] if intent_results['distances'][0] else 1.0 | |
| # 4. Reglas de decisión | |
| if intent_priority == 'rag_preferido': | |
| # Preguntas técnicas: solo usar intent si es MUY bueno | |
| return best_distance < 0.3 # Match muy cercano | |
| # Caso general: balancear longitud y calidad | |
| if len(query) < 100: # No demasiado larga | |
| # Distancia baja = buen match | |
| if best_distance < 0.5: # Ajusta según necesidad | |
| return True | |
| return False | |
| def _format_intent_response(self, intent_results: Dict) -> Tuple[str, bool, float, list]: | |
| """ | |
| Formatear respuesta de intent para compatibilidad. | |
| """ | |
| if not intent_results.get('metadatas') or not intent_results['metadatas'][0]: | |
| return ("", False, 0.0, []) | |
| best_intent = intent_results['metadatas'][0][0] | |
| best_distance = intent_results['distances'][0][0] | |
| # Convertir distancia a confianza | |
| confidence = max(0.0, 1.0 - best_distance) | |
| # Seleccionar respuesta aleatoria del intent | |
| responses = best_intent.get('responses', ['Lo siento, no tengo una respuesta preparada.']) | |
| response = random.choice(responses) | |
| return (response, False, confidence, []) | |
| def _rag_process(self, query: str) -> Tuple[str, bool, float, list]: | |
| """ | |
| Procesar consulta usando RAG con OptimizedRetriever. | |
| """ | |
| import time | |
| try: | |
| # 1. Generar embedding de la consulta | |
| query_embedding = self.embedder.embed_text(query) | |
| # 2. Usar OptimizedRetriever para búsqueda avanzada | |
| results = self.optimized_retriever.retrieve( | |
| query, | |
| query_embedding, | |
| top_k=settings.TOP_K_RESULTS | |
| ) | |
| # 3. Verificar si hay resultados relevantes | |
| if not results: | |
| return "No encontré información específica sobre eso en los materiales de Prepa en Línea SEP.", False, 0.0, [] | |
| # 4. Extraer contextos y metadatos | |
| contexts = [r.get("content", r.get("text", "")) for r in results] | |
| metadatas = [r.get("metadata", {}) for r in results] | |
| context_str = " ".join(contexts) | |
| if not context_str.strip(): | |
| return "No encontré información específica sobre eso en los materiales de Prepa en Línea SEP.", False, 0.0, [] | |
| # Variables para métricas | |
| generation_start = time.time() | |
| tokens_generated = 0 | |
| # 5. Generar respuesta RAG con Gemma (con callback para métricas) | |
| def on_tokens(tokens: int, elapsed: float): | |
| nonlocal tokens_generated | |
| tokens_generated = tokens | |
| logger.info(f"🔄 Calling generator.generate() with query length: {len(query)}, context length: {len(context_str)}") | |
| try: | |
| response = self.generator.generate(query, context_str, on_tokens_generated=on_tokens) | |
| logger.info(f"✅ Response received: {response[:100]}...") | |
| except Exception as e: | |
| logger.error(f"❌ Error in generator.generate(): {e}", exc_info=True) | |
| raise | |
| generation_time = (time.time() - generation_start) * 1000 | |
| # Log de latencia | |
| if _LOGGING_ENABLED and tokens_generated > 0: | |
| log_latency( | |
| retrieval_time_ms=0, | |
| generation_time_ms=generation_time, | |
| total_time_ms=generation_time, | |
| tokens_generated=tokens_generated, | |
| question=query | |
| ) | |
| # 6. Preparar fuentes para mostrar | |
| sources = [] | |
| for i, (context, metadata) in enumerate(zip(contexts, metadatas)): | |
| if i < 3 and context: | |
| source_info = { | |
| "content": context, | |
| "metadata": metadata | |
| } | |
| sources.append(source_info) | |
| # 7. Calcular confianza basada en reranked_score | |
| confidence = 0.0 | |
| if results and results[0].get("reranked_score"): | |
| confidence = results[0]["reranked_score"] | |
| elif results and results[0].get("similarity"): | |
| confidence = results[0]["similarity"] | |
| return response, True, confidence, sources | |
| except Exception as e: | |
| logger.error(f"Error en RAG process: {e}") | |
| return "Lo siento, tuve un problema procesando tu pregunta. ¿Podrías intentarlo de nuevo?", False, 0.0, [] | |
| def process_query(self, query: str) -> Tuple[str, bool, float, list]: | |
| """ | |
| Procesa una consulta y retorna respuesta y metadata | |
| Returns: | |
| Tuple[str, bool, float, list]: (respuesta, es_rag, confianza, fuentes) | |
| """ | |
| query = query.strip() | |
| # Solo usar intents para despedidas EXPLÍCITAS | |
| if self.intents_loaded: | |
| despedidas = ['adiós', 'adios', 'bye', 'chao', 'nos vemos', 'hasta luego', 'me voy', 'me retiro'] | |
| if any(palabra in query.lower() for palabra in despedidas): | |
| logger.info(f"Detectada despedida, buscando intent: '{query[:50]}...'") | |
| intent_results = self.vector_store.search_intents( | |
| query_text=query.lower(), | |
| top_k=1 | |
| ) | |
| if intent_results.get('metadatas') and intent_results['metadatas'][0]: | |
| return self._format_intent_response(intent_results) | |
| # Para TODO lo demás (incluyendo saludos), usar RAG con OptimizedRetriever | |
| logger.info(f"Usando RAG + OptimizedRetriever para: '{query[:50]}...'") | |
| return self._rag_process(query) | |
| def add_document(self, content: str, metadata: Dict[str, Any] = None): | |
| """Añade un documento al sistema""" | |
| if metadata is None: | |
| metadata = {} | |
| try: | |
| # Generar embedding | |
| embedding = self.embedder.embed_text(content) | |
| # Añadir al vector store | |
| self.vector_store.add_document(content, metadata, embedding) | |
| logger.info(f"Document added: {metadata.get('title', 'No title')}") | |
| except Exception as e: | |
| logger.error(f"Error adding document: {e}") | |
| def add_documents_batch(self, documents: List[Dict[str, Any]]): | |
| """Añade múltiples documentos en lote""" | |
| if not documents: | |
| return | |
| try: | |
| # Extraer textos | |
| texts = [doc['content'] for doc in documents] | |
| # Generar embeddings en batch (usar prefijo de passage para indexar) | |
| embeddings = self.embedder.embed_batch(texts, is_passage=True) | |
| # Añadir al vector store | |
| self.vector_store.add_documents(documents, embeddings) | |
| logger.info(f"Added {len(documents)} documents in batch") | |
| except Exception as e: | |
| logger.error(f"Error adding documents batch: {e}") | |
| def get_stats(self): | |
| """Obtener estadísticas del sistema""" | |
| try: | |
| return { | |
| "vector_store": self.vector_store.get_stats(), | |
| "embedding_model": self.embedder.model_name, | |
| "intents_loaded": self.intents_loaded | |
| } | |
| except: | |
| return {"status": "unknown"} | |
| def _simple_extract_response(self, query: str, context: str) -> str: | |
| """Fallback: Extraer respuesta directamente del contexto sin TinyLlama""" | |
| import re | |
| # Limpiar el contexto | |
| lines = context.split('\n') | |
| clean_lines = [] | |
| for line in lines: | |
| if re.match(r'^\[.*?\]$', line): | |
| continue | |
| if re.match(r'^#{2,}', line): | |
| continue | |
| if re.match(r'^📄', line): | |
| continue | |
| if re.match(r'^Fila:', line): | |
| continue | |
| if re.match(r'^Hoja:', line): | |
| continue | |
| if line.strip() and len(line.strip()) > 10: | |
| clean_lines.append(line.strip()) | |
| context_clean = ' '.join(clean_lines) | |
| # Extraer oraciones que contengan palabras clave de la pregunta | |
| query_words = set(query.lower().split()) | |
| sentences = context_clean.split('. ') | |
| relevant = [] | |
| for sent in sentences: | |
| sent_lower = sent.lower() | |
| # Buscar coincidencia de palabras | |
| matches = sum(1 for w in query_words if w in sent_lower and len(w) > 3) | |
| if matches > 0: | |
| relevant.append(sent.strip()) | |
| if relevant: | |
| return '. '.join(relevant[:3]) + '.' | |
| # Si no hay coincidencias, devolver el inicio del contexto | |
| return context_clean[:300] + '...' if len(context_clean) > 300 else context_clean |