# Page residue from the hosting site (not code):
#   ibadhasnain's picture / Update app.py / c837390 verified
# app.py — Biomedical Device Troubleshooting Assistant (Education-only, Agentic)
# Deps: chainlit==1.0.200, python-dotenv==1.0.1, openai==1.35.13
# .env: GEMINI_API_KEY=... (preferred; uses OpenAI-compatible Gemini endpoint)
# OR OPENAI_API_KEY=...
import os, re, json
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
import chainlit as cl
from dotenv import load_dotenv
from openai import AsyncOpenAI as _SDKAsyncOpenAI
# =========================
# Minimal "agents" shim
# =========================
def set_tracing_disabled(_: bool = True):
    """No-op placeholder for a tracing switch.

    The argument is accepted and ignored.

    Returns:
        Always ``True``.
    """
    # This shim records no traces, so there is nothing to turn off.
    return True
def function_tool(func: Callable):
    """Decorator that flags ``func`` as an agent tool.

    Sets ``func._is_tool = True`` and returns the very same callable, so the
    decorated function can still be called directly.
    """
    setattr(func, "_is_tool", True)
    return func
class AsyncOpenAI:
    """Small wrapper that builds the SDK async client and exposes it as ``client``."""

    def __init__(self, api_key: str, base_url: Optional[str] = None):
        """Create the underlying SDK client.

        Args:
            api_key: Key forwarded to the SDK client.
            base_url: Optional endpoint override; forwarded only when truthy.
        """
        options = {"api_key": api_key}
        if base_url:
            options.update(base_url=base_url)
        self._client = _SDKAsyncOpenAI(**options)

    @property
    def client(self):
        """The wrapped SDK client instance."""
        return self._client
class OpenAIChatCompletionsModel:
    """Bundles a model identifier with the SDK client used to call it."""

    def __init__(self, model: str, openai_client: AsyncOpenAI):
        """Store the model name and unwrap the SDK client from ``openai_client``."""
        # Keep the raw SDK client (not the wrapper) so callers can use
        # ``self.client.chat.completions`` directly.
        self.client = openai_client.client
        self.model = model
@dataclass
class Agent:
    """An LLM persona: system instructions, a model handle, and optional tools.

    Only callables flagged with ``_is_tool`` (see ``function_tool``) are
    advertised to the model.
    """

    name: str
    instructions: str
    # Quoted so the dataclass does not depend on definition order.
    model: "OpenAIChatCompletionsModel"
    tools: Optional[List[Callable]] = field(default_factory=list)
    # Lazily built cache of the function-calling schema for ``tools``.
    _tool_specs: Optional[List[Dict[str, Any]]] = field(default=None, init=False)

    def tool_specs(self) -> Optional[List[Dict[str, Any]]]:
        """Return the function-calling schema for the flagged tools.

        Every parameter is described as a string; parameters that have a
        default value are not listed as required.

        Returns:
            A list of tool specs, or ``None`` when there is nothing to
            advertise (no tools, or none flagged as tools). ``None`` lets the
            caller leave the ``tools`` field out of the request instead of
            sending an empty list.
        """
        import inspect

        if self._tool_specs is None:
            specs: List[Dict[str, Any]] = []
            for tool in self.tools or []:
                if not getattr(tool, "_is_tool", False):
                    continue
                # *args / **kwargs cannot be expressed as named JSON properties.
                params = [
                    p for p in inspect.signature(tool).parameters.values()
                    if p.kind not in (p.VAR_POSITIONAL, p.VAR_KEYWORD)
                ]
                specs.append({
                    "type": "function",
                    "function": {
                        "name": tool.__name__,
                        # cleandoc strips the indentation and blank edge lines
                        # that triple-quoted docstrings carry.
                        "description": inspect.cleandoc(tool.__doc__ or "")[:512],
                        "parameters": {
                            "type": "object",
                            "properties": {p.name: {"type": "string"} for p in params},
                            "required": [p.name for p in params if p.default is p.empty],
                        },
                    },
                })
            self._tool_specs = specs
        return self._tool_specs or None
