# ROCKIT-Vision-Intelligence / vector_store.py
# Billavenu's picture
# adding filleeeesssss
# fb12ddc verified
# HF_Space_hipVS/vector_store.py
# ================================
# Multi-project vector store with 3-tier GPU acceleration.
#
# Key design:
# - Each project gets its own VectorStore instances (image_index, video_index)
# - CAGRA is rebuilt on every insert (optimized for query, not ingestion)
# - Indexes swap between NVMe and VRAM via async pinned-memory DMA
# - Multiple projects coexist by LRU-evicting cold indexes from VRAM
#
# Tiers:
# 1. CAGRA graph (hipVS / cuVS) — ANN search in ~50us
# 2. PyTorch flat tensor (hipBLAS matmul) — brute-force GPU
# 3. NumPy CPU cosine similarity — works everywhere
import json
import logging
import threading
import numpy as np
from pathlib import Path
from config import USE_GPU, HF_TOKEN, HF_DATASET_REPO, SWAP_PATH, get_project_dir
logger = logging.getLogger(__name__)

# ── GPU Backend Detection ────────────────────────────────────────────────────
# Probed once at import time. The flags below select the search tier used by
# VectorStore: CAGRA (cuvs) first, then a flat PyTorch tensor, then NumPy.
_HIPVS_AVAILABLE = False
_TORCH_CUDA_AVAILABLE = False
_cagra = None  # the cuvs.neighbors.cagra module when Tier 1 is active

if USE_GPU:
    try:
        from cuvs.neighbors import cagra as _cagra_mod
    except ImportError as exc:
        # Best-effort probe: record why Tier 1 was skipped instead of hiding it.
        logger.debug("cuvs is not importable, skipping Tier 1: %s", exc)
    else:
        _cagra = _cagra_mod
        _HIPVS_AVAILABLE = True
        logger.info("Tier 1: hipVS (cuvs) -- CAGRA index enabled")

    # The PyTorch fallback is only probed when CAGRA is unavailable.
    if not _HIPVS_AVAILABLE:
        try:
            import torch
        except ImportError as exc:
            logger.debug("torch is not importable, skipping Tier 2: %s", exc)
        else:
            if torch.cuda.is_available():
                _TORCH_CUDA_AVAILABLE = True
                props = torch.cuda.get_device_properties(0)
                name = props.name.lower()
                # ROCm builds of PyTorch also report through torch.cuda, so
                # the device name is used to label the backend in the log.
                backend = "ROCm" if ("amd" in name or "radeon" in name) else "CUDA"
                logger.info("Tier 2: PyTorch %s -- flat GPU search (%s)", backend, props.name)

if not _HIPVS_AVAILABLE and not _TORCH_CUDA_AVAILABLE:
    logger.info("Tier 3: NumPy CPU vector search")
# ── HF Dataset Persistence ──────────────────────────────────────────────────
def _hf_save(name: str, ids: list[str], vectors: np.ndarray, metadata: list[dict]):
    """Push one index to its private HF Dataset repo (best effort).

    Does nothing unless both HF_DATASET_REPO and HF_TOKEN are set. Each row
    carries the id, the vector as a plain list and the metadata as a JSON
    string. Any failure is logged as a warning and never propagated.
    """
    if not (HF_DATASET_REPO and HF_TOKEN):
        return
    try:
        from datasets import Dataset

        rows = []
        for pos, vec_id in enumerate(ids):
            rows.append(
                {
                    "id": vec_id,
                    "vector": vectors[pos].tolist(),
                    "metadata": json.dumps(metadata[pos]),
                }
            )
        target_repo = f"{HF_DATASET_REPO}-{name}"
        Dataset.from_list(rows).push_to_hub(target_repo, token=HF_TOKEN, private=True)
        logger.info(f"[{name}] Pushed {len(rows)} vectors to HF Dataset")
    except Exception as e:
        logger.warning(f"[{name}] HF push failed: {e}")
