Spaces:
Running
Running
File size: 4,030 Bytes
2a8faae |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
"""
Health Check and System Status Router
"""
from datetime import datetime
from fastapi import APIRouter
import sys
import os
# Add src to path for imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
from api.models import HealthStatus, InitializationStatus
router = APIRouter(prefix="/health", tags=["health"])
@router.get("/", response_model=HealthStatus)
async def health_check():
"""
Check the health status of the API and its components
"""
components = {}
# Check agent availability
try:
from agent import safe_run_agent
components["agent"] = "healthy"
except Exception:
components["agent"] = "unhealthy"
# Check vector store
try:
from vector_store import VectorStore
components["vector_store"] = "healthy"
except Exception:
components["vector_store"] = "unhealthy"
# Check data loaders
try:
from data_loaders import load_pdf_documents
components["data_loaders"] = "healthy"
except Exception:
components["data_loaders"] = "unhealthy"
# Check tools
try:
from tools import medical_guidelines_knowledge_tool
components["tools"] = "healthy"
except Exception:
components["tools"] = "unhealthy"
# Check initialization status
initialization_status = None
try:
from background_init import (
is_initialization_complete,
get_initialization_status,
is_initialization_successful,
get_initialization_error
)
initialization_status = InitializationStatus(
is_complete=is_initialization_complete(),
status_message=get_initialization_status(),
is_successful=is_initialization_successful(),
error=str(get_initialization_error()) if get_initialization_error() else None
)
except Exception as e:
initialization_status = InitializationStatus(
is_complete=False,
status_message=f"Unable to check initialization status: {str(e)}",
is_successful=False,
error=str(e)
)
# Overall status
overall_status = "healthy" if all(
status == "healthy" for status in components.values()
) else "degraded"
return HealthStatus(
status=overall_status,
version="1.0.0",
timestamp=datetime.now().isoformat(),
components=components,
initialization=initialization_status
)
@router.get("/ping")
async def ping():
"""
Simple ping endpoint for basic connectivity check
"""
return {"message": "pong", "timestamp": datetime.now().isoformat()}
@router.get("/initialization", response_model=InitializationStatus)
async def get_initialization_status():
"""
Get the current initialization status of background components
"""
try:
from background_init import (
is_initialization_complete,
get_initialization_status,
is_initialization_successful,
get_initialization_error
)
return InitializationStatus(
is_complete=is_initialization_complete(),
status_message=get_initialization_status(),
is_successful=is_initialization_successful(),
error=str(get_initialization_error()) if get_initialization_error() else None
)
except Exception as e:
return InitializationStatus(
is_complete=False,
status_message=f"Unable to check initialization status: {str(e)}",
is_successful=False,
error=str(e)
)
@router.get("/version")
async def get_version():
"""
Get API version information
"""
return {
"version": "1.0.0",
"name": "Medical RAG AI Advisor API",
"description": "Professional API for medical information retrieval and advisory services",
"build_date": "2024-01-01"
}
|