LiamKhoaLe commited on
Commit
c1dfa67
·
1 Parent(s): acf837b

Upd ws, db and import state of changes

Browse files
app/.DS_Store CHANGED
Binary files a/app/.DS_Store and b/app/.DS_Store differ
 
app/db.py CHANGED
@@ -1,54 +1,60 @@
1
  # app/db.py
2
  from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorGridFSBucket
3
- import os
4
- import logging
5
  from pymongo.uri_parser import parse_uri
 
6
 
7
- logger = logging.getLogger("book-query")
 
 
 
8
 
9
- MONGO_URI = os.getenv("MONGODB_URI")
10
- MONGO_DB_NAME = os.getenv("MONGODB_DB", "querysearcher")
11
- TEXTBOOK_URI = os.getenv("TEXTBOOK_URI")
12
-
13
- # == QUERY ==
14
- # Return a fresh client for current event loop
15
  def get_client():
16
  return AsyncIOMotorClient(MONGO_URI)
 
17
  def get_db():
18
  return get_client()[MONGO_DB_NAME]
 
19
  def get_gridfs():
20
  return AsyncIOMotorGridFSBucket(get_db())
21
 
22
- # == PDF SAVER ==
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
23
  async def save_to_textbook_fs(doc_id: str, file_path: str):
24
  try:
25
- uri = TEXTBOOK_URI
26
- if not uri:
27
- raise ValueError("TEXTBOOK_URI not set")
28
- # Parse to correct db
29
- parsed = parse_uri(uri)
30
- db_name = parsed.get("database")
31
- if not db_name:
32
- raise ValueError("No default database name defined or provided.")
33
- #
34
- textbook_client = AsyncIOMotorClient(uri)
35
- textbook_db = textbook_client[db_name]
36
- textbook_fs = AsyncIOMotorGridFSBucket(textbook_db)
37
- # Read
38
- with open(file_path, "rb") as f:
39
- await textbook_fs.upload_from_stream(f"{doc_id}.pdf", f)
40
- # Log
41
- logger.info(f"📦 PDF also stored to textbook bucket at: {TEXTBOOK_URI}")
42
  except Exception as e:
43
- logger.warning(f"⚠️ Failed to save to textbook storage: {e}")
44
 
45
- # == PDF FETCHER ==
46
  async def fetch_textbook_pdf(doc_id: str):
 
 
 
 
 
47
  try:
48
- textbook_client = AsyncIOMotorClient(TEXTBOOK_URI)
49
- textbook_db = textbook_client.get_default_database()
50
- textbook_fs = AsyncIOMotorGridFSBucket(textbook_db)
51
- return await textbook_fs.open_download_stream_by_name(f"{doc_id}.pdf")
52
- except Exception as e:
53
- logger.warning(f"⚠️ Failed to fetch textbook PDF for {doc_id}: {e}")
54
- raise
 
1
  # app/db.py
2
  from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorGridFSBucket
 
 
3
  from pymongo.uri_parser import parse_uri
4
+ import os, logging
5
 
6
+ logger = logging.getLogger("book-query")
7
+ MONGO_URI = os.getenv("MONGODB_URI")
8
+ MONGO_DB_NAME = os.getenv("MONGODB_DB", "querysearcher")
9
+ TEXTBOOK_URI = os.getenv("TEXTBOOK_URI")
10
 
11
+ # ────────────────────────────────────────────────────────────────
12
+ # helpers for the main “query-searcher” DB
13
+ # ────────────────────────────────────────────────────────────────
 
 
 
14
  def get_client():
15
  return AsyncIOMotorClient(MONGO_URI)
16
+
17
  def get_db():
18
  return get_client()[MONGO_DB_NAME]
19
+
20
  def get_gridfs():
21
  return AsyncIOMotorGridFSBucket(get_db())
22
 
23
+ # ────────────────────────────────────────────────────────────────
24
+ # ONE canonical helper for the *textbook* bucket
25
+ # ────────────────────────────────────────────────────────────────
26
+ def _get_textbook_fs() -> AsyncIOMotorGridFSBucket:
27
+ """
28
+ Build (and cache) a GridFS bucket that points to the textbook replica.
29
+ Works whether TEXTBOOK_URI ends with '/<db>' or not.
30
+ """
31
+ if not TEXTBOOK_URI:
32
+ raise RuntimeError("TEXTBOOK_URI not set in environment")
33
+
34
+ parsed = parse_uri(TEXTBOOK_URI)
35
+ db_name = parsed.get("database") or "textbooks" # fallback name
36
+ client = AsyncIOMotorClient(TEXTBOOK_URI)
37
+ return AsyncIOMotorGridFSBucket(client[db_name])
38
+
39
+ # ── public wrappers ─────────────────────────────────────────────
40
  async def save_to_textbook_fs(doc_id: str, file_path: str):
41
  try:
42
+ bucket = _get_textbook_fs()
43
+ with open(file_path, "rb") as fp:
44
+ await bucket.upload_from_stream(f"{doc_id}.pdf", fp)
45
+ logger.info(f"📦 textbook PDF stored → {TEXTBOOK_URI}/{bucket.database.name}")
 
 
 
 
 
 
 
 
 
 
 
 
 
