Spaces:
Runtime error
Runtime error
File size: 8,355 Bytes
eaaed5e 1713970 eaaed5e 1713970 eaaed5e 1713970 e010bf4 1713970 eaaed5e 1713970 eaaed5e eab3278 eaaed5e dce2995 eaaed5e dce2995 e010bf4 dce2995 5afe3ac 4ba8404 bc5bc44 1713970 eaaed5e 1713970 eaaed5e 1713970 eaaed5e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 |
# Src/api/fastapi_app.py
import os
from datetime import datetime
from fastapi import FastAPI, Query, Header
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from sqlalchemy import text, select
# DB and services
from ..services.db import (
init_db, seed_data, get_session,
Patient, Doctor, Medicine, # ORM models for admin endpoints
)
from ..services.patient_service import register_patient as save_patient, get_patient_full_case
from ..services.medicine_service import check_medicine_availability
from ..services.doctor_service import release_stale_doctors
from ..services.doctor_assignment import assign_doctor_with_gemma
from ..services.summarizer import summarize_patient_case
# RAG
from ..rag.rag_pipeline import rag_query_multimodal
# Agent system
from ..agent.orchestrator import orchestrate_query
from ..agent.agent_executor import get_agent_executor
app = FastAPI(title="Medical Agentic Bot Backend FastAPI", version="1.0.0")
# ----------------------------
# CORS (Streamlit Space + local dev)
# ----------------------------
frontend_origin = os.getenv("FRONTEND_ORIGIN") # e.g. https://username-yourstreamlitspace.hf.space
origins = [o for o in [frontend_origin, "http://localhost:8501"] if o]
# fallback (dev): if nothing set, allow all (you can tighten later)
allow_origins = origins if origins else ["*"]
app.add_middleware(
CORSMiddleware,
allow_origins=allow_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ----------------------------
# On Startup: DB Init + Seed
# ----------------------------
@app.on_event("startup")
def startup_event():
init_db()
seed_data()
# ----------------------------
# Root
# ----------------------------
# class Message(BaseModel):
# message: str
# @app.get("/", response_model=Message, summary="Root Endpoint", tags=["Root"])
# def read_root():
# return {"message": "Welcome to My Awesome API!"}
class Message(BaseModel):
message: str
OWNER: str
Purpose: str
@app.get("/", response_model=Message, summary="Root Endpoint", tags=["Root"])
def read_root():
"""
Root endpoint for checking the API status.
Returns a welcome message.
"""
return {"message": "Welcome to My Awesome Backend FastAPI Application!",
"OWNER": "MOHIT GUPTA",
"Purpose": "THIS API IS BACKEND FASTAPI FOR AGENTIC MEDICAL AI BOT."}
# ----------------------------
# 1. Medical RAG Chatbot
# ----------------------------
@app.get("/query")
def query_bot(q: str = Query(...), authorization: str = Header(...)):
hf_token = authorization.replace("Bearer ", "")
answer, references = rag_query_multimodal(q, k=10, hf_token=hf_token)
return {"answer": answer, "references": references}
# ----------------------------
# 2. Register Patient + Assign Doctor
# ----------------------------
class PatientData(BaseModel):
name: str
age: int
reason: str
@app.post("/register_patient")
def register_patient_api(data: PatientData, authorization: str = Header(...)):
hf_token = authorization.replace("Bearer ", "")
release_stale_doctors()
doctor, reasoning = assign_doctor_with_gemma(data.reason, hf_token=hf_token)
if not doctor:
return {"message": "No suitable doctor found. Please try again later."}
patient_record = save_patient(
{"name": data.name, "age": data.age, "reason": data.reason},
doctor["id"]
)
return {
"patient_id": patient_record["id"],
"assigned_doctor": doctor,
"reasoning": reasoning
}
# ----------------------------
# 3. Check Registration & Confirm Appointment
# ----------------------------
class AppointmentData(BaseModel):
name: str
@app.post("/check_registration_status")
def check_registration_status(data: AppointmentData, authorization: str = Header(...)):
"""
Looks up the patient by name, verifies/sets doctor availability,
and confirms appointment—same behavior as before.
"""
with get_session() as s:
record = s.execute(
text("""
SELECT p.id AS patient_id, p.name AS patient_name, d.id AS doctor_id,
d.name AS doctor_name, d.specialization, d.available
FROM patients p
LEFT JOIN doctors d ON p.doctor_id = d.id
WHERE p.name = :name
LIMIT 1
"""),
{"name": data.name}
).mappings().first()
if not record:
return {"message": "Patient not registered. Please register first."}
if not record["doctor_id"]:
return {"message": "No doctor assigned yet. Please register again."}
if record["available"] == 1:
s.execute(
text("""
UPDATE doctors
SET available = 0, last_booked_at = :ts
WHERE id = :doc_id
"""),
{"ts": datetime.utcnow().isoformat(), "doc_id": record["doctor_id"]}
)
return {
"message": (
f"Appointment confirmed with {record['doctor_name']} "
f"({record['specialization']}) for {record['patient_name']}."
),
"doctor_name": record["doctor_name"],
"specialization": record["specialization"],
}
# ----------------------------
# 4. Medicine Availability
# ----------------------------
@app.get("/medicine_availability")
def medicine_availability_api(name: str = Query(...), authorization: str = Header(...)):
result = check_medicine_availability(name)
return {"message": result}
# ----------------------------
# 5. Summarize Patient Case
# ----------------------------
@app.get("/summarize_case/{patient_id}")
def summarize_case_api(patient_id: int, authorization: str = Header(...)):
hf_token = authorization.replace("Bearer ", "")
patient_data = get_patient_full_case(patient_id)
if not patient_data:
return {"message": "Patient not found"}
summary = summarize_patient_case(patient_data, hf_token=hf_token)
return {"summary": summary}
# ----------------------------
# 6. LangChain Agent Endpoint
# ----------------------------
@app.get("/agent_query")
def agent_query(q: str = Query(...), authorization: str = Header(...)):
hf_token = authorization.replace("Bearer ", "")
agent = get_agent_executor(hf_token=hf_token)
response = agent.run(q)
return {"response": response}
# ----------------------------
# 7. Lightweight Orchestrator Endpoint
# ----------------------------
@app.get("/orchestrator_query")
def orchestrator_query(q: str = Query(...), authorization: str = Header(...)):
hf_token = authorization.replace("Bearer ", "")
result, references = orchestrate_query(q, hf_token=hf_token)
return {"result": result, "references": references}
@app.post("/release_stale_doctors")
def release_stale_doctors_api():
release_stale_doctors()
return {"status": "success"}
# ----------------------------
# 8. Admin list endpoints (for Streamlit dashboard)
# ----------------------------
@app.get("/admin/patients")
def admin_list_patients(authorization: str = Header(...)):
with get_session() as s:
rows = s.execute(select(Patient)).scalars().all()
return {"items": [
{
"id": p.id,
"name": p.name,
"age": p.age,
"reason": p.reason,
"registered_at": p.registered_at,
"doctor_id": p.doctor_id,
} for p in rows
]}
@app.get("/admin/doctors")
def admin_list_doctors(authorization: str = Header(...)):
with get_session() as s:
rows = s.execute(select(Doctor)).scalars().all()
return {"items": [
{
"id": d.id,
"name": d.name,
"specialization": d.specialization,
"available": d.available,
"last_booked_at": d.last_booked_at,
} for d in rows
]}
@app.get("/admin/medicines")
def admin_list_medicines(authorization: str = Header(...)):
with get_session() as s:
rows = s.execute(select(Medicine)).scalars().all()
return {"items": [
{"id": m.id, "name": m.name, "stock": m.stock} for m in rows
]}
|