Spaces:
Sleeping
Sleeping
File size: 7,972 Bytes
d0d2f42 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 | """Módulo de operaciones de índice para RAG en producción.
Provee sincronización de índices con hash-based diff, deduplicación
de chunks y versionamiento de documentos.
"""
import hashlib
import json
import time
from pathlib import Path
import numpy as np
def compute_doc_hash(content: str) -> str:
"""Calcula el SHA-256 de un string y devuelve el hexdigest."""
return hashlib.sha256(content.encode("utf-8")).hexdigest()
def generate_chunk_id(doc_id: str, index: int, text: str) -> str:
"""Genera un ID determinista para cada chunk.
Formato: {doc_id}::chunk_{index}::{hash_parcial}
donde hash_parcial son los primeros 8 caracteres del MD5 del texto.
"""
hash_parcial = hashlib.md5(text.encode("utf-8")).hexdigest()[:8]
return f"{doc_id}::chunk_{index}::{hash_parcial}"
def load_registry(path: Path) -> dict:
"""Carga un JSON desde disco. Si no existe, devuelve dict vacío."""
path = Path(path)
if not path.exists():
return {}
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return {}
def save_registry(registry: dict, path: Path) -> None:
"""Guarda el diccionario como JSON con indent=2."""
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
json.dump(registry, f, indent=2, ensure_ascii=False)
def sync_documents(
docs: list[dict],
vectorstore,
registry_path: str,
) -> dict:
"""Sincroniza documentos con el vector store usando hash-based diff.
Args:
docs: Lista de documentos con keys "id" y "content".
vectorstore: Colección de ChromaDB.
registry_path: Ruta al archivo JSON de registry.
Returns:
Diccionario con contadores: new, modified, deleted, unchanged.
"""
registry = load_registry(Path(registry_path))
current_doc_ids = {doc["id"] for doc in docs}
counters = {"new": 0, "modified": 0, "deleted": 0, "unchanged": 0}
# Procesar cada documento actual
for doc in docs:
doc_id = doc["id"]
content = doc["content"]
current_hash = compute_doc_hash(content)
if doc_id not in registry:
# Documento nuevo
chunk_id = generate_chunk_id(doc_id, 0, content)
vectorstore.add(
ids=[chunk_id],
documents=[content],
metadatas=[{"doc_id": doc_id, "is_current": True}],
)
registry[doc_id] = {
"hash": current_hash,
"chunk_ids": [chunk_id],
"updated_at": time.time(),
}
counters["new"] += 1
elif registry[doc_id]["hash"] != current_hash:
# Documento modificado: eliminar chunks viejos y re-indexar
old_chunk_ids = registry[doc_id].get("chunk_ids", [])
if old_chunk_ids:
try:
vectorstore.delete(ids=old_chunk_ids)
except Exception:
pass
chunk_id = generate_chunk_id(doc_id, 0, content)
vectorstore.add(
ids=[chunk_id],
documents=[content],
metadatas=[{"doc_id": doc_id, "is_current": True}],
)
registry[doc_id] = {
"hash": current_hash,
"chunk_ids": [chunk_id],
"updated_at": time.time(),
}
counters["modified"] += 1
else:
# Sin cambios
counters["unchanged"] += 1
# Detectar documentos eliminados
deleted_ids = set(registry.keys()) - current_doc_ids
for doc_id in deleted_ids:
old_chunk_ids = registry[doc_id].get("chunk_ids", [])
if old_chunk_ids:
try:
vectorstore.delete(ids=old_chunk_ids)
except Exception:
pass
del registry[doc_id]
counters["deleted"] += 1
save_registry(registry, Path(registry_path))
print(
f"Sync completado: {counters['new']} nuevos, "
f"{counters['modified']} modificados, "
f"{counters['deleted']} eliminados, "
f"{counters['unchanged']} sin cambios"
)
return counters
def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
"""Calcula la similitud coseno entre dos vectores."""
norm_a = np.linalg.norm(a)
norm_b = np.linalg.norm(b)
if norm_a == 0 or norm_b == 0:
return 0.0
return float(np.dot(a, b) / (norm_a * norm_b))
def deduplicate_chunks(
chunks: list[dict],
embeddings: list[list[float]],
sim_threshold: float = 0.95,
) -> list[dict]:
"""Deduplica chunks por hash exacto y similitud semántica.
Args:
chunks: Lista de dicts con al menos key "text".
embeddings: Lista de embeddings correspondientes a cada chunk.
sim_threshold: Umbral de similitud coseno para dedup semántica.
Returns:
Lista de chunks únicos.
"""
# Paso 1: Deduplicación exacta por hash MD5
seen_hashes: set[str] = set()
exact_unique: list[tuple[dict, list[float]]] = []
for chunk, emb in zip(chunks, embeddings):
text_hash = hashlib.md5(chunk["text"].encode("utf-8")).hexdigest()
if text_hash not in seen_hashes:
seen_hashes.add(text_hash)
exact_unique.append((chunk, emb))
# Paso 2: Deduplicación semántica por similitud coseno
semantic_unique: list[dict] = []
unique_embeddings: list[np.ndarray] = []
for chunk, emb in exact_unique:
emb_array = np.array(emb)
is_duplicate = False
for existing_emb in unique_embeddings:
sim = _cosine_similarity(emb_array, existing_emb)
if sim >= sim_threshold:
is_duplicate = True
break
if not is_duplicate:
semantic_unique.append(chunk)
unique_embeddings.append(emb_array)
return semantic_unique
def ingest_new_version(
doc: dict,
vectorstore,
chunks: list[str],
) -> int:
"""Ingesta una nueva versión de un documento.
Marca chunks de versiones anteriores como is_current=False e indexa
los nuevos chunks con la versión incrementada.
Args:
doc: Diccionario con key "id".
vectorstore: Colección de ChromaDB.
chunks: Lista de strings (textos de los chunks).
Returns:
Número de la nueva versión.
"""
doc_id = doc["id"]
# Determinar la versión máxima existente
max_version = 0
try:
existing = vectorstore.get(
where={"doc_id": doc_id},
)
if existing and existing["ids"]:
for meta in existing["metadatas"]:
v = meta.get("version", 0)
if v > max_version:
max_version = v
# Marcar chunks anteriores como no actuales
vectorstore.update(
ids=existing["ids"],
metadatas=[
{**meta, "is_current": False}
for meta in existing["metadatas"]
],
)
except Exception:
pass
new_version = max_version + 1
# Indexar nuevos chunks
new_ids = []
new_documents = []
new_metadatas = []
for i, chunk_text in enumerate(chunks):
chunk_id = generate_chunk_id(doc_id, i, chunk_text)
# Agregar version al ID para evitar colisiones
versioned_id = f"{chunk_id}::v{new_version}"
new_ids.append(versioned_id)
new_documents.append(chunk_text)
new_metadatas.append({
"doc_id": doc_id,
"version": new_version,
"is_current": True,
"created_at": time.time(),
})
vectorstore.add(
ids=new_ids,
documents=new_documents,
metadatas=new_metadatas,
)
return new_version
|