Spaces:
Sleeping
Sleeping
File size: 5,081 Bytes
402298d 8fb6d95 402298d 8fb6d95 402298d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
"""Faiss vector store service (replaces Qdrant)"""
from typing import List, Dict, Any, Optional
from app.config import settings
from app.utils.logger import setup_logger
import os
import json
import faiss
import numpy as np
# Module-level logger, obtained from the project's setup_logger helper by
# passing this module's name; used by every method below for diagnostics.
logger = setup_logger(__name__)
def _normalize(vectors: np.ndarray) -> np.ndarray:
    """Scale each row of *vectors* to unit L2 length.

    With unit-length rows, an inner-product search behaves like cosine
    similarity. A tiny epsilon is added to every row norm so an all-zero
    row divides cleanly (and stays all-zero) instead of producing NaNs.
    """
    epsilon = 1e-12
    row_lengths = np.linalg.norm(vectors, axis=1, keepdims=True)
    return np.divide(vectors, row_lengths + epsilon)
class VectorStoreService:
    """
    Manages a Faiss index + sidecar payload store.

    - Index: Faiss IndexFlatIP (cosine via normalization)
    - Payloads: JSON list aligned to vector IDs (payloads[i] belongs to vector i)
    - Persistence: saves/loads index + payloads from disk
    """

    def __init__(self):
        """Read the storage paths from settings and load any persisted store."""
        # Path diagnostics go through the module logger instead of print()
        # so they follow the application's logging configuration.
        logger.info(f"FAISS_INDEX_PATH: {settings.FAISS_INDEX_PATH}")
        logger.info(f"Working directory: {os.getcwd()}")

        self.index: Optional[faiss.Index] = None
        self.dimension: Optional[int] = None
        self.payloads: List[Dict[str, Any]] = []

        self.index_path = settings.FAISS_INDEX_PATH
        self.payloads_path = settings.FAISS_PAYLOADS_PATH

        self._load_if_exists()

    # ---------- Persistence ----------

    def _reset(self) -> None:
        """Drop all in-memory state (no index, no payloads, unknown dimension)."""
        self.index = None
        self.payloads = []
        self.dimension = None

    def _load_if_exists(self):
        """Load index + payloads if both files exist.

        Any failure while reading either file is logged and leaves the
        service in the empty, uninitialized state rather than raising.
        """
        if not (os.path.exists(self.index_path) and os.path.exists(self.payloads_path)):
            return

        try:
            index = faiss.read_index(self.index_path)
            dimension = index.d  # type: ignore[attr-defined]
            with open(self.payloads_path, "r", encoding="utf-8") as f:
                payloads = json.load(f)
            if not isinstance(payloads, list):
                raise ValueError("payload file does not contain a JSON list")
        except Exception as e:
            # Deliberate best-effort: a corrupt store must not prevent startup.
            logger.error(f"Failed to load Faiss store; starting fresh. Error: {e}")
            self._reset()
            return

        # Commit only after both files were read successfully.
        self.index = index
        self.dimension = dimension
        self.payloads = payloads

        if len(payloads) != index.ntotal:  # type: ignore
            # IDs are positional, so a count mismatch means some search hits
            # will resolve to the wrong (or an empty) payload.
            logger.warning(
                f"Faiss store is inconsistent: {index.ntotal} vectors but "  # type: ignore
                f"{len(payloads)} payloads"
            )
        logger.info(
            f"Loaded Faiss index ({self.dimension}d) with {self.index.ntotal} vectors"  # type: ignore
        )

    def _save(self):
        """Persist index + payloads to disk."""
        # Create missing parent directories so the first save on a fresh
        # deployment does not fail with "no such file or directory".
        for path in (self.index_path, self.payloads_path):
            parent = os.path.dirname(path)
            if parent:
                os.makedirs(parent, exist_ok=True)

        if self.index is not None:
            faiss.write_index(self.index, self.index_path)
        with open(self.payloads_path, "w", encoding="utf-8") as f:
            json.dump(self.payloads, f, ensure_ascii=False)

    # ---------- Collection lifecycle ----------

    def create_collection(self, vector_size: int):
        """
        (Re)create a fresh Faiss index (cosine via normalized vectors).

        WARNING: This clears existing data.

        Args:
            vector_size: Dimensionality of the vectors the index will hold.

        Raises:
            ValueError: If ``vector_size`` is not a positive number.
        """
        if vector_size <= 0:
            raise ValueError(f"vector_size must be positive, got {vector_size}")

        self.dimension = vector_size
        self.index = faiss.IndexFlatIP(vector_size)  # inner product
        self.payloads = []
        self._save()
        logger.info(f"Created Faiss collection: dim={vector_size}")

    # ---------- Upsert/Search ----------

    def upsert_vectors(
        self,
        vectors: List[List[float]],
        payloads: List[Dict[str, Any]]
    ) -> int:
        """Insert vectors with metadata (IDs are implicit by order).

        Args:
            vectors: Rows to add; each must have the index's dimensionality.
            payloads: One metadata dict per vector, in the same order.

        Returns:
            The number of vectors added.

        Raises:
            RuntimeError: If no index has been created or loaded yet.
            ValueError: If the two lists differ in length, or the vectors do
                not form a 2-D array matching the index dimension.
        """
        if self.index is None:
            raise RuntimeError("Faiss index is not initialized. Call create_collection first.")

        # Payloads are looked up by position, so a length mismatch would
        # silently attach metadata to the wrong vectors from here on.
        if len(vectors) != len(payloads):
            raise ValueError(
                f"vectors and payloads must have the same length "
                f"({len(vectors)} != {len(payloads)})"
            )

        # Nothing to add; also avoids normalizing an empty 1-D array.
        if not vectors:
            return 0

        arr = np.array(vectors, dtype="float32")
        expected_dim = self.index.d  # type: ignore[attr-defined]
        if arr.ndim != 2 or arr.shape[1] != expected_dim:
            raise ValueError(
                f"Expected vectors of shape (n, {expected_dim}), got {arr.shape}"
            )

        arr = _normalize(arr)
        self.index.add(arr)  # type: ignore
        self.payloads.extend(payloads)
        self._save()
        logger.info(f"Upserted {len(vectors)} vectors into Faiss")
        return len(vectors)

    def search(
        self,
        query_vector: List[float],
        limit: int = 5,
        score_threshold: float = 0.0
    ) -> List[Dict[str, Any]]:
        """Search similar vectors via inner product (cosine).

        Args:
            query_vector: The query embedding; must match the index dimension.
            limit: Maximum number of hits to return. Non-positive values
                yield an empty result.
            score_threshold: Hits scoring below this value are dropped.

        Returns:
            A list of ``{"id", "score", "payload"}`` dicts in the order
            returned by the index. Empty when the store has no vectors.

        Raises:
            ValueError: If the query does not match the index dimension.
        """
        if self.index is None or self.index.ntotal == 0:  # type: ignore
            return []
        if limit <= 0:
            return []

        q = np.array([query_vector], dtype="float32")
        expected_dim = self.index.d  # type: ignore[attr-defined]
        if q.ndim != 2 or q.shape[1] != expected_dim:
            raise ValueError(
                f"Expected query vector of length {expected_dim}, got shape {q.shape[1:]}"
            )
        q = _normalize(q)

        # Asking for more neighbours than exist only yields -1 padding.
        k = min(limit, int(self.index.ntotal))  # type: ignore
        scores, indices = self.index.search(q, k)  # type: ignore

        results: List[Dict[str, Any]] = []
        for score, idx in zip(scores[0].tolist(), indices[0].tolist()):
            # -1 marks an empty result slot.
            if idx == -1 or score < score_threshold:
                continue
            # Fall back to an empty payload if the sidecar list is shorter
            # than the index (e.g. after loading an inconsistent store).
            payload = self.payloads[idx] if 0 <= idx < len(self.payloads) else {}
            results.append({
                "id": idx,
                "score": float(score),
                "payload": payload
            })
        return results

    # ---------- Introspection/Access ----------

    def get_collection_info(self) -> Dict[str, Any]:
        """Report the vector count and whether an index is available.

        Returns:
            ``{"vectors_count": int, "status": str}`` where status is
            ``"ready"`` once an index exists (even if empty) and
            ``"uninitialized"`` otherwise.
        """
        if self.index is None:
            return {"vectors_count": 0, "status": "uninitialized"}
        return {
            "vectors_count": int(self.index.ntotal),  # type: ignore
            "status": "ready"
        }

    def get_all_payloads(self) -> List[Dict[str, Any]]:
        """Return all payloads (used by metrics) as a shallow copy of the list."""
        return list(self.payloads)

    def get_payloads_sample(self, limit: int = 100) -> List[Dict[str, Any]]:
        """Return up to ``limit`` payloads from the start of the store.

        A negative ``limit`` is treated as zero instead of slicing from the end.
        """
        return self.payloads[:max(limit, 0)]
# Global instance: constructed once, when this module is first imported.
# The constructor calls _load_if_exists(), so the import itself reads the
# persisted index/payload files from disk when both are present.
vector_store = VectorStoreService()
|