Spaces:
Sleeping
Sleeping
| """Módulo de operaciones de índice para RAG en producción. | |
| Provee sincronización de índices con hash-based diff, deduplicación | |
| de chunks y versionamiento de documentos. | |
| """ | |
import hashlib
import json
import logging
import time
from pathlib import Path

import numpy as np
def compute_doc_hash(content: str) -> str:
    """Return the SHA-256 hex digest of ``content`` encoded as UTF-8."""
    hasher = hashlib.sha256()
    hasher.update(content.encode("utf-8"))
    return hasher.hexdigest()
def generate_chunk_id(doc_id: str, index: int, text: str) -> str:
    """Build a deterministic identifier for one chunk of a document.

    The identifier has the form ``{doc_id}::chunk_{index}::{short_hash}``,
    where ``short_hash`` is the first 8 hex characters of the MD5 digest of
    the UTF-8 encoded chunk text. The same inputs always give the same id.
    """
    encoded_text = text.encode("utf-8")
    short_hash = hashlib.md5(encoded_text).hexdigest()[:8]
    return "{}::chunk_{}::{}".format(doc_id, index, short_hash)
def load_registry(path: Path) -> dict:
    """Load the registry dictionary from a JSON file.

    A missing file is the normal "nothing stored yet" case and silently
    yields an empty dict. A file that exists but cannot be used (unreadable,
    not valid UTF-8, not valid JSON, or valid JSON whose top-level value is
    not an object) also yields an empty dict, but a warning is logged so the
    loss of the stored registry does not go unnoticed.

    Args:
        path: Location of the registry JSON file (``str`` or ``Path``).

    Returns:
        The decoded registry, or ``{}`` when the file is missing or unusable.
        The result is always a ``dict``.
    """
    path = Path(path)
    if not path.exists():
        return {}
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError (malformed JSON) and
        # UnicodeDecodeError (file is not valid UTF-8).
        logging.getLogger(__name__).warning(
            "Registry %s could not be read (%s); starting from an empty registry",
            path,
            exc,
        )
        return {}
    if not isinstance(data, dict):
        # e.g. the file holds a JSON list or scalar: callers index the
        # registry by key, so anything but an object is unusable.
        logging.getLogger(__name__).warning(
            "Registry %s holds a JSON %s instead of an object; "
            "starting from an empty registry",
            path,
            type(data).__name__,
        )
        return {}
    return data
