fciannella committed on
Commit
9312c3a
·
1 Parent(s): 2f49513

First functional multi thread

Browse files
README.md CHANGED
@@ -145,7 +145,7 @@ The `voice_agent_multi_thread` example includes a non-blocking multi-threaded ag
145
 
146
  ### Build the Docker image:
147
  ```bash
148
- docker build -t voice-agent-multi-thread .
149
  ```
150
 
151
  ### Run the container:
 
145
 
146
  ### Build the Docker image:
147
  ```bash
148
+ docker build --build-arg EXAMPLE_NAME=voice_agent_multi_thread -t voice-agent-multi-thread .
149
  ```
150
 
151
  ### Run the container:
examples/voice_agent_multi_thread/agents/helper_functions.py CHANGED
@@ -22,21 +22,29 @@ def write_status(
22
  namespace: Namespace tuple for store isolation
23
  config: Optional runtime config
24
  """
 
 
 
25
  if not isinstance(namespace, tuple):
26
  try:
27
  namespace = tuple(namespace)
28
  except (TypeError, ValueError):
29
  namespace = (str(namespace),)
30
 
31
- store.put(
32
- namespace,
33
- "working-tool-status-update",
34
- {
35
- "tool_name": tool_name,
36
- "progress": progress,
37
- "status": status,
38
- }
39
- )
 
 
 
 
 
40
 
41
 
42
  def reset_status(store: BaseStore, namespace: tuple | list) -> None:
 
22
  namespace: Namespace tuple for store isolation
23
  config: Optional runtime config
24
  """
25
+ import logging
26
+ logger = logging.getLogger(__name__)
27
+
28
  if not isinstance(namespace, tuple):
29
  try:
30
  namespace = tuple(namespace)
31
  except (TypeError, ValueError):
32
  namespace = (str(namespace),)
33
 
34
+ try:
35
+ logger.info(f"πŸ“ write_status: Attempting to write to store: namespace={namespace}, key='working-tool-status-update'")
36
+ store.put(
37
+ namespace,
38
+ "working-tool-status-update",
39
+ {
40
+ "tool_name": tool_name,
41
+ "progress": progress,
42
+ "status": status,
43
+ }
44
+ )
45
+ logger.info(f"πŸ“ write_status: Successfully called store.put() for {tool_name} at {progress}%")
46
+ except Exception as e:
47
+ logger.error(f"❌ write_status FAILED: {e}", exc_info=True)
48
 
49
 
50
  def reset_status(store: BaseStore, namespace: tuple | list) -> None:
examples/voice_agent_multi_thread/agents/telco-agent-multi/react_agent.py CHANGED
@@ -83,11 +83,16 @@ SYSTEM_PROMPT = (
83
  )
84
 
85
  SECONDARY_SYSTEM_PROMPT = (
86
- "You are a friendly mobile operator assistant engaging in light conversation while a long-running task is being processed. "
87
- "You can: (1) Check status of the ongoing task using check_status tool; (2) Answer general questions about packages, data balance, or roaming; (3) Provide light chit-chat. "
88
- "DO NOT attempt to perform any long operations like changing packages, closing contracts, or purchasing passes - explain that another operation is in progress. "
89
- "STYLE: Brief (1-2 sentences), friendly, and reassuring. "
90
- "TTS SAFETY: Output must be plain text suitable for text-to-speech. Do not use markdown, bullets, asterisks, emojis, or special typography. Use only ASCII punctuation and straight quotes."
 
 
 
 
 
91
  )
92
 
93
 
 
83
  )
84
 
85
  SECONDARY_SYSTEM_PROMPT = (
86
+ "You are a lively, personable mobile operator assistant keeping customers entertained while their request processes in the background. "
87
+ "Your goal is to make the wait enjoyable - be playful, share interesting facts, tell brief jokes, or engage in light conversation. "
88
+ "If they ask about status, check using check_status tool and present it in an upbeat way. "
89
+ "If they're bored or impatient, sympathize warmly and distract them with something fun - ask about their day, share a quick tech tip, or make them smile. "
90
+ "For small talk (weather, location, hobbies, sports, random questions), engage enthusiastically and naturally - show genuine interest! "
91
+ "You can answer quick questions about their package, data balance, or roaming options. "
92
+ "DO NOT start new long operations (changing packages, closing contracts, purchasing passes) - playfully explain you're juggling their current request and can help with that next. "
93
+ "PERSONALITY: Friendly, upbeat, conversational, and entertaining - like a fun colleague who makes waiting time fly by! "
94
+ "STYLE: Natural (2-3 sentences), warm, and engaging. Mix status updates with personality - don't just recite percentages robotically. "
95
+ "TTS SAFETY: Plain text only - no markdown, bullets, asterisks, emojis, or special formatting. Use ASCII punctuation and straight quotes."
96
  )
97
 
98
 
examples/voice_agent_multi_thread/agents/telco-agent-multi/tools.py CHANGED
@@ -97,8 +97,13 @@ def close_contract_tool(msisdn: str, confirm: bool = False) -> str:
97
  return json.dumps(telco_logic.close_contract(msisdn, False))
98
 
99
  # Long-running operation with progress reporting (following working example pattern)
 
 
 
 
100
  writer = get_stream_writer()
101
  writer("Processing your contract closure request. This may take a moment...")
 
102
 
103
  tool_name = "close_contract_tool"
104
  steps = 10
@@ -107,12 +112,16 @@ def close_contract_tool(msisdn: str, confirm: bool = False) -> str:
107
  config = ensure_config()
108
  namespace = config["configurable"]["namespace_for_memory"]
109
  server_store = get_store()
 
110
 
111
  for i in range(1, steps + 1):
 
112
  time.sleep(interval_seconds)
 
113
  pct = (i * 100) // steps
114
  status = "running"
115
  write_status(tool_name, pct, status, server_store, namespace, config)
 
116
 
117
  # Execute actual closure
118
  result = telco_logic.close_contract(msisdn, True)
 
97
  return json.dumps(telco_logic.close_contract(msisdn, False))
98
 
99
  # Long-running operation with progress reporting (following working example pattern)
100
+ import logging
101
+ logger = logging.getLogger(__name__)
102
+ logger.info(f"πŸ”₯ close_contract_tool STARTING 50-second operation for {msisdn}")
103
+
104
  writer = get_stream_writer()
105
  writer("Processing your contract closure request. This may take a moment...")
106
+ logger.info("βœ… Stream writer message sent")
107
 