class Runner:
    """Robust tool loop with forced finalization (prevents endless Plan→Act cycles).

    ``run`` alternates between asking the model and executing the tools it
    requests until the model answers in plain text, the tool budget is spent,
    or the turn limit is reached.
    """

    @staticmethod
    def _make_result(text: str, context: Optional[Dict[str, Any]]):
        """Wrap the final text in the ad-hoc result object returned by ``run``."""
        out = type("Result", (), {})()
        out.final_output = text
        out.context = context or {}
        out.final_output_as = lambda *_: out.final_output
        return out

    @staticmethod
    def _request_kwargs(agent, msgs, tool_specs, allow_tools: bool) -> Dict[str, Any]:
        """Build the chat-completion arguments for one request.

        ``tools`` / ``tool_choice`` are sent only when the agent actually has
        tool specs: a ``tool_choice`` without ``tools`` is rejected by the
        OpenAI endpoint. To stop tool use the specs stay attached and
        ``tool_choice`` becomes ``"none"``.
        """
        kwargs: Dict[str, Any] = {"model": agent.model.model, "messages": msgs}
        if tool_specs:
            kwargs["tools"] = tool_specs
            kwargs["tool_choice"] = "auto" if allow_tools else "none"
        return kwargs

    @staticmethod
    def _call_tool(tool_map: Dict[str, Any], call) -> Any:
        """Execute one requested tool call; failures become ``{"error": ...}``."""
        fn_name = call.function.name
        if fn_name not in tool_map:
            return {"error": f"Unknown tool '{fn_name}'. Available: " + ", ".join(tool_map.keys())}
        try:
            # Parsing sits inside the try so malformed arguments are reported
            # back to the model instead of aborting the whole run.
            args = json.loads(call.function.arguments or "{}")
            if not isinstance(args, dict):
                raise TypeError("tool arguments must be a JSON object")
            return tool_map[fn_name](**args)
        except Exception as e:  # tool boundary: any failure is data for the model
            return {"error": str(e)}

    @staticmethod
    async def run(
        agent: "Agent",
        user_input: str,
        context: Optional[Dict[str, Any]] = None,
        turns: int = 7,
        max_tools: int = 3,
    ):
        """Drive the agent until it produces a final text answer.

        Args:
            agent: Supplies instructions, tools and the model/client to call.
            user_input: The user's message.
            context: Optional dict echoed back on the result as ``context``.
            turns: Maximum model round-trips; clamped to the range 1..12.
            max_tools: Tool-call budget; once reached, tool use is switched off.

        Returns:
            An object with ``final_output`` (str), ``context`` (dict) and
            ``final_output_as(...)`` (returns ``final_output``).
        """
        msgs = [
            {"role": "system", "content": agent.instructions},
            {
                "role": "system",
                "content": (
                    "You have access to these tools: "
                    + ", ".join(t.__name__ for t in (agent.tools or []) if getattr(t, "_is_tool", False))
                    + f". Use at most {max_tools} tool calls, then produce the final answer in the required sections."
                ),
            },
            {"role": "user", "content": user_input},
        ]
        tool_specs = agent.tool_specs()
        tool_map = {t.__name__: t for t in (agent.tools or []) if getattr(t, "_is_tool", False)}
        create = agent.model.client.chat.completions.create
        tool_uses = 0
        # Clamp once and reuse, so the "last step" test agrees with the loop bound.
        steps = max(1, min(12, turns))

        for step in range(steps):
            force_final = (tool_uses >= max_tools) or (step == steps - 1)
            resp = await create(**Runner._request_kwargs(agent, msgs, tool_specs, not force_final))
            msg = resp.choices[0].message
            tool_calls = getattr(msg, "tool_calls", None)

            # Normal completion (no tool calls) => return
            if not tool_calls:
                return Runner._make_result((msg.content or "").strip(), context)

            # If we forced finalization but the model still tried tools, nudge and iterate
            if force_final:
                msgs.append({
                    "role": "system",
                    "content": "Do not call tools anymore. Produce the final answer now in the required sections."
                })
                continue

            # Execute tool calls; every call gets a matching "tool" message.
            msgs.append({"role": "assistant", "content": msg.content or "", "tool_calls": tool_calls})
            for call in tool_calls:
                result = Runner._call_tool(tool_map, call)
                msgs.append({
                    "role": "tool",
                    "tool_call_id": call.id,
                    "name": call.function.name,
                    # default=str is a deliberate fallback so an unusual tool
                    # result cannot crash the loop.
                    "content": json.dumps(result, default=str),
                })
                tool_uses += 1

        # One last attempt to finalize with tools disabled
        msgs.append({"role": "system", "content": "Stop tool use. Provide the final answer now."})
        resp = await create(**Runner._request_kwargs(agent, msgs, tool_specs, False))
        text = (resp.choices[0].message.content or "I couldn’t generate a plan.").strip()
        return Runner._make_result(text, context)
