|
|
""" |
|
|
CareFlow Nexus - Pharmacy Agent |
|
|
Hugging Face Deployment Ready |
|
|
|
|
|
This agent manages medication dispensing, inventory tracking, and prescription fulfillment |
|
|
for the CareFlow Nexus hospital operating system. |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import json |
|
|
import os |
|
|
import uuid |
|
|
from datetime import datetime, timedelta |
|
|
from enum import Enum |
|
|
from typing import Any, Dict, List, Literal, Optional |
|
|
|
|
|
import google.generativeai as genai |
|
|
import uvicorn |
|
|
from fastapi import BackgroundTasks, FastAPI, HTTPException |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from fastapi.responses import StreamingResponse |
|
|
from pydantic import BaseModel, Field |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Agent identity metadata. AGENT_NAME and AGENT_VERSION are echoed by the
# root and health endpoints.
AGENT_VERSION = "1.0.0"
AGENT_NAME = "Pharmacy Agent"
AGENT_ID = "pharmacy-agent-001"

# Gemini is optional: when no API key is present the service still starts,
# `gemini_model` stays None, and callers check it before using AI features.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    gemini_model = None
    print("⚠️ Warning: GEMINI_API_KEY not found. AI features will be disabled.")
else:
    genai.configure(api_key=GEMINI_API_KEY)
    gemini_model = genai.GenerativeModel("gemini-pro")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MedicationStatus(str, Enum):
    """Lifecycle states a prescription record can be in.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    # Waiting to be processed / being processed.
    PENDING = "pending"
    IN_PROGRESS = "in_progress"

    # Prepared and waiting for pickup / handed over.
    READY = "ready"
    DISPENSED = "dispensed"

    # States in which the prescription will not be fulfilled.
    CANCELLED = "cancelled"
    OUT_OF_STOCK = "out_of_stock"
|
|
|
|
|
|
|
|
class PrescriptionRequest(BaseModel):
    """Request body for submitting a prescription."""

    # Optional caller-chosen ID; the create endpoint generates one when absent.
    prescription_id: Optional[str] = None
    patient_id: str = Field(..., description="Patient identifier")
    patient_name: str = Field(..., description="Patient name")
    doctor_id: str = Field(..., description="Prescribing doctor ID")
    # Free-form dicts; the inventory helpers read the "name" and "quantity" keys.
    medications: List[Dict[str, Any]] = Field(..., description="List of medications")
    priority: Literal["low", "medium", "high", "urgent"] = "medium"
    notes: Optional[str] = None

    class Config:
        # Sample payload attached to the generated JSON schema.
        json_schema_extra = {
            "example": {
                "patient_id": "P12345",
                "patient_name": "John Doe",
                "doctor_id": "D001",
                "medications": [
                    {
                        "name": "Amoxicillin",
                        "dosage": "500mg",
                        "frequency": "3x daily",
                        "duration": "7 days",
                        "quantity": 21,
                    }
                ],
                "priority": "high",
                "notes": "Patient has penicillin allergy - verify alternative",
            }
        }
|
|
|
|
|
|
|
|
class InventoryItem(BaseModel):
    """Stock record for a single medication."""

    medication_name: str
    stock_quantity: int
    unit: str = "units"
    # Stored as a plain string; the seed data uses YYYY-MM-DD.
    expiry_date: Optional[str] = None
    # Items at or below this level are reported as low stock.
    reorder_level: int = 50
    location: Optional[str] = None
|
|
|
|
|
|
|
|
class PrescriptionResponse(BaseModel):
    """Prescription record as returned by the prescription endpoints."""

    prescription_id: str
    status: MedicationStatus
    patient_id: str
    patient_name: str
    medications: List[Dict[str, Any]]
    # ISO-8601 string set at creation when everything is in stock, else None.
    estimated_time: Optional[str] = None
    pharmacist_notes: Optional[str] = None
    # ISO-8601 timestamps produced with datetime.isoformat().
    created_at: str
    updated_at: str