def _hf_load(name: str):
    """Load the "train" split of the index's HF Dataset repo (best effort).

    Returns:
        The loaded dataset, or None when HF persistence is not configured or
        the dataset could not be loaded for any reason.
    """
    if not HF_DATASET_REPO or not HF_TOKEN:
        return None
    try:
        from datasets import load_dataset

        repo = f"{HF_DATASET_REPO}-{name}"
        ds = load_dataset(repo, token=HF_TOKEN, split="train")
        logger.info(f"[{name}] Loaded {len(ds)} vectors from HF Dataset")
        return ds
    except Exception as e:
        # Still best effort, but leave a trace so a bad token or a network
        # failure can be diagnosed. Debug level, since a load failure here
        # is treated as non-fatal.
        logger.debug(f"[{name}] HF load skipped: {e}")
        return None
# ── VectorStore ──────────────────────────────────────────────────────────────
class VectorStore:
"""
GPU-backed vector store with NVMe swap and CAGRA rebuild-on-insert.
Lifecycle:
1. add(vectors, ids, meta) β€” bulk add + CAGRA rebuild + persist
2. append(vector, id, meta) β€” single add, NO rebuild (caller decides)
3. append_and_rebuild(v, id, meta) β€” single add + CAGRA rebuild + persist
4. search(query, top_k) β€” search (auto-loads from NVMe if needed)
5. evict() β€” free VRAM, keep NVMe
6. restore() β€” NVMe -> VRAM (async, pinned DMA)
"""
def __init__(self, name: str, index_dir: Path | None = None):
self.name = name
self._index_dir = index_dir or SWAP_PATH
self._index_dir.mkdir(parents=True, exist_ok=True)
self._vectors: np.ndarray | None = None
self._ids: list[str] = []
self._metadata: list[dict] = []
# GPU state
self._gpu_index = None # CAGRA index object
self._gpu_vecs = None # torch tensor (flat fallback)
self._in_vram = False
# File paths
self._npz_file = self._index_dir / f"{name}.npz"
self._meta_file = self._index_dir / f"{name}_meta.json"
self._cagra_file = self._index_dir / f"{name}.cagra"
# Load from disk on init
if self._npz_file.exists():
self._load_from_disk()
else:
self._load_from_hf()
# ── Add / Append ─────────────────────────────────────────────────────────
def add(self, vectors: np.ndarray, ids: list[str], metadata: list[dict] | None = None):
"""Bulk add vectors + rebuild CAGRA + persist."""
if len(vectors) == 0:
return
self._vectors = vectors.astype(np.float32)
self._ids = list(ids)
self._metadata = metadata or [{} for _ in ids]
self._normalize()
self.rebuild_gpu_index()
self._persist()
logger.info(f"[{self.name}] Indexed {len(ids)} vectors (mode={self.mode})")
def append(self, vector: np.ndarray, vid: str, meta: dict | None = None):
"""Append one vector. NO CAGRA rebuild (batch callers rebuild at end)."""
vector = vector.astype(np.float32).reshape(1, -1)
norm = np.linalg.norm(vector)
if norm > 0:
vector = vector / norm
if self._vectors is not None and len(self._vectors) > 0:
self._vectors = np.vstack([self._vectors, vector])
else:
self._vectors = vector
self._ids.append(vid)
self._metadata.append(meta or {})
self._in_vram = False # invalidate GPU index
def append_and_rebuild(self, vector: np.ndarray, vid: str, meta: dict | None = None):
"""Append one vector + rebuild CAGRA + persist."""
self.append(vector, vid, meta)
self.rebuild_gpu_index()
self._persist()
# ── Search ───────────────────────────────────────────────────────────────
def search(self, query: np.ndarray, top_k: int = 10) -> list[dict]:
"""
Cosine similarity search. Auto-restores from NVMe if not in VRAM.
Returns list of dicts: {id, score, ...metadata}
"""
if self._vectors is None or len(self._vectors) == 0:
return []
query = query.astype(np.float32)
norm = np.linalg.norm(query)
if norm > 0:
query = query / norm
# Auto-load GPU index if needed
if ((_HIPVS_AVAILABLE or _TORCH_CUDA_AVAILABLE) and not self._in_vram):
self.rebuild_gpu_index()
if _HIPVS_AVAILABLE and self._gpu_index is not None:
return self._search_cagra(query, top_k)
elif _TORCH_CUDA_AVAILABLE and self._in_vram:
return self._search_torch(query, top_k)
return self._search_numpy(query, top_k)
def _search_numpy(self, query: np.ndarray, top_k: int) -> list[dict]:
scores = self._vectors @ query
k = min(top_k, len(self._ids))
if len(scores) > top_k:
idx = np.argpartition(scores, -k)[-k:]
idx = idx[np.argsort(scores[idx])[::-1]]
else:
idx = np.argsort(scores)[::-1][:k]
return [{"id": self._ids[i], "score": float(scores[i]), **self._metadata[i]} for i in idx]
def _search_cagra(self, query: np.ndarray, top_k: int) -> list[dict]:
import cupy as cp
q = cp.asarray(query.reshape(1, -1))
search_params = _cagra.SearchParams()
distances, indices = _cagra.search(search_params, self._gpu_index, q, top_k)
results = []
for idx, dist in zip(indices[0].get().tolist(), distances[0].get().tolist()):
if 0 <= idx < len(self._ids):
results.append({"id": self._ids[idx], "score": -float(dist), **self._metadata[idx]})
return results
def _search_torch(self, query: np.ndarray, top_k: int) -> list[dict]:
import torch
q = torch.from_numpy(query).to(self._gpu_vecs.device, dtype=self._gpu_vecs.dtype).unsqueeze(0)
scores = (q @ self._gpu_vecs.T).squeeze(0)
k = min(top_k, len(self._ids))
top_scores, top_idx = torch.topk(scores, k=k)
return [
{"id": self._ids[i], "score": float(s), **self._metadata[i]}
for i, s in zip(top_idx.cpu().tolist(), top_scores.cpu().tolist())
]
# ── GPU Index Build (CAGRA rebuilt on every insert) ──────────────────────
def rebuild_gpu_index(self):
"""Build/rebuild the GPU index from current vectors."""
if self._vectors is None or len(self._vectors) == 0:
return
if _HIPVS_AVAILABLE:
self._build_cagra()
elif _TORCH_CUDA_AVAILABLE:
self._build_torch()
def _build_cagra(self):
import cupy as cp
d_vecs = cp.asarray(self._vectors)
params = _cagra.IndexParams()
params.metric = "sqeuclidean"
params.graph_degree = 64
params.intermediate_graph_degree = 128
params.build_algo = "IVF_PQ"
logger.info(f"[{self.name}] Building CAGRA ({self._vectors.shape}) ...")
self._gpu_index = _cagra.build(params, d_vecs)
# Serialize to NVMe for fast restore after eviction
_cagra.serialize(str(self._cagra_file), self._gpu_index)
self._in_vram = True
logger.info(f"[{self.name}] CAGRA built + serialized")
def _build_torch(self):
import torch
self._gpu_vecs = torch.from_numpy(self._vectors).cuda().half()
self._in_vram = True
# ── NVMe <-> VRAM Swap ───────────────────────────────────────────────────
def evict(self):
"""Free VRAM. NVMe files stay intact for fast restore()."""
if not self._in_vram:
return
self._gpu_index = None
self._gpu_vecs = None
if _HIPVS_AVAILABLE or _TORCH_CUDA_AVAILABLE:
import torch
torch.cuda.empty_cache()
self._in_vram = False
logger.info(f"[{self.name}] Evicted from VRAM")
def restore(self):
"""
Restore index from NVMe to VRAM via async pinned-memory copy.
Does NOT re-embed or re-read source files.
"""
if self._in_vram:
return
if _HIPVS_AVAILABLE and self._cagra_file.exists():
logger.info(f"[{self.name}] Restoring CAGRA from NVMe (async) ...")
self._gpu_index = _cagra.deserialize(str(self._cagra_file))
self._in_vram = True
logger.info(f"[{self.name}] CAGRA restored to VRAM")
elif _TORCH_CUDA_AVAILABLE and self._vectors is not None:
import torch
# Pinned memory -> VRAM (async DMA copy)
pinned = torch.from_numpy(self._vectors).pin_memory()
self._gpu_vecs = pinned.to("cuda", non_blocking=True, dtype=torch.float16)
self._in_vram = True
logger.info(f"[{self.name}] Flat tensor restored to VRAM (async)")
# Load IDs if needed
if not self._ids and self._npz_file.exists():
data = np.load(self._npz_file, allow_pickle=True)
self._ids = data["ids"].tolist()
if self._meta_file.exists():
with open(self._meta_file, "r") as f:
self._metadata = json.load(f)
# ── Persistence ──────────────────────────────────────────────────────────
def _persist(self):
self._save_to_disk()
if HF_DATASET_REPO and HF_TOKEN:
_hf_save(self.name, self._ids, self._vectors, self._metadata)
def _save_to_disk(self):
if self._vectors is None:
return
np.savez_compressed(self._npz_file, vectors=self._vectors, ids=np.array(self._ids, dtype=object))
with open(self._meta_file, "w") as f:
json.dump(self._metadata, f)
def _load_from_disk(self):
try:
data = np.load(self._npz_file, allow_pickle=True)
self._vectors = data["vectors"].astype(np.float32)
self._ids = data["ids"].tolist()
if self._meta_file.exists():
with open(self._meta_file, "r") as f:
self._metadata = json.load(f)
else:
self._metadata = [{} for _ in self._ids]
logger.info(f"[{self.name}] Loaded {len(self._ids)} vectors from disk")
except Exception as e:
logger.error(f"[{self.name}] Disk load failed: {e}")
def _load_from_hf(self):
ds = _hf_load(self.name)
if ds is None or len(ds) == 0:
return
try:
self._ids = ds["id"]
self._vectors = np.array(ds["vector"], dtype=np.float32)
self._metadata = [json.loads(m) for m in ds["metadata"]]
self._save_to_disk()
except Exception as e:
logger.error(f"[{self.name}] HF load failed: {e}")
def _normalize(self):
if self._vectors is None:
return
norms = np.linalg.norm(self._vectors, axis=1, keepdims=True)
norms = np.where(norms == 0, 1, norms)
self._vectors = self._vectors / norms
# ── Utilities ────────────────────────────────────────────────────────────
def clear(self):
self._vectors = None
self._ids = []
self._metadata = []
self._gpu_index = None
self._gpu_vecs = None
self._in_vram = False
for f in (self._npz_file, self._meta_file, self._cagra_file):
if f.exists():
f.unlink()
def has_data(self) -> bool:
return self._vectors is not None and len(self._ids) > 0
@property
def count(self) -> int:
return len(self._ids) if self._ids else 0
@property
def in_vram(self) -> bool:
return self._in_vram
@property
def mode(self) -> str:
if _HIPVS_AVAILABLE:
return "CAGRA (hipVS GPU)"
elif _TORCH_CUDA_AVAILABLE:
return "Flat Tensor (GPU)"
return "NumPy (CPU)"
def __len__(self):
return self.count
def __repr__(self):
vram = "VRAM" if self._in_vram else "NVMe"
return f"VectorStore('{self.name}', n={self.count}, {self.mode}, {vram})"
# ── Multi-Project Store Registry ────────────────────────────────────────────
# Process-wide cache of stores, keyed by "<project>/<index_name>" (see get_store).
_stores: dict[str, VectorStore] = {}
# Held by get_store() and evict_all() while they read or modify _stores.
_lock = threading.Lock()
def get_store(project: str, index_name: str) -> VectorStore:
    """
    Return the cached VectorStore for a project + index, creating it on first use.

    New stores keep their files in the "indexes" sub-directory of the
    project directory. The cache is shared by the whole process.
    """
    cache_key = f"{project}/{index_name}"
    with _lock:
        store = _stores.get(cache_key)
        # Compare with None explicitly: an empty VectorStore is falsy
        # because the class defines __len__.
        if store is None:
            index_root = get_project_dir(project) / "indexes"
            store = VectorStore(index_name, index_dir=index_root)
            _stores[cache_key] = store
            logger.info(f"Store created: {store}")
        return store
def list_projects() -> list[str]:
    """Return the names of all sub-directories of PROJECTS_DIR, sorted by path.

    Returns an empty list when PROJECTS_DIR does not exist. Directories are
    listed whether or not they contain index files.
    """
    from config import PROJECTS_DIR

    if not PROJECTS_DIR.exists():
        return []
    return [entry.name for entry in sorted(PROJECTS_DIR.iterdir()) if entry.is_dir()]
def evict_all():
    """Evict every cached store that is currently resident in VRAM."""
    with _lock:
        resident = [cached for cached in _stores.values() if cached.in_vram]
        for cached in resident:
            cached.evict()