Hamza4100 committed on
Commit
7c6f6e2
·
verified ·
1 Parent(s): ff19c99

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +42 -47
main.py CHANGED
@@ -12,7 +12,7 @@ Secure multi-user API with:
12
  import os
13
  import asyncio
14
  from typing import List, Optional, Dict
15
- from fastapi import FastAPI, UploadFile, File, HTTPException, Depends
16
  from fastapi.middleware.cors import CORSMiddleware
17
  from pydantic import BaseModel
18
  from threading import Lock
@@ -20,6 +20,7 @@ from rag_engine import RAGEngine
20
  from hf_storage import create_hf_storage_manager
21
  from auth import get_current_user
22
  from user_management import create_hf_user_manager
 
23
  from dotenv import load_dotenv
24
 
25
  # ============================================
@@ -167,6 +168,13 @@ async def startup_event():
167
 
168
  print("✅ Server ready (per-user lazy loading)")
169
 
 
 
 
 
 
 
 
170
 
171
  # ============================================
172
  # ENDPOINTS
@@ -181,7 +189,8 @@ async def health_check():
181
  @app.post("/upload", response_model=UploadResponse)
182
  async def upload_pdf(
183
  file: UploadFile = File(...),
184
- user_id: str = Depends(get_current_user)
 
185
  ):
186
  """
187
  Upload PDF for authenticated user.
@@ -194,59 +203,45 @@ async def upload_pdf(
194
  if file.content_type and file.content_type not in ['application/pdf']:
195
  raise HTTPException(400, "Invalid MIME type")
196
 
197
- # Read content
198
  content = await file.read()
199
-
200
  # Size limit (10MB)
201
  if len(content) > 10 * 1024 * 1024:
202
  raise HTTPException(413, "File too large (max 10MB)")
203
-
204
  try:
205
- engine = await rag_manager.get_engine(user_id)
206
- user_lock = rag_manager.get_user_lock(user_id)
207
-
208
- with user_lock:
209
- result = await asyncio.to_thread(
210
- engine.upload_document,
211
- filename=file.filename,
212
- file_content=content,
213
- action="auto"
214
- )
215
-
216
- if result["status"] == "success":
217
- # Save PDF to user's uploaded_pdfs folder
218
- base_dir = os.path.dirname(os.path.abspath(__file__))
219
- user_pdfs_dir = os.path.join(base_dir, "users", user_id, "uploaded_pdfs")
220
- os.makedirs(user_pdfs_dir, exist_ok=True)
221
-
222
- pdf_path = os.path.join(user_pdfs_dir, file.filename)
223
- with open(pdf_path, "wb") as f:
224
- f.write(content)
225
-
226
- await asyncio.to_thread(
227
- hf_storage.push_storage_to_hf,
228
- user_id,
229
- f"Upload {file.filename}"
230
- )
231
-
232
- print(f"✅ Upload success for user {user_id}: {file.filename}")
233
-
234
- return UploadResponse(
235
- document_id=result.get("doc_id", ""),
236
- filename=file.filename,
237
- status="success",
238
- message="Uploaded successfully",
239
- pages=result.get("pages"),
240
- chunks=result.get("chunks")
241
- )
242
- else:
243
- raise HTTPException(400, result.get("message", "Upload failed"))
244
-
245
  except HTTPException:
246
  raise
247
  except Exception as e:
248
- print(f"❌ Upload error (user {user_id}): {e}")
249
- raise HTTPException(500, "Upload failed")
 
 
 
 
 
 
 
 
250
 
251
 
252
  @app.post("/query", response_model=QueryResponse)
 
12
  import os
13
  import asyncio
14
  from typing import List, Optional, Dict
15
+ from fastapi import FastAPI, UploadFile, File, HTTPException, Depends, BackgroundTasks
16
  from fastapi.middleware.cors import CORSMiddleware
17
  from pydantic import BaseModel
18
  from threading import Lock
 
20
  from hf_storage import create_hf_storage_manager
21
  from auth import get_current_user
22
  from user_management import create_hf_user_manager
23
+ from job_worker import job_manager
24
  from dotenv import load_dotenv
25
 
26
  # ============================================
 
168
 
169
  print("✅ Server ready (per-user lazy loading)")
170
 
171
+ # Start persistent job manager
172
+ try:
173
+ job_manager.start()
174
+ print("🔁 Background job manager started")
175
+ except Exception as e:
176
+ print(f"⚠️ Failed to start job manager: {e}")
177
+
178
 
179
  # ============================================
180
  # ENDPOINTS
 
189
  @app.post("/upload", response_model=UploadResponse)
190
  async def upload_pdf(
191
  file: UploadFile = File(...),
192
+ user_id: str = Depends(get_current_user),
193
+ background_tasks: BackgroundTasks = None
194
  ):
195
  """
196
  Upload PDF for authenticated user.
 
203
  if file.content_type and file.content_type not in ['application/pdf']:
204
  raise HTTPException(400, "Invalid MIME type")
205
 
206
+ # Read content quickly and persist to a temp file, then enqueue a job for background processing
207
  content = await file.read()
208
+
209
  # Size limit (10MB)
210
  if len(content) > 10 * 1024 * 1024:
211
  raise HTTPException(413, "File too large (max 10MB)")
212
+
213
  try:
214
+ # Save to a temp file so background worker can access it
215
+ base_dir = os.path.dirname(os.path.abspath(__file__))
216
+ tmp_dir = os.path.join(base_dir, 'tmp_uploads')
217
+ os.makedirs(tmp_dir, exist_ok=True)
218
+ tmp_path = os.path.join(tmp_dir, f"{int(time.time())}_{file.filename}")
219
+ with open(tmp_path, 'wb') as f:
220
+ f.write(content)
221
+
222
+ # Create persistent job and return immediately
223
+ job = job_manager.create_job(user_id=user_id, filename=file.filename, file_path=tmp_path)
224
+
225
+ return UploadResponse(
226
+ document_id=job['id'],
227
+ filename=file.filename,
228
+ status="processing",
229
+ message="Upload accepted and is being processed in background"
230
+ )
231
+
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
232
  except HTTPException:
233
  raise
234
  except Exception as e:
235
+ print(f"❌ Upload enqueue error (user {user_id}): {e}")
236
+ raise HTTPException(500, "Failed to enqueue upload")
237
+
238
+
239
+ @app.get('/upload-status/{job_id}')
240
+ async def upload_status(job_id: str):
241
+ job = job_manager.get_job(job_id)
242
+ if not job:
243
+ raise HTTPException(404, "Job not found")
244
+ return job
245
 
246
 
247
  @app.post("/query", response_model=QueryResponse)