# app.py — Biomedical Device Troubleshooting Assistant (Education-only, Agentic) # Deps: chainlit==1.0.200, python-dotenv==1.0.1, openai==1.35.13 # .env: GEMINI_API_KEY=... (preferred; uses OpenAI-compatible Gemini endpoint) # OR OPENAI_API_KEY=... import os, re, json from dataclasses import dataclass, field from typing import Any, Callable, Dict, List, Optional import chainlit as cl from dotenv import load_dotenv from openai import AsyncOpenAI as _SDKAsyncOpenAI # ========================= # Minimal "agents" shim # ========================= def set_tracing_disabled(_: bool = True): # no-op placeholder return True def function_tool(func: Callable): func._is_tool = True return func class AsyncOpenAI: def __init__(self, api_key: str, base_url: Optional[str] = None): kwargs = {"api_key": api_key} if base_url: kwargs["base_url"] = base_url self._client = _SDKAsyncOpenAI(**kwargs) @property def client(self): return self._client class OpenAIChatCompletionsModel: def __init__(self, model: str, openai_client: AsyncOpenAI): self.model = model self.client = openai_client.client @dataclass class Agent: name: str instructions: str model: OpenAIChatCompletionsModel tools: Optional[List[Callable]] = field(default_factory=list) _tool_specs: Optional[List[Dict[str, Any]]] = field(default=None, init=False) def tool_specs(self) -> Optional[List[Dict[str, Any]]]: if not self.tools: return None if self._tool_specs is None: specs: List[Dict[str, Any]] = [] for t in self.tools: if not getattr(t, "_is_tool", False): continue argnames = list(t.__code__.co_varnames[:t.__code__.co_argcount]) specs.append({ "type": "function", "function": { "name": t.__name__, "description": (t.__doc__ or "")[:512], "parameters": { "type": "object", "properties": {p: {"type": "string"} for p in argnames}, "required": argnames, }, }, }) self._tool_specs = specs return self._tool_specs class Runner: """Robust tool loop with forced finalization (prevents endless Plan→Act cycles).""" @staticmethod async def run( agent: Agent, user_input: str, context: Optional[Dict[str, Any]] = None, turns: int = 7, max_tools: int = 3, ): msgs = [ {"role": "system", "content": agent.instructions}, { "role": "system", "content": ( "You have access to these tools: " + ", ".join(t.__name__ for t in (agent.tools or []) if getattr(t, "_is_tool", False)) + f". Use at most {max_tools} tool calls, then produce the final answer in the required sections." ), }, {"role": "user", "content": user_input}, ] tool_specs = agent.tool_specs() tool_map = {t.__name__: t for t in (agent.tools or []) if getattr(t, "_is_tool", False)} tool_uses = 0 for step in range(max(1, min(12, turns))): force_final = (tool_uses >= max_tools) or (step == turns - 1) resp = await agent.model.client.chat.completions.create( model=agent.model.model, messages=msgs, tools=None if force_final else tool_specs, tool_choice="none" if force_final else "auto", ) msg = resp.choices[0].message tool_calls = getattr(msg, "tool_calls", None) # Normal completion (no tool calls) => return if not tool_calls: out = type("Result", (), {})() out.final_output = (msg.content or "").strip() out.context = context or {} out.final_output_as = lambda *_: out.final_output return out # If we forced finalization but the model still tried tools, nudge and iterate if force_final: msgs.append({ "role": "system", "content": "Do not call tools anymore. Produce the final answer now in the required sections." }) continue # Execute tool calls msgs.append({"role": "assistant", "content": msg.content or "", "tool_calls": tool_calls}) for call in tool_calls: fn_name = call.function.name args = json.loads(call.function.arguments or "{}") if fn_name not in tool_map: result = {"error": f"Unknown tool '{fn_name}'. Available: " + ", ".join(tool_map.keys())} else: try: result = tool_map[fn_name](**args) except Exception as e: result = {"error": str(e)} msgs.append({ "role": "tool", "tool_call_id": call.id, "name": fn_name, "content": json.dumps(result), }) tool_uses += 1 # One last attempt to finalize with tools disabled msgs.append({"role": "system", "content": "Stop tool use. Provide the final answer now."}) resp = await agent.model.client.chat.completions.create( model=agent.model.model, messages=msgs, tools=None, tool_choice="none" ) out = type("Result", (), {})() out.final_output = (resp.choices[0].message.content or "I couldn’t generate a plan.").strip() out.context = context or {} out.final_output_as = lambda *_: out.final_output return out # ========================= # Model setup (Gemini via OpenAI-compatible OR OpenAI) # ========================= def setup_model() -> OpenAIChatCompletionsModel: load_dotenv() gem = os.getenv("Gem") oai = os.getenv("OPENAI_API_KEY") if gem: client = AsyncOpenAI(api_key=gem, base_url="https://generativelanguage.googleapis.com/v1beta/openai/") return OpenAIChatCompletionsModel("gemini-2.5-flash", client) if oai: client = AsyncOpenAI(api_key=oai) return OpenAIChatCompletionsModel("gpt-4o-mini", client) raise RuntimeError("Add GEMINI_API_KEY or OPENAI_API_KEY to your .env") llm_model = setup_model() set_tracing_disabled(True) # ========================= # Guardrails (fast & local) # ========================= COMMANDS = ("/help", "/policy", "/agent", "/simple", "/tools") TOPIC_TOKENS = tuple(""" biomedical biomed device equipment oem troubleshoot fault error alarm probe sensor lead cable battery power calibration qc verification analyzer self-test ventilator defibrillator infusion pump ecg oximeter nibp monitor ultrasound anesthesia syringe suction spirometer glucometer flow pressure temperature module waveform leak occlusion charger display keypad """.split()) RE_FORBIDDEN = { "clinical_advice": re.compile(r"\b(diagnos(e|is|tic)|prescrib|medicat|treat(ment|ing)?|dose|drug|therapy)\b", re.I), "invasive_repair": re.compile(r"\b(open(ing)?\s+(device|casing|cover)|solder|reflow|board[- ]level|replace\s+(capacitor|ic))\b", re.I), "alarm_bypass": re.compile(r"\b(bypass|disable|silence)\s+(alarm|alert|safety|interlock)\b", re.I), "firmware": re.compile(r"\b(firmware|bootloader|root|jailbreak|unlock\s+(service|engineer)\s*mode|password\s*override|backdoor)\b", re.I), } RE_PHI = { "email": re.compile(r"[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}", re.I), "phone": re.compile(r"(?:\+\d{1,3}[-\s.]*)?(?:\(?\d{3,4}\)?[-\s.]*)?\d{3}[-\s.]?\d{4}"), "id": re.compile(r"\b(CNIC|MRN|passport|nid|national\s*id|social\s*security|aadhaar)\b", re.I), } def on_topic(text: str) -> bool: low = (text or "").lower().strip() return bool(low) and (low.startswith(COMMANDS) or any(t in low for t in TOPIC_TOKENS)) def guard_issues(text: str) -> List[str]: low = (text or "").lower() issues = [k for k, rx in RE_FORBIDDEN.items() if rx.search(low)] if any(rx.search(text) for rx in RE_PHI.values()): issues.append("phi") return issues # ========================= # Education-only tools # ========================= def _bullets(items: List[str]) -> str: return "\n".join(f"- {x}" for x in (items or [])) or "-" @function_tool def quick_reference(device_and_symptom: str) -> dict: """ Generic, non-invasive reference for troubleshooting (education-only). """ s = (device_and_symptom or "").lower() life = any(k in s for k in ["ventilator", "defibrillator", "infusion pump"]) return { "safety": [ "If attached to a patient, ensure backup monitoring before checks.", "Do not open the casing; follow OEM & facility policy.", "Do not bypass or silence alarms beyond OEM instructions.", *(["Life-support device: remove from service and use backup if malfunction suspected."] if life else []) ], "common_faults": [ "Power/battery issues or loose connections.", "Damaged/incorrect accessories (leads, probes, tubing).", "Configuration/profile mismatch for the clinical setup.", "Environmental interference (EMI), filters/clogs, or mechanical obstruction." ], "quick_checks": [ "Note model/serial/firmware and any error codes.", "Verify power or battery; try another outlet; reseat user-removable battery.", "Inspect & reseat accessories; swap with a known-good.", "Confirm settings; restore defaults if safe.", "Run self-test and review OEM quick-reference." ], "qc": [ "Verify against a reference/simulator (ECG, SpO₂, NIBP, flow) where applicable.", "Perform electrical safety tests per policy.", "Document & compare with prior QC." ], "escalate": [ "Any failed self-test or QC/safety test.", "Persistent faults after basic checks or visible damage/liquid ingress.", "Life-critical pathway: remove from service and escalate to Biomed/OEM." ], } @function_tool def safety_policy() -> dict: """Concise policy block enforced by the assistant.""" return {"policy": [ "Education-only troubleshooting; no clinical advice.", "No invasive repairs; no alarm bypass; no firmware tampering.", "No collecting/sharing personal identifiers.", "OEM manuals & local policy take precedence." ]} @function_tool def record_step(note: str) -> dict: """Append an action/observation to an agent trace (shown to user).""" trace: List[str] = cl.user_session.get("agent_trace") or [] clean = " ".join((note or "").split()) if clean: trace.append(clean[:500]) cl.user_session.set("agent_trace", trace) return {"ok": True, "len": len(trace)} @function_tool def error_code_lookup(model: str, code: str) -> dict: """Friendly generic notes for common codes (extendable).""" key = f"{(model or '').strip().lower()}::{(code or '').strip().lower()}" KB = { "generic::e01": {"meaning": "General startup/self-test failure.", "notes": ["Safe power-cycle (off-patient). Check mains/battery & connections. Re-run self-test. Escalate if persistent."]}, "generic::occlusion": {"meaning": "Flow obstruction detected.", "notes": ["Check clamps/kinks/filters. Reload set per OEM. Do not bypass alarms."]}, "monitor::leads_off": {"meaning": "ECG leads not detected / poor contact.", "notes": ["Verify placement & skin prep. Swap known-good leads. Route away from mains/EMI."]}, } data = KB.get(key) or KB.get(f"generic::{(code or '').strip().lower()}") if not data: return {"found": False, "message": "No local entry. Use OEM quick-ref.", "examples": list(KB.keys())} return {"found": True, "model": model, "code": code, "meaning": data["meaning"], "education_notes": data["notes"]} @function_tool def acronym_expand(term: str) -> dict: """Explain common biomed/acquisition acronyms (study aid).""" t = (term or "").strip().lower() MAP = { "ecg": "Electrocardiogram.", "nibp": "Non-Invasive Blood Pressure.", "spo2": "Peripheral oxygen saturation (pulse oximetry).", "peep": "Positive End-Expiratory Pressure (ventilation).", "fio2": "Fraction of inspired oxygen.", "vt": "Tidal Volume.", "hu": "Hounsfield Units (CT).", "snr": "Signal-to-Noise Ratio.", "tr": "Repetition Time (MRI).", "te": "Echo Time (MRI).", "flair": "Fluid-Attenuated Inversion Recovery (MRI).", "adc": "Apparent Diffusion Coefficient (MRI).", } return {"term": term, "meaning": MAP.get(t, "Not in the quick list. Use OEM/teaching refs.")} @function_tool def unit_convert(value: str, from_unit: str, to_unit: str) -> dict: """ QC-friendly conversions: mmHg↔kPa (1 kPa = 7.50062 mmHg) | L/min↔mL/s (1 L/min = 16.6667 mL/s) | mV↔µV (1 mV = 1000 µV) """ def f(x): return float(str(x).strip()) v = f(value); fu = (from_unit or "").lower(); tu = (to_unit or "").lower() if fu == "mmhg" and tu == "kpa": return {"ok": True, "result": v/7.50062} if fu == "kpa" and tu == "mmhg": return {"ok": True, "result": v*7.50062} if fu in ["l/min","lpm"] and tu in ["ml/s","mlps"]: return {"ok": True, "result": v*1000/60} if fu in ["ml/s","mlps"] and tu in ["l/min","lpm"]: return {"ok": True, "result": v*60/1000} if fu == "mv" and tu in ["µv","uv"]: return {"ok": True, "result": v*1000} if fu in ["µv","uv"] and tu == "mv": return {"ok": True, "result": v/1000} return {"ok": False, "error": "Unsupported units. Try mmHg↔kPa, L/min↔mL/s, mV↔µV."} @function_tool def triage_priority(device_and_symptom: str) -> dict: """Heuristic priority: life-critical / high / standard (education-only).""" s = (device_and_symptom or "").lower() life = any(k in s for k in ["ventilator","defibrillator","infusion pump"]) high = any(k in s for k in ["alarm","occlusion","no output","no reading","self-test fail","charging issue"]) return {"priority": "life-critical" if life else ("high" if high else "standard"), "explain": "Heuristic only. Follow local escalation policy."} @function_tool def simulator_baselines(device: str) -> dict: """Educational simulator targets (generic; not OEM-specific).""" d = (device or "").lower() bank = { "ecg": ["1 mV @ 10 mm/mV (paper 25 mm/s).", "Check lead-fault detection & baseline stability."], "nibp": ["Phantom ~120/80 mmHg within policy tolerance.", "Leak test & overpressure safety."], "spo2": ["Simulator steps e.g., 97%, 90%, 85% (device-dependent).", "Verify pleth and probe integrity."], "ventilator": ["Tidal volume/flow with analyzer; verify alarms.", "Circuit leak test; filters per policy."], } for k, v in bank.items(): if k in d: return {"device": device, "baselines": v} return {"device": device, "baselines": ["Use appropriate reference/simulator.", "Document vs prior QC; check alarm trips."]} @function_tool def env_checklist(_: str) -> dict: """Environment/power/EMI checklist (safe).""" return {"checks": [ "Verify mains outlet & ground; power indicator on.", "Avoid extension cords; try a known-good outlet.", "Route signal cables away from mains/transformers.", "Eliminate strong RF/EMI sources nearby.", "Check filters/vents; ambient temp/humidity within spec." ]} @function_tool def accessory_checklist(_: str) -> dict: """Accessory fit/damage/compatibility checklist.""" return {"checks": [ "Inspect connectors for bent pins/corrosion; ensure tight seating.", "Check cables/tubing for cuts/kinks; confirm model compatibility.", "Swap to a known-good accessory.", "Respect single-use vs reusable policy." ]} @function_tool def build_qc_template(device: str) -> dict: """Markdown QC template for notes/CMMS (education-only).""" d = (device or "Device").strip() md = f"""### QC/Verification — {d} - Date / Tech: - Model / Serial / FW: - Visual Inspection: (✔/✖) Notes: - Electrical Safety: (✔/✖) Leakage / Ground: - Functional Checks: - Reference/Simulator values → Measured: - Alarms exercised → - Accessories (list & status): - Final Assessment: Pass / Conditional / Fail - Next Steps / Escalation: """ return {"markdown": md} # ========================= # Agentic planner + simple fallback # ========================= PLANNER_INSTRUCTIONS = ( "You are an **agentic biomedical device troubleshooting planner** for clinical engineers.\n" "SCOPE (strict): education-only. No clinical advice. No invasive repairs. No alarm bypass. No firmware tampering. No PHI.\n" "Use at most **3 tool calls** and then **produce the final answer with no further tool calls**.\n" "Loop up to 3 times: PLAN → ACT (tool) → OBSERVE (summarize via record_step).\n\n" "Finally output exactly these sections:\n" "1) Safety First\n" "2) Likely Causes (ranked)\n" "3) Step-by-Step Checks (non-invasive; no alarm bypass)\n" "4) QC/Calibration\n" "5) Escalate When\n" "End with a one-line summary. If life-critical & patient-connected is implied, start with:\n" "REMOVE FROM SERVICE & USE BACKUP — then proceed off-patient.\n" ) planner_agent = Agent( name="Agentic Planner", instructions=PLANNER_INSTRUCTIONS, model=llm_model, tools=[ quick_reference, safety_policy, record_step, error_code_lookup, acronym_expand, unit_convert, triage_priority, simulator_baselines, env_checklist, accessory_checklist, build_qc_template ], ) SIMPLE_SYSTEM = ( "You are a biomedical device troubleshooting assistant (education-only). " "Output sections: Safety First; Likely Causes; Step-by-Step Checks; QC/Calibration; Escalate When; one-line summary." ) async def simple_plan(user_desc: str) -> str: resp = await llm_model.client.chat.completions.create( model=llm_model.model, messages=[{"role": "system", "content": SIMPLE_SYSTEM}, {"role": "user", "content": f"Device & symptom:\n{user_desc.strip()}"}], ) return resp.choices[0].message.content or "" # ========================= # UI strings # ========================= WELCOME = ( "🛠️ **Biomedical Device Troubleshooting Assistant** (Agentic)\n" "Education-only. No diagnosis, no invasive service, no alarm bypass. OEM & policy rule the day.\n\n" "Type your **device & symptom** (e.g., “Infusion pump occlusion alarm”).\n" "Commands: **/agent** agentic (default) · **/simple** single-pass · **/policy** rules · **/tools** list tools · **/help**" ) POLICY = ( "🛡️ **Safety & Scope Policy**\n" "- Scope: biomedical device troubleshooting (education-only).\n" "- No clinical advice (diagnosis/treatment/dosing/medications).\n" "- No invasive repairs; no alarm bypass; no firmware tampering.\n" "- No collecting/sharing personal identifiers (PHI).\n" "- OEM manuals & local policy take priority." ) REFUSAL = ( "🚫 I can’t help with diagnosis/treatment, invasive repair, alarm bypass, firmware hacks, or handling personal identifiers.\n" "I can guide safe, non-invasive troubleshooting and educational QC steps." ) TOOLS_LIST = ( "🔧 **Available Tools**\n" "- quick_reference(text)\n" "- safety_policy()\n" "- record_step(note)\n" "- error_code_lookup(model, code)\n" "- acronym_expand(term)\n" "- unit_convert(value, from_unit, to_unit)\n" "- triage_priority(text)\n" "- simulator_baselines(device)\n" "- env_checklist(context)\n" "- accessory_checklist(device)\n" "- build_qc_template(device)\n" ) # ========================= # Chainlit flow # ========================= @cl.on_chat_start async def on_start(): cl.user_session.set("mode", "agent") # 'agent' | 'simple' cl.user_session.set("agent_trace", []) await cl.Message(content=WELCOME).send() @cl.on_message async def on_message(msg: cl.Message): text = (msg.content or "").strip() low = text.lower() # Commands if low.startswith("/help"): await cl.Message(content=WELCOME).send(); return if low.startswith("/policy"): await cl.Message(content=POLICY).send(); return if low.startswith("/tools"): await cl.Message(content=TOOLS_LIST).send(); return if low.startswith("/agent"): cl.user_session.set("mode", "agent") await cl.Message(content="✅ Agentic mode enabled.").send(); return if low.startswith("/simple"): cl.user_session.set("mode", "simple") await cl.Message(content="✅ Simple mode enabled.").send(); return # Guardrails if not on_topic(text): await cl.Message( content="I only support **biomedical device troubleshooting**. " "Describe the device & symptom (e.g., “ECG noisy baseline”)." ).send(); return issues = guard_issues(text) if issues: await cl.Message(content=REFUSAL + "\n\n" + POLICY).send(); return mode = cl.user_session.get("mode") or "agent" trace_before = len(cl.user_session.get("agent_trace") or []) # Generate answer if mode == "agent": try: result = await Runner.run(planner_agent, text, turns=7, max_tools=3) answer = result.final_output or "I couldn’t generate a plan." except Exception as e: await cl.Message(content=f"⚠️ Planner error: {e}\nFalling back to simple mode.").send() answer = await simple_plan(text) else: answer = await simple_plan(text) # Agent trace (new steps) trace = cl.user_session.get("agent_trace") or [] new_trace = trace[trace_before:] trace_md = "### 🧭 Agent Trace (summary)\n" + "\n".join(f"- {t}" for t in new_trace) + "\n\n" if new_trace else "" # Deterministic quick reference qref = quick_reference(text) quick_md = ( f"### 📘 Quick Reference\n" f"**Safety**\n{_bullets(qref.get('safety'))}\n\n" f"**Common Faults**\n{_bullets(qref.get('common_faults'))}\n\n" f"**Quick Checks**\n{_bullets(qref.get('quick_checks'))}\n\n" f"**QC / Calibration**\n{_bullets(qref.get('qc'))}\n\n" f"**Escalate If**\n{_bullets(qref.get('escalate'))}\n\n" f"> ⚠️ Education-only. Follow OEM & local policy.\n" ) await cl.Message(content=f"{trace_md}{quick_md}\n---\n{answer}").send()