care_agets / agents /worker.py
omgy's picture
Update agents/worker.py
bbac27a verified
import os
import sys
import time
import requests
import logging
# Add parent directory to path so we can import utils
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils.gemini_client import ask_gemini
from utils.supabase_client import fetch_context_for_task
# Base URL of the backend API; overridable through the BACKEND_BASE environment variable.
BACKEND_BASE = os.environ.get("BACKEND_BASE", "http://localhost:7860")
# Seconds each agent loop sleeps between polls; overridable through the POLL_INTERVAL
# environment variable (must parse as an int).
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", 5))
# Configure the root logger once at import: INFO level, "<time> <level> <message>" lines.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
def poll_tasks(role):
    """Fetch the pending tasks for one agent role from the backend.

    Sends ``GET {BACKEND_BASE}/api/tasks/pending`` with ``role`` and
    ``include=context`` as query parameters.

    Args:
        role: Value sent as the ``role`` query parameter (the loops in this
            file pass "MASTER", "CLEANER" and "NURSE").

    Returns:
        The decoded JSON body when it is a list, otherwise an empty list.
        An empty list is also returned when the request fails, the backend
        answers with an error status, or the body is not valid JSON, so
        callers can always iterate over the result.
    """
    url = f"{BACKEND_BASE}/api/tasks/pending"
    params = {"role": role, "include": "context"}
    try:
        r = requests.get(url, params=params, timeout=30)
        r.raise_for_status()
        tasks = r.json()
    except Exception:
        # Deliberately broad: polling is best-effort and must never take the
        # calling agent loop down; the next poll simply retries.
        logging.exception("Failed to poll tasks")
        return []
    if not isinstance(tasks, list):
        # Callers iterate the result and index each item by "id"; a dict or
        # scalar body (e.g. an error object) would break them.
        logging.error(
            "Unexpected pending-tasks payload for role %s: expected a list, got %s",
            role,
            type(tasks).__name__,
        )
        return []
    return tasks
def post_decision(task_id, payload):
    """Send an agent's decision for a task to the backend.

    Posts ``payload`` as JSON to
    ``{BACKEND_BASE}/api/agent/tasks/{task_id}/decision`` and logs the
    outcome.

    Args:
        task_id: Identifier of the task the decision belongs to.
        payload: JSON-serialisable decision body.

    Returns:
        The ``requests.Response`` when the request completed (whatever its
        status code), or ``None`` when the request itself failed (connection
        error, timeout, ...).
    """
    url = f"{BACKEND_BASE}/api/agent/tasks/{task_id}/decision"
    try:
        r = requests.post(url, json=payload, timeout=30)
    except requests.RequestException:
        # A transport failure must not propagate into the agent loops, where
        # it would terminate the whole polling thread.
        logging.exception("Decision post failed for task %s", task_id)
        return None
    if r.status_code >= 300:
        logging.error("Decision post failed: %s %s", r.status_code, r.text)
    else:
        logging.info("Decision posted for task %s: %s", task_id, payload)
    return r
def _parse_gemini_decision(reply):
    """Turn Gemini's raw reply into a decision dict, deferring when it cannot be understood."""
    import json
    import re

    unparsable = {"action": "defer", "reason": "could_not_parse_gemini"}
    if not isinstance(reply, str):
        # No usable text came back (e.g. None); nothing to parse.
        return unparsable

    text = reply.strip()
    candidates = [text]
    # Replies are often wrapped in markdown fences or surrounded by prose, so
    # also try the outermost {...} span on its own.
    start, end = text.find("{"), text.rfind("}")
    if 0 <= start < end:
        candidates.append(text[start:end + 1])
    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except ValueError:
            continue
        if isinstance(parsed, dict):
            return parsed

    # Last-resort heuristic: the reply names the action and contains a number.
    if "assign_bed" in text:
        m = re.search(r"(\d+)", text)
        if m:
            return {"action": "assign_bed", "bed_id": int(m.group(1))}
    # Never post an assign_bed without a bed id; defer instead.
    return unparsable