|
|
|
|
|
|
|
|
class TaskStatusResponse(BaseModel):
    """Progress snapshot of a background prescription-processing task."""

    # Tasks are keyed by the prescription ID they process.
    task_id: str
    status: Literal["pending", "in_progress", "completed", "failed"]
    # Percentage complete, constrained to 0..100.
    progress: int = Field(ge=0, le=100)
    message: str
    # Filled in by the background task when it completes successfully.
    result: Optional[Dict[str, Any]] = None
    created_at: str
    updated_at: str
|
|
|
|
|
|
|
|
class HealthResponse(BaseModel):
    """Payload returned by the health-check endpoint."""

    status: Literal["healthy", "degraded", "unhealthy"]
    agent: str
    version: str
    # Seconds elapsed since the process recorded its start time.
    uptime_seconds: float
    active_tasks: int
    total_processed: int
    timestamp: str
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# In-memory stores: nothing here is persisted, so all state is lost on restart.
# Both dicts are keyed by prescription ID (a task shares its prescription's ID).
prescriptions_db: Dict[str, Dict[str, Any]] = {}
tasks_db: Dict[str, Dict[str, Any]] = {}

# Seed stock rows: (name, quantity, unit, expiry date, reorder level, location).
_SEED_STOCK = (
    ("Amoxicillin", 500, "tablets", "2025-12-31", 100, "A-12"),
    ("Ibuprofen", 800, "tablets", "2025-11-30", 150, "B-05"),
    ("Paracetamol", 1200, "tablets", "2026-03-15", 200, "B-06"),
    ("Insulin", 75, "vials", "2025-06-30", 20, "C-01-Refrigerated"),
    ("Aspirin", 600, "tablets", "2025-10-20", 100, "A-15"),
)

# Inventory is keyed by medication name.
inventory_db: Dict[str, InventoryItem] = {
    name: InventoryItem(
        medication_name=name,
        stock_quantity=quantity,
        unit=unit,
        expiry_date=expiry,
        reorder_level=reorder,
        location=shelf,
    )
    for name, quantity, unit, expiry, reorder, shelf in _SEED_STOCK
}

# Process-wide counters read by the health and stats endpoints.
agent_stats = {"start_time": datetime.now(), "total_processed": 0, "active_tasks": 0}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
app = FastAPI(
    title="CareFlow Nexus - Pharmacy Agent",
    description="AI-powered medication management and dispensing agent",
    version=AGENT_VERSION,
    docs_url="/docs",
    redoc_url="/redoc",
)

# CORS configuration.
#
# A wildcard origin must not be combined with credentialed requests: the CORS
# specification forbids it, and permitting it would let any website make
# authenticated calls to an API that serves patient data. Allowed origins are
# therefore read from the comma-separated CORS_ALLOW_ORIGINS environment
# variable. When it is unset (or contains "*") every origin is still accepted,
# but credentials are switched off; credentials are only enabled when an
# explicit origin list is configured.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials="*" not in _cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_interaction_response(text: str) -> Optional[Dict[str, Any]]:
    """Return the JSON object embedded in the model reply, or None if absent."""
    # The reply may wrap the JSON in prose or a fenced code block, so take the
    # outermost brace-delimited span instead of parsing the whole text.
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        parsed = json.loads(text[start : end + 1])
    except json.JSONDecodeError:
        return None
    return parsed if isinstance(parsed, dict) else None


