# HF_Space_hipVS/vector_store.py
# ================================
# Multi-project vector store with 3-tier GPU acceleration.
#
# Key design:
# - Each project gets its own VectorStore instances (image_index, video_index)
# - CAGRA is rebuilt on every insert (optimized for query, not ingestion)
# - Indexes swap between NVMe and VRAM via async pinned-memory DMA
# - Multiple projects coexist by LRU-evicting cold indexes from VRAM
#
# Tiers:
# 1. CAGRA graph (hipVS / cuVS) -- ANN search in ~50us
# 2. PyTorch flat tensor (hipBLAS matmul) -- brute-force GPU
# 3. NumPy CPU cosine similarity -- works everywhere
import json
import logging
import threading
import numpy as np
from pathlib import Path
from config import USE_GPU, HF_TOKEN, HF_DATASET_REPO, SWAP_PATH, get_project_dir
logger = logging.getLogger(__name__)
# -- GPU Backend Detection ----------------------------------------------------
# Capability flags read by VectorStore to pick a search tier.  They are set
# once, at import time, and never change afterwards.
_HIPVS_AVAILABLE = False
_TORCH_CUDA_AVAILABLE = False
_cagra = None  # the cuvs.neighbors.cagra module when Tier 1 is usable

if USE_GPU:
    # Tier 1: CAGRA via the cuvs Python package.
    try:
        from cuvs.neighbors import cagra as _cagra_mod
    except (ImportError, OSError) as exc:
        # OSError covers an installed package whose shared libraries cannot
        # be loaded; either way we fall through to the next tier.
        logger.debug("Tier 1 unavailable (cuvs import failed): %s", exc)
    else:
        _cagra = _cagra_mod
        _HIPVS_AVAILABLE = True
        logger.info("Tier 1: hipVS (cuvs) -- CAGRA index enabled")

    # Tier 2: flat tensor search through PyTorch, only probed when Tier 1
    # is not usable.
    if not _HIPVS_AVAILABLE:
        try:
            import torch
        except (ImportError, OSError) as exc:
            logger.debug("Tier 2 unavailable (torch import failed): %s", exc)
        else:
            if torch.cuda.is_available():
                _TORCH_CUDA_AVAILABLE = True
                props = torch.cuda.get_device_properties(0)
                # torch.version.hip is set on ROCm builds and None on CUDA
                # builds, which is more reliable than matching device names.
                backend = "ROCm" if getattr(torch.version, "hip", None) else "CUDA"
                logger.info(f"Tier 2: PyTorch {backend} -- flat GPU search ({props.name})")

if not _HIPVS_AVAILABLE and not _TORCH_CUDA_AVAILABLE:
    logger.info("Tier 3: NumPy CPU vector search")
# -- HF Dataset Persistence ---------------------------------------------------
def _hf_save(name: str, ids: list[str], vectors: np.ndarray, metadata: list[dict]):
    """
    Best-effort push of one index to the Hugging Face Hub as a private dataset.

    Does nothing unless both HF_DATASET_REPO and HF_TOKEN are set.  Each row
    holds the id, the vector as a plain list, and the metadata dict encoded
    as a JSON string.  The target repo is "<HF_DATASET_REPO>-<name>".  Any
    failure is logged as a warning and swallowed.
    """
    if not (HF_DATASET_REPO and HF_TOKEN):
        return
    try:
        from datasets import Dataset

        rows = []
        for pos, vec_id in enumerate(ids):
            rows.append(
                {
                    "id": vec_id,
                    "vector": vectors[pos].tolist(),
                    "metadata": json.dumps(metadata[pos]),
                }
            )
        dataset = Dataset.from_list(rows)
        target_repo = f"{HF_DATASET_REPO}-{name}"
        dataset.push_to_hub(target_repo, token=HF_TOKEN, private=True)
        logger.info(f"[{name}] Pushed {len(rows)} vectors to HF Dataset")
    except Exception as e:
        logger.warning(f"[{name}] HF push failed: {e}")