# =========================
# Model setup (Gemini via OpenAI-compatible OR OpenAI)
# =========================
def setup_model() -> OpenAIChatCompletionsModel:
    """Pick the chat model from whichever API key is configured.

    Loads ``.env`` and prefers a Gemini key (served through Gemini's
    OpenAI-compatible endpoint); otherwise falls back to an OpenAI key.

    Returns:
        The model handle for ``gemini-2.5-flash`` or ``gpt-4o-mini``.

    Raises:
        RuntimeError: When neither key is set.
    """
    load_dotenv()
    # GEMINI_API_KEY is the name the file header and the error message
    # document; "Gem" is still honoured for deployments that set the old name.
    gem = os.getenv("GEMINI_API_KEY") or os.getenv("Gem")
    oai = os.getenv("OPENAI_API_KEY")
    if gem:
        client = AsyncOpenAI(api_key=gem, base_url="https://generativelanguage.googleapis.com/v1beta/openai/")
        return OpenAIChatCompletionsModel("gemini-2.5-flash", client)
    if oai:
        client = AsyncOpenAI(api_key=oai)
        return OpenAIChatCompletionsModel("gpt-4o-mini", client)
    raise RuntimeError("Add GEMINI_API_KEY or OPENAI_API_KEY to your .env")
# Shared model handle, built once at import time. setup_model() raises
# RuntimeError when no API key is configured, so a missing key fails at import.
llm_model = setup_model()
# Placeholder call: the shim's set_tracing_disabled ignores its argument and returns True.
set_tracing_disabled(True)
# =========================
# Guardrails (fast & local)
# =========================
COMMANDS = ("/help", "/policy", "/agent", "/simple", "/tools")
TOPIC_TOKENS = tuple("""
biomedical biomed device equipment oem troubleshoot fault error alarm probe sensor lead cable battery power calibration qc
verification analyzer self-test ventilator defibrillator infusion pump ecg oximeter nibp monitor ultrasound anesthesia syringe
suction spirometer glucometer flow pressure temperature module waveform leak occlusion charger display keypad
""".split())
RE_FORBIDDEN = {
"clinical_advice": re.compile(r"\b(diagnos(e|is|tic)|prescrib|medicat|treat(ment|ing)?|dose|drug|therapy)\b", re.I),
"invasive_repair": re.compile(r"\b(open(ing)?\s+(device|casing|cover)|solder|reflow|board[- ]level|replace\s+(capacitor|ic))\b", re.I),
"alarm_bypass": re.compile(r"\b(bypass|disable|silence)\s+(alarm|alert|safety|interlock)\b", re.I),
"firmware": re.compile(r"\b(firmware|bootloader|root|jailbreak|unlock\s+(service|engineer)\s*mode|password\s*override|backdoor)\b", re.I),
}
RE_PHI = {
"email": re.compile(r"[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}", re.I),
"phone": re.compile(r"(?:\+\d{1,3}[-\s.]*)?(?:\(?\d{3,4}\)?[-\s.]*)?\d{3}[-\s.]?\d{4}"),
"id": re.compile(r"\b(CNIC|MRN|passport|nid|national\s*id|social\s*security|aadhaar)\b", re.I),
}
def on_topic(text: str) -> bool:
    """Tell whether a message is in scope for the assistant.

    A message is in scope when, after lower-casing and trimming, it is
    non-empty and either starts with one of ``COMMANDS`` or contains any of
    ``TOPIC_TOKENS`` as a substring.
    """
    lowered = (text or "").lower().strip()
    if not lowered:
        return False
    if lowered.startswith(COMMANDS):
        return True
    for token in TOPIC_TOKENS:
        if token in lowered:
            return True
    return False
