Paramjit Singh commited on
Commit
5926dae
Β·
unverified Β·
2 Parent(s): f6182823dfe460

Merge pull request #336 from Srushti-Kamble14/feat/celery-redis-pdf-processing

Browse files
.env.example CHANGED
@@ -55,6 +55,16 @@ ALLOWED_ORIGINS=http://localhost:3000,http://localhost:7860
55
  # Optional β€” required only for Google sign-in.
56
  # NEXT_PUBLIC_GOOGLE_CLIENT_ID=your_google_oauth_client_id.apps.googleusercontent.com
57
 
 
 
 
 
 
 
 
 
 
 
58
  # ── File Upload ─────────────────────────────────────────────
59
 
60
  # Directory where uploaded documents (PDFs, DOCXs, etc.) are stored.
 
55
  # Optional β€” required only for Google sign-in.
56
  # NEXT_PUBLIC_GOOGLE_CLIENT_ID=your_google_oauth_client_id.apps.googleusercontent.com
57
 
58
+ # ── Celery / Redis Background Processing ───────────────────
59
+
60
+ # Redis URL used by FastAPI to enqueue PDF processing jobs.
61
+ # Optional β€” defaults to redis://localhost:6379/0
62
+ # CELERY_BROKER_URL=redis://localhost:6379/0
63
+
64
+ # Redis URL used by Celery to store task results/status.
65
+ # Optional β€” defaults to redis://localhost:6379/1
66
+ # CELERY_RESULT_BACKEND=redis://localhost:6379/1
67
+
68
  # ── File Upload ─────────────────────────────────────────────
69
 
70
  # Directory where uploaded documents (PDFs, DOCXs, etc.) are stored.
README.md CHANGED
@@ -362,6 +362,8 @@ DATABASE_URL=sqlite:///./data/app.db
362
  HF_TOKEN=hf_your_huggingface_token_here
363
  UPLOAD_DIR=./data/uploads
364
  CHROMA_PERSIST_DIR=./data/chroma_db
 
 
365
  ```
366
 
367
  > Get your free HuggingFace token at [huggingface.co/settings/tokens](https://huggingface.co/settings/tokens)
@@ -410,7 +412,7 @@ npm run dev
410
 
411
  ```bash
412
  docker compose up --build
413
- # β†’ Full stack at http://localhost:7860
414
  ```
415
 
416
  <br/>
@@ -503,6 +505,8 @@ docker compose up --build
503
  | `JWT_EXPIRY_HOURS` | ❌ | `72` | JWT token lifetime in hours before re-login is required. | β€” |
504
  | `GOOGLE_CLIENT_ID` | ❌ | β€” | Google OAuth web client ID used by FastAPI to verify ID tokens. | [Google Cloud Console](https://console.cloud.google.com/apis/credentials) |
505
  | `NEXT_PUBLIC_GOOGLE_CLIENT_ID` | ❌ | β€” | Google OAuth web client ID exposed to the Next.js Google sign-in button. | [Google Cloud Console](https://console.cloud.google.com/apis/credentials) |
 
 
506
  | `UPLOAD_DIR` | ❌ | `./data/uploads` | Local directory for storing uploaded documents. | β€” |
507
  | `MAX_FILE_SIZE_MB` | ❌ | `50` | Maximum allowed upload file size in MB. | β€” |
508
  | `ALLOWED_EXTENSIONS` | ❌ | `pdf,docx,txt,md` | Comma-separated list of permitted file extensions. | β€” |
 
362
  HF_TOKEN=hf_your_huggingface_token_here
363
  UPLOAD_DIR=./data/uploads
364
  CHROMA_PERSIST_DIR=./data/chroma_db
365
+ CELERY_BROKER_URL=redis://localhost:6379/0
366
+ CELERY_RESULT_BACKEND=redis://localhost:6379/1
367
  ```
368
 
