# careflow / agent_server.py
# omgy's picture
# Upload 8 files
# 10fcca6 verified
"""
Agent HTTP Server for CareFlow Nexus Backend Integration
Exposes AI agents as HTTP endpoints that the backend can call
Matches the API contract expected by backend/app/services/agents.py
"""
import asyncio
import logging
import sys
from datetime import datetime
from typing import Any, Dict, List, Optional
from allocator_agent import BedAllocatorAgent
from communicator_agent import CommunicatorAgent
from config import config
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from memory_agent import MemoryAgent
from pydantic import BaseModel
from services.firebase_service import FirebaseService
from services.gemini_service import GeminiService
# Logging setup: every record goes both to stdout and to a per-day log file
# named after the date on which the process started.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler(f"agent_server_{datetime.now():%Y%m%d}.log"),
    ],
    level=logging.INFO,
)

# Module-level logger shared by every handler in this file.
logger = logging.getLogger(__name__)
# Initialize FastAPI app
# Interactive API documentation is served at /docs (Swagger UI) and /redoc.
app = FastAPI(
    title="CareFlow AI Agent Server",
    description="AI Agent Microservice for Hospital Bed Management",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
)

# CORS middleware
# Wildcard origins/methods/headers must not be combined with credentialed
# requests (see the FastAPI/Starlette CORS documentation). None of the
# endpoints in this file read cookies or authorization headers, so credentials
# are disabled rather than narrowing the origin list.
# NOTE(review): if a browser client ever needs cookies here, replace the "*"
# origin with an explicit allow-list and re-enable allow_credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Global agent instances
# Every slot starts as None and is assigned by startup_event(). The /health
# endpoint reports "initializing" until all five are set, and the agent
# endpoints answer 503 while the agent they depend on is still unset.
firebase_service: Optional[FirebaseService] = None
gemini_service: Optional[GeminiService] = None
memory_agent: Optional[MemoryAgent] = None
bed_allocator_agent: Optional[BedAllocatorAgent] = None
communicator_agent: Optional[CommunicatorAgent] = None
# ==================== REQUEST/RESPONSE MODELS ====================
class BedAgentRequest(BaseModel):
    """Request body for POST /agent/bed-assignment."""

    # Patient record; the endpoint reads its "patient_id" key.
    patient: Dict[str, Any]
    # Doctor's input; the "diagnosis" and "special_instructions" keys are read.
    doctor_input: Dict[str, Any]
    # Candidate beds supplied by the caller. The endpoint in this file does not
    # read this field; it only passes the patient id to the allocator agent.
    available_beds: List[Dict[str, Any]]
class BedAgentResponse(BaseModel):
    """Response body for POST /agent/bed-assignment."""

    # Id of the first recommendation, or None when no bed was recommended.
    recommended_bed_id: Optional[str]
    # Explanation for the recommendation (or for the absence of one).
    reason: str
    # At most three recommendation dicts (bed_id, bed_number, ward, score,
    # reasoning, pros, cons); empty when the allocator reports failure.
    recommendations: List[Dict[str, Any]]
    # Confidence value reported by the allocator; 0 when nothing was found.
    confidence: int
class CleanerAgentRequest(BaseModel):
    """Request body for POST /agent/cleaner-assignment."""

    # Id of the bed to clean; looked up through the Firebase service.
    bed_id: str
    # Caller-supplied cleaners; only used as a fallback, in which case the
    # first entry's "user_id" is returned.
    available_cleaners: List[Dict[str, Any]]
class CleanerAgentResponse(BaseModel):
    """Response body for POST /agent/cleaner-assignment."""

    # Id of the chosen cleaner, or None when no cleaner could be selected.
    selected_cleaner_id: Optional[str]
    # Explanation of how the cleaner was chosen (agent reasoning or fallback).
    reason: str
class NurseAgentRequest(BaseModel):
    """Request body for POST /agent/nurse-assignment."""

    # Patient record; the endpoint reads its "patient_id" and "name" keys.
    patient: Dict[str, Any]
    # Bed record; the endpoint reads its "bed_id" key.
    bed: Dict[str, Any]
    # Caller-supplied nurses; only used as a fallback, in which case the first
    # entry's "user_id" is returned.
    available_nurses: List[Dict[str, Any]]
class NurseAgentResponse(BaseModel):
    """Response body for POST /agent/nurse-assignment."""

    # Id of the chosen nurse, or None when no nurse could be selected.
    selected_nurse_id: Optional[str]
    # Explanation of how the nurse was chosen (agent reasoning or fallback).
    reason: str