def guard_issues(text: str) -> List[str]:
    """List the policy categories a message trips.

    Returns:
        The ``RE_FORBIDDEN`` keys whose pattern matches the lower-cased text
        (in dictionary order), followed by ``"phi"`` when any ``RE_PHI``
        pattern matches the text as given.
    """
    lowered = (text or "").lower()
    found: List[str] = []
    for label, pattern in RE_FORBIDDEN.items():
        if pattern.search(lowered):
            found.append(label)
    # One "phi" entry is enough, however many identifier patterns match.
    for pattern in RE_PHI.values():
        if pattern.search(text):
            found.append("phi")
            break
    return found
# =========================
# Education-only tools
# =========================
def _bullets(items: List[str]) -> str:
    """Render ``items`` as a Markdown bullet list, one ``- item`` per line.

    Returns a lone ``"-"`` when there is nothing to render.
    """
    lines = [f"- {entry}" for entry in (items if items else [])]
    rendered = "\n".join(lines)
    return rendered if rendered else "-"
@function_tool
def quick_reference(device_and_symptom: str) -> dict:
    """
    Generic, non-invasive reference for troubleshooting (education-only).
    """
    # Returns five lists keyed "safety", "common_faults", "quick_checks",
    # "qc" and "escalate". The content is fixed except for one extra safety
    # line added when the text names a life-support device.
    query = (device_and_symptom or "").lower()

    safety = [
        "If attached to a patient, ensure backup monitoring before checks.",
        "Do not open the casing; follow OEM & facility policy.",
        "Do not bypass or silence alarms beyond OEM instructions.",
    ]
    for keyword in ("ventilator", "defibrillator", "infusion pump"):
        if keyword in query:
            safety.append("Life-support device: remove from service and use backup if malfunction suspected.")
            break

    common_faults = [
        "Power/battery issues or loose connections.",
        "Damaged/incorrect accessories (leads, probes, tubing).",
        "Configuration/profile mismatch for the clinical setup.",
        "Environmental interference (EMI), filters/clogs, or mechanical obstruction.",
    ]
    quick_checks = [
        "Note model/serial/firmware and any error codes.",
        "Verify power or battery; try another outlet; reseat user-removable battery.",
        "Inspect & reseat accessories; swap with a known-good.",
        "Confirm settings; restore defaults if safe.",
        "Run self-test and review OEM quick-reference.",
    ]
    qc = [
        "Verify against a reference/simulator (ECG, SpO₂, NIBP, flow) where applicable.",
        "Perform electrical safety tests per policy.",
        "Document & compare with prior QC.",
    ]
    escalate = [
        "Any failed self-test or QC/safety test.",
        "Persistent faults after basic checks or visible damage/liquid ingress.",
        "Life-critical pathway: remove from service and escalate to Biomed/OEM.",
    ]
    return {
        "safety": safety,
        "common_faults": common_faults,
        "quick_checks": quick_checks,
        "qc": qc,
        "escalate": escalate,
    }
@function_tool
def safety_policy() -> dict:
    """Concise policy block enforced by the assistant."""
    # Fixed rule list, returned under the single key "policy".
    rules = [
        "Education-only troubleshooting; no clinical advice.",
        "No invasive repairs; no alarm bypass; no firmware tampering.",
        "No collecting/sharing personal identifiers.",
        "OEM manuals & local policy take precedence.",
    ]
    return {"policy": rules}
@function_tool
def record_step(note: str) -> dict:
    """Append an action/observation to an agent trace (shown to user)."""
    # The trace lives in the Chainlit user session under "agent_trace".
    steps: List[str] = cl.user_session.get("agent_trace") or []
    # Collapse all whitespace runs to single spaces; blank notes are not stored.
    normalized = " ".join((note or "").split())
    if normalized:
        steps.append(normalized[:500])  # cap each entry at 500 characters
    cl.user_session.set("agent_trace", steps)
    return {"ok": True, "len": len(steps)}
