# chatbot-gitconnect / app/vector_store.py
# quantumbit: saving db files to neon (commit 812b65b)
import os
import pickle
from typing import List
import faiss
import numpy as np
from app.services.neon_index_store import NeonIndexStore
class LocalVectorStore:
    """Per-semester FAISS vector store persisted on local disk.

    Each semester gets two files under ``base_dir``: a pickled list of
    record dicts (``semester_<n>.pkl``) and a FAISS inner-product index
    over the L2-normalized embeddings (``semester_<n>.faiss``), so scores
    are cosine similarities.  When ``rag_index_db_url`` is supplied, both
    files are mirrored to/from Neon via ``NeonIndexStore`` on a
    best-effort basis: cloud failures are logged once per category and
    never raised to the caller.
    """

    def __init__(
        self,
        base_dir: str,
        rag_index_db_url: str = "",
        neon_max_retries: int = 5,
        neon_retry_backoff_sec: float = 1.0,
        neon_connect_timeout_sec: int = 10,
    ) -> None:
        """Create the store, ensuring ``base_dir`` exists.

        Args:
            base_dir: Directory holding the local ``.pkl``/``.faiss`` files.
            rag_index_db_url: Neon connection URL; empty disables cloud sync.
            neon_max_retries: Retry budget passed through to ``NeonIndexStore``.
            neon_retry_backoff_sec: Base backoff (seconds) between Neon retries.
            neon_connect_timeout_sec: Neon connection timeout in seconds.
        """
        self.base_dir = base_dir
        # Cloud mirroring is optional: no URL means purely local operation.
        self.cloud_store = (
            NeonIndexStore(
                rag_index_db_url,
                max_retries=neon_max_retries,
                base_backoff_sec=neon_retry_backoff_sec,
                connect_timeout_sec=neon_connect_timeout_sec,
            )
            if rag_index_db_url
            else None
        )
        # Track what has already been logged so repeated failures don't
        # flood stdout.
        self._hydration_error_logged_semesters: set[int] = set()
        self._setup_error_logged = False
        os.makedirs(self.base_dir, exist_ok=True)
        if self.cloud_store:
            try:
                self.cloud_store.ensure_table()
            except Exception as exc:
                # Table setup is best-effort; the store keeps working locally.
                if not self._setup_error_logged:
                    print(f"Neon index table setup skipped: {exc}")
                    self._setup_error_logged = True

    def upsert_documents(
        self,
        semester: int,
        course_code: str,
        chunks: List[str],
        embeddings: List[List[float]],
    ) -> None:
        """Replace all chunks for ``course_code`` in ``semester`` and re-index.

        Existing records for the same course are dropped first, so repeated
        ingestion of a course is idempotent.  The FAISS index is rebuilt
        from scratch and both files are pushed to the cloud (best effort).

        Args:
            semester: Semester number the course belongs to.
            course_code: Course identifier stored on each record.
            chunks: Text chunks; paired positionally with ``embeddings``.
            embeddings: One embedding vector per chunk.
        """
        self._ensure_local_semester_data(semester)
        records_path = self._semester_records_path(semester)
        records = self._load_records(records_path)
        # Drop the course's previous chunks so re-ingestion replaces them.
        records = [r for r in records if r.get("course_code") != course_code]
        records.extend(
            {"course_code": course_code, "chunk": chunk, "embedding": vector}
            for chunk, vector in zip(chunks, embeddings)
        )
        self._save_records(records_path, records)
        self._rebuild_faiss_index(semester, records)
        self._sync_semester_to_cloud(semester)

    def search(self, semester: int, query_embedding: List[float], top_k: int = 6) -> List[dict]:
        """Return up to ``top_k`` best-matching chunks for ``query_embedding``.

        Returns:
            ``{"course_code", "chunk"}`` dicts ordered by cosine similarity
            (best first).  Empty when the semester has no data, ``top_k`` is
            not positive, or the on-disk index is unusable.
        """
        # Fix: faiss rejects a non-positive k; short-circuit instead.
        if top_k <= 0:
            return []
        self._ensure_local_semester_data(semester)
        records_path = self._semester_records_path(semester)
        index_path = self._semester_index_path(semester)
        records = self._load_records(records_path)
        if not records:
            return []
        if not os.path.exists(index_path):
            self._rebuild_faiss_index(semester, records)
            if not os.path.exists(index_path):
                # Rebuild refused (malformed embeddings); nothing to search.
                return []
        index = faiss.read_index(index_path)
        q = np.array(query_embedding, dtype=np.float32).reshape(1, -1)
        # Fix: a stale index built with a different embedding model would
        # make faiss abort on a dimension mismatch; return empty instead.
        if index.d != q.shape[1]:
            return []
        faiss.normalize_L2(q)
        k = min(top_k, len(records))
        _, indices = index.search(q, k)
        hits = []
        for idx in indices[0].tolist():
            # faiss pads with -1 when fewer than k vectors are available;
            # the range check also guards against an index/records drift.
            if 0 <= idx < len(records):
                record = records[idx]
                hits.append(
                    {
                        "course_code": record.get("course_code", ""),
                        "chunk": record.get("chunk", ""),
                    }
                )
        return hits

    def _semester_records_path(self, semester: int) -> str:
        """Path of the pickled record list for ``semester``."""
        return os.path.join(self.base_dir, f"semester_{semester}.pkl")

    def _semester_index_path(self, semester: int) -> str:
        """Path of the FAISS index file for ``semester``."""
        return os.path.join(self.base_dir, f"semester_{semester}.faiss")

    def _rebuild_faiss_index(self, semester: int, records: List[dict]) -> None:
        """Rebuild the semester's FAISS index from ``records`` (or delete it).

        Embeddings are L2-normalized and stored in an ``IndexFlatIP`` so
        inner-product search yields cosine similarity.
        """
        index_path = self._semester_index_path(semester)
        vectors = (
            np.array([r["embedding"] for r in records], dtype=np.float32)
            if records
            else np.empty((0, 0), dtype=np.float32)
        )
        if vectors.ndim != 2 or vectors.shape[0] == 0:
            # Fix: drop any stale index not only when there are no records
            # but also when the embeddings are malformed (ragged lengths
            # collapse to an object array / ndim != 2), so search() never
            # serves results from an outdated index.
            if os.path.exists(index_path):
                os.remove(index_path)
            return
        faiss.normalize_L2(vectors)
        index = faiss.IndexFlatIP(vectors.shape[1])
        index.add(vectors)
        faiss.write_index(index, index_path)

    def _ensure_local_semester_data(self, semester: int) -> None:
        """Hydrate the semester's files from the cloud if missing locally.

        No-op when both files already exist or cloud sync is disabled.
        Failures are logged once per semester and otherwise ignored.
        """
        records_path = self._semester_records_path(semester)
        index_path = self._semester_index_path(semester)
        if os.path.exists(records_path) and os.path.exists(index_path):
            return
        if not self.cloud_store:
            return
        try:
            payload = self.cloud_store.load_semester_files(semester)
            if not payload:
                return
            faiss_bytes, records_bytes = payload
            with open(index_path, "wb") as f:
                f.write(faiss_bytes)
            with open(records_path, "wb") as f:
                f.write(records_bytes)
            # A successful hydration clears the logged-error marker so a
            # future regression gets reported again.
            self._hydration_error_logged_semesters.discard(semester)
        except Exception as exc:
            if semester not in self._hydration_error_logged_semesters:
                print(f"Neon index hydration failed for semester {semester}: {exc}")
                self._hydration_error_logged_semesters.add(semester)

    def _sync_semester_to_cloud(self, semester: int) -> None:
        """Push the semester's local files to the cloud (best effort)."""
        if not self.cloud_store:
            return
        records_path = self._semester_records_path(semester)
        index_path = self._semester_index_path(semester)
        if not (os.path.exists(records_path) and os.path.exists(index_path)):
            return
        try:
            with open(index_path, "rb") as f:
                faiss_bytes = f.read()
            with open(records_path, "rb") as f:
                records_bytes = f.read()
            self.cloud_store.save_semester_files(semester, faiss_bytes, records_bytes)
        except Exception as exc:
            print(f"Neon index sync failed for semester {semester}: {exc}")

    @staticmethod
    def _load_records(path: str) -> List[dict]:
        """Load pickled records from ``path``; [] when the file is absent.

        NOTE(review): pickle is only safe because these files are written
        by this process or hydrated from our own Neon store — never feed
        it untrusted data.
        """
        if not os.path.exists(path):
            return []
        with open(path, "rb") as f:
            return pickle.load(f)

    @staticmethod
    def _save_records(path: str, data: List[dict]) -> None:
        """Pickle ``data`` to ``path``, overwriting any existing file."""
        with open(path, "wb") as f:
            pickle.dump(data, f)