Chatbot-RAG-v4 / scripts /load_chunks_to_rag.py
NoeMartinezSanchez
feat: cambiar embedding a e5-small con prefijos passage/query
2f12dea
#!/usr/bin/env python3
"""
SISTEMA DE CARGA DE CHUNKS RAG DESDE JSONL
Carga los chunks generados al vector store FAISS (sin generador)
"""
import os
import sys
import json
from typing import List, Dict, Any
from datetime import datetime
from pathlib import Path
# Configurar path
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
sys.path.insert(0, project_root)
print(f"📂 Directorio raíz: {project_root}")
from rag.embeddings import EmbeddingModel
from rag.retriever import VectorStoreFAISS
import logging
import numpy as np
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class ChunksRAGLoader:
"""Cargador de chunks JSONL al vector store FAISS"""
def __init__(self):
print("🔄 Inicializando embeddings...")
self.embeddings = EmbeddingModel()
self.vector_store = VectorStoreFAISS()
print(f" ✅ VectorStoreFAISS inicializado")
self.stats = {
"total_chunks": 0,
"by_type": {},
"by_source": {},
"loaded_at": None
}
def _generate_embeddings_batch(self, texts: List[str]) -> np.ndarray:
"""Genera embeddings para una lista de textos"""
# Ya se aplicó prefijo "passage: " antes de llamar
return self.embeddings.embed_batch(texts, is_passage=True)
def load_chunks_file(self, chunks_path: str) -> Dict:
"""Carga archivo JSONL de chunks al vector store"""
print("=" * 60)
print("📊 CARGA DE CHUNKS AL VECTOR STORE")
print("=" * 60)
if not os.path.exists(chunks_path):
print(f"❌ ERROR: El archivo {chunks_path} no existe")
return {"error": "Archivo no encontrado"}
try:
print(f"\n📁 Cargando chunks desde: {chunks_path}")
# Primero, leer todos los chunks
chunks_list = []
with open(chunks_path, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
chunk = json.loads(line)
chunks_list.append(chunk)
except json.JSONDecodeError as e:
logger.warning(f"Error decodificando línea {line_num}: {e}")
print(f" 📄 Total chunks encontrados: {len(chunks_list)}")
if not chunks_list:
return {"error": "No se encontraron chunks válidos"}
# Preparar documentos y embeddings
documents = []
embeddings_list = []
texts_for_embedding = []
for chunk in chunks_list:
text = chunk.get('text', '')
if not text:
continue
doc_type = chunk.get('doc_type', 'unknown')
source_file = chunk.get('source_file', 'unknown')
metadata = chunk.get('metadata', {})
chunk_id = chunk.get('chunk_id', 'unknown')
# Crear contenido enriquecido
content = self._create_rag_content(chunk)
texts_for_embedding.append(content)
# Consolidar TODOS los metadatos del chunk original
all_metadata = {
**metadata,
'chunk_id': chunk_id,
'doc_type': doc_type,
'source_file': source_file,
'page_range': chunk.get('page_range', '1-1'),
'imported_at': datetime.now().isoformat()
}
all_metadata.update({
'chunk_type': chunk.get('chunk_type', self._infer_chunk_type(doc_type, metadata)),
'importance': chunk.get('importance', 0.5),
'severity': chunk.get('severity', ''),
'action_type': chunk.get('action_type', ''),
'has_dates': chunk.get('has_dates', self._contains_dates(content)),
'title': metadata.get('title', ''),
'section': metadata.get('section', ''),
'term': metadata.get('term', ''),
'article': metadata.get('article', ''),
'chapter': metadata.get('chapter', ''),
'step_number': metadata.get('step_number', ''),
'principle_number': metadata.get('principle_number', ''),
'conduct_name': metadata.get('conduct_name', ''),
'rule_title': metadata.get('rule_title', ''),
'section_number': metadata.get('section_number', '')
})
doc = {
'content': content,
'metadata': all_metadata
}
documents.append(doc)
# Actualizar estadísticas
self.stats["by_type"][doc_type] = self.stats["by_type"].get(doc_type, 0) + 1
self.stats["by_source"][source_file] = self.stats["by_source"].get(source_file, 0) + 1
print(f" 🧠 Generando embeddings para {len(texts_for_embedding)} textos...")
# Generar embeddings en batches para eficiencia
# IMPORTANTE: Usar prefijo "passage: " para indexar chunks (requerido por e5-small)
batch_size = 32
all_embeddings = []
for i in range(0, len(texts_for_embedding), batch_size):
batch = texts_for_embedding[i:i+batch_size]
# Aplicar prefijo de passage para mejor retrieval
batch_with_prefix = ["passage: " + text for text in batch]
batch_embeddings = self._generate_embeddings_batch(batch_with_prefix)
all_embeddings.append(batch_embeddings)
print(f" Batch {i//batch_size + 1}: {len(batch)} embeddings generados (con prefijo 'passage:')")
embeddings = np.vstack(all_embeddings)
print(f" ✅ {len(embeddings)} embeddings generados (dimensión: {embeddings.shape[1]})")
# Añadir al vector store
print(f" 💾 Añadiendo documentos al índice FAISS...")
self.vector_store.add_documents(documents, embeddings)
self.stats["total_chunks"] = len(documents)
self.stats["loaded_at"] = datetime.now().isoformat()
self._generate_report(chunks_path)
return self.stats
except Exception as e:
logger.error(f"Error cargando chunks: {e}")
import traceback
traceback.print_exc()
return {"error": str(e)}
def _create_rag_content(self, chunk: Dict) -> str:
"""Crea contenido enriquecido para el vector store"""
parts = []
doc_type = chunk.get('doc_type', 'documento').upper()
parts.append(f"[{doc_type}]")
metadata = chunk.get('metadata', {})
if 'title' in metadata:
parts.append(f"\n## {metadata['title']}")
elif 'section' in metadata:
parts.append(f"\n## {metadata['section']}")
page_range = chunk.get('page_range', '1-1')
parts.append(f"\n📄 Páginas: {page_range}")
text = chunk.get('text', '').strip()
if text:
parts.append(f"\n{text}")
# Metadatos específicos según tipo de documento
specific_metadata = []
if doc_type.lower() == 'convocatoria':
if 'section_number' in metadata:
specific_metadata.append(f"Sección: {metadata.get('section_number')}")
elif doc_type.lower() == 'normativa':
if 'article' in metadata:
specific_metadata.append(f"Artículo: {metadata.get('article')}")
if 'chapter' in metadata:
specific_metadata.append(f"Capítulo: {metadata.get('chapter')}")
elif doc_type.lower() == 'guia':
if 'step_number' in metadata:
specific_metadata.append(f"Paso: {metadata.get('step_number')}")
elif doc_type.lower() == 'decalogo':
if 'principle_number' in metadata:
specific_metadata.append(f"Principio {metadata.get('principle_number')}")
elif doc_type.lower() == 'politica_cero_tolerancia':
if 'conduct_name' in metadata:
specific_metadata.append(f"Conducta: {metadata.get('conduct_name')}")
elif doc_type.lower() == 'protocolo':
if 'term' in metadata:
specific_metadata.append(f"Término: {metadata.get('term')}")
elif doc_type.lower() == 'reglas_comunicacion':
if 'rule_title' in metadata:
specific_metadata.append(f"Regla: {metadata.get('rule_title')}")
if specific_metadata:
parts.append(f"\n📌 {', '.join(specific_metadata)}")
source_file = chunk.get('source_file', 'Documento oficial')
parts.append(f"\n📚 Fuente: {source_file}")
return "\n".join(parts)
def _infer_chunk_type(self, doc_type: str, metadata: Dict) -> str:
"""Inferir chunk_type basado en doc_type y metadatos"""
doc_type_lower = doc_type.lower()
if 'guia' in doc_type_lower or 'aspirante' in doc_type_lower:
return 'paso'
elif 'normativa' in doc_type_lower or 'reglamento' in doc_type_lower:
return 'articulo'
elif 'protocolo' in doc_type_lower:
return 'conducta'
elif 'decalogo' in doc_type_lower:
return 'principio'
elif 'glosario' in doc_type_lower:
return 'termino'
elif 'reglas_comunicacion' in doc_type_lower:
return 'regla'
elif 'convocatoria' in doc_type_lower:
return 'pregunta'
if metadata.get('step_number'):
return 'paso'
if metadata.get('article'):
return 'articulo'
if metadata.get('principle_number'):
return 'principio'
if metadata.get('term'):
return 'termino'
if metadata.get('conduct_name'):
return 'conducta'
return 'general'
def _contains_dates(self, text: str) -> bool:
"""Detectar si el texto contiene fechas o plazos"""
import re
date_patterns = [
r'\d{1,2}/\d{1,2}/\d{2,4}',
r'\d{1,2}-\d{1,2}-\d{2,4}',
r'\d{4}-\d{2}-\d{2}',
r'enero|febrero|marzo|abril|mayo|junio|julio|agosto|septiembre|octubre|noviembre|diciembre',
r'vigencia|vence|plazo|días?|fecha'
]
return any(re.search(p, text, re.IGNORECASE) for p in date_patterns)
def _generate_report(self, chunks_path: str):
"""Genera un reporte de carga"""
print("\n" + "=" * 60)
print("📈 REPORTE DE CARGA COMPLETADO")
print("=" * 60)
print(f"\n📊 ESTADÍSTICAS:")
print(f" 📂 Archivo fuente: {os.path.basename(chunks_path)}")
print(f" ✅ Total chunks cargados: {self.stats['total_chunks']}")
print(f" 🕐 Fecha de carga: {self.stats['loaded_at']}")
if self.stats["by_type"]:
print(f"\n📋 DISTRIBUCIÓN POR TIPO DE DOCUMENTO:")
for doc_type, cantidad in sorted(self.stats["by_type"].items()):
porcentaje = (cantidad / self.stats['total_chunks']) * 100 if self.stats['total_chunks'] > 0 else 0
barra = "█" * int(porcentaje / 5)
print(f" • {doc_type:<35} {cantidad:3d} ({porcentaje:5.1f}%) {barra}")
if self.stats["by_source"]:
print(f"\n📋 DISTRIBUCIÓN POR ARCHIVO FUENTE:")
for source, cantidad in sorted(self.stats["by_source"].items()):
print(f" • {source[:60]:<60} {cantidad:3d} chunks")
# Mostrar estado del índice FAISS
faiss_stats = self.vector_store.get_stats()
print(f"\n📊 ESTADO DEL ÍNDICE FAISS:")
print(f" • Total vectores: {faiss_stats.get('index_size', 0)}")
print(f" • Dimensión: {faiss_stats.get('embedding_dim', 'N/A')}")
print(f" • Directorio persistencia: {self.vector_store.persist_directory}")
print("\n✅ CARGA COMPLETADA EXITOSAMENTE")
print("\n" + "=" * 60)
print("🎯 VECTOR STORE ACTUALIZADO")
print("=" * 60)
print(f"\n📚 Total documentos en índice: {self.stats['total_chunks']}")
def main():
import argparse
parser = argparse.ArgumentParser(
description='Sistema de Carga de Chunks al Vector Store FAISS'
)
parser.add_argument(
'--chunks',
type=str,
default='../Documentos RAG/output/chunks/ready_for_rag/all_chunks.jsonl',
help='Ruta al archivo all_chunks.jsonl'
)
args = parser.parse_args()
chunks_path = Path(args.chunks)
if not chunks_path.exists():
print(f"\n❌ ERROR: No se encuentra el archivo: {chunks_path}")
print(f" 📍 Ruta absoluta: {chunks_path.absolute()}")
return
loader = ChunksRAGLoader()
stats = loader.load_chunks_file(str(chunks_path))
if "error" not in stats and stats.get('total_chunks', 0) > 0:
print(f"\n✅ ÉXITO: {stats['total_chunks']} chunks cargados")
elif "error" in stats:
print(f"\n❌ Error: {stats['error']}")
if __name__ == "__main__":
main()