@function_tool
def error_code_lookup(model: str, code: str) -> dict:
    """Friendly generic notes for common codes (extendable)."""
    # Lookup order: exact "<model>::<code>" first, then "generic::<code>".
    # Both parts are trimmed and lower-cased before matching.
    norm_model = (model or "").strip().lower()
    norm_code = (code or "").strip().lower()
    knowledge = {
        "generic::e01": {
            "meaning": "General startup/self-test failure.",
            "notes": ["Safe power-cycle (off-patient). Check mains/battery & connections. Re-run self-test. Escalate if persistent."],
        },
        "generic::occlusion": {
            "meaning": "Flow obstruction detected.",
            "notes": ["Check clamps/kinks/filters. Reload set per OEM. Do not bypass alarms."],
        },
        "monitor::leads_off": {
            "meaning": "ECG leads not detected / poor contact.",
            "notes": ["Verify placement & skin prep. Swap known-good leads. Route away from mains/EMI."],
        },
    }
    entry = knowledge.get(f"{norm_model}::{norm_code}") or knowledge.get(f"generic::{norm_code}")
    if entry:
        return {
            "found": True,
            "model": model,
            "code": code,
            "meaning": entry["meaning"],
            "education_notes": entry["notes"],
        }
    # Nothing matched: list the known keys so the caller can see what exists.
    return {"found": False, "message": "No local entry. Use OEM quick-ref.", "examples": list(knowledge)}
@function_tool
def acronym_expand(term: str) -> dict:
    """Explain common biomed/acquisition acronyms (study aid)."""
    # Matching is on the trimmed, lower-cased term; the reply echoes the
    # term exactly as it was passed in.
    glossary = {
        "ecg": "Electrocardiogram.",
        "nibp": "Non-Invasive Blood Pressure.",
        "spo2": "Peripheral oxygen saturation (pulse oximetry).",
        "peep": "Positive End-Expiratory Pressure (ventilation).",
        "fio2": "Fraction of inspired oxygen.",
        "vt": "Tidal Volume.",
        "hu": "Hounsfield Units (CT).",
        "snr": "Signal-to-Noise Ratio.",
        "tr": "Repetition Time (MRI).",
        "te": "Echo Time (MRI).",
        "flair": "Fluid-Attenuated Inversion Recovery (MRI).",
        "adc": "Apparent Diffusion Coefficient (MRI).",
    }
    lookup = (term or "").strip().lower()
    if lookup in glossary:
        meaning = glossary[lookup]
    else:
        meaning = "Not in the quick list. Use OEM/teaching refs."
    return {"term": term, "meaning": meaning}
@function_tool
def unit_convert(value: str, from_unit: str, to_unit: str) -> dict:
    """
    QC-friendly conversions:
    mmHg↔kPa (1 kPa = 7.50062 mmHg) | L/min↔mL/s (1 L/min = 16.6667 mL/s) | mV↔µV (1 mV = 1000 µV)
    """
    # Returns {"ok": True, "result": float} on success, otherwise
    # {"ok": False, "error": str}. A non-numeric value is reported in that
    # same shape rather than raised.
    try:
        amount = float(str(value).strip())
    except ValueError:
        return {"ok": False, "error": f"Value {value!r} is not a number."}

    # casefold() maps both the micro sign (U+00B5) and Greek mu (U+03BC) to
    # U+03BC, so either spelling of "µV" is accepted; strip() tolerates padding.
    aliases = {
        "mmhg": "mmhg",
        "kpa": "kpa",
        "l/min": "l/min",
        "lpm": "l/min",
        "ml/s": "ml/s",
        "mlps": "ml/s",
        "mv": "mv",
        "\u03bcv": "uv",
        "uv": "uv",
    }
    source = aliases.get((from_unit or "").strip().casefold())
    target = aliases.get((to_unit or "").strip().casefold())

    conversions = {
        ("mmhg", "kpa"): lambda x: x / 7.50062,
        ("kpa", "mmhg"): lambda x: x * 7.50062,
        ("l/min", "ml/s"): lambda x: x * 1000 / 60,
        ("ml/s", "l/min"): lambda x: x * 60 / 1000,
        ("mv", "uv"): lambda x: x * 1000,
        ("uv", "mv"): lambda x: x / 1000,
    }
    convert = conversions.get((source, target))
    if convert is None:
        return {"ok": False, "error": "Unsupported units. Try mmHg↔kPa, L/min↔mL/s, mV↔µV."}
    return {"ok": True, "result": convert(amount)}
