ibadhasnain commited on
Commit
bfb5ba6
·
verified ·
1 Parent(s): 79e94d6

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +287 -277
app.py CHANGED
@@ -1,21 +1,20 @@
1
- # app.py — Biomedical Device Troubleshooting Assistant
2
- # Education-only · Manual (PDF/TXT) upload & search · Guardrails · Agentic planner (optional)
3
- # Deps: chainlit==1.0.200, python-dotenv==1.0.1, openai==1.35.13, pypdf>=4.2.0
4
 
5
- import os, io, re, json
6
- from typing import Any, Callable, Dict, List, Optional
7
  from dataclasses import dataclass, field
 
8
 
9
  import chainlit as cl
10
  from dotenv import load_dotenv
11
  from openai import AsyncOpenAI as _SDKAsyncOpenAI
12
- from pypdf import PdfReader
13
 
14
  # =========================
15
- # Minimal "agents" shim (no external package needed)
16
  # =========================
17
- def set_tracing_disabled(disabled: bool = True):
18
- return disabled
19
 
20
  def function_tool(func: Callable):
21
  func._is_tool = True
@@ -26,10 +25,10 @@ class AsyncOpenAI:
26
  kwargs = {"api_key": api_key}
27
  if base_url:
28
  kwargs["base_url"] = base_url
29
- self._client = _SDKAsyncOpenAI(**kwargs)
30
  @property
31
  def client(self):
32
- return self._client
33
 
34
  class OpenAIChatCompletionsModel:
35
  def __init__(self, model: str, openai_client: AsyncOpenAI):
@@ -42,11 +41,16 @@ class Agent:
42
  instructions: str
43
  model: OpenAIChatCompletionsModel
44
  tools: Optional[List[Callable]] = field(default_factory=list)
45
-
46
- def tool_specs(self) -> List[Dict[str, Any]]:
47
- specs = []
48
- for t in (self.tools or []):
49
- if getattr(t, "_is_tool", False):
 
 
 
 
 
50
  argnames = list(t.__code__.co_varnames[:t.__code__.co_argcount])
51
  specs.append({
52
  "type": "function",
@@ -60,39 +64,39 @@ class Agent:
60
  },
61
  },
62
  })
63
- return specs
 
64
 
65
  class Runner:
66
  @staticmethod
67
- async def run(agent: Agent, user_input: str, context: Optional[Dict[str, Any]] = None):
68
  msgs = [
69
  {"role": "system", "content": agent.instructions},
70
  {"role": "user", "content": user_input},
71
  ]
72
- tools = agent.tool_specs()
73
  tool_map = {t.__name__: t for t in (agent.tools or []) if getattr(t, "_is_tool", False)}
74
 
75
- # small Plan→Act→Observe budget
76
- for _ in range(6):
77
  resp = await agent.model.client.chat.completions.create(
78
  model=agent.model.model,
79
  messages=msgs,
80
- tools=tools or None,
81
- tool_choice="auto" if tools else None,
82
  )
83
  msg = resp.choices[0].message
84
  msgs.append({"role": "assistant", "content": msg.content or "", "tool_calls": msg.tool_calls})
85
 
86
  if msg.tool_calls:
 
87
  for call in msg.tool_calls:
88
  fn_name = call.function.name
89
  args = json.loads(call.function.arguments or "{}")
90
  result = {"error": f"Unknown tool: {fn_name}"}
