Spaces:
Sleeping
Sleeping
| import os | |
| import sys | |
| import time | |
| import requests | |
| import logging | |
| # Add parent directory to path so we can import utils | |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| from utils.gemini_client import ask_gemini | |
| from utils.supabase_client import fetch_context_for_task | |
| BACKEND_BASE = os.environ.get("BACKEND_BASE", "http://localhost:7860") | |
| POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", 5)) | |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") | |
| def poll_tasks(role): | |
| url = f"{BACKEND_BASE}/api/tasks/pending" | |
| params = {"role": role, "include": "context"} | |
| try: | |
| r = requests.get(url, params=params, timeout=30) | |
| r.raise_for_status() | |
| return r.json() | |
| except Exception as e: | |
| logging.exception("Failed to poll tasks") | |
| return [] | |
| def post_decision(task_id, payload): | |
| url = f"{BACKEND_BASE}/api/agent/tasks/{task_id}/decision" | |
| r = requests.post(url, json=payload, timeout=30) | |
| if r.status_code >= 300: | |
| logging.error("Decision post failed: %s %s", r.status_code, r.text) | |
| else: | |
| logging.info("Decision posted for task %s: %s", task_id, payload) | |
| return r | |
| def master_loop(): | |
| logging.info("MASTER loop started") | |
| while True: | |
| tasks = poll_tasks("MASTER") | |
| for t in tasks: | |
| task_id = t["id"] | |
| logging.info("MASTER got task %s type=%s", task_id, t.get("type")) | |
| # build prompt for gemini using context | |
| context = t.get("context", {}) | |
| prompt = f""" | |
| You are the Master Agent for hospital bed assignments. | |
| Task: {t.get('type')} | |
| Patient: {t.get('patient_id')} | |
| Available beds (summary): {[b['id'] for b in context.get('available_beds', [])]} | |
| Constraints: none | |
| Provide a JSON decision with one of these actions: assign_bed (bed_id), defer (reason) | |
| Respond with only JSON. | |
| """ | |
| reply = ask_gemini(prompt) | |
| logging.info("Gemini raw reply: %s", reply) | |
| # try to parse json from reply; expect minimal JSON | |
| import json | |
| try: | |
| decision_json = json.loads(reply.strip()) | |
| except Exception: | |
| # fallback simple heuristic | |
| if "assign_bed" in reply and "bed" in reply: | |
| # naive parse | |
| import re | |
| m = re.search(r"(\d+)", reply) | |
| bed_id = int(m.group(1)) if m else None | |
| decision_json = {"action": "assign_bed", "bed_id": bed_id} | |
| else: | |
| decision_json = {"action": "defer", "reason": "could_not_parse_gemini"} | |
| post_decision(task_id, decision_json) | |
| time.sleep(POLL_INTERVAL) | |
| def cleaner_loop(): | |
| logging.info("CLEANER loop started") | |
| while True: | |
| tasks = poll_tasks("CLEANER") | |
| for t in tasks: | |
| task_id = t["id"] | |
| logging.info("CLEANER got task %s", task_id) | |
| # simple logic: pick first available cleaner from context.staff where role == 'cleaner' | |
| context = t.get("context", {}) | |
| cleaners = [s for s in context.get("staff", []) if s.get("role") == "cleaner" and s.get("available")] | |
| if not cleaners: | |
| post_decision(task_id, {"action": "defer", "reason": "no_cleaner_available"}) | |
| continue | |
| cleaner = cleaners[0] | |
| payload = {"action": "assign_cleaner", "cleaner_id": cleaner["id"]} | |
| post_decision(task_id, payload) | |
| # simulate cleaning time by waiting (optional) and then mark cleaning done via backend driver or new task creation | |
| time.sleep(POLL_INTERVAL) | |
| def nurse_loop(): | |
| logging.info("NURSE loop started") | |
| while True: | |
| tasks = poll_tasks("NURSE") | |
| for t in tasks: | |
| task_id = t["id"] | |
| logging.info("NURSE got task %s", task_id) | |
| context = t.get("context", {}) | |
| nurses = [s for s in context.get("staff", []) if s.get("role") == "nurse" and s.get("available")] | |
| if not nurses: | |
| post_decision(task_id, {"action": "defer", "reason": "no_nurse_available"}) | |
| continue | |
| nurse = nurses[0] | |
| payload = {"action": "assign_nurse", "nurse_id": nurse["id"]} | |
| post_decision(task_id, payload) | |
| time.sleep(POLL_INTERVAL) | |
| if __name__ == "__main__": | |
| # run all loops concurrently in threads | |
| import threading | |
| threads = [ | |
| threading.Thread(target=master_loop, daemon=True), | |
| threading.Thread(target=cleaner_loop, daemon=True), | |
| threading.Thread(target=nurse_loop, daemon=True) | |
| ] | |
| for th in threads: | |
| th.start() | |
| logging.info("All agent threads started") | |
| # keep main thread alive | |
| while True: | |
| time.sleep(60) |