@function_tool
def triage_priority(device_and_symptom: str) -> dict:
    """Heuristic priority: life-critical / high / standard (education-only)."""
    # Keyword matching on the lower-cased text; a life-support device name
    # outranks any symptom keyword.
    text = (device_and_symptom or "").lower()
    life_terms = ("ventilator", "defibrillator", "infusion pump")
    high_terms = ("alarm", "occlusion", "no output", "no reading", "self-test fail", "charging issue")

    if any(term in text for term in life_terms):
        level = "life-critical"
    elif any(term in text for term in high_terms):
        level = "high"
    else:
        level = "standard"
    return {"priority": level, "explain": "Heuristic only. Follow local escalation policy."}
@function_tool
def simulator_baselines(device: str) -> dict:
    """Educational simulator targets (generic; not OEM-specific)."""
    # The first bank key found inside the lower-cased device text wins;
    # unknown devices get a generic two-line fallback.
    needle = (device or "").lower()
    bank = {
        "ecg": ["1 mV @ 10 mm/mV (paper 25 mm/s).", "Check lead-fault detection & baseline stability."],
        "nibp": ["Phantom ~120/80 mmHg within policy tolerance.", "Leak test & overpressure safety."],
        "spo2": ["Simulator steps e.g., 97%, 90%, 85% (device-dependent).", "Verify pleth and probe integrity."],
        "ventilator": ["Tidal volume/flow with analyzer; verify alarms.", "Circuit leak test; filters per policy."],
    }
    targets = next((entries for key, entries in bank.items() if key in needle), None)
    if targets is None:
        targets = ["Use appropriate reference/simulator.", "Document vs prior QC; check alarm trips."]
    return {"device": device, "baselines": targets}
@function_tool
def env_checklist(_: str) -> dict:
    """Environment/power/EMI checklist (safe)."""
    # The argument is ignored; the same checklist is returned every time.
    checks = [
        "Verify mains outlet & ground; power indicator on.",
        "Avoid extension cords; try a known-good outlet.",
        "Route signal cables away from mains/transformers.",
        "Eliminate strong RF/EMI sources nearby.",
        "Check filters/vents; ambient temp/humidity within spec.",
    ]
    return {"checks": checks}
@function_tool
def accessory_checklist(_: str) -> dict:
    """Accessory fit/damage/compatibility checklist."""
    # The argument is ignored; the same checklist is returned every time.
    checks = [
        "Inspect connectors for bent pins/corrosion; ensure tight seating.",
        "Check cables/tubing for cuts/kinks; confirm model compatibility.",
        "Swap to a known-good accessory.",
        "Respect single-use vs reusable policy.",
    ]
    return {"checks": checks}
@function_tool
def build_qc_template(device: str) -> dict:
    """Markdown QC template for notes/CMMS (education-only)."""
    # The heading carries the trimmed device name ("Device" when none is
    # given); the rest of the template is fixed text ending in a newline.
    title = (device or "Device").strip()
    rows = [
        f"### QC/Verification — {title}",
        "- Date / Tech:",
        "- Model / Serial / FW:",
        "- Visual Inspection: (✔/✖) Notes:",
        "- Electrical Safety: (✔/✖) Leakage / Ground:",
        "- Functional Checks:",
        "- Reference/Simulator values → Measured:",
        "- Alarms exercised →",
        "- Accessories (list & status):",
        "- Final Assessment: Pass / Conditional / Fail",
        "- Next Steps / Escalation:",
    ]
    return {"markdown": "\n".join(rows) + "\n"}
