DocAgentSystem / rag /index_ops.py
RamsesCamas's picture
Initial clean commit for HF Space deployment
d0d2f42
"""Módulo de operaciones de índice para RAG en producción.
Provee sincronización de índices con hash-based diff, deduplicación
de chunks y versionamiento de documentos.
"""
import hashlib
import json
import time
from pathlib import Path
import numpy as np
def compute_doc_hash(content: str) -> str:
"""Calcula el SHA-256 de un string y devuelve el hexdigest."""
return hashlib.sha256(content.encode("utf-8")).hexdigest()
def generate_chunk_id(doc_id: str, index: int, text: str) -> str:
"""Genera un ID determinista para cada chunk.
Formato: {doc_id}::chunk_{index}::{hash_parcial}
donde hash_parcial son los primeros 8 caracteres del MD5 del texto.
"""
hash_parcial = hashlib.md5(text.encode("utf-8")).hexdigest()[:8]
return f"{doc_id}::chunk_{index}::{hash_parcial}"
def load_registry(path: Path) -> dict:
"""Carga un JSON desde disco. Si no existe, devuelve dict vacío."""
path = Path(path)
if not path.exists():
return {}
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return {}
def save_registry(registry: dict, path: Path) -> None:
"""Guarda el diccionario como JSON con indent=2."""
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
json.dump(registry, f, indent=2, ensure_ascii=False)
def sync_documents(
docs: list[dict],
vectorstore,
registry_path: str,
) -> dict:
"""Sincroniza documentos con el vector store usando hash-based diff.
Args:
docs: Lista de documentos con keys "id" y "content".
vectorstore: Colección de ChromaDB.
registry_path: Ruta al archivo JSON de registry.
Returns:
Diccionario con contadores: new, modified, deleted, unchanged.
"""
registry = load_registry(Path(registry_path))
current_doc_ids = {doc["id"] for doc in docs}
counters = {"new": 0, "modified": 0, "deleted": 0, "unchanged": 0}
# Procesar cada documento actual
for doc in docs:
doc_id = doc["id"]
content = doc["content"]
current_hash = compute_doc_hash(content)
if doc_id not in registry:
# Documento nuevo
chunk_id = generate_chunk_id(doc_id, 0, content)
vectorstore.add(
ids=[chunk_id],
documents=[content],
metadatas=[{"doc_id": doc_id, "is_current": True}],
)
registry[doc_id] = {
"hash": current_hash,
"chunk_ids": [chunk_id],
"updated_at": time.time(),
}
counters["new"] += 1
elif registry[doc_id]["hash"] != current_hash:
# Documento modificado: eliminar chunks viejos y re-indexar
old_chunk_ids = registry[doc_id].get("chunk_ids", [])
if old_chunk_ids:
try:
vectorstore.delete(ids=old_chunk_ids)
except Exception:
pass
chunk_id = generate_chunk_id(doc_id, 0, content)
vectorstore.add(
ids=[chunk_id],
documents=[content],
metadatas=[{"doc_id": doc_id, "is_current": True}],
)
registry[doc_id] = {
"hash": current_hash,
"chunk_ids": [chunk_id],
"updated_at": time.time(),
}
counters["modified"] += 1
else:
# Sin cambios
counters["unchanged"] += 1
# Detectar documentos eliminados
deleted_ids = set(registry.keys()) - current_doc_ids
for doc_id in deleted_ids:
old_chunk_ids = registry[doc_id].get("chunk_ids", [])
if old_chunk_ids:
try:
vectorstore.delete(ids=old_chunk_ids)
except Exception:
pass
del registry[doc_id]
counters["deleted"] += 1
save_registry(registry, Path(registry_path))
print(
f"Sync completado: {counters['new']} nuevos, "
f"{counters['modified']} modificados, "
f"{counters['deleted']} eliminados, "
f"{counters['unchanged']} sin cambios"
)
return counters
def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
"""Calcula la similitud coseno entre dos vectores."""
norm_a = np.linalg.norm(a)
norm_b = np.linalg.norm(b)
if norm_a == 0 or norm_b == 0:
return 0.0
return float(np.dot(a, b) / (norm_a * norm_b))
def deduplicate_chunks(
chunks: list[dict],
embeddings: list[list[float]],
sim_threshold: float = 0.95,
) -> list[dict]:
"""Deduplica chunks por hash exacto y similitud semántica.
Args:
chunks: Lista de dicts con al menos key "text".
embeddings: Lista de embeddings correspondientes a cada chunk.
sim_threshold: Umbral de similitud coseno para dedup semántica.
Returns:
Lista de chunks únicos.
"""
# Paso 1: Deduplicación exacta por hash MD5
seen_hashes: set[str] = set()
exact_unique: list[tuple[dict, list[float]]] = []
for chunk, emb in zip(chunks, embeddings):
text_hash = hashlib.md5(chunk["text"].encode("utf-8")).hexdigest()
if text_hash not in seen_hashes:
seen_hashes.add(text_hash)
exact_unique.append((chunk, emb))
# Paso 2: Deduplicación semántica por similitud coseno
semantic_unique: list[dict] = []
unique_embeddings: list[np.ndarray] = []
for chunk, emb in exact_unique:
emb_array = np.array(emb)
is_duplicate = False
for existing_emb in unique_embeddings:
sim = _cosine_similarity(emb_array, existing_emb)
if sim >= sim_threshold:
is_duplicate = True
break
if not is_duplicate:
semantic_unique.append(chunk)
unique_embeddings.append(emb_array)
return semantic_unique
def ingest_new_version(
doc: dict,
vectorstore,
chunks: list[str],
) -> int:
"""Ingesta una nueva versión de un documento.
Marca chunks de versiones anteriores como is_current=False e indexa
los nuevos chunks con la versión incrementada.
Args:
doc: Diccionario con key "id".
vectorstore: Colección de ChromaDB.
chunks: Lista de strings (textos de los chunks).
Returns:
Número de la nueva versión.
"""
doc_id = doc["id"]
# Determinar la versión máxima existente
max_version = 0
try:
existing = vectorstore.get(
where={"doc_id": doc_id},
)
if existing and existing["ids"]:
for meta in existing["metadatas"]:
v = meta.get("version", 0)
if v > max_version:
max_version = v
# Marcar chunks anteriores como no actuales
vectorstore.update(
ids=existing["ids"],
metadatas=[
{**meta, "is_current": False}
for meta in existing["metadatas"]
],
)
except Exception:
pass
new_version = max_version + 1
# Indexar nuevos chunks
new_ids = []
new_documents = []
new_metadatas = []
for i, chunk_text in enumerate(chunks):
chunk_id = generate_chunk_id(doc_id, i, chunk_text)
# Agregar version al ID para evitar colisiones
versioned_id = f"{chunk_id}::v{new_version}"
new_ids.append(versioned_id)
new_documents.append(chunk_text)
new_metadatas.append({
"doc_id": doc_id,
"version": new_version,
"is_current": True,
"created_at": time.time(),
})
vectorstore.add(
ids=new_ids,
documents=new_documents,
metadatas=new_metadatas,
)
return new_version