LiamKhoaLe committed on
Commit
77ff318
·
1 Parent(s): 9785201

Update safe GridFS usage on ingest

Browse files
Files changed (1) hide show
  1. app/services/ingest.py +6 -7
app/services/ingest.py CHANGED
@@ -2,7 +2,7 @@
2
  import os
3
  import fitz # PyMuPDF - convert PDF to plaintext for semantic embedding
4
  import io
5
- from app.db import db, grid_fs_bucket
6
  from sentence_transformers import SentenceTransformer
7
 
8
  async def parse_and_index(document_id: str):
@@ -10,12 +10,12 @@ async def parse_and_index(document_id: str):
10
  try:
11
  # Lazy model load
12
  model = SentenceTransformer("all-MiniLM-L6-v2")
13
-
 
14
  # Load PDF from GridFS
15
  buffer = io.BytesIO()
16
  await grid_fs_bucket.download_to_stream_by_name(f"{document_id}.pdf", buffer)
17
  buffer.seek(0)
18
-
19
  # Extract text from PDF
20
  text_chunks = []
21
  with fitz.open(stream=buffer.read(), filetype="pdf") as doc:
@@ -26,10 +26,8 @@ async def parse_and_index(document_id: str):
26
 
27
  if not text_chunks:
28
  raise ValueError("No text extracted from PDF.")
29
-
30
  # Embed chunks
31
  embeddings = model.encode(text_chunks, convert_to_tensor=True)
32
-
33
  # Store in MongoDB
34
  entries = [
35
  {
@@ -42,9 +40,10 @@ async def parse_and_index(document_id: str):
42
  ]
43
  await db.embeddings.insert_many(entries)
44
  await db.documents.update_one({"_id": document_id}, {"$set": {"status": "READY"}})
45
-
46
  print(f"[INFO] Finished indexing {len(entries)} chunks from document: {document_id}")
47
-
48
  except Exception as e:
49
  print(f"[ERROR] Ingestion failed for {document_id}: {e}")
 
50
  await db.documents.update_one({"_id": document_id}, {"$set": {"status": "FAILED"}})
 
2
  import os
3
  import fitz # PyMuPDF - convert PDF to plaintext for semantic embedding
4
  import io
5
+ from app.db import get_db, get_gridfs
6
  from sentence_transformers import SentenceTransformer
7
 
8
  async def parse_and_index(document_id: str):
 
10
  try:
11
  # Lazy model load
12
  model = SentenceTransformer("all-MiniLM-L6-v2")
13
+ db = get_db()
14
+ grid_fs_bucket = get_gridfs()
15
  # Load PDF from GridFS
16
  buffer = io.BytesIO()
17
  await grid_fs_bucket.download_to_stream_by_name(f"{document_id}.pdf", buffer)
18
  buffer.seek(0)
 
19
  # Extract text from PDF
20
  text_chunks = []
21
  with fitz.open(stream=buffer.read(), filetype="pdf") as doc:
 
26
 
27
  if not text_chunks:
28
  raise ValueError("No text extracted from PDF.")
 
29
  # Embed chunks
30
  embeddings = model.encode(text_chunks, convert_to_tensor=True)
 
31
  # Store in MongoDB
32
  entries = [
33
  {
 
40
  ]
41
  await db.embeddings.insert_many(entries)
42
  await db.documents.update_one({"_id": document_id}, {"$set": {"status": "READY"}})
43
+ # Log
44
  print(f"[INFO] Finished indexing {len(entries)} chunks from document: {document_id}")
45
+ # Exception
46
  except Exception as e:
47
  print(f"[ERROR] Ingestion failed for {document_id}: {e}")
48
+ db = get_db()
49
  await db.documents.update_one({"_id": document_id}, {"$set": {"status": "FAILED"}})