# =========================
# Agentic planner + simple fallback
# =========================
# System prompt for the agentic planner: states the education-only scope,
# the 3-tool-call budget, and the five output sections the answer must have.
PLANNER_INSTRUCTIONS = (
    "You are an **agentic biomedical device troubleshooting planner** for clinical engineers.\n"
    "SCOPE (strict): education-only. No clinical advice. No invasive repairs. No alarm bypass. No firmware tampering. No PHI.\n"
    "Use at most **3 tool calls** and then **produce the final answer with no further tool calls**.\n"
    "Loop up to 3 times: PLAN → ACT (tool) → OBSERVE (summarize via record_step).\n\n"
    "Finally output exactly these sections:\n"
    "1) Safety First\n"
    "2) Likely Causes (ranked)\n"
    "3) Step-by-Step Checks (non-invasive; no alarm bypass)\n"
    "4) QC/Calibration\n"
    "5) Escalate When\n"
    "End with a one-line summary. If life-critical & patient-connected is implied, start with:\n"
    "REMOVE FROM SERVICE & USE BACKUP — then proceed off-patient.\n"
)
# The planner agent: the prompt above, the shared model, and every
# education-only tool defined in this file.
planner_agent = Agent(
    name="Agentic Planner",
    instructions=PLANNER_INSTRUCTIONS,
    model=llm_model,
    tools=[
        quick_reference, safety_policy, record_step, error_code_lookup, acronym_expand,
        unit_convert, triage_priority, simulator_baselines, env_checklist,
        accessory_checklist, build_qc_template
    ],
)
# System prompt for the tool-free, single-pass path (used by simple_plan).
SIMPLE_SYSTEM = (
    "You are a biomedical device troubleshooting assistant (education-only). "
    "Output sections: Safety First; Likely Causes; Step-by-Step Checks; QC/Calibration; Escalate When; one-line summary."
)
async def simple_plan(user_desc: str) -> str:
    """Produce a plan with a single chat completion and no tools.

    Args:
        user_desc: Device and symptom text; it is stripped before sending.

    Returns:
        The model's reply text, or ``""`` when the reply has no content.
    """
    prompt = [
        {"role": "system", "content": SIMPLE_SYSTEM},
        {"role": "user", "content": f"Device & symptom:\n{user_desc.strip()}"},
    ]
    completion = await llm_model.client.chat.completions.create(
        model=llm_model.model,
        messages=prompt,
    )
    reply = completion.choices[0].message.content
    return reply if reply else ""
# =========================
# UI strings
# =========================
# Greeting sent when a chat starts and again in reply to /help.
WELCOME = (
    "🛠️ **Biomedical Device Troubleshooting Assistant** (Agentic)\n"
    "Education-only. No diagnosis, no invasive service, no alarm bypass. OEM & policy rule the day.\n\n"
    "Type your **device & symptom** (e.g., “Infusion pump occlusion alarm”).\n"
    "Commands: **/agent** agentic (default) · **/simple** single-pass · **/policy** rules · **/tools** list tools · **/help**"
)
# Policy text sent for /policy and appended to every refusal.
POLICY = (
    "🛡️ **Safety & Scope Policy**\n"
    "- Scope: biomedical device troubleshooting (education-only).\n"
    "- No clinical advice (diagnosis/treatment/dosing/medications).\n"
    "- No invasive repairs; no alarm bypass; no firmware tampering.\n"
    "- No collecting/sharing personal identifiers (PHI).\n"
    "- OEM manuals & local policy take priority."
)
# Refusal sent when guard_issues() reports any issue for a message.
REFUSAL = (
    "🚫 I can’t help with diagnosis/treatment, invasive repair, alarm bypass, firmware hacks, or handling personal identifiers.\n"
    "I can guide safe, non-invasive troubleshooting and educational QC steps."
)
# Human-readable tool list sent for /tools (maintained by hand; it is not
# generated from the planner agent's tool list).
TOOLS_LIST = (
    "🔧 **Available Tools**\n"
    "- quick_reference(text)\n"
    "- safety_policy()\n"
    "- record_step(note)\n"
    "- error_code_lookup(model, code)\n"
    "- acronym_expand(term)\n"
    "- unit_convert(value, from_unit, to_unit)\n"
    "- triage_priority(text)\n"
    "- simulator_baselines(device)\n"
    "- env_checklist(context)\n"
    "- accessory_checklist(device)\n"
    "- build_qc_template(device)\n"
)
# =========================
# Chainlit flow
# =========================
@cl.on_chat_start
async def on_start():
    """Reset the session (agent mode, empty trace) and send the welcome text."""
    session_defaults = {
        "mode": "agent",  # 'agent' | 'simple'
        "agent_trace": [],
    }
    for key, value in session_defaults.items():
        cl.user_session.set(key, value)
    await cl.Message(content=WELCOME).send()
