kn29 commited on
Commit
d5bcbe3
·
verified ·
1 Parent(s): 21cc4ca

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +49 -30
app.py CHANGED
@@ -61,8 +61,6 @@ SESSION_STORES = {}
61
  STORE_LOCK = threading.RLock()
62
  CLEANUP_INTERVAL = 1800 # 30 minutes
63
  STORE_TTL = 1800 # 30 minutes
64
- # Note: asyncio.to_thread() uses its own internal thread pool, so a dedicated
65
- # global pool is not strictly necessary unless fine-grained control is needed.
66
 
67
  # --- Pydantic Models ---
68
  class ChatRequest(BaseModel):
@@ -148,6 +146,8 @@ def init_rag_models():
148
  def load_session_from_mongodb(session_id: str) -> Dict[str, Any]:
149
  """Load pre-existing session data and embeddings from MongoDB"""
150
  session_logger = create_session_logger(session_id)
 
 
151
  if not DB:
152
  raise ValueError("Database not connected")
153
 
@@ -170,11 +170,12 @@ def load_session_from_mongodb(session_id: str) -> Dict[str, Any]:
170
  groq_api_key = os.getenv("GROQ_API_KEY")
171
  session_rag = SessionRAG(session_id, groq_api_key)
172
 
173
- # Load the existing chunks and indices (don't recreate embeddings)
174
  session_rag.load_existing_session_data(chunks_list)
175
 