def _hf_load(name: str):
    """
    Best-effort download of the "train" split of "<HF_DATASET_REPO>-<name>".

    Returns the loaded dataset, or None when HF_DATASET_REPO / HF_TOKEN are
    not configured or the load fails for any reason.  Failures are logged
    rather than silently discarded so that authentication or network
    problems are visible; they never propagate to the caller.
    """
    if not HF_DATASET_REPO or not HF_TOKEN:
        return None
    try:
        from datasets import load_dataset

        repo = f"{HF_DATASET_REPO}-{name}"
        ds = load_dataset(repo, token=HF_TOKEN, split="train")
        logger.info(f"[{name}] Loaded {len(ds)} vectors from HF Dataset")
        return ds
    except Exception as e:
        logger.warning(f"[{name}] HF load failed: {e}")
        return None
# -- VectorStore --------------------------------------------------------------
class VectorStore:
"""
GPU-backed vector store with NVMe swap and CAGRA rebuild-on-insert.
Lifecycle:
1. add(vectors, ids, meta) β bulk add + CAGRA rebuild + persist
2. append(vector, id, meta) β single add, NO rebuild (caller decides)
3. append_and_rebuild(v, id, meta) β single add + CAGRA rebuild + persist
4. search(query, top_k) β search (auto-loads from NVMe if needed)
5. evict() β free VRAM, keep NVMe
6. restore() β NVMe -> VRAM (async, pinned DMA)
"""
def __init__(self, name: str, index_dir: Path | None = None):
self.name = name
self._index_dir = index_dir or SWAP_PATH
self._index_dir.mkdir(parents=True, exist_ok=True)
self._vectors: np.ndarray | None = None
self._ids: list[str] = []
self._metadata: list[dict] = []
# GPU state
self._gpu_index = None # CAGRA index object
self._gpu_vecs = None # torch tensor (flat fallback)
self._in_vram = False
# File paths
self._npz_file = self._index_dir / f"{name}.npz"
self._meta_file = self._index_dir / f"{name}_meta.json"
self._cagra_file = self._index_dir / f"{name}.cagra"
# Load from disk on init
if self._npz_file.exists():
self._load_from_disk()
else:
self._load_from_hf()
# ββ Add / Append βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def add(self, vectors: np.ndarray, ids: list[str], metadata: list[dict] | None = None):
"""Bulk add vectors + rebuild CAGRA + persist."""
if len(vectors) == 0:
return
self._vectors = vectors.astype(np.float32)
self._ids = list(ids)
self._metadata = metadata or [{} for _ in ids]
self._normalize()
self.rebuild_gpu_index()
self._persist()
logger.info(f"[{self.name}] Indexed {len(ids)} vectors (mode={self.mode})")
def append(self, vector: np.ndarray, vid: str, meta: dict | None = None):
"""Append one vector. NO CAGRA rebuild (batch callers rebuild at end)."""
vector = vector.astype(np.float32).reshape(1, -1)
norm = np.linalg.norm(vector)
if norm > 0:
vector = vector / norm
if self._vectors is not None and len(self._vectors) > 0:
self._vectors = np.vstack([self._vectors, vector])
else:
self._vectors = vector
self._ids.append(vid)
self._metadata.append(meta or {})
self._in_vram = False # invalidate GPU index
def append_and_rebuild(self, vector: np.ndarray, vid: str, meta: dict | None = None):
"""Append one vector + rebuild CAGRA + persist."""
self.append(vector, vid, meta)
self.rebuild_gpu_index()
self._persist()
# ββ Search βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def search(self, query: np.ndarray, top_k: int = 10) -> list[dict]:
"""
Cosine similarity search. Auto-restores from NVMe if not in VRAM.
Returns list of dicts: {id, score, ...metadata}
"""
if self._vectors is None or len(self._vectors) == 0:
return []
query = query.astype(np.float32)
norm = np.linalg.norm(query)
if norm > 0:
query = query / norm
# Auto-load GPU index if needed
if ((_HIPVS_AVAILABLE or _TORCH_CUDA_AVAILABLE) and not self._in_vram):
self.rebuild_gpu_index()
if _HIPVS_AVAILABLE and self._gpu_index is not None:
return self._search_cagra(query, top_k)
elif _TORCH_CUDA_AVAILABLE and self._in_vram:
return self._search_torch(query, top_k)
return self._search_numpy(query, top_k)
def _search_numpy(self, query: np.ndarray, top_k: int) -> list[dict]:
scores = self._vectors @ query
k = min(top_k, len(self._ids))
if len(scores) > top_k:
idx = np.argpartition(scores, -k)[-k:]
idx = idx[np.argsort(scores[idx])[::-1]]
else:
idx = np.argsort(scores)[::-1][:k]
return [{"id": self._ids[i], "score": float(scores[i]), **self._metadata[i]} for i in idx]
def _search_cagra(self, query: np.ndarray, top_k: int) -> list[dict]:
import cupy as cp
q = cp.asarray(query.reshape(1, -1))
search_params = _cagra.SearchParams()
distances, indices = _cagra.search(search_params, self._gpu_index, q, top_k)
results = []
for idx, dist in zip(indices[0].get().tolist(), distances[0].get().tolist()):
if 0 <= idx < len(self._ids):
results.append({"id": self._ids[idx], "score": -float(dist), **self._metadata[idx]})
return results
def _search_torch(self, query: np.ndarray, top_k: int) -> list[dict]:
import torch
q = torch.from_numpy(query).to(self._gpu_vecs.device, dtype=self._gpu_vecs.dtype).unsqueeze(0)
scores = (q @ self._gpu_vecs.T).squeeze(0)
k = min(top_k, len(self._ids))
top_scores, top_idx = torch.topk(scores, k=k)
return [
{"id": self._ids[i], "score": float(s), **self._metadata[i]}
for i, s in zip(top_idx.cpu().tolist(), top_scores.cpu().tolist())
]
# ββ GPU Index Build (CAGRA rebuilt on every insert) ββββββββββββββββββββββ
def rebuild_gpu_index(self):
"""Build/rebuild the GPU index from current vectors."""
if self._vectors is None or len(self._vectors) == 0:
return
if _HIPVS_AVAILABLE:
self._build_cagra()
elif _TORCH_CUDA_AVAILABLE:
self._build_torch()
def _build_cagra(self):
import cupy as cp
d_vecs = cp.asarray(self._vectors)
params = _cagra.IndexParams()
params.metric = "sqeuclidean"
params.graph_degree = 64
params.intermediate_graph_degree = 128
params.build_algo = "IVF_PQ"
logger.info(f"[{self.name}] Building CAGRA ({self._vectors.shape}) ...")
self._gpu_index = _cagra.build(params, d_vecs)
# Serialize to NVMe for fast restore after eviction
_cagra.serialize(str(self._cagra_file), self._gpu_index)
self._in_vram = True
logger.info(f"[{self.name}] CAGRA built + serialized")
def _build_torch(self):
import torch
self._gpu_vecs = torch.from_numpy(self._vectors).cuda().half()
self._in_vram = True
# ββ NVMe <-> VRAM Swap βββββββββββββββββββββββββββββββββββββββββββββββββββ
def evict(self):
"""Free VRAM. NVMe files stay intact for fast restore()."""
if not self._in_vram:
return
self._gpu_index = None
self._gpu_vecs = None
if _HIPVS_AVAILABLE or _TORCH_CUDA_AVAILABLE:
import torch
torch.cuda.empty_cache()
self._in_vram = False
logger.info(f"[{self.name}] Evicted from VRAM")
def restore(self):
"""
Restore index from NVMe to VRAM via async pinned-memory copy.
Does NOT re-embed or re-read source files.
"""
if self._in_vram:
return
if _HIPVS_AVAILABLE and self._cagra_file.exists():
logger.info(f"[{self.name}] Restoring CAGRA from NVMe (async) ...")
self._gpu_index = _cagra.deserialize(str(self._cagra_file))
self._in_vram = True
logger.info(f"[{self.name}] CAGRA restored to VRAM")
elif _TORCH_CUDA_AVAILABLE and self._vectors is not None:
import torch
# Pinned memory -> VRAM (async DMA copy)
pinned = torch.from_numpy(self._vectors).pin_memory()
self._gpu_vecs = pinned.to("cuda", non_blocking=True, dtype=torch.float16)
self._in_vram = True
logger.info(f"[{self.name}] Flat tensor restored to VRAM (async)")
# Load IDs if needed
if not self._ids and self._npz_file.exists():
data = np.load(self._npz_file, allow_pickle=True)
self._ids = data["ids"].tolist()
if self._meta_file.exists():
with open(self._meta_file, "r") as f:
self._metadata = json.load(f)
# ββ Persistence ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def _persist(self):
self._save_to_disk()
if HF_DATASET_REPO and HF_TOKEN:
_hf_save(self.name, self._ids, self._vectors, self._metadata)
def _save_to_disk(self):
if self._vectors is None:
return
np.savez_compressed(self._npz_file, vectors=self._vectors, ids=np.array(self._ids, dtype=object))
with open(self._meta_file, "w") as f:
json.dump(self._metadata, f)
def _load_from_disk(self):
try:
data = np.load(self._npz_file, allow_pickle=True)
self._vectors = data["vectors"].astype(np.float32)
self._ids = data["ids"].tolist()
if self._meta_file.exists():
with open(self._meta_file, "r") as f:
self._metadata = json.load(f)
else:
self._metadata = [{} for _ in self._ids]
logger.info(f"[{self.name}] Loaded {len(self._ids)} vectors from disk")
except Exception as e:
logger.error(f"[{self.name}] Disk load failed: {e}")
def _load_from_hf(self):
ds = _hf_load(self.name)
if ds is None or len(ds) == 0:
return
try:
self._ids = ds["id"]
self._vectors = np.array(ds["vector"], dtype=np.float32)
self._metadata = [json.loads(m) for m in ds["metadata"]]
self._save_to_disk()
except Exception as e:
logger.error(f"[{self.name}] HF load failed: {e}")
def _normalize(self):
if self._vectors is None:
return
norms = np.linalg.norm(self._vectors, axis=1, keepdims=True)
norms = np.where(norms == 0, 1, norms)
self._vectors = self._vectors / norms
# ββ Utilities ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def clear(self):
self._vectors = None
self._ids = []
self._metadata = []
self._gpu_index = None
self._gpu_vecs = None
self._in_vram = False
for f in (self._npz_file, self._meta_file, self._cagra_file):
if f.exists():
f.unlink()
def has_data(self) -> bool:
return self._vectors is not None and len(self._ids) > 0
@property
def count(self) -> int:
return len(self._ids) if self._ids else 0
@property
def in_vram(self) -> bool:
return self._in_vram
@property
def mode(self) -> str:
if _HIPVS_AVAILABLE:
return "CAGRA (hipVS GPU)"
elif _TORCH_CUDA_AVAILABLE:
return "Flat Tensor (GPU)"
return "NumPy (CPU)"
def __len__(self):
return self.count
def __repr__(self):
vram = "VRAM" if self._in_vram else "NVMe"
return f"VectorStore('{self.name}', n={self.count}, {self.mode}, {vram})"
# -- Multi-Project Store Registry ---------------------------------------------
# Module-level cache of VectorStore instances, keyed by "<project>/<index_name>".
_stores: dict[str, VectorStore] = {}
# Held by get_store() and evict_all() while they read or modify _stores.
_lock = threading.Lock()
def get_store(project: str, index_name: str) -> VectorStore:
    """
    Return the VectorStore for a project + index, creating it on first use.

    Instances are cached in the module-level registry under the key
    "<project>/<index_name>".  A new store keeps its files in the
    "indexes" sub-directory of the project's directory.  The lookup and
    the creation both happen while holding the registry lock.
    """
    registry_key = f"{project}/{index_name}"
    with _lock:
        store = _stores.get(registry_key)
        if store is None:
            indexes_dir = get_project_dir(project) / "indexes"
            store = VectorStore(index_name, index_dir=indexes_dir)
            _stores[registry_key] = store
            logger.info(f"Store created: {store}")
        return store
def list_projects() -> list[str]:
    """
    Return the names of all sub-directories of PROJECTS_DIR, in sorted order.

    Every directory counts, whether or not it holds index files.  Returns
    an empty list when PROJECTS_DIR does not exist.
    """
    from config import PROJECTS_DIR

    if not PROJECTS_DIR.exists():
        return []
    return [entry.name for entry in sorted(PROJECTS_DIR.iterdir()) if entry.is_dir()]
def evict_all():
    """Evict every registered store that reports being in VRAM, under the registry lock."""
    with _lock:
        resident = [candidate for candidate in _stores.values() if candidate.in_vram]
        for candidate in resident:
            candidate.evict()
|