def master_loop():
    """Poll MASTER tasks forever and post a Gemini-made decision for each.

    For every pending task a prompt is built from the task type, the patient
    id and the ids of the beds listed in the task context; Gemini's reply is
    parsed into a decision dict and posted back. A reply that cannot be
    parsed results in a ``defer`` decision with reason
    ``could_not_parse_gemini``.

    A failure while handling one task is logged and does not stop the loop.
    The function sleeps ``POLL_INTERVAL`` seconds between polls and never
    returns.
    """
    logging.info("MASTER loop started")
    while True:
        for t in poll_tasks("MASTER"):
            try:
                task_id = t["id"]
                logging.info("MASTER got task %s type=%s", task_id, t.get("type"))
                # "context" may be absent or explicitly null.
                context = t.get("context") or {}
                bed_ids = [b["id"] for b in context.get("available_beds") or []]
                prompt = (
                    "\n"
                    "You are the Master Agent for hospital bed assignments.\n"
                    f"Task: {t.get('type')}\n"
                    f"Patient: {t.get('patient_id')}\n"
                    f"Available beds (summary): {bed_ids}\n"
                    "Constraints: none\n"
                    "Provide a JSON decision with one of these actions: "
                    "assign_bed (bed_id), defer (reason)\n"
                    "Respond with only JSON.\n"
                )
                reply = ask_gemini(prompt)
                logging.info("Gemini raw reply: %s", reply)
                post_decision(task_id, _parse_gemini_decision(reply))
            except Exception:
                # Thread-level boundary: one malformed task or a failed Gemini
                # call must not end the MASTER agent for good.
                logging.exception("MASTER failed to process task %r", t)
        time.sleep(POLL_INTERVAL)
def cleaner_loop():
    """Poll CLEANER tasks forever and assign the first available cleaner.

    For each pending task the first entry of ``context["staff"]`` whose
    ``role`` is ``"cleaner"`` and whose ``available`` flag is truthy is
    chosen and posted as an ``assign_cleaner`` decision. When there is no
    such entry a ``defer`` decision with reason ``no_cleaner_available`` is
    posted instead.

    A failure while handling one task is logged and does not stop the loop.
    The function sleeps ``POLL_INTERVAL`` seconds between polls and never
    returns.
    """
    logging.info("CLEANER loop started")
    while True:
        for t in poll_tasks("CLEANER"):
            try:
                task_id = t["id"]
                logging.info("CLEANER got task %s", task_id)
                # "context" and "staff" may be absent or explicitly null.
                context = t.get("context") or {}
                cleaner = next(
                    (
                        s
                        for s in context.get("staff") or []
                        if s.get("role") == "cleaner" and s.get("available")
                    ),
                    None,
                )
                if cleaner is None:
                    post_decision(task_id, {"action": "defer", "reason": "no_cleaner_available"})
                    continue
                post_decision(task_id, {"action": "assign_cleaner", "cleaner_id": cleaner["id"]})
                # NOTE: marking the cleaning as done is not handled here; the
                # original author left it to the backend driver or a new task.
            except Exception:
                # Thread-level boundary: one malformed task must not end the
                # CLEANER agent for good.
                logging.exception("CLEANER failed to process task %r", t)
        time.sleep(POLL_INTERVAL)
def nurse_loop():
    """Poll NURSE tasks forever and assign the first available nurse.

    For each pending task the first entry of ``context["staff"]`` whose
    ``role`` is ``"nurse"`` and whose ``available`` flag is truthy is chosen
    and posted as an ``assign_nurse`` decision. When there is no such entry a
    ``defer`` decision with reason ``no_nurse_available`` is posted instead.

    A failure while handling one task is logged and does not stop the loop.
    The function sleeps ``POLL_INTERVAL`` seconds between polls and never
    returns.
    """
    logging.info("NURSE loop started")
    while True:
        for t in poll_tasks("NURSE"):
            try:
                task_id = t["id"]
                logging.info("NURSE got task %s", task_id)
                # "context" and "staff" may be absent or explicitly null.
                context = t.get("context") or {}
                nurse = next(
                    (
                        s
                        for s in context.get("staff") or []
                        if s.get("role") == "nurse" and s.get("available")
                    ),
                    None,
                )
                if nurse is None:
                    post_decision(task_id, {"action": "defer", "reason": "no_nurse_available"})
                    continue
                post_decision(task_id, {"action": "assign_nurse", "nurse_id": nurse["id"]})
            except Exception:
                # Thread-level boundary: one malformed task must not end the
                # NURSE agent for good.
                logging.exception("NURSE failed to process task %r", t)
        time.sleep(POLL_INTERVAL)
if __name__ == "__main__":
    # Run the three agent loops concurrently, one daemon thread each.
    import threading

    threads = [
        threading.Thread(target=master_loop, name="master", daemon=True),
        threading.Thread(target=cleaner_loop, name="cleaner", daemon=True),
        threading.Thread(target=nurse_loop, name="nurse", daemon=True),
    ]
    for th in threads:
        th.start()
    logging.info("All agent threads started")

    # Daemon threads end with the process, so the main thread has to stay
    # alive. While waiting it also watches the workers: a stopped thread is
    # reported once, and the process exits when none is left instead of
    # idling forever with no agent running.
    stopped = set()
    try:
        while True:
            time.sleep(60)
            for th in threads:
                if not th.is_alive() and th.name not in stopped:
                    stopped.add(th.name)
                    logging.error("Agent thread %s has stopped", th.name)
            if len(stopped) == len(threads):
                logging.error("No agent threads left running; exiting")
                sys.exit(1)
    except KeyboardInterrupt:
        logging.info("Interrupted; shutting down")