|
|
"""
|
|
|
Simple pluggable VectorStore with a FAISS adapter and a numpy brute-force fallback.
|
|
|
|
|
|
This file provides:
|
|
|
- EmbeddingAdapter: deterministic text->vector adapter for development.
|
|
|
- VectorStore: in-memory store that uses FAISS when available.
|
|
|
- get_global_vector_store(): convenience singleton for the app to reuse.
|
|
|
|
|
|
Designed to be lightweight and safe to import when FAISS is not installed.
|
|
|
"""
|
|
|
from typing import Optional, Dict, Any, List, Tuple
|
|
|
import hashlib
|
|
|
import numpy as np
|
|
|
try:
|
|
|
import faiss
|
|
|
except Exception:
|
|
|
faiss = None
|
|
|
|
|
|
|
|
|
class EmbeddingAdapter:
    """Deterministic embedding adapter for development.

    It hashes the input text with SHA-256 and tiles the digest bytes into a
    fixed-size float32 vector that is then scaled to unit L2 norm. Not a
    production-quality embedder (the vector carries no semantic meaning),
    but useful for development and tests because the same text always maps
    to the same vector.
    """

    def __init__(self, dim: int = 128):
        """Create an adapter that produces vectors of length ``dim``."""
        self.dim = dim

    def embed(self, text: str) -> np.ndarray:
        """Return the deterministic embedding of ``text``.

        Args:
            text: input string; it is UTF-8 encoded before hashing.

        Returns:
            A float32 array of shape ``(dim,)``. It has unit L2 norm unless
            every selected digest byte is zero, in which case an all-zero
            vector is returned instead of dividing by zero.
        """
        digest = hashlib.sha256(text.encode("utf-8")).digest()

        needed = self.dim
        # Tile the digest just enough times to cover `needed` bytes
        # (ceil division) instead of building a far larger buffer and
        # throwing most of it away.
        repeats = -(-needed // len(digest))
        data = (digest * repeats)[:needed]
        arr = np.frombuffer(data, dtype=np.uint8).astype(np.float32)

        # Bytes are non-negative, so a zero norm means every byte is zero.
        norm = np.linalg.norm(arr)
        if norm == 0:
            return np.zeros(self.dim, dtype=np.float32)
        return arr / norm
|
|
|
|
|
|
|
|
|
class VectorStore:
    """In-memory vector store keyed by string id.

    Vectors and metadata are kept in insertion-ordered dicts. When the
    ``faiss`` module imported successfully, an ``IndexFlatL2`` mirrors the
    stored vectors row-for-row in dict order; otherwise queries fall back
    to a NumPy brute-force scan. Both paths report Euclidean (L2) distance,
    where lower means closer.
    """

    def __init__(self, dim: int = 128):
        """Create an empty store for vectors of length ``dim``."""
        self.dim = dim
        self._emb = EmbeddingAdapter(dim=dim)
        self._meta: Dict[str, Dict[str, Any]] = {}
        self._vectors: Dict[str, np.ndarray] = {}
        self._faiss_index = None
        self._use_faiss = False
        self._build_index()

    def _build_index(self):
        """Create an empty FAISS index, or disable FAISS if that fails."""
        if faiss is None:
            self._use_faiss = False
            self._faiss_index = None
            return
        try:
            index = faiss.IndexFlatL2(self.dim)
            self._faiss_index = index
            self._use_faiss = True
        except Exception:
            # Best effort: any FAISS failure degrades to brute-force search.
            self._use_faiss = False
            self._faiss_index = None

    def add_vector(self, id: str, vector: np.ndarray, metadata: Optional[Dict[str, Any]] = None):
        """Store ``vector`` under ``id``, replacing any previous entry.

        Args:
            id: key for the vector.
            vector: array-like of shape ``(dim,)``.
            metadata: optional dict stored alongside the vector; a falsy
                value is stored as ``{}``.

        Raises:
            ValueError: if ``vector`` does not have shape ``(dim,)``.
        """
        # Copy so later mutation of the caller's array cannot change the
        # stored vector behind the index's back.
        v = np.array(vector, dtype=np.float32)
        if v.shape != (self.dim,):
            raise ValueError(f"vector must have shape ({self.dim},), got {v.shape}")

        replacing = id in self._vectors
        self._vectors[id] = v
        self._meta[id] = metadata or {}

        if not (self._use_faiss and self._faiss_index is not None):
            return

        if replacing:
            # Appending would leave the stale row in the index and shift
            # row numbers away from dict order (which query_vector relies
            # on to map rows back to ids), so rebuild from the dict.
            self._rebuild_faiss_index()
            return

        try:
            self._faiss_index.add(np.expand_dims(v, axis=0))
        except Exception:
            # The index may be in an unknown state; rebuild it from the
            # dict, which is the source of truth.
            self._rebuild_faiss_index()

    def add_text(self, id: str, text: str, metadata: Optional[Dict[str, Any]] = None):
        """Embed ``text`` with the built-in adapter and store it under ``id``."""
        vec = self._emb.embed(text)
        self.add_vector(id, vec, metadata)

    def _rebuild_faiss_index(self):
        """Recreate the FAISS index from the stored vectors, in dict order.

        Does nothing when FAISS is not importable. On failure FAISS is
        disabled for this store so queries use the brute-force path.
        """
        if faiss is None:
            return
        try:
            index = faiss.IndexFlatL2(self.dim)
            if self._vectors:
                mats = np.stack(list(self._vectors.values()), axis=0).astype(np.float32)
                index.add(mats)
            self._faiss_index = index
            self._use_faiss = True
        except Exception:
            import logging

            # Keep the best-effort fallback, but leave a trace so that a
            # broken rebuild is not silently mistaken for "FAISS absent".
            logging.getLogger(__name__).warning(
                "FAISS index rebuild failed; falling back to brute-force search",
                exc_info=True,
            )
            self._faiss_index = None
            self._use_faiss = False

    def get(self, id: str) -> Optional[Dict[str, Any]]:
        """Return ``{"id", "vector", "metadata"}`` for ``id``, or ``None``."""
        if id not in self._vectors:
            return None
        return {"id": id, "vector": self._vectors[id], "metadata": self._meta.get(id, {})}

    def query_vector(self, vector: np.ndarray, k: int = 5) -> List[Tuple[str, float, Dict[str, Any]]]:
        """Return up to ``k`` nearest stored vectors to ``vector``.

        Args:
            vector: array-like of shape ``(dim,)`` (a single ``(1, dim)``
                row is also accepted).
            k: maximum number of results; ``k <= 0`` yields ``[]``.

        Returns:
            ``(id, distance, metadata)`` tuples sorted by ascending L2
            distance.

        Raises:
            ValueError: if ``vector`` does not have ``dim`` components.
        """
        v = np.asarray(vector, dtype=np.float32)
        if v.ndim == 2 and v.shape[0] == 1:
            v = v[0]
        if v.shape != (self.dim,):
            raise ValueError(f"vector must have shape ({self.dim},), got {v.shape}")
        if k <= 0 or not self._vectors:
            return []

        if self._use_faiss and self._faiss_index is not None:
            limit = min(k, len(self._vectors))
            D, I = self._faiss_index.search(np.expand_dims(v, axis=0), limit)

            results: List[Tuple[str, float, Dict[str, Any]]] = []
            ids = list(self._vectors.keys())
            for sq_dist, idx in zip(D[0], I[0]):
                if idx < 0 or idx >= len(ids):
                    continue
                rid = ids[idx]
                # IndexFlatL2 reports squared distances; take the root
                # (clamping rounding noise below zero) so scores match the
                # brute-force path.
                dist = float(np.sqrt(max(float(sq_dist), 0.0)))
                results.append((rid, dist, self._meta.get(rid, {})))
            return results

        results = []
        for rid, rv in self._vectors.items():
            dist = float(np.linalg.norm(rv - v))
            results.append((rid, dist, self._meta.get(rid, {})))
        results.sort(key=lambda x: x[1])
        return results[:k]

    def query_text(self, text: str, k: int = 5) -> List[Tuple[str, float, Dict[str, Any]]]:
        """Embed ``text`` and return its up-to-``k`` nearest stored vectors."""
        vec = self._emb.embed(text)
        return self.query_vector(vec, k=k)
|
|
|
|
|
|
|
|
|
|
|
|
# Process-wide VectorStore instance; stays None until first requested.
_GLOBAL_STORE: Optional[VectorStore] = None


def get_global_vector_store() -> VectorStore:
    """Return the shared VectorStore, constructing it on the first call.

    The instance is built with VectorStore's default arguments and cached
    in the module-level ``_GLOBAL_STORE`` so later calls return the same
    object.
    """
    global _GLOBAL_STORE
    store = _GLOBAL_STORE
    if store is not None:
        return store
    store = VectorStore()
    _GLOBAL_STORE = store
    return store
|
|
|
"""Simple pluggable vector store with FAISS backend and numpy fallback.
|
|
|
|
|
|
This file provides a minimal VectorStore interface used by the REST API.
|
|
|
It intentionally keeps dependencies optional: if `faiss` isn't installed the
|
|
|
implementation falls back to an in-memory numpy-based nearest-neighbour search
|
|
|
for development and testing.
|
|
|
"""
|
|
|
from typing import List, Optional, Dict, Any
|
|
|
try:
|
|
|
import faiss
|
|
|
FAISS_AVAILABLE = True
|
|
|
except Exception:
|
|
|
faiss = None
|
|
|
FAISS_AVAILABLE = False
|
|
|
|
|
|
import numpy as np
|
|
|
|
|
|
|
|
|
class VectorStore:
    """A tiny vector store abstraction.

    - `add(ids, vectors, metas)` stores vectors and optional metadata.
    - `search(query_vector, top_k)` returns nearest neighbours with scores.

    The backend is chosen once, at construction: a FAISS ``IndexFlatL2``
    when ``FAISS_AVAILABLE`` is true, otherwise an in-memory NumPy matrix
    that is scanned by brute force.
    """

    def __init__(self, dim: int = 128):
        """Create an empty store for vectors of length ``dim``."""
        self.dim = dim
        # Remember the backend per instance so add/search always agree
        # with the attributes that were actually created here.
        self._use_faiss = bool(FAISS_AVAILABLE)
        if self._use_faiss:
            self.index = faiss.IndexFlatL2(dim)
            # FAISS row number -> {"id": ..., "meta": ...}
            self._id_map: Dict[int, Any] = {}
            self._next_index = 0
        else:
            self.vectors = np.zeros((0, dim), dtype=np.float32)
            self.ids: List[str] = []
            # Metadata keyed by id (a repeated id keeps the latest meta).
            self.metas: Dict[str, Any] = {}

    def add(self, ids: List[str], vectors: np.ndarray, metas: Optional[List[Any]] = None) -> int:
        """Add vectors to the store.

        Args:
            ids: list of string IDs (one per vector).
            vectors: numpy array of shape (N, dim).
            metas: optional list of metadata objects parallel to ids; an
                empty or missing list means "no metadata".

        Returns:
            number of indexed vectors after insertion.

        Raises:
            ValueError: if ``vectors`` is not shape (N, dim), or if the
                number of ids or metas does not match N.
        """
        vecs = np.asarray(vectors, dtype=np.float32)
        if vecs.ndim != 2 or vecs.shape[1] != self.dim:
            raise ValueError(f"vectors must be shape (N, {self.dim})")

        # Validate everything before touching any state so a bad call
        # cannot leave ids, vectors and metadata out of step.
        ids = list(ids)
        count = vecs.shape[0]
        if len(ids) != count:
            raise ValueError(f"got {len(ids)} ids for {count} vectors")
        if metas and len(metas) != count:
            raise ValueError(f"got {len(metas)} metas for {count} vectors")

        if self._use_faiss:
            if count:
                self.index.add(np.ascontiguousarray(vecs))
            for i, id_ in enumerate(ids):
                self._id_map[self._next_index] = {"id": id_, "meta": metas[i] if metas else None}
                self._next_index += 1
            return int(self.index.ntotal)

        # vstack always allocates a new array, so the store never aliases
        # the caller's buffer.
        self.vectors = np.vstack([self.vectors, vecs])
        self.ids.extend(ids)
        if metas:
            for i, id_ in enumerate(ids):
                self.metas[id_] = metas[i]
        return len(self.ids)

    def search(self, query_vector: np.ndarray, top_k: int = 5) -> List[Dict[str, Any]]:
        """Return nearest neighbours as a list of {id, score, meta}.

        Score is L2 distance (lower is better).

        Args:
            query_vector: a single vector of shape (dim,) or (1, dim).
            top_k: maximum number of results; ``top_k <= 0`` yields ``[]``.

        Raises:
            ValueError: if ``query_vector`` is not a single vector of
                dimension ``dim``.
        """
        q = np.asarray(query_vector, dtype=np.float32)
        if q.ndim == 1:
            q = q.reshape(1, -1)
        if q.ndim != 2 or q.shape[1] != self.dim:
            raise ValueError(f"query_vector must have dimension {self.dim}")
        if q.shape[0] != 1:
            raise ValueError("query_vector must be a single vector, not a batch")
        if top_k <= 0:
            return []

        if self._use_faiss:
            total = int(self.index.ntotal)
            if total == 0:
                return []
            D, I = self.index.search(np.ascontiguousarray(q), min(top_k, total))
            results = []
            for sq_dist, idx in zip(D[0], I[0]):
                if idx < 0:
                    continue
                info = self._id_map.get(int(idx), {"id": str(idx), "meta": None})
                # IndexFlatL2 reports squared distances; take the root
                # (clamping rounding noise below zero) so the score is
                # the L2 distance on both backends.
                score = float(np.sqrt(max(float(sq_dist), 0.0)))
                results.append({"id": info["id"], "score": score, "meta": info.get("meta")})
            return results

        if self.vectors.shape[0] == 0:
            return []

        dists = np.linalg.norm(self.vectors - q, axis=1)
        # Stable sort keeps insertion order among equal distances.
        order = np.argsort(dists, kind="stable")[:top_k]
        out = []
        for i in order:
            row = int(i)
            rid = self.ids[row]
            out.append({"id": rid, "score": float(dists[row]), "meta": self.metas.get(rid)})
        return out
|
|
|
|
|
|
|
|
|
# Lazily created module-wide store; None until get_default_store() runs.
_default_store: Optional[VectorStore] = None


def get_default_store(dim: int = 128) -> VectorStore:
    """Return the module-wide VectorStore, creating it on first use.

    Args:
        dim: vector dimension passed to the VectorStore constructor. It is
            only used by the call that creates the store; once a store
            exists it is returned as-is and ``dim`` is ignored.

    Returns:
        The shared VectorStore instance.
    """
    global _default_store
    store = _default_store
    if store is not None:
        return store
    store = VectorStore(dim=dim)
    _default_store = store
    return store
|
|
|
"""Simple pluggable vector store with FAISS backend (optional) and numpy brute-force fallback.
|
|
|
|
|
|
This module provides a lightweight interface used by the API for indexing and nearest-neighbor
|
|
|
search. FAISS is optional; if it's not installed the implementation falls back to an in-memory
|
|
|
brute-force search using NumPy (if available) or pure Python.
|
|
|
"""
|
|
|
from typing import List, Dict, Optional, Tuple
|
|
|
|
|
|
try:
|
|
|
import faiss
|
|
|
_has_faiss = True
|
|
|
except Exception:
|
|
|
faiss = None
|
|
|
_has_faiss = False
|
|
|
|
|
|
try:
|
|
|
import numpy as np
|
|
|
_has_numpy = True
|
|
|
except Exception:
|
|
|
np = None
|
|
|
_has_numpy = False
|
|
|
|
|
|
|
|
|
class VectorStore:
    """In-memory vector store with optional FAISS acceleration.

    Usage:
        store = VectorStore(dim=128)
        store.add('id1', vector, metadata={...})
        results = store.search(query_vector, k=5)

    Ids, vectors and metadata are kept in three parallel lists; when FAISS
    is used, row ``i`` of the index corresponds to position ``i`` in those
    lists.
    """

    def __init__(self, dim: int = 128, use_faiss: bool = True):
        """Create an empty store.

        Args:
            dim: expected number of components per vector.
            use_faiss: use a FAISS ``IndexFlatIP`` when the ``faiss``
                module imported successfully; otherwise brute force.
        """
        self.dim = dim
        self.ids: List[str] = []
        self.vectors: List = []
        self.metadatas: List[Dict] = []
        self._index = None

        # `faiss` is None when the optional import at module top failed.
        self._use_faiss = bool(use_faiss and faiss is not None)
        if self._use_faiss:
            self._index = faiss.IndexFlatIP(dim)

    def _ensure_numpy(self, vec):
        """Return ``vec`` as a float32 array, or unchanged without NumPy."""
        if np is not None:
            return np.asarray(vec, dtype=np.float32)
        return vec

    def add(self, id: str, vector, metadata: Optional[Dict] = None):
        """Append one vector with its id and metadata.

        Args:
            id: identifier returned by ``search`` for this vector.
            vector: sequence or array with ``dim`` components.
            metadata: optional dict; a falsy value is stored as ``{}``.

        Raises:
            ValueError: if ``vector`` does not have ``dim`` components.
        """
        metadata = metadata or {}
        vec = self._ensure_numpy(vector)

        # Reject wrong-sized vectors up front: once stored they would make
        # every later search fail when the vectors are stacked.
        size = int(vec.size) if np is not None else len(vec)
        if size != self.dim:
            raise ValueError(f"vector must have {self.dim} components, got {size}")

        # Insert into FAISS before touching the lists so a failing insert
        # cannot leave the lists one entry ahead of the index.
        if self._use_faiss:
            arr = np.ascontiguousarray(vec, dtype=np.float32).reshape(1, -1)
            self._index.add(arr)

        self.ids.append(id)
        self.vectors.append(vec)
        self.metadatas.append(metadata)

    def search(self, query_vector, k: int = 5) -> List[Tuple[str, float, Dict]]:
        """Return list of (id, score, metadata) ordered by descending score.

        Score semantics: if FAISS IndexFlatIP is used, it's inner product. The
        fallback uses cosine similarity when numpy is available, and a plain
        dot product when it is not.

        Args:
            query_vector: sequence or array with ``dim`` components.
            k: maximum number of results; ``k <= 0`` yields ``[]``.
        """
        if k <= 0 or len(self.ids) == 0:
            return []

        q = self._ensure_numpy(query_vector)

        if self._use_faiss:
            q_arr = np.ascontiguousarray(q, dtype=np.float32).reshape(1, -1)
            D, I = self._index.search(q_arr, min(k, len(self.ids)))
            results = []
            for score, idx in zip(D[0].tolist(), I[0].tolist()):
                # FAISS pads missing neighbours with index -1.
                if idx < 0:
                    continue
                results.append((self.ids[idx], float(score), self.metadatas[idx]))
            return results

        if np is not None:
            mats = np.vstack([np.asarray(v, dtype=np.float32).reshape(1, -1) for v in self.vectors])
            qv = np.asarray(q, dtype=np.float32).reshape(-1)

            # The epsilons keep zero-length vectors from dividing by zero.
            norms = np.linalg.norm(mats, axis=1) * (np.linalg.norm(qv) + 1e-12)
            sims = (mats.dot(qv)) / (norms + 1e-12)
            idxs = sims.argsort()[::-1][:k]
            return [(self.ids[int(i)], float(sims[int(i)]), self.metadatas[int(i)]) for i in idxs]

        def dot(a, b):
            return sum(x * y for x, y in zip(a, b))

        scores = [dot(v, q) for v in self.vectors]
        ordered = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:k]
        return [(self.ids[i], float(scores[i]), self.metadatas[i]) for i in ordered]
|
|
|
|
|
|
|
|
|
|
|
|
# Module-level store shared by importers; created eagerly at import time.
default_store = VectorStore(dim=128, use_faiss=True)

# Public API. Besides the store class and the eager default instance, this
# lists the embedding adapter and the lazy accessors defined earlier in this
# file so that `from <module> import *` exposes them as well.
__all__ = [
    "VectorStore",
    "default_store",
    "EmbeddingAdapter",
    "get_global_vector_store",
    "get_default_store",
]
|
|
|
|