async def _simple_plan_or_notice(text: str) -> str:
    """Run the single-pass planner, turning failures into a chat-safe notice."""
    try:
        plan = await simple_plan(text)
    except Exception as exc:  # chat-handler boundary: report instead of crashing
        return f"⚠️ Could not generate a plan: {exc}"
    # Same default the agent path uses when the model returns no text.
    return plan or "I couldn’t generate a plan."


@cl.on_message
async def on_message(msg: cl.Message):
    """Handle one chat message: commands, guardrails, then plan generation.

    Flow:
      1. Slash commands are answered directly (static text or a mode switch).
      2. Off-topic messages and messages that trip a guardrail are declined.
      3. Otherwise a plan is produced in the session's mode; a planner
         failure falls back to the single-pass planner.
      4. The reply is the new trace entries (if any), the deterministic
         quick reference, and the generated plan.
    """
    text = (msg.content or "").strip()
    low = text.lower()

    # Commands
    static_replies = {"/help": WELCOME, "/policy": POLICY, "/tools": TOOLS_LIST}
    for prefix, reply in static_replies.items():
        if low.startswith(prefix):
            await cl.Message(content=reply).send()
            return
    mode_switches = {
        "/agent": ("agent", "✅ Agentic mode enabled."),
        "/simple": ("simple", "✅ Simple mode enabled."),
    }
    for prefix, (new_mode, notice) in mode_switches.items():
        if low.startswith(prefix):
            cl.user_session.set("mode", new_mode)
            await cl.Message(content=notice).send()
            return

    # Guardrails
    if not on_topic(text):
        await cl.Message(
            content="I only support **biomedical device troubleshooting**. "
                    "Describe the device & symptom (e.g., “ECG noisy baseline”)."
        ).send()
        return
    if guard_issues(text):
        await cl.Message(content=REFUSAL + "\n\n" + POLICY).send()
        return

    mode = cl.user_session.get("mode") or "agent"
    # Remember the trace length so only steps recorded for this message are shown.
    trace_before = len(cl.user_session.get("agent_trace") or [])

    # Generate answer
    if mode == "agent":
        try:
            result = await Runner.run(planner_agent, text, turns=7, max_tools=3)
            answer = result.final_output or "I couldn’t generate a plan."
        except Exception as e:
            await cl.Message(content=f"⚠️ Planner error: {e}\nFalling back to simple mode.").send()
            answer = await _simple_plan_or_notice(text)
    else:
        answer = await _simple_plan_or_notice(text)

    # Agent trace (new steps)
    trace = cl.user_session.get("agent_trace") or []
    new_trace = trace[trace_before:]
    trace_md = ""
    if new_trace:
        trace_md = "### 🧭 Agent Trace (summary)\n" + "\n".join(f"- {t}" for t in new_trace) + "\n\n"

    # Deterministic quick reference (built locally, independent of the LLM)
    qref = quick_reference(text)
    quick_md = (
        f"### 📘 Quick Reference\n"
        f"**Safety**\n{_bullets(qref.get('safety'))}\n\n"
        f"**Common Faults**\n{_bullets(qref.get('common_faults'))}\n\n"
        f"**Quick Checks**\n{_bullets(qref.get('quick_checks'))}\n\n"
        f"**QC / Calibration**\n{_bullets(qref.get('qc'))}\n\n"
        f"**Escalate If**\n{_bullets(qref.get('escalate'))}\n\n"
        f"> ⚠️ Education-only. Follow OEM & local policy.\n"
    )
    await cl.Message(content=f"{trace_md}{quick_md}\n---\n{answer}").send()