"""
Health Check and System Status Router

Exposes liveness, component-health, initialization-status, version and
conversation-session management endpoints under the ``/health`` prefix.
"""

from datetime import datetime
import importlib
import os
import sys

from fastapi import APIRouter

# Add src to path for imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))

from api.models import HealthStatus, InitializationStatus

router = APIRouter(prefix="/health", tags=["health"])

# Single source of truth for the version reported by "/" and "/version".
API_VERSION = "1.0.0"

# (component label, module to import, attribute that must be importable from it).
# Order matters: it is the key order of the "components" mapping in the response.
# NOTE(review): the probe imports ``agent`` while the session routes below
# import ``core.agent`` -- confirm both module paths are intentional.
_COMPONENT_PROBES = (
    ("agent", "agent", "safe_run_agent"),
    ("vector_store", "vector_store", "VectorStore"),
    ("data_loaders", "data_loaders", "load_pdf_documents"),
    ("tools", "tools", "medical_guidelines_knowledge_tool"),
)


def _now_iso() -> str:
    """Return the current local (naive) time as an ISO-8601 string."""
    return datetime.now().isoformat()


def _probe_component(module_name: str, attribute: str) -> str:
    """Report whether ``from <module_name> import <attribute>`` would succeed.

    Args:
        module_name: Importable module path to load.
        attribute: Name that must be obtainable from that module.

    Returns:
        ``"healthy"`` when the import works, ``"unhealthy"`` on any exception.
    """
    try:
        module = importlib.import_module(module_name)
        if not hasattr(module, attribute):
            # Mirror the ``from X import Y`` fallback: Y may be a submodule
            # that has not been bound on the package yet.
            importlib.import_module(f"{module_name}.{attribute}")
    except Exception:
        # Best-effort probe: any failure simply marks the component unhealthy.
        return "unhealthy"
    return "healthy"


def _read_initialization_status() -> InitializationStatus:
    """Build an ``InitializationStatus`` from the ``background_init`` module.

    Returns:
        The status reported by ``background_init``; if importing it, calling
        it, or building the model raises, a "not complete / not successful"
        status carrying the exception text instead.
    """
    try:
        # Aliased so it is not confused with the route handler of the same name.
        from background_init import (
            is_initialization_complete,
            get_initialization_status as read_status_message,
            is_initialization_successful,
            get_initialization_error,
        )

        is_complete = is_initialization_complete()
        status_message = read_status_message()
        is_successful = is_initialization_successful()
        # Read the error once so the truthiness test and the reported text
        # always refer to the same value.
        init_error = get_initialization_error()

        return InitializationStatus(
            is_complete=is_complete,
            status_message=status_message,
            is_successful=is_successful,
            error=str(init_error) if init_error else None,
        )
    except Exception as e:
        return InitializationStatus(
            is_complete=False,
            status_message=f"Unable to check initialization status: {str(e)}",
            is_successful=False,
            error=str(e),
        )


@router.get("/", response_model=HealthStatus)
async def health_check():
    """
    Check the health status of the API and its components

    Each component is "healthy" when its module can be imported; the overall
    status is "healthy" only if every component is, otherwise "degraded".
    """
    components = {
        label: _probe_component(module_name, attribute)
        for label, module_name, attribute in _COMPONENT_PROBES
    }

    initialization_status = _read_initialization_status()

    all_healthy = all(state == "healthy" for state in components.values())
    overall_status = "healthy" if all_healthy else "degraded"

    return HealthStatus(
        status=overall_status,
        version=API_VERSION,
        timestamp=_now_iso(),
        components=components,
        initialization=initialization_status,
    )


@router.get("/ping")
async def ping():
    """
    Simple ping endpoint for basic connectivity check
    """
    return {"message": "pong", "timestamp": _now_iso()}


@router.get("/initialization", response_model=InitializationStatus)
async def get_initialization_status():
    """
    Get the current initialization status of background components
    """
    return _read_initialization_status()


@router.get("/version")
async def get_version():
    """
    Get API version information
    """
    return {
        "version": API_VERSION,
        "name": "Medical RAG AI Advisor API",
        "description": "Professional API for medical information retrieval and advisory services",
        "build_date": "2024-01-01",
    }


@router.get("/sessions")
async def get_active_sessions():
    """
    Get list of all active conversation sessions

    On failure the error text is returned together with an empty session list.
    """
    try:
        # Aliased so the route handler's own name is not shadowed in this scope.
        from core.agent import get_active_sessions as list_active_sessions

        sessions = list_active_sessions()
        return {
            "active_sessions": sessions,
            "count": len(sessions),
            "timestamp": _now_iso(),
        }
    except Exception as e:
        return {
            "error": str(e),
            "active_sessions": [],
            "count": 0,
        }


@router.delete("/sessions/{session_id}")
async def clear_session(session_id: str):
    """
    Clear conversation memory for a specific session

    Args:
        session_id: The session identifier to clear
    """
    try:
        from core.agent import clear_session_memory

        if clear_session_memory(session_id):
            message = f"Session '{session_id}' cleared successfully"
        else:
            message = f"Session '{session_id}' not found"

        return {
            "message": message,
            "session_id": session_id,
            "timestamp": _now_iso(),
        }
    except Exception as e:
        return {
            "error": str(e),
            "session_id": session_id,
        }


@router.delete("/sessions")
async def clear_all_sessions():
    """
    Clear all conversation sessions
    """
    try:
        from core.agent import clear_memory

        clear_memory()
        return {
            "message": "All sessions cleared successfully",
            "timestamp": _now_iso(),
        }
    except Exception as e:
        return {
            "error": str(e),
        }


@router.get("/sessions/{session_id}/summary")
async def get_session_summary(session_id: str):
    """
    Get conversation history summary for a specific session

    Args:
        session_id: The session identifier
    """
    try:
        from core.agent import get_memory_summary

        summary = get_memory_summary(session_id)
        return {
            "session_id": session_id,
            "summary": summary,
            "timestamp": _now_iso(),
        }
    except Exception as e:
        return {
            "error": str(e),
            "session_id": session_id,
        }