|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os, re, json |
|
|
from dataclasses import dataclass, field |
|
|
from typing import Any, Callable, Dict, List, Optional |
|
|
|
|
|
import chainlit as cl |
|
|
from dotenv import load_dotenv |
|
|
from openai import AsyncOpenAI as _SDKAsyncOpenAI |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def set_tracing_disabled(_: bool = True): |
|
|
return True |
|
|
|
|
|
def function_tool(func: Callable): |
|
|
func._is_tool = True |
|
|
return func |
|
|
|
|
|
class AsyncOpenAI: |
|
|
def __init__(self, api_key: str, base_url: Optional[str] = None): |
|
|
kwargs = {"api_key": api_key} |
|
|
if base_url: |
|
|
kwargs["base_url"] = base_url |
|
|
self._client = _SDKAsyncOpenAI(**kwargs) |
|
|
@property |
|
|
def client(self): |
|
|
return self._client |
|
|
|
|
|
class OpenAIChatCompletionsModel: |
|
|
def __init__(self, model: str, openai_client: AsyncOpenAI): |
|
|
self.model = model |
|
|
self.client = openai_client.client |
|
|
|
|
|
@dataclass |
|
|
class Agent: |
|
|
name: str |
|
|
instructions: str |
|
|
model: OpenAIChatCompletionsModel |
|
|
tools: Optional[List[Callable]] = field(default_factory=list) |
|
|
_tool_specs: Optional[List[Dict[str, Any]]] = field(default=None, init=False) |
|
|
|
|
|
def tool_specs(self) -> Optional[List[Dict[str, Any]]]: |
|
|
if not self.tools: |
|
|
return None |
|
|
if self._tool_specs is None: |
|
|
specs: List[Dict[str, Any]] = [] |
|
|
for t in self.tools: |
|
|
if not getattr(t, "_is_tool", False): |
|
|
continue |
|
|
argnames = list(t.__code__.co_varnames[:t.__code__.co_argcount]) |
|
|
specs.append({ |
|
|
"type": "function", |
|
|
"function": { |
|
|
"name": t.__name__, |
|
|
"description": (t.__doc__ or "")[:512], |
|
|
"parameters": { |
|
|
"type": "object", |
|
|
"properties": {p: {"type": "string"} for p in argnames}, |
|
|
"required": argnames, |
|
|
}, |
|
|
}, |
|
|
}) |
|
|
self._tool_specs = specs |
|
|
return self._tool_specs |
|
|
|
|
|
class Runner: |
|
|
"""Robust tool loop with forced finalization (prevents endless Plan→Act cycles).""" |
|
|
@staticmethod |
|
|
async def run( |
|
|
agent: Agent, |
|
|
user_input: str, |
|
|
context: Optional[Dict[str, Any]] = None, |
|
|
turns: int = 7, |
|
|
max_tools: int = 3, |
|
|
): |
|
|
msgs = [ |
|
|
{"role": "system", "content": agent.instructions}, |
|
|
{ |
|
|
"role": "system", |
|
|
"content": ( |
|
|
"You have access to these tools: " |
|
|
+ ", ".join(t.__name__ for t in (agent.tools or []) if getattr(t, "_is_tool", False)) |
|
|
+ f". Use at most {max_tools} tool calls, then produce the final answer in the required sections." |
|
|
), |
|
|
}, |
|
|
{"role": "user", "content": user_input}, |
|
|
] |
|
|
|
|
|
tool_specs = agent.tool_specs() |
|
|
tool_map = {t.__name__: t for t in (agent.tools or []) if getattr(t, "_is_tool", False)} |
|
|
tool_uses = 0 |
|
|
|
|
|
for step in range(max(1, min(12, turns))): |
|
|
force_final = (tool_uses >= max_tools) or (step == turns - 1) |
|
|
|
|
|
resp = await agent.model.client.chat.completions.create( |
|
|
model=agent.model.model, |
|
|
messages=msgs, |
|
|
tools=None if force_final else tool_specs, |
|
|
tool_choice="none" if force_final else "auto", |
|
|
) |
|
|
msg = resp.choices[0].message |
|
|
tool_calls = getattr(msg, "tool_calls", None) |
|
|
|
|
|
|
|
|
if not tool_calls: |
|
|
out = type("Result", (), {})() |
|
|
out.final_output = (msg.content or "").strip() |
|
|
out.context = context or {} |
|
|
out.final_output_as = lambda *_: out.final_output |
|
|
return out |
|
|
|
|
|
|
|
|
if force_final: |
|
|
msgs.append({ |
|
|
"role": "system", |
|
|
"content": "Do not call tools anymore. Produce the final answer now in the required sections." |
|
|
}) |
|
|
continue |
|
|
|
|
|
|
|
|
msgs.append({"role": "assistant", "content": msg.content or "", "tool_calls": tool_calls}) |
|
|
for call in tool_calls: |
|
|
fn_name = call.function.name |
|
|
args = json.loads(call.function.arguments or "{}") |
|
|
if fn_name not in tool_map: |
|
|
result = {"error": f"Unknown tool '{fn_name}'. Available: " + ", ".join(tool_map.keys())} |
|
|
else: |
|
|
try: |
|
|
result = tool_map[fn_name](**args) |
|
|
except Exception as e: |
|
|
result = {"error": str(e)} |
|
|
msgs.append({ |
|
|
"role": "tool", |
|
|
"tool_call_id": call.id, |
|
|
"name": fn_name, |
|
|
"content": json.dumps(result), |
|
|
}) |
|
|
tool_uses += 1 |
|
|
|
|
|
|
|
|
msgs.append({"role": "system", "content": "Stop tool use. Provide the final answer now."}) |
|
|
resp = await agent.model.client.chat.completions.create( |
|
|
model=agent.model.model, messages=msgs, tools=None, tool_choice="none" |
|
|
) |
|
|
out = type("Result", (), {})() |
|
|
out.final_output = (resp.choices[0].message.content or "I couldn’t generate a plan.").strip() |
|
|
out.context = context or {} |
|
|
out.final_output_as = lambda *_: out.final_output |
|
|
return out |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def setup_model() -> OpenAIChatCompletionsModel: |
|
|
load_dotenv() |
|
|
gem = os.getenv("Gem") |
|
|
oai = os.getenv("OPENAI_API_KEY") |
|
|
if gem: |
|
|
client = AsyncOpenAI(api_key=gem, base_url="https://generativelanguage.googleapis.com/v1beta/openai/") |
|
|
return OpenAIChatCompletionsModel("gemini-2.5-flash", client) |
|
|
if oai: |
|
|
client = AsyncOpenAI(api_key=oai) |
|
|
return OpenAIChatCompletionsModel("gpt-4o-mini", client) |
|
|
raise RuntimeError("Add GEMINI_API_KEY or OPENAI_API_KEY to your .env") |
|
|
|
|
|
llm_model = setup_model() |
|
|
set_tracing_disabled(True) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
COMMANDS = ("/help", "/policy", "/agent", "/simple", "/tools") |
|
|
|
|
|
TOPIC_TOKENS = tuple(""" |
|
|
biomedical biomed device equipment oem troubleshoot fault error alarm probe sensor lead cable battery power calibration qc |
|
|
verification analyzer self-test ventilator defibrillator infusion pump ecg oximeter nibp monitor ultrasound anesthesia syringe |
|
|
suction spirometer glucometer flow pressure temperature module waveform leak occlusion charger display keypad |
|
|
""".split()) |
|
|
|
|
|
RE_FORBIDDEN = { |
|
|
"clinical_advice": re.compile(r"\b(diagnos(e|is|tic)|prescrib|medicat|treat(ment|ing)?|dose|drug|therapy)\b", re.I), |
|
|
"invasive_repair": re.compile(r"\b(open(ing)?\s+(device|casing|cover)|solder|reflow|board[- ]level|replace\s+(capacitor|ic))\b", re.I), |
|
|
"alarm_bypass": re.compile(r"\b(bypass|disable|silence)\s+(alarm|alert|safety|interlock)\b", re.I), |
|
|
"firmware": re.compile(r"\b(firmware|bootloader|root|jailbreak|unlock\s+(service|engineer)\s*mode|password\s*override|backdoor)\b", re.I), |
|
|
} |
|
|
RE_PHI = { |
|
|
"email": re.compile(r"[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}", re.I), |
|
|
"phone": re.compile(r"(?:\+\d{1,3}[-\s.]*)?(?:\(?\d{3,4}\)?[-\s.]*)?\d{3}[-\s.]?\d{4}"), |
|
|
"id": re.compile(r"\b(CNIC|MRN|passport|nid|national\s*id|social\s*security|aadhaar)\b", re.I), |
|
|
} |
|
|
|
|
|
def on_topic(text: str) -> bool: |
|
|
low = (text or "").lower().strip() |
|
|
return bool(low) and (low.startswith(COMMANDS) or any(t in low for t in TOPIC_TOKENS)) |
|
|
|
|
|
def guard_issues(text: str) -> List[str]: |
|
|
low = (text or "").lower() |
|
|
issues = [k for k, rx in RE_FORBIDDEN.items() if rx.search(low)] |
|
|
if any(rx.search(text) for rx in RE_PHI.values()): |
|
|
issues.append("phi") |
|
|
return issues |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _bullets(items: List[str]) -> str: |
|
|
return "\n".join(f"- {x}" for x in (items or [])) or "-" |
|
|
|
|
|
@function_tool |
|
|
def quick_reference(device_and_symptom: str) -> dict: |
|
|
""" |
|
|
Generic, non-invasive reference for troubleshooting (education-only). |
|
|
""" |
|
|
s = (device_and_symptom or "").lower() |
|
|
life = any(k in s for k in ["ventilator", "defibrillator", "infusion pump"]) |
|
|
return { |
|
|
"safety": [ |
|
|
"If attached to a patient, ensure backup monitoring before checks.", |
|
|
"Do not open the casing; follow OEM & facility policy.", |
|
|
"Do not bypass or silence alarms beyond OEM instructions.", |
|
|
*(["Life-support device: remove from service and use backup if malfunction suspected."] if life else []) |
|
|
], |
|
|
"common_faults": [ |
|
|
"Power/battery issues or loose connections.", |
|
|
"Damaged/incorrect accessories (leads, probes, tubing).", |
|
|
"Configuration/profile mismatch for the clinical setup.", |
|
|
"Environmental interference (EMI), filters/clogs, or mechanical obstruction." |
|
|
], |
|
|
"quick_checks": [ |
|
|
"Note model/serial/firmware and any error codes.", |
|
|
"Verify power or battery; try another outlet; reseat user-removable battery.", |
|
|
"Inspect & reseat accessories; swap with a known-good.", |
|
|
"Confirm settings; restore defaults if safe.", |
|
|
"Run self-test and review OEM quick-reference." |
|
|
], |
|
|
"qc": [ |
|
|
"Verify against a reference/simulator (ECG, SpO₂, NIBP, flow) where applicable.", |
|
|
"Perform electrical safety tests per policy.", |
|
|
"Document & compare with prior QC." |
|
|
], |
|
|
"escalate": [ |
|
|
"Any failed self-test or QC/safety test.", |
|
|
"Persistent faults after basic checks or visible damage/liquid ingress.", |
|
|
"Life-critical pathway: remove from service and escalate to Biomed/OEM." |
|
|
], |
|
|
} |
|
|
|
|
|
@function_tool |
|
|
def safety_policy() -> dict: |
|
|
"""Concise policy block enforced by the assistant.""" |
|
|
return {"policy": [ |
|
|
"Education-only troubleshooting; no clinical advice.", |
|
|
"No invasive repairs; no alarm bypass; no firmware tampering.", |
|
|
"No collecting/sharing personal identifiers.", |
|
|
"OEM manuals & local policy take precedence." |
|
|
]} |
|
|
|
|
|
@function_tool |
|
|
def record_step(note: str) -> dict: |
|
|
"""Append an action/observation to an agent trace (shown to user).""" |
|
|
trace: List[str] = cl.user_session.get("agent_trace") or [] |
|
|
clean = " ".join((note or "").split()) |
|
|
if clean: |
|
|
trace.append(clean[:500]) |
|
|
cl.user_session.set("agent_trace", trace) |
|
|
return {"ok": True, "len": len(trace)} |
|
|
|
|
|
@function_tool |
|
|
def error_code_lookup(model: str, code: str) -> dict: |
|
|
"""Friendly generic notes for common codes (extendable).""" |
|
|
key = f"{(model or '').strip().lower()}::{(code or '').strip().lower()}" |
|
|
KB = { |
|
|
"generic::e01": {"meaning": "General startup/self-test failure.", |
|
|
"notes": ["Safe power-cycle (off-patient). Check mains/battery & connections. Re-run self-test. Escalate if persistent."]}, |
|
|
"generic::occlusion": {"meaning": "Flow obstruction detected.", |
|
|
"notes": ["Check clamps/kinks/filters. Reload set per OEM. Do not bypass alarms."]}, |
|
|
"monitor::leads_off": {"meaning": "ECG leads not detected / poor contact.", |
|
|
"notes": ["Verify placement & skin prep. Swap known-good leads. Route away from mains/EMI."]}, |
|
|
} |
|
|
data = KB.get(key) or KB.get(f"generic::{(code or '').strip().lower()}") |
|
|
if not data: |
|
|
return {"found": False, "message": "No local entry. Use OEM quick-ref.", "examples": list(KB.keys())} |
|
|
return {"found": True, "model": model, "code": code, "meaning": data["meaning"], "education_notes": data["notes"]} |
|
|
|
|
|
@function_tool |
|
|
def acronym_expand(term: str) -> dict: |
|
|
"""Explain common biomed/acquisition acronyms (study aid).""" |
|
|
t = (term or "").strip().lower() |
|
|
MAP = { |
|
|
"ecg": "Electrocardiogram.", |
|
|
"nibp": "Non-Invasive Blood Pressure.", |
|
|
"spo2": "Peripheral oxygen saturation (pulse oximetry).", |
|
|
"peep": "Positive End-Expiratory Pressure (ventilation).", |
|
|
"fio2": "Fraction of inspired oxygen.", |
|
|
"vt": "Tidal Volume.", |
|
|
"hu": "Hounsfield Units (CT).", |
|
|
"snr": "Signal-to-Noise Ratio.", |
|
|
"tr": "Repetition Time (MRI).", |
|
|
"te": "Echo Time (MRI).", |
|
|
"flair": "Fluid-Attenuated Inversion Recovery (MRI).", |
|
|
"adc": "Apparent Diffusion Coefficient (MRI).", |
|
|
} |
|
|
return {"term": term, "meaning": MAP.get(t, "Not in the quick list. Use OEM/teaching refs.")} |
|
|
|
|
|
@function_tool |
|
|
def unit_convert(value: str, from_unit: str, to_unit: str) -> dict: |
|
|
""" |
|
|
QC-friendly conversions: |
|
|
mmHg↔kPa (1 kPa = 7.50062 mmHg) | L/min↔mL/s (1 L/min = 16.6667 mL/s) | mV↔µV (1 mV = 1000 µV) |
|
|
""" |
|
|
def f(x): return float(str(x).strip()) |
|
|
v = f(value); fu = (from_unit or "").lower(); tu = (to_unit or "").lower() |
|
|
if fu == "mmhg" and tu == "kpa": return {"ok": True, "result": v/7.50062} |
|
|
if fu == "kpa" and tu == "mmhg": return {"ok": True, "result": v*7.50062} |
|
|
if fu in ["l/min","lpm"] and tu in ["ml/s","mlps"]: return {"ok": True, "result": v*1000/60} |
|
|
if fu in ["ml/s","mlps"] and tu in ["l/min","lpm"]: return {"ok": True, "result": v*60/1000} |
|
|
if fu == "mv" and tu in ["µv","uv"]: return {"ok": True, "result": v*1000} |
|
|
if fu in ["µv","uv"] and tu == "mv": return {"ok": True, "result": v/1000} |
|
|
return {"ok": False, "error": "Unsupported units. Try mmHg↔kPa, L/min↔mL/s, mV↔µV."} |
|
|
|
|
|
@function_tool |
|
|
def triage_priority(device_and_symptom: str) -> dict: |
|
|
"""Heuristic priority: life-critical / high / standard (education-only).""" |
|
|
s = (device_and_symptom or "").lower() |
|
|
life = any(k in s for k in ["ventilator","defibrillator","infusion pump"]) |
|
|
high = any(k in s for k in ["alarm","occlusion","no output","no reading","self-test fail","charging issue"]) |
|
|
return {"priority": "life-critical" if life else ("high" if high else "standard"), |
|
|
"explain": "Heuristic only. Follow local escalation policy."} |
|
|
|
|
|
@function_tool |
|
|
def simulator_baselines(device: str) -> dict: |
|
|
"""Educational simulator targets (generic; not OEM-specific).""" |
|
|
d = (device or "").lower() |
|
|
bank = { |
|
|
"ecg": ["1 mV @ 10 mm/mV (paper 25 mm/s).", "Check lead-fault detection & baseline stability."], |
|
|
"nibp": ["Phantom ~120/80 mmHg within policy tolerance.", "Leak test & overpressure safety."], |
|
|
"spo2": ["Simulator steps e.g., 97%, 90%, 85% (device-dependent).", "Verify pleth and probe integrity."], |
|
|
"ventilator": ["Tidal volume/flow with analyzer; verify alarms.", "Circuit leak test; filters per policy."], |
|
|
} |
|
|
for k, v in bank.items(): |
|
|
if k in d: return {"device": device, "baselines": v} |
|
|
return {"device": device, "baselines": ["Use appropriate reference/simulator.", "Document vs prior QC; check alarm trips."]} |
|
|
|
|
|
@function_tool |
|
|
def env_checklist(_: str) -> dict: |
|
|
"""Environment/power/EMI checklist (safe).""" |
|
|
return {"checks": [ |
|
|
"Verify mains outlet & ground; power indicator on.", |
|
|
"Avoid extension cords; try a known-good outlet.", |
|
|
"Route signal cables away from mains/transformers.", |
|
|
"Eliminate strong RF/EMI sources nearby.", |
|
|
"Check filters/vents; ambient temp/humidity within spec." |
|
|
]} |
|
|
|
|
|
@function_tool |
|
|
def accessory_checklist(_: str) -> dict: |
|
|
"""Accessory fit/damage/compatibility checklist.""" |
|
|
return {"checks": [ |
|
|
"Inspect connectors for bent pins/corrosion; ensure tight seating.", |
|
|
"Check cables/tubing for cuts/kinks; confirm model compatibility.", |
|
|
"Swap to a known-good accessory.", |
|
|
"Respect single-use vs reusable policy." |
|
|
]} |
|
|
|
|
|
@function_tool |
|
|
def build_qc_template(device: str) -> dict: |
|
|
"""Markdown QC template for notes/CMMS (education-only).""" |
|
|
d = (device or "Device").strip() |
|
|
md = f"""### QC/Verification — {d} |
|
|
- Date / Tech: |
|
|
- Model / Serial / FW: |
|
|
- Visual Inspection: (✔/✖) Notes: |
|
|
- Electrical Safety: (✔/✖) Leakage / Ground: |
|
|
- Functional Checks: |
|
|
- Reference/Simulator values → Measured: |
|
|
- Alarms exercised → |
|
|
- Accessories (list & status): |
|
|
- Final Assessment: Pass / Conditional / Fail |
|
|
- Next Steps / Escalation: |
|
|
""" |
|
|
return {"markdown": md} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
PLANNER_INSTRUCTIONS = ( |
|
|
"You are an **agentic biomedical device troubleshooting planner** for clinical engineers.\n" |
|
|
"SCOPE (strict): education-only. No clinical advice. No invasive repairs. No alarm bypass. No firmware tampering. No PHI.\n" |
|
|
"Use at most **3 tool calls** and then **produce the final answer with no further tool calls**.\n" |
|
|
"Loop up to 3 times: PLAN → ACT (tool) → OBSERVE (summarize via record_step).\n\n" |
|
|
"Finally output exactly these sections:\n" |
|
|
"1) Safety First\n" |
|
|
"2) Likely Causes (ranked)\n" |
|
|
"3) Step-by-Step Checks (non-invasive; no alarm bypass)\n" |
|
|
"4) QC/Calibration\n" |
|
|
"5) Escalate When\n" |
|
|
"End with a one-line summary. If life-critical & patient-connected is implied, start with:\n" |
|
|
"REMOVE FROM SERVICE & USE BACKUP — then proceed off-patient.\n" |
|
|
) |
|
|
|
|
|
planner_agent = Agent( |
|
|
name="Agentic Planner", |
|
|
instructions=PLANNER_INSTRUCTIONS, |
|
|
model=llm_model, |
|
|
tools=[ |
|
|
quick_reference, safety_policy, record_step, error_code_lookup, acronym_expand, |
|
|
unit_convert, triage_priority, simulator_baselines, env_checklist, |
|
|
accessory_checklist, build_qc_template |
|
|
], |
|
|
) |
|
|
|
|
|
SIMPLE_SYSTEM = ( |
|
|
"You are a biomedical device troubleshooting assistant (education-only). " |
|
|
"Output sections: Safety First; Likely Causes; Step-by-Step Checks; QC/Calibration; Escalate When; one-line summary." |
|
|
) |
|
|
|
|
|
async def simple_plan(user_desc: str) -> str: |
|
|
resp = await llm_model.client.chat.completions.create( |
|
|
model=llm_model.model, |
|
|
messages=[{"role": "system", "content": SIMPLE_SYSTEM}, |
|
|
{"role": "user", "content": f"Device & symptom:\n{user_desc.strip()}"}], |
|
|
) |
|
|
return resp.choices[0].message.content or "" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
WELCOME = ( |
|
|
"🛠️ **Biomedical Device Troubleshooting Assistant** (Agentic)\n" |
|
|
"Education-only. No diagnosis, no invasive service, no alarm bypass. OEM & policy rule the day.\n\n" |
|
|
"Type your **device & symptom** (e.g., “Infusion pump occlusion alarm”).\n" |
|
|
"Commands: **/agent** agentic (default) · **/simple** single-pass · **/policy** rules · **/tools** list tools · **/help**" |
|
|
) |
|
|
POLICY = ( |
|
|
"🛡️ **Safety & Scope Policy**\n" |
|
|
"- Scope: biomedical device troubleshooting (education-only).\n" |
|
|
"- No clinical advice (diagnosis/treatment/dosing/medications).\n" |
|
|
"- No invasive repairs; no alarm bypass; no firmware tampering.\n" |
|
|
"- No collecting/sharing personal identifiers (PHI).\n" |
|
|
"- OEM manuals & local policy take priority." |
|
|
) |
|
|
REFUSAL = ( |
|
|
"🚫 I can’t help with diagnosis/treatment, invasive repair, alarm bypass, firmware hacks, or handling personal identifiers.\n" |
|
|
"I can guide safe, non-invasive troubleshooting and educational QC steps." |
|
|
) |
|
|
TOOLS_LIST = ( |
|
|
"🔧 **Available Tools**\n" |
|
|
"- quick_reference(text)\n" |
|
|
"- safety_policy()\n" |
|
|
"- record_step(note)\n" |
|
|
"- error_code_lookup(model, code)\n" |
|
|
"- acronym_expand(term)\n" |
|
|
"- unit_convert(value, from_unit, to_unit)\n" |
|
|
"- triage_priority(text)\n" |
|
|
"- simulator_baselines(device)\n" |
|
|
"- env_checklist(context)\n" |
|
|
"- accessory_checklist(device)\n" |
|
|
"- build_qc_template(device)\n" |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@cl.on_chat_start |
|
|
async def on_start(): |
|
|
cl.user_session.set("mode", "agent") |
|
|
cl.user_session.set("agent_trace", []) |
|
|
await cl.Message(content=WELCOME).send() |
|
|
|
|
|
@cl.on_message |
|
|
async def on_message(msg: cl.Message): |
|
|
text = (msg.content or "").strip() |
|
|
low = text.lower() |
|
|
|
|
|
|
|
|
if low.startswith("/help"): |
|
|
await cl.Message(content=WELCOME).send(); return |
|
|
if low.startswith("/policy"): |
|
|
await cl.Message(content=POLICY).send(); return |
|
|
if low.startswith("/tools"): |
|
|
await cl.Message(content=TOOLS_LIST).send(); return |
|
|
if low.startswith("/agent"): |
|
|
cl.user_session.set("mode", "agent") |
|
|
await cl.Message(content="✅ Agentic mode enabled.").send(); return |
|
|
if low.startswith("/simple"): |
|
|
cl.user_session.set("mode", "simple") |
|
|
await cl.Message(content="✅ Simple mode enabled.").send(); return |
|
|
|
|
|
|
|
|
if not on_topic(text): |
|
|
await cl.Message( |
|
|
content="I only support **biomedical device troubleshooting**. " |
|
|
"Describe the device & symptom (e.g., “ECG noisy baseline”)." |
|
|
).send(); return |
|
|
|
|
|
issues = guard_issues(text) |
|
|
if issues: |
|
|
await cl.Message(content=REFUSAL + "\n\n" + POLICY).send(); return |
|
|
|
|
|
mode = cl.user_session.get("mode") or "agent" |
|
|
trace_before = len(cl.user_session.get("agent_trace") or []) |
|
|
|
|
|
|
|
|
if mode == "agent": |
|
|
try: |
|
|
result = await Runner.run(planner_agent, text, turns=7, max_tools=3) |
|
|
answer = result.final_output or "I couldn’t generate a plan." |
|
|
except Exception as e: |
|
|
await cl.Message(content=f"⚠️ Planner error: {e}\nFalling back to simple mode.").send() |
|
|
answer = await simple_plan(text) |
|
|
else: |
|
|
answer = await simple_plan(text) |
|
|
|
|
|
|
|
|
trace = cl.user_session.get("agent_trace") or [] |
|
|
new_trace = trace[trace_before:] |
|
|
trace_md = "### 🧭 Agent Trace (summary)\n" + "\n".join(f"- {t}" for t in new_trace) + "\n\n" if new_trace else "" |
|
|
|
|
|
|
|
|
qref = quick_reference(text) |
|
|
quick_md = ( |
|
|
f"### 📘 Quick Reference\n" |
|
|
f"**Safety**\n{_bullets(qref.get('safety'))}\n\n" |
|
|
f"**Common Faults**\n{_bullets(qref.get('common_faults'))}\n\n" |
|
|
f"**Quick Checks**\n{_bullets(qref.get('quick_checks'))}\n\n" |
|
|
f"**QC / Calibration**\n{_bullets(qref.get('qc'))}\n\n" |
|
|
f"**Escalate If**\n{_bullets(qref.get('escalate'))}\n\n" |
|
|
f"> ⚠️ Education-only. Follow OEM & local policy.\n" |
|
|
) |
|
|
|
|
|
await cl.Message(content=f"{trace_md}{quick_md}\n---\n{answer}").send() |
|
|
|