# ==================== STARTUP/SHUTDOWN ====================
@app.on_event("startup")
async def startup_event():
    """Build the shared services and agents when the server starts.

    Steps run in order: validate configuration, create the Firebase and Gemini
    services, create and initialize the Memory Agent, then create the Bed
    Allocator and Communicator agents. Each result is stored in the matching
    module-level global.

    Raises:
        Exception: any failure (including a failed configuration check) is
            logged and re-raised unchanged.
    """
    global firebase_service, gemini_service, memory_agent
    global bed_allocator_agent, communicator_agent

    banner = "=" * 60
    try:
        logger.info(banner)
        logger.info("CareFlow AI Agent Server Starting...")
        logger.info(banner)

        # Step 1: refuse to continue with an invalid configuration.
        logger.info("Validating configuration...")
        config_ok = config.validate()
        if not config_ok:
            raise Exception("Configuration validation failed")
        logger.info("✓ Configuration valid")

        # Step 2: Firebase service.
        logger.info("Initializing Firebase service...")
        firebase_service = FirebaseService(
            service_account_path=config.firebase.service_account_path
        )
        logger.info("✓ Firebase service initialized")

        # Step 3: Gemini service.
        logger.info("Initializing Gemini AI service...")
        gemini_service = GeminiService(
            api_key=config.gemini.api_key,
            model_name=config.gemini.model_name,
        )
        logger.info("✓ Gemini AI service initialized")

        # Step 4: Memory Agent, which the two agents below receive as a dependency.
        logger.info("Initializing Memory Agent...")
        memory_agent = MemoryAgent(
            firebase_service=firebase_service,
            gemini_service=gemini_service,
            refresh_interval=config.agent.state_refresh_interval,
        )
        await memory_agent.initialize()
        logger.info("✓ Memory Agent initialized")

        # Step 5: Bed Allocator Agent.
        logger.info("Initializing Bed Allocator Agent...")
        bed_allocator_agent = BedAllocatorAgent(
            firebase_service=firebase_service,
            gemini_service=gemini_service,
            memory_agent=memory_agent,
            rule_weight=config.agent.rule_weight,
        )
        logger.info("✓ Bed Allocator Agent initialized")

        # Step 6: Communicator Agent.
        logger.info("Initializing Communicator Agent...")
        communicator_agent = CommunicatorAgent(
            firebase_service=firebase_service,
            gemini_service=gemini_service,
            memory_agent=memory_agent,
            max_staff_workload=config.agent.max_staff_workload,
        )
        logger.info("✓ Communicator Agent initialized")

        logger.info(banner)
        logger.info("✓ ALL AGENTS READY")
        logger.info(banner)
    except Exception as e:
        logger.error(f"Failed to initialize agents: {e}")
        raise
@app.on_event("shutdown")
async def shutdown_event():
    """Log that the server is shutting down.

    No service or agent teardown happens here; the handler only emits a log
    line.
    """
    logger.info("Shutting down AI Agent Server...")
# ==================== HELPER FUNCTIONS ====================
def _extract_basic_requirements(doctor_input: Dict[str, Any]) -> Dict[str, Any]:
"""Extract basic requirements from doctor input"""
diagnosis = doctor_input.get("diagnosis", "").lower()
special_instructions = doctor_input.get("special_instructions", "").lower()
combined_text = f"{diagnosis} {special_instructions}"
requirements = {
"needs_oxygen": any(
word in combined_text for word in ["oxygen", "o2", "respiratory"]
),
"needs_ventilator": any(
word in combined_text for word in ["ventilator", "intubation"]
),
"needs_cardiac_monitor": any(
word in combined_text for word in ["cardiac", "heart", "monitor"]
),
"needs_isolation": any(
word in combined_text for word in ["isolation", "infectious", "contagious"]
),
}
return requirements
def _infer_severity(diagnosis: str) -> str:
"""Infer severity from diagnosis text"""
diagnosis_lower = diagnosis.lower()
if any(
word in diagnosis_lower for word in ["critical", "severe", "emergency", "acute"]
):
return "critical"
elif any(word in diagnosis_lower for word in ["moderate", "significant"]):
return "high"
elif any(word in diagnosis_lower for word in ["mild", "minor"]):
return "low"
else:
return "moderate"
# ==================== HEALTH CHECK ====================
@app.get("/health")
async def health_check():
    """Report whether every service and agent global has been populated.

    Returns:
        Dict with "status" ("healthy" once all five globals are set, otherwise
        "initializing"), the boolean "agents_ready", and an ISO-format
        "timestamp" taken from the local clock.
    """
    components = (
        firebase_service,
        gemini_service,
        memory_agent,
        bed_allocator_agent,
        communicator_agent,
    )
    ready = all(component is not None for component in components)
    status = "healthy" if ready else "initializing"
    return {
        "status": status,
        "agents_ready": ready,
        "timestamp": datetime.now().isoformat(),
    }
