Spaces:
Sleeping
Sleeping
| """Faiss vector store service (replaces Qdrant)""" | |
| from typing import List, Dict, Any, Optional | |
| from app.config import settings | |
| from app.utils.logger import setup_logger | |
| import os | |
| import json | |
| import faiss | |
| import numpy as np | |
# Module-level logger, created with the project's setup_logger helper and
# named after this module.
logger = setup_logger(__name__)
def _normalize(vectors: np.ndarray) -> np.ndarray:
    """Scale each row to unit L2 length.

    With unit-length rows, the inner product of two rows is their cosine
    similarity. A tiny epsilon is added to every row norm so that an
    all-zero row divides cleanly (and stays all-zero) instead of producing
    NaNs.
    """
    row_lengths = np.linalg.norm(vectors, axis=1, keepdims=True)
    safe_lengths = row_lengths + 1e-12
    return vectors / safe_lengths
class VectorStoreService:
    """
    Manage a Faiss index together with a sidecar payload store.

    - Index: Faiss IndexFlatIP (cosine similarity via L2-normalized vectors)
    - Payloads: JSON list whose positions match the Faiss vector IDs
    - Persistence: index and payloads are saved to / loaded from disk
    """

    def __init__(self):
        """Read the storage paths from settings and load any persisted store."""
        # Startup diagnostics go through the module logger rather than print().
        logger.info(f"FAISS_INDEX_PATH: {settings.FAISS_INDEX_PATH}")
        logger.info(f"Working directory: {os.getcwd()}")
        self.index: Optional[faiss.Index] = None
        self.dimension: Optional[int] = None
        self.payloads: List[Dict[str, Any]] = []
        self.index_path = settings.FAISS_INDEX_PATH
        self.payloads_path = settings.FAISS_PAYLOADS_PATH
        self._load_if_exists()

    # ---------- Persistence ----------
    def _load_if_exists(self):
        """Load index + payloads if both files exist.

        Any failure while reading either file is logged and the service
        starts with an empty, uninitialized store instead of raising.
        """
        if not (os.path.exists(self.index_path) and os.path.exists(self.payloads_path)):
            return
        try:
            index = faiss.read_index(self.index_path)
            dimension = index.d  # type: ignore[attr-defined]
            total = index.ntotal  # type: ignore[attr-defined]
            with open(self.payloads_path, "r", encoding="utf-8") as f:
                payloads = json.load(f)
            # Payloads are addressed by position, so anything but a list is unusable.
            if not isinstance(payloads, list):
                raise ValueError("payload file does not contain a JSON list")
        except Exception as e:
            logger.error(f"Failed to load Faiss store; starting fresh. Error: {e}")
            self.index = None
            self.payloads = []
            self.dimension = None
            return

        self.index = index
        self.dimension = dimension
        self.payloads = payloads
        if len(payloads) != total:
            # Keep the data, but make the misalignment visible: search() falls
            # back to an empty payload for IDs without a matching entry.
            logger.warning(
                f"Faiss index has {total} vectors but {len(payloads)} payloads were loaded"
            )
        logger.info(
            f"Loaded Faiss index ({self.dimension}d) with {total} vectors"
        )

    def _save(self):
        """Persist index + payloads to disk.

        Each file is written to a temporary sibling and then moved into place
        with os.replace, so an interrupted write does not leave a truncated
        file behind. Missing parent directories are created first.
        """
        for path in (self.index_path, self.payloads_path):
            parent = os.path.dirname(path)
            if parent:
                os.makedirs(parent, exist_ok=True)

        if self.index is not None:
            tmp_index_path = f"{self.index_path}.tmp"
            faiss.write_index(self.index, tmp_index_path)
            os.replace(tmp_index_path, self.index_path)

        tmp_payloads_path = f"{self.payloads_path}.tmp"
        with open(tmp_payloads_path, "w", encoding="utf-8") as f:
            json.dump(self.payloads, f, ensure_ascii=False)
        os.replace(tmp_payloads_path, self.payloads_path)

    # ---------- Collection lifecycle ----------
    def create_collection(self, vector_size: int):
        """
        (Re)create a fresh Faiss index (cosine via normalized vectors).

        WARNING: This clears existing data.

        Raises:
            ValueError: if vector_size is not a positive integer.
        """
        if vector_size <= 0:
            raise ValueError(f"vector_size must be positive, got {vector_size}")
        self.dimension = vector_size
        self.index = faiss.IndexFlatIP(vector_size)  # inner product
        self.payloads = []
        self._save()
        logger.info(f"Created Faiss collection: dim={vector_size}")

    # ---------- Upsert/Search ----------
    def upsert_vectors(
        self,
        vectors: List[List[float]],
        payloads: List[Dict[str, Any]]
    ) -> int:
        """Insert vectors with metadata (IDs are implicit by order).

        Vectors are L2-normalized before being added, and the store is saved
        to disk after every successful call.

        Returns:
            The number of vectors added (0 for an empty batch).

        Raises:
            RuntimeError: if no index has been created or loaded yet.
            ValueError: if vectors and payloads differ in length, or the
                vectors do not match the index dimension.
        """
        if self.index is None:
            raise RuntimeError("Faiss index is not initialized. Call create_collection first.")
        # IDs are positional: one payload per vector or the mapping is corrupted.
        if len(vectors) != len(payloads):
            raise ValueError(
                f"Got {len(vectors)} vectors but {len(payloads)} payloads; counts must match."
            )
        if len(vectors) == 0:
            return 0

        arr = np.array(vectors, dtype="float32")
        expected_dim = self.index.d  # type: ignore[attr-defined]
        if arr.ndim != 2 or arr.shape[1] != expected_dim:
            raise ValueError(
                f"Expected vectors of shape (n, {expected_dim}), got {arr.shape}."
            )
        arr = _normalize(arr)
        self.index.add(arr)  # type: ignore
        self.payloads.extend(payloads)
        self._save()
        logger.info(f"Upserted {len(vectors)} vectors into Faiss")
        return len(vectors)

    def search(
        self,
        query_vector: List[float],
        limit: int = 5,
        score_threshold: float = 0.0
    ) -> List[Dict[str, Any]]:
        """Search similar vectors via inner product (cosine).

        Returns:
            Up to `limit` dicts with keys "id", "score" and "payload", in the
            order Faiss returns them, excluding hits scoring below
            `score_threshold`. An empty list is returned when the index is
            missing or empty, or when `limit` is not positive.

        Raises:
            ValueError: if the query vector does not match the index dimension.
        """
        if self.index is None or self.index.ntotal == 0:  # type: ignore
            return []
        if limit <= 0:
            return []

        q = np.array([query_vector], dtype="float32")
        expected_dim = self.index.d  # type: ignore[attr-defined]
        if q.ndim != 2 or q.shape[1] != expected_dim:
            raise ValueError(
                f"Expected a query vector of length {expected_dim}, got shape {q.shape[1:]}."
            )
        q = _normalize(q)

        # Asking for more neighbours than stored vectors only yields -1 padding.
        k = min(limit, int(self.index.ntotal))  # type: ignore
        scores, indices = self.index.search(q, k)  # type: ignore

        results: List[Dict[str, Any]] = []
        for score, idx in zip(scores[0].tolist(), indices[0].tolist()):
            if idx == -1:  # Faiss marks "no result" slots with -1
                continue
            if score < score_threshold:
                continue
            payload = self.payloads[idx] if 0 <= idx < len(self.payloads) else {}
            results.append({
                "id": idx,
                "score": float(score),
                "payload": payload
            })
        return results

    # ---------- Introspection/Access ----------
    def get_collection_info(self) -> Dict[str, Any]:
        """Return the vector count and whether an index is available.

        "status" is "ready" when an index exists (even if empty) and
        "uninitialized" when none has been created or loaded.
        """
        if self.index is None:
            return {"vectors_count": 0, "status": "uninitialized"}
        return {
            "vectors_count": int(self.index.ntotal),  # type: ignore
            "status": "ready"
        }

    def get_all_payloads(self) -> List[Dict[str, Any]]:
        """Return all payloads (used by metrics) as a shallow copy of the list."""
        return list(self.payloads)

    def get_payloads_sample(self, limit: int = 100) -> List[Dict[str, Any]]:
        """Return at most the first `limit` payloads; non-positive limits give []."""
        return self.payloads[:max(limit, 0)]
# Global instance shared by importers of this module. It is constructed at
# import time, so __init__ runs then and loads any index/payload files that
# already exist on disk.
vector_store = VectorStoreService()