quantumbit Copilot committed on
Commit
812b65b
·
1 Parent(s): d0220ae

saving db files to neon

Browse files

Co-authored-by: Copilot <copilot@github.com>

.env.example CHANGED
@@ -1,7 +1,11 @@
1
  GEMINI_API_KEY=your_gemini_api_key
2
  GEMINI_MODEL=gemini-2.5-flash
3
  EMBEDDING_MODEL_NAME=sentence-transformers/all-MiniLM-L6-v2
4
- STUDENT_PERFORMANCE_URL_TEMPLATE=https://git-connect-backend-v2.vercel.app/api/student/{student_id}/performance
 
 
 
 
5
  VECTOR_DATA_DIR=data/vector_index
6
  RAW_TEXT_DIR=data/raw_text
7
  PDF_TIMEOUT_SEC=60
 
1
  GEMINI_API_KEY=your_gemini_api_key
2
  GEMINI_MODEL=gemini-2.5-flash
3
  EMBEDDING_MODEL_NAME=sentence-transformers/all-MiniLM-L6-v2
4
+ STUDENT_PERFORMANCE_URL_TEMPLATE=https://git-connect-backend-v2.vercel.app/api/student/{student_id}/performance/{semester}
5
+ RAG_INDEX_DB_URL=
6
+ NEON_MAX_RETRIES=5
7
+ NEON_RETRY_BACKOFF_SEC=1.0
8
+ NEON_CONNECT_TIMEOUT_SEC=10
9
  VECTOR_DATA_DIR=data/vector_index
10
  RAW_TEXT_DIR=data/raw_text
11
  PDF_TIMEOUT_SEC=60
README.md CHANGED
@@ -24,6 +24,19 @@ FastAPI backend with two primary features:
24
  - Vector Search: FAISS (IndexFlatIP with normalized vectors)
25
  - LLM generation/summarization: Gemini
26
 
 
 
 
 
 
 
 
 
 
 
 
 
 
27
  ## Local Setup
28
 
29
  1. Create and activate a virtual environment.
 
24
  - Vector Search: FAISS (IndexFlatIP with normalized vectors)
25
  - LLM generation/summarization: Gemini
26
 
27
+ RAG index persistence:
28
+
29
+ - Runtime retrieval uses semester FAISS files plus semester PKL records.
30
+ - Optional cloud persistence to Neon is enabled with RAG_INDEX_DB_URL.
31
+ - On syllabus processing, updated semester FAISS and PKL are uploaded to Neon.
32
+ - On server restart, if local files are missing, the server hydrates them from Neon automatically.
33
+
34
+ About stored artifacts:
35
+
36
+ - Needed for RAG runtime: semester_x.faiss and semester_x.pkl.
37
+ - Raw text files under data/raw_text are not required for search-time RAG.
38
+ - Raw text is mainly useful for debugging/auditing ingestion outputs.
39
+
40
  ## Local Setup
41
 
42
  1. Create and activate a virtual environment.
app/config.py CHANGED
@@ -20,8 +20,12 @@ class Settings:
20
  )
21
  student_performance_url_template: str = os.getenv(
22
  "STUDENT_PERFORMANCE_URL_TEMPLATE",
23
- "https://git-connect-backend-v2.vercel.app/api/student/{student_id}/performance",
24
  )
 
 
 
 
25
  vector_data_dir: str = os.getenv("VECTOR_DATA_DIR", "data/vector_index")
26
  raw_text_dir: str = os.getenv("RAW_TEXT_DIR", "data/raw_text")
27
  gemini_model: str = os.getenv("GEMINI_MODEL", "gemini-2.5-flash")
 
20
  )
21
  student_performance_url_template: str = os.getenv(
22
  "STUDENT_PERFORMANCE_URL_TEMPLATE",
23
+ "https://git-connect-backend-v2.vercel.app/api/student/{student_id}/performance/{semester}",
24
  )
25
+ rag_index_db_url: str = os.getenv("RAG_INDEX_DB_URL", "")
26
+ neon_max_retries: int = int(os.getenv("NEON_MAX_RETRIES", "5"))
27
+ neon_retry_backoff_sec: float = float(os.getenv("NEON_RETRY_BACKOFF_SEC", "1.0"))
28
+ neon_connect_timeout_sec: int = int(os.getenv("NEON_CONNECT_TIMEOUT_SEC", "10"))
29
  vector_data_dir: str = os.getenv("VECTOR_DATA_DIR", "data/vector_index")