def save_registry(registry: dict, path: Path) -> None:
    """Write the registry to disk as JSON (``indent=2``, non-ASCII kept as-is).

    The parent directory is created if needed. The data is first written to
    a temporary sibling file named ``<name>.tmp`` and then moved over the
    target, so a failure while serializing or writing (for example a value
    that is not JSON-serializable) leaves any previously saved registry
    untouched instead of truncated.

    Args:
        registry: JSON-serializable dictionary to persist.
        path: Destination file (``str`` or ``Path``).

    Raises:
        TypeError: If ``registry`` contains a value ``json`` cannot serialize.
        OSError: If the directory or file cannot be created or written.
    """
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = path.with_name(path.name + ".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(registry, f, indent=2, ensure_ascii=False)
        # Only a completely written file replaces the previous registry.
        tmp_path.replace(path)
    finally:
        # After a successful replace the temp file is already gone; after a
        # failure this removes the partial file.
        tmp_path.unlink(missing_ok=True)
def _remove_chunks(vectorstore, doc_id: str, chunk_ids: list[str]) -> None:
    """Best-effort removal of a document's chunks from the vector store."""
    if not chunk_ids:
        return
    try:
        vectorstore.delete(ids=chunk_ids)
    except Exception:
        # Deliberately best-effort: a failed delete must not abort the sync,
        # but it is logged because the chunks may remain in the store.
        logging.getLogger(__name__).warning(
            "Could not delete %d chunk(s) of document %r from the vector store",
            len(chunk_ids),
            doc_id,
            exc_info=True,
        )


def _index_document(vectorstore, doc_id: str, content: str, content_hash: str) -> dict:
    """Add a document to the store as one chunk and return its registry entry."""
    chunk_id = generate_chunk_id(doc_id, 0, content)
    vectorstore.add(
        ids=[chunk_id],
        documents=[content],
        metadatas=[{"doc_id": doc_id, "is_current": True}],
    )
    return {
        "hash": content_hash,
        "chunk_ids": [chunk_id],
        "updated_at": time.time(),
    }


def sync_documents(
    docs: list[dict],
    vectorstore,
    registry_path: str,
) -> dict:
    """Synchronize documents with the vector store using a hash-based diff.

    Each document's content hash is compared with the hash stored in the
    JSON registry:

    * id not in the registry -> the document is added ("new");
    * hash differs -> its registered chunks are deleted and the document is
      added again ("modified");
    * hash equal -> nothing is done ("unchanged");
    * id in the registry but not in ``docs`` -> its registered chunks are
      deleted and the registry entry is removed ("deleted").

    Every document is stored as a single chunk (index 0) holding its whole
    content. Deletions in the store are best-effort: a failure is logged and
    the sync continues. The registry is written back even when the sync is
    interrupted by an exception, so documents already processed are not
    treated as new on the next run. A summary line is printed on success.

    Args:
        docs: List of documents with keys "id" and "content".
        vectorstore: Collection exposing ``add(ids=, documents=, metadatas=)``
            and ``delete(ids=)`` (the original docstring names a ChromaDB
            collection).
        registry_path: Path to the registry JSON file.

    Returns:
        Dictionary with the counters "new", "modified", "deleted" and
        "unchanged".

    Raises:
        KeyError: If a document lacks the "id" or "content" key.
        Exception: Whatever ``vectorstore.add`` raises is propagated.
    """
    registry_file = Path(registry_path)
    registry = load_registry(registry_file)
    # Computed up front so a document without "id" fails before any change.
    current_doc_ids = {doc["id"] for doc in docs}
    counters = {"new": 0, "modified": 0, "deleted": 0, "unchanged": 0}

    try:
        # Process every current document.
        for doc in docs:
            doc_id = doc["id"]
            content = doc["content"]
            current_hash = compute_doc_hash(content)
            entry = registry.get(doc_id)

            if entry is None:
                registry[doc_id] = _index_document(
                    vectorstore, doc_id, content, current_hash
                )
                counters["new"] += 1
            elif entry.get("hash") != current_hash:
                # Modified: drop the stale chunks, then index the new content.
                _remove_chunks(vectorstore, doc_id, entry.get("chunk_ids", []))
                registry[doc_id] = _index_document(
                    vectorstore, doc_id, content, current_hash
                )
                counters["modified"] += 1
            else:
                counters["unchanged"] += 1

        # Registered documents that are no longer present were deleted.
        for doc_id in set(registry) - current_doc_ids:
            _remove_chunks(
                vectorstore, doc_id, registry[doc_id].get("chunk_ids", [])
            )
            del registry[doc_id]
            counters["deleted"] += 1
    finally:
        # Persist whatever was completed, even if an exception interrupted
        # the sync, so the registry keeps matching the store.
        save_registry(registry, registry_file)

    print(
        f"Sync completado: {counters['new']} nuevos, "
        f"{counters['modified']} modificados, "
        f"{counters['deleted']} eliminados, "
        f"{counters['unchanged']} sin cambios"
    )
    return counters
def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Return the cosine similarity of two vectors.

    If either vector has zero Euclidean norm the result is 0.0, which
    avoids a division by zero.
    """
    magnitude_a, magnitude_b = np.linalg.norm(a), np.linalg.norm(b)
    if magnitude_a == 0 or magnitude_b == 0:
        return 0.0
    cosine = np.dot(a, b) / (magnitude_a * magnitude_b)
    return float(cosine)
def deduplicate_chunks(
    chunks: list[dict],
    embeddings: list[list[float]],
    sim_threshold: float = 0.95,
) -> list[dict]:
    """Remove exact and near-duplicate chunks, keeping first occurrences.

    Chunks and embeddings are paired by position; if the two lists differ in
    length, the surplus items of the longer one are ignored.

    Args:
        chunks: List of dicts with at least the key "text".
        embeddings: Embedding of each chunk, in the same order as ``chunks``.
        sim_threshold: A chunk is dropped when its cosine similarity to an
            already kept chunk is greater than or equal to this value.

    Returns:
        The surviving chunks, in their original order.
    """
    # Stage 1: exact duplicates. Keep the first chunk seen for each MD5
    # digest of the text (dicts preserve insertion order).
    first_by_digest: dict[str, tuple[dict, list[float]]] = {}
    for candidate, vector in zip(chunks, embeddings):
        digest = hashlib.md5(candidate["text"].encode("utf-8")).hexdigest()
        first_by_digest.setdefault(digest, (candidate, vector))

    # Stage 2: semantic duplicates. Greedily keep a chunk only if it is not
    # too similar to any chunk kept so far.
    kept_chunks: list[dict] = []
    kept_vectors: list[np.ndarray] = []
    for candidate, vector in first_by_digest.values():
        candidate_vec = np.array(vector)
        too_similar = any(
            _cosine_similarity(candidate_vec, kept) >= sim_threshold
            for kept in kept_vectors
        )
        if too_similar:
            continue
        kept_chunks.append(candidate)
        kept_vectors.append(candidate_vec)
    return kept_chunks
def ingest_new_version(
    doc: dict,
    vectorstore,
    chunks: list[str],
) -> int:
    """Ingest a new version of a document.

    Looks up the entries already stored for the document, marks them as
    ``is_current=False`` and adds the new chunks with the version number
    incremented by one. Lookup and retirement of previous entries are
    best-effort: if either fails, the failure is logged and ingestion
    continues (after a failed lookup the document is treated as having no
    previous version).

    Each new chunk id is ``generate_chunk_id(...)`` with a ``::v{version}``
    suffix, and all chunks of one version share the same ``created_at``
    timestamp. If ``chunks`` is empty nothing is added; the previous entries
    are still retired and the new version number is still returned.

    Args:
        doc: Dictionary with key "id".
        vectorstore: Collection exposing ``get(where=)``,
            ``update(ids=, metadatas=)`` and
            ``add(ids=, documents=, metadatas=)`` (the original docstring
            names a ChromaDB collection).
        chunks: List of strings (the chunk texts).

    Returns:
        The number of the new version.

    Raises:
        KeyError: If ``doc`` has no "id" key.
        Exception: Whatever ``vectorstore.add`` raises is propagated.
    """
    logger = logging.getLogger(__name__)
    doc_id = doc["id"]

    # Collect ids and metadata of everything already stored for this doc.
    previous_ids: list = []
    previous_metas: list[dict] = []
    try:
        existing = vectorstore.get(
            where={"doc_id": doc_id},
        )
        if existing and existing["ids"]:
            # Tolerate entries whose metadata is missing (None).
            metas = [dict(meta or {}) for meta in existing["metadatas"]]
            previous_ids, previous_metas = list(existing["ids"]), metas
    except Exception:
        logger.warning(
            "Could not look up previous versions of document %r; "
            "treating it as having none",
            doc_id,
            exc_info=True,
        )

    # Highest existing version; entries without an integer "version" count
    # as version 0 instead of aborting the whole step.
    stored_versions = [meta.get("version", 0) for meta in previous_metas]
    max_version = max([0] + [v for v in stored_versions if isinstance(v, int)])

    if previous_ids:
        try:
            # Mark the previous chunks as no longer current.
            vectorstore.update(
                ids=previous_ids,
                metadatas=[
                    {**meta, "is_current": False} for meta in previous_metas
                ],
            )
        except Exception:
            logger.warning(
                "Could not mark %d previous chunk(s) of document %r as not current",
                len(previous_ids),
                doc_id,
                exc_info=True,
            )

    new_version = max_version + 1
    created_at = time.time()  # one timestamp for the whole version

    # The version suffix keeps ids distinct from earlier versions with the
    # same chunk text.
    new_ids = [
        f"{generate_chunk_id(doc_id, i, chunk_text)}::v{new_version}"
        for i, chunk_text in enumerate(chunks)
    ]
    new_metadatas = [
        {
            "doc_id": doc_id,
            "version": new_version,
            "is_current": True,
            "created_at": created_at,
        }
        for _ in new_ids
    ]

    # Skip the call for an empty batch: there is nothing to add, and a store
    # may reject empty lists (NOTE(review): confirm against the store used).
    if new_ids:
        vectorstore.add(
            ids=new_ids,
            documents=list(chunks),
            metadatas=new_metadatas,
        )
    return new_version