91
- if fn_name in tool_map:
92
- try:
93
- result = tool_map[fn_name](**args)
94
- except Exception as e:
95
- result = {"error": str(e)}
96
  msgs.append({
97
  "role": "tool",
98
  "tool_call_id": call.id,
@@ -114,150 +118,79 @@ class Runner:
114
  return out
115
 
116
  # =========================
117
- # Provider setup (Gemini or OpenAI)
118
  # =========================
119
- load_dotenv()
120
- GEMINI_API_KEY = os.getenv("Gem")
121
- OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
122
-
123
- if GEMINI_API_KEY:
124
- API_KEY = GEMINI_API_KEY
125
- BASE_URL = "https://generativelanguage.googleapis.com/v1beta/openai/"
126
- MODEL_ID = "gemini-2.5-flash"
127
- elif OPENAI_API_KEY:
128
- API_KEY = OPENAI_API_KEY
129
- BASE_URL = None
130
- MODEL_ID = "gpt-4o-mini"
131
- else:
132
- raise RuntimeError("Missing GEMINI_API_KEY or OPENAI_API_KEY in environment.")
133
-
134
  set_tracing_disabled(True)
135
- ext_client = AsyncOpenAI(api_key=API_KEY, base_url=BASE_URL)
136
- llm_model = OpenAIChatCompletionsModel(model=MODEL_ID, openai_client=ext_client)
137
 
138
  # =========================
139
- # Guardrails (topic & safety)
140
  # =========================
141
- ALLOWED_COMMANDS = ("/help", "/policy", "/manual", "/clear", "/hits", "/agent", "/simple")
142
-
143
- TOPIC_KEYWORDS = [
144
- "biomedical","biomed","device","equipment","oem","service manual",
145
- "troubleshoot","fault","error","alarm","probe","sensor","lead","cable",
146
- "battery","power","calibration","qc","verification","analyzer","self-test",
147
- "ventilator","defibrillator","infusion","pump","ecg","oximeter","nibp",
148
- "monitor","ultrasound","anesthesia","syringe","suction","spirometer","glucometer"
149
- ]
150
-
151
- RE_FORBIDDEN_CLINICAL = re.compile(r"\b(diagnos(e|is|tic)|prescrib|medicat|treat(ment|ing)?|dose|drug|therapy)\b", re.I)
152
- RE_INVASIVE_REPAIR = re.compile(r"\b(open(ing)?\s+(device|casing|cover)|solder|reflow|board[- ]level|replace\s+(capacitor|ic))\b", re.I)
153
- RE_ALARM_BYPASS = re.compile(r"\b(bypass|disable|silence)\s+(alarm|alert|safety|interlock)\b", re.I)
154
- RE_FIRMWARE_TAMPER = re.compile(r"\b(firmware|bootloader|root|jailbreak|unlock\s+(service|engineer)\s*mode|password\s*override|backdoor)\b", re.I)
155
- RE_EMAIL = re.compile(r"[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}", re.I)
156
- RE_PHONE = re.compile(r"(?:\+\d{1,3}[-\s.]*)?(?:\(?\d{3,4}\)?[-\s.]*)?\d{3}[-\s.]?\d{4}")
157
- RE_IDHINT = re.compile(r"\b(CNIC|MRN|passport|nid|national\s*id|social\s*security|aadhaar)\b", re.I)
 
 
158
 
159
  def on_topic(text: str) -> bool:
160
  low = (text or "").lower().strip()
161
- if not low:
162
- return False
163
- if any(low.startswith(cmd) for cmd in ALLOWED_COMMANDS):
164
- return True
165
- return any(k in low for k in TOPIC_KEYWORDS)
166
 
167
- def local_guard(text: str) -> List[str]:
168
  low = (text or "").lower()
169
- issues = []
170
- if RE_FORBIDDEN_CLINICAL.search(low): issues.append("clinical_advice")
171
- if RE_INVASIVE_REPAIR.search(low): issues.append("invasive_repair")
172
- if RE_ALARM_BYPASS.search(low): issues.append("alarm_bypass")
173
- if RE_FIRMWARE_TAMPER.search(low): issues.append("firmware_tamper")
174
- if RE_EMAIL.search(text) or RE_PHONE.search(text) or RE_IDHINT.search(text):
175
- issues.append("phi_share_or_collect")
176
  return issues
177
 
178
  # =========================
179
- # Manual handling & search (robust PDF detection)
180
- # =========================
181
- def extract_pdf_pages(data: bytes):
182
- pages = []
183
- reader = PdfReader(io.BytesIO(data))
184
- for i, pg in enumerate(reader.pages, start=1):
185
- try:
186
- txt = pg.extract_text() or ""
187
- except Exception:
188
- txt = ""
189
- pages.append({"page": i, "text": txt})
190
- return pages
191
-
192
- def extract_txt_pages(data: bytes, chunk_chars: int = 1600):
193
- try: txt = data.decode("utf-8", errors="ignore")
194
- except Exception: txt = ""
195
- chunks = [{"page": i+1, "text": txt[i:i+chunk_chars]} for i in range(0, len(txt), chunk_chars)]
196
- return chunks or [{"page": 1, "text": ""}]
197
-
198
- def _is_pdf(file_obj, data: bytes) -> bool:
199
- # Chainlit may expose .type or nothing; avoid relying on .mime
200
- name = getattr(file_obj, "name", "") or getattr(file_obj, "filename", "")
201
- typ = getattr(file_obj, "type", "") or ""
202
- return (
203
- name.lower().endswith(".pdf")
204
- or typ == "application/pdf"
205
- or (isinstance(data, (bytes, bytearray)) and data[:5] == b"%PDF-")
206
- )
207
-
208
- def manual_hits(pages, query: str, topk: int = 3):
209
- if not pages: return []
210
- terms = [w for w in re.findall(r"\w+", (query or "").lower()) if len(w) > 2]
211
- scored = []
212
- for p in pages:
213
- low = (p.get("text") or "").lower()
214
- score = sum(low.count(t) for t in terms)
215
- if score > 0: scored.append((score, p))
216
- scored.sort(key=lambda x: x[0], reverse=True)
217
- hits = [p for _, p in scored[:topk]] or pages[:1]
218
- def excerpt(text: str, window: int = 420):
219
- t = text or ""
220
- low = t.lower()
221
- idxs = [low.find(tk) for tk in terms if tk in low]
222
- start = max(0, min([i for i in idxs if i >= 0], default=0) - window)
223
- end = min(len(t), start + 2*window)
224
- snippet = re.sub(r"\s+", " ", t[start:end]).strip()
225
- snippet = RE_EMAIL.sub("[REDACTED_EMAIL]", snippet)
226
- snippet = RE_PHONE.sub("[REDACTED_PHONE]", snippet)
227
- snippet = RE_IDHINT.sub("[ID]", snippet)
228
- return snippet
229
- return [f"[p.{h['page']}] {excerpt(h.get('text',''))}" for h in hits]
230
-
231
- # =========================
232
- # Tools the agent can call
233
  # =========================
234
- def _current_manual_pages():
235
- m = cl.user_session.get("manual")
236
- return (m or {}).get("pages", [])
237
-
238
- @function_tool
239
- def search_manual(query: str, topk: str = "3") -> dict:
240
- """Search the uploaded manual (PDF/TXT) for the query. Returns {'hits': [snippets...]}. If no manual, hits=[]"""
241
- pages = _current_manual_pages()
242
- k = int(topk) if str(topk).isdigit() else 3
243
- hits = manual_hits(pages, query, topk=k) if pages else []
244
- return {"hits": hits}
245
 
246
  @function_tool
247
  def quick_reference(device_and_symptom: str) -> dict:
248
  """
249
- Deterministic education-only bullets based on free-text device description.
250
  """
251
- d = (device_and_symptom or "").lower()
252
- life_critical = any(k in d for k in ["ventilator","defibrillator","infusion pump"])
253
  return {
254
  "safety": [
255
- "If attached to a patient, ensure backup/alternative monitoring before checks.",
256
- "No invasive service; do not open casing; follow OEM and facility policy.",
257
- "Do not bypass or silence alarms beyond OEM instructions."
258
- ] + (["Life-support device: remove from service and use backup if malfunction suspected."] if life_critical else []),
 
259
  "common_faults": [
260
- "Power/battery supply issues or loose connections.",
261
  "Damaged/incorrect accessories (leads, probes, tubing).",
262
  "Configuration/profile mismatch for the clinical setup.",
263
  "Environmental interference (EMI), filters/clogs, or mechanical obstruction."
@@ -265,67 +198,189 @@ def quick_reference(device_and_symptom: str) -> dict:
265
  "quick_checks": [
266
  "Note model/serial/firmware and any error codes.",
267
  "Verify power source/battery; try another outlet; reseat user-removable battery.",
268
- "Inspect and reseat accessories; swap with a known-good if available.",
269
- "Confirm settings match procedure; restore defaults if safe.",
270
- "Run device self-test if available and review OEM quick-reference."
271
  ],
272
  "qc": [
273
- "Verify against a reference/simulator where applicable (ECG, SpO2, NIBP, flow, etc.).",
274
- "Confirm scheduled electrical safety tests per policy.",
275
- "Document results and compare with historical QC."
276
  ],
277
  "escalate": [
278
  "Any failed self-test or QC/safety test.",
279
- "Persistent faults after basic checks or physical damage/liquid ingress.",
280
- "Life-critical pathway: remove from service immediately and escalate to Biomed/OEM."
281
- ]
282
  }
283
 
284
  @function_tool
285
  def safety_policy() -> dict:
286
- """Return a concise policy block enforced by the assistant."""
287
  return {"policy": [
288
  "Education-only troubleshooting; no clinical advice.",
289
- "No invasive repairs, no alarm bypass, no firmware tampering.",
290
  "No collecting/sharing personal identifiers.",
291
- "OEM manuals and local policy take precedence."
292
  ]}
293
 
294
  @function_tool
295
  def record_step(note: str) -> dict:
296
- """Append an action/observation to the agent trace shown to the user."""
297
  trace: List[str] = cl.user_session.get("agent_trace") or []
298
- clean = re.sub(r"\s+", " ", (note or "")).strip()
299
  if clean:
300
  trace.append(clean[:500])
301
  cl.user_session.set("agent_trace", trace)
302
  return {"ok": True, "len": len(trace)}
303
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
304
  # =========================
305
- # Agentic planner & simple fallback
306
  # =========================
307
  PLANNER_INSTRUCTIONS = (
308
- "You are an agentic biomedical device troubleshooting planner for clinical engineers.\n"
309
- "STRICT SCOPE: Education-only. Do NOT give clinical advice, do NOT instruct invasive repairs, do NOT bypass alarms, "
310
- "do NOT suggest firmware tampering. Respect policy.\n\n"
311
  "Loop up to 3 times:\n"
312
- "- PLAN: Decide what you need (e.g., search_manual for error keywords, or quick_reference).\n"
313
- "- ACT: Call a tool (search_manual, quick_reference, safety_policy, record_step).\n"
314
- "- OBSERVE: Read tool result. Summarize via record_step.\n"
315
- "Then produce a final plan with exactly these sections:\n"
 
316
  "1) Safety First\n"
317
  "2) Likely Causes (ranked)\n"
318
  "3) Step-by-Step Checks (non-invasive; no alarm bypass)\n"
319
  "4) QC/Calibration\n"
320
  "5) Escalate When\n"
321
- "End with a one-line summary. If life-critical & patient-connected is implied, start with: "
322
  "REMOVE FROM SERVICE & USE BACKUP — then proceed off-patient.\n"
323
  )
 
324
  planner_agent = Agent(
325
  name="Agentic Planner",
326
  instructions=PLANNER_INSTRUCTIONS,
327
  model=llm_model,
328
- tools=[search_manual, quick_reference, safety_policy, record_step],
 
 
 
 
329
  )
330
 
331
  SIMPLE_SYSTEM = (
@@ -333,13 +388,11 @@ SIMPLE_SYSTEM = (
333
  "Output sections: Safety First; Likely Causes; Step-by-Step Checks; QC/Calibration; Escalate When; one-line summary."
334
  )
335
 
336
- async def simple_llm_plan(user_desc: str, manual_excerpts: str = "") -> str:
337
- user_block = f"Device & symptom:\n{user_desc.strip()}"
338
- if manual_excerpts:
339
- user_block += f"\n\nManual excerpts:\n{manual_excerpts.strip()}"
340
- resp = await ext_client.client.chat.completions.create(
341
- model=MODEL_ID,
342
- messages=[{"role":"system","content":SIMPLE_SYSTEM},{"role":"user","content":user_block}],
343
  )
344
  return resp.choices[0].message.content or ""
345
 
@@ -348,38 +401,50 @@ async def simple_llm_plan(user_desc: str, manual_excerpts: str = "") -> str:
348
  # =========================
349
  WELCOME = (
350
  "🛠️ **Biomedical Device Troubleshooting Assistant** (Agentic)\n"
351
- "Education-only. No diagnosis, no invasive service, no alarm bypass. OEM manual & policy rule the day.\n\n"
352
  "Type your **device & symptom** (e.g., “Infusion pump occlusion alarm”).\n"
353
- "Commands: **/manual** upload PDF/TXT, **/hits** show manual matches, **/clear** remove manual,\n"
354
- "**/agent** agentic mode (default), **/simple** single-pass mode, **/policy** view rules."
355
  )
356
  POLICY = (
357
  "🛡️ **Safety & Scope Policy**\n"
358
- "- Scope: biomedical **device troubleshooting** (education-only).\n"
359
  "- No clinical advice (diagnosis/treatment/dosing/medications).\n"
360
- "- No invasive repairs (opening casing, soldering, board-level).\n"
361
- "- No alarm bypass or firmware tampering.\n"
362
- "- No collecting/sharing personal identifiers.\n"
363
  "- OEM manuals & local policy take priority."
364
  )
365
  REFUSAL = (
366
- "🚫 I can’t help with diagnosis/treatment, invasive repair, alarm bypass, firmware hacks, or collecting personal data.\n"
367
- "I can guide **safe, non-invasive troubleshooting** and educational QC steps."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
368
  )
369
 
370
  # =========================
371
  # Chainlit flow
372
  # =========================
373
  @cl.on_chat_start
374
- async def start():
375
- cl.user_session.set("manual", None)
376
- cl.user_session.set("agent_mode", "agent") # 'agent' | 'simple'
377
  cl.user_session.set("agent_trace", [])
378
  await cl.Message(content=WELCOME).send()
379
 
380
  @cl.on_message
381
- async def main(message: cl.Message):
382
- text = (message.content or "").strip()
383
  low = text.lower()
384
 
385
  # Commands
@@ -387,109 +452,54 @@ async def main(message: cl.Message):
387
  await cl.Message(content=WELCOME).send(); return
388
  if low.startswith("/policy"):
389
  await cl.Message(content=POLICY).send(); return
 
 
390
  if low.startswith("/agent"):
391
- cl.user_session.set("agent_mode", "agent")
392
- await cl.Message(content="✅ Agentic mode enabled (planner will Plan→Act→Observe).").send(); return
393
  if low.startswith("/simple"):
394
- cl.user_session.set("agent_mode", "simple")
395
- await cl.Message(content="✅ Simple mode enabled (single LLM pass).").send(); return
396
-
397
- if low.startswith("/manual"):
398
- files = await cl.AskFileMessage(
399
- content="Upload the **service manual** (PDF or TXT). Max ~20 MB.",
400
- accept=["application/pdf", "text/plain"], # UI hint only
401
- max_files=1, max_size_mb=20, timeout=240
402
- ).send()
403
- if not files:
404
- await cl.Message(content="No file received.").send(); return
405
-
406
- f = files[0]
407
- data = getattr(f, "content", None)
408
- if data is None and getattr(f, "path", None):
409
- with open(f.path, "rb") as fh:
410
- data = fh.read()
411
- if not data:
412
- await cl.Message(content="Received file but got empty content. Please re-upload.").send(); return
413
 
414
- try:
415
- if _is_pdf(f, data):
416
- pages = extract_pdf_pages(data)
417
- else:
418
- pages = extract_txt_pages(data)
419
- except Exception as e:
420
- await cl.Message(content=f"Couldn't read the manual: {e}").send(); return
421
-
422
- cl.user_session.set("manual", {"name": getattr(f, "name", "manual"), "pages": pages})
423
- await cl.Message(content=f"✅ Manual indexed: **{getattr(f, 'name', 'manual')}** — {len(pages)} page-chunks.").send()
424
- return
425
-
426
- if low.startswith("/clear"):
427
- cl.user_session.set("manual", None)
428
- await cl.Message(content="Manual cleared.").send()
429
- return
430
-
431
- if low.startswith("/hits"):
432
- m = cl.user_session.get("manual")
433
- if not m or not m.get("pages"):
434
- await cl.Message(content="No manual attached. Use **/manual** to upload one.").send(); return
435
- query = text.replace("/hits", "").strip() or "device fault error alarm"
436
- hits = manual_hits(m["pages"], query, topk=3) or ["No obvious matches — try different wording."]
437
- await cl.Message(content="### 🔎 Manual Matches\n" + "\n\n".join(hits)).send()
438
- return
439
-
440
- # Topic & safety guard
441
  if not on_topic(text):
442
  await cl.Message(
443
  content="I only support **biomedical device troubleshooting**. "
444
- "Describe the device & symptom (e.g., “ECG noisy baseline”), or upload a manual with **/manual**."
445
- ).send()
446
- return
447
- issues = local_guard(text)
448
- if issues:
449
- await cl.Message(content=REFUSAL + "\n\n" + POLICY).send()
450
- return
451
 
452
- # Manual context (for simple fallback)
453
- manual = cl.user_session.get("manual")
454
- manual_name = manual.get("name") if manual else None
455
- excerpts = ""
456
- if manual and manual.get("pages"):
457
- excerpts = "\n".join(manual_hits(manual["pages"], text, topk=3))
458
 
459
- mode = cl.user_session.get("agent_mode") or "agent"
460
  trace_before = len(cl.user_session.get("agent_trace") or [])
461
 
462
  if mode == "agent":
463
  try:
464
- # Agentic planner will call tools (search_manual/quick_reference/record_step) itself
465
- result = await Runner.run(planner_agent, text)
466
  answer = result.final_output or "I couldn’t generate a plan."
467
  except Exception as e:
468
- await cl.Message(content=f"⚠️ Planner failed: {e}\nFalling back to simple mode.").send()
469
- answer = await simple_llm_plan(text, excerpts)
470
  else:
471
- answer = await simple_llm_plan(text, excerpts)
472
 
473
- # Agent trace (new steps only)
474
  trace = cl.user_session.get("agent_trace") or []
475
  new_trace = trace[trace_before:]
476
- trace_md = ""
477
- if new_trace:
478
- trace_md = "### 🧭 Agent Trace (summary)\n" + "\n".join(f"- {t}" for t in new_trace) + "\n\n"
479
 
480
- # Deterministic quick reference
481
  qref = quick_reference(text)
482
- def bullets(arr: List[str]): return "\n".join(f"- {x}" for x in (arr or [])) or "-"
483
-
484
  quick_md = (
485
  f"### 📘 Quick Reference\n"
486
- f"**Safety**\n{bullets(qref.get('safety'))}\n\n"
487
- f"**Common Faults**\n{bullets(qref.get('common_faults'))}\n\n"
488
- f"**Quick Checks**\n{bullets(qref.get('quick_checks'))}\n\n"
489
- f"**QC / Calibration**\n{bullets(qref.get('qc'))}\n\n"
490
- f"**Escalate If**\n{bullets(qref.get('escalate'))}\n\n"
491
- f"> ⚠️ Education-only. Refer to OEM manual & policy. No invasive service.\n"
492
  )
493
 
494
- prefix = f"📄 Using manual: **{manual_name}**\n\n" if manual_name else ""
495
- await cl.Message(content=f"{trace_md}{quick_md}\n---\n{prefix}{answer}").send()
 
1
+ # app.py — Biomedical Device Troubleshooting Assistant (Education-only, Agentic)
2
+ # Minimal deps: chainlit==1.0.200, python-dotenv==1.0.1, openai==1.35.13
3
+ # .env: GEMINI_API_KEY=... (preferred) OR OPENAI_API_KEY=...
4
 
5
+ import os, re, json
 
6
  from dataclasses import dataclass, field
7
+ from typing import Any, Callable, Dict, List, Optional
8
 
9
  import chainlit as cl
10
  from dotenv import load_dotenv
11
  from openai import AsyncOpenAI as _SDKAsyncOpenAI
 
12
 
13
  # =========================
14
+ # Small agents shim
15
  # =========================
16
+ def set_tracing_disabled(_: bool = True): # placeholder (no-op)
17
+ return True
18
 
19
  def function_tool(func: Callable):
20
  func._is_tool = True
 
25
  kwargs = {"api_key": api_key}
26
  if base_url:
27
  kwargs["base_url"] = base_url
28
+ self._c = _SDKAsyncOpenAI(**kwargs)
29
  @property
30
  def client(self):
31
+ return self._c
32
 
33
  class OpenAIChatCompletionsModel:
34
  def __init__(self, model: str, openai_client: AsyncOpenAI):
 
41
  instructions: str
42
  model: OpenAIChatCompletionsModel
43
  tools: Optional[List[Callable]] = field(default_factory=list)
44
+ _tool_specs: Optional[List[Dict[str, Any]]] = field(default=None, init=False)
45
+
46
+ def tool_specs(self) -> Optional[List[Dict[str, Any]]]:
47
+ if not self.tools:
48
+ return None
49
+ if self._tool_specs is None:
50
+ specs: List[Dict[str, Any]] = []
51
+ for t in self.tools:
52
+ if not getattr(t, "_is_tool", False):
53
+ continue
54
  argnames = list(t.__code__.co_varnames[:t.__code__.co_argcount])
55
  specs.append({
56
  "type": "function",
 
64
  },
65
  },
66
  })
67
+ self._tool_specs = specs
68
+ return self._tool_specs
69
 
70
  class Runner:
71
  @staticmethod
72
+ async def run(agent: Agent, user_input: str, context: Optional[Dict[str, Any]] = None, turns: int = 5):
73
  msgs = [
74
  {"role": "system", "content": agent.instructions},
75
  {"role": "user", "content": user_input},
76
  ]
77
+ tool_specs = agent.tool_specs()
78
  tool_map = {t.__name__: t for t in (agent.tools or []) if getattr(t, "_is_tool", False)}
79
 
80
+ for _ in range(max(1, min(8, turns))):
 
81
  resp = await agent.model.client.chat.completions.create(
82
  model=agent.model.model,
83
  messages=msgs,
84
+ tools=tool_specs or None,
85
+ tool_choice="auto" if tool_specs else None,
86
  )
87
  msg = resp.choices[0].message
88
  msgs.append({"role": "assistant", "content": msg.content or "", "tool_calls": msg.tool_calls})
89
 
90
  if msg.tool_calls:
91
+ # Plan→Act→Observe
92
  for call in msg.tool_calls:
93
  fn_name = call.function.name
94
  args = json.loads(call.function.arguments or "{}")
95
  result = {"error": f"Unknown tool: {fn_name}"}
96
+ tool = tool_map.get(fn_name)
97
+ if tool:
98
+ try: result = tool(**args)
99
+ except Exception as e: result = {"error": str(e)}
 
100
  msgs.append({
101
  "role": "tool",
102
  "tool_call_id": call.id,
 
118
  return out
119
 
120
  # =========================
121
+ # Model setup (Gemini via OpenAI-compatible OR OpenAI)
122
  # =========================
123
+ def setup_model() -> OpenAIChatCompletionsModel:
124
+ load_dotenv()
125
+ gem = os.getenv("Gem")
126
+ oai = os.getenv("OPENAI_API_KEY")
127
+ if gem:
128
+ client = AsyncOpenAI(api_key=gem, base_url="https://generativelanguage.googleapis.com/v1beta/openai/")
129
+ return OpenAIChatCompletionsModel("gemini-2.5-flash", client)
130
+ if oai:
131
+ client = AsyncOpenAI(api_key=oai)
132
+ return OpenAIChatCompletionsModel("gpt-4o-mini", client)
133
+ raise RuntimeError("Add GEMINI_API_KEY or OPENAI_API_KEY to your .env")
134
+
135
+ llm_model = setup_model()
 
 
136
  set_tracing_disabled(True)
 
 
137
 
138
  # =========================
139
+ # Guardrails (fast & local)
140
  # =========================
141
+ COMMANDS = ("/help", "/policy", "/agent", "/simple", "/tools")
142
+
143
+ TOPIC_TOKENS = tuple("""
144
+ biomedical biomed device equipment oem troubleshoot fault error alarm probe sensor lead cable battery power calibration qc
145
+ verification analyzer self-test ventilator defibrillator infusion pump ecg oximeter nibp monitor ultrasound anesthesia syringe
146
+ suction spirometer glucometer flow pressure temperature module waveform leak occlusion charger display keypad
147
+ """.split())
148
+
149
+ RE_FORBIDDEN = {
150
+ "clinical_advice": re.compile(r"\b(diagnos(e|is|tic)|prescrib|medicat|treat(ment|ing)?|dose|drug|therapy)\b", re.I),
151
+ "invasive_repair": re.compile(r"\b(open(ing)?\s+(device|casing|cover)|solder|reflow|board[- ]level|replace\s+(capacitor|ic))\b", re.I),
152
+ "alarm_bypass": re.compile(r"\b(bypass|disable|silence)\s+(alarm|alert|safety|interlock)\b", re.I),
153
+ "firmware": re.compile(r"\b(firmware|bootloader|root|jailbreak|unlock\s+(service|engineer)\s*mode|password\s*override|backdoor)\b", re.I),
154
+ }
155
+ RE_PHI = {
156
+ "email": re.compile(r"[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}", re.I),
157
+ "phone": re.compile(r"(?:\+\d{1,3}[-\s.]*)?(?:\(?\d{3,4}\)?[-\s.]*)?\d{3}[-\s.]?\d{4}"),
158
+ "id": re.compile(r"\b(CNIC|MRN|passport|nid|national\s*id|social\s*security|aadhaar)\b", re.I),
159
+ }
160
 
161
  def on_topic(text: str) -> bool:
162
  low = (text or "").lower().strip()
163
+ return bool(low) and (low.startswith(COMMANDS) or any(t in low for t in TOPIC_TOKENS))
 
 
 
 
164
 
165
+ def guard_issues(text: str) -> List[str]:
166
  low = (text or "").lower()
167
+ issues = [k for k, rx in RE_FORBIDDEN.items() if rx.search(low)]
168
+ if any(rx.search(text) for rx in RE_PHI.values()):
169
+ issues.append("phi")
 
 
 
 
170
  return issues
171
 
172
  # =========================
173
+ # Education-only tools
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
174
  # =========================
175
+ def _bullets(items: List[str]) -> str:
176
+ return "\n".join(f"- {x}" for x in (items or [])) or "-"
 
 
 
 
 
 
 
 
 
177
 
178
  @function_tool
179
  def quick_reference(device_and_symptom: str) -> dict:
180
  """
181
+ Generic, non-invasive reference for troubleshooting.
182
  """
183
+ s = (device_and_symptom or "").lower()
184
+ life = any(k in s for k in ["ventilator", "defibrillator", "infusion pump"])
185
  return {
186
  "safety": [
187
+ "If attached to a patient, ensure backup monitoring before checks.",
188
+ "Do not open the casing; follow OEM & facility policy.",
189
+ "Do not bypass or silence alarms beyond OEM instructions.",
190
+ *(["Life-support device: remove from service and use backup if malfunction suspected."] if life else [])
191
+ ],
192
  "common_faults": [
193
+ "Power/battery issues or loose connections.",
194
  "Damaged/incorrect accessories (leads, probes, tubing).",
195
  "Configuration/profile mismatch for the clinical setup.",
196
  "Environmental interference (EMI), filters/clogs, or mechanical obstruction."
 
198
  "quick_checks": [
199
  "Note model/serial/firmware and any error codes.",
200
  "Verify power source/battery; try another outlet; reseat user-removable battery.",
201
+ "Inspect & reseat accessories; swap with a known-good.",
202
+ "Confirm settings; restore defaults if safe.",
203
+ "Run device self-test and review OEM quick-reference."
204
  ],
205
  "qc": [
206
+ "Verify against a reference/simulator where applicable (ECG, SpO₂, NIBP, flow).",
207
+ "Perform electrical safety tests per policy.",
208
+ "Document & compare with prior QC."
209
  ],
210
  "escalate": [
211
  "Any failed self-test or QC/safety test.",
212
+ "Persistent faults after basic checks or visible damage/liquid ingress.",
213
+ "Life-critical pathway: remove from service and escalate to Biomed/OEM."
214
+ ],
215
  }
216
 
217
  @function_tool
218
  def safety_policy() -> dict:
219
+ """Concise policy enforced by the assistant."""
220
  return {"policy": [
221
  "Education-only troubleshooting; no clinical advice.",
222
+ "No invasive repairs; no alarm bypass; no firmware tampering.",
223
  "No collecting/sharing personal identifiers.",
224
+ "OEM manuals & local policy take precedence."
225
  ]}
226
 
227
  @function_tool
228
  def record_step(note: str) -> dict:
229
+ """Append an action/observation to an agent trace (shown to user)."""
230
  trace: List[str] = cl.user_session.get("agent_trace") or []
231
+ clean = " ".join((note or "").split())
232
  if clean:
233
  trace.append(clean[:500])
234
  cl.user_session.set("agent_trace", trace)
235
  return {"ok": True, "len": len(trace)}
236
 
237
+ @function_tool
238
+ def error_code_lookup(model: str, code: str) -> dict:
239
+ """
240
+ Friendly generic notes for common codes (extendable).
241
+ """
242
+ key = f"{(model or '').strip().lower()}::{(code or '').strip().lower()}"
243
+ KB = {
244
+ "generic::e01": {"meaning": "General startup/self-test failure.",
245
+ "notes": ["Safe power-cycle (off-patient). Check mains/battery, connections. Re-run self-test. Escalate if persistent."]},
246
+ "generic::occlusion": {"meaning": "Flow obstruction detected.",
247
+ "notes": ["Check clamps/kinks/filters. Reload set per OEM. Do not bypass alarms."]},
248
+ "monitor::leads_off": {"meaning": "ECG leads not detected / poor contact.",
249
+ "notes": ["Verify placement & skin prep. Swap known-good leads. Route away from mains/EMI."]},
250
+ }
251
+ data = KB.get(key) or KB.get(f"generic::{(code or '').strip().lower()}")
252
+ if not data:
253
+ return {"found": False, "message": "No local entry. Use OEM quick-ref.", "examples": list(KB.keys())}
254
+ return {"found": True, "model": model, "code": code, "meaning": data["meaning"], "education_notes": data["notes"]}
255
+
256
+ @function_tool
257
+ def acronym_expand(term: str) -> dict:
258
+ """Explain common biomed/acquisition acronyms (study aid)."""
259
+ t = (term or "").strip().lower()
260
+ MAP = {
261
+ "ecg": "Electrocardiogram.",
262
+ "nibp": "Non-Invasive Blood Pressure.",
263
+ "spo2": "Peripheral oxygen saturation (pulse oximetry).",
264
+ "peep": "Positive End-Expiratory Pressure (ventilation).",
265
+ "fio2": "Fraction of inspired oxygen.",
266
+ "vt": "Tidal Volume.",
267
+ "hu": "Hounsfield Units (CT).",
268
+ "snr": "Signal-to-Noise Ratio.",
269
+ "tr": "Repetition Time (MRI).",
270
+ "te": "Echo Time (MRI).",
271
+ "flair": "Fluid-Attenuated Inversion Recovery (MRI).",
272
+ "adc": "Apparent Diffusion Coefficient (MRI).",
273
+ }
274
+ return {"term": term, "meaning": MAP.get(t, "Not in the quick list. Use OEM/teaching refs.")}
275
+
276
+ @function_tool
277
+ def unit_convert(value: str, from_unit: str, to_unit: str) -> dict:
278
+ """
279
+ Limited, safe conversions used in QC:
280
+ mmHg↔kPa (1 kPa = 7.50062 mmHg) | L/min↔mL/s (1 L/min = 16.6667 mL/s) | mV↔µV (1 mV = 1000 µV)
281
+ """
282
+ def f(x): return float(str(x).strip())
283
+ v = f(value); fu = (from_unit or "").lower(); tu = (to_unit or "").lower()
284
+ if fu == "mmhg" and tu == "kpa": return {"ok": True, "result": v/7.50062}
285
+ if fu == "kpa" and tu == "mmhg": return {"ok": True, "result": v*7.50062}
286
+ if fu in ["l/min","lpm"] and tu in ["ml/s","mlps"]: return {"ok": True, "result": v*1000/60}
287
+ if fu in ["ml/s","mlps"] and tu in ["l/min","lpm"]: return {"ok": True, "result": v*60/1000}
288
+ if fu == "mv" and tu in ["µv","uv"]: return {"ok": True, "result": v*1000}
289
+ if fu in ["µv","uv"] and tu == "mv": return {"ok": True, "result": v/1000}
290
+ return {"ok": False, "error": "Unsupported units. Try mmHg↔kPa, L/min↔mL/s, mV↔µV."}
291
+
292
+ @function_tool
293
+ def triage_priority(device_and_symptom: str) -> dict:
294
+ """Heuristic priority: life-critical / high / standard (education-only)."""
295
+ s = (device_and_symptom or "").lower()
296
+ life = any(k in s for k in ["ventilator","defibrillator","infusion pump"])
297
+ high = any(k in s for k in ["alarm","occlusion","no output","no reading","self-test fail","charging issue"])
298
+ return {"priority": "life-critical" if life else ("high" if high else "standard"),
299
+ "explain": "Heuristic only. Follow local escalation policy."}
300
+
301
+ @function_tool
302
+ def simulator_baselines(device: str) -> dict:
303
+ """Educational simulator targets (generic, not OEM-specific)."""
304
+ d = (device or "").lower()
305
+ bank = {
306
+ "ecg": ["1 mV @ 10 mm/mV (paper 25 mm/s).", "Check lead-fault detection & baseline stability."],
307
+ "nibp": ["Phantom ~120/80 mmHg within policy tolerance.", "Leak test & overpressure safety."],
308
+ "spo2": ["Simulator steps e.g., 97%, 90%, 85% (device-dependent).", "Verify pleth and probe integrity."],
309
+ "ventilator": ["Tidal volume/flow with analyzer; verify alarms.", "Circuit leak test; filters per policy."],
310
+ }
311
+ for k, v in bank.items():
312
+ if k in d: return {"device": device, "baselines": v}
313
+ return {"device": device, "baselines": ["Use appropriate reference/simulator.", "Document vs prior QC; check alarm trips."]}
314
+
315
+ @function_tool
316
+ def env_checklist(_: str) -> dict:
317
+ """Environment/power/EMI checklist for safe troubleshooting."""
318
+ return {"checks": [
319
+ "Verify mains outlet & ground; power indicator on.",
320
+ "Avoid extension cords; try a known-good outlet.",
321
+ "Route signal cables away from mains/transformers.",
322
+ "Eliminate strong RF/EMI sources nearby.",
323
+ "Check filters/vents; ambient temp/humidity within spec."
324
+ ]}
325
+
326
+ @function_tool
327
+ def accessory_checklist(_: str) -> dict:
328
+ """Generic accessory fit/damage/compatibility checklist."""
329
+ return {"checks": [
330
+ "Inspect connectors for bent pins/corrosion; ensure tight seating.",
331
+ "Check cables/tubing for cuts/kinks; confirm model compatibility.",
332
+ "Swap to a known-good accessory.",
333
+ "Respect single-use vs reusable policy."
334
+ ]}
335
+
336
+ @function_tool
337
+ def build_qc_template(device: str) -> dict:
338
+ """Markdown QC template students can copy into notes/CMMS (education-only)."""
339
+ d = (device or "Device").strip()
340
+ md = f"""### QC/Verification — {d}
341
+ - Date / Tech:
342
+ - Model / Serial / FW:
343
+ - Visual Inspection: (✔/✖) Notes:
344
+ - Electrical Safety: (✔/✖) Leakage / Ground:
345
+ - Functional Checks:
346
+ - Reference/Simulator values → Measured:
347
+ - Alarms exercised →
348
+ - Accessories (list & status):
349
+ - Final Assessment: Pass / Conditional / Fail
350
+ - Next Steps / Escalation:
351
+ """
352
+ return {"markdown": md}
353
+
354
  # =========================
355
+ # Agentic planner + simple fallback
356
  # =========================
357
  PLANNER_INSTRUCTIONS = (
358
+ "You are an **agentic biomedical device troubleshooting planner** for clinical engineers.\n"
359
+ "SCOPE (strict): education-only. No clinical advice. No invasive repairs. No alarm bypass. No firmware tampering. No PHI.\n"
 
360
  "Loop up to 3 times:\n"
361
+ "- PLAN what you need (e.g., quick_reference, safety_policy, acronym_expand, error_code_lookup, unit_convert, triage_priority,\n"
362
+ " simulator_baselines, env_checklist, accessory_checklist, build_qc_template).\n"
363
+ "- ACT by calling a tool.\n"
364
+ "- OBSERVE and add a short summary via record_step.\n\n"
365
+ "Finally output exactly these sections:\n"
366
  "1) Safety First\n"
367
  "2) Likely Causes (ranked)\n"
368
  "3) Step-by-Step Checks (non-invasive; no alarm bypass)\n"
369
  "4) QC/Calibration\n"
370
  "5) Escalate When\n"
371
+ "End with a one-line summary. If life-critical & patient-connected is implied, start with:\n"
372
  "REMOVE FROM SERVICE & USE BACKUP — then proceed off-patient.\n"
373
  )
374
+
375
  planner_agent = Agent(
376
  name="Agentic Planner",
377
  instructions=PLANNER_INSTRUCTIONS,
378
  model=llm_model,
379
+ tools=[
380
+ quick_reference, safety_policy, record_step, error_code_lookup, acronym_expand,
381
+ unit_convert, triage_priority, simulator_baselines, env_checklist,
382
+ accessory_checklist, build_qc_template
383
+ ],
384
  )
385
 
386
  SIMPLE_SYSTEM = (
 
388
  "Output sections: Safety First; Likely Causes; Step-by-Step Checks; QC/Calibration; Escalate When; one-line summary."
389
  )
390
 
391
+ async def simple_plan(user_desc: str) -> str:
392
+ resp = await llm_model.client.chat.completions.create(
393
+ model=llm_model.model,
394
+ messages=[{"role": "system", "content": SIMPLE_SYSTEM},
395
+ {"role": "user", "content": f"Device & symptom:\n{user_desc.strip()}"}],
 
 
396
  )
397
  return resp.choices[0].message.content or ""
398
 
 
401
  # =========================
402
  WELCOME = (
403
  "🛠️ **Biomedical Device Troubleshooting Assistant** (Agentic)\n"
404
+ "Education-only. No diagnosis, no invasive service, no alarm bypass. OEM & policy rule the day.\n\n"
405
  "Type your **device & symptom** (e.g., “Infusion pump occlusion alarm”).\n"
406
+ "Commands: **/agent** agentic (default) · **/simple** single-pass · **/policy** rules · **/tools** list tools · **/help**"
 
407
  )
408
  POLICY = (
409
  "🛡️ **Safety & Scope Policy**\n"
410
+ "- Scope: biomedical device troubleshooting (education-only).\n"
411
  "- No clinical advice (diagnosis/treatment/dosing/medications).\n"
412
+ "- No invasive repairs; no alarm bypass; no firmware tampering.\n"
413
+ "- No collecting/sharing personal identifiers (PHI).\n"
 
414
  "- OEM manuals & local policy take priority."
415
  )
416
  REFUSAL = (
417
+ "🚫 I can’t help with diagnosis/treatment, invasive repair, alarm bypass, firmware hacks, or handling personal identifiers.\n"
418
+ "I can guide safe, non-invasive troubleshooting and educational QC steps."
419
+ )
420
+
421
+ TOOLS_LIST = (
422
+ "🔧 **Available Tools**\n"
423
+ "- quick_reference(text)\n"
424
+ "- safety_policy()\n"
425
+ "- record_step(note)\n"
426
+ "- error_code_lookup(model, code)\n"
427
+ "- acronym_expand(term)\n"
428
+ "- unit_convert(value, from_unit, to_unit)\n"
429
+ "- triage_priority(text)\n"
430
+ "- simulator_baselines(device)\n"
431
+ "- env_checklist(context)\n"
432
+ "- accessory_checklist(device)\n"
433
+ "- build_qc_template(device)\n"
434
  )
435
 
436
  # =========================
437
  # Chainlit flow
438
  # =========================
439
  @cl.on_chat_start
440
+ async def on_start():
441
+ cl.user_session.set("mode", "agent") # 'agent' | 'simple'
 
442
  cl.user_session.set("agent_trace", [])
443
  await cl.Message(content=WELCOME).send()
444
 
445
  @cl.on_message
446
+ async def on_message(msg: cl.Message):
447
+ text = (msg.content or "").strip()
448
  low = text.lower()
449
 
450
  # Commands
 
452
  await cl.Message(content=WELCOME).send(); return
453
  if low.startswith("/policy"):
454
  await cl.Message(content=POLICY).send(); return
455
+ if low.startswith("/tools"):
456
+ await cl.Message(content=TOOLS_LIST).send(); return
457
  if low.startswith("/agent"):
458
+ cl.user_session.set("mode", "agent")
459
+ await cl.Message(content="✅ Agentic mode enabled.").send(); return
460
  if low.startswith("/simple"):
461
+ cl.user_session.set("mode", "simple")
462
+ await cl.Message(content="✅ Simple mode enabled.").send(); return
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
463
 
464
+ # Guardrails
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
465
  if not on_topic(text):
466
  await cl.Message(
467
  content="I only support **biomedical device troubleshooting**. "
468
+ "Describe the device & symptom (e.g., “ECG noisy baseline”)."
469
+ ).send(); return
 
 
 
 
 
470
 
471
+ issues = guard_issues(text)
472
+ if issues:
473
+ await cl.Message(content=REFUSAL + "\n\n" + POLICY).send(); return
 
 
 
474
 
475
+ mode = cl.user_session.get("mode") or "agent"
476
  trace_before = len(cl.user_session.get("agent_trace") or [])
477
 
478
  if mode == "agent":
479
  try:
480
+ result = await Runner.run(planner_agent, text, turns=5)
 
481
  answer = result.final_output or "I couldn’t generate a plan."
482
  except Exception as e:
483
+ await cl.Message(content=f"⚠️ Planner error: {e}\nFalling back to simple mode.").send()
484
+ answer = await simple_plan(text)
485
  else:
486
+ answer = await simple_plan(text)
487
 
488
+ # Agent trace (new steps)
489
  trace = cl.user_session.get("agent_trace") or []
490
  new_trace = trace[trace_before:]
491
+ trace_md = "### 🧭 Agent Trace (summary)\n" + "\n".join(f"- {t}" for t in new_trace) + "\n\n" if new_trace else ""
 
 
492
 
493
+ # Deterministic quick reference (always helpful)
494
  qref = quick_reference(text)
 
 
495
  quick_md = (
496
  f"### 📘 Quick Reference\n"
497
+ f"**Safety**\n{_bullets(qref.get('safety'))}\n\n"
498
+ f"**Common Faults**\n{_bullets(qref.get('common_faults'))}\n\n"
499
+ f"**Quick Checks**\n{_bullets(qref.get('quick_checks'))}\n\n"
500
+ f"**QC / Calibration**\n{_bullets(qref.get('qc'))}\n\n"
501
+ f"**Escalate If**\n{_bullets(qref.get('escalate'))}\n\n"
502
+ f"> ⚠️ Education-only. Follow OEM & local policy.\n"
503
  )
504
 
505
+ await cl.Message(content=f"{trace_md}{quick_md}\n---\n{answer}").send()