dvalle08 commited on
Commit
d0f8e17
·
1 Parent(s): bf5b959

feat: Add post-tool response timeout and enhance error handling in channel metrics

Browse files
.env.example CHANGED
@@ -33,6 +33,7 @@ LANGFUSE_BASE_URL= # Optional alternative to LANGFUSE_HOST
33
  LANGFUSE_PUBLIC_KEY=
34
  LANGFUSE_SECRET_KEY=
35
  LANGFUSE_TRACE_FINALIZE_TIMEOUT_MS=8000
 
36
  LANGFUSE_MAX_PENDING_TRACE_TASKS=200
37
  LANGFUSE_TRACE_FLUSH_TIMEOUT_MS=1000
38
 
 
33
  LANGFUSE_PUBLIC_KEY=
34
  LANGFUSE_SECRET_KEY=
35
  LANGFUSE_TRACE_FINALIZE_TIMEOUT_MS=8000
36
+ LANGFUSE_POST_TOOL_RESPONSE_TIMEOUT_MS=30000
37
  LANGFUSE_MAX_PENDING_TRACE_TASKS=200
38
  LANGFUSE_TRACE_FLUSH_TIMEOUT_MS=1000
39
 
src/agent/models/llm_runtime.py CHANGED
@@ -11,7 +11,7 @@ from src.agent.prompts.runtime import MCP_STARTUP_GREETING
11
  from src.core.logger import logger
12
 
13
  NVIDIA_OPENAI_BASE_URL = "https://integrate.api.nvidia.com/v1"
14
- MCP_STARTUP_GREETING_TIMEOUT_SEC = 4.0
15
  MCP_GENERATE_REPLY_BLOCK_MESSAGE = (
16
  "Manual generate_reply is disabled in MCP mode; use session.say(...) instead."
17
  )
@@ -176,7 +176,7 @@ def run_startup_greeting(
176
  return session.say(
177
  MCP_STARTUP_GREETING,
178
  allow_interruptions=True,
179
- add_to_chat_ctx=True,
180
  )
181
  except Exception as exc:
182
  logger.warning(f"MCP startup greeting could not start: {exc}")
 
11
  from src.core.logger import logger
12
 
13
  NVIDIA_OPENAI_BASE_URL = "https://integrate.api.nvidia.com/v1"
14
+ MCP_STARTUP_GREETING_TIMEOUT_SEC = 8.0
15
  MCP_GENERATE_REPLY_BLOCK_MESSAGE = (
16
  "Manual generate_reply is disabled in MCP mode; use session.say(...) instead."
17
  )
 
176
  return session.say(
177
  MCP_STARTUP_GREETING,
178
  allow_interruptions=True,
179
+ add_to_chat_ctx=False,
180
  )
181
  except Exception as exc:
182
  logger.warning(f"MCP startup greeting could not start: {exc}")
src/agent/runtime/session.py CHANGED
@@ -23,7 +23,7 @@ from src.agent.models.stt_factory import create_stt
23
  from src.agent.runtime.assistant import Assistant
24
  from src.agent.runtime.tasks import (
25
  cancel_task_for_shutdown,
26
- schedule_llm_warmup_task,
27
  schedule_startup_greeting_task,
28
  )
29
  from src.agent.tools.feedback import ToolFeedbackController
@@ -63,7 +63,6 @@ async def session_handler(ctx: agents.JobContext) -> None:
63
  )
64
  trace_provider = setup_langfuse_tracer()
65
  startup_greeting_task: asyncio.Task[Any] | None = None
66
- llm_warmup_task: asyncio.Task[Any] | None = None
67
  tool_feedback = ToolFeedbackController(enabled=False)
68
 
69
  if trace_provider:
@@ -84,14 +83,6 @@ async def session_handler(ctx: agents.JobContext) -> None:
84
 
85
  ctx.add_shutdown_callback(cancel_startup_greeting)
86
 
87
- async def cancel_llm_warmup(_: str) -> None:
88
- await cancel_task_for_shutdown(
89
- llm_warmup_task,
90
- task_name="llm warm-up",
91
- )
92
-
93
- ctx.add_shutdown_callback(cancel_llm_warmup)
94
-
95
  async def close_tool_feedback(_: str) -> None:
96
  await tool_feedback.aclose()
97
 
@@ -160,7 +151,12 @@ async def session_handler(ctx: agents.JobContext) -> None:
160
  )
161
  mcp_runtime_active = llm_runtime.mcp_runtime_active
162
  tool_feedback = ToolFeedbackController(enabled=mcp_runtime_active)
