| import os |
| import pickle |
| from typing import List |
|
|
| import faiss |
| import numpy as np |
|
|
| from app.services.neon_index_store import NeonIndexStore |
|
|
|
|
class LocalVectorStore:
    """Per-semester FAISS vector index persisted on local disk, with optional
    cloud backup/hydration of the index artifacts via ``NeonIndexStore``.

    Each semester owns two files under ``base_dir``:

    - ``semester_<n>.pkl``   — pickled ``list[dict]`` of records, each holding
      ``course_code``, ``chunk`` and ``embedding``.
    - ``semester_<n>.faiss`` — FAISS inner-product index over the L2-normalized
      embeddings (i.e. cosine similarity).

    The records file is the source of truth: the FAISS index can always be
    rebuilt from it. All cloud interaction is best-effort — failures are
    logged (once per kind/semester) and never propagate to callers.
    """

    def __init__(
        self,
        base_dir: str,
        rag_index_db_url: str = "",
        neon_max_retries: int = 5,
        neon_retry_backoff_sec: float = 1.0,
        neon_connect_timeout_sec: int = 10,
    ) -> None:
        self.base_dir = base_dir
        # Cloud persistence is optional: only enabled when a DB URL is given.
        self.cloud_store = (
            NeonIndexStore(
                rag_index_db_url,
                max_retries=neon_max_retries,
                base_backoff_sec=neon_retry_backoff_sec,
                connect_timeout_sec=neon_connect_timeout_sec,
            )
            if rag_index_db_url
            else None
        )
        # Remember what we already logged so recurring failures don't flood stdout.
        self._hydration_error_logged_semesters: set[int] = set()
        self._setup_error_logged = False
        os.makedirs(self.base_dir, exist_ok=True)

        if self.cloud_store:
            try:
                self.cloud_store.ensure_table()
            except Exception as exc:
                # Best-effort: local operation continues without cloud sync.
                if not self._setup_error_logged:
                    print(f"Neon index table setup skipped: {exc}")
                    self._setup_error_logged = True

    def upsert_documents(
        self,
        semester: int,
        course_code: str,
        chunks: List[str],
        embeddings: List[List[float]],
    ) -> None:
        """Replace all stored chunks for ``course_code`` in ``semester``.

        Existing records for the course are dropped, the new (chunk, embedding)
        pairs are appended, the FAISS index is rebuilt, and the result is
        synced to the cloud store (best-effort).

        Raises:
            ValueError: if ``chunks`` and ``embeddings`` differ in length —
                ``zip`` would otherwise silently drop the unpaired tail and
                corrupt the stored records.
        """
        if len(chunks) != len(embeddings):
            raise ValueError(
                f"chunks/embeddings length mismatch: "
                f"{len(chunks)} != {len(embeddings)}"
            )

        self._ensure_local_semester_data(semester)

        records_path = self._semester_records_path(semester)
        records = self._load_records(records_path)

        # Upsert semantics: remove the course's old records, append the new ones.
        records = [r for r in records if r.get("course_code") != course_code]
        for chunk, vector in zip(chunks, embeddings):
            records.append(
                {
                    "course_code": course_code,
                    "chunk": chunk,
                    "embedding": vector,
                }
            )

        self._save_records(records_path, records)
        self._rebuild_faiss_index(semester, records)
        # NOTE(review): if this upsert empties the semester, the local index
        # file is deleted and _sync_semester_to_cloud becomes a no-op, leaving
        # stale data in the cloud store — confirm whether a cloud delete is
        # needed for that edge case.
        self._sync_semester_to_cloud(semester)

    def search(self, semester: int, query_embedding: List[float], top_k: int = 6) -> List[dict]:
        """Return up to ``top_k`` best-matching chunks for ``query_embedding``.

        Results are dicts with ``course_code`` and ``chunk`` keys, ordered by
        cosine similarity (inner product over L2-normalized vectors). Returns
        an empty list when the semester has no data or ``top_k`` <= 0.
        """
        self._ensure_local_semester_data(semester)

        records_path = self._semester_records_path(semester)
        index_path = self._semester_index_path(semester)
        records = self._load_records(records_path)
        if not records:
            return []

        # The index is derivable from the records; rebuild it if missing.
        if not os.path.exists(index_path):
            self._rebuild_faiss_index(semester, records)

        if not os.path.exists(index_path):
            return []

        index = faiss.read_index(index_path)

        q = np.array(query_embedding, dtype=np.float32).reshape(1, -1)
        faiss.normalize_L2(q)

        # Clamp to both the record count and the index size: a stale index may
        # hold fewer vectors than the records file, and FAISS rejects k <= 0.
        k = min(top_k, len(records), index.ntotal)
        if k <= 0:
            return []
        _, indices = index.search(q, k)

        hits = []
        for idx in indices[0].tolist():
            # FAISS pads missing neighbours with -1; also guard against a
            # stale index referencing positions beyond the current records.
            if idx == -1:
                continue
            if 0 <= idx < len(records):
                record = records[idx]
                hits.append(
                    {
                        "course_code": record.get("course_code", ""),
                        "chunk": record.get("chunk", ""),
                    }
                )

        return hits

    def _semester_records_path(self, semester: int) -> str:
        """Path of the pickled records file for ``semester``."""
        return os.path.join(self.base_dir, f"semester_{semester}.pkl")

    def _semester_index_path(self, semester: int) -> str:
        """Path of the FAISS index file for ``semester``."""
        return os.path.join(self.base_dir, f"semester_{semester}.faiss")

    def _rebuild_faiss_index(self, semester: int, records: List[dict]) -> None:
        """Rebuild the semester's FAISS index from ``records``.

        An empty record list deletes any existing index file instead.
        """
        index_path = self._semester_index_path(semester)

        if not records:
            if os.path.exists(index_path):
                os.remove(index_path)
            return

        vectors = np.array([r["embedding"] for r in records], dtype=np.float32)
        if vectors.ndim != 2 or vectors.shape[0] == 0:
            # Ragged or empty embeddings cannot form a valid index; keep
            # whatever index currently exists rather than writing garbage.
            return

        # Normalize so inner product == cosine similarity.
        faiss.normalize_L2(vectors)
        dim = vectors.shape[1]
        index = faiss.IndexFlatIP(dim)
        index.add(vectors)
        faiss.write_index(index, index_path)

    def _ensure_local_semester_data(self, semester: int) -> None:
        """Hydrate the semester's files from the cloud store if absent locally.

        No-op when both files exist locally or no cloud store is configured.
        Failures are logged once per semester and swallowed (best-effort).
        """
        records_path = self._semester_records_path(semester)
        index_path = self._semester_index_path(semester)

        if os.path.exists(records_path) and os.path.exists(index_path):
            return

        if not self.cloud_store:
            return

        try:
            payload = self.cloud_store.load_semester_files(semester)
            if not payload:
                return

            faiss_bytes, records_bytes = payload
            # Write records first: they are the source of truth, and search()
            # can rebuild the index from them if the index write fails midway.
            with open(records_path, "wb") as f:
                f.write(records_bytes)
            with open(index_path, "wb") as f:
                f.write(faiss_bytes)
            self._hydration_error_logged_semesters.discard(semester)
        except Exception as exc:
            if semester not in self._hydration_error_logged_semesters:
                print(f"Neon index hydration failed for semester {semester}: {exc}")
                self._hydration_error_logged_semesters.add(semester)

    def _sync_semester_to_cloud(self, semester: int) -> None:
        """Upload the semester's local files to the cloud store (best-effort)."""
        if not self.cloud_store:
            return

        records_path = self._semester_records_path(semester)
        index_path = self._semester_index_path(semester)

        # Only sync a complete, consistent pair of artifacts.
        if not (os.path.exists(records_path) and os.path.exists(index_path)):
            return

        try:
            with open(index_path, "rb") as f:
                faiss_bytes = f.read()
            with open(records_path, "rb") as f:
                records_bytes = f.read()

            self.cloud_store.save_semester_files(semester, faiss_bytes, records_bytes)
        except Exception as exc:
            print(f"Neon index sync failed for semester {semester}: {exc}")

    @staticmethod
    def _load_records(path: str) -> List[dict]:
        """Load the pickled record list at ``path``; [] if the file is absent.

        SECURITY: pickle.load executes arbitrary code from the payload. The
        bytes may have been hydrated from the cloud store, so this trusts the
        database contents — consider a safer format (e.g. JSON) if that store
        is not fully trusted.
        """
        if not os.path.exists(path):
            return []
        with open(path, "rb") as f:
            return pickle.load(f)

    @staticmethod
    def _save_records(path: str, data: List[dict]) -> None:
        """Pickle ``data`` to ``path``, overwriting any existing file."""
        with open(path, "wb") as f:
            pickle.dump(data, f)
|
|