30
  raw_text_dir: str = os.getenv("RAW_TEXT_DIR", "data/raw_text")
31
  gemini_model: str = os.getenv("GEMINI_MODEL", "gemini-2.5-flash")
app/main.py CHANGED
@@ -62,8 +62,13 @@ def process_syllabus(courses: List[CourseInput]) -> SyllabusProcessResponse:
62
  except ValueError as exc:
63
  raise HTTPException(status_code=500, detail=str(exc)) from exc
64
 
65
- vector_store = LocalVectorStore(settings.vector_data_dir)
66
- os.makedirs(settings.raw_text_dir, exist_ok=True)
 
 
 
 
 
67
 
68
  results: List[CourseSummary] = []
69
  failed: List[CourseProcessError] = []
@@ -79,10 +84,6 @@ def process_syllabus(courses: List[CourseInput]) -> SyllabusProcessResponse:
79
  if not syllabus_text:
80
  raise RuntimeError("No text extracted from PDF.")
81
 
82
- raw_path = os.path.join(settings.raw_text_dir, f"{course.course_code}.txt")
83
- with open(raw_path, "w", encoding="utf-8") as f:
84
- f.write(syllabus_text)
85
-
86
  chunks = chunk_text(syllabus_text)
87
  if not chunks:
88
  raise RuntimeError("Unable to create text chunks from syllabus content.")
@@ -166,7 +167,13 @@ Rules:
166
  - Be concise and practical.
167
  """
168
  elif intent == "syllabus":
169
- vector_store = LocalVectorStore(settings.vector_data_dir)
 
 
 
 
 
 
170
  query_embedding = gemini.embed_text(req.query, task_type="retrieval_query")
171
  hits = vector_store.search(req.semester, query_embedding, top_k=5)
172
  hits = hits[:5]
 
62
  except ValueError as exc:
63
  raise HTTPException(status_code=500, detail=str(exc)) from exc
64
 
65
+ vector_store = LocalVectorStore(
66
+ settings.vector_data_dir,
67
+ rag_index_db_url=settings.rag_index_db_url,
68
+ neon_max_retries=settings.neon_max_retries,
69
+ neon_retry_backoff_sec=settings.neon_retry_backoff_sec,
70
+ neon_connect_timeout_sec=settings.neon_connect_timeout_sec,
71
+ )
72
 
73
  results: List[CourseSummary] = []
74
  failed: List[CourseProcessError] = []
 
84
  if not syllabus_text:
85
  raise RuntimeError("No text extracted from PDF.")
86
 
 
 
 
 
87
  chunks = chunk_text(syllabus_text)
88
  if not chunks:
89
  raise RuntimeError("Unable to create text chunks from syllabus content.")
 
167
  - Be concise and practical.
168
  """
169
  elif intent == "syllabus":
170
+ vector_store = LocalVectorStore(
171
+ settings.vector_data_dir,
172
+ rag_index_db_url=settings.rag_index_db_url,
173
+ neon_max_retries=settings.neon_max_retries,
174
+ neon_retry_backoff_sec=settings.neon_retry_backoff_sec,
175
+ neon_connect_timeout_sec=settings.neon_connect_timeout_sec,
176
+ )
177
  query_embedding = gemini.embed_text(req.query, task_type="retrieval_query")
178
  hits = vector_store.search(req.semester, query_embedding, top_k=5)
179
  hits = hits[:5]
