ibadhasnain commited on
Commit
79e94d6
·
verified ·
1 Parent(s): 247cb5b

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +65 -56
app.py CHANGED
@@ -1,5 +1,6 @@
1
- # app.py — Biomedical Device Troubleshooting Assistant (Education-only, Agentic)
2
- # Self-contained: no external 'agents' package needed.
 
3
 
4
  import os, io, re, json
5
  from typing import Any, Callable, Dict, List, Optional
@@ -11,7 +12,7 @@ from openai import AsyncOpenAI as _SDKAsyncOpenAI
11
  from pypdf import PdfReader
12
 
13
  # =========================
14
- # Minimal "agents" shim
15
  # =========================
16
  def set_tracing_disabled(disabled: bool = True):
17
  return disabled
@@ -20,9 +21,6 @@ def function_tool(func: Callable):
20
  func._is_tool = True
21
  return func
22
 
23
- class InputGuardrailTripwireTriggered(Exception):
24
- pass
25
-
26
  class AsyncOpenAI:
27
  def __init__(self, api_key: str, base_url: Optional[str] = None):
28
  kwargs = {"api_key": api_key}
@@ -74,7 +72,8 @@ class Runner:
74
  tools = agent.tool_specs()
75
  tool_map = {t.__name__: t for t in (agent.tools or []) if getattr(t, "_is_tool", False)}
76
 
77
- for _ in range(6): # allow a few PLANACTOBSERVE loops
 
78
  resp = await agent.model.client.chat.completions.create(
79
  model=agent.model.model,
80
  messages=msgs,
@@ -102,18 +101,17 @@ class Runner:
102
  })
103
  continue
104
 
105
- # Final
106
- result_obj = type("Result", (), {})()
107
- result_obj.final_output = msg.content or ""
108
- result_obj.context = context or {}
109
- result_obj.final_output_as = lambda *_: result_obj.final_output
110
- return result_obj
111
 
112
- result_obj = type("Result", (), {})()
113
- result_obj.final_output = "Sorry, I couldn't complete the request."
114
- result_obj.context = context or {}
115
- result_obj.final_output_as = lambda *_: result_obj.final_output
116
- return result_obj
117
 
118
  # =========================
119
  # Provider setup (Gemini or OpenAI)
@@ -131,14 +129,14 @@ elif OPENAI_API_KEY:
131
  BASE_URL = None
132
  MODEL_ID = "gpt-4o-mini"
133
  else:
134
- raise RuntimeError("Missing GEMINI_API_KEY or OPENAI_API_KEY in your environment.")
135
 
136
  set_tracing_disabled(True)
137
  ext_client = AsyncOpenAI(api_key=API_KEY, base_url=BASE_URL)
138
  llm_model = OpenAIChatCompletionsModel(model=MODEL_ID, openai_client=ext_client)
139
 
140
  # =========================
141
- # Topic/safety guardrails
142
  # =========================
143
  ALLOWED_COMMANDS = ("/help", "/policy", "/manual", "/clear", "/hits", "/agent", "/simple")
144
 
@@ -178,7 +176,7 @@ def local_guard(text: str) -> List[str]:
178
  return issues
179
 
180
  # =========================
181
- # Manual handling + search
182
  # =========================
183
  def extract_pdf_pages(data: bytes):
184
  pages = []
@@ -194,7 +192,18 @@ def extract_pdf_pages(data: bytes):
194
  def extract_txt_pages(data: bytes, chunk_chars: int = 1600):
195
  try: txt = data.decode("utf-8", errors="ignore")
196
  except Exception: txt = ""
197
- return [{"page": i+1, "text": txt[i:i+chunk_chars]} for i in range(0, len(txt), chunk_chars)] or [{"page": 1, "text": ""}]
 
 
 
 
 
 
 
 
 
 
 
198
 
199
  def manual_hits(pages, query: str, topk: int = 3):
200
  if not pages: return []
@@ -219,18 +228,17 @@ def manual_hits(pages, query: str, topk: int = 3):
219
  return snippet
220
  return [f"[p.{h['page']}] {excerpt(h.get('text',''))}" for h in hits]
221
 
222
- # Keep manual in session; agent tools will read it live
223
- def _current_manual():
 
 
224
  m = cl.user_session.get("manual")
225
  return (m or {}).get("pages", [])
226
 
227
- # =========================
228
- # Tools (agent can call)
229
- # =========================
230
  @function_tool
231
  def search_manual(query: str, topk: str = "3") -> dict:
232
  """Search the uploaded manual (PDF/TXT) for the query. Returns {'hits': [snippets...]}. If no manual, hits=[]"""
233
- pages = _current_manual()
234
  k = int(topk) if str(topk).isdigit() else 3
235
  hits = manual_hits(pages, query, topk=k) if pages else []
236
  return {"hits": hits}
@@ -276,14 +284,12 @@ def quick_reference(device_and_symptom: str) -> dict:
276
  @function_tool
277
  def safety_policy() -> dict:
278
  """Return a concise policy block enforced by the assistant."""
279
- return {
280
- "policy": [
281
- "Education-only troubleshooting; no clinical advice.",
282
- "No invasive repairs, no alarm bypass, no firmware tampering.",
283
- "No collecting/sharing personal identifiers.",
284
- "OEM manuals and local policy take precedence."
285
- ]
286
- }
287
 
288
  @function_tool
289
  def record_step(note: str) -> dict:
@@ -296,14 +302,14 @@ def record_step(note: str) -> dict:
296
  return {"ok": True, "len": len(trace)}
297
 
298
  # =========================
299
- # Agentic Planner
300
  # =========================
301
  PLANNER_INSTRUCTIONS = (
302
  "You are an agentic biomedical device troubleshooting planner for clinical engineers.\n"
303
  "STRICT SCOPE: Education-only. Do NOT give clinical advice, do NOT instruct invasive repairs, do NOT bypass alarms, "
304
  "do NOT suggest firmware tampering. Respect policy.\n\n"
305
  "Loop up to 3 times:\n"
306
- "- PLAN: Decide what you need (e.g., search_manual for specific error, or quick_reference).\n"
307
  "- ACT: Call a tool (search_manual, quick_reference, safety_policy, record_step).\n"
308
  "- OBSERVE: Read tool result. Summarize via record_step.\n"
309
  "Then produce a final plan with exactly these sections:\n"
@@ -315,7 +321,6 @@ PLANNER_INSTRUCTIONS = (
315
  "End with a one-line summary. If life-critical & patient-connected is implied, start with: "
316
  "REMOVE FROM SERVICE & USE BACKUP — then proceed off-patient.\n"
317
  )
318
-
319
  planner_agent = Agent(
320
  name="Agentic Planner",
321
  instructions=PLANNER_INSTRUCTIONS,
@@ -323,9 +328,6 @@ planner_agent = Agent(
323
  tools=[search_manual, quick_reference, safety_policy, record_step],
324
  )
325
 
326
- # =========================
327
- # Simple (non-agentic) LLM (fallback)
328
- # =========================
329
  SIMPLE_SYSTEM = (
330
  "You are a biomedical device troubleshooting assistant (education-only). "
331
  "Output sections: Safety First; Likely Causes; Step-by-Step Checks; QC/Calibration; Escalate When; one-line summary."
@@ -348,7 +350,7 @@ WELCOME = (
348
  "🛠️ **Biomedical Device Troubleshooting Assistant** (Agentic)\n"
349
  "Education-only. No diagnosis, no invasive service, no alarm bypass. OEM manual & policy rule the day.\n\n"
350
  "Type your **device & symptom** (e.g., “Infusion pump occlusion alarm”).\n"
351
- "Commands: **/manual** upload PDF/TXT, **/hits** show top manual matches, **/clear** remove manual,\n"
352
  "**/agent** agentic mode (default), **/simple** single-pass mode, **/policy** view rules."
353
  )
354
  POLICY = (
@@ -371,7 +373,7 @@ REFUSAL = (
371
  @cl.on_chat_start
372
  async def start():
373
  cl.user_session.set("manual", None)
374
- cl.user_session.set("agent_mode", "agent") # 'agent' or 'simple'
375
  cl.user_session.set("agent_trace", [])
376
  await cl.Message(content=WELCOME).send()
377
 
@@ -395,21 +397,30 @@ async def main(message: cl.Message):
395
  if low.startswith("/manual"):
396
  files = await cl.AskFileMessage(
397
  content="Upload the **service manual** (PDF or TXT). Max ~20 MB.",
398
- accept=["application/pdf", "text/plain"],
399
  max_files=1, max_size_mb=20, timeout=240
400
  ).send()
401
  if not files:
402
  await cl.Message(content="No file received.").send(); return
 
403
  f = files[0]
404
  data = getattr(f, "content", None)
405
  if data is None and getattr(f, "path", None):
406
- with open(f.path, "rb") as fh: data = fh.read()
 
 
 
 
407
  try:
408
- pages = extract_pdf_pages(data) if (f.mime == "application/pdf" or f.name.lower().endswith(".pdf")) else extract_txt_pages(data)
 
 
 
409
  except Exception as e:
410
  await cl.Message(content=f"Couldn't read the manual: {e}").send(); return
411
- cl.user_session.set("manual", {"name": f.name, "pages": pages})
412
- await cl.Message(content=f" Manual indexed: **{f.name}** — {len(pages)} page-chunks.").send()
 
413
  return
414
 
415
  if low.startswith("/clear"):
@@ -426,7 +437,7 @@ async def main(message: cl.Message):
426
  await cl.Message(content="### 🔎 Manual Matches\n" + "\n\n".join(hits)).send()
427
  return
428
 
429
- # Topic & local safety guard
430
  if not on_topic(text):
431
  await cl.Message(
432
  content="I only support **biomedical device troubleshooting**. "
@@ -438,7 +449,7 @@ async def main(message: cl.Message):
438
  await cl.Message(content=REFUSAL + "\n\n" + POLICY).send()
439
  return
440
 
441
- # Optional manual context for the simple path
442
  manual = cl.user_session.get("manual")
443
  manual_name = manual.get("name") if manual else None
444
  excerpts = ""
@@ -449,26 +460,24 @@ async def main(message: cl.Message):
449
  trace_before = len(cl.user_session.get("agent_trace") or [])
450
 
451
  if mode == "agent":
452
- # Agentic Planner run
453
  try:
454
- # give planner the user text; it will call tools search_manual/quick_reference/record_step itself
455
  result = await Runner.run(planner_agent, text)
456
  answer = result.final_output or "I couldn’t generate a plan."
457
  except Exception as e:
458
  await cl.Message(content=f"⚠️ Planner failed: {e}\nFalling back to simple mode.").send()
459
  answer = await simple_llm_plan(text, excerpts)
460
  else:
461
- # Simple single-pass
462
  answer = await simple_llm_plan(text, excerpts)
463
 
464
- # Show agent trace if any new steps were recorded
465
  trace = cl.user_session.get("agent_trace") or []
466
  new_trace = trace[trace_before:]
467
  trace_md = ""
468
  if new_trace:
469
  trace_md = "### 🧭 Agent Trace (summary)\n" + "\n".join(f"- {t}" for t in new_trace) + "\n\n"
470
 
471
- # Deterministic quick reference (always helpful)
472
  qref = quick_reference(text)
473
  def bullets(arr: List[str]): return "\n".join(f"- {x}" for x in (arr or [])) or "-"
474
 
 
1
+ # app.py — Biomedical Device Troubleshooting Assistant
2
+ # Education-only · Manual (PDF/TXT) upload & search · Guardrails · Agentic planner (optional)
3
+ # Deps: chainlit==1.0.200, python-dotenv==1.0.1, openai==1.35.13, pypdf>=4.2.0
4
 
5
  import os, io, re, json
6
  from typing import Any, Callable, Dict, List, Optional
 
12
  from pypdf import PdfReader
13
 
14
  # =========================
15
+ # Minimal "agents" shim (no external package needed)
16
  # =========================
17
  def set_tracing_disabled(disabled: bool = True):
18
  return disabled
 
21
  func._is_tool = True
22
  return func
23
 
 
 
 
24
  class AsyncOpenAI:
25
  def __init__(self, api_key: str, base_url: Optional[str] = None):
26
  kwargs = {"api_key": api_key}
 
72
  tools = agent.tool_specs()
73
  tool_map = {t.__name__: t for t in (agent.tools or []) if getattr(t, "_is_tool", False)}
74
 
75
+ # small PlanActObserve budget
76
+ for _ in range(6):
77
  resp = await agent.model.client.chat.completions.create(
78
  model=agent.model.model,
79
  messages=msgs,
 
101
  })
102
  continue
103
 
104
+ out = type("Result", (), {})()
105
+ out.final_output = msg.content or ""
106
+ out.context = context or {}
107
+ out.final_output_as = lambda *_: out.final_output
108
+ return out
 
109
 
110
+ out = type("Result", (), {})()
111
+ out.final_output = "Sorry, I couldn't complete the request."
112
+ out.context = context or {}
113
+ out.final_output_as = lambda *_: out.final_output
114
+ return out
115
 
116
  # =========================
117
  # Provider setup (Gemini or OpenAI)
 
129
  BASE_URL = None
130
  MODEL_ID = "gpt-4o-mini"
131
  else:
132
+ raise RuntimeError("Missing GEMINI_API_KEY or OPENAI_API_KEY in environment.")
133
 
134
  set_tracing_disabled(True)
135
  ext_client = AsyncOpenAI(api_key=API_KEY, base_url=BASE_URL)
136
  llm_model = OpenAIChatCompletionsModel(model=MODEL_ID, openai_client=ext_client)
137
 
138
  # =========================
139
+ # Guardrails (topic & safety)
140
  # =========================
141
  ALLOWED_COMMANDS = ("/help", "/policy", "/manual", "/clear", "/hits", "/agent", "/simple")
142
 
 
176
  return issues
177
 
178
  # =========================
179
+ # Manual handling & search (robust PDF detection)
180
  # =========================
181
  def extract_pdf_pages(data: bytes):
182
  pages = []
 
192
  def extract_txt_pages(data: bytes, chunk_chars: int = 1600):
193
  try: txt = data.decode("utf-8", errors="ignore")
194
  except Exception: txt = ""
195
+ chunks = [{"page": i+1, "text": txt[i:i+chunk_chars]} for i in range(0, len(txt), chunk_chars)]
196
+ return chunks or [{"page": 1, "text": ""}]
197
+
198
+ def _is_pdf(file_obj, data: bytes) -> bool:
199
+ # Chainlit may expose .type or nothing; avoid relying on .mime
200
+ name = getattr(file_obj, "name", "") or getattr(file_obj, "filename", "")
201
+ typ = getattr(file_obj, "type", "") or ""
202
+ return (
203
+ name.lower().endswith(".pdf")
204
+ or typ == "application/pdf"
205
+ or (isinstance(data, (bytes, bytearray)) and data[:5] == b"%PDF-")
206
+ )
207
 
208
  def manual_hits(pages, query: str, topk: int = 3):
209
  if not pages: return []
 
228
  return snippet
229
  return [f"[p.{h['page']}] {excerpt(h.get('text',''))}" for h in hits]
230
 
231
+ # =========================
232
+ # Tools the agent can call
233
+ # =========================
234
+ def _current_manual_pages():
235
  m = cl.user_session.get("manual")
236
  return (m or {}).get("pages", [])
237
 
 
 
 
238
  @function_tool
239
  def search_manual(query: str, topk: str = "3") -> dict:
240
  """Search the uploaded manual (PDF/TXT) for the query. Returns {'hits': [snippets...]}. If no manual, hits=[]"""
241
+ pages = _current_manual_pages()
242
  k = int(topk) if str(topk).isdigit() else 3
243
  hits = manual_hits(pages, query, topk=k) if pages else []
244
  return {"hits": hits}
 
284
  @function_tool
285
  def safety_policy() -> dict:
286
  """Return a concise policy block enforced by the assistant."""
287
+ return {"policy": [
288
+ "Education-only troubleshooting; no clinical advice.",
289
+ "No invasive repairs, no alarm bypass, no firmware tampering.",
290
+ "No collecting/sharing personal identifiers.",
291
+ "OEM manuals and local policy take precedence."
292
+ ]}
 
 
293
 
294
  @function_tool
295
  def record_step(note: str) -> dict:
 
302
  return {"ok": True, "len": len(trace)}
303
 
304
  # =========================
305
+ # Agentic planner & simple fallback
306
  # =========================
307
  PLANNER_INSTRUCTIONS = (
308
  "You are an agentic biomedical device troubleshooting planner for clinical engineers.\n"
309
  "STRICT SCOPE: Education-only. Do NOT give clinical advice, do NOT instruct invasive repairs, do NOT bypass alarms, "
310
  "do NOT suggest firmware tampering. Respect policy.\n\n"
311
  "Loop up to 3 times:\n"
312
+ "- PLAN: Decide what you need (e.g., search_manual for error keywords, or quick_reference).\n"
313
  "- ACT: Call a tool (search_manual, quick_reference, safety_policy, record_step).\n"
314
  "- OBSERVE: Read tool result. Summarize via record_step.\n"
315
  "Then produce a final plan with exactly these sections:\n"
 
321
  "End with a one-line summary. If life-critical & patient-connected is implied, start with: "
322
  "REMOVE FROM SERVICE & USE BACKUP — then proceed off-patient.\n"
323
  )
 
324
  planner_agent = Agent(
325
  name="Agentic Planner",
326
  instructions=PLANNER_INSTRUCTIONS,
 
328
  tools=[search_manual, quick_reference, safety_policy, record_step],
329
  )
330
 
 
 
 
331
  SIMPLE_SYSTEM = (
332
  "You are a biomedical device troubleshooting assistant (education-only). "
333
  "Output sections: Safety First; Likely Causes; Step-by-Step Checks; QC/Calibration; Escalate When; one-line summary."
 
350
  "🛠️ **Biomedical Device Troubleshooting Assistant** (Agentic)\n"
351
  "Education-only. No diagnosis, no invasive service, no alarm bypass. OEM manual & policy rule the day.\n\n"
352
  "Type your **device & symptom** (e.g., “Infusion pump occlusion alarm”).\n"
353
+ "Commands: **/manual** upload PDF/TXT, **/hits** show manual matches, **/clear** remove manual,\n"
354
  "**/agent** agentic mode (default), **/simple** single-pass mode, **/policy** view rules."
355
  )
356
  POLICY = (
 
373
  @cl.on_chat_start
374
  async def start():
375
  cl.user_session.set("manual", None)
376
+ cl.user_session.set("agent_mode", "agent") # 'agent' | 'simple'
377
  cl.user_session.set("agent_trace", [])
378
  await cl.Message(content=WELCOME).send()
379
 
 
397
  if low.startswith("/manual"):
398
  files = await cl.AskFileMessage(
399
  content="Upload the **service manual** (PDF or TXT). Max ~20 MB.",
400
+ accept=["application/pdf", "text/plain"], # UI hint only
401
  max_files=1, max_size_mb=20, timeout=240
402
  ).send()
403
  if not files:
404
  await cl.Message(content="No file received.").send(); return
405
+
406
  f = files[0]
407
  data = getattr(f, "content", None)
408
  if data is None and getattr(f, "path", None):
409
+ with open(f.path, "rb") as fh:
410
+ data = fh.read()
411
+ if not data:
412
+ await cl.Message(content="Received file but got empty content. Please re-upload.").send(); return
413
+
414
  try:
415
+ if _is_pdf(f, data):
416
+ pages = extract_pdf_pages(data)
417
+ else:
418
+ pages = extract_txt_pages(data)
419
  except Exception as e:
420
  await cl.Message(content=f"Couldn't read the manual: {e}").send(); return
421
+
422
+ cl.user_session.set("manual", {"name": getattr(f, "name", "manual"), "pages": pages})
423
+ await cl.Message(content=f"✅ Manual indexed: **{getattr(f, 'name', 'manual')}** — {len(pages)} page-chunks.").send()
424
  return
425
 
426
  if low.startswith("/clear"):
 
437
  await cl.Message(content="### 🔎 Manual Matches\n" + "\n\n".join(hits)).send()
438
  return
439
 
440
+ # Topic & safety guard
441
  if not on_topic(text):
442
  await cl.Message(
443
  content="I only support **biomedical device troubleshooting**. "
 
449
  await cl.Message(content=REFUSAL + "\n\n" + POLICY).send()
450
  return
451
 
452
+ # Manual context (for simple fallback)
453
  manual = cl.user_session.get("manual")
454
  manual_name = manual.get("name") if manual else None
455
  excerpts = ""
 
460
  trace_before = len(cl.user_session.get("agent_trace") or [])
461
 
462
  if mode == "agent":
 
463
  try:
464
+ # Agentic planner will call tools (search_manual/quick_reference/record_step) itself
465
  result = await Runner.run(planner_agent, text)
466
  answer = result.final_output or "I couldn’t generate a plan."
467
  except Exception as e:
468
  await cl.Message(content=f"⚠️ Planner failed: {e}\nFalling back to simple mode.").send()
469
  answer = await simple_llm_plan(text, excerpts)
470
  else:
 
471
  answer = await simple_llm_plan(text, excerpts)
472
 
473
+ # Agent trace (new steps only)
474
  trace = cl.user_session.get("agent_trace") or []
475
  new_trace = trace[trace_before:]
476
  trace_md = ""
477
  if new_trace:
478
  trace_md = "### 🧭 Agent Trace (summary)\n" + "\n".join(f"- {t}" for t in new_trace) + "\n\n"
479
 
480
+ # Deterministic quick reference
481
  qref = quick_reference(text)
482
  def bullets(arr: List[str]): return "\n".join(f"- {x}" for x in (arr or [])) or "-"
483