import contextlib
import os
import pickle
import tempfile
from typing import Callable, List, Optional

import faiss
import numpy as np

from app.services.neon_index_store import NeonIndexStore


def _atomic_write(path: str, write_to: Callable[[str], None]) -> None:
    """Produce ``path`` by calling ``write_to(tmp_path)`` and renaming the result into place.

    The temporary file is created in the same directory as ``path`` so ``os.replace`` is a
    same-filesystem rename. If ``write_to`` raises, or the process dies before the rename,
    ``path`` keeps its previous contents instead of being left truncated.
    """
    directory = os.path.dirname(path) or "."
    fd, tmp_path = tempfile.mkstemp(
        dir=directory, prefix=".tmp-", suffix="-" + os.path.basename(path)
    )
    os.close(fd)
    try:
        write_to(tmp_path)
        os.replace(tmp_path, path)
    except BaseException:
        # Don't leave the half-written temp file behind on failure.
        with contextlib.suppress(OSError):
            os.remove(tmp_path)
        raise


def _atomic_write_bytes(path: str, data: bytes) -> None:
    """Replace ``path`` with ``data`` using ``_atomic_write``."""

    def write_to(tmp_path: str) -> None:
        with open(tmp_path, "wb") as f:
            f.write(data)

    _atomic_write(path, write_to)


