Hamza4100 commited on
Commit
60e7cfb
Β·
verified Β·
1 Parent(s): 6ad61bb

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +375 -364
main.py CHANGED
@@ -1,364 +1,375 @@
1
- """
2
- FastAPI Backend for Multi-PDF RAG System with Per-User Storage
3
- ===============================================================
4
- Secure multi-user API with:
5
- - API key authentication
6
- - Per-user storage isolation
7
- - PDF upload and management
8
- - RAG-based question answering
9
- - HF persistent storage
10
- """
11
-
12
- import os
13
- import asyncio
14
- from typing import List, Optional, Dict
15
- from fastapi import FastAPI, UploadFile, File, HTTPException, Depends
16
- from fastapi.middleware.cors import CORSMiddleware
17
- from pydantic import BaseModel
18
- from threading import Lock
19
- from rag_engine import RAGEngine
20
- from hf_storage import create_hf_storage_manager
21
- from auth import get_current_user
22
- from dotenv import load_dotenv
23
-
24
- # ============================================
25
- # CONFIGURATION
26
- # ============================================
27
-
28
- load_dotenv()
29
-
30
- GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
31
- HF_TOKEN = os.environ.get("HF_TOKEN")
32
- HF_REPO = os.environ.get("HF_REPO", "Hamza4100/multi-pdf-storage")
33
-
34
- if not GEMINI_API_KEY:
35
- raise RuntimeError("❌ GEMINI_API_KEY not set")
36
-
37
- hf_storage = create_hf_storage_manager(hf_token=HF_TOKEN, hf_repo=HF_REPO)
38
-
39
- app = FastAPI(
40
- title="Multi-PDF RAG System",
41
- description="Secure multi-user RAG API with persistent storage",
42
- version="2.0.0"
43
- )
44
-
45
- app.add_middleware(
46
- CORSMiddleware,
47
- allow_origins=["*"],
48
- allow_credentials=True,
49
- allow_methods=["*"],
50
- allow_headers=["*"],
51
- )
52
-
53
- # ============================================
54
- # PER-USER RAG ENGINE MANAGER
55
- # ============================================
56
-
57
- class UserRAGManager:
58
- """Manages per-user RAG engine instances with lazy loading."""
59
-
60
- def __init__(self):
61
- self.engines: Dict[str, RAGEngine] = {}
62
- self.locks: Dict[str, Lock] = {}
63
- self.global_lock = Lock()
64
-
65
- def get_user_lock(self, user_id: str) -> Lock:
66
- """Get or create lock for user."""
67
- with self.global_lock:
68
- if user_id not in self.locks:
69
- self.locks[user_id] = Lock()
70
- return self.locks[user_id]
71
-
72
- async def get_engine(self, user_id: str) -> RAGEngine:
73
- """Get or create RAG engine for user (lazy loading)."""
74
- if user_id in self.engines:
75
- return self.engines[user_id]
76
-
77
- user_lock = self.get_user_lock(user_id)
78
-
79
- with user_lock:
80
- if user_id in self.engines:
81
- return self.engines[user_id]
82
-
83
- print(f"πŸ”§ Initializing RAG for user {user_id}...")
84
-
85
- # Sync from HF
86
- await asyncio.to_thread(hf_storage.sync_storage_from_hf, user_id)
87
-
88
- # User-specific paths
89
- base_dir = os.path.dirname(os.path.abspath(__file__))
90
- user_storage_dir = os.path.join(base_dir, "users", user_id, "storage")
91
-
92
- # Initialize engine
93
- engine = await asyncio.to_thread(
94
- RAGEngine,
95
- gemini_api_key=GEMINI_API_KEY,
96
- storage_dir=user_storage_dir
97
- )
98
-
99
- self.engines[user_id] = engine
100
- print(f"βœ… RAG ready for user {user_id}")
101
-
102
- return engine
103
-
104
-
105
- rag_manager = UserRAGManager()
106
-
107
- # ============================================
108
- # MODELS
109
- # ============================================
110
-
111
- class UploadResponse(BaseModel):
112
- document_id: str
113
- filename: str
114
- status: str
115
- message: str
116
- pages: Optional[int] = None
117
- chunks: Optional[int] = None
118
-
119
-
120
- class QueryRequest(BaseModel):
121
- question: str
122
- top_k: Optional[int] = 5
123
-
124
-
125
- class QueryResponse(BaseModel):
126
- answer: str
127
- sources: List[dict]
128
-
129
-
130
- class DocumentInfo(BaseModel):
131
- doc_id: str
132
- filename: str
133
- upload_timestamp: str
134
- num_chunks: int
135
- num_pages: int
136
-
137
-
138
- class StatsResponse(BaseModel):
139
- total_documents: int
140
- total_chunks: int
141
- index_size: int
142
-
143
-
144
- class DeleteResponse(BaseModel):
145
- status: str
146
- message: str
147
-
148
-
149
- # ============================================
150
- # STARTUP
151
- # ============================================
152
-
153
- @app.on_event("startup")
154
- async def startup_event():
155
- print("πŸš€ Multi-PDF RAG System v2.0")
156
- print(f"πŸ“¦ HF Storage: {'Enabled' if hf_storage.enabled else 'Disabled'}")
157
- print("βœ… Server ready (per-user lazy loading)")
158
-
159
-
160
- # ============================================
161
- # ENDPOINTS
162
- # ============================================
163
-
164
- @app.get("/health")
165
- async def health_check():
166
- """Health check (no auth required)."""
167
- return {"status": "ok"}
168
-
169
-
170
- @app.post("/upload", response_model=UploadResponse)
171
- async def upload_pdf(
172
- file: UploadFile = File(...),
173
- user_id: str = Depends(get_current_user)
174
- ):
175
- """
176
- Upload PDF for authenticated user.
177
- Requires: X-API-KEY header
178
- """
179
- # Validate PDF
180
- if not file.filename.lower().endswith('.pdf'):
181
- raise HTTPException(400, "Only PDF files allowed")
182
-
183
- if file.content_type and file.content_type not in ['application/pdf']:
184
- raise HTTPException(400, "Invalid MIME type")
185
-
186
- # Read content
187
- content = await file.read()
188
-
189
- # Size limit (10MB)
190
- if len(content) > 10 * 1024 * 1024:
191
- raise HTTPException(413, "File too large (max 10MB)")
192
-
193
- try:
194
- engine = await rag_manager.get_engine(user_id)
195
- user_lock = rag_manager.get_user_lock(user_id)
196
-
197
- with user_lock:
198
- result = await asyncio.to_thread(
199
- engine.upload_document,
200
- filename=file.filename,
201
- file_content=content,
202
- action="auto"
203
- )
204
-
205
- if result["status"] == "success":
206
- # Save PDF to user's uploaded_pdfs folder
207
- base_dir = os.path.dirname(os.path.abspath(__file__))
208
- user_pdfs_dir = os.path.join(base_dir, "users", user_id, "uploaded_pdfs")
209
- os.makedirs(user_pdfs_dir, exist_ok=True)
210
-
211
- pdf_path = os.path.join(user_pdfs_dir, file.filename)
212
- with open(pdf_path, "wb") as f:
213
- f.write(content)
214
-
215
- await asyncio.to_thread(
216
- hf_storage.push_storage_to_hf,
217
- user_id,
218
- f"Upload {file.filename}"
219
- )
220
-
221
- print(f"βœ… Upload success for user {user_id}: {file.filename}")
222
-
223
- return UploadResponse(
224
- document_id=result.get("doc_id", ""),
225
- filename=file.filename,
226
- status="success",
227
- message="Uploaded successfully",
228
- pages=result.get("pages"),
229
- chunks=result.get("chunks")
230
- )
231
- else:
232
- raise HTTPException(400, result.get("message", "Upload failed"))
233
-
234
- except HTTPException:
235
- raise
236
- except Exception as e:
237
- print(f"❌ Upload error (user {user_id}): {e}")
238
- raise HTTPException(500, "Upload failed")
239
-
240
-
241
- @app.post("/query", response_model=QueryResponse)
242
- async def query_documents(
243
- request: QueryRequest,
244
- user_id: str = Depends(get_current_user)
245
- ):
246
- """
247
- Query user's documents using RAG.
248
- Requires: X-API-KEY header
249
- """
250
- try:
251
- engine = await rag_manager.get_engine(user_id)
252
-
253
- result = await asyncio.to_thread(
254
- engine.ask,
255
- query=request.question,
256
- top_k=request.top_k
257
- )
258
-
259
- print(f"βœ… Query success for user {user_id}")
260
-
261
- return QueryResponse(
262
- answer=result["answer"],
263
- sources=result.get("sources", [])
264
- )
265
-
266
- except Exception as e:
267
- print(f"❌ Query error (user {user_id}): {e}")
268
- raise HTTPException(500, "Query failed")
269
-
270
-
271
- @app.get("/documents", response_model=List[DocumentInfo])
272
- async def get_documents(user_id: str = Depends(get_current_user)):
273
- """
274
- Get all documents for authenticated user.
275
- Requires: X-API-KEY header
276
- """
277
- try:
278
- engine = await rag_manager.get_engine(user_id)
279
- documents = await asyncio.to_thread(engine.get_all_documents)
280
-
281
- return [
282
- DocumentInfo(
283
- doc_id=doc["doc_id"],
284
- filename=doc["filename"],
285
- upload_timestamp=doc["upload_timestamp"],
286
- num_chunks=doc["num_chunks"],
287
- num_pages=doc["num_pages"]
288
- )
289
- for doc in documents
290
- ]
291
-
292
- except Exception as e:
293
- print(f"❌ Get documents error (user {user_id}): {e}")
294
- raise HTTPException(500, "Failed to retrieve documents")
295
-
296
-
297
- @app.delete("/documents/{doc_id}", response_model=DeleteResponse)
298
- async def delete_document(
299
- doc_id: str,
300
- user_id: str = Depends(get_current_user)
301
- ):
302
- """
303
- Delete document for authenticated user.
304
- Requires: X-API-KEY header
305
- """
306
- try:
307
- engine = await rag_manager.get_engine(user_id)
308
- user_lock = rag_manager.get_user_lock(user_id)
309
-
310
- with user_lock:
311
- result = await asyncio.to_thread(engine.delete_document, doc_id)
312
-
313
- if result["status"] == "success":
314
- await asyncio.to_thread(
315
- hf_storage.push_storage_to_hf,
316
- user_id,
317
- f"Delete {doc_id}"
318
- )
319
-
320
- print(f"βœ… Delete success for user {user_id}: {doc_id}")
321
-
322
- return DeleteResponse(
323
- status="success",
324
- message=result["message"]
325
- )
326
- else:
327
- raise HTTPException(404, result["message"])
328
-
329
- except HTTPException:
330
- raise
331
- except Exception as e:
332
- print(f"❌ Delete error (user {user_id}): {e}")
333
- raise HTTPException(500, "Deletion failed")
334
-
335
-
336
- @app.get("/stats", response_model=StatsResponse)
337
- async def get_stats(user_id: str = Depends(get_current_user)):
338
- """
339
- Get stats for authenticated user.
340
- Requires: X-API-KEY header
341
- """
342
- try:
343
- engine = await rag_manager.get_engine(user_id)
344
- stats = await asyncio.to_thread(engine.get_stats)
345
-
346
- return StatsResponse(
347
- total_documents=stats["total_documents"],
348
- total_chunks=stats["total_chunks"],
349
- index_size=stats["index_size"]
350
- )
351
-
352
- except Exception as e:
353
- print(f"❌ Stats error (user {user_id}): {e}")
354
- raise HTTPException(500, "Failed to retrieve stats")
355
-
356
-
357
- # ============================================
358
- # MAIN
359
- # ============================================
360
-
361
- if __name__ == "__main__":
362
- import uvicorn
363
- port = int(os.environ.get("PORT", 8000))
364
- uvicorn.run("main:app", host="0.0.0.0", port=port, reload=True)
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ FastAPI Backend for Multi-PDF RAG System with Per-User Storage
3
+ ===============================================================
4
+ Secure multi-user API with:
5
+ - API key authentication
6
+ - Per-user storage isolation
7
+ - PDF upload and management
8
+ - RAG-based question answering
9
+ - HF persistent storage
10
+ """
11
+
12
+ import os
13
+ import asyncio
14
+ from typing import List, Optional, Dict
15
+ from fastapi import FastAPI, UploadFile, File, HTTPException, Depends
16
+ from fastapi.middleware.cors import CORSMiddleware
17
+ from pydantic import BaseModel
18
+ from threading import Lock
19
+ from rag_engine import RAGEngine
20
+ from hf_storage import create_hf_storage_manager
21
+ from auth import get_current_user
22
+ from user_management import create_hf_user_manager
23
+ from dotenv import load_dotenv
24
+
25
+ # ============================================
26
+ # CONFIGURATION
27
+ # ============================================
28
+
29
+ load_dotenv()
30
+
31
+ GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
32
+ HF_TOKEN = os.environ.get("HF_TOKEN")
33
+ HF_REPO = os.environ.get("HF_REPO", "Hamza4100/multi-pdf-storage")
34
+
35
+ if not GEMINI_API_KEY:
36
+ raise RuntimeError("❌ GEMINI_API_KEY not set")
37
+
38
+ hf_storage = create_hf_storage_manager(hf_token=HF_TOKEN, hf_repo=HF_REPO)
39
+ hf_user_manager = create_hf_user_manager(hf_token=HF_TOKEN, hf_repo=HF_REPO)
40
+
41
+ app = FastAPI(
42
+ title="Multi-PDF RAG System",
43
+ description="Secure multi-user RAG API with persistent storage",
44
+ version="2.0.0"
45
+ )
46
+
47
+ app.add_middleware(
48
+ CORSMiddleware,
49
+ allow_origins=["*"],
50
+ allow_credentials=True,
51
+ allow_methods=["*"],
52
+ allow_headers=["*"],
53
+ )
54
+
55
+ # ============================================
56
+ # PER-USER RAG ENGINE MANAGER
57
+ # ============================================
58
+
59
+ class UserRAGManager:
60
+ """Manages per-user RAG engine instances with lazy loading."""
61
+
62
+ def __init__(self):
63
+ self.engines: Dict[str, RAGEngine] = {}
64
+ self.locks: Dict[str, Lock] = {}
65
+ self.global_lock = Lock()
66
+
67
+ def get_user_lock(self, user_id: str) -> Lock:
68
+ """Get or create lock for user."""
69
+ with self.global_lock:
70
+ if user_id not in self.locks:
71
+ self.locks[user_id] = Lock()
72
+ return self.locks[user_id]
73
+
74
+ async def get_engine(self, user_id: str) -> RAGEngine:
75
+ """Get or create RAG engine for user (lazy loading)."""
76
+ if user_id in self.engines:
77
+ return self.engines[user_id]
78
+
79
+ user_lock = self.get_user_lock(user_id)
80
+
81
+ with user_lock:
82
+ if user_id in self.engines:
83
+ return self.engines[user_id]
84
+
85
+ print(f"πŸ”§ Initializing RAG for user {user_id}...")
86
+
87
+ # Sync from HF
88
+ await asyncio.to_thread(hf_storage.sync_storage_from_hf, user_id)
89
+
90
+ # User-specific paths
91
+ base_dir = os.path.dirname(os.path.abspath(__file__))
92
+ user_storage_dir = os.path.join(base_dir, "users", user_id, "storage")
93
+
94
+ # Initialize engine
95
+ engine = await asyncio.to_thread(
96
+ RAGEngine,
97
+ gemini_api_key=GEMINI_API_KEY,
98
+ storage_dir=user_storage_dir
99
+ )
100
+
101
+ self.engines[user_id] = engine
102
+ print(f"βœ… RAG ready for user {user_id}")
103
+
104
+ return engine
105
+
106
+
107
+ rag_manager = UserRAGManager()
108
+
109
+ # ============================================
110
+ # MODELS
111
+ # ============================================
112
+
113
+ class UploadResponse(BaseModel):
114
+ document_id: str
115
+ filename: str
116
+ status: str
117
+ message: str
118
+ pages: Optional[int] = None
119
+ chunks: Optional[int] = None
120
+
121
+
122
+ class QueryRequest(BaseModel):
123
+ question: str
124
+ top_k: Optional[int] = 5
125
+
126
+
127
+ class QueryResponse(BaseModel):
128
+ answer: str
129
+ sources: List[dict]
130
+
131
+
132
+ class DocumentInfo(BaseModel):
133
+ doc_id: str
134
+ filename: str
135
+ upload_timestamp: str
136
+ num_chunks: int
137
+ num_pages: int
138
+
139
+
140
+ class StatsResponse(BaseModel):
141
+ total_documents: int
142
+ total_chunks: int
143
+ index_size: int
144
+
145
+
146
+ class DeleteResponse(BaseModel):
147
+ status: str
148
+ message: str
149
+
150
+
151
+ # ============================================
152
+ # STARTUP
153
+ # ============================================
154
+
155
+ @app.on_event("startup")
156
+ async def startup_event():
157
+ print("πŸš€ Multi-PDF RAG System v2.0")
158
+ print(f"πŸ“¦ HF Storage: {'Enabled' if hf_storage.enabled else 'Disabled'}")
159
+
160
+ # Load and display user count from HF
161
+ if hf_user_manager.enabled:
162
+ users_count = len(hf_user_manager.get_all_users())
163
+ print(f"πŸ“₯ Loaded {users_count} user(s) from HF repository")
164
+ print(f"βœ… User management: Enabled (HF-based)")
165
+ else:
166
+ print(f"⚠️ User management: Disabled (check HF_TOKEN and HF_REPO)")
167
+
168
+ print("βœ… Server ready (per-user lazy loading)")
169
+
170
+
171
+ # ============================================
172
+ # ENDPOINTS
173
+ # ============================================
174
+
175
+ @app.get("/health")
176
+ async def health_check():
177
+ """Health check (no auth required)."""
178
+ return {"status": "ok"}
179
+
180
+
181
+ @app.post("/upload", response_model=UploadResponse)
182
+ async def upload_pdf(
183
+ file: UploadFile = File(...),
184
+ user_id: str = Depends(get_current_user)
185
+ ):
186
+ """
187
+ Upload PDF for authenticated user.
188
+ Requires: X-API-KEY header
189
+ """
190
+ # Validate PDF
191
+ if not file.filename.lower().endswith('.pdf'):
192
+ raise HTTPException(400, "Only PDF files allowed")
193
+
194
+ if file.content_type and file.content_type not in ['application/pdf']:
195
+ raise HTTPException(400, "Invalid MIME type")
196
+
197
+ # Read content
198
+ content = await file.read()
199
+
200
+ # Size limit (10MB)
201
+ if len(content) > 10 * 1024 * 1024:
202
+ raise HTTPException(413, "File too large (max 10MB)")
203
+
204
+ try:
205
+ engine = await rag_manager.get_engine(user_id)
206
+ user_lock = rag_manager.get_user_lock(user_id)
207
+
208
+ with user_lock:
209
+ result = await asyncio.to_thread(
210
+ engine.upload_document,
211
+ filename=file.filename,
212
+ file_content=content,
213
+ action="auto"
214
+ )
215
+
216
+ if result["status"] == "success":
217
+ # Save PDF to user's uploaded_pdfs folder
218
+ base_dir = os.path.dirname(os.path.abspath(__file__))
219
+ user_pdfs_dir = os.path.join(base_dir, "users", user_id, "uploaded_pdfs")
220
+ os.makedirs(user_pdfs_dir, exist_ok=True)
221
+
222
+ pdf_path = os.path.join(user_pdfs_dir, file.filename)
223
+ with open(pdf_path, "wb") as f:
224
+ f.write(content)
225
+
226
+ await asyncio.to_thread(
227
+ hf_storage.push_storage_to_hf,
228
+ user_id,
229
+ f"Upload {file.filename}"
230
+ )
231
+
232
+ print(f"βœ… Upload success for user {user_id}: {file.filename}")
233
+
234
+ return UploadResponse(
235
+ document_id=result.get("doc_id", ""),
236
+ filename=file.filename,
237
+ status="success",
238
+ message="Uploaded successfully",
239
+ pages=result.get("pages"),
240
+ chunks=result.get("chunks")
241
+ )
242
+ else:
243
+ raise HTTPException(400, result.get("message", "Upload failed"))
244
+
245
+ except HTTPException:
246
+ raise
247
+ except Exception as e:
248
+ print(f"❌ Upload error (user {user_id}): {e}")
249
+ raise HTTPException(500, "Upload failed")
250
+
251
+
252
+ @app.post("/query", response_model=QueryResponse)
253
+ async def query_documents(
254
+ request: QueryRequest,
255
+ user_id: str = Depends(get_current_user)
256
+ ):
257
+ """
258
+ Query user's documents using RAG.
259
+ Requires: X-API-KEY header
260
+ """
261
+ try:
262
+ engine = await rag_manager.get_engine(user_id)
263
+
264
+ result = await asyncio.to_thread(
265
+ engine.ask,
266
+ query=request.question,
267
+ top_k=request.top_k
268
+ )
269
+
270
+ print(f"βœ… Query success for user {user_id}")
271
+
272
+ return QueryResponse(
273
+ answer=result["answer"],
274
+ sources=result.get("sources", [])
275
+ )
276
+
277
+ except Exception as e:
278
+ print(f"❌ Query error (user {user_id}): {e}")
279
+ raise HTTPException(500, "Query failed")
280
+
281
+
282
+ @app.get("/documents", response_model=List[DocumentInfo])
283
+ async def get_documents(user_id: str = Depends(get_current_user)):
284
+ """
285
+ Get all documents for authenticated user.
286
+ Requires: X-API-KEY header
287
+ """
288
+ try:
289
+ engine = await rag_manager.get_engine(user_id)
290
+ documents = await asyncio.to_thread(engine.get_all_documents)
291
+
292
+ return [
293
+ DocumentInfo(
294
+ doc_id=doc["doc_id"],
295
+ filename=doc["filename"],
296
+ upload_timestamp=doc["upload_timestamp"],
297
+ num_chunks=doc["num_chunks"],
298
+ num_pages=doc["num_pages"]
299
+ )
300
+ for doc in documents
301
+ ]
302
+
303
+ except Exception as e:
304
+ print(f"❌ Get documents error (user {user_id}): {e}")
305
+ raise HTTPException(500, "Failed to retrieve documents")
306
+
307
+
308
+ @app.delete("/documents/{doc_id}", response_model=DeleteResponse)
309
+ async def delete_document(
310
+ doc_id: str,
311
+ user_id: str = Depends(get_current_user)
312
+ ):
313
+ """
314
+ Delete document for authenticated user.
315
+ Requires: X-API-KEY header
316
+ """
317
+ try:
318
+ engine = await rag_manager.get_engine(user_id)
319
+ user_lock = rag_manager.get_user_lock(user_id)
320
+
321
+ with user_lock:
322
+ result = await asyncio.to_thread(engine.delete_document, doc_id)
323
+
324
+ if result["status"] == "success":
325
+ await asyncio.to_thread(
326
+ hf_storage.push_storage_to_hf,
327
+ user_id,
328
+ f"Delete {doc_id}"
329
+ )
330
+
331
+ print(f"βœ… Delete success for user {user_id}: {doc_id}")
332
+
333
+ return DeleteResponse(
334
+ status="success",
335
+ message=result["message"]
336
+ )
337
+ else:
338
+ raise HTTPException(404, result["message"])
339
+
340
+ except HTTPException:
341
+ raise
342
+ except Exception as e:
343
+ print(f"❌ Delete error (user {user_id}): {e}")
344
+ raise HTTPException(500, "Deletion failed")
345
+
346
+
347
+ @app.get("/stats", response_model=StatsResponse)
348
+ async def get_stats(user_id: str = Depends(get_current_user)):
349
+ """
350
+ Get stats for authenticated user.
351
+ Requires: X-API-KEY header
352
+ """
353
+ try:
354
+ engine = await rag_manager.get_engine(user_id)
355
+ stats = await asyncio.to_thread(engine.get_stats)
356
+
357
+ return StatsResponse(
358
+ total_documents=stats["total_documents"],
359
+ total_chunks=stats["total_chunks"],
360
+ index_size=stats["index_size"]
361
+ )
362
+
363
+ except Exception as e:
364
+ print(f"❌ Stats error (user {user_id}): {e}")
365
+ raise HTTPException(500, "Failed to retrieve stats")
366
+
367
+
368
+ # ============================================
369
+ # MAIN
370
+ # ============================================
371
+
372
+ if __name__ == "__main__":
373
+ import uvicorn
374
+ port = int(os.environ.get("PORT", 8000))
375
+ uvicorn.run("main:app", host="0.0.0.0", port=port, reload=True)