app/services/neon_index_store.py ADDED
@@ -0,0 +1,100 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import Optional, Tuple
2
+ import time
3
+
4
+ import psycopg
5
+ from psycopg import Binary
6
+
7
+
8
+ class NeonIndexStore:
9
+ def __init__(
10
+ self,
11
+ db_url: str,
12
+ max_retries: int = 5,
13
+ base_backoff_sec: float = 1.0,
14
+ connect_timeout_sec: int = 10,
15
+ ) -> None:
16
+ self.db_url = db_url
17
+ self.max_retries = max(1, max_retries)
18
+ self.base_backoff_sec = max(0.1, base_backoff_sec)
19
+ self.connect_timeout_sec = max(1, connect_timeout_sec)
20
+
21
+ def _connect(self):
22
+ return psycopg.connect(
23
+ self.db_url,
24
+ connect_timeout=self.connect_timeout_sec,
25
+ )
26
+
27
+ def _run_with_retry(self, op_name: str, fn):
28
+ last_exc: Exception | None = None
29
+
30
+ for attempt in range(1, self.max_retries + 1):
31
+ try:
32
+ return fn()
33
+ except Exception as exc:
34
+ last_exc = exc
35
+ if attempt < self.max_retries:
36
+ sleep_sec = self.base_backoff_sec * (2 ** (attempt - 1))
37
+ time.sleep(sleep_sec)
38
+
39
+ raise RuntimeError(
40
+ f"Neon operation '{op_name}' failed after {self.max_retries} attempts: {last_exc}"
41
+ )
42
+
43
+ def ensure_table(self) -> None:
44
+ query = """
45
+ CREATE TABLE IF NOT EXISTS rag_semester_indexes (
46
+ semester INTEGER PRIMARY KEY,
47
+ faiss_blob BYTEA NOT NULL,
48
+ records_blob BYTEA NOT NULL,
49
+ updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
50
+ )
51
+ """
52
+
53
+ def _op():
54
+ with self._connect() as conn:
55
+ with conn.cursor() as cur:
56
+ cur.execute(query)
57
+ conn.commit()
58
+
59
+ self._run_with_retry("ensure_table", _op)
60
+
61
+ def save_semester_files(self, semester: int, faiss_bytes: bytes, records_bytes: bytes) -> None:
62
+ query = """
63
+ INSERT INTO rag_semester_indexes (semester, faiss_blob, records_blob, updated_at)
64
+ VALUES (%s, %s, %s, NOW())
65
+ ON CONFLICT (semester)
66
+ DO UPDATE SET
67
+ faiss_blob = EXCLUDED.faiss_blob,
68
+ records_blob = EXCLUDED.records_blob,
69
+ updated_at = NOW()
70
+ """
71
+ def _op():
72
+ with self._connect() as conn:
73
+ with conn.cursor() as cur:
74
+ cur.execute(
75
+ query,
76
+ (semester, Binary(faiss_bytes), Binary(records_bytes)),
77
+ )
78
+ conn.commit()
79
+
80
+ self._run_with_retry("save_semester_files", _op)
81
+
82
+ def load_semester_files(self, semester: int) -> Optional[Tuple[bytes, bytes]]:
83
+ query = """
84
+ SELECT faiss_blob, records_blob
85
+ FROM rag_semester_indexes
86
+ WHERE semester = %s
87
+ """
88
+ def _op():
89
+ with self._connect() as conn:
90
+ with conn.cursor() as cur:
91
+ cur.execute(query, (semester,))
92
+ return cur.fetchone()
93
+
94
+ row = self._run_with_retry("load_semester_files", _op)
95
+
96
+ if not row:
97
+ return None
98
+
99
+ faiss_blob, records_blob = row
100
+ return bytes(faiss_blob), bytes(records_blob)
app/services/student_service.py CHANGED
@@ -12,20 +12,41 @@ def fetch_student_info(
12
  raise ValueError(
13
  "STUDENT_PERFORMANCE_URL_TEMPLATE must include '{student_id}'."
14
  )
 
 
 
 
 
 
 
 
 
 
15
 
16
- student_url = student_performance_url_template.format(student_id=student_id)
17
- get_resp = requests.get(
18
  student_url,
19
- params={"semester": semester, "intent": intent},
20
  timeout=timeout,
21
  )
22
- if not get_resp.ok:
23
- raise RuntimeError(
24
- f"Failed to fetch student info from {student_url}; "
25
- f"status {get_resp.status_code}."
 
26
  )
27
 
28
- return _filter_student_info_by_intent(get_resp.json(), semester=semester, intent=intent)
 
 
 
 
 
 
 
 
 
 
29
 
30
 
31
  def _filter_student_info_by_intent(payload: dict, semester: int, intent: str) -> dict:
@@ -36,7 +57,7 @@ def _filter_student_info_by_intent(payload: dict, semester: int, intent: str) ->
36
  data.pop("results", None)
37
  return data
38
 
39
- if intent == "result":
40
  data.pop("attendance", None)
41
  results = data.get("results")
42
  if isinstance(results, dict):
 
12
  raise ValueError(
13
  "STUDENT_PERFORMANCE_URL_TEMPLATE must include '{student_id}'."
14
  )
15
+ if "{semester}" not in student_performance_url_template:
16
+ raise ValueError(
17
+ "STUDENT_PERFORMANCE_URL_TEMPLATE must include '{semester}'."
18
+ )
19
+
20
+ student_url = student_performance_url_template.format(
21
+ student_id=student_id,
22
+ semester=semester,
23
+ )
24
+ backend_intent = "results" if intent == "result" else intent
25
 
26
+ # Contract: GET with intent as URL query parameter.
27
+ get_with_intent_resp = requests.get(
28
  student_url,
29
+ params={"intent": backend_intent},
30
  timeout=timeout,
31
  )
32
+ if get_with_intent_resp.ok:
33
+ return _filter_student_info_by_intent(
34
+ get_with_intent_resp.json(),
35
+ semester=semester,
36
+ intent=intent,
37
  )
38
 
39
+ # Fallback: plain GET if server ignores intent query param.
40
+ get_resp = requests.get(student_url, timeout=timeout)
41
+ if get_resp.ok:
42
+ return _filter_student_info_by_intent(get_resp.json(), semester=semester, intent=intent)
43
+
44
+ raise RuntimeError(
45
+ "Failed to fetch student info from "
46
+ f"{student_url}; "
47
+ f"GET(intent={backend_intent}) status {get_with_intent_resp.status_code}, "
48
+ f"GET status {get_resp.status_code}."
49
+ )
50
 
51
 
52
  def _filter_student_info_by_intent(payload: dict, semester: int, intent: str) -> dict:
 
57
  data.pop("results", None)
58
  return data
59
 
60
+ if intent in {"result", "results"}:
61
  data.pop("attendance", None)
62
  results = data.get("results")
63
  if isinstance(results, dict):
app/vector_store.py CHANGED
@@ -1,16 +1,45 @@
1
- import json
2
  import os
 
3
  from typing import List
4
 
5
  import faiss
6
  import numpy as np
7
 
 
 
8
 
9
  class LocalVectorStore:
10
- def __init__(self, base_dir: str) -> None:
 
 
 
 
 
 
 
11
  self.base_dir = base_dir
 
 
 
 
 
 
 
 
 
 
 
 
12
  os.makedirs(self.base_dir, exist_ok=True)
13
 
 
 
 
 
 
 
 
 
14
  def upsert_documents(
15
  self,
16
  semester: int,
@@ -18,8 +47,10 @@ class LocalVectorStore:
18
  chunks: List[str],
19
  embeddings: List[List[float]],
20
  ) -> None:
21
- meta_path = self._semester_meta_path(semester)
22
- records = self._load(meta_path)
 
 
23
 
24
  records = [r for r in records if r.get("course_code") != course_code]
25
  for chunk, vector in zip(chunks, embeddings):
@@ -31,13 +62,16 @@ class LocalVectorStore:
31
  }