163
- llm_warmup_task = schedule_llm_warmup_task(
 
 
 
 
 
164
  llm_client=llm_runtime.llm,
165
  conn_options=llm_conn_options,
166
  provider=llm_runtime.provider,
 
23
  from src.agent.runtime.assistant import Assistant
24
  from src.agent.runtime.tasks import (
25
  cancel_task_for_shutdown,
26
+ run_llm_warmup,
27
  schedule_startup_greeting_task,
28
  )
29
  from src.agent.tools.feedback import ToolFeedbackController
 
63
  )
64
  trace_provider = setup_langfuse_tracer()
65
  startup_greeting_task: asyncio.Task[Any] | None = None
 
66
  tool_feedback = ToolFeedbackController(enabled=False)
67
 
68
  if trace_provider:
 
83
 
84
  ctx.add_shutdown_callback(cancel_startup_greeting)
85
 
 
 
 
 
 
 
 
 
86
  async def close_tool_feedback(_: str) -> None:
87
  await tool_feedback.aclose()
88
 
 
151
  )
152
  mcp_runtime_active = llm_runtime.mcp_runtime_active
153
  tool_feedback = ToolFeedbackController(enabled=mcp_runtime_active)
154
+ logger.info(
155
+ "Running LLM warm-up before session start: provider=%s model=%s",
156
+ llm_runtime.provider,
157
+ llm_runtime.model,
158
+ )
159
+ await run_llm_warmup(
160
  llm_client=llm_runtime.llm,
161
  conn_options=llm_conn_options,
162
  provider=llm_runtime.provider,
src/agent/traces/channel_metrics.py CHANGED
@@ -90,7 +90,13 @@ class ChannelPublisher:
90
  reliable=True,
91
  )
92
  except Exception as e:
93
- logger.error(f"Failed to publish live metrics update: {e}")
 
 
 
 
 
 
94
 
95
  async def publish_conversation_turn(self, turn_metrics: TurnMetrics) -> None:
96
  """Publish completed turn metrics to LiveKit data channel."""
@@ -140,6 +146,10 @@ def _stt_display_duration(stt_metrics: STTMetrics) -> float:
140
  return stt_metrics.audio_duration
141
 
142
 
 
 
 
 
143
  def _build_partial_latencies(
144
  *,
145
  turn_metrics: Optional[TurnMetrics],
 
90
  reliable=True,
91
  )
92
  except Exception as e:
93
+ if _is_preconnect_publish_error(e):
94
+ logger.debug(
95
+ "Skipping live metrics update before room connect: %s",
96
+ e,
97
+ )
98
+ else:
99
+ logger.error(f"Failed to publish live metrics update: {e}")
100
 
101
  async def publish_conversation_turn(self, turn_metrics: TurnMetrics) -> None:
102
  """Publish completed turn metrics to LiveKit data channel."""
 
146
  return stt_metrics.audio_duration
147
 
148
 