176
  session_store = {
177
  "session_rag": session_rag,
 
178
  "metadata": {
179
  "session_id": session_id,
180
  "title": session_doc.get("filename", "Document"),
@@ -186,10 +187,10 @@ def load_session_from_mongodb(session_id: str) -> Dict[str, Any]:
186
  session_logger.info("Session loaded from MongoDB with existing embeddings")
187
  return session_store
188
 
189
-
190
  def get_chat_history_safely(session_id: str, limit: int = 50) -> List[Dict[str, Any]]:
191
  """Get chat history with error handling."""
192
- if not DB: return []
 
193
  try:
194
  chats_cursor = DB.chats.find({"session_id": session_id}).sort("created_at", -1).limit(limit)
195
  # Reverse to get chronological order [oldest -> newest]
@@ -228,11 +229,21 @@ async def periodic_cleanup():
228
  logger.info("Running periodic cleanup of expired sessions...")
229
  cleanup_expired_sessions()
230
 
 
 
 
 
 
 
 
 
 
 
 
231
 
232
  # --- Application Lifespan ---
233
  @asynccontextmanager
234
  async def lifespan(app: FastAPI):
235
- # --- FIX APPLIED: Variable initialized to None ---
236
  cleanup_task = None
237
  APP_STATE["startup_time"] = datetime.utcnow()
238
  logger.info("Starting Advanced RAG Chat Service...")
@@ -252,7 +263,6 @@ async def lifespan(app: FastAPI):
252
  MONGO_CLIENT.close()
253
  logger.info("Shutdown complete.")
254
 
255
-
256
  # --- FastAPI App and Endpoints ---
257
  app = FastAPI(
258
  title="Advanced RAG Chat Service",
@@ -263,7 +273,10 @@ app = FastAPI(
263
 
264
  app.add_middleware(
265
  CORSMiddleware,
266
- allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"],
 
 
 
267
  )
268
 
269
  @app.get("/")
@@ -275,7 +288,7 @@ async def health_check():
275
  uptime = (datetime.utcnow() - APP_STATE["startup_time"]).total_seconds()
276
  with STORE_LOCK:
277
  active_sessions = len(SESSION_STORES)
278
- indexed_sessions = sum(1 for s in SESSION_STORES.values() if s["indexed"])
279
 
280
  status = "healthy"
281
  if not RAG_MODELS_INITIALIZED or not APP_STATE["mongodb_connected"]:
@@ -284,7 +297,7 @@ async def health_check():
284
  return HealthResponse(
285
  status=status,
286
  mongodb_connected=APP_STATE["mongodb_connected"],
287
- rag_models_initialized=RAG_MODELS_INITIALIZED, # Correct field name
288
  faiss_available=FAISS_AVAILABLE,
289
  active_sessions=active_sessions,
290
  memory_usage={"loaded_sessions": active_sessions, "indexed_sessions": indexed_sessions},
@@ -298,22 +311,35 @@ async def chat_with_document(session_id: str, request: ChatRequest):
298
  start_time = time.time()
299
 
300
  try:
 
 
301
  # Check if session is already loaded in memory
302
  with STORE_LOCK:
303
  if session_id not in SESSION_STORES:
304
  # Lazy load: Load session from MongoDB when first chat request comes
305
  session_logger.info("Loading session from MongoDB for first chat request")
306
- session_store = await asyncio.to_thread(load_session_from_mongodb, session_id)
307
- SESSION_STORES[session_id] = session_store
 
 
 
 
 
308
 
309
  # Update last access time
310
  SESSION_LAST_ACCESS[session_id] = datetime.utcnow()
311
  session_rag = SESSION_STORES[session_id]["session_rag"]
312
- # Process the query
313
- session_logger.info(f"Processing query: {request.message[:100]}...")
 
 
314
  result = await asyncio.to_thread(session_rag.query_documents, request.message, top_k=5)
315
- APP_STATE["total_queries"] += 1
316
 
 
 
 
 
 
317
  answer = result.get('answer', 'Unable to generate an answer.')
318
 
319
  # Save chat messages asynchronously
@@ -321,7 +347,7 @@ async def chat_with_document(session_id: str, request: ChatRequest):
321
  asyncio.create_task(save_chat_message_safely(session_id, "assistant", answer))
322
 
323
  processing_time = time.time() - start_time
324
- session_logger.info(f"Query processed in {processing_time:.2f}s.")
325
 
326
  return ChatResponse(
327
  success=True,
@@ -333,16 +359,20 @@ async def chat_with_document(session_id: str, request: ChatRequest):
333
  query_analysis=result.get('query_analysis'),
334
  confidence=result.get('confidence')
335
  )
 
 
 
336
  except Exception as e:
337
  session_logger.error(f"Chat processing failed: {e}", exc_info=True)
338
- raise HTTPException(status_code=500, detail=f"Chat processing error: {e}")
 
339
 
340
  @app.get("/history/{session_id}")
341
  async def get_session_history(session_id: str):
342
  """Retrieves chat history for a session."""
343
  if not DB:
344
  raise HTTPException(status_code=503, detail="Database not connected")
345
- # --- FIX APPLIED: Consistent threading model ---
346
  history = await asyncio.to_thread(get_chat_history_safely, session_id)
347
  return {"session_id": session_id, "chat_history": history}
348
 
@@ -352,19 +382,8 @@ async def cleanup_session(session_id: str):
352
  cleanup_session_resources(session_id)
353
  return {"success": True, "message": f"Session {session_id} cleaned up."}
354
 
355
- async def save_chat_message_safely(session_id: str, role: str, message: str):
356
- """Saves chat messages in a non-blocking way."""
357
- if not DB: return
358
- try:
359
- await asyncio.to_thread(
360
- DB.chats.insert_one,
361
- {"session_id": session_id, "role": role, "message": message, "created_at": datetime.utcnow()}
362
- )
363
- except Exception as e:
364
- logger.error(f"Failed to save chat message for session {session_id}: {e}")
365
-
366
  if __name__ == "__main__":
367
  import uvicorn
368
  port = int(os.getenv("PORT", 7861))
369
  logger.info(f"Starting server on http://0.0.0.0:{port}")
370
- uvicorn.run("app:app", host="0.0.0.0", port=port, reload=True)
 
61
  STORE_LOCK = threading.RLock()
62
  CLEANUP_INTERVAL = 1800 # 30 minutes
63
  STORE_TTL = 1800 # 30 minutes
 
 
64
 
65
  # --- Pydantic Models ---
66
  class ChatRequest(BaseModel):
 
146
  def load_session_from_mongodb(session_id: str) -> Dict[str, Any]:
147
  """Load pre-existing session data and embeddings from MongoDB"""
148
  session_logger = create_session_logger(session_id)
149
+ session_logger.info(f"Loading session from MongoDB: {session_id}")
150
+
151
  if not DB:
152
  raise ValueError("Database not connected")
153
 
 
170
  groq_api_key = os.getenv("GROQ_API_KEY")
171
  session_rag = SessionRAG(session_id, groq_api_key)
172
 
173
+ session_logger.info(f"Loading existing session data with {len(chunks_list)} chunks")
174
  session_rag.load_existing_session_data(chunks_list)
175
 
176
  session_store = {
177
  "session_rag": session_rag,
178
+ "indexed": True, # Add this field for health check
179
  "metadata": {
180
  "session_id": session_id,
181
  "title": session_doc.get("filename", "Document"),
 
187
  session_logger.info("Session loaded from MongoDB with existing embeddings")
188
  return session_store
189
 
 
190
  def get_chat_history_safely(session_id: str, limit: int = 50) -> List[Dict[str, Any]]:
191
  """Get chat history with error handling."""
192
+ if not DB:
193
+ return []
194
  try:
195
  chats_cursor = DB.chats.find({"session_id": session_id}).sort("created_at", -1).limit(limit)
196
  # Reverse to get chronological order [oldest -> newest]
 
229
  logger.info("Running periodic cleanup of expired sessions...")
230
  cleanup_expired_sessions()
231
 
232
+ async def save_chat_message_safely(session_id: str, role: str, message: str):
233
+ """Saves chat messages in a non-blocking way."""
234
+ if not DB:
235
+ return
236
+ try:
237
+ await asyncio.to_thread(
238
+ DB.chats.insert_one,
239
+ {"session_id": session_id, "role": role, "message": message, "created_at": datetime.utcnow()}
240
+ )
241
+ except Exception as e:
242
+ logger.error(f"Failed to save chat message for session {session_id}: {e}")
243
 
244
  # --- Application Lifespan ---
245
  @asynccontextmanager
246
  async def lifespan(app: FastAPI):
 
247
  cleanup_task = None
248
  APP_STATE["startup_time"] = datetime.utcnow()
249
  logger.info("Starting Advanced RAG Chat Service...")
 
263
  MONGO_CLIENT.close()
264
  logger.info("Shutdown complete.")
265
 
 
266
  # --- FastAPI App and Endpoints ---
267
  app = FastAPI(
268
  title="Advanced RAG Chat Service",
 
273
 
274
  app.add_middleware(
275
  CORSMiddleware,
276
+ allow_origins=["*"],
277
+ allow_credentials=True,
278
+ allow_methods=["*"],
279
+ allow_headers=["*"],
280
  )
281
 
282
  @app.get("/")
 
288
  uptime = (datetime.utcnow() - APP_STATE["startup_time"]).total_seconds()
289
  with STORE_LOCK:
290
  active_sessions = len(SESSION_STORES)
291
+ indexed_sessions = sum(1 for s in SESSION_STORES.values() if s.get("indexed", False))
292
 
293
  status = "healthy"
294
  if not RAG_MODELS_INITIALIZED or not APP_STATE["mongodb_connected"]:
 
297
  return HealthResponse(
298
  status=status,
299
  mongodb_connected=APP_STATE["mongodb_connected"],
300
+ rag_models_initialized=RAG_MODELS_INITIALIZED,
301
  faiss_available=FAISS_AVAILABLE,
302
  active_sessions=active_sessions,
303
  memory_usage={"loaded_sessions": active_sessions, "indexed_sessions": indexed_sessions},
 
311
  start_time = time.time()
312
 
313
  try:
314
+ session_logger.info(f"Received chat request: {request.message[:100]}...")
315
+
316
  # Check if session is already loaded in memory
317
  with STORE_LOCK:
318
  if session_id not in SESSION_STORES:
319
  # Lazy load: Load session from MongoDB when first chat request comes
320
  session_logger.info("Loading session from MongoDB for first chat request")
321
+ try:
322
+ session_store = await asyncio.to_thread(load_session_from_mongodb, session_id)
323
+ SESSION_STORES[session_id] = session_store
324
+ session_logger.info("Session loaded successfully from MongoDB")
325
+ except Exception as load_error:
326
+ session_logger.error(f"Failed to load session: {load_error}")
327
+ raise HTTPException(status_code=404, detail=f"Failed to load session: {str(load_error)}")
328
 
329
  # Update last access time
330
  SESSION_LAST_ACCESS[session_id] = datetime.utcnow()
331
  session_rag = SESSION_STORES[session_id]["session_rag"]
332
+
333
+ session_logger.info(f"Processing query with SessionRAG...")
334
+
335
+ # Process the query using SessionRAG
336
  result = await asyncio.to_thread(session_rag.query_documents, request.message, top_k=5)
 
337
 
338
+ if 'error' in result:
339
+ session_logger.error(f"Query processing error: {result['error']}")
340
+ raise HTTPException(status_code=500, detail=result['error'])
341
+
342
+ APP_STATE["total_queries"] += 1
343
  answer = result.get('answer', 'Unable to generate an answer.')
344
 
345
  # Save chat messages asynchronously
 
347
  asyncio.create_task(save_chat_message_safely(session_id, "assistant", answer))
348
 
349
  processing_time = time.time() - start_time
350
+ session_logger.info(f"Query processed successfully in {processing_time:.2f}s")
351
 
352
  return ChatResponse(
353
  success=True,
 
359
  query_analysis=result.get('query_analysis'),
360
  confidence=result.get('confidence')
361
  )
362
+
363
+ except HTTPException:
364
+ raise
365
  except Exception as e:
366
  session_logger.error(f"Chat processing failed: {e}", exc_info=True)
367
+ APP_STATE["errors"].append(f"Chat error: {str(e)}")
368
+ raise HTTPException(status_code=500, detail=f"Chat processing error: {str(e)}")
369
 
370
  @app.get("/history/{session_id}")
371
  async def get_session_history(session_id: str):
372
  """Retrieves chat history for a session."""
373
  if not DB:
374
  raise HTTPException(status_code=503, detail="Database not connected")
375
+
376
  history = await asyncio.to_thread(get_chat_history_safely, session_id)
377
  return {"session_id": session_id, "chat_history": history}
378
 
 
382
  cleanup_session_resources(session_id)
383
  return {"success": True, "message": f"Session {session_id} cleaned up."}
384
 
 
 
 
 
 
 
 
 
 
 
 
385
  if __name__ == "__main__":
386
  import uvicorn
387
  port = int(os.getenv("PORT", 7861))
388
  logger.info(f"Starting server on http://0.0.0.0:{port}")
389
+ uvicorn.run("app:app", host="0.0.0.0", port=port, reload=True)