32
  )
33
 
34
- self._save(meta_path, records)
35
  self._rebuild_faiss_index(semester, records)
 
36
 
37
  def search(self, semester: int, query_embedding: List[float], top_k: int = 6) -> List[dict]:
38
- meta_path = self._semester_meta_path(semester)
 
 
39
  index_path = self._semester_index_path(semester)
40
- records = self._load(meta_path)
41
  if not records:
42
  return []
43
 
@@ -70,8 +104,8 @@ class LocalVectorStore:
70
 
71
  return hits
72
 
73
- def _semester_meta_path(self, semester: int) -> str:
74
- return os.path.join(self.base_dir, f"semester_{semester}.meta.json")
75
 
76
  def _semester_index_path(self, semester: int) -> str:
77
  return os.path.join(self.base_dir, f"semester_{semester}.faiss")
@@ -94,14 +128,60 @@ class LocalVectorStore:
94
  index.add(vectors)
95
  faiss.write_index(index, index_path)
96
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
97
  @staticmethod
98
- def _load(path: str) -> List[dict]:
99
  if not os.path.exists(path):
100
  return []
101
- with open(path, "r", encoding="utf-8") as f:
102
- return json.load(f)
103
 
104
  @staticmethod
105
- def _save(path: str, data: List[dict]) -> None:
106
- with open(path, "w", encoding="utf-8") as f:
107
- json.dump(data, f, ensure_ascii=False)
 
 
1
  import os
2
+ import pickle
3
  from typing import List
4
 
5
  import faiss
6
  import numpy as np
7
 
8
+ from app.services.neon_index_store import NeonIndexStore
9
+
10
 
11
  class LocalVectorStore:
12
+ def __init__(
13
+ self,
14
+ base_dir: str,
15
+ rag_index_db_url: str = "",
16
+ neon_max_retries: int = 5,
17
+ neon_retry_backoff_sec: float = 1.0,
18
+ neon_connect_timeout_sec: int = 10,
19
+ ) -> None:
20
  self.base_dir = base_dir
