kn29 commited on
Commit
05dbe82
·
verified ·
1 Parent(s): 4215cf1

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +76 -32
app.py CHANGED
@@ -46,7 +46,7 @@ RAG_INITIALIZED = False
46
  SESSION_STORES = {}
47
  STORE_LOCK = threading.RLock()
48
  CLEANUP_INTERVAL = 3600 # 1 hour cleanup interval
49
- STORE_TTL = 24 * 3600 # 24 hours TTL for in-memory stores
50
 
51
  # Request/Response models
52
  class ChatRequest(BaseModel):
@@ -322,37 +322,56 @@ def get_chat_history(session_id: str, limit: int = 50) -> List[Dict[str, Any]]:
322
  logger.error(f"Failed to get chat history for session {session_id}: {e}")
323
  return []
324
 
 
 
 
 
 
 
325
  def cleanup_old_stores():
326
- """Background cleanup of old in-memory stores"""
327
- while True:
328
- try:
329
- current_time = datetime.utcnow()
330
- expired_sessions = []
331
-
332
- with STORE_LOCK:
333
- for session_id, store in SESSION_STORES.items():
334
- loaded_at = store["metadata"]["loaded_at"]
335
- if (current_time - loaded_at).total_seconds() > STORE_TTL:
336
- expired_sessions.append(session_id)
337
-
338
- for session_id in expired_sessions:
339
- # Clean up FAISS index and other resources
340
- if SESSION_STORES[session_id].get("faiss_index"):
341
- del SESSION_STORES[session_id]["faiss_index"]
342
- del SESSION_STORES[session_id]
343
- logger.info(f"Cleaned up expired store for session: {session_id}")
344
-
345
- if expired_sessions:
346
- logger.info(f"Cleaned up {len(expired_sessions)} expired session stores")
347
 
348
- except Exception as e:
349
- logger.error(f"Cleanup error: {e}")
 
 
 
 
350
 
351
- time.sleep(CLEANUP_INTERVAL)
 
 
 
 
352
 
353
- @app.on_event("startup")
354
- async def startup_event():
355
- """Initialize connections on startup"""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
356
  logger.info("Starting up Advanced RAG Chat Service...")
357
 
358
  # Connect to MongoDB
@@ -365,12 +384,37 @@ async def startup_event():
365
  logger.error("Failed to initialize RAG system")
366
  raise Exception("RAG initialization failed")
367
 
368
- # Start background cleanup thread
369
- cleanup_thread = threading.Thread(target=cleanup_old_stores, daemon=True)
370
- cleanup_thread.start()
371
- logger.info("Background cleanup thread started")
372
 
373
  logger.info("Startup completed successfully")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
374
 
375
  @app.get("/health", response_model=HealthResponse)
376
  async def health_check():
 
46
  SESSION_STORES = {}
47
  STORE_LOCK = threading.RLock()
48
  CLEANUP_INTERVAL = 3600 # 1 hour cleanup interval
49
+ STORE_TTL = 30 * 60 # 24 hours TTL for in-memory stores
50
 
51
  # Request/Response models
52
  class ChatRequest(BaseModel):
 
322
  logger.error(f"Failed to get chat history for session {session_id}: {e}")
323
  return []
324
 
325
+ import asyncio
326
+ from contextlib import asynccontextmanager
327
+
328
+ # Global cleanup task
329
+ cleanup_task = None
330
+
331
  def cleanup_old_stores():
332
+ """Background cleanup of old in-memory stores - single run"""
333
+ try:
334
+ current_time = datetime.utcnow()
335
+ expired_sessions = []
336
+
337
+ with STORE_LOCK:
338
+ for session_id, store in SESSION_STORES.items():
339
+ loaded_at = store["metadata"]["loaded_at"]
340
+ if (current_time - loaded_at).total_seconds() > STORE_TTL:
341
+ expired_sessions.append(session_id)
 
 
 
 
 
 
 
 
 
 
 
342
 
343
+ for session_id in expired_sessions:
344
+ # Clean up FAISS index and other resources
345
+ if SESSION_STORES[session_id].get("faiss_index"):
346
+ del SESSION_STORES[session_id]["faiss_index"]
347
+ del SESSION_STORES[session_id]
348
+ logger.info(f"Cleaned up expired store for session: {session_id}")
349
 
350
+ if expired_sessions:
351
+ logger.info(f"Cleaned up {len(expired_sessions)} expired session stores")
352
+
353
+ except Exception as e:
354
+ logger.error(f"Cleanup error: {e}")
355
 
356
+ async def periodic_cleanup():
357
+ """Async periodic cleanup task"""
358
+ global cleanup_task
359
+ try:
360
+ while True:
361
+ cleanup_old_stores()
362
+ await asyncio.sleep(CLEANUP_INTERVAL)
363
+ except asyncio.CancelledError:
364
+ logger.info("Cleanup task cancelled")
365
+ raise
366
+ except Exception as e:
367
+ logger.error(f"Periodic cleanup error: {e}")
368
+
369
+ @asynccontextmanager
370
+ async def lifespan(app: FastAPI):
371
+ """Application lifespan manager"""
372
+ global cleanup_task
373
+
374
+ # Startup
375
  logger.info("Starting up Advanced RAG Chat Service...")
376
 
377
  # Connect to MongoDB
 
384
  logger.error("Failed to initialize RAG system")
385
  raise Exception("RAG initialization failed")
386
 
387
+ # Start background cleanup task
388
+ cleanup_task = asyncio.create_task(periodic_cleanup())
389
+ logger.info("Background cleanup task started")
 
390
 
391
  logger.info("Startup completed successfully")
392
+
393
+ yield
394
+
395
+ # Shutdown
396
+ logger.info("Shutting down Advanced RAG Chat Service...")
397
+
398
+ if cleanup_task:
399
+ cleanup_task.cancel()
400
+ try:
401
+ await cleanup_task
402
+ except asyncio.CancelledError:
403
+ pass
404
+
405
+ if MONGO_CLIENT:
406
+ MONGO_CLIENT.close()
407
+
408
+ logger.info("Shutdown completed")
409
+
410
+ # Replace the FastAPI app initialization
411
+ app = FastAPI(
412
+ title="Advanced RAG Chat Service",
413
+ version="1.0.0",
414
+ lifespan=lifespan
415
+ )
416
+
417
+
418
 
419
  @app.get("/health", response_model=HealthResponse)
420
  async def health_check():