Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| SISTEMA DE CARGA DE CHUNKS RAG DESDE JSONL | |
| Carga los chunks generados al vector store FAISS (sin generador) | |
| """ | |
| import os | |
| import sys | |
| import json | |
| from typing import List, Dict, Any | |
| from datetime import datetime | |
| from pathlib import Path | |
| # Configurar path | |
| current_dir = os.path.dirname(os.path.abspath(__file__)) | |
| project_root = os.path.dirname(current_dir) | |
| sys.path.insert(0, project_root) | |
| print(f"📂 Directorio raíz: {project_root}") | |
| from rag.embeddings import EmbeddingModel | |
| from rag.retriever import VectorStoreFAISS | |
| import logging | |
| import numpy as np | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| class ChunksRAGLoader: | |
| """Cargador de chunks JSONL al vector store FAISS""" | |
| def __init__(self): | |
| print("🔄 Inicializando embeddings...") | |
| self.embeddings = EmbeddingModel() | |
| self.vector_store = VectorStoreFAISS() | |
| print(f" ✅ VectorStoreFAISS inicializado") | |
| self.stats = { | |
| "total_chunks": 0, | |
| "by_type": {}, | |
| "by_source": {}, | |
| "loaded_at": None | |
| } | |
| def _generate_embeddings_batch(self, texts: List[str]) -> np.ndarray: | |
| """Genera embeddings para una lista de textos""" | |
| # Ya se aplicó prefijo "passage: " antes de llamar | |
| return self.embeddings.embed_batch(texts, is_passage=True) | |
| def load_chunks_file(self, chunks_path: str) -> Dict: | |
| """Carga archivo JSONL de chunks al vector store""" | |
| print("=" * 60) | |
| print("📊 CARGA DE CHUNKS AL VECTOR STORE") | |
| print("=" * 60) | |
| if not os.path.exists(chunks_path): | |
| print(f"❌ ERROR: El archivo {chunks_path} no existe") | |
| return {"error": "Archivo no encontrado"} | |
| try: | |
| print(f"\n📁 Cargando chunks desde: {chunks_path}") | |
| # Primero, leer todos los chunks | |
| chunks_list = [] | |
| with open(chunks_path, 'r', encoding='utf-8') as f: | |
| for line_num, line in enumerate(f, 1): | |
| line = line.strip() | |
| if not line: | |
| continue | |
| try: | |
| chunk = json.loads(line) | |
| chunks_list.append(chunk) | |
| except json.JSONDecodeError as e: | |
| logger.warning(f"Error decodificando línea {line_num}: {e}") | |
| print(f" 📄 Total chunks encontrados: {len(chunks_list)}") | |
| if not chunks_list: | |
| return {"error": "No se encontraron chunks válidos"} | |
| # Preparar documentos y embeddings | |
| documents = [] | |
| embeddings_list = [] | |
| texts_for_embedding = [] | |
| for chunk in chunks_list: | |
| text = chunk.get('text', '') | |
| if not text: | |
| continue | |
| doc_type = chunk.get('doc_type', 'unknown') | |
| source_file = chunk.get('source_file', 'unknown') | |
| metadata = chunk.get('metadata', {}) | |
| chunk_id = chunk.get('chunk_id', 'unknown') | |
| # Crear contenido enriquecido | |
| content = self._create_rag_content(chunk) | |
| texts_for_embedding.append(content) | |
| # Consolidar TODOS los metadatos del chunk original | |
| all_metadata = { | |
| **metadata, | |
| 'chunk_id': chunk_id, | |
| 'doc_type': doc_type, | |
| 'source_file': source_file, | |
| 'page_range': chunk.get('page_range', '1-1'), | |
| 'imported_at': datetime.now().isoformat() | |
| } | |
| all_metadata.update({ | |
| 'chunk_type': chunk.get('chunk_type', self._infer_chunk_type(doc_type, metadata)), | |
| 'importance': chunk.get('importance', 0.5), | |
| 'severity': chunk.get('severity', ''), | |
| 'action_type': chunk.get('action_type', ''), | |
| 'has_dates': chunk.get('has_dates', self._contains_dates(content)), | |
| 'title': metadata.get('title', ''), | |
| 'section': metadata.get('section', ''), | |
| 'term': metadata.get('term', ''), | |
| 'article': metadata.get('article', ''), | |
| 'chapter': metadata.get('chapter', ''), | |
| 'step_number': metadata.get('step_number', ''), | |
| 'principle_number': metadata.get('principle_number', ''), | |
| 'conduct_name': metadata.get('conduct_name', ''), | |
| 'rule_title': metadata.get('rule_title', ''), | |
| 'section_number': metadata.get('section_number', '') | |
| }) | |
| doc = { | |
| 'content': content, | |
| 'metadata': all_metadata | |
| } | |
| documents.append(doc) | |
| # Actualizar estadísticas | |
| self.stats["by_type"][doc_type] = self.stats["by_type"].get(doc_type, 0) + 1 | |
| self.stats["by_source"][source_file] = self.stats["by_source"].get(source_file, 0) + 1 | |
| print(f" 🧠 Generando embeddings para {len(texts_for_embedding)} textos...") | |
| # Generar embeddings en batches para eficiencia | |
| # IMPORTANTE: Usar prefijo "passage: " para indexar chunks (requerido por e5-small) | |
| batch_size = 32 | |
| all_embeddings = [] | |
| for i in range(0, len(texts_for_embedding), batch_size): | |
| batch = texts_for_embedding[i:i+batch_size] | |
| # Aplicar prefijo de passage para mejor retrieval | |
| batch_with_prefix = ["passage: " + text for text in batch] | |
| batch_embeddings = self._generate_embeddings_batch(batch_with_prefix) | |
| all_embeddings.append(batch_embeddings) | |
| print(f" Batch {i//batch_size + 1}: {len(batch)} embeddings generados (con prefijo 'passage:')") | |
| embeddings = np.vstack(all_embeddings) | |
| print(f" ✅ {len(embeddings)} embeddings generados (dimensión: {embeddings.shape[1]})") | |
| # Añadir al vector store | |
| print(f" 💾 Añadiendo documentos al índice FAISS...") | |
| self.vector_store.add_documents(documents, embeddings) | |
| self.stats["total_chunks"] = len(documents) | |
| self.stats["loaded_at"] = datetime.now().isoformat() | |
| self._generate_report(chunks_path) | |
| return self.stats | |
| except Exception as e: | |
| logger.error(f"Error cargando chunks: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| return {"error": str(e)} | |
| def _create_rag_content(self, chunk: Dict) -> str: | |
| """Crea contenido enriquecido para el vector store""" | |
| parts = [] | |
| doc_type = chunk.get('doc_type', 'documento').upper() | |
| parts.append(f"[{doc_type}]") | |
| metadata = chunk.get('metadata', {}) | |
| if 'title' in metadata: | |
| parts.append(f"\n## {metadata['title']}") | |
| elif 'section' in metadata: | |
| parts.append(f"\n## {metadata['section']}") | |
| page_range = chunk.get('page_range', '1-1') | |
| parts.append(f"\n📄 Páginas: {page_range}") | |
| text = chunk.get('text', '').strip() | |
| if text: | |
| parts.append(f"\n{text}") | |
| # Metadatos específicos según tipo de documento | |
| specific_metadata = [] | |
| if doc_type.lower() == 'convocatoria': | |
| if 'section_number' in metadata: | |
| specific_metadata.append(f"Sección: {metadata.get('section_number')}") | |
| elif doc_type.lower() == 'normativa': | |
| if 'article' in metadata: | |
| specific_metadata.append(f"Artículo: {metadata.get('article')}") | |
| if 'chapter' in metadata: | |
| specific_metadata.append(f"Capítulo: {metadata.get('chapter')}") | |
| elif doc_type.lower() == 'guia': | |
| if 'step_number' in metadata: | |
| specific_metadata.append(f"Paso: {metadata.get('step_number')}") | |
| elif doc_type.lower() == 'decalogo': | |
| if 'principle_number' in metadata: | |
| specific_metadata.append(f"Principio {metadata.get('principle_number')}") | |
| elif doc_type.lower() == 'politica_cero_tolerancia': | |
| if 'conduct_name' in metadata: | |
| specific_metadata.append(f"Conducta: {metadata.get('conduct_name')}") | |
| elif doc_type.lower() == 'protocolo': | |
| if 'term' in metadata: | |
| specific_metadata.append(f"Término: {metadata.get('term')}") | |
| elif doc_type.lower() == 'reglas_comunicacion': | |
| if 'rule_title' in metadata: | |
| specific_metadata.append(f"Regla: {metadata.get('rule_title')}") | |
| if specific_metadata: | |
| parts.append(f"\n📌 {', '.join(specific_metadata)}") | |
| source_file = chunk.get('source_file', 'Documento oficial') | |
| parts.append(f"\n📚 Fuente: {source_file}") | |
| return "\n".join(parts) | |
| def _infer_chunk_type(self, doc_type: str, metadata: Dict) -> str: | |
| """Inferir chunk_type basado en doc_type y metadatos""" | |
| doc_type_lower = doc_type.lower() | |
| if 'guia' in doc_type_lower or 'aspirante' in doc_type_lower: | |
| return 'paso' | |
| elif 'normativa' in doc_type_lower or 'reglamento' in doc_type_lower: | |
| return 'articulo' | |
| elif 'protocolo' in doc_type_lower: | |
| return 'conducta' | |
| elif 'decalogo' in doc_type_lower: | |
| return 'principio' | |
| elif 'glosario' in doc_type_lower: | |
| return 'termino' | |
| elif 'reglas_comunicacion' in doc_type_lower: | |
| return 'regla' | |
| elif 'convocatoria' in doc_type_lower: | |
| return 'pregunta' | |
| if metadata.get('step_number'): | |
| return 'paso' | |
| if metadata.get('article'): | |
| return 'articulo' | |
| if metadata.get('principle_number'): | |
| return 'principio' | |
| if metadata.get('term'): | |
| return 'termino' | |
| if metadata.get('conduct_name'): | |
| return 'conducta' | |
| return 'general' | |
| def _contains_dates(self, text: str) -> bool: | |
| """Detectar si el texto contiene fechas o plazos""" | |
| import re | |
| date_patterns = [ | |
| r'\d{1,2}/\d{1,2}/\d{2,4}', | |
| r'\d{1,2}-\d{1,2}-\d{2,4}', | |
| r'\d{4}-\d{2}-\d{2}', | |
| r'enero|febrero|marzo|abril|mayo|junio|julio|agosto|septiembre|octubre|noviembre|diciembre', | |
| r'vigencia|vence|plazo|días?|fecha' | |
| ] | |
| return any(re.search(p, text, re.IGNORECASE) for p in date_patterns) | |
| def _generate_report(self, chunks_path: str): | |
| """Genera un reporte de carga""" | |
| print("\n" + "=" * 60) | |
| print("📈 REPORTE DE CARGA COMPLETADO") | |
| print("=" * 60) | |
| print(f"\n📊 ESTADÍSTICAS:") | |
| print(f" 📂 Archivo fuente: {os.path.basename(chunks_path)}") | |
| print(f" ✅ Total chunks cargados: {self.stats['total_chunks']}") | |
| print(f" 🕐 Fecha de carga: {self.stats['loaded_at']}") | |
| if self.stats["by_type"]: | |
| print(f"\n📋 DISTRIBUCIÓN POR TIPO DE DOCUMENTO:") | |
| for doc_type, cantidad in sorted(self.stats["by_type"].items()): | |
| porcentaje = (cantidad / self.stats['total_chunks']) * 100 if self.stats['total_chunks'] > 0 else 0 | |
| barra = "█" * int(porcentaje / 5) | |
| print(f" • {doc_type:<35} {cantidad:3d} ({porcentaje:5.1f}%) {barra}") | |
| if self.stats["by_source"]: | |
| print(f"\n📋 DISTRIBUCIÓN POR ARCHIVO FUENTE:") | |
| for source, cantidad in sorted(self.stats["by_source"].items()): | |
| print(f" • {source[:60]:<60} {cantidad:3d} chunks") | |
| # Mostrar estado del índice FAISS | |
| faiss_stats = self.vector_store.get_stats() | |
| print(f"\n📊 ESTADO DEL ÍNDICE FAISS:") | |
| print(f" • Total vectores: {faiss_stats.get('index_size', 0)}") | |
| print(f" • Dimensión: {faiss_stats.get('embedding_dim', 'N/A')}") | |
| print(f" • Directorio persistencia: {self.vector_store.persist_directory}") | |
| print("\n✅ CARGA COMPLETADA EXITOSAMENTE") | |
| print("\n" + "=" * 60) | |
| print("🎯 VECTOR STORE ACTUALIZADO") | |
| print("=" * 60) | |
| print(f"\n📚 Total documentos en índice: {self.stats['total_chunks']}") | |
| def main(): | |
| import argparse | |
| parser = argparse.ArgumentParser( | |
| description='Sistema de Carga de Chunks al Vector Store FAISS' | |
| ) | |
| parser.add_argument( | |
| '--chunks', | |
| type=str, | |
| default='../Documentos RAG/output/chunks/ready_for_rag/all_chunks.jsonl', | |
| help='Ruta al archivo all_chunks.jsonl' | |
| ) | |
| args = parser.parse_args() | |
| chunks_path = Path(args.chunks) | |
| if not chunks_path.exists(): | |
| print(f"\n❌ ERROR: No se encuentra el archivo: {chunks_path}") | |
| print(f" 📍 Ruta absoluta: {chunks_path.absolute()}") | |
| return | |
| loader = ChunksRAGLoader() | |
| stats = loader.load_chunks_file(str(chunks_path)) | |
| if "error" not in stats and stats.get('total_chunks', 0) > 0: | |
| print(f"\n✅ ÉXITO: {stats['total_chunks']} chunks cargados") | |
| elif "error" in stats: | |
| print(f"\n❌ Error: {stats['error']}") | |
| if __name__ == "__main__": | |
| main() |