# ==================== AGENT ENDPOINTS ====================
@app.post("/agent/bed-assignment", response_model=BedAgentResponse)
async def call_bed_agent(request: BedAgentRequest):
    """
    Bed Assignment Agent Endpoint

    Stores the diagnosis, inferred severity and extracted requirements on the
    patient record, then asks the Bed Allocator Agent for recommendations and
    returns at most the top three.

    Input:
    {
        "patient": {...},
        "doctor_input": {
            "diagnosis": "Pneumonia",
            "special_instructions": "Oxygen support"
        },
        "available_beds": [...]
    }

    Output:
    {
        "recommended_bed_id": "bed22",
        "reason": "Supports oxygen",
        "recommendations": [top 3 beds with scores],
        "confidence": 85
    }

    Raises:
        HTTPException: 503 when the agents are not initialized yet; 500 for any
            unexpected error while processing the request.
    """
    try:
        if not bed_allocator_agent or firebase_service is None:
            raise HTTPException(status_code=503, detail="Agents not initialized")

        patient = request.patient
        doctor_input = request.doctor_input
        patient_id = patient.get("patient_id")
        logger.info(f"Bed assignment request for patient: {patient_id}")

        # The diagnosis may be absent or an explicit null; pass text to the
        # severity helper either way.
        diagnosis = doctor_input.get("diagnosis")

        # Update patient with diagnosis information
        await firebase_service.update_patient(
            patient_id,
            {
                "diagnosis": diagnosis,
                "severity": _infer_severity(diagnosis or ""),
                "requirements": _extract_basic_requirements(doctor_input),
            },
        )

        # Call Bed Allocator Agent
        result = await bed_allocator_agent.process({"patient_id": patient_id})
        if not result.get("success"):
            logger.warning(f"No suitable beds found for patient {patient_id}")
            return BedAgentResponse(
                recommended_bed_id=None,
                reason=result.get("message", "No suitable beds found"),
                recommendations=[],
                confidence=0,
            )

        data = result.get("data", {})
        recommendations = data.get("recommendations", [])

        if recommendations:
            top = recommendations[0]
            recommended_bed_id = top.get("bed_id")
            # `reason` is a required string in the response model; fall back
            # to a placeholder when the agent supplied no reasoning.
            reason = top.get("reasoning") or "No reasoning provided"
        else:
            recommended_bed_id = None
            reason = "No beds available"

        # Format response according to API contract
        response = BedAgentResponse(
            recommended_bed_id=recommended_bed_id,
            reason=reason,
            recommendations=[
                {
                    "bed_id": rec.get("bed_id"),
                    "bed_number": rec.get("bed_number"),
                    "ward": rec.get("ward"),
                    "score": rec.get("score"),
                    "reasoning": rec.get("reasoning"),
                    "pros": rec.get("pros", []),
                    "cons": rec.get("cons", []),
                }
                for rec in recommendations[:3]  # Top 3
            ],
            confidence=data.get("confidence", 0),
        )
        logger.info(f"Bed assignment complete: {response.recommended_bed_id}")
        return response
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 503 above) must reach the client
        # unchanged instead of being re-wrapped as a 500.
        raise
    except Exception as e:
        # logger.exception keeps the traceback for post-mortem debugging.
        logger.exception(f"Error in bed agent: {e}")
        raise HTTPException(status_code=500, detail=f"Error: {str(e)}") from e