46
  except Exception as e:
47
+ logger.warning(f"⚠️ textbook GridFS save failed: {e}")
48
 
 
49
  async def fetch_textbook_pdf(doc_id: str):
50
+ bucket = _get_textbook_fs()
51
+ return await bucket.open_download_stream_by_name(f"{doc_id}.pdf")
52
+
53
+ async def delete_textbook_pdf(doc_id: str):
54
+ bucket = _get_textbook_fs()
55
  try:
56
+ file = await bucket.find({"filename": f"{doc_id}.pdf"}).to_list(1)
57
+ if file:
58
+ await bucket.delete(file[0]["_id"])
59
+ except Exception:
60
+ pass
 
 
app/routers/import_doc.py CHANGED
@@ -76,7 +76,7 @@ async def import_book(req: ImportRequest):
76
  doc = {
77
  "_id": req.candidate_id,
78
  "title": req.title,
79
- "status": "queued",
80
  "metadata": result
81
  }
82
  db = get_db()
@@ -91,7 +91,7 @@ async def import_book(req: ImportRequest):
91
  "source": req.source,
92
  "documentId": req.candidate_id,
93
  "uri": uri
94
- }
95
 
96
 
97
  # Fetch textbook on id
 
76
  doc = {
77
  "_id": req.candidate_id,
78
  "title": req.title,
79
+ "status": "DOWNLOADING",
80
  "metadata": result
81
  }
82
  db = get_db()
 
91
  "source": req.source,
92
  "documentId": req.candidate_id,
93
  "uri": uri
94
+ }
95
 
96
 
97
  # Fetch textbook on id
app/routers/ws_progress.py CHANGED
@@ -1,28 +1,53 @@
1
- import asyncio
2
- import logging
3
- from fastapi import WebSocket
4
- from app.db import get_db
 
5
 
6
  logger = logging.getLogger("book-query")
7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8
  async def forward_progress(websocket: WebSocket, document_id: str):
 
9
  db = get_db()
10
  try:
11
  while True:
12
  doc = await db.documents.find_one({"_id": document_id})
13
  if not doc:
14
  await websocket.send_json({"status": "NOT_FOUND"})
15
- await asyncio.sleep(2)
16
- continue
 
 
 
 
17
 
18
- status = doc.get("status", "UNKNOWN")
19
- await websocket.send_json({"status": status})
20
- if status in {"READY", "FAILED"}:
21
- break
22
 
23
- await asyncio.sleep(2)
24
  except Exception as e:
25
- logger.exception(f"📡 WebSocket failed for doc {document_id}: {e}")
26
- await websocket.send_json({"status": "ERROR"})
 
 
27
  finally:
28
- await websocket.close()
 
 
1
+ # app/routers/ws_progress.py
2
+ import asyncio, logging, contextlib
3
+ from fastapi import WebSocket, WebSocketDisconnect
4
+ from bson import ObjectId
5
+ from app.db import get_db, get_gridfs, delete_textbook_pdf
6
 
7
  logger = logging.getLogger("book-query")
8
 
9
+ async def _delete_everything(doc_id: str):
10
+ """Remove embeddings, GridFS file & textbook copy if the user vanished."""
11
+ db = get_db()
12
+ gridfs_query = get_gridfs()
13
+ # remove metadata + embeddings
14
+ await db.documents.delete_one({"_id": doc_id})
15
+ await db.embeddings.delete_many({"document_id": doc_id})
16
+ # remove original PDF from the main bucket
17
+ with contextlib.suppress(Exception):
18
+ file = await gridfs_query.find({"filename": f"{doc_id}.pdf"}).to_list(1)
19
+ if file:
20
+ await gridfs_query.delete(file[0]["_id"])
21
+ # remove textbook replica
22
+ with contextlib.suppress(Exception):
23
+ await delete_textbook_pdf(doc_id)
24
+ logger.info(f"🗑️ cleaned up artefacts of {doc_id}")
25
+
26
+
27
  async def forward_progress(websocket: WebSocket, document_id: str):
28
+ """Handle state change allowing frontend to connect and update seamlessly."""
29
  db = get_db()
30
  try:
31
  while True:
32
  doc = await db.documents.find_one({"_id": document_id})
33
  if not doc:
34
  await websocket.send_json({"status": "NOT_FOUND"})
35
+ else:
36
+ status = doc.get("status", "UNKNOWN")
37
+ await websocket.send_json({"status": status})
38
+ if status in {"READY", "FAILED"}:
39
+ break
40
+ await asyncio.sleep(2)
41
 
42
+ except WebSocketDisconnect:
43
+ logger.info(f"💨 client closed WS for {document_id} – deleting artefacts")
44
+ await _delete_everything(document_id)
 
45
 
 
46
  except Exception as e:
47
+ logger.exception(f"📡 WS loop crashed for {document_id}: {e}")
48
+ with contextlib.suppress(Exception):
49
+ await websocket.send_json({"status": "ERROR"})
50
+
51
  finally:
52
+ with contextlib.suppress(Exception):
53
+ await websocket.close()