21
+ self.cloud_store = (
22
+ NeonIndexStore(
23
+ rag_index_db_url,
24
+ max_retries=neon_max_retries,
25
+ base_backoff_sec=neon_retry_backoff_sec,
26
+ connect_timeout_sec=neon_connect_timeout_sec,
27
+ )
28
+ if rag_index_db_url
29
+ else None
30
+ )
31
+ self._hydration_error_logged_semesters: set[int] = set()
32
+ self._setup_error_logged = False
33
  os.makedirs(self.base_dir, exist_ok=True)
34
 
35
+ if self.cloud_store:
36
+ try:
37
+ self.cloud_store.ensure_table()
38
+ except Exception as exc:
39
+ if not self._setup_error_logged:
40
+ print(f"Neon index table setup skipped: {exc}")
41
+ self._setup_error_logged = True
42
+
43
  def upsert_documents(
44
  self,
45
  semester: int,
 
47
  chunks: List[str],
48
  embeddings: List[List[float]],
49
  ) -> None:
50
+ self._ensure_local_semester_data(semester)
51
+
52
+ records_path = self._semester_records_path(semester)
53
+ records = self._load_records(records_path)
54
 
55
  records = [r for r in records if r.get("course_code") != course_code]
56
  for chunk, vector in zip(chunks, embeddings):
 
62
  }
63
  )
64
 
65
+ self._save_records(records_path, records)
66
  self._rebuild_faiss_index(semester, records)
67
+ self._sync_semester_to_cloud(semester)
68
 
69
  def search(self, semester: int, query_embedding: List[float], top_k: int = 6) -> List[dict]:
70
+ self._ensure_local_semester_data(semester)
71
+
72
+ records_path = self._semester_records_path(semester)
73
  index_path = self._semester_index_path(semester)
74
+ records = self._load_records(records_path)
75
  if not records:
76
  return []
77
 
 
104
 
105
  return hits
106
 
107
+ def _semester_records_path(self, semester: int) -> str:
108
+ return os.path.join(self.base_dir, f"semester_{semester}.pkl")
109
 
110
  def _semester_index_path(self, semester: int) -> str:
111
  return os.path.join(self.base_dir, f"semester_{semester}.faiss")
 
128
  index.add(vectors)
129
  faiss.write_index(index, index_path)
130
 
131
+ def _ensure_local_semester_data(self, semester: int) -> None:
132
+ records_path = self._semester_records_path(semester)
133
+ index_path = self._semester_index_path(semester)
134
+
135
+ if os.path.exists(records_path) and os.path.exists(index_path):
136
+ return
137
+
138
+ if not self.cloud_store:
139
+ return
140
+
141
+ try:
142
+ payload = self.cloud_store.load_semester_files(semester)
143
+ if not payload:
144
+ return
145
+
146
+ faiss_bytes, records_bytes = payload
147
+ with open(index_path, "wb") as f:
148
+ f.write(faiss_bytes)
149
+ with open(records_path, "wb") as f:
150
+ f.write(records_bytes)
151
+ self._hydration_error_logged_semesters.discard(semester)
152
+ except Exception as exc:
153
+ if semester not in self._hydration_error_logged_semesters:
154
+ print(f"Neon index hydration failed for semester {semester}: {exc}")
155
+ self._hydration_error_logged_semesters.add(semester)
156
+
157
+ def _sync_semester_to_cloud(self, semester: int) -> None:
158
+ if not self.cloud_store:
159
+ return
160
+
161
+ records_path = self._semester_records_path(semester)
162
+ index_path = self._semester_index_path(semester)
163
+
164
+ if not (os.path.exists(records_path) and os.path.exists(index_path)):
165
+ return
166
+
167
+ try:
168
+ with open(index_path, "rb") as f:
169
+ faiss_bytes = f.read()
170
+ with open(records_path, "rb") as f:
171
+ records_bytes = f.read()
172
+
173
+ self.cloud_store.save_semester_files(semester, faiss_bytes, records_bytes)
174
+ except Exception as exc:
175
+ print(f"Neon index sync failed for semester {semester}: {exc}")
176
+
177
  @staticmethod
178
+ def _load_records(path: str) -> List[dict]:
179
  if not os.path.exists(path):
180
  return []
181
+ with open(path, "rb") as f:
182
+ return pickle.load(f)
183
 
184
  @staticmethod
185
+ def _save_records(path: str, data: List[dict]) -> None:
186
+ with open(path, "wb") as f:
187
+ pickle.dump(data, f)