Spaces:
Running
Running
| """ | |
| Health Check and System Status Router | |
| """ | |
| from datetime import datetime | |
| from fastapi import APIRouter | |
| import sys | |
| import os | |
| # Add src to path for imports | |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) | |
| from api.models import HealthStatus, InitializationStatus | |
| router = APIRouter(prefix="/health", tags=["health"]) | |
# (component key, module to import, symbol expected in that module), in the
# order the components appear in the response.
_COMPONENT_PROBES = (
    ("agent", "agent", "safe_run_agent"),
    ("vector_store", "vector_store", "VectorStore"),
    ("data_loaders", "data_loaders", "load_pdf_documents"),
    ("tools", "tools", "medical_guidelines_knowledge_tool"),
)


def _probe_component(module_name, symbol_name):
    """
    Report whether ``from module_name import symbol_name`` would succeed.

    Returns:
        "healthy" when the module imports and exposes the symbol,
        "unhealthy" when anything raises along the way.
    """
    # Local imports keep this block independent of the file-level imports.
    import importlib
    import logging

    try:
        module = importlib.import_module(module_name)
        if not hasattr(module, symbol_name):
            # Mirror ``from pkg import name``: when the attribute is missing
            # the import statement falls back to the submodule ``pkg.name``.
            importlib.import_module(f"{module_name}.{symbol_name}")
    except Exception as exc:
        # Best-effort probe: any import-time failure marks the component as
        # unhealthy, but the reason is logged instead of silently dropped.
        logging.getLogger(__name__).warning(
            "Health probe failed for %s.%s: %s", module_name, symbol_name, exc
        )
        return "unhealthy"
    return "healthy"


def _current_initialization_status():
    """
    Build an InitializationStatus from the ``background_init`` accessors.

    If importing or calling the accessors raises, a placeholder status with
    ``is_complete=False`` and ``is_successful=False`` is returned, carrying
    the exception text in both ``status_message`` and ``error``.
    """
    try:
        # Aliased on import so the accessor is not confused with the
        # ``get_initialization_status`` endpoint defined in this module.
        from background_init import (
            get_initialization_error,
            get_initialization_status as read_status_message,
            is_initialization_complete,
            is_initialization_successful,
        )

        is_complete = is_initialization_complete()
        status_message = read_status_message()
        is_successful = is_initialization_successful()
        # Read the error exactly once: calling the accessor twice lets the
        # truthiness test and the str() conversion observe different values.
        init_error = get_initialization_error()
        return InitializationStatus(
            is_complete=is_complete,
            status_message=status_message,
            is_successful=is_successful,
            error=str(init_error) if init_error else None,
        )
    except Exception as e:
        return InitializationStatus(
            is_complete=False,
            status_message=f"Unable to check initialization status: {str(e)}",
            is_successful=False,
            error=str(e),
        )


# NOTE(review): no @router route decorator is visible on this function in this
# view -- confirm it is actually registered on `router`.
async def health_check():
    """
    Check the health status of the API and its components.

    Each component is probed by importing one known symbol from its module.
    The background initialization state is reported alongside the components
    but does not influence the overall status.

    Returns:
        HealthStatus: ``status`` is "healthy" only when every probed
        component is healthy, otherwise "degraded". Also carries the version
        string, an ISO-8601 timestamp from ``datetime.now()``, the
        per-component map and the initialization status.
    """
    components = {
        component: _probe_component(module_name, symbol_name)
        for component, module_name, symbol_name in _COMPONENT_PROBES
    }

    initialization_status = _current_initialization_status()

    # Overall status
    overall_status = "healthy" if all(
        status == "healthy" for status in components.values()
    ) else "degraded"

    return HealthStatus(
        status=overall_status,
        version="1.0.0",
        timestamp=datetime.now().isoformat(),
        components=components,
        initialization=initialization_status,
    )
# NOTE(review): no @router route decorator is visible on this function in this
# view -- confirm it is actually registered on `router`.
async def ping():
    """
    Lightweight connectivity probe.

    Returns:
        dict: ``{"message": "pong", "timestamp": <ISO-8601 string>}`` where
        the timestamp comes from ``datetime.now()``.
    """
    reply = {"message": "pong"}
    reply["timestamp"] = datetime.now().isoformat()
    return reply
# NOTE(review): no @router route decorator is visible on this function in this
# view -- confirm it is actually registered on `router`.
async def get_initialization_status():
    """
    Get the current initialization status of background components.

    Returns:
        InitializationStatus: built from the ``background_init`` accessors.
        If importing or calling them raises, a placeholder with
        ``is_complete=False`` and ``is_successful=False`` is returned,
        carrying the exception text in ``status_message`` and ``error``.
    """
    try:
        # The background_init accessor shares this endpoint's name; alias it
        # on import so the local binding does not shadow the function itself.
        from background_init import (
            get_initialization_error,
            get_initialization_status as read_status_message,
            is_initialization_complete,
            is_initialization_successful,
        )

        is_complete = is_initialization_complete()
        status_message = read_status_message()
        is_successful = is_initialization_successful()
        # Read the error exactly once: calling the accessor twice lets the
        # truthiness test and the str() conversion observe different values.
        init_error = get_initialization_error()
        return InitializationStatus(
            is_complete=is_complete,
            status_message=status_message,
            is_successful=is_successful,
            error=str(init_error) if init_error else None,
        )
    except Exception as e:
        return InitializationStatus(
            is_complete=False,
            status_message=f"Unable to check initialization status: {str(e)}",
            is_successful=False,
            error=str(e),
        )
# NOTE(review): no @router route decorator is visible on this function in this
# view -- confirm it is actually registered on `router`.
async def get_version():
    """
    Report static version metadata for the API.

    Returns:
        dict: the hard-coded ``version``, ``name``, ``description`` and
        ``build_date`` strings, in that key order.
    """
    # Keys are inserted in the order they should appear in the response.
    version_info = {}
    version_info["version"] = "1.0.0"
    version_info["name"] = "Medical RAG AI Advisor API"
    version_info["description"] = "Professional API for medical information retrieval and advisory services"
    version_info["build_date"] = "2024-01-01"
    return version_info