class LocalVectorStore:
    """Per-semester FAISS vector store kept on local disk.

    Each semester ``s`` is stored as two files in ``base_dir``:

    * ``semester_{s}.pkl`` -- a pickled list of records, each a dict with
      ``course_code``, ``chunk`` and ``embedding`` keys;
    * ``semester_{s}.faiss`` -- an ``IndexFlatIP`` over the L2-normalised embeddings
      (i.e. cosine similarity), where index row ``i`` corresponds to record ``i``.

    When ``rag_index_db_url`` is given, both files are pushed to a ``NeonIndexStore``
    after every upsert and pulled from it when the local records are missing.
    """

    def __init__(
        self,
        base_dir: str,
        rag_index_db_url: str = "",
        neon_max_retries: int = 5,
        neon_retry_backoff_sec: float = 1.0,
        neon_connect_timeout_sec: int = 10,
    ) -> None:
        """Create the store rooted at ``base_dir`` (created if missing).

        Args:
            base_dir: Directory holding the per-semester ``.pkl`` / ``.faiss`` files.
            rag_index_db_url: Connection URL for the cloud mirror; empty disables it.
            neon_max_retries: Forwarded to ``NeonIndexStore`` as ``max_retries``.
            neon_retry_backoff_sec: Forwarded as ``base_backoff_sec``.
            neon_connect_timeout_sec: Forwarded as ``connect_timeout_sec``.
        """
        self.base_dir = base_dir
        self.cloud_store = (
            NeonIndexStore(
                rag_index_db_url,
                max_retries=neon_max_retries,
                base_backoff_sec=neon_retry_backoff_sec,
                connect_timeout_sec=neon_connect_timeout_sec,
            )
            if rag_index_db_url
            else None
        )
        # Semesters whose hydration failure has already been printed; avoids repeating
        # the same message on every request for that semester.
        self._hydration_error_logged_semesters: set[int] = set()
        self._setup_error_logged = False
        os.makedirs(self.base_dir, exist_ok=True)
        if self.cloud_store:
            try:
                self.cloud_store.ensure_table()
            except Exception as exc:
                # Best effort: local operation continues; later cloud calls catch and
                # report their own failures.
                print(f"Neon index table setup skipped: {exc}")
                self._setup_error_logged = True

    def upsert_documents(
        self,
        semester: int,
        course_code: str,
        chunks: List[str],
        embeddings: List[List[float]],
    ) -> None:
        """Replace every stored chunk of ``course_code`` in ``semester`` with ``chunks``.

        ``embeddings[i]`` is the vector for ``chunks[i]``. Passing empty ``chunks``
        removes the course from the semester. The records and index are rewritten
        locally and then pushed to the cloud store, if one is configured.

        Raises:
            ValueError: if ``chunks`` and ``embeddings`` differ in length, or (from
                numpy) if the semester's embeddings have inconsistent dimensions.
                Nothing is written to disk in either case.
        """
        if len(chunks) != len(embeddings):
            raise ValueError(
                "chunks and embeddings must have the same length "
                f"({len(chunks)} != {len(embeddings)})"
            )
        self._ensure_local_semester_data(semester)
        records_path = self._semester_records_path(semester)
        records = [
            r for r in self._load_records(records_path) if r.get("course_code") != course_code
        ]
        records.extend(
            {"course_code": course_code, "chunk": chunk, "embedding": vector}
            for chunk, vector in zip(chunks, embeddings)
        )
        # Build the index before touching disk so malformed embeddings cannot leave
        # saved records paired with a stale index.
        index = self._build_index(records)
        self._save_records(records_path, records)
        self._write_index(semester, index)
        self._sync_semester_to_cloud(semester)

    def search(self, semester: int, query_embedding: List[float], top_k: int = 6) -> List[dict]:
        """Return up to ``top_k`` records of ``semester`` most similar to ``query_embedding``.

        Similarity is the inner product of L2-normalised vectors (cosine similarity).
        Each hit is ``{"course_code": ..., "chunk": ...}``, in the order FAISS returns
        them (best match first). Returns ``[]`` when the semester is empty or
        ``top_k <= 0``.
        """
        self._ensure_local_semester_data(semester)
        records = self._load_records(self._semester_records_path(semester))
        if not records or top_k <= 0:
            return []
        index = self._load_index(semester, records)
        if index is None:
            return []
        # np.array copies, so the in-place normalize_L2 never mutates the caller's data.
        query = np.array(query_embedding, dtype=np.float32).reshape(1, -1)
        faiss.normalize_L2(query)
        _, indices = index.search(query, min(top_k, len(records)))
        # FAISS pads missing results with -1; the range check also drops those.
        return [
            {
                "course_code": records[i].get("course_code", ""),
                "chunk": records[i].get("chunk", ""),
            }
            for i in indices[0].tolist()
            if 0 <= i < len(records)
        ]

    def _semester_records_path(self, semester: int) -> str:
        """Path of the pickled record list for ``semester``."""
        return os.path.join(self.base_dir, f"semester_{semester}.pkl")

    def _semester_index_path(self, semester: int) -> str:
        """Path of the serialized FAISS index for ``semester``."""
        return os.path.join(self.base_dir, f"semester_{semester}.faiss")

    @staticmethod
    def _build_index(records: List[dict]) -> Optional["faiss.Index"]:
        """Build an in-memory ``IndexFlatIP`` over the normalised record embeddings.

        Returns None when there is nothing indexable (no records, or embeddings that
        don't form a non-empty 2-D matrix). Ragged embeddings make ``np.array`` raise
        ``ValueError``.
        """
        if not records:
            return None
        vectors = np.array([r["embedding"] for r in records], dtype=np.float32)
        if vectors.ndim != 2 or vectors.shape[0] == 0:
            return None
        faiss.normalize_L2(vectors)
        index = faiss.IndexFlatIP(vectors.shape[1])
        index.add(vectors)
        return index

    def _write_index(self, semester: int, index: Optional["faiss.Index"]) -> None:
        """Persist ``index`` for ``semester``, or delete the index file when it is None."""
        index_path = self._semester_index_path(semester)
        if index is None:
            # Remove any previous index so it can never be paired with the new records.
            with contextlib.suppress(FileNotFoundError):
                os.remove(index_path)
            return
        _atomic_write(index_path, lambda tmp_path: faiss.write_index(index, tmp_path))

    def _rebuild_faiss_index(self, semester: int, records: List[dict]) -> None:
        """Rebuild the semester's on-disk index from ``records`` (deleting it if empty)."""
        self._write_index(semester, self._build_index(records))

    def _load_index(self, semester: int, records: List[dict]) -> Optional["faiss.Index"]:
        """Load the semester's index, rebuilding it if missing or out of sync with ``records``.

        An index whose row count differs from ``len(records)`` cannot map its row ids
        to the right records, so it is regenerated from the records' embeddings.
        """
        index_path = self._semester_index_path(semester)
        if os.path.exists(index_path):
            index = faiss.read_index(index_path)
            if index.ntotal == len(records):
                return index
        self._rebuild_faiss_index(semester, records)
        if not os.path.exists(index_path):
            return None
        return faiss.read_index(index_path)

    def _ensure_local_semester_data(self, semester: int) -> None:
        """Pull the semester's files from the cloud store when no local records exist.

        Local records take precedence over the cloud copy. Upserts write them locally
        before syncing, so they are at least as new as anything this store pushed. If
        only the index is missing, callers rebuild it from the local records instead of
        overwriting those records with the cloud copy.
        NOTE(review): with several hosts sharing one cloud table, a host that already has
        local files will not pick up other hosts' updates.

        Failures are best effort: they are printed once per semester and swallowed.
        """
        records_path = self._semester_records_path(semester)
        index_path = self._semester_index_path(semester)
        if os.path.exists(records_path):
            return
        if not self.cloud_store:
            return
        try:
            payload = self.cloud_store.load_semester_files(semester)
            if not payload:
                return
            faiss_bytes, records_bytes = payload
            # Write the index first: the records file's presence marks a completed hydration.
            _atomic_write_bytes(index_path, faiss_bytes)
            _atomic_write_bytes(records_path, records_bytes)
            self._hydration_error_logged_semesters.discard(semester)
        except Exception as exc:
            if semester not in self._hydration_error_logged_semesters:
                print(f"Neon index hydration failed for semester {semester}: {exc}")
                self._hydration_error_logged_semesters.add(semester)

    def _sync_semester_to_cloud(self, semester: int) -> None:
        """Push the semester's local index and records to the cloud store (best effort).

        NOTE(review): when a semester ends up with no vectors its local index is deleted,
        so this returns early and the cloud copy keeps the previous data.
        """
        if not self.cloud_store:
            return
        records_path = self._semester_records_path(semester)
        index_path = self._semester_index_path(semester)
        if not (os.path.exists(records_path) and os.path.exists(index_path)):
            return
        try:
            with open(index_path, "rb") as f:
                faiss_bytes = f.read()
            with open(records_path, "rb") as f:
                records_bytes = f.read()
            self.cloud_store.save_semester_files(semester, faiss_bytes, records_bytes)
        except Exception as exc:
            print(f"Neon index sync failed for semester {semester}: {exc}")

    @staticmethod
    def _load_records(path: str) -> List[dict]:
        """Unpickle the record list at ``path``; ``[]`` if the file does not exist.

        SECURITY: this file may have been hydrated from the cloud store, and
        ``pickle.load`` can execute arbitrary code from crafted input. It is only safe
        while that database is fully trusted.
        """
        if not os.path.exists(path):
            return []
        with open(path, "rb") as f:
            return pickle.load(f)

    @staticmethod
    def _save_records(path: str, data: List[dict]) -> None:
        """Atomically pickle ``data`` to ``path``."""
        _atomic_write_bytes(path, pickle.dumps(data))