369
  > Get your free HuggingFace token at [huggingface.co/settings/tokens](https://huggingface.co/settings/tokens)
 
412
 
413
  ```bash
414
  docker compose up --build
415
+ # β†’ FastAPI, Redis, Celery worker, and Postgres at http://localhost:7860
416
  ```
417
 
418
  <br/>
 
505
  | `JWT_EXPIRY_HOURS` | ❌ | `72` | JWT token lifetime in hours before re-login is required. | β€” |
506
  | `GOOGLE_CLIENT_ID` | ❌ | β€” | Google OAuth web client ID used by FastAPI to verify ID tokens. | [Google Cloud Console](https://console.cloud.google.com/apis/credentials) |
507
  | `NEXT_PUBLIC_GOOGLE_CLIENT_ID` | ❌ | β€” | Google OAuth web client ID exposed to the Next.js Google sign-in button. | [Google Cloud Console](https://console.cloud.google.com/apis/credentials) |
508
+ | `CELERY_BROKER_URL` | ❌ | `redis://localhost:6379/0` | Redis broker URL used by FastAPI to queue document ingestion jobs. | Redis |
509
+ | `CELERY_RESULT_BACKEND` | ❌ | `redis://localhost:6379/1` | Redis backend URL used by Celery to store task state/results. | Redis |
510
  | `UPLOAD_DIR` | ❌ | `./data/uploads` | Local directory for storing uploaded documents. | β€” |
511
  | `MAX_FILE_SIZE_MB` | ❌ | `50` | Maximum allowed upload file size in MB. | β€” |
512
  | `ALLOWED_EXTENSIONS` | ❌ | `pdf,docx,txt,md` | Comma-separated list of permitted file extensions. | β€” |
backend/app/celery_app.py ADDED
@@ -0,0 +1,23 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Celery application configured for Redis-backed background jobs."""
2
+ from celery import Celery
3
+
4
+ from app.config import get_settings
5
+
6
+
7
+ settings = get_settings()
8
+
9
+ celery_app = Celery(
10
+ "pdf_assistant_rag",
11
+ broker=settings.CELERY_BROKER_URL,
12
+ backend=settings.CELERY_RESULT_BACKEND,
13
+ include=["app.tasks"],
14
+ )
15
+
16
+ celery_app.conf.update(
17
+ task_track_started=settings.CELERY_TASK_TRACK_STARTED,
18
+ task_serializer="json",
19
+ result_serializer="json",
20
+ accept_content=["json"],
21
+ timezone="UTC",
22
+ )
23
+
backend/app/config.py CHANGED
@@ -33,6 +33,11 @@ class Settings(BaseSettings):
33
  DRIVE_SYNC_INTERVAL_MINUTES: int = 60
34
  GOOGLE_SERVICE_ACCOUNT_FILE: str = ""
35
 
 
 
 
 
 
36
  # ── File Upload ──────────────────────────────────────
37
  UPLOAD_DIR: str = "./data/uploads"
38
  MAX_UPLOAD_SIZE_MB: int = 20
 
33
  DRIVE_SYNC_INTERVAL_MINUTES: int = 60
34
  GOOGLE_SERVICE_ACCOUNT_FILE: str = ""
35
 
36
+ # Celery / Redis background processing
37
+ CELERY_BROKER_URL: str = "redis://localhost:6379/0"
38
+ CELERY_RESULT_BACKEND: str = "redis://localhost:6379/1"
39
+ CELERY_TASK_TRACK_STARTED: bool = True
40
+
41
  # ── File Upload ──────────────────────────────────────
42
  UPLOAD_DIR: str = "./data/uploads"
43
  MAX_UPLOAD_SIZE_MB: int = 20
backend/app/routes/documents.py CHANGED
@@ -1,6 +1,6 @@
1
  """
2
  Document management routes β€” upload, list, delete, and serve PDF files.
3
- Background ingestion via FastAPI BackgroundTasks.
4
  """
5
  import os
6
  import sys
@@ -14,7 +14,7 @@ from pathlib import Path
14
  import shutil
15
  import tempfile
16
  from urllib.parse import urlparse
17
- from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, BackgroundTasks, status, Query
18
  from fastapi.responses import FileResponse
19
  from sqlalchemy.orm import Session
20
 
@@ -23,8 +23,7 @@ from app.models import User, Document
23
  from app.schemas import DocumentResponse, DocumentListResponse, DocumentStatusResponse, ChunkSettings, UploadUrl
24
  from app.auth import get_current_user
25
  from app.config import get_settings
26
- from app.rag.chunker import chunk_document, get_page_count
27
- from app.rag.vectorstore import store_chunks
28
 
29
  try:
30
  from crawl4ai import AsyncWebCrawler
@@ -130,133 +129,6 @@ async def validate_upload(file: UploadFile):
130
  pass
131
 
132
 
133
- def _ingest_document(document_id: str, filepath: str, original_name: str, user_id: str):
134
- """
135
- Process a document in the background: chunk document, generate embeddings, and store in ChromaDB,
136
- calls document summary function, and update the database record.
137
-
138
- This function is intended to be run as a background task.
139
- It creates its own database session, updates the
140
- document status, extracts text, splits into chunks, generates embeddings,
141
- stores everything in ChromaDB, calls summary function, updates the document record with page count,
142
- chunk count, and summary, and marks the document as 'ready'.
143
- On failure, it sets status to 'failed' and records the error message.
144
-
145
- Args:
146
- document_id: Unique identifier of the document in the database.
147
- filepath: Absolute or relative path to the uploaded file on disk.
148
- original_name: original filename provided by the user (for logging and metadata).
149
- user_id: Identifier of the user who owns the document.
150
-
151
- Returns:
152
- None
153
-
154
- Note:
155
- This function does not raise exceptions to the caller;
156
- all errors are logged and the database record is updated accordingly.
157
- """
158
- from app.database import SessionLocal
159
-
160
- db = SessionLocal()
161
- try:
162
- doc = (
163
- db.query(Document)
164
- .filter(Document.id == document_id, Document.is_deleted.is_(False))
165
- .first()
166
- )
167
- if not doc:
168
- logger.error(f"Document {document_id} not found for ingestion")
169
- return
170
-
171
- # Update status to processing
172
- doc.status = "processing"
173
- db.commit()
174
-
175
- # Get page count
176
- page_count = get_page_count(filepath)
177
- doc.page_count = page_count
178
-
179
- # Chunk document with optional chunk size and overlap parameters from the document record, falling back to global defaults if not set
180
- chunk_size = doc.chunk_size
181
- chunk_overlap = doc.chunk_overlap
182
- try:
183
- kwargs = {}
184
- if chunk_size is not None:
185
- kwargs["chunk_size"] = chunk_size
186
- if chunk_overlap is not None:
187
- kwargs["chunk_overlap"] = chunk_overlap
188
-
189
- if kwargs:
190
- chunks = chunk_document(filepath, **kwargs)
191
- else:
192
- chunks = chunk_document(filepath)
193
-
194
- except TypeError:
195
- # Backward-compatible fallback for chunk_document implementations/tests
196
- # that only accept (filepath)
197
- chunks = chunk_document(filepath)
198
-
199
- if not chunks:
200
- doc.status = "failed"
201
- doc.error_message = "No text could be extracted from the document"
202
- db.commit()
203
- return
204
-
205
- # Build and persist a lightweight entity co-occurrence graph for GraphRAG.
206
- try:
207
- from app.rag.graph_builder import build_graph, save_graph
208
-
209
- graph = build_graph(chunks)
210
- save_graph(graph, user_id=user_id, document_id=document_id)
211
- except Exception as e:
212
- logger.warning(f"Could not build knowledge graph for document {document_id}: {e}")
213
-
214
- # Store embeddings in ChromaDB
215
- chunk_count = store_chunks(
216
- chunks=chunks,
217
- document_id=document_id,
218
- filename=original_name,
219
- user_id=user_id,
220
- )
221
-
222
- # Generate summary and update document record
223
- try:
224
- from app.rag.summarizer import generate_document_summary
225
-
226
- summary = generate_document_summary(filepath, max_sentences=2)
227
- if summary:
228
- doc.summary = summary
229
- db.commit() # Update document record with summary
230
- except Exception as e:
231
- logger.warning(f"Could not import summarizer for document {document_id}: {e}")
232
- doc.summary = None
233
-
234
- # Update document record
235
- doc.chunk_count = chunk_count
236
- doc.status = "ready"
237
- db.commit()
238
-
239
- logger.info(f"Document {document_id} ingested: {page_count} pages, {chunk_count} chunks")
240
-
241
- except Exception as e:
242
- logger.error(f"Ingestion error for {document_id}: {e}")
243
- try:
244
- doc = (
245
- db.query(Document)
246
- .filter(Document.id == document_id, Document.is_deleted.is_(False))
247
- .first()
248
- )
249
- if doc:
250
- doc.status = "failed"
251
- doc.error_message = str(e)[:500]
252
- db.commit()
253
- except Exception:
254
- pass
255
- finally:
256
- db.close()
257
-
258
-
259
-
260
  def _crawl_in_new_loop(url: str) -> str:
261
  """Run the async crawler in a fresh event loop on a worker thread.
262
  On Windows this must be a ProactorEventLoop to support subprocesses.
@@ -288,7 +160,6 @@ def _crawl_in_new_loop(url: str) -> str:
288
 
289
  @router.post("/upload", response_model=DocumentResponse, status_code=status.HTTP_202_ACCEPTED)
290
  async def upload_document(
291
- background_tasks: BackgroundTasks,
292
  file: UploadFile = File(...),
293
  user: User = Depends(get_current_user),
294
  db: Session = Depends(get_db),
@@ -298,12 +169,11 @@ async def upload_document(
298
 
299
  Validates the uploaded file (extension, size, MIME type, integrity),
300
  saves it to the user's directory, creates a database record with status
301
- 'pending', schedules a background task for chunking and embedding, and
302
- returns 202 Accepted immediately so large documents do not block the API
303
- request while embeddings are generated.
304
 
305
  Args:
306
- background_tasks: FastAPI BackgroundTasks instance to run the ingestion process asynchronously.
307
  file: The uploaded file, provided as a multipart/form-data field in the request.
308
  user: The currently authenticated user, injected by the `get_current_user` dependency.
309
  db: Database session, injected by the `get_db` dependency.
@@ -357,21 +227,19 @@ async def upload_document(
357
  db.commit()
358
  db.refresh(document)
359
 
360
- # ── Trigger background ingestion ─────────────────
361
- background_tasks.add_task(
362
- _ingest_document,
363
  document_id=document.id,
364
  filepath=filepath,
365
  original_name=file.filename,
366
  user_id=user.id,
367
  )
368
 
369
- return DocumentResponse.model_validate(document)
370
 
371
  @router.post("/urlupload", status_code=status.HTTP_202_ACCEPTED)
372
  async def upload_document_url(
373
  payload: UploadUrl,
374
- background_tasks: BackgroundTasks,
375
  user: User = Depends(get_current_user),
376
  db: Session = Depends(get_db),
377
  ):
@@ -443,16 +311,15 @@ async def upload_document_url(
443
  db.commit()
444
  db.refresh(document)
445
 
446
- # ── Trigger background ingestion ───────────────────────
447
- background_tasks.add_task(
448
- _ingest_document,
449
  document_id=document.id,
450
  filepath=filepath,
451
  original_name=original_name,
452
  user_id=user.id,
453
  )
454
 
455
- return DocumentResponse.model_validate(document)
456
 
457
  except HTTPException:
458
  raise
@@ -681,7 +548,6 @@ def delete_document(
681
  def update_chunk_settings(
682
  document_id: str,
683
  settings_update: ChunkSettings,
684
- background_tasks: BackgroundTasks,
685
  user: User = Depends(get_current_user),
686
  db: Session = Depends(get_db),
687
  ):
@@ -692,7 +558,6 @@ def update_chunk_settings(
692
  Args:
693
  document_id: The unique identifier of the document to update.
694
  settings_update: A ChunkSettings object containing the chunk_size and chunk_overlap values.
695
- background_tasks: FastAPI BackgroundTasks instance to run the ingestion process asynchronously.
696
  user: The currently authenticated user, injected by the `get_current_user` dependency.
697
  db: Database session, injected by the `get_db` dependency.
698
 
@@ -733,13 +598,13 @@ def update_chunk_settings(
733
  doc.summary = None
734
  db.commit()
735
 
736
- # Trigger background ingestion with updated chunk settings. The _ingest_document function will read the new chunk settings from the document record and re-chunk the document accordingly.
737
- background_tasks.add_task(
738
- _ingest_document,
739
  document_id=doc.id,
740
  filepath=os.path.join(settings.UPLOAD_DIR, user.id, doc.filename),
741
  original_name=doc.original_name,
742
  user_id=user.id,
743
  )
744
  # Return the updated document record with new chunk settings
745
- return DocumentResponse.model_validate(doc)
 
1
  """
2
  Document management routes β€” upload, list, delete, and serve PDF files.
3
+ Background ingestion via Celery workers.
4
  """
5
  import os
6
  import sys
 
14
  import shutil
15
  import tempfile
16
  from urllib.parse import urlparse
17
+ from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, status, Query
18
  from fastapi.responses import FileResponse
19
  from sqlalchemy.orm import Session
20
 
 
23
  from app.schemas import DocumentResponse, DocumentListResponse, DocumentStatusResponse, ChunkSettings, UploadUrl
24
  from app.auth import get_current_user
25
  from app.config import get_settings
26
+ from app.tasks import process_document
 
27
 
28
  try:
29
  from crawl4ai import AsyncWebCrawler
 
129
  pass
130
 
131
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
132
  def _crawl_in_new_loop(url: str) -> str:
133
  """Run the async crawler in a fresh event loop on a worker thread.
134
  On Windows this must be a ProactorEventLoop to support subprocesses.
 
160
 
161
  @router.post("/upload", response_model=DocumentResponse, status_code=status.HTTP_202_ACCEPTED)
162
  async def upload_document(
 
163
  file: UploadFile = File(...),
164
  user: User = Depends(get_current_user),
165
  db: Session = Depends(get_db),
 
169
 
170
  Validates the uploaded file (extension, size, MIME type, integrity),
171
  saves it to the user's directory, creates a database record with status
172
+ 'pending', queues a Celery task for chunking and embedding, and returns
173
+ 202 Accepted immediately so large documents do not block the API request
174
+ while embeddings are generated.
175
 
176
  Args:
 
177
  file: The uploaded file, provided as a multipart/form-data field in the request.
178
  user: The currently authenticated user, injected by the `get_current_user` dependency.
179
  db: Database session, injected by the `get_db` dependency.
 
227
  db.commit()
228
  db.refresh(document)
229
 
230
+ # ── Queue background ingestion ─────────────────
231
+ task = process_document.delay(
 
232
  document_id=document.id,
233
  filepath=filepath,
234
  original_name=file.filename,
235
  user_id=user.id,
236
  )
237
 
238
+ return DocumentResponse.model_validate(document).model_copy(update={"task_id": task.id})
239
 
240
  @router.post("/urlupload", status_code=status.HTTP_202_ACCEPTED)
241
  async def upload_document_url(
242
  payload: UploadUrl,
 
243
  user: User = Depends(get_current_user),
244
  db: Session = Depends(get_db),
245
  ):
 
311
  db.commit()
312
  db.refresh(document)
313
 
314
+ # ── Queue background ingestion ───────────────────────
315
+ task = process_document.delay(
 
316
  document_id=document.id,
317
  filepath=filepath,
318
  original_name=original_name,
319
  user_id=user.id,
320
  )
321
 
322
+ return DocumentResponse.model_validate(document).model_copy(update={"task_id": task.id})
323
 
324
  except HTTPException:
325
  raise
 
548
  def update_chunk_settings(
549
  document_id: str,
550
  settings_update: ChunkSettings,
 
551
  user: User = Depends(get_current_user),
552
  db: Session = Depends(get_db),
553
  ):
 
558
  Args:
559
  document_id: The unique identifier of the document to update.
560
  settings_update: A ChunkSettings object containing the chunk_size and chunk_overlap values.
 
561
  user: The currently authenticated user, injected by the `get_current_user` dependency.
562
  db: Database session, injected by the `get_db` dependency.
563
 
 
598
  doc.summary = None
599
  db.commit()
600
 
601
+ # Queue ingestion with updated chunk settings. The worker reads the new
602
+ # settings from the document record before re-chunking.
603
+ task = process_document.delay(
604
  document_id=doc.id,
605
  filepath=os.path.join(settings.UPLOAD_DIR, user.id, doc.filename),
606
  original_name=doc.original_name,
607
  user_id=user.id,
608
  )
609
  # Return the updated document record with new chunk settings
610
+ return DocumentResponse.model_validate(doc).model_copy(update={"task_id": task.id})
backend/app/schemas.py CHANGED
@@ -119,6 +119,7 @@ class DocumentResponse(BaseModel):
119
  error_message: Optional[str] = None
120
  uploaded_at: datetime
121
  summary: Optional[str] = None # New field for document summary
 
122
 
123
  class Config:
124
  from_attributes = True
 
119
  error_message: Optional[str] = None
120
  uploaded_at: datetime
121
  summary: Optional[str] = None # New field for document summary
122
+ task_id: Optional[str] = None
123
 
124
  class Config:
125
  from_attributes = True
backend/app/services/document_ingestion.py CHANGED
@@ -17,18 +17,31 @@ def ingest_document(document_id: str, filepath: str, original_name: str, user_id
17
 
18
  db = SessionLocal()
19
  try:
20
- doc = db.query(Document).filter(Document.id == document_id).first()
 
 
 
21
  if not doc:
22
  logger.error("Document %s not found for ingestion", document_id)
23
  return
24
 
25
  doc.status = "processing"
 
26
  db.commit()
27
 
28
  page_count = get_page_count(filepath)
29
  doc.page_count = page_count
30
 
31
- chunks = chunk_document(filepath)
 
 
 
 
 
 
 
 
 
32
 
33
  if not chunks:
34
  doc.status = "failed"
@@ -36,6 +49,14 @@ def ingest_document(document_id: str, filepath: str, original_name: str, user_id
36
  db.commit()
37
  return
38
 
 
 
 
 
 
 
 
 
39
  chunk_count = store_chunks(
40
  chunks=chunks,
41
  document_id=document_id,
@@ -69,7 +90,10 @@ def ingest_document(document_id: str, filepath: str, original_name: str, user_id
69
  except Exception as e:
70
  logger.error("Ingestion error for %s: %s", document_id, e)
71
  try:
72
- doc = db.query(Document).filter(Document.id == document_id).first()
 
 
 
73
  if doc:
74
  doc.status = "failed"
75
  doc.error_message = str(e)[:500]
 
17
 
18
  db = SessionLocal()
19
  try:
20
+ doc = db.query(Document).filter(
21
+ Document.id == document_id,
22
+ Document.is_deleted.is_(False),
23
+ ).first()
24
  if not doc:
25
  logger.error("Document %s not found for ingestion", document_id)
26
  return
27
 
28
  doc.status = "processing"
29
+ doc.error_message = None
30
  db.commit()
31
 
32
  page_count = get_page_count(filepath)
33
  doc.page_count = page_count
34
 
35
+ try:
36
+ chunk_kwargs = {}
37
+ if doc.chunk_size is not None:
38
+ chunk_kwargs["chunk_size"] = doc.chunk_size
39
+ if doc.chunk_overlap is not None:
40
+ chunk_kwargs["chunk_overlap"] = doc.chunk_overlap
41
+ chunks = chunk_document(filepath, **chunk_kwargs)
42
+ except TypeError:
43
+ # Preserve compatibility with patched/test implementations.
44
+ chunks = chunk_document(filepath)
45
 
46
  if not chunks:
47
  doc.status = "failed"
 
49
  db.commit()
50
  return
51
 
52
+ try:
53
+ from app.rag.graph_builder import build_graph, save_graph
54
+
55
+ graph = build_graph(chunks)
56
+ save_graph(graph, user_id=user_id, document_id=document_id)
57
+ except Exception as e:
58
+ logger.warning("Could not build knowledge graph for document %s: %s", document_id, e)
59
+
60
  chunk_count = store_chunks(
61
  chunks=chunks,
62
  document_id=document_id,
 
90
  except Exception as e:
91
  logger.error("Ingestion error for %s: %s", document_id, e)
92
  try:
93
+ doc = db.query(Document).filter(
94
+ Document.id == document_id,
95
+ Document.is_deleted.is_(False),
96
+ ).first()
97
  if doc:
98
  doc.status = "failed"
99
  doc.error_message = str(e)[:500]
backend/app/tasks.py ADDED
@@ -0,0 +1,22 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Celery tasks for document processing."""
2
+ from app.celery_app import celery_app
3
+ from app.services.document_ingestion import ingest_document
4
+
5
+
6
+ @celery_app.task(bind=True, name="app.tasks.process_document")
7
+ def process_document(
8
+ self,
9
+ document_id: str,
10
+ filepath: str,
11
+ original_name: str,
12
+ user_id: str,
13
+ ) -> dict[str, str]:
14
+ """Run the RAG ingestion pipeline for a stored document."""
15
+ ingest_document(
16
+ document_id=document_id,
17
+ filepath=filepath,
18
+ original_name=original_name,
19
+ user_id=user_id,
20
+ )
21
+ return {"document_id": document_id, "status": "completed"}
22
+
backend/requirements.txt CHANGED
@@ -56,6 +56,7 @@ huggingface-hub
56
  gunicorn
57
  slowapi
58
  prometheus-fastapi-instrumentator
 
59
 
60
  # File Validation
61
  #sudo apt-get install libmagic1 // for Debian/Ubuntu
 
56
  gunicorn
57
  slowapi
58
  prometheus-fastapi-instrumentator
59
+ celery[redis]
60
 
61
  # File Validation
62
  #sudo apt-get install libmagic1 // for Debian/Ubuntu
backend/tests/test_document_upload_validation.py CHANGED
@@ -6,7 +6,7 @@ import uuid
6
  from pathlib import Path
7
 
8
  import pytest
9
- from fastapi import BackgroundTasks, HTTPException, UploadFile
10
  from pypdf import PdfWriter
11
  from sqlalchemy import create_engine
12
  from sqlalchemy.orm import sessionmaker
@@ -141,10 +141,14 @@ def test_upload_document_handles_duplicate_original_names(
141
  monkeypatch.setattr(documents, "validate_upload", fake_validate_upload)
142
  monkeypatch.setattr(documents.settings, "UPLOAD_DIR", str(tmp_path / "uploads"))
143
  monkeypatch.setattr(documents.uuid, "uuid4", lambda: next(uuid_values))
 
 
 
 
 
144
 
145
  first = _run(
146
  documents.upload_document(
147
- BackgroundTasks(),
148
  file=_upload_file("same-name.pdf", b"first"),
149
  user=user,
150
  db=session,
@@ -152,7 +156,6 @@ def test_upload_document_handles_duplicate_original_names(
152
  )
153
  second = _run(
154
  documents.upload_document(
155
- BackgroundTasks(),
156
  file=_upload_file("same-name.pdf", b"second"),
157
  user=user,
158
  db=session,
@@ -164,6 +167,7 @@ def test_upload_document_handles_duplicate_original_names(
164
  assert [doc.original_name for doc in stored_docs] == ["same-name.pdf", "same-name.pdf"]
165
  assert len({doc.filename for doc in stored_docs}) == 2
166
  assert first.original_name == second.original_name == "same-name.pdf"
 
167
  assert (tmp_path / "uploads" / user.id / f"{first_hex}.pdf").exists()
168
  assert (tmp_path / "uploads" / user.id / f"{second_hex}.pdf").exists()
169
  assert all(not path.exists() for path in temp_files)
 
6
  from pathlib import Path
7
 
8
  import pytest
9
+ from fastapi import HTTPException, UploadFile
10
  from pypdf import PdfWriter
11
  from sqlalchemy import create_engine
12
  from sqlalchemy.orm import sessionmaker
 
141
  monkeypatch.setattr(documents, "validate_upload", fake_validate_upload)
142
  monkeypatch.setattr(documents.settings, "UPLOAD_DIR", str(tmp_path / "uploads"))
143
  monkeypatch.setattr(documents.uuid, "uuid4", lambda: next(uuid_values))
144
+ monkeypatch.setattr(
145
+ documents.process_document,
146
+ "delay",
147
+ lambda **_kwargs: types.SimpleNamespace(id="queued-task"),
148
+ )
149
 
150
  first = _run(
151
  documents.upload_document(
 
152
  file=_upload_file("same-name.pdf", b"first"),
153
  user=user,
154
  db=session,
 
156
  )
157
  second = _run(
158
  documents.upload_document(
 
159
  file=_upload_file("same-name.pdf", b"second"),
160
  user=user,
161
  db=session,
 
167
  assert [doc.original_name for doc in stored_docs] == ["same-name.pdf", "same-name.pdf"]
168
  assert len({doc.filename for doc in stored_docs}) == 2
169
  assert first.original_name == second.original_name == "same-name.pdf"
170
+ assert first.task_id == second.task_id == "queued-task"
171
  assert (tmp_path / "uploads" / user.id / f"{first_hex}.pdf").exists()
172
  assert (tmp_path / "uploads" / user.id / f"{second_hex}.pdf").exists()
173
  assert all(not path.exists() for path in temp_files)
backend/tests/test_documents.py CHANGED
@@ -1,7 +1,7 @@
1
  import types
2
 
3
  from app.models import Document
4
- from app.routes.documents import _ingest_document
5
 
6
 
7
  def test_api_health(client):
@@ -56,9 +56,9 @@ def test_ingest_document_builds_and_saves_graph(db_session, monkeypatch, tmp_pat
56
  chunks = [{"text": "OpenAI works with Microsoft.", "page": 1, "chunk_index": 0}]
57
  saved = {}
58
 
59
- monkeypatch.setattr("app.routes.documents.get_page_count", lambda filepath: 1)
60
- monkeypatch.setattr("app.routes.documents.chunk_document", lambda filepath: chunks)
61
- monkeypatch.setattr("app.routes.documents.store_chunks", lambda **kwargs: len(chunks))
62
  monkeypatch.setattr("app.database.SessionLocal", lambda: db_session)
63
 
64
  fake_summary = types.ModuleType("app.rag.summarizer")
@@ -76,7 +76,7 @@ def test_ingest_document_builds_and_saves_graph(db_session, monkeypatch, tmp_pat
76
  ),
77
  )
78
 
79
- _ingest_document(
80
  document_id=document_id,
81
  filepath=str(tmp_path / "graph.txt"),
82
  original_name=document.original_name,
 
1
  import types
2
 
3
  from app.models import Document
4
+ from app.services.document_ingestion import ingest_document
5
 
6
 
7
  def test_api_health(client):
 
56
  chunks = [{"text": "OpenAI works with Microsoft.", "page": 1, "chunk_index": 0}]
57
  saved = {}
58
 
59
+ monkeypatch.setattr("app.services.document_ingestion.get_page_count", lambda filepath: 1)
60
+ monkeypatch.setattr("app.services.document_ingestion.chunk_document", lambda filepath: chunks)
61
+ monkeypatch.setattr("app.services.document_ingestion.store_chunks", lambda **kwargs: len(chunks))
62
  monkeypatch.setattr("app.database.SessionLocal", lambda: db_session)
63
 
64
  fake_summary = types.ModuleType("app.rag.summarizer")
 
76
  ),
77
  )
78
 
79
+ ingest_document(
80
  document_id=document_id,
81
  filepath=str(tmp_path / "graph.txt"),
82
  original_name=document.original_name,
docker-compose.yml CHANGED
@@ -1,6 +1,20 @@
1
  version: '3.8'
2
 
3
  services:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4
  # ── PostgreSQL Database ──────────────────────────────────
5
  postgres:
6
  image: postgres:16-alpine
@@ -34,11 +48,16 @@ services:
34
  - SECRET_KEY=${SECRET_KEY:-dev-secret-key-change-me}
35
  - HF_TOKEN=${HF_TOKEN}
36
  - DATABASE_URL=postgresql://${POSTGRES_USER:-pdf_rag_user}:${POSTGRES_PASSWORD:-pdf_rag_pass}@postgres:5432/${POSTGRES_DB:-pdf_rag}
37
- - UPLOAD_DIR=./data/uploads
38
- - CHROMA_PERSIST_DIR=./data/chroma_db
 
 
 
39
  depends_on:
40
  postgres:
41
  condition: service_healthy
 
 
42
  restart: unless-stopped
43
  healthcheck:
44
  test: ["CMD", "curl", "-f", "http://localhost:7860/api/health"]
@@ -47,6 +66,31 @@ services:
47
  retries: 3
48
  start_period: 60s
49
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
50
  # ── pgAdmin (optional β€” for local DB inspection) ─────────
51
  pgadmin:
52
  image: dpage/pgadmin4:latest
 
1
  version: '3.8'
2
 
3
  services:
4
+ # Redis broker/result backend for Celery document processing
5
+ redis:
6
+ image: redis:7-alpine
7
+ container_name: pdf_rag_redis
8
+ restart: unless-stopped
9
+ ports:
10
+ - "6379:6379"
11
+ healthcheck:
12
+ test: ["CMD", "redis-cli", "ping"]
13
+ interval: 10s
14
+ timeout: 5s
15
+ retries: 5
16
+ start_period: 5s
17
+
18
  # ── PostgreSQL Database ──────────────────────────────────
19
  postgres:
20
  image: postgres:16-alpine
 
48
  - SECRET_KEY=${SECRET_KEY:-dev-secret-key-change-me}
49
  - HF_TOKEN=${HF_TOKEN}
50
  - DATABASE_URL=postgresql://${POSTGRES_USER:-pdf_rag_user}:${POSTGRES_PASSWORD:-pdf_rag_pass}@postgres:5432/${POSTGRES_DB:-pdf_rag}
51
+ - UPLOAD_DIR=/app/data/uploads
52
+ - CHROMA_PERSIST_DIR=/app/data/chroma_db
53
+ - GRAPH_PERSIST_DIR=/app/data/graphs
54
+ - CELERY_BROKER_URL=redis://redis:6379/0
55
+ - CELERY_RESULT_BACKEND=redis://redis:6379/1
56
  depends_on:
57
  postgres:
58
  condition: service_healthy
59
+ redis:
60
+ condition: service_healthy
61
  restart: unless-stopped
62
  healthcheck:
63
  test: ["CMD", "curl", "-f", "http://localhost:7860/api/health"]
 
66
  retries: 3
67
  start_period: 60s
68
 
69
+ # Celery worker for document extraction, chunking, embeddings, and vector storage
70
+ worker:
71
+ build: .
72
+ container_name: pdf_rag_worker
73
+ command: >
74
+ sh -c "cd /app/backend &&
75
+ celery -A app.celery_app.celery_app worker --loglevel=info"
76
+ volumes:
77
+ - app_data:/app/data
78
+ environment:
79
+ - SECRET_KEY=${SECRET_KEY:-dev-secret-key-change-me}
80
+ - HF_TOKEN=${HF_TOKEN}
81
+ - DATABASE_URL=postgresql://${POSTGRES_USER:-pdf_rag_user}:${POSTGRES_PASSWORD:-pdf_rag_pass}@postgres:5432/${POSTGRES_DB:-pdf_rag}
82
+ - UPLOAD_DIR=/app/data/uploads
83
+ - CHROMA_PERSIST_DIR=/app/data/chroma_db
84
+ - GRAPH_PERSIST_DIR=/app/data/graphs
85
+ - CELERY_BROKER_URL=redis://redis:6379/0
86
+ - CELERY_RESULT_BACKEND=redis://redis:6379/1
87
+ depends_on:
88
+ postgres:
89
+ condition: service_healthy
90
+ redis:
91
+ condition: service_healthy
92
+ restart: unless-stopped
93
+
94
  # ── pgAdmin (optional β€” for local DB inspection) ─────────
95
  pgadmin:
96
  image: dpage/pgadmin4:latest
docs/ARCHITECTURE.md CHANGED
@@ -52,7 +52,8 @@ sequenceDiagram
52
  participant UI as Frontend
53
  participant API as FastAPI documents route
54
  participant DB as SQL metadata
55
- participant Worker as Background task
 
56
  participant Files as Upload storage
57
  participant Vector as ChromaDB
58
 
@@ -60,8 +61,9 @@ sequenceDiagram
60
  API->>API: Validate filename, extension, size, MIME, and parser readability
61
  API->>Files: Persist original file under the user's upload directory
62
  API->>DB: Create document row with processing status
63
- API-->>UI: 202 Accepted with document metadata
64
- API->>Worker: Queue ingestion task
 
65
  Worker->>Files: Read saved document
66
  Worker->>Worker: Extract pages, chunk text, build graph summary data
67
  Worker->>Vector: Store chunks with document and user metadata
@@ -70,9 +72,9 @@ sequenceDiagram
70
 
71
  The upload route is intentionally strict before it writes long-lived state:
72
  extension checks, size checks, MIME checks, and parser checks happen before the
73
- file is moved into permanent storage. The background task owns expensive work
74
- such as text extraction, chunking, embedding, graph building, and summary
75
- generation.
76
 
77
  ## Chat And Retrieval Flow
78
 
 
52
  participant UI as Frontend
53
  participant API as FastAPI documents route
54
  participant DB as SQL metadata
55
+ participant Redis as Redis broker
56
+ participant Worker as Celery worker
57
  participant Files as Upload storage
58
  participant Vector as ChromaDB
59
 
 
61
  API->>API: Validate filename, extension, size, MIME, and parser readability
62
  API->>Files: Persist original file under the user's upload directory
63
  API->>DB: Create document row with processing status
64
+ API->>Redis: Queue Celery ingestion task
65
+ API-->>UI: 202 Accepted with document metadata and task_id
66
+ Redis->>Worker: Deliver ingestion task
67
  Worker->>Files: Read saved document
68
  Worker->>Worker: Extract pages, chunk text, build graph summary data
69
  Worker->>Vector: Store chunks with document and user metadata
 
72
 
73
  The upload route is intentionally strict before it writes long-lived state:
74
  extension checks, size checks, MIME checks, and parser checks happen before the
75
+ file is moved into permanent storage. Celery uses Redis as the broker/result
76
+ backend, and the worker owns expensive work such as text extraction, chunking,
77
+ embedding, graph building, and summary generation.
78
 
79
  ## Chat And Retrieval Flow
80