108
  tool_name = "close_contract_tool"
109
  steps = 10
 
112
  config = ensure_config()
113
  namespace = config["configurable"]["namespace_for_memory"]
114
  server_store = get_store()
115
+ logger.info(f"πŸ“¦ Got store and namespace: {namespace}")
116
 
117
  for i in range(1, steps + 1):
118
+ logger.info(f"⏱️ Step {i}/{steps} - sleeping {interval_seconds}s...")
119
  time.sleep(interval_seconds)
120
+ logger.info(f"⏱️ Step {i}/{steps} - sleep complete, writing status...")
121
  pct = (i * 100) // steps
122
  status = "running"
123
  write_status(tool_name, pct, status, server_store, namespace, config)
124
+ logger.info(f"βœ… Status written: {pct}% - {status}")
125
 
126
  # Execute actual closure
127
  result = telco_logic.close_contract(msisdn, True)
examples/voice_agent_multi_thread/langgraph_llm_service.py CHANGED
@@ -74,7 +74,7 @@ class LangGraphLLMService(OpenAILLMService):
74
  base_url: str = "http://127.0.0.1:2024",
75
  assistant: str = "ace-base-agent",
76
  user_email: str = "test@example.com",
77
- stream_mode: str = "values",
78
  debug_stream: bool = False,
79
  thread_id: Optional[str] = None,
80
  auth_token: Optional[str] = None,
@@ -86,9 +86,11 @@ class LangGraphLLMService(OpenAILLMService):
86
  self.base_url = base_url
87
  self.assistant = assistant
88
  self.user_email = user_email
89
- self.stream_mode = stream_mode
 
90
  self.debug_stream = debug_stream
91
  self.enable_multi_threading = enable_multi_threading
 
92
 
93
  # Optional auth header