async def check_drug_interactions(medications: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Ask Gemini whether the given medications interact with each other.

    Args:
        medications: Medication dicts; only the "name" key is read.

    Returns:
        A dict that always contains "has_interactions" (bool).
        * AI disabled: also "message" and an empty "warnings" list.
        * AI call succeeded: also "ai_response" (raw model text) and
          "checked_at", plus "severity", "interactions" and
          "recommendations" when the reply contained those JSON keys.
        * AI call failed: "has_interactions" is False, with "error" and
          "warnings" describing the failure (best effort; never raises).
    """
    if not gemini_model:
        return {
            "has_interactions": False,
            "message": "AI check unavailable",
            "warnings": [],
        }

    try:
        med_names = [str(med.get("name")) for med in medications if med.get("name")]
        prompt = (
            "As a clinical pharmacist AI, analyze these medications for potential drug interactions:\n"
            "\n"
            f"Medications: {', '.join(med_names)}\n"
            "\n"
            "Provide:\n"
            "1. Any potential drug-drug interactions\n"
            "2. Severity level (low/medium/high/critical)\n"
            "3. Recommended actions or alternatives\n"
            "4. Special monitoring requirements\n"
            "\n"
            "Format response as JSON with keys: has_interactions (bool), severity (string), "
            "interactions (list), recommendations (list)"
        )

        # generate_content is a synchronous network call; run it in a worker
        # thread so the event loop keeps serving other requests meanwhile.
        response = await asyncio.to_thread(gemini_model.generate_content, prompt)
        reply = response.text

        parsed = _parse_interaction_response(reply)
        if parsed is not None and isinstance(parsed.get("has_interactions"), bool):
            # Trust the structured answer. A keyword search would always be
            # True here because the requested JSON contains "has_interactions".
            has_interactions = parsed["has_interactions"]
        else:
            # No usable JSON: fall back to the keyword heuristic, which errs
            # on the side of flagging the prescription.
            has_interactions = "interaction" in reply.lower()

        ai_analysis: Dict[str, Any] = {
            "has_interactions": has_interactions,
            "ai_response": reply,
            "checked_at": datetime.now().isoformat(),
        }
        if parsed is not None:
            for key in ("severity", "interactions", "recommendations"):
                if key in parsed:
                    ai_analysis[key] = parsed[key]
        return ai_analysis
    except Exception as e:
        # Best effort: an AI outage must not block prescription processing.
        return {
            "has_interactions": False,
            "error": str(e),
            "warnings": ["AI check failed"],
        }
|
|
|
|
|
|
|
|
async def generate_pharmacist_notes(prescription: Dict[str, Any]) -> str:
    """Generate short counseling notes for a prescription using Gemini.

    Args:
        prescription: Prescription record; the "patient_name", "medications",
            "priority" and "notes" keys are read.

    Returns:
        The model's notes with surrounding whitespace removed. A fixed
        approval message is returned when AI is disabled or the model replies
        with empty text, and an approval message that embeds the error is
        returned when the AI call fails. This function never raises.
    """
    default_notes = "Prescription verified and approved for dispensing."
    if not gemini_model:
        return default_notes

    try:
        # NOTE(review): the patient name and medication list are sent to an
        # external AI service here - confirm this meets privacy requirements.
        prompt = (
            "As a clinical pharmacist, provide brief professional notes for this prescription:\n"
            "\n"
            f"Patient: {prescription.get('patient_name')}\n"
            f"Medications: {json.dumps(prescription.get('medications', []), indent=2)}\n"
            f"Priority: {prescription.get('priority')}\n"
            f"Doctor's Notes: {prescription.get('notes') or 'None'}\n"
            "\n"
            "Provide:\n"
            "1. Key counseling points for the patient\n"
            "2. Administration instructions\n"
            "3. Important warnings or precautions\n"
            "4. Storage requirements\n"
            "\n"
            "Keep it concise (max 3-4 sentences)."
        )

        # generate_content blocks on network I/O; keep it off the event loop.
        response = await asyncio.to_thread(gemini_model.generate_content, prompt)
        notes = response.text.strip()
        # An empty reply would otherwise be stored as blank pharmacist notes.
        return notes or default_notes
    except Exception as e:
        return f"Prescription approved. Standard counseling recommended. (AI note generation failed: {str(e)})"
|
|
|
|
|
|
|
|
def check_inventory(medications: List[Dict[str, Any]]) -> tuple[bool, List[str]]:
    """Check whether the stock on hand can cover every requested medication.

    Quantities are summed per medication name before comparing with stock, so
    a prescription that lists the same drug twice cannot pass by having each
    line checked on its own.

    Args:
        medications: Medication dicts; the "name" and "quantity" keys are
            read (missing values default to "" and 0 respectively).

    Returns:
        ``(ok, problems)``: ``ok`` is True when ``problems`` is empty.
        ``problems`` holds one human-readable entry per medication that has
        an invalid quantity, is not in the inventory, or is short on stock.
    """
    problems: List[str] = []
    required: Dict[str, Any] = {}

    for med in medications:
        name = med.get("name", "")
        quantity = med.get("quantity", 0)
        # Reject non-numeric, negative and NaN quantities up front. A negative
        # amount would pass a "stock < quantity" test and later *add* stock.
        is_number = isinstance(quantity, (int, float)) and not isinstance(quantity, bool)
        if not is_number or not quantity >= 0:
            problems.append(f"{name} (invalid quantity: {quantity!r})")
            continue
        required[name] = required.get(name, 0) + quantity

    for name, needed in required.items():
        item = inventory_db.get(name)
        if item is None:
            problems.append(f"{name} (not in inventory)")
        elif item.stock_quantity < needed:
            problems.append(f"{name} (need {needed}, have {item.stock_quantity})")

    return not problems, problems
|
|
|
|
|
|
|
|
def update_inventory(medications: List[Dict[str, Any]]):
    """Deduct each listed medication's quantity from the inventory.

    Entries whose name is not present in the inventory are skipped; a missing
    "quantity" deducts nothing.
    """
    for entry in medications:
        stocked = inventory_db.get(entry.get("name", ""))
        if stocked is None:
            continue
        stocked.stock_quantity -= entry.get("quantity", 0)
|
|
|
|
|
|
|
|
async def process_prescription_task(prescription_id: str):
    """Background task that walks one prescription through fulfilment.

    Steps (each reported on the ``tasks_db`` entry): verification, AI
    interaction check, inventory check, preparation, labeling, quality check,
    counseling-note generation, and finally the stock deduction together with
    the READY transition.

    Behaviour worth knowing:
      * ``agent_stats["active_tasks"]`` is decremented exactly once on every
        exit path (success, out of stock, cancellation, error).
      * A prescription cancelled while the task runs is never moved to READY;
        the task ends as "failed" with a cancellation message.
      * Stock is re-checked and deducted in a single step with no ``await`` in
        between, so two concurrent tasks cannot both claim the same units.
      * On an unexpected error the task is marked "failed" and a prescription
        still marked IN_PROGRESS is put back to PENDING.
    """

    def report(progress: int, message: str, status: Optional[str] = None) -> None:
        """Write a progress update (and optionally a new status) to the task."""
        task = tasks_db[prescription_id]
        if status is not None:
            task["status"] = status
        task["progress"] = progress
        task["message"] = message
        task["updated_at"] = datetime.now().isoformat()

    def was_cancelled() -> bool:
        """Close the task and return True if the prescription was cancelled."""
        if prescriptions_db[prescription_id]["status"] != MedicationStatus.CANCELLED:
            return False
        report(100, "Prescription cancelled", status="failed")
        return True

    def fail_out_of_stock(items: List[str]) -> None:
        """Mark both the task and the prescription as out of stock."""
        detail = f"Out of stock: {', '.join(items)}"
        report(100, detail, status="failed")
        record = prescriptions_db[prescription_id]
        record["status"] = MedicationStatus.OUT_OF_STOCK
        record["pharmacist_notes"] = detail
        record["updated_at"] = datetime.now().isoformat()

    try:
        if was_cancelled():
            return

        prescription = prescriptions_db[prescription_id]
        prescription["status"] = MedicationStatus.IN_PROGRESS
        prescription["updated_at"] = datetime.now().isoformat()

        report(10, "Verifying prescription...", status="in_progress")
        await asyncio.sleep(1)

        report(20, "Checking drug interactions with AI...")
        interaction_check = await check_drug_interactions(prescription["medications"])
        if was_cancelled():
            # Do not overwrite the cancellation note with an AI alert.
            return
        if interaction_check.get("has_interactions"):
            prescription["pharmacist_notes"] = (
                f"⚠️ AI Alert: {interaction_check.get('ai_response', 'Potential interactions detected')}"
            )
        await asyncio.sleep(1)

        report(30, "Checking inventory...")
        await asyncio.sleep(1)
        if was_cancelled():
            return

        # Early check so an unfulfillable prescription fails before the
        # (simulated) preparation work starts.
        in_stock, out_of_stock_items = check_inventory(prescription["medications"])
        if not in_stock:
            fail_out_of_stock(out_of_stock_items)
            return

        report(50, "Preparing medications...")
        await asyncio.sleep(2)

        report(70, "Labeling and packaging...")
        await asyncio.sleep(1)

        report(90, "Final quality check...")
        await asyncio.sleep(1)

        report(95, "Generating counseling notes...")
        ai_notes = await generate_pharmacist_notes(prescription)

        # ---- No awaits below: the final check and the commit run as one step
        # on the event loop, so no other task can interleave with them. ----
        if was_cancelled():
            return

        in_stock, out_of_stock_items = check_inventory(prescription["medications"])
        if not in_stock:
            # Another prescription consumed the stock while this one was
            # being prepared.
            fail_out_of_stock(out_of_stock_items)
            return

        update_inventory(prescription["medications"])

        # Keep an earlier AI interaction alert rather than replacing it.
        if not prescription.get("pharmacist_notes"):
            prescription["pharmacist_notes"] = ai_notes

        finished_at = datetime.now().isoformat()
        report(100, "Prescription ready for pickup", status="completed")
        tasks_db[prescription_id]["result"] = {
            "prescription_id": prescription_id,
            "ready_time": finished_at,
            "ai_enhanced": gemini_model is not None,
        }

        prescription["status"] = MedicationStatus.READY
        prescription["updated_at"] = finished_at

        agent_stats["total_processed"] += 1

    except Exception as e:
        task = tasks_db.get(prescription_id)
        if task is not None:
            task["status"] = "failed"
            task["progress"] = 100
            task["message"] = f"Error: {str(e)}"
            task["updated_at"] = datetime.now().isoformat()
        # Do not leave the prescription stuck in IN_PROGRESS after a crash.
        record = prescriptions_db.get(prescription_id)
        if record is not None and record.get("status") == MedicationStatus.IN_PROGRESS:
            record["status"] = MedicationStatus.PENDING
    finally:
        # Matches the increment made when the task was scheduled.
        agent_stats["active_tasks"] -= 1
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/", tags=["General"])
async def root():
    """Describe the agent and list its main entry points."""
    endpoint_index = {
        "health": "/health",
        "docs": "/docs",
        "prescriptions": "/api/pharmacy/prescriptions",
        "inventory": "/api/pharmacy/inventory",
    }
    return {
        "agent": AGENT_NAME,
        "version": AGENT_VERSION,
        "status": "online",
        "endpoints": endpoint_index,
    }
|
|
|
|
|
|
|
|
@app.get("/health", response_model=HealthResponse, tags=["General"])
async def health_check():
    """Report liveness, uptime and task counters.

    The status is "healthy" when a Gemini model is configured and "degraded"
    otherwise.
    """
    elapsed = datetime.now() - agent_stats["start_time"]
    if gemini_model:
        overall = "healthy"
    else:
        overall = "degraded"

    return HealthResponse(
        status=overall,
        agent=AGENT_NAME,
        version=AGENT_VERSION,
        uptime_seconds=elapsed.total_seconds(),
        active_tasks=agent_stats["active_tasks"],
        total_processed=agent_stats["total_processed"],
        timestamp=datetime.now().isoformat(),
    )
|
|
|
|
|
|
|
|
@app.post(
    "/api/pharmacy/prescriptions",
    response_model=PrescriptionResponse,
    tags=["Pharmacy"],
)
async def create_prescription(
    request: PrescriptionRequest, background_tasks: BackgroundTasks
):
    """
    Submit a new prescription for processing

    The agent will:
    1. Verify prescription details
    2. Check inventory availability
    3. Prepare medications
    4. Package and label
    5. Notify when ready for pickup

    A prescription whose medications cannot be covered by current stock is
    stored with status "out_of_stock" and is not queued for processing.

    Returns 400 when the medication list is empty or the prescription ID
    already exists.
    """
    # An empty prescription would trivially pass the stock check and then be
    # "prepared" with nothing in it.
    if not request.medications:
        raise HTTPException(
            status_code=400,
            detail="Prescription must contain at least one medication",
        )

    prescription_id = request.prescription_id or f"RX-{uuid.uuid4().hex[:8].upper()}"
    if prescription_id in prescriptions_db:
        raise HTTPException(status_code=400, detail="Prescription ID already exists")

    in_stock, out_of_stock_items = check_inventory(request.medications)

    # One clock reading so created_at / updated_at agree across both records.
    now = datetime.now()
    timestamp = now.isoformat()
    shortage_note = (
        None if in_stock else f"Out of stock: {', '.join(out_of_stock_items)}"
    )

    prescription = {
        "prescription_id": prescription_id,
        "status": MedicationStatus.PENDING
        if in_stock
        else MedicationStatus.OUT_OF_STOCK,
        "patient_id": request.patient_id,
        "patient_name": request.patient_name,
        "doctor_id": request.doctor_id,
        "medications": request.medications,
        "priority": request.priority,
        "notes": request.notes,
        "estimated_time": (now + timedelta(minutes=15)).isoformat()
        if in_stock
        else None,
        "pharmacist_notes": shortage_note,
        "created_at": timestamp,
        "updated_at": timestamp,
    }
    prescriptions_db[prescription_id] = prescription

    # The task record shares the prescription's ID.
    tasks_db[prescription_id] = {
        "task_id": prescription_id,
        "status": "pending" if in_stock else "failed",
        "progress": 0 if in_stock else 100,
        "message": "Prescription submitted" if in_stock else shortage_note,
        "result": None,
        "created_at": timestamp,
        "updated_at": timestamp,
    }

    if in_stock:
        # Counted here and released by the background task when it exits.
        agent_stats["active_tasks"] += 1
        background_tasks.add_task(process_prescription_task, prescription_id)

    return PrescriptionResponse(**prescription)
|
|
|
|
|
|
|
|
@app.get(
    "/api/pharmacy/prescriptions/{prescription_id}",
    response_model=PrescriptionResponse,
    tags=["Pharmacy"],
)
async def get_prescription(prescription_id: str):
    """Return the stored prescription with the given ID, or 404 if unknown."""
    record = prescriptions_db.get(prescription_id)
    if record is None:
        raise HTTPException(status_code=404, detail="Prescription not found")
    return PrescriptionResponse(**record)
|
|
|
|
|
|
|
|
@app.get(
    "/api/pharmacy/prescriptions",
    response_model=List[PrescriptionResponse],
    tags=["Pharmacy"],
)
async def list_prescriptions(
    status: Optional[MedicationStatus] = None,
    patient_id: Optional[str] = None,
    limit: int = 50,
):
    """List prescriptions, newest first, with optional filters.

    - **status**: only return prescriptions in this state
    - **patient_id**: only return prescriptions for this patient
    - **limit**: maximum number of records to return (must not be negative)

    Returns 400 when `limit` is negative.
    """
    # A negative limit would be treated as a slice end and silently drop
    # records from the tail instead of limiting the result.
    if limit < 0:
        raise HTTPException(status_code=400, detail="limit must not be negative")

    matches = [
        record
        for record in prescriptions_db.values()
        if (not status or record["status"] == status)
        and (not patient_id or record["patient_id"] == patient_id)
    ]
    # created_at holds ISO-8601 strings, which sort chronologically.
    matches.sort(key=lambda record: record["created_at"], reverse=True)

    return [PrescriptionResponse(**record) for record in matches[:limit]]
|
|
|
|
|
|
|
|
@app.post("/api/pharmacy/prescriptions/{prescription_id}/dispense", tags=["Pharmacy"])
async def dispense_prescription(prescription_id: str):
    """Record that a READY prescription was handed over to the patient.

    Returns 404 for an unknown ID and 400 when the prescription is not in
    the READY state.
    """
    record = prescriptions_db.get(prescription_id)
    if record is None:
        raise HTTPException(status_code=404, detail="Prescription not found")

    current = record["status"]
    if current != MedicationStatus.READY:
        raise HTTPException(
            status_code=400,
            detail=f"Prescription not ready for dispensing. Current status: {current}",
        )

    dispensed_at = datetime.now().isoformat()
    record["status"] = MedicationStatus.DISPENSED
    record["updated_at"] = dispensed_at

    return {
        "message": "Prescription dispensed successfully",
        "prescription_id": prescription_id,
        "dispensed_at": dispensed_at,
    }
|
|
|
|
|
|
|
|
@app.post("/api/pharmacy/prescriptions/{prescription_id}/cancel", tags=["Pharmacy"])
async def cancel_prescription(prescription_id: str, reason: Optional[str] = None):
    """Cancel a prescription that has not been dispensed yet.

    If the prescription was already READY, its medications had been deducted
    from the inventory when it was prepared, so they are returned to stock.

    Returns 404 for an unknown ID and 400 when the prescription was already
    dispensed.
    """
    if prescription_id not in prescriptions_db:
        raise HTTPException(status_code=404, detail="Prescription not found")

    prescription = prescriptions_db[prescription_id]

    if prescription["status"] == MedicationStatus.DISPENSED:
        raise HTTPException(
            status_code=400, detail="Cannot cancel dispensed prescription"
        )

    # Stock is deducted when a prescription becomes READY; without this the
    # units of a cancelled-but-prepared prescription would be lost for good.
    # Only the READY state triggers it, so cancelling twice cannot restock twice.
    if prescription["status"] == MedicationStatus.READY:
        for med in prescription["medications"]:
            stocked = inventory_db.get(med.get("name", ""))
            if stocked is not None:
                stocked.stock_quantity += med.get("quantity", 0)

    prescription["status"] = MedicationStatus.CANCELLED
    prescription["pharmacist_notes"] = f"Cancelled: {reason or 'No reason provided'}"
    prescription["updated_at"] = datetime.now().isoformat()

    return {
        "message": "Prescription cancelled successfully",
        "prescription_id": prescription_id,
    }
|
|
|
|
|
|
|
|
@app.get(
    "/api/pharmacy/inventory", response_model=List[InventoryItem], tags=["Pharmacy"]
)
async def get_inventory(low_stock_only: bool = False):
    """Return the inventory, optionally limited to low-stock items.

    An item counts as low stock when its quantity is at or below its
    reorder level.
    """
    if not low_stock_only:
        return list(inventory_db.values())

    return [
        entry
        for entry in inventory_db.values()
        if entry.stock_quantity <= entry.reorder_level
    ]
|
|
|
|
|
|
|
|
@app.put(
    "/api/pharmacy/inventory/{medication_name}",
    response_model=InventoryItem,
    tags=["Pharmacy"],
)
async def update_inventory_item(medication_name: str, item: InventoryItem):
    """Create or replace the inventory record for a medication.

    Returns 400 when the name in the path differs from the name in the body,
    or when the stock quantity is negative.
    """
    # The inventory is keyed by name; storing a record under a key that does
    # not match its own medication_name would make lookups inconsistent.
    if item.medication_name != medication_name:
        raise HTTPException(
            status_code=400,
            detail="Medication name in path does not match request body",
        )
    if item.stock_quantity < 0:
        raise HTTPException(
            status_code=400, detail="stock_quantity must not be negative"
        )

    inventory_db[medication_name] = item
    return item
|
|
|
|
|
|
|
|
@app.get(
    "/api/pharmacy/tasks/{task_id}", response_model=TaskStatusResponse, tags=["Tasks"]
)
async def get_task_status(task_id: str):
    """Return the progress record of a processing task, or 404 if unknown."""
    task_record = tasks_db.get(task_id)
    if task_record is None:
        raise HTTPException(status_code=404, detail="Task not found")
    return TaskStatusResponse(**task_record)
|
|
|
|
|
|
|
|
@app.get("/api/pharmacy/stream/{prescription_id}", tags=["Streaming"])
async def stream_prescription_status(prescription_id: str):
    """
    Server-Sent Events (SSE) endpoint for real-time prescription updates

    The task record is polled once per second and an event is sent whenever
    its status, progress or message changes. The stream closes with a final
    `stream_ended` event once the task is completed or failed.

    Returns 404 when the prescription ID is unknown.
    """
    if prescription_id not in prescriptions_db:
        raise HTTPException(status_code=404, detail="Prescription not found")

    async def event_generator():
        last_sent = None

        while True:
            task = tasks_db.get(prescription_id)
            if task is None:
                # No task record to follow; end the stream instead of
                # polling forever.
                break

            # Compare the whole snapshot, not only the status: most updates
            # change progress and message while the status stays
            # "in_progress".
            snapshot = (task["status"], task["progress"], task["message"])
            if snapshot != last_sent:
                event_data = {
                    "prescription_id": prescription_id,
                    "status": task["status"],
                    "progress": task["progress"],
                    "message": task["message"],
                    "timestamp": datetime.now().isoformat(),
                }
                yield f"data: {json.dumps(event_data)}\n\n"
                last_sent = snapshot

            if snapshot[0] in ("completed", "failed"):
                break

            await asyncio.sleep(1)

        yield f"data: {json.dumps({'status': 'stream_ended'})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
|
|
|
|
|
|
|
|
@app.get("/api/pharmacy/stats", tags=["Analytics"])
async def get_statistics():
    """Summarise prescription, task, inventory and AI-configuration counters."""
    open_states = (MedicationStatus.PENDING, MedicationStatus.IN_PROGRESS)
    active_count = sum(
        1 for record in prescriptions_db.values() if record["status"] in open_states
    )
    low_stock_count = sum(
        1
        for entry in inventory_db.values()
        if entry.stock_quantity <= entry.reorder_level
    )
    uptime = datetime.now() - agent_stats["start_time"]

    return {
        "total_prescriptions": len(prescriptions_db),
        "active_prescriptions": active_count,
        # Reports the counter kept since process start.
        "completed_today": agent_stats["total_processed"],
        "active_tasks": agent_stats["active_tasks"],
        "inventory_items": len(inventory_db),
        "low_stock_items": low_stock_count,
        "uptime_seconds": uptime.total_seconds(),
        "ai_enabled": gemini_model is not None,
        "gemini_configured": GEMINI_API_KEY is not None,
    }
|
|
|
|
|
|
|
|
@app.post("/api/pharmacy/ai/analyze", tags=["AI Features"])
async def ai_analyze_prescription(
    patient_name: str,
    medications: List[Dict[str, Any]],
    medical_history: Optional[str] = None,
):
    """
    Use Gemini AI to analyze prescription for potential issues

    Provides:
    - Drug interaction analysis
    - Dosage recommendations
    - Patient counseling points
    - Safety warnings

    Returns 503 when no Gemini API key is configured and 500 when the AI
    call fails.
    """
    if not gemini_model:
        raise HTTPException(
            status_code=503,
            detail="AI features unavailable. GEMINI_API_KEY not configured.",
        )

    try:
        prompt = (
            "As an expert clinical pharmacist AI, analyze this prescription:\n"
            "\n"
            f"Patient: {patient_name}\n"
            f"Medical History: {medical_history or 'Not provided'}\n"
            f"Medications: {json.dumps(medications, indent=2)}\n"
            "\n"
            "Provide comprehensive analysis:\n"
            "1. Drug-drug interactions (if any)\n"
            "2. Contraindications based on medical history\n"
            "3. Dosage appropriateness\n"
            "4. Patient counseling points\n"
            "5. Monitoring recommendations\n"
            "6. Safety warnings\n"
            "\n"
            "Be specific and clinically relevant."
        )

        # generate_content is a blocking network call; running it in a worker
        # thread keeps the event loop free for other requests.
        response = await asyncio.to_thread(gemini_model.generate_content, prompt)

        # Separate structured interaction check (best effort; it reports its
        # own failures inside the returned dict instead of raising).
        interaction_check = await check_drug_interactions(medications)

        return {
            "success": True,
            "analysis": response.text,
            "interaction_check": interaction_check,
            "timestamp": datetime.now().isoformat(),
            "model": "gemini-pro",
        }
    except Exception as e:
        # Chain the cause so the original traceback is preserved in logs.
        raise HTTPException(
            status_code=500, detail=f"AI analysis failed: {str(e)}"
        ) from e
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Listen on the port given by the PORT environment variable, falling back
    # to 7860 when it is not set.
    listen_port = int(os.getenv("PORT", "7860"))
    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=listen_port,
        reload=False,
        log_level="info",
    )
|
|
|