149
+ def _is_preconnect_publish_error(exc: Exception) -> bool:
150
+ return "cannot access local participant before connecting" in str(exc).lower()
151
+
152
+
153
  def _build_partial_latencies(
154
  *,
155
  turn_metrics: Optional[TurnMetrics],
src/agent/traces/metrics_collector.py CHANGED
@@ -250,6 +250,7 @@ class MetricsCollector:
250
  self._llm_stall_tasks: dict[str, asyncio.Task[None]] = {}
251
  self._latest_vad_metrics: Optional[VADMetrics] = None
252
  self._latest_vad_metric_attributes: Optional[dict[str, Any]] = None
 
253
  self._llm_stall_timeout_sec = max(
254
  float(
255
  getattr(
@@ -281,6 +282,14 @@ class MetricsCollector:
281
  def _trace_finalize_timeout_sec(self, value: float) -> None:
282
  self._tracer._trace_finalize_timeout_sec = value
283
 
 
 
 
 
 
 
 
 
284
  # ------------------------------------------------------------------
285
  # Public event handlers
286
  # ------------------------------------------------------------------
@@ -310,6 +319,14 @@ class MetricsCollector:
310
  if not normalized:
311
  return
312
  self._pending_transcripts.append(normalized)
 
 
 
 
 
 
 
 
313
  self._start_llm_stall_watchdog(transcript=normalized)
314
  room_id = await self._resolve_room_id()
315
  await self._tracer.create_turn(user_transcript=normalized, room_id=room_id)
 
250
  self._llm_stall_tasks: dict[str, asyncio.Task[None]] = {}
251
  self._latest_vad_metrics: Optional[VADMetrics] = None
252
  self._latest_vad_metric_attributes: Optional[dict[str, Any]] = None
253
+ self._first_final_user_turn_logged = False
254
  self._llm_stall_timeout_sec = max(
255
  float(
256
  getattr(
 
282
  def _trace_finalize_timeout_sec(self, value: float) -> None:
283
  self._tracer._trace_finalize_timeout_sec = value
284
 
285
+ @property
286
+ def _trace_post_tool_response_timeout_sec(self) -> float:
287
+ return self._tracer._trace_post_tool_response_timeout_sec
288
+
289
+ @_trace_post_tool_response_timeout_sec.setter
290
+ def _trace_post_tool_response_timeout_sec(self, value: float) -> None:
291
+ self._tracer._trace_post_tool_response_timeout_sec = value
292
+
293
  # ------------------------------------------------------------------
294
  # Public event handlers
295
  # ------------------------------------------------------------------
 
319
  if not normalized:
320
  return
321
  self._pending_transcripts.append(normalized)
322
+ if not self._first_final_user_turn_logged:
323
+ self._first_final_user_turn_logged = True
324
+ logger.info(
325
+ "First finalized user transcript received: room=%s chars=%s preview=%r",
326
+ self._room_name,
327
+ len(normalized),
328
+ normalized[:80],
329
+ )
330
  self._start_llm_stall_watchdog(transcript=normalized)
331
  room_id = await self._resolve_room_id()
332
  await self._tracer.create_turn(user_transcript=normalized, room_id=room_id)
src/agent/traces/turn_tracer.py CHANGED
@@ -150,6 +150,7 @@ class ToolExecutionBlock:
150
 
151
 
152
  _DEFAULT_TRACE_FINALIZE_TIMEOUT_MS = 8000.0
 
153
  _DEFAULT_MAX_PENDING_TRACE_TASKS = 200
154
  _DEFAULT_TRACE_FLUSH_TIMEOUT_SEC = 1.0
155
 
@@ -186,6 +187,7 @@ class TurnTracer:
186
  self._trace_lock = asyncio.Lock()
187
  self._trace_emit_tasks: set[asyncio.Task[None]] = set()
188
  self._trace_finalize_tasks: dict[str, asyncio.Task[None]] = {}
 
189
 
190
  self._trace_finalize_timeout_sec = (
191
  max(
@@ -198,6 +200,17 @@ class TurnTracer:
198
  )
199
  / 1000.0
200
  )
 
 
 
 
 
 
 
 
 
 
 
201
  self._trace_max_pending_tasks = max(
202
  int(
203
  getattr(
@@ -563,18 +576,21 @@ class TurnTracer:
563
  return
564
 
565
  completed_turn: Optional[TraceTurn] = None
566
- schedule_timeout_for_turn: Optional[str] = None
567
  async with self._trace_lock:
568
  if trace_turn not in self._pending_trace_turns:
569
  return
570
  if not self._is_complete(trace_turn):
571
  if self._should_schedule_finalize_timeout(trace_turn):
572
- schedule_timeout_for_turn = trace_turn.turn_id
 
 
 
573
  else:
574
  completed_turn = self._finalize_locked(trace_turn)
575
 
576
  if schedule_timeout_for_turn:
577
- self._schedule_finalize_timeout(schedule_timeout_for_turn)
578
  if completed_turn:
579
  self._schedule_trace_emit(completed_turn)
580
 
@@ -682,22 +698,31 @@ class TurnTracer:
682
  and turn.tts_calls
683
  and not self._is_complete(turn)
684
  and not (turn.tool_phase_open and turn.last_tool_event_at is None)
685
- and turn.turn_id not in self._trace_finalize_tasks
686
- and self._trace_finalize_timeout_sec > 0.0
687
  )
688
 
 
 
 
 
 
689
  def _requires_post_tool_response(self, turn: TraceTurn) -> bool:
690
  if not turn.tool_step_announced and turn.last_tool_event_order is None:
691
  return False
692
  return not self._post_tool_response_observed(turn)
693
 
694
- def _post_tool_response_observed(self, turn: TraceTurn) -> bool:
695
  if turn.last_tool_event_order is None:
696
  return False
697
- assistant_seen = bool(
698
  turn.assistant_text_updated_order is not None
699
  and turn.assistant_text_updated_order > turn.last_tool_event_order
700
  )
 
 
 
 
 
701
  tts_seen = bool(
702
  turn.tts_updated_order is not None
703
  and turn.tts_updated_order > turn.last_tool_event_order
@@ -717,7 +742,12 @@ class TurnTracer:
717
  *,
718
  missing_assistant_fallback: bool = False,
719
  tool_post_response_missing: bool = False,
 
720
  ) -> TraceTurn:
 
 
 
 
721
  if not turn.prompt_text:
722
  turn.prompt_text = turn.user_transcript
723
  if not turn.response_text and turn.assistant_text:
@@ -726,7 +756,13 @@ class TurnTracer:
726
  turn.assistant_text = turn.response_text
727
 
728
  if missing_assistant_fallback and not turn.assistant_text:
729
- fallback = self._best_available_assistant_text(turn)
 
 
 
 
 
 
730
  if fallback:
731
  turn.assistant_text = fallback
732
  if not turn.response_text:
@@ -746,15 +782,40 @@ class TurnTracer:
746
  self._cancel_finalize_timeout(turn.turn_id)
747
  return turn
748
 
749
- def _best_available_assistant_text(self, turn: TraceTurn) -> str:
 
 
 
 
 
 
750
  if turn.assistant_text.strip():
751
- return turn.assistant_text.strip()
 
 
 
 
 
 
 
752
  if turn.response_text.strip():
753
- return turn.response_text.strip()
 
 
 
 
 
 
 
754
  for tts_call in reversed(turn.tts_calls):
 
 
 
 
 
755
  if tts_call.assistant_text.strip():
756
  return tts_call.assistant_text.strip()
757
- if self._pending_agent_transcripts:
758
  return self._pending_agent_transcripts.popleft().strip()
759
  return ""
760
 
@@ -762,32 +823,66 @@ class TurnTracer:
762
  # Timeout scheduling
763
  # ------------------------------------------------------------------
764
 
765
- def _schedule_finalize_timeout(self, turn_id: str) -> None:
766
- if turn_id in self._trace_finalize_tasks:
767
  return
768
- task = asyncio.create_task(self._finalize_after_timeout(turn_id))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
769
  self._trace_finalize_tasks[turn_id] = task
770
  task.add_done_callback(
771
- lambda _: self._trace_finalize_tasks.pop(turn_id, None)
 
 
 
772
  )
773
 
 
 
 
 
 
774
  def _cancel_finalize_timeout(self, turn_id: str) -> None:
 
775
  task = self._trace_finalize_tasks.pop(turn_id, None)
776
  current = asyncio.current_task()
777
  if task and not task.done() and task is not current:
778
  task.cancel()
779
 
780
- async def _finalize_after_timeout(self, turn_id: str) -> None:
781
- await asyncio.sleep(self._trace_finalize_timeout_sec)
 
 
 
 
 
 
782
 
783
  completed_turn: Optional[TraceTurn] = None
784
  async with self._trace_lock:
 
 
 
785
  pending_turn = next(
786
  (t for t in self._pending_trace_turns if t.turn_id == turn_id),
787
  None,
788
  )
789
  if not pending_turn:
790
  return
 
791
  if self._is_complete(pending_turn):
792
  completed_turn = self._finalize_locked(pending_turn)
793
  elif (
@@ -798,15 +893,24 @@ class TurnTracer:
798
  requires_post_tool_response = self._requires_post_tool_response(
799
  pending_turn
800
  )
 
 
 
 
801
  completed_turn = self._finalize_locked(
802
  pending_turn,
803
- missing_assistant_fallback=not bool(pending_turn.assistant_text),
 
 
 
804
  tool_post_response_missing=requires_post_tool_response,
 
805
  )
806
 
807
  if completed_turn:
808
  self._schedule_trace_emit(completed_turn)
809
 
 
810
  # ------------------------------------------------------------------
811
  # Trace emission
812
  # ------------------------------------------------------------------
 
150
 
151
 
152
  _DEFAULT_TRACE_FINALIZE_TIMEOUT_MS = 8000.0
153
+ _DEFAULT_POST_TOOL_RESPONSE_TIMEOUT_MS = 30000.0
154
  _DEFAULT_MAX_PENDING_TRACE_TASKS = 200
155
  _DEFAULT_TRACE_FLUSH_TIMEOUT_SEC = 1.0
156
 
 
187
  self._trace_lock = asyncio.Lock()
188
  self._trace_emit_tasks: set[asyncio.Task[None]] = set()
189
  self._trace_finalize_tasks: dict[str, asyncio.Task[None]] = {}
190
+ self._trace_finalize_task_versions: dict[str, int] = {}
191
 
192
  self._trace_finalize_timeout_sec = (
193
  max(
 
200
  )
201
  / 1000.0
202
  )
203
+ self._trace_post_tool_response_timeout_sec = (
204
+ max(
205
+ getattr(
206
+ settings.langfuse,
207
+ "LANGFUSE_POST_TOOL_RESPONSE_TIMEOUT_MS",
208
+ _DEFAULT_POST_TOOL_RESPONSE_TIMEOUT_MS,
209
+ ),
210
+ 0.0,
211
+ )
212
+ / 1000.0
213
+ )
214
  self._trace_max_pending_tasks = max(
215
  int(
216
  getattr(
 
576
  return
577
 
578
  completed_turn: Optional[TraceTurn] = None
579
+ schedule_timeout_for_turn: Optional[tuple[str, float]] = None
580
  async with self._trace_lock:
581
  if trace_turn not in self._pending_trace_turns:
582
  return
583
  if not self._is_complete(trace_turn):
584
  if self._should_schedule_finalize_timeout(trace_turn):
585
+ schedule_timeout_for_turn = (
586
+ trace_turn.turn_id,
587
+ self._resolve_finalize_timeout_sec(trace_turn),
588
+ )
589
  else:
590
  completed_turn = self._finalize_locked(trace_turn)
591
 
592
  if schedule_timeout_for_turn:
593
+ self._schedule_finalize_timeout(*schedule_timeout_for_turn)
594
  if completed_turn:
595
  self._schedule_trace_emit(completed_turn)
596
 
 
698
  and turn.tts_calls
699
  and not self._is_complete(turn)
700
  and not (turn.tool_phase_open and turn.last_tool_event_at is None)
701
+ and self._resolve_finalize_timeout_sec(turn) > 0.0
 
702
  )
703
 
704
+ def _resolve_finalize_timeout_sec(self, turn: TraceTurn) -> float:
705
+ if self._requires_post_tool_response(turn):
706
+ return self._trace_post_tool_response_timeout_sec
707
+ return self._trace_finalize_timeout_sec
708
+
709
  def _requires_post_tool_response(self, turn: TraceTurn) -> bool:
710
  if not turn.tool_step_announced and turn.last_tool_event_order is None:
711
  return False
712
  return not self._post_tool_response_observed(turn)
713
 
714
+ def _post_tool_assistant_observed(self, turn: TraceTurn) -> bool:
715
  if turn.last_tool_event_order is None:
716
  return False
717
+ return bool(
718
  turn.assistant_text_updated_order is not None
719
  and turn.assistant_text_updated_order > turn.last_tool_event_order
720
  )
721
+
722
+ def _post_tool_response_observed(self, turn: TraceTurn) -> bool:
723
+ if turn.last_tool_event_order is None:
724
+ return False
725
+ assistant_seen = self._post_tool_assistant_observed(turn)
726
  tts_seen = bool(
727
  turn.tts_updated_order is not None
728
  and turn.tts_updated_order > turn.last_tool_event_order
 
742
  *,
743
  missing_assistant_fallback: bool = False,
744
  tool_post_response_missing: bool = False,
745
+ drop_assistant_text: bool = False,
746
  ) -> TraceTurn:
747
+ if drop_assistant_text:
748
+ turn.assistant_text = ""
749
+ turn.response_text = ""
750
+
751
  if not turn.prompt_text:
752
  turn.prompt_text = turn.user_transcript
753
  if not turn.response_text and turn.assistant_text:
 
756
  turn.assistant_text = turn.response_text
757
 
758
  if missing_assistant_fallback and not turn.assistant_text:
759
+ fallback = self._best_available_assistant_text(
760
+ turn,
761
+ min_observed_order=(
762
+ turn.last_tool_event_order if tool_post_response_missing else None
763
+ ),
764
+ include_pending_agent_transcripts=not tool_post_response_missing,
765
+ )
766
  if fallback:
767
  turn.assistant_text = fallback
768
  if not turn.response_text:
 
782
  self._cancel_finalize_timeout(turn.turn_id)
783
  return turn
784
 
785
+ def _best_available_assistant_text(
786
+ self,
787
+ turn: TraceTurn,
788
+ *,
789
+ min_observed_order: Optional[int] = None,
790
+ include_pending_agent_transcripts: bool = True,
791
+ ) -> str:
792
  if turn.assistant_text.strip():
793
+ if (
794
+ min_observed_order is None
795
+ or (
796
+ turn.assistant_text_updated_order is not None
797
+ and turn.assistant_text_updated_order > min_observed_order
798
+ )
799
+ ):
800
+ return turn.assistant_text.strip()
801
  if turn.response_text.strip():
802
+ if (
803
+ min_observed_order is None
804
+ or (
805
+ turn.assistant_text_updated_order is not None
806
+ and turn.assistant_text_updated_order > min_observed_order
807
+ )
808
+ ):
809
+ return turn.response_text.strip()
810
  for tts_call in reversed(turn.tts_calls):
811
+ if (
812
+ min_observed_order is not None
813
+ and tts_call.observed_order <= min_observed_order
814
+ ):
815
+ continue
816
  if tts_call.assistant_text.strip():
817
  return tts_call.assistant_text.strip()
818
+ if include_pending_agent_transcripts and self._pending_agent_transcripts:
819
  return self._pending_agent_transcripts.popleft().strip()
820
  return ""
821
 
 
823
  # Timeout scheduling
824
  # ------------------------------------------------------------------
825
 
826
+ def _schedule_finalize_timeout(self, turn_id: str, timeout_sec: float) -> None:
827
+ if timeout_sec <= 0.0:
828
  return
829
+
830
+ version = self._trace_finalize_task_versions.get(turn_id, 0) + 1
831
+ self._trace_finalize_task_versions[turn_id] = version
832
+
833
+ existing_task = self._trace_finalize_tasks.get(turn_id)
834
+ current = asyncio.current_task()
835
+ if existing_task and not existing_task.done() and existing_task is not current:
836
+ existing_task.cancel()
837
+
838
+ task = asyncio.create_task(
839
+ self._finalize_after_timeout(
840
+ turn_id=turn_id,
841
+ version=version,
842
+ timeout_sec=timeout_sec,
843
+ )
844
+ )
845
  self._trace_finalize_tasks[turn_id] = task
846
  task.add_done_callback(
847
+ lambda _task, tid=turn_id, v=version: self._on_finalize_timeout_task_done(
848
+ turn_id=tid,
849
+ version=v,
850
+ )
851
  )
852
 
853
+ def _on_finalize_timeout_task_done(self, *, turn_id: str, version: int) -> None:
854
+ if self._trace_finalize_task_versions.get(turn_id) != version:
855
+ return
856
+ self._trace_finalize_tasks.pop(turn_id, None)
857
+
858
  def _cancel_finalize_timeout(self, turn_id: str) -> None:
859
+ self._trace_finalize_task_versions.pop(turn_id, None)
860
  task = self._trace_finalize_tasks.pop(turn_id, None)
861
  current = asyncio.current_task()
862
  if task and not task.done() and task is not current:
863
  task.cancel()
864
 
865
+ async def _finalize_after_timeout(
866
+ self,
867
+ *,
868
+ turn_id: str,
869
+ version: int,
870
+ timeout_sec: float,
871
+ ) -> None:
872
+ await asyncio.sleep(timeout_sec)
873
 
874
  completed_turn: Optional[TraceTurn] = None
875
  async with self._trace_lock:
876
+ if self._trace_finalize_task_versions.get(turn_id) != version:
877
+ return
878
+
879
  pending_turn = next(
880
  (t for t in self._pending_trace_turns if t.turn_id == turn_id),
881
  None,
882
  )
883
  if not pending_turn:
884
  return
885
+
886
  if self._is_complete(pending_turn):
887
  completed_turn = self._finalize_locked(pending_turn)
888
  elif (
 
893
  requires_post_tool_response = self._requires_post_tool_response(
894
  pending_turn
895
  )
896
+ missing_post_tool_assistant = bool(
897
+ requires_post_tool_response
898
+ and not self._post_tool_assistant_observed(pending_turn)
899
+ )
900
  completed_turn = self._finalize_locked(
901
  pending_turn,
902
+ missing_assistant_fallback=(
903
+ missing_post_tool_assistant
904
+ or not bool(pending_turn.assistant_text)
905
+ ),
906
  tool_post_response_missing=requires_post_tool_response,
907
+ drop_assistant_text=missing_post_tool_assistant,
908
  )
909
 
910
  if completed_turn:
911
  self._schedule_trace_emit(completed_turn)
912
 
913
+
914
  # ------------------------------------------------------------------
915
  # Trace emission
916
  # ------------------------------------------------------------------
src/core/settings.py CHANGED
@@ -188,7 +188,7 @@ class LLMSettings(CoreSettings):
188
 
189
  # NVIDIA settings
190
  NVIDIA_API_KEY: Optional[str] = Field(default=None)
191
- NVIDIA_MODEL: str = Field(default="qwen/qwen3-next-80b-a3b-instruct")
192
 
193
  # Ollama settings
194
  OLLAMA_BASE_URL: str = Field(
@@ -268,6 +268,15 @@ class LangfuseSettings(CoreSettings):
268
  le=10000.0,
269
  description="Timeout to wait for assistant text before force-finalizing trace",
270
  )
 
 
 
 
 
 
 
 
 
271
  LANGFUSE_MAX_PENDING_TRACE_TASKS: int = Field(
272
  default=200,
273
  ge=1,
 
188
 
189
  # NVIDIA settings
190
  NVIDIA_API_KEY: Optional[str] = Field(default=None)
191
+ NVIDIA_MODEL: str = Field(default="meta/llama-3.1-8b-instruct") #"qwen/qwen3-next-80b-a3b-instruct"
192
 
193
  # Ollama settings
194
  OLLAMA_BASE_URL: str = Field(
 
268
  le=10000.0,
269
  description="Timeout to wait for assistant text before force-finalizing trace",
270
  )
271
+ LANGFUSE_POST_TOOL_RESPONSE_TIMEOUT_MS: float = Field(
272
+ default=30000.0,
273
+ ge=0.0,
274
+ le=120000.0,
275
+ description=(
276
+ "Timeout to wait for post-tool assistant response before force-finalizing trace; "
277
+ "telemetry only, does not affect live audio latency"
278
+ ),
279
+ )
280
  LANGFUSE_MAX_PENDING_TRACE_TASKS: int = Field(
281
  default=200,
282
  ge=1,
tests/test_agent_mcp_runtime.py CHANGED
@@ -245,7 +245,7 @@ def testrun_startup_greeting_uses_say_in_mcp_mode() -> None:
245
  "text": MCP_STARTUP_GREETING,
246
  "kwargs": {
247
  "allow_interruptions": True,
248
- "add_to_chat_ctx": True,
249
  },
250
  }
251
  ]
@@ -280,7 +280,7 @@ def testrun_startup_greeting_swallows_say_exception() -> None:
280
  handle = run_startup_greeting(session, mcp_runtime_active=True) # type: ignore[arg-type]
281
 
282
  assert handle is None
283
- assert session.say_calls == [{"text": MCP_STARTUP_GREETING, "kwargs": {"allow_interruptions": True, "add_to_chat_ctx": True}}]
284
  assert session.generate_reply_calls == []
285
 
286
 
 
245
  "text": MCP_STARTUP_GREETING,
246
  "kwargs": {
247
  "allow_interruptions": True,
248
+ "add_to_chat_ctx": False,
249
  },
250
  }
251
  ]
 
280
  handle = run_startup_greeting(session, mcp_runtime_active=True) # type: ignore[arg-type]
281
 
282
  assert handle is None
283
+ assert session.say_calls == [{"text": MCP_STARTUP_GREETING, "kwargs": {"allow_interruptions": True, "add_to_chat_ctx": False}}]
284
  assert session.generate_reply_calls == []
285
 
286
 
tests/test_channel_metrics.py ADDED
@@ -0,0 +1,98 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from __future__ import annotations
2
+
3
+ import asyncio
4
+ from typing import Any
5
+
6
+ import pytest
7
+
8
+ from src.agent.traces.channel_metrics import ChannelPublisher
9
+
10
+
11
+ class _FailingLocalParticipant:
12
+ def __init__(self, exc: Exception) -> None:
13
+ self._exc = exc
14
+
15
+ async def publish_data(
16
+ self,
17
+ *,
18
+ payload: bytes,
19
+ topic: str,
20
+ reliable: bool,
21
+ ) -> None:
22
+ _ = (payload, topic, reliable)
23
+ raise self._exc
24
+
25
+
26
+ class _FakeRoom:
27
+ def __init__(self, exc: Exception) -> None:
28
+ self.local_participant = _FailingLocalParticipant(exc)
29
+
30
+
31
+ def test_publish_live_update_downgrades_preconnect_publish_error(
32
+ monkeypatch: pytest.MonkeyPatch,
33
+ ) -> None:
34
+ import src.agent.traces.channel_metrics as channel_metrics_module
35
+
36
+ debug_logs: list[tuple[Any, ...]] = []
37
+ error_logs: list[tuple[Any, ...]] = []
38
+ monkeypatch.setattr(
39
+ channel_metrics_module.logger,
40
+ "debug",
41
+ lambda *args, **kwargs: debug_logs.append((args, kwargs)),
42
+ )
43
+ monkeypatch.setattr(
44
+ channel_metrics_module.logger,
45
+ "error",
46
+ lambda *args, **kwargs: error_logs.append((args, kwargs)),
47
+ )
48
+
49
+ room = _FakeRoom(
50
+ RuntimeError("cannot access local participant before connecting")
51
+ )
52
+ publisher = ChannelPublisher(room) # type: ignore[arg-type]
53
+
54
+ asyncio.run(
55
+ publisher.publish_live_update(
56
+ speech_id=None,
57
+ stage="llm",
58
+ role=None,
59
+ turn_metrics=None,
60
+ )
61
+ )
62
+
63
+ assert debug_logs
64
+ assert not error_logs
65
+
66
+
67
+ def test_publish_live_update_keeps_error_logging_for_other_failures(
68
+ monkeypatch: pytest.MonkeyPatch,
69
+ ) -> None:
70
+ import src.agent.traces.channel_metrics as channel_metrics_module
71
+
72
+ debug_logs: list[tuple[Any, ...]] = []
73
+ error_logs: list[tuple[Any, ...]] = []
74
+ monkeypatch.setattr(
75
+ channel_metrics_module.logger,
76
+ "debug",
77
+ lambda *args, **kwargs: debug_logs.append((args, kwargs)),
78
+ )
79
+ monkeypatch.setattr(
80
+ channel_metrics_module.logger,
81
+ "error",
82
+ lambda *args, **kwargs: error_logs.append((args, kwargs)),
83
+ )
84
+
85
+ room = _FakeRoom(RuntimeError("unexpected publish failure"))
86
+ publisher = ChannelPublisher(room) # type: ignore[arg-type]
87
+
88
+ asyncio.run(
89
+ publisher.publish_live_update(
90
+ speech_id=None,
91
+ stage="llm",
92
+ role=None,
93
+ turn_metrics=None,
94
+ )
95
+ )
96
+
97
+ assert error_logs
98
+ assert not debug_logs
tests/test_langfuse_turn_tracing.py CHANGED
@@ -868,6 +868,7 @@ def test_timeout_finalizes_tool_turn_with_missing_post_tool_response(
868
  langfuse_enabled=True,
869
  )
870
  collector._trace_finalize_timeout_sec = 0.01
 
871
 
872
  async def _run() -> None:
873
  await collector.on_session_metadata(
@@ -907,6 +908,80 @@ def test_timeout_finalizes_tool_turn_with_missing_post_tool_response(
907
  root = fake_tracer.spans[0]
908
  assert root.attributes["tool.phase_announced"] is True
909
  assert root.attributes["tool.post_response_missing"] is True
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
910
 
911
 
912
  def test_tool_event_without_matching_turn_is_ignored(
 
868
  langfuse_enabled=True,
869
  )
870
  collector._trace_finalize_timeout_sec = 0.01
871
+ collector._trace_post_tool_response_timeout_sec = 0.01
872
 
873
  async def _run() -> None:
874
  await collector.on_session_metadata(
 
908
  root = fake_tracer.spans[0]
909
  assert root.attributes["tool.phase_announced"] is True
910
  assert root.attributes["tool.post_response_missing"] is True
911
+ assert root.attributes["langfuse.trace.output"] == "[assistant text unavailable]"
912
+
913
+
914
+ def test_post_tool_timeout_prevents_early_finalize_of_pre_tool_leadin(
915
+ monkeypatch: pytest.MonkeyPatch,
916
+ ) -> None:
917
+ import src.agent.traces.metrics_collector as metrics_collector_module
918
+
919
+ fake_tracer = _FakeTracer()
920
+ monkeypatch.setattr(metrics_collector_module, "tracer", fake_tracer)
921
+
922
+ room = _FakeRoom()
923
+ collector = MetricsCollector(
924
+ room=room, # type: ignore[arg-type]
925
+ model_name="moonshine",
926
+ room_name=room.name,
927
+ room_id="RM123",
928
+ participant_id="web-123",
929
+ langfuse_enabled=True,
930
+ )
931
+ collector._trace_finalize_timeout_sec = 0.01
932
+ collector._trace_post_tool_response_timeout_sec = 0.08
933
+
934
+ async def _run() -> None:
935
+ await collector.on_session_metadata(
936
+ session_id="session-post-tool-timeout-window",
937
+ participant_id="web-123",
938
+ )
939
+ await collector.on_user_input_transcribed("find me the best paper", is_final=True)
940
+ await collector.on_metrics_collected(_make_llm_metrics("speech-post-tool-timeout-window"))
941
+ await collector.on_tool_step_started()
942
+ await collector.on_conversation_item_added(role="assistant", content="I'll look that up.")
943
+ await collector.on_metrics_collected(_make_tts_metrics("speech-post-tool-timeout-window"))
944
+ await collector.on_function_tools_executed(
945
+ function_calls=[
946
+ _FakeFunctionCall(
947
+ name="paper_search",
948
+ call_id="call-post-tool-timeout-window",
949
+ arguments='{"query":"mps cubic phases"}',
950
+ created_at=400.0,
951
+ )
952
+ ],
953
+ function_call_outputs=[
954
+ _FakeFunctionCallOutput(
955
+ output='{"results":[{"title":"A key paper"}]}',
956
+ is_error=False,
957
+ created_at=400.2,
958
+ )
959
+ ],
960
+ created_at=400.2,
961
+ )
962
+
963
+ # The base finalize timeout has elapsed, but post-tool timeout should keep the turn pending.
964
+ await asyncio.sleep(0.03)
965
+ await collector.wait_for_pending_trace_tasks()
966
+ assert not fake_tracer.spans
967
+
968
+ await collector.on_conversation_item_added(
969
+ role="assistant",
970
+ content="The most cited paper is Attention Is All You Need.",
971
+ )
972
+ await collector.on_metrics_collected(
973
+ _make_tts_metrics("speech-post-tool-timeout-window")
974
+ )
975
+ await collector.wait_for_pending_trace_tasks()
976
+
977
+ asyncio.run(_run())
978
+
979
+ root = fake_tracer.spans[0]
980
+ assert (
981
+ root.attributes["langfuse.trace.output"]
982
+ == "The most cited paper is Attention Is All You Need."
983
+ )
984
+ assert root.attributes["tool.post_response_missing"] is False
985
 
986
 
987
  def test_tool_event_without_matching_turn_is_ignored(