94
  token = (
@@ -117,6 +119,12 @@ class LangGraphLLMService(OpenAILLMService):
117
  self._current_task: Optional[asyncio.Task] = None
118
  self._outer_open: bool = False
119
  self._emitted_texts: set[str] = set()
 
 
 
 
 
 
120
 
121
  async def _ensure_thread(self, thread_type: str = "main") -> Optional[str]:
122
  """Ensure thread exists for the given type (main or secondary)."""
@@ -151,6 +159,47 @@ class LangGraphLLMService(OpenAILLMService):
151
  logger.warning(f"LangGraph: could not determine {thread_type} thread id; proceeding threadless.")
152
  return None
153
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
154
  async def _check_long_operation_running(self) -> bool:
155
  """Check if a long operation is currently running via the store."""
156
  if not self.enable_multi_threading:
@@ -161,25 +210,43 @@ class LangGraphLLMService(OpenAILLMService):
161
  ns_list = list(self._namespace_for_memory)
162
  logger.info(f"Checking store with namespace: {ns_list}")
163
 
164
- # Get the specific status key that tools write to
165
- item = await self._client.store.get_item(ns_list, "working-tool-status-update")
 
166
 
167
- if item is None:
168
- logger.info("No item found in store, returning False")
 
 
 
 
 
 
 
 
 
 
 
169
  return False
170
 
171
- # Extract value from the item
172
- value = getattr(item, "value", None)
173
- if value is None and isinstance(item, dict):
174
- value = item.get("value")
175
 
176
- # Check if status is "running"
177
- if isinstance(value, dict):
178
- status = value.get("status")
179
- logger.info(f"πŸ” Long operation check: status={status}, tool={value.get('tool_name')}, progress={value.get('progress')}")
180
- return status == "running"
 
 
 
 
 
 
 
 
 
181
 
182
- logger.info(f"Value not a dict: {type(value)}")
183
  return False
184
  except Exception as exc: # noqa: BLE001
185
  logger.error(f"❌ Failed to check operation status: {exc}", exc_info=True)
@@ -211,48 +278,10 @@ class LangGraphLLMService(OpenAILLMService):
211
  continue
212
  return ""
213
 
214
- async def _stream_langgraph(self, text: str) -> None:
215
- # Determine thread type based on whether a long operation is running
216
- thread_type = "main"
217
- if self.enable_multi_threading:
218
- long_operation_running = await self._check_long_operation_running()
219
- if long_operation_running:
220
- thread_type = "secondary"
221
- self._interim_messages_reset = False
222
- logger.info("Long operation detected, routing to secondary thread")
223
- else:
224
- # Starting new main operation
225
- if self._last_was_long_operation:
226
- self._interim_messages_reset = True
227
- self._last_was_long_operation = False
228
- else:
229
- self._interim_messages_reset = True
230
- logger.info("No long operation, routing to main thread")
231
-
232
- # Ensure appropriate thread
233
- thread_id = await self._ensure_thread(thread_type)
234
-
235
- # Build config with namespace for store coordination
236
- config = {
237
- "configurable": {
238
- "user_email": self.user_email,
239
- "thread_id": thread_id,
240
- "namespace_for_memory": list(self._namespace_for_memory),
241
- }
242
- }
243
-
244
- # Build input dict for multi-threaded agent
245
- if self.enable_multi_threading:
246
- input_payload = {
247
- "messages": [{"type": "human", "content": text}],
248
- "thread_type": thread_type,
249
- "interim_messages_reset": self._interim_messages_reset,
250
- }
251
- else:
252
- # Backward compatible: simple message input
253
- input_payload = [HumanMessage(content=text)]
254
-
255
  try:
 
256
  async for chunk in self._client.runs.stream(
257
  thread_id,
258
  self.assistant,
@@ -305,44 +334,87 @@ class LangGraphLLMService(OpenAILLMService):
305
  if part_text not in self._emitted_texts:
306
  self._emitted_texts.add(part_text)
307
  await self.push_frame(LLMTextFrame(_tts_sanitize(part_text)))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
308
 
309
  # Final value-style events (values mode)
310
  if event == "values":
311
  # Some dev servers send final AI message content here
312
  final_text = ""
 
313
 
314
  # Handle list of messages (most common case)
315
  if isinstance(data, list) and data:
 
316
  # Find the last AI message in the list
317
  for msg in reversed(data):
318
  if isinstance(msg, dict):
319
  if msg.get("type") == "ai" and isinstance(msg.get("content"), str):
320
  final_text = msg["content"]
 
321
  break
322
  elif hasattr(msg, "type") and getattr(msg, "type") == "ai":
323
  content = getattr(msg, "content", None)
324
  if isinstance(content, str):
325
  final_text = content
 
326
  break
327
  # Handle single message object
328
  elif hasattr(data, "content") and isinstance(getattr(data, "content"), str):
329
  final_text = getattr(data, "content")
 
330
  # Handle single message dict
331
  elif isinstance(data, dict):
332
  c = data.get("content")
333
  if isinstance(c, str):
334
  final_text = c
 
335
 
336
- if final_text:
337
- # Close backchannel utterance if open
338
- if self._outer_open:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
339
  await self.push_frame(LLMFullResponseEndFrame())
340
- self._outer_open = False
341
- self._emitted_texts.clear()
342
- # Emit final explanation as its own message
343
- await self.push_frame(LLMFullResponseStartFrame())
344
- await self.push_frame(LLMTextFrame(_tts_sanitize(final_text)))
345
- await self.push_frame(LLMFullResponseEndFrame())
346
 
347
  # Messages mode: look for an array of messages
348
  if event == "messages" or event.endswith(":messages"):
@@ -380,6 +452,88 @@ class LangGraphLLMService(OpenAILLMService):
380
  await self.push_frame(LLMTextFrame(_tts_sanitize(txt)))
381
  except Exception as exc: # noqa: BLE001
382
  logger.error(f"LangGraph stream error: {exc}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
383
 
384
  async def _process_context_and_frames(self, context: OpenAILLMContext) -> None:
385
  """Adapter entrypoint: push start/end frames and stream tokens."""
@@ -416,6 +570,31 @@ class LangGraphLLMService(OpenAILLMService):
416
  if self._current_task is not None and not self._current_task.done():
417
  await self.cancel_task(self._current_task)
418
  self._current_task = None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
419
  return
420
  else:
421
  await super().process_frame(frame, direction)
 
74
  base_url: str = "http://127.0.0.1:2024",
75
  assistant: str = "ace-base-agent",
76
  user_email: str = "test@example.com",
77
+ stream_mode: Optional[list] = None,
78
  debug_stream: bool = False,
79
  thread_id: Optional[str] = None,
80
  auth_token: Optional[str] = None,
 
86
  self.base_url = base_url
87
  self.assistant = assistant
88
  self.user_email = user_email
89
+ # Match working text client: use ["values", "custom"] for multi-threading
90
+ self.stream_mode = stream_mode if stream_mode is not None else (["values", "custom"] if enable_multi_threading else "values")
91
  self.debug_stream = debug_stream
92
  self.enable_multi_threading = enable_multi_threading
93
+ logger.info(f"πŸŽ›οΈ LangGraphLLMService initialized: enable_multi_threading={enable_multi_threading}, stream_mode={self.stream_mode}, type={type(self.stream_mode)}")
94
 
95
  # Optional auth header
96
  token = (
 
119
  self._current_task: Optional[asyncio.Task] = None
120
  self._outer_open: bool = False
121
  self._emitted_texts: set[str] = set()
122
+
123
+ # Background task for main thread long operations
124
+ self._background_main_task: Optional[asyncio.Task] = None
125
+ self._background_final_message: Optional[str] = None
126
+ self._background_monitor_task: Optional[asyncio.Task] = None
127
+ self._background_task_is_long_operation: bool = False # Track if current background task is a long operation
128
 
129
  async def _ensure_thread(self, thread_type: str = "main") -> Optional[str]:
130
  """Ensure thread exists for the given type (main or secondary)."""
 
159
  logger.warning(f"LangGraph: could not determine {thread_type} thread id; proceeding threadless.")
160
  return None
161
 
162
+ async def _monitor_background_task(self) -> None:
163
+ """Monitor background main task and proactively inject final message when complete."""
164
+ if not self._background_main_task:
165
+ return
166
+
167
+ try:
168
+ # Wait for the background task to complete
169
+ await self._background_main_task
170
+ logger.info("🏁 Background main task completed, checking for final message")
171
+
172
+ # Give a VERY brief moment for the final message to be captured (minimize race window)
173
+ await asyncio.sleep(0.1)
174
+
175
+ # If we captured a final message, inject it as a new bot-initiated turn
176
+ if self._background_final_message:
177
+ logger.info("πŸ“’ Injecting final synthesized message from background task")
178
+ logger.info(f"Message to inject: {self._background_final_message}")
179
+
180
+ # Simply push the frames directly - they should flow through TTS
181
+ await self.push_frame(LLMFullResponseStartFrame())
182
+ logger.info("βœ… Pushed LLMFullResponseStartFrame")
183
+
184
+ await self.push_frame(LLMTextFrame(_tts_sanitize(self._background_final_message)))
185
+ logger.info(f"βœ… Pushed LLMTextFrame with content")
186
+
187
+ await self.push_frame(LLMFullResponseEndFrame())
188
+ logger.info("βœ… Pushed LLMFullResponseEndFrame")
189
+
190
+ # Clear the captured message
191
+ self._background_final_message = None
192
+ logger.info("✨ Final message injection complete")
193
+ else:
194
+ logger.info("ℹ️ Background task completed but no final message to inject")
195
+ except asyncio.CancelledError:
196
+ logger.info("🚫 Background task monitor cancelled")
197
+ except Exception as exc:
198
+ logger.error(f"❌ Background task monitor error: {exc}", exc_info=True)
199
+ finally:
200
+ self._background_main_task = None
201
+ self._background_monitor_task = None
202
+
203
  async def _check_long_operation_running(self) -> bool:
204
  """Check if a long operation is currently running via the store."""
205
  if not self.enable_multi_threading:
 
210
  ns_list = list(self._namespace_for_memory)
211
  logger.info(f"Checking store with namespace: {ns_list}")
212
 
213
+ # Use search_items() like the working client code does
214
+ items = await self._client.store.search_items(ns_list)
215
+ logger.info(f"πŸ”Ž search_items returned: type={type(items)}")
216
 
217
+ # Normalize return shape: SDK may return a dict with 'items' or a bare list (matching text client)
218
+ items_list = None
219
+ if isinstance(items, dict):
220
+ inner = items.get("items")
221
+ if isinstance(inner, list):
222
+ items_list = inner
223
+ logger.info(f"πŸ“¦ Extracted {len(inner)} items from dict wrapper")
224
+ elif isinstance(items, list):
225
+ items_list = items
226
+ logger.info(f"πŸ“¦ Got {len(items)} items as bare list")
227
+
228
+ if not items_list:
229
+ logger.info("No items found in store, returning False")
230
  return False
231
 
232
+ logger.info(f"πŸ“¦ Total items in store: {len(items_list)}")
 
 
 
233
 
234
+ # Walk from the end to find the most recent item that has a 'status' (EXACTLY like text client)
235
+ for idx, item in enumerate(reversed(items_list)):
236
+ item_key = getattr(item, "key", None) or (item.get("key") if isinstance(item, dict) else None)
237
+ value = getattr(item, "value", None)
238
+ if value is None and isinstance(item, dict):
239
+ value = item.get("value")
240
+
241
+ value_keys = list(value.keys()) if isinstance(value, dict) else "N/A"
242
+ logger.info(f"πŸ“¦ Item {idx} (from end): key={item_key}, value_keys={value_keys}")
243
+
244
+ if isinstance(value, dict) and "status" in value:
245
+ status = value.get("status")
246
+ logger.info(f"πŸ” Long operation check: status={status}, tool={value.get('tool_name')}, progress={value.get('progress')}")
247
+ return status == "running"
248
 
249
+ logger.info("No status items found in store")
250
  return False
251
  except Exception as exc: # noqa: BLE001
252
  logger.error(f"❌ Failed to check operation status: {exc}", exc_info=True)
 
278
  continue
279
  return ""
280
 
281
+ async def _stream_langgraph_impl(self, text: str, thread_type: str, thread_id: Optional[str], config: dict, input_payload: Any, is_background: bool = False) -> None:
282
+ """Internal implementation of LangGraph streaming."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
283
  try:
284
+ logger.info(f"🎬 Starting stream with mode: {self.stream_mode} (type: {type(self.stream_mode)})")
285
  async for chunk in self._client.runs.stream(
286
  thread_id,
287
  self.assistant,
 
334
  if part_text not in self._emitted_texts:
335
  self._emitted_texts.add(part_text)
336
  await self.push_frame(LLMTextFrame(_tts_sanitize(part_text)))
337
+
338
+ # Custom events from get_stream_writer() - tool progress messages
339
+ if event == "custom":
340
+ custom_text = ""
341
+ if isinstance(data, str):
342
+ custom_text = data
343
+ elif isinstance(data, dict):
344
+ # Try to extract text from custom event data
345
+ custom_text = data.get("content") or data.get("text") or ""
346
+ elif hasattr(data, "content"):
347
+ custom_text = getattr(data, "content", "")
348
+
349
+ if custom_text and isinstance(custom_text, str) and custom_text not in self._emitted_texts:
350
+ logger.info(f"πŸ“’ Custom event (tool message): {custom_text[:100]}")
351
+ self._emitted_texts.add(custom_text)
352
+ # Emit as its own turn
353
+ if self._outer_open:
354
+ await self.push_frame(LLMFullResponseEndFrame())
355
+ self._outer_open = False
356
+ await self.push_frame(LLMFullResponseStartFrame())
357
+ await self.push_frame(LLMTextFrame(_tts_sanitize(custom_text)))
358
+ await self.push_frame(LLMFullResponseEndFrame())
359
 
360
  # Final value-style events (values mode)
361
  if event == "values":
362
  # Some dev servers send final AI message content here
363
  final_text = ""
364
+ logger.info(f"πŸ“Š Processing values event: data_type={type(data)}, is_background={is_background}")
365
 
366
  # Handle list of messages (most common case)
367
  if isinstance(data, list) and data:
368
+ logger.info(f"πŸ“Š Data is list with {len(data)} items")
369
  # Find the last AI message in the list
370
  for msg in reversed(data):
371
  if isinstance(msg, dict):
372
  if msg.get("type") == "ai" and isinstance(msg.get("content"), str):
373
  final_text = msg["content"]
374
+ logger.info(f"βœ… Found AI message in dict: {final_text[:100]}")
375
  break
376
  elif hasattr(msg, "type") and getattr(msg, "type") == "ai":
377
  content = getattr(msg, "content", None)
378
  if isinstance(content, str):
379
  final_text = content
380
+ logger.info(f"βœ… Found AI message in object: {final_text[:100]}")
381
  break
382
  # Handle single message object
383
  elif hasattr(data, "content") and isinstance(getattr(data, "content"), str):
384
  final_text = getattr(data, "content")
385
+ logger.info(f"βœ… Found content in object: {final_text[:100]}")
386
  # Handle single message dict
387
  elif isinstance(data, dict):
388
  c = data.get("content")
389
  if isinstance(c, str):
390
  final_text = c
391
+ logger.info(f"βœ… Found content in dict: {final_text[:100]}")
392
 
393
+ if final_text and final_text not in self._emitted_texts:
394
+ if is_background:
395
+ # Running in background - capture for later injection
396
+ # Only capture if there's no pending message waiting to be injected
397
+ if not self._background_final_message:
398
+ logger.info("πŸ’Ύ Capturing final message from background task")
399
+ self._background_final_message = final_text
400
+ self._emitted_texts.add(final_text)
401
+ else:
402
+ logger.info(f"⚠️ Skipping capture - pending message already exists: {self._background_final_message[:50]}...")
403
+ # Close any open utterance
404
+ if self._outer_open:
405
+ await self.push_frame(LLMFullResponseEndFrame())
406
+ self._outer_open = False
407
+ else:
408
+ # Normal foreground - push immediately
409
+ # Close backchannel utterance if open
410
+ if self._outer_open:
411
+ await self.push_frame(LLMFullResponseEndFrame())
412
+ self._outer_open = False
413
+ # Emit final explanation as its own message
414
+ self._emitted_texts.add(final_text)
415
+ await self.push_frame(LLMFullResponseStartFrame())
416
+ await self.push_frame(LLMTextFrame(_tts_sanitize(final_text)))
417
  await self.push_frame(LLMFullResponseEndFrame())
 
 
 
 
 
 
418
 
419
  # Messages mode: look for an array of messages
420
  if event == "messages" or event.endswith(":messages"):
 
452
  await self.push_frame(LLMTextFrame(_tts_sanitize(txt)))
453
  except Exception as exc: # noqa: BLE001
454
  logger.error(f"LangGraph stream error: {exc}")
455
+ finally:
456
+ # Mark operation complete if this was a main thread
457
+ if thread_type == "main":
458
+ self._last_was_long_operation = True
459
+ logger.info("βœ… Main thread operation completed")
460
+
461
+ async def _stream_langgraph(self, text: str) -> None:
462
+ """Route to main or secondary thread, running main operations in background."""
463
+ # Determine thread type based on whether a long operation is running
464
+ logger.info(f"🎯 _stream_langgraph called: enable_multi_threading={self.enable_multi_threading}")
465
+ thread_type = "main"
466
+ if self.enable_multi_threading:
467
+ long_operation_running = await self._check_long_operation_running()
468
+ if long_operation_running:
469
+ thread_type = "secondary"
470
+ self._interim_messages_reset = False
471
+ logger.info("πŸ”€ Long operation detected, routing to secondary thread")
472
+ else:
473
+ # Starting new main operation
474
+ if self._last_was_long_operation:
475
+ self._interim_messages_reset = True
476
+ self._last_was_long_operation = False
477
+ else:
478
+ self._interim_messages_reset = True
479
+ logger.info("▢️ No long operation, routing to main thread")
480
+
481
+ # Ensure appropriate thread
482
+ thread_id = await self._ensure_thread(thread_type)
483
+
484
+ # Build config with namespace for store coordination
485
+ config = {
486
+ "configurable": {
487
+ "user_email": self.user_email,
488
+ "thread_id": thread_id,
489
+ "namespace_for_memory": list(self._namespace_for_memory),
490
+ }
491
+ }
492
+
493
+ # Build input dict for multi-threaded agent
494
+ if self.enable_multi_threading:
495
+ input_payload = {
496
+ "messages": [{"type": "human", "content": text}],
497
+ "thread_type": thread_type,
498
+ "interim_messages_reset": self._interim_messages_reset,
499
+ }
500
+ else:
501
+ # Backward compatible: simple message input
502
+ input_payload = [HumanMessage(content=text)]
503
+
504
+ # For main thread operations, run in background to allow subsequent messages
505
+ if self.enable_multi_threading and thread_type == "main":
506
+ logger.info("πŸš€ Starting main thread operation in background")
507
+
508
+ # Cancel any existing background main task and monitor
509
+ if self._background_main_task is not None and not self._background_main_task.done():
510
+ logger.info("⚠️ Canceling previous background main task")
511
+ self._background_main_task.cancel()
512
+ try:
513
+ await self._background_main_task
514
+ except asyncio.CancelledError:
515
+ pass
516
+ if self._background_monitor_task is not None and not self._background_monitor_task.done():
517
+ self._background_monitor_task.cancel()
518
+ try:
519
+ await self._background_monitor_task
520
+ except asyncio.CancelledError:
521
+ pass
522
+
523
+ # Start new background task (with is_background=True to capture final message)
524
+ self._background_main_task = asyncio.create_task(
525
+ self._stream_langgraph_impl(text, thread_type, thread_id, config, input_payload, is_background=True)
526
+ )
527
+
528
+ # Start monitor to inject final message when background task completes
529
+ self._background_monitor_task = asyncio.create_task(self._monitor_background_task())
530
+
531
+ # Don't await - return immediately to allow pipeline to process next message
532
+ logger.info("✨ Main thread operation dispatched, pipeline is now free")
533
+ else:
534
+ # Secondary thread or non-multi-threaded: run synchronously (should be fast)
535
+ logger.info(f"⚑ Running {thread_type} thread operation synchronously")
536
+ await self._stream_langgraph_impl(text, thread_type, thread_id, config, input_payload, is_background=False)
537
 
538
  async def _process_context_and_frames(self, context: OpenAILLMContext) -> None:
539
  """Adapter entrypoint: push start/end frames and stream tokens."""
 
570
  if self._current_task is not None and not self._current_task.done():
571
  await self.cancel_task(self._current_task)
572
  self._current_task = None
573
+ # For multi-threading: check if a long operation is running before cancelling
574
+ long_op_running = False
575
+ if self.enable_multi_threading:
576
+ long_op_running = await self._check_long_operation_running()
577
+
578
+ # Only cancel background tasks if NOT in a long operation (which should continue)
579
+ if not long_op_running:
580
+ if self._background_main_task is not None and not self._background_main_task.done():
581
+ logger.info("πŸ›‘ Canceling background main task due to interruption")
582
+ self._background_main_task.cancel()
583
+ try:
584
+ await self._background_main_task
585
+ except asyncio.CancelledError:
586
+ pass
587
+ self._background_main_task = None
588
+ if self._background_monitor_task is not None and not self._background_monitor_task.done():
589
+ logger.info("πŸ›‘ Canceling background monitor task due to interruption")
590
+ self._background_monitor_task.cancel()
591
+ try:
592
+ await self._background_monitor_task
593
+ except asyncio.CancelledError:
594
+ pass
595
+ self._background_monitor_task = None
596
+ else:
597
+ logger.info("πŸ”„ Long operation running - keeping background tasks alive, secondary will handle interruption")
598
  return
599
  else:
600
  await super().process_frame(frame, direction)
examples/voice_agent_multi_thread/pipeline.py CHANGED
@@ -281,14 +281,28 @@ async def run_bot(webrtc_connection, ws: WebSocket, assistant_override: str | No
281
  selected_assistant = assistant_override or os.getenv("LANGGRAPH_ASSISTANT", "ace-base-agent")
282
  logger.info(f"Using LangGraph assistant: {selected_assistant}")
283
 
 
 
 
 
 
 
 
 
 
 
 
 
 
284
  # Enable multi-threading for telco agent
285
- enable_multi_threading = selected_assistant in ["telco-agent", "wire-transfer-agent"]
 
286
 
287
  llm = LangGraphLLMService(
288
  base_url=os.getenv("LANGGRAPH_BASE_URL", "http://127.0.0.1:2024"),
289
  assistant=selected_assistant,
290
  user_email=os.getenv("USER_EMAIL", "test@example.com"),
291
- stream_mode=os.getenv("LANGGRAPH_STREAM_MODE", "values"),
292
  debug_stream=os.getenv("LANGGRAPH_DEBUG_STREAM", "false").lower() == "true",
293
  enable_multi_threading=enable_multi_threading,
294
  )
 
281
  selected_assistant = assistant_override or os.getenv("LANGGRAPH_ASSISTANT", "ace-base-agent")
282
  logger.info(f"Using LangGraph assistant: {selected_assistant}")
283
 
284
+ # Determine assistant name (may be UUID, need to fetch graph_id)
285
+ assistant_name = selected_assistant # Default to the value we have
286
+ try:
287
+ # If it looks like a UUID, fetch the assistant details to get the graph_id
288
+ if len(selected_assistant) > 30 and "-" in selected_assistant:
289
+ from langgraph_sdk import get_client
290
+ langgraph_client = get_client(url=os.getenv("LANGGRAPH_BASE_URL", "http://127.0.0.1:2024"))
291
+ assistant_info = await langgraph_client.assistants.get(selected_assistant)
292
+ assistant_name = assistant_info.get("graph_id", selected_assistant)
293
+ logger.info(f"Resolved assistant UUID to graph_id: {assistant_name}")
294
+ except Exception as exc: # noqa: BLE001
295
+ logger.warning(f"Failed to resolve assistant name, using as-is: {exc}")
296
+
297
  # Enable multi-threading for telco agent
298
+ enable_multi_threading = assistant_name in ["telco-agent", "wire-transfer-agent"]
299
+ logger.info(f"Multi-threading enabled: {enable_multi_threading} for assistant: {assistant_name}")
300
 
301
  llm = LangGraphLLMService(
302
  base_url=os.getenv("LANGGRAPH_BASE_URL", "http://127.0.0.1:2024"),
303
  assistant=selected_assistant,
304
  user_email=os.getenv("USER_EMAIL", "test@example.com"),
305
+ # stream_mode now auto-set based on enable_multi_threading (["values", "custom"] for multi-thread, "values" for single)
306
  debug_stream=os.getenv("LANGGRAPH_DEBUG_STREAM", "false").lower() == "true",
307
  enable_multi_threading=enable_multi_threading,
308
  )
examples/voice_agent_webrtc_langgraph/agents/healthcare-agent/TESTING_GUIDE.md CHANGED
@@ -287,3 +287,4 @@ If tests fail, provide:
287
  3. **Expected behavior** vs **actual behavior**
288
  4. **Log excerpt** from `app.log` showing tool calls
289
  5. **Which test scenario** from this guide
 
 
287
  3. **Expected behavior** vs **actual behavior**
288
  4. **Log excerpt** from `app.log` showing tool calls
289
  5. **Which test scenario** from this guide
290
+
examples/voice_agent_webrtc_langgraph/agents/healthcare-agent/prompts.py CHANGED
@@ -42,7 +42,7 @@ HEALTHCARE_SYSTEM_PROMPT = (
42
  "1. GREETING: Begin with a warm, brief greeting. Ask for the caller's full name.\n"
43
  " - Call find_patient(full_name=...) to get patient_id\n"
44
  "\n"
45
- "2. IDENTITY VERIFICATION (CRITICAL - MANDATORY before any medical info):\n"
46
  " - Ask for date of birth in any format (you will normalize it)\n"
47
  " - Call verify_identity(dob_yyyy_mm_dd=...) with the DOB\n"
48
  " - IMPORTANT: Check the response:\n"
@@ -52,36 +52,60 @@ HEALTHCARE_SYSTEM_PROMPT = (
52
  " - NEVER claim verification until verified=true is returned\n"
53
  " - If verification fails after all attempts, politely explain you cannot access medical records\n"
54
  "\n"
 
 
 
 
 
 
55
  "3. AFTER VERIFIED=TRUE:\n"
56
  " - Call get_patient_profile_tool() to retrieve allergies, medications, conditions\n"
57
  " - Ask: 'What brings you in today?' or 'What's going on?'\n"
58
  " - Gather chief complaint and symptoms using ONE question at a time\n"
59
- " - Ask brief follow-ups to clarify: duration, severity, associated symptoms\n"
60
- "\n"
61
- "4. TRIAGE AND GUIDANCE:\n"
62
- " - Call triage_symptoms_tool(symptoms_text=...) with a natural description of all symptoms\n"
63
- " - Check the 'risk' level and 'red_flags' in the response:\n"
64
- " * If risk='urgent' or red_flags present: Direct to ER/911 immediately with clear urgency\n"
65
- " * If risk='soon': Give advice and strongly recommend scheduling within 1-2 days\n"
66
- " * If risk='self_care': Give the advice, check allergies/meds for safety, offer optional follow-up\n"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67
  " - ALWAYS consider patient's allergies and current medications before recommending anything (including OTC)\n"
 
68
  "\n"
69
- "5. APPOINTMENT BOOKING (if appropriate):\n"
70
  " - Call list_providers_tool() to get available providers\n"
71
  " - Present 1-2 provider options to patient\n"
72
  " - Call get_provider_slots_tool(provider_id=...) for chosen provider\n"
73
- " - Present 2-3 time slots in friendly format (today at 8pm, tomorrow at 8:30am, etc.)\n"
74
  " - After patient chooses, call schedule_appointment_tool(provider_id=..., slot_iso=...)\n"
 
75
  " - Confirm appointment and mention sending details to their phone\n"
 
76
  "\n"
77
- "6. PHARMACY CONFIRMATION (if prescriptions likely):\n"
78
  " - Call get_preferred_pharmacy_tool() to get on-file pharmacy\n"
79
  " - Ask: 'Should we keep your pharmacy at [address]?'\n"
80
  "\n"
81
- "7. CLOSING:\n"
82
  " - Call log_call_tool(notes=..., triage_json=...) to document the encounter\n"
83
- " - Provide any urgent precautions (e.g., 'If fever goes over 102, weakness, or confusion develop, seek urgent care')\n"
84
- " - End with a warm closing\n"
 
85
  "\n"
86
  "## COMMUNICATION STYLE:\n"
87
  "- Use a calm, empathetic, warm tone\n"
@@ -90,4 +114,10 @@ HEALTHCARE_SYSTEM_PROMPT = (
90
  "- Avoid medical jargon; use plain language\n"
91
  "- Be conversational and natural - imagine speaking to a patient on the phone\n"
92
  "- Remember: PLAIN TEXT ONLY for text-to-speech (see TTS rules at top)\n"
 
 
 
 
 
 
93
  )
 
42
  "1. GREETING: Begin with a warm, brief greeting. Ask for the caller's full name.\n"
43
  " - Call find_patient(full_name=...) to get patient_id\n"
44
  "\n"
45
+ "2. IDENTITY VERIFICATION (CRITICAL - Do this ONCE at start of call ONLY):\n"
46
  " - Ask for date of birth in any format (you will normalize it)\n"
47
  " - Call verify_identity(dob_yyyy_mm_dd=...) with the DOB\n"
48
  " - IMPORTANT: Check the response:\n"
 
52
  " - NEVER claim verification until verified=true is returned\n"
53
  " - If verification fails after all attempts, politely explain you cannot access medical records\n"
54
  "\n"
55
+ " CRITICAL: ONCE VERIFIED, DO NOT ASK FOR AUTHENTICATION AGAIN IN THIS CALL!\n"
56
+ " - Verification persists for the ENTIRE conversation\n"
57
+ " - If you already verified the patient earlier in this call, continue the conversation\n"
58
+ " - Do NOT re-verify because of unclear input or confusion - just ask for clarification\n"
59
+ " - Only one authentication per call session\n"
60
+ "\n"
61
  "3. AFTER VERIFIED=TRUE:\n"
62
  " - Call get_patient_profile_tool() to retrieve allergies, medications, conditions\n"
63
  " - Ask: 'What brings you in today?' or 'What's going on?'\n"
64
  " - Gather chief complaint and symptoms using ONE question at a time\n"
65
+ "\n"
66
+ "4. SYMPTOM ASSESSMENT (CRITICAL - Do thorough assessment BEFORE jumping to urgent care):\n"
67
+ " - Ask clarifying questions about duration, severity, onset\n"
68
+ " - For common symptoms like headache or fever, ACTIVELY SCREEN FOR RED FLAGS by asking:\n"
69
+ " * 'Do you have any severe symptoms like worst headache of your life, confusion, or vision changes?'\n"
70
+ " * 'Any numbness, weakness, or difficulty moving?'\n"
71
+ " * 'Any recent head injury or stiff neck?'\n"
72
+ " - Listen carefully to answers: 'No numbness' means NO red flag, not a red flag\n"
73
+ " - For chest pain specifically: Ask about severity, radiation, shortness of breath, sweating\n"
74
+ " - Be measured and thorough - most symptoms are NOT emergencies\n"
75
+ "\n"
76
+ "5. TRIAGE AND GUIDANCE (After thorough assessment):\n"
77
+ " - IMPORTANT: The triage tool uses keyword matching, so only describe PRESENT symptoms\n"
78
+ " - DO NOT list absent symptoms (saying 'no numbness' will incorrectly trigger 'numbness' keyword)\n"
79
+ " - Good: 'mild headache for 2 days, gradual onset, improved with rest'\n"
80
+ " - Bad: 'headache, no confusion, no numbness' (will cause false urgent classification)\n"
81
+ " - Call triage_symptoms_tool(symptoms_text=...) with description of actual complaints only\n"
82
+ " - INTERPRET TRIAGE RESULTS CAREFULLY:\n"
83
+ " * If risk='urgent' AND red_flags list is NOT EMPTY: Then escalate to ER/911\n"
84
+ " * If risk='urgent' but patient explicitly denied red flag symptoms: Use clinical judgment - likely can schedule appointment instead\n"
85
+ " * If risk='soon': Give advice and recommend scheduling within 1-2 days\n"
86
+ " * If risk='self_care': Give advice, check allergies/meds for safety, offer optional follow-up\n"
87
  " - ALWAYS consider patient's allergies and current medications before recommending anything (including OTC)\n"
88
+ " - Remember: Most headaches, fevers, and minor aches do NOT need emergency care\n"
89
  "\n"
90
+ "6. APPOINTMENT BOOKING (if appropriate):\n"
91
  " - Call list_providers_tool() to get available providers\n"
92
  " - Present 1-2 provider options to patient\n"
93
  " - Call get_provider_slots_tool(provider_id=...) for chosen provider\n"
94
+ " - Present 2-3 time slots in friendly format (Monday at 10am, Monday at 2:30pm, Tuesday at 9:15am)\n"
95
  " - After patient chooses, call schedule_appointment_tool(provider_id=..., slot_iso=...)\n"
96
+ " - If patient gives partial answer like 'Tuesday' or 'the Tuesday one', confirm the full time: 'Tuesday at 9:15 a.m., correct?'\n"
97
  " - Confirm appointment and mention sending details to their phone\n"
98
+ " - DO NOT re-authenticate just because you need to confirm the appointment time\n"
99
  "\n"
100
+ "7. PHARMACY CONFIRMATION (if prescriptions likely):\n"
101
  " - Call get_preferred_pharmacy_tool() to get on-file pharmacy\n"
102
  " - Ask: 'Should we keep your pharmacy at [address]?'\n"
103
  "\n"
104
+ "8. CLOSING:\n"
105
  " - Call log_call_tool(notes=..., triage_json=...) to document the encounter\n"
106
+ " - Provide safety net advice only if appropriate (e.g., 'If fever goes over 102, seek urgent care')\n"
107
+ " - Do NOT over-warn about emergencies for minor symptoms - this causes unnecessary anxiety\n"
108
+ " - End with a warm, reassuring closing\n"
109
  "\n"
110
  "## COMMUNICATION STYLE:\n"
111
  "- Use a calm, empathetic, warm tone\n"
 
114
  "- Avoid medical jargon; use plain language\n"
115
  "- Be conversational and natural - imagine speaking to a patient on the phone\n"
116
  "- Remember: PLAIN TEXT ONLY for text-to-speech (see TTS rules at top)\n"
117
+ "\n"
118
+ "## HANDLING UNCLEAR INPUT:\n"
119
+ "- If you don't understand what the patient said, ask for clarification: 'I'm sorry, could you repeat that?'\n"
120
+ "- If the patient gives an ambiguous answer (e.g., 'I said tues'), ask: 'Did you mean Tuesday at 9:15 a.m.?'\n"
121
+ "- DO NOT re-authenticate the patient just because you didn't understand their input\n"
122
+ "- Maintain conversation context - you already know who they are if you verified them earlier\n"
123
  )
examples/voice_agent_webrtc_langgraph/agents/healthcare-agent/tools.py CHANGED
@@ -261,39 +261,60 @@ def schedule_appointment_tool(provider_id: str, slot_iso: str, patient_id: str |
261
  def triage_symptoms_tool(patient_id: str | None, symptoms_text: str) -> str:
262
  """Analyze patient symptoms using clinical triage rules to determine urgency and guidance.
263
 
264
- WHEN TO CALL: After collecting chief complaint and symptoms from patient. This is a CORE tool.
 
 
 
 
 
 
 
265
 
266
  PARAMETERS:
267
  - patient_id: From find_patient() result (auto-injected if available, used for age-based rules)
268
- - symptoms_text: Natural language description of symptoms collected from patient (e.g., "headache and fatigue, no fever")
 
 
 
 
269
 
270
  RETURNS: JSON with:
271
  - "risk": "urgent" | "soon" | "self_care" - Urgency level
272
  - "advice": "Try rest, hydration..." - Clinical guidance to share with patient
273
- - "red_flags": ["stiff neck", "high fever"] - List of concerning symptoms detected (empty array if none)
274
  - "rule": "Headache - typical" - Internal rule name that matched
275
 
276
  RISK LEVELS:
277
- - "urgent": Emergency - direct to ER/911 immediately
278
  - "soon": Schedule appointment within 1-2 days
279
  - "self_care": Home care with OTC medications, monitor symptoms
280
 
281
- WHAT TO DO WITH RESULTS:
282
- - If risk="urgent" AND red_flags present: "This may require urgent evaluation. Please go to the nearest ER or call 911."
283
- - If risk="soon": Give brief advice, then offer appointment: "I recommend scheduling within a day or two."
284
- - If risk="self_care": Give the advice verbatim, check patient profile for medication safety, offer follow-up appointment
 
285
  - ALWAYS tailor advice based on patient's allergies and current medications from get_patient_profile_tool()
286
-
287
- EXAMPLE 1 (URGENT):
288
- β†’ Call triage_symptoms_tool(symptoms_text="severe chest pain and shortness of breath")
289
- ← Returns: {"risk": "urgent", "advice": "Chest pain can be serious. Please call emergency services now.", "red_flags": ["chest pain"]}
290
- β†’ Say: "Chest pain can be serious. Please call 911 now or go to the nearest emergency room."
291
-
292
- EXAMPLE 2 (SELF-CARE):
293
- β†’ Call triage_symptoms_tool(symptoms_text="mild headache and tired, no fever or neck stiffness")
294
- ← Returns: {"risk": "self_care", "advice": "Try rest, hydration, and acetaminophen as directed...", "red_flags": []}
295
- Patient profile shows: allergies=["Penicillin"], medications=[{"name": "Acetaminophen", "otc": true}]
296
- β†’ Say: "Since you're already on acetaminophen as needed and have no concerning symptoms, try rest and hydration. Would you like a follow-up appointment?"
 
 
 
 
 
 
 
 
 
297
  """
298
  return json.dumps(triage_symptoms(patient_id, symptoms_text))
299
 
 
261
  def triage_symptoms_tool(patient_id: str | None, symptoms_text: str) -> str:
262
  """Analyze patient symptoms using clinical triage rules to determine urgency and guidance.
263
 
264
+ WHEN TO CALL: ONLY after thorough symptom assessment. Ask clarifying questions about red flags BEFORE calling this tool.
265
+
266
+ CRITICAL: This tool uses simple keyword matching, so be VERY CAREFUL with your symptoms_text.
267
+ - Only include symptoms that ARE PRESENT
268
+ - Do NOT mention symptoms that are absent (saying "no numbness" will trigger the "numbness" keyword!)
269
+ - Instead, after screening for red flags, ONLY list positive findings in symptoms_text
270
+ - Use descriptive language: "mild headache for 2 days, gradual onset, no concerning features"
271
+ - If patient denies all red flags, do NOT list them - just describe the actual complaint
272
 
273
  PARAMETERS:
274
  - patient_id: From find_patient() result (auto-injected if available, used for age-based rules)
275
+ - symptoms_text: Description of PRESENT symptoms only (DO NOT list absent symptoms to avoid false triggers)
276
+ Good: "mild headache for 2 days, gradual onset, relieved by rest"
277
+ Good: "moderate headache with fever 101F, started yesterday"
278
+ Bad: "headache, no numbness, no confusion" (will trigger "numbness" and "confusion" keywords!)
279
+ Bad: "headache" (too vague, lacks detail for proper triage)
280
 
281
  RETURNS: JSON with:
282
  - "risk": "urgent" | "soon" | "self_care" - Urgency level
283
  - "advice": "Try rest, hydration..." - Clinical guidance to share with patient
284
+ - "red_flags": ["stiff neck", "high fever"] - Keywords detected (may include false positives!)
285
  - "rule": "Headache - typical" - Internal rule name that matched
286
 
287
  RISK LEVELS:
288
+ - "urgent": Potential emergency (but verify with clinical judgment)
289
  - "soon": Schedule appointment within 1-2 days
290
  - "self_care": Home care with OTC medications, monitor symptoms
291
 
292
+ WHAT TO DO WITH RESULTS (USE CLINICAL JUDGMENT):
293
+ - If risk="urgent" AND red_flags has items AND patient confirmed those symptoms: Direct to ER/911
294
+ - If risk="urgent" BUT patient explicitly denied red flag symptoms: FALSE POSITIVE - schedule appointment instead
295
+ - If risk="soon": Give advice and offer appointment within 1-2 days
296
+ - If risk="self_care": Give advice, check allergies/meds for safety, offer optional follow-up
297
  - ALWAYS tailor advice based on patient's allergies and current medications from get_patient_profile_tool()
298
+ - Remember: Most common symptoms (headache, fever, fatigue) are NOT emergencies
299
+
300
+ EXAMPLE 1 (TRUE URGENT):
301
+ Conversation: Patient says "severe crushing chest pain, sweating, short of breath"
302
+ β†’ Call triage_symptoms_tool(symptoms_text="severe chest pain with sweating and shortness of breath")
303
+ ← Returns: {"risk": "urgent", "red_flags": ["chest pain"]}
304
+ β†’ Clinical judgment: Patient confirmed severe chest pain = TRUE URGENT
305
+ β†’ Say: "This sounds serious. Please call 911 now or go to the nearest emergency room."
306
+
307
+ EXAMPLE 2 (AVOIDING FALSE POSITIVES):
308
+ Conversation: You ask "Any severe symptoms like confusion, weakness, or numbness?" Patient says "No, none of those"
309
+ β†’ Call triage_symptoms_tool(symptoms_text="mild headache for 2 days, gradual onset, relieved with rest")
310
+ ← Returns: {"risk": "self_care", "red_flags": []}
311
+ β†’ Say: "Try rest, hydration, and acetaminophen. Would you like a follow-up appointment?"
312
+ (Note: Did NOT mention "no confusion, no numbness" to avoid triggering those keywords)
313
+
314
+ EXAMPLE 3 (SELF-CARE):
315
+ β†’ Call triage_symptoms_tool(symptoms_text="low-grade fever 100.5F for 1 day with mild fatigue")
316
+ ← Returns: {"risk": "self_care", "advice": "Hydration, rest, and acetaminophen can help..."}
317
+ β†’ Say: "For a low-grade fever, rest and hydration are key. You're already taking acetaminophen as needed, which is safe with your medications."
318
  """
319
  return json.dumps(triage_symptoms(patient_id, symptoms_text))
320