@app.post("/agent/cleaner-assignment", response_model=CleanerAgentResponse)
async def call_cleaner_agent(request: CleanerAgentRequest):
    """
    Cleaner Assignment Agent Endpoint

    Looks up the bed, asks the Communicator Agent to assign a cleaner, and
    falls back to the first caller-supplied cleaner when the agent reports
    failure or an unexpected error occurs.

    Input:
    {
        "bed_id": "bed22",
        "available_cleaners": [...]
    }

    Output:
    {
        "selected_cleaner_id": "c1",
        "reason": "Least workload"
    }

    Raises:
        HTTPException: 503 when the Communicator Agent is not initialized, 404
            when the bed lookup returns nothing, 500 when an unexpected error
            occurs and no fallback cleaner is available.
    """
    try:
        if not communicator_agent:
            raise HTTPException(status_code=503, detail="Agents not initialized")

        target_bed_id = request.bed_id
        candidates = request.available_cleaners
        logger.info(f"Cleaner assignment request for bed: {target_bed_id}")

        # The bed must exist before a cleaning task is created for it.
        bed_record = await firebase_service.get_bed(target_bed_id)
        if not bed_record:
            raise HTTPException(status_code=404, detail="Bed not found")

        # Delegate staff selection to the Communicator Agent.
        cleaning_task = {
            "task_type": "cleaning",
            "description": f"Clean bed {bed_record.get('bed_id')} in {bed_record.get('ward')}",
            "bed_id": target_bed_id,
            "priority": "high",
            "role": "cleaner",
        }
        outcome = await communicator_agent.process(
            {"type": "assign_staff", "task_data": cleaning_task}
        )

        if outcome.get("success"):
            picked = outcome.get("data", {})
            chosen_id = picked.get("staff_id")
            why = picked.get(
                "reasoning", "Selected based on workload and availability"
            )
        elif candidates:
            # Agent declined: take the first cleaner the caller offered.
            chosen_id = candidates[0].get("user_id")
            why = "First available cleaner"
        else:
            chosen_id = None
            why = "No cleaners available"

        response = CleanerAgentResponse(selected_cleaner_id=chosen_id, reason=why)
        logger.info(f"Cleaner assignment complete: {response.selected_cleaner_id}")
        return response
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error in cleaner agent: {e}")
        # Best-effort fallback; only surface a 500 when nobody can be offered.
        fallback_pool = request.available_cleaners
        if not fallback_pool:
            raise HTTPException(status_code=500, detail=f"Error: {str(e)}")
        return CleanerAgentResponse(
            selected_cleaner_id=fallback_pool[0].get("user_id"),
            reason="Fallback assignment due to error",
        )
@app.post("/agent/nurse-assignment", response_model=NurseAgentResponse)
async def call_nurse_agent(request: NurseAgentRequest):
    """
    Nurse Assignment Agent Endpoint

    Asks the Communicator Agent to assign a nurse for preparing a bed, and
    falls back to the first caller-supplied nurse when the agent reports
    failure or an unexpected error occurs.

    Input:
    {
        "patient": {...},
        "bed": {...},
        "available_nurses": [...]
    }

    Output:
    {
        "selected_nurse_id": "n1",
        "reason": "ICU trained"
    }

    Raises:
        HTTPException: 503 when the Communicator Agent is not initialized, 500
            when an unexpected error occurs and no fallback nurse is available.
    """
    try:
        if not communicator_agent:
            raise HTTPException(status_code=503, detail="Agents not initialized")

        patient_record = request.patient
        bed_record = request.bed
        candidates = request.available_nurses
        logger.info(
            f"Nurse assignment request for patient: {patient_record.get('patient_id')}"
        )

        # Delegate staff selection to the Communicator Agent.
        nursing_task = {
            "task_type": "nursing",
            "description": f"Prepare bed {bed_record.get('bed_id')} for patient {patient_record.get('name')}",
            "bed_id": bed_record.get("bed_id"),
            "patient_id": patient_record.get("patient_id"),
            "priority": "high",
            "role": "nurse",
        }
        outcome = await communicator_agent.process(
            {"type": "assign_staff", "task_data": nursing_task}
        )

        if outcome.get("success"):
            picked = outcome.get("data", {})
            chosen_id = picked.get("staff_id")
            why = picked.get(
                "reasoning", "Selected based on workload and specialization"
            )
        elif candidates:
            # Agent declined: take the first nurse the caller offered.
            chosen_id = candidates[0].get("user_id")
            why = "First available nurse"
        else:
            chosen_id = None
            why = "No nurses available"

        response = NurseAgentResponse(selected_nurse_id=chosen_id, reason=why)
        logger.info(f"Nurse assignment complete: {response.selected_nurse_id}")
        return response
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error in nurse agent: {e}")
        # Best-effort fallback; only surface a 500 when nobody can be offered.
        fallback_pool = request.available_nurses
        if not fallback_pool:
            raise HTTPException(status_code=500, detail=f"Error: {str(e)}")
        return NurseAgentResponse(
            selected_nurse_id=fallback_pool[0].get("user_id"),
            reason="Fallback assignment",
        )
# ==================== MAIN ====================
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces.
    import os

    import uvicorn

    # The port is read from AGENT_SERVER_PORT and defaults to 9000.
    port = int(os.environ.get("AGENT_SERVER_PORT", "9000"))
    logger.info(f"Starting Agent Server on port {port}...")
    uvicorn.run(app, host="0.0.0.0", port=port)