dvalle08 commited on
Commit
04178a2
·
1 Parent(s): 7fdafe4

feat: Enhance turn tracing and metrics collection

Browse files

- Introduced `LANGFUSE_ASSISTANT_TEXT_GRACE_TIMEOUT_MS` in `.env.example` and `settings.py` to manage the grace period for waiting on assistant text before finalizing traces.
- Refactored `MetricsCollector` and `TurnTracer` to support new metrics and improve handling of user utterances and assistant text sources.
- Updated `PocketTTS` initialization to streamline parameter handling.
- Enhanced tests to validate the new features and ensure proper functionality of the updated tracing and metrics collection mechanisms.

.env.example CHANGED
@@ -42,7 +42,8 @@ LANGFUSE_PROJECT_ID= # Required for frontend deep links: project/<project_id>/.
42
  LANGFUSE_PUBLIC_KEY=
43
  LANGFUSE_SECRET_KEY=
44
  LANGFUSE_PUBLIC_TRACES=false # Mark traces public so non-members can open shared links
45
- LANGFUSE_TRACE_FINALIZE_TIMEOUT_MS=8000
 
46
  LANGFUSE_POST_TOOL_RESPONSE_TIMEOUT_MS=30000
47
  LANGFUSE_MAX_PENDING_TRACE_TASKS=200
48
  LANGFUSE_TRACE_FLUSH_TIMEOUT_MS=1000
 
42
  LANGFUSE_PUBLIC_KEY=
43
  LANGFUSE_SECRET_KEY=
44
  LANGFUSE_PUBLIC_TRACES=false # Mark traces public so non-members can open shared links
45
+ LANGFUSE_ASSISTANT_TEXT_GRACE_TIMEOUT_MS=500 # Short wait for assistant text on normal turns
46
+ LANGFUSE_TRACE_FINALIZE_TIMEOUT_MS=8000 # Legacy fallback retained for compatibility
47
  LANGFUSE_POST_TOOL_RESPONSE_TIMEOUT_MS=30000
48
  LANGFUSE_MAX_PENDING_TRACE_TASKS=200
49
  LANGFUSE_TRACE_FLUSH_TIMEOUT_MS=1000
src/agent/models/tts_factory.py CHANGED
@@ -127,11 +127,12 @@ def create_tts() -> Any:
127
  settings.voice.POCKET_TTS_TEMPERATURE,
128
  settings.voice.POCKET_TTS_LSD_DECODE_STEPS,
129
  )
130
- return PocketTTS(
131
  voice=settings.voice.POCKET_TTS_VOICE,
132
  temperature=settings.voice.POCKET_TTS_TEMPERATURE,
133
  lsd_decode_steps=settings.voice.POCKET_TTS_LSD_DECODE_STEPS,
134
  )
 
135
 
136
  if provider == "deepgram":
137
  logger.info("Initializing Deepgram TTS with plugin defaults")
 
127
  settings.voice.POCKET_TTS_TEMPERATURE,
128
  settings.voice.POCKET_TTS_LSD_DECODE_STEPS,
129
  )
130
+ pocket_kwargs: dict[str, Any] = dict(
131
  voice=settings.voice.POCKET_TTS_VOICE,
132
  temperature=settings.voice.POCKET_TTS_TEMPERATURE,
133
  lsd_decode_steps=settings.voice.POCKET_TTS_LSD_DECODE_STEPS,
134
  )
135
+ return PocketTTS(**pocket_kwargs)
136
 
137
  if provider == "deepgram":
138
  logger.info("Initializing Deepgram TTS with plugin defaults")
src/agent/traces/metrics_collector.py CHANGED
@@ -7,6 +7,7 @@ real-time monitoring. Also creates one Langfuse trace per finalized user turn.
7
  from __future__ import annotations
8
 
9
  import asyncio
 
10
  import json
11
  import uuid
12
  from collections import deque
@@ -211,6 +212,7 @@ class PendingUserUtterance:
211
  stt_observed: bool = False
212
  llm_started: bool = False
213
  watchdog_id: Optional[str] = None
 
214
 
215
 
216
  @dataclass
@@ -271,6 +273,9 @@ class MetricsCollector:
271
  self._latest_vad_metrics: Optional[VADMetrics] = None
272
  self._latest_vad_metric_attributes: Optional[dict[str, Any]] = None
273
  self._first_final_user_turn_logged = False
 
 
 
274
  self._event_queue: deque[QueuedCollectorEvent] = deque()
275
  self._event_worker_task: asyncio.Task[None] | None = None
276
  self._event_loop: asyncio.AbstractEventLoop | None = None
@@ -492,7 +497,7 @@ class MetricsCollector:
492
  if not normalized:
493
  return
494
  if role == "user":
495
- utterance = self._latest_user_utterance()
496
  if utterance is None:
497
  utterance = PendingUserUtterance(
498
  transcript=normalized,
@@ -500,7 +505,10 @@ class MetricsCollector:
500
  )
501
  self._pending_user_utterances.append(utterance)
502
  else:
503
- utterance.transcript = normalized
 
 
 
504
  utterance.committed = True
505
  if utterance.watchdog_id is not None:
506
  self._update_llm_stall_watchdog(
@@ -511,8 +519,9 @@ class MetricsCollector:
511
  item_created_at if item_created_at is not None else event_created_at
512
  )
513
  await self._tracer.attach_user_text(
514
- normalized,
515
  event_created_at=user_event_created_at,
 
516
  )
517
  return
518
  assistant_event_created_at = (
@@ -521,6 +530,7 @@ class MetricsCollector:
521
  await self._on_assistant_text(
522
  normalized,
523
  event_created_at=assistant_event_created_at,
 
524
  )
525
 
526
  async def on_function_tools_executed(
@@ -569,6 +579,11 @@ class MetricsCollector:
569
  if speech_id:
570
  self._pending_speech_ids_for_first_audio.append(speech_id)
571
 
 
 
 
 
 
572
  assistant_text, assistant_created_at = _extract_latest_assistant_chat_item(
573
  getattr(speech_handle, "chat_items", [])
574
  )
@@ -577,6 +592,7 @@ class MetricsCollector:
577
  assistant_text,
578
  event_created_at=assistant_created_at,
579
  speech_id=speech_id,
 
580
  )
581
 
582
  add_done_callback = getattr(speech_handle, "add_done_callback", None)
@@ -584,25 +600,45 @@ class MetricsCollector:
584
  return
585
 
586
  def _on_done(handle: Any) -> None:
587
- try:
588
- done_speech_id = _normalize(getattr(handle, "id", None))
589
- text, created_at = _extract_latest_assistant_chat_item(
590
- getattr(handle, "chat_items", [])
591
- )
592
- self._submit_serialized(
593
- self._handle_speech_done,
594
- done_speech_id,
595
- text,
596
- created_at,
597
- )
598
- except Exception:
599
- return
 
 
 
 
 
600
 
601
  try:
602
  add_done_callback(_on_done)
603
  except Exception:
604
  return
605
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
606
  async def _handle_speech_done(
607
  self,
608
  speech_id: Optional[str],
@@ -616,6 +652,7 @@ class MetricsCollector:
616
  assistant_text,
617
  event_created_at=event_created_at,
618
  speech_id=speech_id,
 
619
  )
620
 
621
  async def on_agent_state_changed(
@@ -669,70 +706,6 @@ class MetricsCollector:
669
  started_at=time(),
670
  )
671
 
672
- async def on_tts_synthesized(
673
- self,
674
- *,
675
- ttfb: float,
676
- duration: float,
677
- audio_duration: float,
678
- ) -> None:
679
- await self._call_serialized(
680
- self._handle_tts_synthesized,
681
- ttfb=ttfb,
682
- duration=duration,
683
- audio_duration=audio_duration,
684
- )
685
-
686
- async def _handle_tts_synthesized(
687
- self,
688
- *,
689
- ttfb: float,
690
- duration: float,
691
- audio_duration: float,
692
- ) -> None:
693
- if ttfb < 0:
694
- return
695
- speech_id = self._latest_agent_speech_id or f"tts-{uuid.uuid4()}"
696
- turn_metrics = self._get_or_create_turn(speech_id, role="agent")
697
- turn_metrics.tts = TTSMetrics(
698
- type="tts_metrics",
699
- label="tts_fallback",
700
- request_id=f"fallback-{speech_id}",
701
- timestamp=time(),
702
- duration=duration,
703
- ttfb=ttfb,
704
- audio_duration=audio_duration,
705
- cancelled=False,
706
- characters_count=0,
707
- streamed=True,
708
- speech_id=speech_id,
709
- )
710
- await self._publish_live_update(speech_id=speech_id, stage="tts", turn_metrics=turn_metrics)
711
- logger.debug("TTS fallback metrics collected: speech_id=%s, ttfb=%.3fs", speech_id, ttfb)
712
- await self._maybe_publish_turn(speech_id, turn_metrics)
713
-
714
- trace_turn = await self._tracer.attach_tts(
715
- duration=duration,
716
- fallback_duration=audio_duration,
717
- ttfb=ttfb,
718
- speech_id=speech_id,
719
- observed_total_latency=self._observed_total_latency(speech_id),
720
- metric_attributes={
721
- "type": "tts_metrics",
722
- "label": "tts_fallback",
723
- "request_id": f"fallback-{speech_id}",
724
- "timestamp": time(),
725
- "duration": duration,
726
- "ttfb": ttfb,
727
- "audio_duration": audio_duration,
728
- "cancelled": False,
729
- "characters_count": 0,
730
- "streamed": True,
731
- "speech_id": speech_id,
732
- },
733
- )
734
- await self._tracer.maybe_finalize(trace_turn)
735
-
736
  async def on_metrics_collected(
737
  self,
738
  collected_metrics: Union[
@@ -790,10 +763,12 @@ class MetricsCollector:
790
  )
791
 
792
  elif isinstance(collected_metrics, metrics.LLMMetrics):
793
- self._mark_llm_stage_reached()
794
  speech_id = collected_metrics.speech_id or collected_metrics.request_id
 
795
  turn_metrics = self._get_or_create_turn(speech_id, role="agent")
796
  self._latest_agent_speech_id = speech_id
 
 
797
  turn_metrics.llm = LLMMetrics(
798
  type=collected_metrics.type,
799
  label=collected_metrics.label,
@@ -822,6 +797,7 @@ class MetricsCollector:
822
  elif isinstance(collected_metrics, metrics.TTSMetrics):
823
  speech_id = collected_metrics.speech_id or collected_metrics.request_id
824
  turn_metrics = self._get_or_create_turn(speech_id, role="agent")
 
825
  turn_metrics.tts = TTSMetrics(
826
  type=collected_metrics.type,
827
  label=collected_metrics.label,
@@ -835,7 +811,7 @@ class MetricsCollector:
835
  streamed=collected_metrics.streamed,
836
  segment_id=collected_metrics.segment_id,
837
  speech_id=collected_metrics.speech_id,
838
- metadata=_metric_metadata_to_dict(collected_metrics.metadata),
839
  )
840
  await self._publish_live_update(speech_id=speech_id, stage="tts", turn_metrics=turn_metrics)
841
  logger.debug("TTS metrics collected: speech_id=%s, ttfb=%.3fs", speech_id, collected_metrics.ttfb)
@@ -847,11 +823,21 @@ class MetricsCollector:
847
  observed_total_latency=self._observed_total_latency(speech_id),
848
  metric_attributes=_tts_metric_attributes(collected_metrics),
849
  )
 
 
 
 
 
 
 
 
850
 
851
  elif isinstance(collected_metrics, metrics.EOUMetrics):
852
  speech_id = collected_metrics.speech_id
853
  if speech_id:
854
- self._mark_oldest_open_user_utterance_committed()
 
 
855
  state = self._get_or_create_state(speech_id)
856
  if state.speech_end_monotonic is None:
857
  state.speech_end_monotonic = monotonic()
@@ -867,6 +853,8 @@ class MetricsCollector:
867
  metadata=_metric_metadata_to_dict(collected_metrics.metadata),
868
  )
869
  turn_metrics = state.metrics
 
 
870
  if turn_metrics:
871
  turn_metrics.eou = state.eou_metrics
872
  if self._latest_vad_metrics and turn_metrics.vad is None:
@@ -1037,6 +1025,7 @@ class MetricsCollector:
1037
  *,
1038
  event_created_at: Optional[float] = None,
1039
  speech_id: Optional[str] = None,
 
1040
  ) -> None:
1041
  normalized = assistant_text.strip()
1042
  if not normalized:
@@ -1045,9 +1034,86 @@ class MetricsCollector:
1045
  normalized,
1046
  event_created_at=event_created_at,
1047
  speech_id=speech_id,
 
1048
  )
1049
  await self._tracer.maybe_finalize(trace_turn)
1050
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1051
  async def _publish_live_update(
1052
  self,
1053
  *,
@@ -1124,7 +1190,17 @@ class MetricsCollector:
1124
 
1125
  def _current_open_user_utterance(self) -> Optional[PendingUserUtterance]:
1126
  utterance = self._latest_user_utterance()
1127
- if utterance is None or utterance.committed or utterance.llm_started:
 
 
 
 
 
 
 
 
 
 
1128
  return None
1129
  return utterance
1130
 
@@ -1140,13 +1216,40 @@ class MetricsCollector:
1140
  return utterance
1141
  return None
1142
 
1143
- def _mark_oldest_open_user_utterance_committed(self) -> None:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1144
  for utterance in self._pending_user_utterances:
1145
- if utterance.committed:
 
 
1146
  continue
1147
  utterance.committed = True
 
1148
  self._prune_resolved_user_utterances()
1149
- return
 
1150
 
1151
  def _prune_resolved_user_utterances(self) -> None:
1152
  while self._pending_user_utterances:
@@ -1181,19 +1284,36 @@ class MetricsCollector:
1181
  return
1182
  utterance.transcript = transcript
1183
 
1184
- def _mark_llm_stage_reached(self) -> None:
1185
- for utterance in self._pending_user_utterances:
1186
- if utterance.llm_started:
1187
- continue
1188
- utterance.llm_started = True
1189
- watchdog_id = utterance.watchdog_id
1190
- utterance.watchdog_id = None
1191
- if watchdog_id is not None:
1192
- task = self._llm_stall_tasks.pop(watchdog_id, None)
1193
- if task:
1194
- task.cancel()
1195
- self._prune_resolved_user_utterances()
1196
- return
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1197
 
1198
  def _find_user_utterance_by_watchdog(
1199
  self,
@@ -1278,6 +1398,10 @@ def _merge_user_transcripts(existing: str, incoming: str) -> str:
1278
  return left
1279
  if left == right:
1280
  return left
 
 
 
 
1281
  if right.startswith(left):
1282
  return right
1283
  if left.startswith(right):
@@ -1367,6 +1491,16 @@ def _extract_latest_assistant_chat_item(chat_items: Any) -> tuple[str, Optional[
1367
  return latest_text, latest_created_at
1368
 
1369
 
 
 
 
 
 
 
 
 
 
 
1370
  def _to_optional_float(value: Any) -> Optional[float]:
1371
  if isinstance(value, bool):
1372
  return None
@@ -1393,6 +1527,16 @@ def _metric_metadata_to_dict(metadata: Any) -> Optional[dict[str, Any]]:
1393
  return {"value": str(metadata)}
1394
 
1395
 
 
 
 
 
 
 
 
 
 
 
1396
  def _metadata_attributes(metadata: Any) -> dict[str, Any]:
1397
  data = _metric_metadata_to_dict(metadata)
1398
  if not data:
 
7
  from __future__ import annotations
8
 
9
  import asyncio
10
+ import contextlib
11
  import json
12
  import uuid
13
  from collections import deque
 
212
  stt_observed: bool = False
213
  llm_started: bool = False
214
  watchdog_id: Optional[str] = None
215
+ speech_id: Optional[str] = None
216
 
217
 
218
  @dataclass
 
273
  self._latest_vad_metrics: Optional[VADMetrics] = None
274
  self._latest_vad_metric_attributes: Optional[dict[str, Any]] = None
275
  self._first_final_user_turn_logged = False
276
+ self._speech_item_callback_registered_logged = False
277
+ self._speech_item_callback_unavailable_logged = False
278
+ self._speech_item_callback_failed_logged = False
279
  self._event_queue: deque[QueuedCollectorEvent] = deque()
280
  self._event_worker_task: asyncio.Task[None] | None = None
281
  self._event_loop: asyncio.AbstractEventLoop | None = None
 
497
  if not normalized:
498
  return
499
  if role == "user":
500
+ utterance = self._user_utterance_accepting_manual_update()
501
  if utterance is None:
502
  utterance = PendingUserUtterance(
503
  transcript=normalized,
 
505
  )
506
  self._pending_user_utterances.append(utterance)
507
  else:
508
+ utterance.transcript = _merge_user_transcripts(
509
+ utterance.transcript,
510
+ normalized,
511
+ )
512
  utterance.committed = True
513
  if utterance.watchdog_id is not None:
514
  self._update_llm_stall_watchdog(
 
519
  item_created_at if item_created_at is not None else event_created_at
520
  )
521
  await self._tracer.attach_user_text(
522
+ utterance.transcript,
523
  event_created_at=user_event_created_at,
524
+ speech_id=utterance.speech_id,
525
  )
526
  return
527
  assistant_event_created_at = (
 
530
  await self._on_assistant_text(
531
  normalized,
532
  event_created_at=assistant_event_created_at,
533
+ source="conversation_item",
534
  )
535
 
536
  async def on_function_tools_executed(
 
579
  if speech_id:
580
  self._pending_speech_ids_for_first_audio.append(speech_id)
581
 
582
+ on_item_added = self._register_speech_item_added_callback(
583
+ speech_handle=speech_handle,
584
+ speech_id=speech_id,
585
+ )
586
+
587
  assistant_text, assistant_created_at = _extract_latest_assistant_chat_item(
588
  getattr(speech_handle, "chat_items", [])
589
  )
 
592
  assistant_text,
593
  event_created_at=assistant_created_at,
594
  speech_id=speech_id,
595
+ source="speech_created",
596
  )
597
 
598
  add_done_callback = getattr(speech_handle, "add_done_callback", None)
 
600
  return
601
 
602
  def _on_done(handle: Any) -> None:
603
+ remove_item_added_callback = getattr(
604
+ handle,
605
+ "_remove_item_added_callback",
606
+ None,
607
+ )
608
+ if callable(remove_item_added_callback) and on_item_added is not None:
609
+ with contextlib.suppress(Exception):
610
+ remove_item_added_callback(on_item_added)
611
+ done_speech_id = _normalize(getattr(handle, "id", None))
612
+ text, created_at = _extract_latest_assistant_chat_item(
613
+ getattr(handle, "chat_items", [])
614
+ )
615
+ self._submit_serialized_callback(
616
+ self._handle_speech_done,
617
+ done_speech_id,
618
+ text,
619
+ created_at,
620
+ )
621
 
622
  try:
623
  add_done_callback(_on_done)
624
  except Exception:
625
  return
626
 
627
+ async def _handle_speech_item_added(
628
+ self,
629
+ speech_id: Optional[str],
630
+ assistant_text: str,
631
+ event_created_at: Optional[float],
632
+ ) -> None:
633
+ if not assistant_text:
634
+ return
635
+ await self._on_assistant_text(
636
+ assistant_text,
637
+ event_created_at=event_created_at,
638
+ speech_id=speech_id,
639
+ source="speech_item_added",
640
+ )
641
+
642
  async def _handle_speech_done(
643
  self,
644
  speech_id: Optional[str],
 
652
  assistant_text,
653
  event_created_at=event_created_at,
654
  speech_id=speech_id,
655
+ source="speech_done",
656
  )
657
 
658
  async def on_agent_state_changed(
 
706
  started_at=time(),
707
  )
708
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
709
  async def on_metrics_collected(
710
  self,
711
  collected_metrics: Union[
 
763
  )
764
 
765
  elif isinstance(collected_metrics, metrics.LLMMetrics):
 
766
  speech_id = collected_metrics.speech_id or collected_metrics.request_id
767
+ linked_utterance = self._mark_llm_stage_reached(speech_id)
768
  turn_metrics = self._get_or_create_turn(speech_id, role="agent")
769
  self._latest_agent_speech_id = speech_id
770
+ if linked_utterance is not None:
771
+ turn_metrics.transcript = linked_utterance.transcript
772
  turn_metrics.llm = LLMMetrics(
773
  type=collected_metrics.type,
774
  label=collected_metrics.label,
 
797
  elif isinstance(collected_metrics, metrics.TTSMetrics):
798
  speech_id = collected_metrics.speech_id or collected_metrics.request_id
799
  turn_metrics = self._get_or_create_turn(speech_id, role="agent")
800
+ tts_metric_metadata = _metric_metadata_to_dict(collected_metrics.metadata)
801
  turn_metrics.tts = TTSMetrics(
802
  type=collected_metrics.type,
803
  label=collected_metrics.label,
 
811
  streamed=collected_metrics.streamed,
812
  segment_id=collected_metrics.segment_id,
813
  speech_id=collected_metrics.speech_id,
814
+ metadata=tts_metric_metadata,
815
  )
816
  await self._publish_live_update(speech_id=speech_id, stage="tts", turn_metrics=turn_metrics)
817
  logger.debug("TTS metrics collected: speech_id=%s, ttfb=%.3fs", speech_id, collected_metrics.ttfb)
 
823
  observed_total_latency=self._observed_total_latency(speech_id),
824
  metric_attributes=_tts_metric_attributes(collected_metrics),
825
  )
826
+ metric_assistant_text = _assistant_text_from_metadata(tts_metric_metadata)
827
+ if metric_assistant_text:
828
+ await self._on_assistant_text(
829
+ metric_assistant_text,
830
+ event_created_at=collected_metrics.timestamp,
831
+ speech_id=speech_id,
832
+ source="tts_metrics",
833
+ )
834
 
835
  elif isinstance(collected_metrics, metrics.EOUMetrics):
836
  speech_id = collected_metrics.speech_id
837
  if speech_id:
838
+ linked_utterance = self._mark_oldest_open_user_utterance_committed(
839
+ speech_id
840
+ )
841
  state = self._get_or_create_state(speech_id)
842
  if state.speech_end_monotonic is None:
843
  state.speech_end_monotonic = monotonic()
 
853
  metadata=_metric_metadata_to_dict(collected_metrics.metadata),
854
  )
855
  turn_metrics = state.metrics
856
+ if turn_metrics and linked_utterance is not None:
857
+ turn_metrics.transcript = linked_utterance.transcript
858
  if turn_metrics:
859
  turn_metrics.eou = state.eou_metrics
860
  if self._latest_vad_metrics and turn_metrics.vad is None:
 
1025
  *,
1026
  event_created_at: Optional[float] = None,
1027
  speech_id: Optional[str] = None,
1028
+ source: str = "unknown",
1029
  ) -> None:
1030
  normalized = assistant_text.strip()
1031
  if not normalized:
 
1034
  normalized,
1035
  event_created_at=event_created_at,
1036
  speech_id=speech_id,
1037
+ source=source,
1038
  )
1039
  await self._tracer.maybe_finalize(trace_turn)
1040
 
1041
+ def _register_speech_item_added_callback(
1042
+ self,
1043
+ *,
1044
+ speech_handle: Any,
1045
+ speech_id: Optional[str],
1046
+ ) -> Callable[[Any], None] | None:
1047
+ add_item_added_callback = getattr(
1048
+ speech_handle,
1049
+ "_add_item_added_callback",
1050
+ None,
1051
+ )
1052
+ if not callable(add_item_added_callback):
1053
+ if not self._speech_item_callback_unavailable_logged:
1054
+ self._speech_item_callback_unavailable_logged = True
1055
+ logger.warning(
1056
+ "SpeechHandle item-added callback unavailable; Langfuse tracing will rely on fallback sources"
1057
+ )
1058
+ return None
1059
+
1060
+ def _on_item_added(item: Any) -> None:
1061
+ try:
1062
+ assistant_text, created_at = _extract_assistant_chat_item(item)
1063
+ if not assistant_text:
1064
+ return
1065
+ self._submit_serialized_callback(
1066
+ self._handle_speech_item_added,
1067
+ speech_id,
1068
+ assistant_text,
1069
+ created_at,
1070
+ )
1071
+ except Exception:
1072
+ return
1073
+
1074
+ try:
1075
+ add_item_added_callback(_on_item_added)
1076
+ except Exception as exc:
1077
+ if not self._speech_item_callback_failed_logged:
1078
+ self._speech_item_callback_failed_logged = True
1079
+ logger.warning(
1080
+ "Failed to register SpeechHandle item-added callback; Langfuse tracing will rely on fallback sources: %s",
1081
+ exc,
1082
+ )
1083
+ return None
1084
+
1085
+ if not self._speech_item_callback_registered_logged:
1086
+ self._speech_item_callback_registered_logged = True
1087
+ logger.debug(
1088
+ "SpeechHandle item-added callback registered for provider-agnostic assistant text capture"
1089
+ )
1090
+ return _on_item_added
1091
+
1092
+ def _submit_serialized_callback(
1093
+ self,
1094
+ handler: Callable[..., Awaitable[Any]],
1095
+ *args: Any,
1096
+ **kwargs: Any,
1097
+ ) -> None:
1098
+ loop = self._event_loop
1099
+ if loop is None or loop.is_closed():
1100
+ return
1101
+
1102
+ def _enqueue() -> None:
1103
+ self._enqueue_serialized(handler, args=args, kwargs=kwargs, waiter=None)
1104
+
1105
+ try:
1106
+ running_loop = asyncio.get_running_loop()
1107
+ except RuntimeError:
1108
+ loop.call_soon_threadsafe(_enqueue)
1109
+ return
1110
+
1111
+ if running_loop is loop:
1112
+ _enqueue()
1113
+ return
1114
+
1115
+ loop.call_soon_threadsafe(_enqueue)
1116
+
1117
  async def _publish_live_update(
1118
  self,
1119
  *,
 
1190
 
1191
  def _current_open_user_utterance(self) -> Optional[PendingUserUtterance]:
1192
  utterance = self._latest_user_utterance()
1193
+ if utterance is None or utterance.llm_started:
1194
+ return None
1195
+ if utterance.committed and utterance.speech_id is None:
1196
+ return None
1197
+ return utterance
1198
+
1199
+ def _user_utterance_accepting_manual_update(self) -> Optional[PendingUserUtterance]:
1200
+ utterance = self._latest_user_utterance()
1201
+ if utterance is None or utterance.llm_started:
1202
+ return None
1203
+ if utterance.committed and utterance.speech_id is None:
1204
  return None
1205
  return utterance
1206
 
 
1216
  return utterance
1217
  return None
1218
 
1219
+ def _find_user_utterance_by_speech_id(
1220
+ self,
1221
+ speech_id: str,
1222
+ *,
1223
+ include_llm_started: bool = False,
1224
+ ) -> Optional[PendingUserUtterance]:
1225
+ for utterance in reversed(self._pending_user_utterances):
1226
+ if utterance.speech_id != speech_id:
1227
+ continue
1228
+ if utterance.llm_started and not include_llm_started:
1229
+ continue
1230
+ return utterance
1231
+ return None
1232
+
1233
+ def _mark_oldest_open_user_utterance_committed(
1234
+ self,
1235
+ speech_id: str,
1236
+ ) -> Optional[PendingUserUtterance]:
1237
+ linked = self._find_user_utterance_by_speech_id(speech_id)
1238
+ if linked is not None:
1239
+ linked.committed = True
1240
+ self._prune_resolved_user_utterances()
1241
+ return linked
1242
+
1243
  for utterance in self._pending_user_utterances:
1244
+ if utterance.llm_started:
1245
+ continue
1246
+ if utterance.speech_id is not None:
1247
  continue
1248
  utterance.committed = True
1249
+ utterance.speech_id = speech_id
1250
  self._prune_resolved_user_utterances()
1251
+ return utterance
1252
+ return None
1253
 
1254
  def _prune_resolved_user_utterances(self) -> None:
1255
  while self._pending_user_utterances:
 
1284
  return
1285
  utterance.transcript = transcript
1286
 
1287
+ def _mark_llm_stage_reached(
1288
+ self,
1289
+ speech_id: Optional[str],
1290
+ ) -> Optional[PendingUserUtterance]:
1291
+ normalized_speech_id = _normalize(speech_id)
1292
+ utterance: Optional[PendingUserUtterance] = None
1293
+ if normalized_speech_id is not None:
1294
+ utterance = self._find_user_utterance_by_speech_id(normalized_speech_id)
1295
+
1296
+ if utterance is None:
1297
+ for candidate in self._pending_user_utterances:
1298
+ if candidate.llm_started:
1299
+ continue
1300
+ utterance = candidate
1301
+ break
1302
+
1303
+ if utterance is None:
1304
+ return None
1305
+
1306
+ if normalized_speech_id is not None and utterance.speech_id is None:
1307
+ utterance.speech_id = normalized_speech_id
1308
+ utterance.llm_started = True
1309
+ watchdog_id = utterance.watchdog_id
1310
+ utterance.watchdog_id = None
1311
+ if watchdog_id is not None:
1312
+ task = self._llm_stall_tasks.pop(watchdog_id, None)
1313
+ if task:
1314
+ task.cancel()
1315
+ self._prune_resolved_user_utterances()
1316
+ return utterance
1317
 
1318
  def _find_user_utterance_by_watchdog(
1319
  self,
 
1398
  return left
1399
  if left == right:
1400
  return left
1401
+ if left.casefold() in right.casefold():
1402
+ return right
1403
+ if right.casefold() in left.casefold():
1404
+ return left
1405
  if right.startswith(left):
1406
  return right
1407
  if left.startswith(right):
 
1491
  return latest_text, latest_created_at
1492
 
1493
 
1494
def _extract_assistant_chat_item(item: Any) -> tuple[str, Optional[float]]:
    """Extract ``(text, created_at)`` from a chat item when it is assistant speech.

    Items whose ``role`` is a string other than "assistant" are rejected, as
    are items with blank content. Returns ``("", None)`` when nothing usable
    is found; otherwise the stripped text plus the item's ``created_at``
    coerced to a float (or None).
    """
    role = getattr(item, "role", None)
    if isinstance(role, str) and role != "assistant":
        return "", None
    text = _extract_content_text(getattr(item, "content", None)).strip()
    if text:
        return text, _to_optional_float(getattr(item, "created_at", None))
    return "", None
1502
+
1503
+
1504
  def _to_optional_float(value: Any) -> Optional[float]:
1505
  if isinstance(value, bool):
1506
  return None
 
1527
  return {"value": str(metadata)}
1528
 
1529
 
1530
+ def _assistant_text_from_metadata(metadata: Optional[dict[str, Any]]) -> str:
1531
+ if not metadata:
1532
+ return ""
1533
+ for key in ("assistant_text", "spoken_text"):
1534
+ value = metadata.get(key)
1535
+ if isinstance(value, str) and value.strip():
1536
+ return value.strip()
1537
+ return ""
1538
+
1539
+
1540
  def _metadata_attributes(metadata: Any) -> dict[str, Any]:
1541
  data = _metric_metadata_to_dict(metadata)
1542
  if not data:
src/agent/traces/turn_tracer.py CHANGED
@@ -80,6 +80,9 @@ class TraceTurn:
80
  assistant_audio_started_at: Optional[float] = None
81
  interrupted: bool = False
82
  interrupted_reason: Optional[str] = None
 
 
 
83
  orphan_assistant_cutoff_at: Optional[float] = None
84
  coalesced_turn_ids: list[str] = field(default_factory=list)
85
  coalesced_user_transcripts: list[str] = field(default_factory=list)
@@ -141,6 +144,7 @@ class AssistantTextRecord:
141
 
142
  text: str
143
  event_created_at: Optional[float] = None
 
144
 
145
 
146
  @dataclass
@@ -170,6 +174,7 @@ class ToolExecutionBlock:
170
 
171
 
172
  _DEFAULT_TRACE_FINALIZE_TIMEOUT_MS = 8000.0
 
173
  _DEFAULT_POST_TOOL_RESPONSE_TIMEOUT_MS = 30000.0
174
  _DEFAULT_MAX_PENDING_TRACE_TASKS = 200
175
  _DEFAULT_TRACE_FLUSH_TIMEOUT_SEC = 1.0
@@ -215,7 +220,21 @@ class TurnTracer:
215
  self._trace_finalize_tasks: dict[str, asyncio.Task[None]] = {}
216
  self._trace_finalize_task_versions: dict[str, int] = {}
217
 
 
 
 
 
 
 
 
218
  self._trace_finalize_timeout_sec = (
 
 
 
 
 
 
 
219
  max(
220
  getattr(
221
  settings.langfuse,
@@ -323,14 +342,20 @@ class TurnTracer:
323
 
324
  async def create_turn(self, *, user_transcript: str, room_id: str) -> None:
325
  completed_turns: list[TraceTurn] = []
 
326
  async with self._trace_lock:
327
  normalized = user_transcript.strip()
328
  if not normalized:
329
  return
330
 
331
- completed_turns = self._finalize_interrupted_turns_before_new_user_turn_locked()
 
 
 
332
 
333
- current_turn = self._latest_turn_where(lambda c: not c.user_turn_committed)
 
 
334
  if current_turn is not None:
335
  self._update_user_turn_text(current_turn, normalized)
336
  else:
@@ -354,15 +379,38 @@ class TurnTracer:
354
 
355
  for completed_turn in completed_turns:
356
  self._schedule_trace_emit(completed_turn)
 
 
357
 
358
  async def attach_user_text(
359
  self,
360
  user_transcript: str,
361
  *,
362
  event_created_at: Optional[float] = None,
 
363
  ) -> Optional[TraceTurn]:
364
  async with self._trace_lock:
365
- turn = self._latest_turn_where(lambda c: not c.assistant_text.strip())
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
366
  if turn is None:
367
  turn = self._latest_turn_where(lambda _: True)
368
  if turn is None:
@@ -377,6 +425,7 @@ class TurnTracer:
377
  normalized,
378
  event_created_at=event_created_at,
379
  )
 
380
  turn.user_turn_committed = True
381
  turn.user_turn_committed_at = _resolved_event_timestamp(
382
  _to_optional_float(event_created_at)
@@ -499,6 +548,7 @@ class TurnTracer:
499
  if turn.llm_ttft_ms is None:
500
  turn.llm_ttft_ms = llm_call.ttft_ms
501
  turn.llm_total_latency_ms = _sum_llm_duration_ms(turn.llm_calls)
 
502
  _recompute_perceived_first_audio_latency(turn)
503
  return turn
504
 
@@ -524,6 +574,17 @@ class TurnTracer:
524
  self._apply_buffered_assistant_text_for_speech_id(turn)
525
 
526
  tts_attrs = _sanitize_component_attributes(metric_attributes)
 
 
 
 
 
 
 
 
 
 
 
527
  order = self._next_event_order(turn)
528
  tts_call = TTSCallTrace(
529
  duration_ms=_duration_to_ms(duration, fallback_duration),
@@ -557,6 +618,7 @@ class TurnTracer:
557
  turn.tts_updated_at = _resolved_event_timestamp(tts_event_created_at)
558
  turn.tts_updated_order = order
559
 
 
560
  _recompute_perceived_first_audio_latency(turn)
561
  if observed_total_latency is not None and len(turn.tts_calls) == 1:
562
  observed_ms = observed_total_latency * 1000.0
@@ -582,6 +644,7 @@ class TurnTracer:
582
  *,
583
  event_created_at: Optional[float] = None,
584
  speech_id: Optional[str] = None,
 
585
  ) -> Optional[TraceTurn]:
586
  async with self._trace_lock:
587
  normalized_text = assistant_text.strip()
@@ -598,12 +661,26 @@ class TurnTracer:
598
  normalized_text,
599
  event_created_at=resolved_event_created_at,
600
  speech_id=normalized_speech_id,
 
601
  )
602
  return None
 
 
 
 
 
 
 
 
 
 
 
 
603
  self._apply_assistant_text_to_turn(
604
  turn,
605
  normalized_text,
606
  event_created_at=resolved_event_created_at,
 
607
  )
608
  return turn
609
 
@@ -614,13 +691,21 @@ class TurnTracer:
614
  self._buffer_assistant_text(
615
  normalized_text,
616
  event_created_at=resolved_event_created_at,
 
617
  )
618
  return None
619
 
 
 
 
 
 
 
620
  self._apply_assistant_text_to_turn(
621
  turn,
622
  normalized_text,
623
  event_created_at=resolved_event_created_at,
 
624
  )
625
  return turn
626
 
@@ -891,15 +976,88 @@ class TurnTracer:
891
  turn.event_counter += 1
892
  return turn.event_counter
893
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
894
  def _select_turn_for_llm(self, speech_id: Optional[str]) -> Optional[TraceTurn]:
895
  if speech_id:
896
  matched = self._latest_turn_where(lambda c: c.speech_id == speech_id)
897
  if matched:
 
898
  return matched
899
- return self._next_turn_where(
900
  lambda c: c.speech_id is None and not c.llm_calls
901
  )
902
- return self._next_turn_where(lambda c: not c.llm_calls)
 
 
 
 
 
 
 
903
 
904
  def _select_turn_for_tts(self, speech_id: Optional[str]) -> Optional[TraceTurn]:
905
  if speech_id:
@@ -933,14 +1091,18 @@ class TurnTracer:
933
  *,
934
  event_created_at: Optional[float],
935
  ) -> Optional[TraceTurn]:
936
- candidates = [
937
- turn
938
- for turn in self._pending_trace_turns
939
- if bool(turn.llm_calls or turn.tts_calls or turn.tool_phase_open)
940
- ]
941
- if len(candidates) != 1:
942
- return None
943
- turn = candidates[0]
 
 
 
 
944
  cutoff = turn.orphan_assistant_cutoff_at
945
  if cutoff is not None:
946
  if event_created_at is None:
@@ -949,17 +1111,26 @@ class TurnTracer:
949
  return None
950
  return turn
951
 
 
 
 
 
 
 
952
  def _apply_assistant_text_to_turn(
953
  self,
954
  turn: TraceTurn,
955
  assistant_text: str,
956
  *,
957
  event_created_at: Optional[float],
 
958
  ) -> None:
959
  previous_assistant_text = turn.assistant_text or turn.response_text
960
  order = self._next_event_order(turn)
961
  turn.assistant_text = assistant_text
962
  turn.response_text = assistant_text
 
 
963
  turn.assistant_text_updated_at = _resolved_event_timestamp(event_created_at)
964
  turn.assistant_text_updated_order = order
965
  _reconcile_assistant_text_with_tts_calls(
@@ -975,6 +1146,7 @@ class TurnTracer:
975
  *,
976
  event_created_at: Optional[float],
977
  speech_id: Optional[str] = None,
 
978
  ) -> None:
979
  normalized = assistant_text.strip()
980
  if not normalized:
@@ -982,6 +1154,7 @@ class TurnTracer:
982
  record = AssistantTextRecord(
983
  text=normalized,
984
  event_created_at=_to_optional_float(event_created_at),
 
985
  )
986
  normalized_speech_id = _normalize_optional_str(speech_id)
987
  if normalized_speech_id:
@@ -1010,6 +1183,7 @@ class TurnTracer:
1010
  turn,
1011
  record.text,
1012
  event_created_at=record.event_created_at,
 
1013
  )
1014
  if not queue:
1015
  self._pending_agent_transcripts_by_speech_id.pop(speech_id, None)
@@ -1035,6 +1209,7 @@ class TurnTracer:
1035
  turn,
1036
  record.text,
1037
  event_created_at=record.event_created_at,
 
1038
  )
1039
  return True
1040
  return False
@@ -1072,12 +1247,7 @@ class TurnTracer:
1072
  turn.perceived_latency_second_audio_ms = fallback_ms
1073
 
1074
  def _is_complete(self, turn: TraceTurn) -> bool:
1075
- base_complete = bool(
1076
- turn.user_transcript
1077
- and turn.assistant_text
1078
- and turn.llm_calls
1079
- and turn.tts_calls
1080
- )
1081
  if not base_complete:
1082
  return False
1083
  if turn.tool_phase_open:
@@ -1086,8 +1256,7 @@ class TurnTracer:
1086
 
1087
  def _should_schedule_finalize_timeout(self, turn: TraceTurn) -> bool:
1088
  return bool(
1089
- turn.llm_calls
1090
- and turn.tts_calls
1091
  and not self._is_complete(turn)
1092
  and not (turn.tool_phase_open and turn.last_tool_event_at is None)
1093
  and self._resolve_finalize_timeout_sec(turn) > 0.0
@@ -1098,6 +1267,11 @@ class TurnTracer:
1098
  return self._trace_post_tool_response_timeout_sec
1099
  return self._trace_finalize_timeout_sec
1100
 
 
 
 
 
 
1101
  def _requires_post_tool_response(self, turn: TraceTurn) -> bool:
1102
  if not turn.tool_step_announced and turn.last_tool_event_order is None:
1103
  return False
@@ -1156,6 +1330,7 @@ class TurnTracer:
1156
  missing_assistant_fallback: bool = False,
1157
  tool_post_response_missing: bool = False,
1158
  drop_assistant_text: bool = False,
 
1159
  ) -> TraceTurn:
1160
  if drop_assistant_text:
1161
  turn.assistant_text = ""
@@ -1169,36 +1344,62 @@ class TurnTracer:
1169
  turn.assistant_text = turn.response_text
1170
 
1171
  if missing_assistant_fallback and not turn.assistant_text:
1172
- fallback = self._best_available_assistant_text(
1173
  turn,
1174
  min_observed_order=(
1175
  turn.last_tool_event_order if tool_post_response_missing else None
1176
  ),
1177
  include_pending_agent_transcripts=not tool_post_response_missing,
1178
  )
1179
- if fallback:
1180
- turn.assistant_text = fallback
1181
- if not turn.response_text:
1182
- turn.response_text = fallback
 
 
 
1183
  else:
1184
  tool_error_fallback = ""
1185
  if tool_post_response_missing:
1186
  tool_error_fallback = _tool_error_fallback_text(turn)
1187
 
1188
  if tool_error_fallback:
1189
- turn.assistant_text = tool_error_fallback
1190
- if not turn.response_text:
1191
- turn.response_text = tool_error_fallback
 
 
 
1192
  else:
1193
  turn.assistant_text_missing = True
1194
  unavailable = "[assistant text unavailable]"
1195
  turn.assistant_text = unavailable
 
1196
  if not turn.response_text:
1197
  turn.response_text = unavailable
 
 
 
 
 
 
 
1198
 
1199
  turn.tool_phase_open = False
1200
  if tool_post_response_missing:
1201
  turn.tool_post_response_missing = True
 
 
 
 
 
 
 
 
 
 
 
 
1202
 
1203
  self._pending_trace_turns.remove(turn)
1204
  self._cancel_finalize_timeout(turn.turn_id)
@@ -1210,7 +1411,7 @@ class TurnTracer:
1210
  *,
1211
  min_observed_order: Optional[int] = None,
1212
  include_pending_agent_transcripts: bool = True,
1213
- ) -> str:
1214
  speech_id = _normalize_optional_str(turn.speech_id)
1215
  if turn.assistant_text.strip():
1216
  if (
@@ -1220,7 +1421,7 @@ class TurnTracer:
1220
  and turn.assistant_text_updated_order > min_observed_order
1221
  )
1222
  ):
1223
- return turn.assistant_text.strip()
1224
  if turn.response_text.strip():
1225
  if (
1226
  min_observed_order is None
@@ -1229,7 +1430,7 @@ class TurnTracer:
1229
  and turn.assistant_text_updated_order > min_observed_order
1230
  )
1231
  ):
1232
- return turn.response_text.strip()
1233
  if speech_id:
1234
  buffered_exact = self._pending_agent_transcripts_by_speech_id.get(speech_id)
1235
  if buffered_exact:
@@ -1239,7 +1440,7 @@ class TurnTracer:
1239
  continue
1240
  if not buffered_exact:
1241
  self._pending_agent_transcripts_by_speech_id.pop(speech_id, None)
1242
- return record.text.strip()
1243
  for tts_call in reversed(turn.tts_calls):
1244
  if (
1245
  min_observed_order is not None
@@ -1247,16 +1448,39 @@ class TurnTracer:
1247
  ):
1248
  continue
1249
  if tts_call.assistant_text.strip():
1250
- return tts_call.assistant_text.strip()
1251
  if self._try_attach_latest_usable_orphan_assistant_text(turn):
1252
  if turn.assistant_text.strip():
1253
- return turn.assistant_text.strip()
1254
- if include_pending_agent_transcripts and self._pending_agent_transcripts:
1255
- return self._pending_agent_transcripts.popleft().strip()
1256
- return ""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1257
 
1258
- def _finalize_interrupted_turns_before_new_user_turn_locked(self) -> list[TraceTurn]:
 
 
 
 
 
 
1259
  completed_turns: list[TraceTurn] = []
 
1260
  for turn in list(self._pending_trace_turns):
1261
  if not (turn.user_transcript and turn.llm_calls and turn.tts_calls):
1262
  continue
@@ -1269,17 +1493,49 @@ class TurnTracer:
1269
  missing_post_tool_assistant = bool(
1270
  requires_post_tool_response and not self._post_tool_assistant_observed(turn)
1271
  )
1272
- completed_turns.append(
1273
- self._finalize_locked(
1274
  turn,
1275
- missing_assistant_fallback=(
1276
- missing_post_tool_assistant or not bool(turn.assistant_text)
1277
  ),
1278
- tool_post_response_missing=requires_post_tool_response,
1279
- drop_assistant_text=missing_post_tool_assistant,
1280
  )
1281
- )
1282
- return completed_turns
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1283
 
1284
  def _requires_post_tool_follow_up(self, turn: TraceTurn) -> bool:
1285
  if turn.last_tool_event_order is None:
@@ -1310,6 +1566,15 @@ class TurnTracer:
1310
  )
1311
  )
1312
  self._trace_finalize_tasks[turn_id] = task
 
 
 
 
 
 
 
 
 
1313
  task.add_done_callback(
1314
  lambda _task, tid=turn_id, v=version: self._on_finalize_timeout_task_done(
1315
  turn_id=tid,
@@ -1549,12 +1814,17 @@ class TurnTracer:
1549
  for llm_idx, llm_call in enumerate(
1550
  block.llm_calls, start=1
1551
  ):
 
 
 
 
1552
  phase_cursor_ns = _emit_component_span(
1553
  _tracer,
1554
  name="LLMMetrics",
1555
  context=phase_ctx,
1556
  start_ns=phase_cursor_ns,
1557
- duration_ms=llm_call.duration_ms,
 
1558
  attributes=_merge_component_attributes(
1559
  llm_call.attributes,
1560
  {
@@ -1562,6 +1832,7 @@ class TurnTracer:
1562
  "response_text": phase_text,
1563
  "ttft_ms": llm_call.ttft_ms,
1564
  "llm_total_latency_ms": llm_call.duration_ms,
 
1565
  "phase_index": block.index,
1566
  "phase_call_index": llm_idx,
1567
  },
@@ -1574,18 +1845,25 @@ class TurnTracer:
1574
  block.tts_calls, start=1
1575
  ):
1576
  spoken_text = tts_call.assistant_text or phase_text
 
 
 
 
1577
  phase_cursor_ns = _emit_component_span(
1578
  _tracer,
1579
  name="TTSMetrics",
1580
  context=phase_ctx,
1581
  start_ns=phase_cursor_ns,
1582
- duration_ms=tts_call.duration_ms,
 
1583
  attributes=_merge_component_attributes(
1584
  tts_call.attributes,
1585
  {
1586
  "assistant_text": spoken_text,
1587
  "assistant_text_missing": turn.assistant_text_missing,
1588
  "ttfb_ms": tts_call.ttfb_ms,
 
 
1589
  "phase_index": block.index,
1590
  "phase_call_index": tts_idx,
1591
  },
@@ -1699,12 +1977,17 @@ class TurnTracer:
1699
  )
1700
 
1701
  logger.info(
1702
- "Langfuse turn trace emitted: trace_id=%s turn_id=%s session_id=%s room_id=%s participant_id=%s",
1703
  turn.trace_id,
1704
  turn.turn_id,
1705
  turn.session_id,
1706
  turn.room_id,
1707
  turn.participant_id,
 
 
 
 
 
1708
  )
1709
  asyncio.create_task(self._flush_tracer_provider())
1710
  except Exception as exc:
@@ -2422,6 +2705,7 @@ def _set_root_attributes(
2422
  "langfuse.trace.metadata.participant_id": turn.participant_id,
2423
  "langfuse.trace.metadata.turn_id": turn.turn_id,
2424
  "langfuse.trace.metadata.assistant_text_missing": turn.assistant_text_missing,
 
2425
  "langfuse.trace.metadata.stt_status": turn.stt_status,
2426
  "langfuse.trace.metadata.tool_phase_announced": turn.tool_step_announced,
2427
  "langfuse.trace.metadata.tool_post_response_missing": turn.tool_post_response_missing,
@@ -2429,6 +2713,8 @@ def _set_root_attributes(
2429
  "langfuse.trace.metadata.assistant_audio_started": turn.assistant_audio_started,
2430
  "langfuse.trace.metadata.interrupted": turn.interrupted,
2431
  "langfuse.trace.metadata.interrupted_reason": turn.interrupted_reason,
 
 
2432
  "langfuse.trace.metadata.coalesced_turn_count": len(turn.coalesced_turn_ids),
2433
  "langfuse.trace.metadata.coalesced_fragment_count": turn.coalesced_fragment_count,
2434
  "langfuse.trace.metadata.coalesced_turn_ids": turn.coalesced_turn_ids,
@@ -2510,6 +2796,30 @@ def _duration_attribute_to_ms(value: Any) -> Optional[float]:
2510
  return None
2511
 
2512
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2513
  def _tool_calls_total_duration_ms(tool_calls: list[ToolCallTrace]) -> float:
2514
  total = 0.0
2515
  for call in tool_calls:
@@ -2538,6 +2848,10 @@ def _merge_user_transcripts(existing: str, incoming: str) -> str:
2538
  return left
2539
  if left == right:
2540
  return left
 
 
 
 
2541
  if right.startswith(left):
2542
  return right
2543
  if left.startswith(right):
@@ -2562,12 +2876,19 @@ def _emit_component_span(
2562
  context: Any,
2563
  start_ns: int,
2564
  duration_ms: Optional[float],
 
2565
  attributes: dict[str, Any],
2566
  observation_input: Optional[str] = None,
2567
  observation_output: Optional[str] = None,
2568
  ) -> int:
2569
  actual_ms = max(duration_ms, 0.0) if duration_ms is not None else None
 
 
 
 
 
2570
  end_ns = start_ns + _ms_to_ns(actual_ms or 0.0)
 
2571
 
2572
  span = _tracer.start_span(name, context=context, start_time=start_ns)
2573
  try:
@@ -2584,7 +2905,7 @@ def _emit_component_span(
2584
  span.set_attribute(key, value)
2585
  finally:
2586
  _close_span_at(span, end_ns)
2587
- return end_ns
2588
 
2589
 
2590
  def _close_span_at(span: Any, end_ns: int) -> None:
 
80
  assistant_audio_started_at: Optional[float] = None
81
  interrupted: bool = False
82
  interrupted_reason: Optional[str] = None
83
+ finalization_reason: Optional[str] = None
84
+ assistant_text_source: Optional[str] = None
85
+ emit_ready_at: Optional[float] = None
86
  orphan_assistant_cutoff_at: Optional[float] = None
87
  coalesced_turn_ids: list[str] = field(default_factory=list)
88
  coalesced_user_transcripts: list[str] = field(default_factory=list)
 
144
 
145
  text: str
146
  event_created_at: Optional[float] = None
147
+ source: Optional[str] = None
148
 
149
 
150
  @dataclass
 
174
 
175
 
176
  _DEFAULT_TRACE_FINALIZE_TIMEOUT_MS = 8000.0
177
+ _DEFAULT_ASSISTANT_TEXT_GRACE_TIMEOUT_MS = 500.0
178
  _DEFAULT_POST_TOOL_RESPONSE_TIMEOUT_MS = 30000.0
179
  _DEFAULT_MAX_PENDING_TRACE_TASKS = 200
180
  _DEFAULT_TRACE_FLUSH_TIMEOUT_SEC = 1.0
 
220
  self._trace_finalize_tasks: dict[str, asyncio.Task[None]] = {}
221
  self._trace_finalize_task_versions: dict[str, int] = {}
222
 
223
+ assistant_text_grace_timeout_ms = float(
224
+ getattr(
225
+ settings.langfuse,
226
+ "LANGFUSE_ASSISTANT_TEXT_GRACE_TIMEOUT_MS",
227
+ _DEFAULT_ASSISTANT_TEXT_GRACE_TIMEOUT_MS,
228
+ )
229
+ )
230
  self._trace_finalize_timeout_sec = (
231
+ max(
232
+ assistant_text_grace_timeout_ms,
233
+ 0.0,
234
+ )
235
+ / 1000.0
236
+ )
237
+ self._trace_legacy_finalize_timeout_sec = (
238
  max(
239
  getattr(
240
  settings.langfuse,
 
342
 
343
  async def create_turn(self, *, user_transcript: str, room_id: str) -> None:
344
  completed_turns: list[TraceTurn] = []
345
+ timeout_schedules: list[tuple[str, float]] = []
346
  async with self._trace_lock:
347
  normalized = user_transcript.strip()
348
  if not normalized:
349
  return
350
 
351
+ (
352
+ completed_turns,
353
+ timeout_schedules,
354
+ ) = self._finalize_interrupted_turns_before_new_user_turn_locked()
355
 
356
+ current_turn = self._latest_turn_where(
357
+ self._turn_accepting_additional_user_input
358
+ )
359
  if current_turn is not None:
360
  self._update_user_turn_text(current_turn, normalized)
361
  else:
 
379
 
380
  for completed_turn in completed_turns:
381
  self._schedule_trace_emit(completed_turn)
382
+ for turn_id, timeout_sec in timeout_schedules:
383
+ self._schedule_finalize_timeout(turn_id, timeout_sec)
384
 
385
  async def attach_user_text(
386
  self,
387
  user_transcript: str,
388
  *,
389
  event_created_at: Optional[float] = None,
390
+ speech_id: Optional[str] = None,
391
  ) -> Optional[TraceTurn]:
392
  async with self._trace_lock:
393
+ normalized_speech_id = _normalize_optional_str(speech_id)
394
+ turn: Optional[TraceTurn] = None
395
+ if normalized_speech_id:
396
+ turn = self._latest_turn_where(
397
+ lambda c: c.speech_id == normalized_speech_id
398
+ )
399
+ if turn is None:
400
+ turn = self._latest_turn_where(
401
+ lambda c: c.speech_id is None and not c.llm_calls
402
+ )
403
+ if turn is not None:
404
+ turn.speech_id = normalized_speech_id
405
+ if turn is not None:
406
+ self._absorb_pending_pre_llm_turns(turn)
407
+
408
+ if turn is None:
409
+ turn = self._latest_turn_where(
410
+ self._turn_accepting_additional_user_input
411
+ )
412
+ if turn is None:
413
+ turn = self._latest_turn_where(lambda c: not c.assistant_text.strip())
414
  if turn is None:
415
  turn = self._latest_turn_where(lambda _: True)
416
  if turn is None:
 
425
  normalized,
426
  event_created_at=event_created_at,
427
  )
428
+ self._maybe_mark_emit_ready(turn)
429
  turn.user_turn_committed = True
430
  turn.user_turn_committed_at = _resolved_event_timestamp(
431
  _to_optional_float(event_created_at)
 
548
  if turn.llm_ttft_ms is None:
549
  turn.llm_ttft_ms = llm_call.ttft_ms
550
  turn.llm_total_latency_ms = _sum_llm_duration_ms(turn.llm_calls)
551
+ self._maybe_mark_emit_ready(turn)
552
  _recompute_perceived_first_audio_latency(turn)
553
  return turn
554
 
 
574
  self._apply_buffered_assistant_text_for_speech_id(turn)
575
 
576
  tts_attrs = _sanitize_component_attributes(metric_attributes)
577
+ tts_metric_assistant_text = _assistant_text_from_component_attributes(tts_attrs)
578
+ if tts_metric_assistant_text and (
579
+ not turn.assistant_text.strip()
580
+ or turn.assistant_text_source == "tts_metrics"
581
+ ):
582
+ self._apply_assistant_text_to_turn(
583
+ turn,
584
+ tts_metric_assistant_text,
585
+ event_created_at=_to_optional_float(tts_attrs.get("timestamp")),
586
+ source="tts_metrics",
587
+ )
588
  order = self._next_event_order(turn)
589
  tts_call = TTSCallTrace(
590
  duration_ms=_duration_to_ms(duration, fallback_duration),
 
618
  turn.tts_updated_at = _resolved_event_timestamp(tts_event_created_at)
619
  turn.tts_updated_order = order
620
 
621
+ self._maybe_mark_emit_ready(turn)
622
  _recompute_perceived_first_audio_latency(turn)
623
  if observed_total_latency is not None and len(turn.tts_calls) == 1:
624
  observed_ms = observed_total_latency * 1000.0
 
644
  *,
645
  event_created_at: Optional[float] = None,
646
  speech_id: Optional[str] = None,
647
+ source: Optional[str] = None,
648
  ) -> Optional[TraceTurn]:
649
  async with self._trace_lock:
650
  normalized_text = assistant_text.strip()
 
661
  normalized_text,
662
  event_created_at=resolved_event_created_at,
663
  speech_id=normalized_speech_id,
664
+ source=source,
665
  )
666
  return None
667
+ if (
668
+ source == "tts_metrics"
669
+ and turn.assistant_text.strip()
670
+ and turn.assistant_text_source not in {None, "tts_metrics", "unavailable"}
671
+ ):
672
+ return turn
673
+ if (
674
+ source == "speech_done"
675
+ and turn.assistant_text.strip()
676
+ and turn.assistant_text_source == "speech_item_added"
677
+ ):
678
+ return turn
679
  self._apply_assistant_text_to_turn(
680
  turn,
681
  normalized_text,
682
  event_created_at=resolved_event_created_at,
683
+ source=source,
684
  )
685
  return turn
686
 
 
691
  self._buffer_assistant_text(
692
  normalized_text,
693
  event_created_at=resolved_event_created_at,
694
+ source=source,
695
  )
696
  return None
697
 
698
+ if (
699
+ source == "tts_metrics"
700
+ and turn.assistant_text.strip()
701
+ and turn.assistant_text_source not in {None, "tts_metrics", "unavailable"}
702
+ ):
703
+ return turn
704
  self._apply_assistant_text_to_turn(
705
  turn,
706
  normalized_text,
707
  event_created_at=resolved_event_created_at,
708
+ source=source,
709
  )
710
  return turn
711
 
 
976
  turn.event_counter += 1
977
  return turn.event_counter
978
 
979
+ def _is_emit_ready(self, turn: TraceTurn) -> bool:
980
+ return bool(turn.user_transcript and turn.llm_calls and turn.tts_calls)
981
+
982
+ def _maybe_mark_emit_ready(self, turn: TraceTurn) -> None:
983
+ if turn.emit_ready_at is not None:
984
+ return
985
+ if not self._is_emit_ready(turn):
986
+ return
987
+ turn.emit_ready_at = time()
988
+
989
+ def _turn_accepting_additional_user_input(self, turn: TraceTurn) -> bool:
990
+ if turn.llm_calls or turn.tts_calls:
991
+ return False
992
+ if turn.assistant_text.strip() or turn.response_text.strip():
993
+ return False
994
+ if turn.interrupted:
995
+ return False
996
+ if not turn.user_turn_committed:
997
+ return True
998
+ return turn.speech_id is not None
999
+
1000
+ def _absorb_pending_pre_llm_turns(self, anchor_turn: TraceTurn) -> None:
1001
+ if anchor_turn not in self._pending_trace_turns:
1002
+ return
1003
+ try:
1004
+ anchor_index = self._pending_trace_turns.index(anchor_turn)
1005
+ except ValueError:
1006
+ return
1007
+
1008
+ anchor_speech_id = _normalize_optional_str(anchor_turn.speech_id)
1009
+ absorbed_turns: list[TraceTurn] = []
1010
+ for candidate in list(self._pending_trace_turns)[anchor_index + 1 :]:
1011
+ candidate_speech_id = _normalize_optional_str(candidate.speech_id)
1012
+ if candidate.llm_calls or candidate.tts_calls:
1013
+ continue
1014
+ if candidate.assistant_text.strip() or candidate.response_text.strip():
1015
+ continue
1016
+ if candidate.tool_step_announced or candidate.tool_executions:
1017
+ continue
1018
+ if candidate.last_tool_event_order is not None:
1019
+ continue
1020
+ if candidate.interrupted:
1021
+ continue
1022
+ if candidate_speech_id not in {None, anchor_speech_id}:
1023
+ continue
1024
+ if candidate.user_transcript.strip():
1025
+ self._update_user_turn_text(
1026
+ anchor_turn,
1027
+ candidate.user_transcript,
1028
+ event_created_at=candidate.user_transcript_updated_at,
1029
+ )
1030
+ anchor_turn.user_turn_committed = (
1031
+ anchor_turn.user_turn_committed or candidate.user_turn_committed
1032
+ )
1033
+ if candidate.user_turn_committed_at is not None:
1034
+ anchor_turn.user_turn_committed_at = max(
1035
+ anchor_turn.user_turn_committed_at or candidate.user_turn_committed_at,
1036
+ candidate.user_turn_committed_at,
1037
+ )
1038
+ absorbed_turns.append(candidate)
1039
+
1040
+ for absorbed_turn in absorbed_turns:
1041
+ self._pending_trace_turns.remove(absorbed_turn)
1042
+ self._cancel_finalize_timeout(absorbed_turn.turn_id)
1043
+
1044
  def _select_turn_for_llm(self, speech_id: Optional[str]) -> Optional[TraceTurn]:
1045
  if speech_id:
1046
  matched = self._latest_turn_where(lambda c: c.speech_id == speech_id)
1047
  if matched:
1048
+ self._absorb_pending_pre_llm_turns(matched)
1049
  return matched
1050
+ matched = self._latest_turn_where(
1051
  lambda c: c.speech_id is None and not c.llm_calls
1052
  )
1053
+ if matched is not None:
1054
+ matched.speech_id = speech_id
1055
+ self._absorb_pending_pre_llm_turns(matched)
1056
+ return matched
1057
+ matched = self._next_turn_where(lambda c: not c.llm_calls)
1058
+ if matched is not None:
1059
+ self._absorb_pending_pre_llm_turns(matched)
1060
+ return matched
1061
 
1062
  def _select_turn_for_tts(self, speech_id: Optional[str]) -> Optional[TraceTurn]:
1063
  if speech_id:
 
1091
  *,
1092
  event_created_at: Optional[float],
1093
  ) -> Optional[TraceTurn]:
1094
+ candidates = self._assistant_text_correlation_candidates()
1095
+ if len(candidates) == 1:
1096
+ turn = candidates[0]
1097
+ else:
1098
+ emit_ready_candidates = [
1099
+ turn
1100
+ for turn in candidates
1101
+ if self._is_emit_ready(turn) and self._emit_ready_turn_is_recent(turn)
1102
+ ]
1103
+ if len(emit_ready_candidates) != 1:
1104
+ return None
1105
+ turn = emit_ready_candidates[0]
1106
  cutoff = turn.orphan_assistant_cutoff_at
1107
  if cutoff is not None:
1108
  if event_created_at is None:
 
1111
  return None
1112
  return turn
1113
 
1114
+ def _emit_ready_turn_is_recent(self, turn: TraceTurn) -> bool:
1115
+ if turn.emit_ready_at is None:
1116
+ return False
1117
+ recent_window_sec = max(self._trace_finalize_timeout_sec, 1.0)
1118
+ return (time() - turn.emit_ready_at) <= recent_window_sec
1119
+
1120
  def _apply_assistant_text_to_turn(
1121
  self,
1122
  turn: TraceTurn,
1123
  assistant_text: str,
1124
  *,
1125
  event_created_at: Optional[float],
1126
+ source: Optional[str],
1127
  ) -> None:
1128
  previous_assistant_text = turn.assistant_text or turn.response_text
1129
  order = self._next_event_order(turn)
1130
  turn.assistant_text = assistant_text
1131
  turn.response_text = assistant_text
1132
+ turn.assistant_text_missing = False
1133
+ turn.assistant_text_source = source or turn.assistant_text_source or "unknown"
1134
  turn.assistant_text_updated_at = _resolved_event_timestamp(event_created_at)
1135
  turn.assistant_text_updated_order = order
1136
  _reconcile_assistant_text_with_tts_calls(
 
1146
  *,
1147
  event_created_at: Optional[float],
1148
  speech_id: Optional[str] = None,
1149
+ source: Optional[str] = None,
1150
  ) -> None:
1151
  normalized = assistant_text.strip()
1152
  if not normalized:
 
1154
  record = AssistantTextRecord(
1155
  text=normalized,
1156
  event_created_at=_to_optional_float(event_created_at),
1157
+ source=source,
1158
  )
1159
  normalized_speech_id = _normalize_optional_str(speech_id)
1160
  if normalized_speech_id:
 
1183
  turn,
1184
  record.text,
1185
  event_created_at=record.event_created_at,
1186
+ source=record.source or "buffered_exact",
1187
  )
1188
  if not queue:
1189
  self._pending_agent_transcripts_by_speech_id.pop(speech_id, None)
 
1209
  turn,
1210
  record.text,
1211
  event_created_at=record.event_created_at,
1212
+ source=record.source or "orphan_buffer",
1213
  )
1214
  return True
1215
  return False
 
1247
  turn.perceived_latency_second_audio_ms = fallback_ms
1248
 
1249
  def _is_complete(self, turn: TraceTurn) -> bool:
1250
+ base_complete = bool(self._is_emit_ready(turn) and turn.assistant_text)
 
 
 
 
 
1251
  if not base_complete:
1252
  return False
1253
  if turn.tool_phase_open:
 
1256
 
1257
  def _should_schedule_finalize_timeout(self, turn: TraceTurn) -> bool:
1258
  return bool(
1259
+ self._is_emit_ready(turn)
 
1260
  and not self._is_complete(turn)
1261
  and not (turn.tool_phase_open and turn.last_tool_event_at is None)
1262
  and self._resolve_finalize_timeout_sec(turn) > 0.0
 
1267
  return self._trace_post_tool_response_timeout_sec
1268
  return self._trace_finalize_timeout_sec
1269
 
1270
+ def _finalize_wait_reason(self, turn: TraceTurn) -> str:
1271
+ if self._requires_post_tool_response(turn):
1272
+ return "post_tool_response"
1273
+ return "assistant_text_grace"
1274
+
1275
  def _requires_post_tool_response(self, turn: TraceTurn) -> bool:
1276
  if not turn.tool_step_announced and turn.last_tool_event_order is None:
1277
  return False
 
1330
  missing_assistant_fallback: bool = False,
1331
  tool_post_response_missing: bool = False,
1332
  drop_assistant_text: bool = False,
1333
+ finalization_reason: Optional[str] = None,
1334
  ) -> TraceTurn:
1335
  if drop_assistant_text:
1336
  turn.assistant_text = ""
 
1344
  turn.assistant_text = turn.response_text
1345
 
1346
  if missing_assistant_fallback and not turn.assistant_text:
1347
+ fallback_text, fallback_source = self._best_available_assistant_text(
1348
  turn,
1349
  min_observed_order=(
1350
  turn.last_tool_event_order if tool_post_response_missing else None
1351
  ),
1352
  include_pending_agent_transcripts=not tool_post_response_missing,
1353
  )
1354
+ if fallback_text:
1355
+ self._apply_assistant_text_to_turn(
1356
+ turn,
1357
+ fallback_text,
1358
+ event_created_at=None,
1359
+ source=fallback_source or "unknown",
1360
+ )
1361
  else:
1362
  tool_error_fallback = ""
1363
  if tool_post_response_missing:
1364
  tool_error_fallback = _tool_error_fallback_text(turn)
1365
 
1366
  if tool_error_fallback:
1367
+ self._apply_assistant_text_to_turn(
1368
+ turn,
1369
+ tool_error_fallback,
1370
+ event_created_at=None,
1371
+ source="tool_fallback",
1372
+ )
1373
  else:
1374
  turn.assistant_text_missing = True
1375
  unavailable = "[assistant text unavailable]"
1376
  turn.assistant_text = unavailable
1377
+ turn.assistant_text_source = "unavailable"
1378
  if not turn.response_text:
1379
  turn.response_text = unavailable
1380
+ logger.warning(
1381
+ "Langfuse turn finalized without assistant text: turn_id=%s speech_id=%s reason=%s",
1382
+ turn.turn_id,
1383
+ turn.speech_id,
1384
+ finalization_reason
1385
+ or ("post_tool_timeout" if tool_post_response_missing else "assistant_text_grace_timeout"),
1386
+ )
1387
 
1388
  turn.tool_phase_open = False
1389
  if tool_post_response_missing:
1390
  turn.tool_post_response_missing = True
1391
+ if finalization_reason is None:
1392
+ if turn.interrupted_reason == "shutdown_drain":
1393
+ finalization_reason = "shutdown_drain"
1394
+ elif turn.interrupted and turn.assistant_audio_started:
1395
+ finalization_reason = "interrupted_after_audio"
1396
+ elif tool_post_response_missing:
1397
+ finalization_reason = "post_tool_timeout"
1398
+ elif missing_assistant_fallback:
1399
+ finalization_reason = "assistant_text_grace_timeout"
1400
+ else:
1401
+ finalization_reason = "complete"
1402
+ turn.finalization_reason = finalization_reason
1403
 
1404
  self._pending_trace_turns.remove(turn)
1405
  self._cancel_finalize_timeout(turn.turn_id)
 
1411
  *,
1412
  min_observed_order: Optional[int] = None,
1413
  include_pending_agent_transcripts: bool = True,
1414
+ ) -> tuple[str, Optional[str]]:
1415
  speech_id = _normalize_optional_str(turn.speech_id)
1416
  if turn.assistant_text.strip():
1417
  if (
 
1421
  and turn.assistant_text_updated_order > min_observed_order
1422
  )
1423
  ):
1424
+ return turn.assistant_text.strip(), turn.assistant_text_source
1425
  if turn.response_text.strip():
1426
  if (
1427
  min_observed_order is None
 
1430
  and turn.assistant_text_updated_order > min_observed_order
1431
  )
1432
  ):
1433
+ return turn.response_text.strip(), turn.assistant_text_source
1434
  if speech_id:
1435
  buffered_exact = self._pending_agent_transcripts_by_speech_id.get(speech_id)
1436
  if buffered_exact:
 
1440
  continue
1441
  if not buffered_exact:
1442
  self._pending_agent_transcripts_by_speech_id.pop(speech_id, None)
1443
+ return record.text.strip(), record.source or "buffered_exact"
1444
  for tts_call in reversed(turn.tts_calls):
1445
  if (
1446
  min_observed_order is not None
 
1448
  ):
1449
  continue
1450
  if tts_call.assistant_text.strip():
1451
+ return tts_call.assistant_text.strip(), "tts_metrics"
1452
  if self._try_attach_latest_usable_orphan_assistant_text(turn):
1453
  if turn.assistant_text.strip():
1454
+ return turn.assistant_text.strip(), turn.assistant_text_source
1455
+ if (
1456
+ include_pending_agent_transcripts
1457
+ and self._pending_agent_transcripts
1458
+ and self._can_consume_pending_agent_transcript_for_turn(turn)
1459
+ ):
1460
+ return self._pending_agent_transcripts.popleft().strip(), "pending_agent_transcript"
1461
+ return "", None
1462
+
1463
+ def _assistant_text_correlation_candidates(self) -> list[TraceTurn]:
1464
+ return [
1465
+ turn
1466
+ for turn in self._pending_trace_turns
1467
+ if bool(turn.llm_calls or turn.tts_calls or turn.tool_phase_open)
1468
+ and not (turn.interrupted and turn.assistant_audio_started)
1469
+ ]
1470
+
1471
+ def _can_consume_pending_agent_transcript_for_turn(self, turn: TraceTurn) -> bool:
1472
+ candidates = self._assistant_text_correlation_candidates()
1473
+ return len(candidates) == 1 and candidates[0] is turn
1474
 
1475
+ def _has_active_finalize_timeout(self, turn_id: str) -> bool:
1476
+ task = self._trace_finalize_tasks.get(turn_id)
1477
+ return task is not None and not task.done()
1478
+
1479
+ def _finalize_interrupted_turns_before_new_user_turn_locked(
1480
+ self,
1481
+ ) -> tuple[list[TraceTurn], list[tuple[str, float]]]:
1482
  completed_turns: list[TraceTurn] = []
1483
+ timeout_schedules: list[tuple[str, float]] = []
1484
  for turn in list(self._pending_trace_turns):
1485
  if not (turn.user_transcript and turn.llm_calls and turn.tts_calls):
1486
  continue
 
1493
  missing_post_tool_assistant = bool(
1494
  requires_post_tool_response and not self._post_tool_assistant_observed(turn)
1495
  )
1496
+ if not turn.assistant_text.strip() or missing_post_tool_assistant:
1497
+ fallback_text, fallback_source = self._best_available_assistant_text(
1498
  turn,
1499
+ min_observed_order=(
1500
+ turn.last_tool_event_order if missing_post_tool_assistant else None
1501
  ),
1502
+ include_pending_agent_transcripts=not missing_post_tool_assistant,
 
1503
  )
1504
+ if fallback_text:
1505
+ self._apply_assistant_text_to_turn(
1506
+ turn,
1507
+ fallback_text,
1508
+ event_created_at=None,
1509
+ source=fallback_source or "unknown",
1510
+ )
1511
+ missing_post_tool_assistant = False
1512
+
1513
+ if turn.assistant_text.strip() and not missing_post_tool_assistant:
1514
+ completed_turns.append(
1515
+ self._finalize_locked(
1516
+ turn,
1517
+ missing_assistant_fallback=False,
1518
+ tool_post_response_missing=False,
1519
+ drop_assistant_text=False,
1520
+ )
1521
+ )
1522
+ continue
1523
+
1524
+ timeout_sec = self._resolve_finalize_timeout_sec(turn)
1525
+ if timeout_sec <= 0.0:
1526
+ completed_turns.append(
1527
+ self._finalize_locked(
1528
+ turn,
1529
+ missing_assistant_fallback=True,
1530
+ tool_post_response_missing=requires_post_tool_response,
1531
+ drop_assistant_text=missing_post_tool_assistant,
1532
+ )
1533
+ )
1534
+ continue
1535
+ if self._has_active_finalize_timeout(turn.turn_id):
1536
+ continue
1537
+ timeout_schedules.append((turn.turn_id, timeout_sec))
1538
+ return completed_turns, timeout_schedules
1539
 
1540
  def _requires_post_tool_follow_up(self, turn: TraceTurn) -> bool:
1541
  if turn.last_tool_event_order is None:
 
1566
  )
1567
  )
1568
  self._trace_finalize_tasks[turn_id] = task
1569
+ turn = next((t for t in self._pending_trace_turns if t.turn_id == turn_id), None)
1570
+ if turn is not None:
1571
+ logger.debug(
1572
+ "Scheduled Langfuse finalize wait: turn_id=%s speech_id=%s timeout_sec=%.3f wait_reason=%s",
1573
+ turn.turn_id,
1574
+ turn.speech_id,
1575
+ timeout_sec,
1576
+ self._finalize_wait_reason(turn),
1577
+ )
1578
  task.add_done_callback(
1579
  lambda _task, tid=turn_id, v=version: self._on_finalize_timeout_task_done(
1580
  turn_id=tid,
 
1814
  for llm_idx, llm_call in enumerate(
1815
  block.llm_calls, start=1
1816
  ):
1817
+ llm_visible_latency_ms = _preferred_visible_latency_ms(
1818
+ llm_call.ttft_ms,
1819
+ llm_call.duration_ms,
1820
+ )
1821
  phase_cursor_ns = _emit_component_span(
1822
  _tracer,
1823
  name="LLMMetrics",
1824
  context=phase_ctx,
1825
  start_ns=phase_cursor_ns,
1826
+ duration_ms=llm_visible_latency_ms,
1827
+ advance_ms=llm_call.duration_ms,
1828
  attributes=_merge_component_attributes(
1829
  llm_call.attributes,
1830
  {
 
1832
  "response_text": phase_text,
1833
  "ttft_ms": llm_call.ttft_ms,
1834
  "llm_total_latency_ms": llm_call.duration_ms,
1835
+ "total_duration_ms": llm_call.duration_ms,
1836
  "phase_index": block.index,
1837
  "phase_call_index": llm_idx,
1838
  },
 
1845
  block.tts_calls, start=1
1846
  ):
1847
  spoken_text = tts_call.assistant_text or phase_text
1848
+ tts_visible_latency_ms = _preferred_visible_latency_ms(
1849
+ tts_call.ttfb_ms,
1850
+ tts_call.duration_ms,
1851
+ )
1852
  phase_cursor_ns = _emit_component_span(
1853
  _tracer,
1854
  name="TTSMetrics",
1855
  context=phase_ctx,
1856
  start_ns=phase_cursor_ns,
1857
+ duration_ms=tts_visible_latency_ms,
1858
+ advance_ms=tts_call.duration_ms,
1859
  attributes=_merge_component_attributes(
1860
  tts_call.attributes,
1861
  {
1862
  "assistant_text": spoken_text,
1863
  "assistant_text_missing": turn.assistant_text_missing,
1864
  "ttfb_ms": tts_call.ttfb_ms,
1865
+ "tts_total_latency_ms": tts_call.duration_ms,
1866
+ "total_duration_ms": tts_call.duration_ms,
1867
  "phase_index": block.index,
1868
  "phase_call_index": tts_idx,
1869
  },
 
1977
  )
1978
 
1979
  logger.info(
1980
+ "Langfuse turn trace emitted: trace_id=%s turn_id=%s session_id=%s room_id=%s participant_id=%s finalization_reason=%s assistant_text_source=%s emit_wait_ms=%.1f",
1981
  turn.trace_id,
1982
  turn.turn_id,
1983
  turn.session_id,
1984
  turn.room_id,
1985
  turn.participant_id,
1986
+ turn.finalization_reason,
1987
+ turn.assistant_text_source,
1988
+ max((time() - turn.emit_ready_at) * 1000.0, 0.0)
1989
+ if turn.emit_ready_at is not None
1990
+ else 0.0,
1991
  )
1992
  asyncio.create_task(self._flush_tracer_provider())
1993
  except Exception as exc:
 
2705
  "langfuse.trace.metadata.participant_id": turn.participant_id,
2706
  "langfuse.trace.metadata.turn_id": turn.turn_id,
2707
  "langfuse.trace.metadata.assistant_text_missing": turn.assistant_text_missing,
2708
+ "langfuse.trace.metadata.assistant_text_source": turn.assistant_text_source,
2709
  "langfuse.trace.metadata.stt_status": turn.stt_status,
2710
  "langfuse.trace.metadata.tool_phase_announced": turn.tool_step_announced,
2711
  "langfuse.trace.metadata.tool_post_response_missing": turn.tool_post_response_missing,
 
2713
  "langfuse.trace.metadata.assistant_audio_started": turn.assistant_audio_started,
2714
  "langfuse.trace.metadata.interrupted": turn.interrupted,
2715
  "langfuse.trace.metadata.interrupted_reason": turn.interrupted_reason,
2716
+ "langfuse.trace.metadata.finalization_reason": turn.finalization_reason,
2717
+ "langfuse.trace.metadata.emit_ready_at": turn.emit_ready_at,
2718
  "langfuse.trace.metadata.coalesced_turn_count": len(turn.coalesced_turn_ids),
2719
  "langfuse.trace.metadata.coalesced_fragment_count": turn.coalesced_fragment_count,
2720
  "langfuse.trace.metadata.coalesced_turn_ids": turn.coalesced_turn_ids,
 
2796
  return None
2797
 
2798
 
2799
+ def _assistant_text_from_component_attributes(attributes: dict[str, Any]) -> str:
2800
+ for key in (
2801
+ "assistant_text",
2802
+ "spoken_text",
2803
+ "metadata.assistant_text",
2804
+ "metadata.spoken_text",
2805
+ ):
2806
+ value = attributes.get(key)
2807
+ if isinstance(value, str) and value.strip():
2808
+ return value.strip()
2809
+ return ""
2810
+
2811
+
2812
+ def _preferred_visible_latency_ms(
2813
+ preferred_ms: Optional[float],
2814
+ fallback_ms: Optional[float],
2815
+ ) -> Optional[float]:
2816
+ if preferred_ms is not None and preferred_ms >= 0.0:
2817
+ return preferred_ms
2818
+ if fallback_ms is not None and fallback_ms >= 0.0:
2819
+ return fallback_ms
2820
+ return None
2821
+
2822
+
2823
  def _tool_calls_total_duration_ms(tool_calls: list[ToolCallTrace]) -> float:
2824
  total = 0.0
2825
  for call in tool_calls:
 
2848
  return left
2849
  if left == right:
2850
  return left
2851
+ if left.casefold() in right.casefold():
2852
+ return right
2853
+ if right.casefold() in left.casefold():
2854
+ return left
2855
  if right.startswith(left):
2856
  return right
2857
  if left.startswith(right):
 
2876
  context: Any,
2877
  start_ns: int,
2878
  duration_ms: Optional[float],
2879
+ advance_ms: Optional[float] = None,
2880
  attributes: dict[str, Any],
2881
  observation_input: Optional[str] = None,
2882
  observation_output: Optional[str] = None,
2883
  ) -> int:
2884
  actual_ms = max(duration_ms, 0.0) if duration_ms is not None else None
2885
+ cursor_advance_ms = actual_ms
2886
+ if advance_ms is not None:
2887
+ cursor_advance_ms = max(advance_ms, 0.0)
2888
+ if actual_ms is not None:
2889
+ cursor_advance_ms = max(cursor_advance_ms, actual_ms)
2890
  end_ns = start_ns + _ms_to_ns(actual_ms or 0.0)
2891
+ next_cursor_ns = start_ns + _ms_to_ns(cursor_advance_ms or 0.0)
2892
 
2893
  span = _tracer.start_span(name, context=context, start_time=start_ns)
2894
  try:
 
2905
  span.set_attribute(key, value)
2906
  finally:
2907
  _close_span_at(span, end_ns)
2908
+ return max(end_ns, next_cursor_ns)
2909
 
2910
 
2911
  def _close_span_at(span: Any, end_ns: int) -> None:
src/core/settings.py CHANGED
@@ -198,7 +198,7 @@ class VoiceSettings(CoreSettings):
198
  class STTSettings(CoreSettings):
199
  # Provider selection
200
  STT_PROVIDER: str = Field(
201
- default="deepgram",
202
  description="STT provider: 'moonshine', 'nvidia', or 'deepgram'"
203
  )
204
 
@@ -395,11 +395,23 @@ class LangfuseSettings(CoreSettings):
395
  default=True,
396
  description="Mark emitted Langfuse traces as public for shareable URLs",
397
  )
 
 
 
 
 
 
 
 
 
398
  LANGFUSE_TRACE_FINALIZE_TIMEOUT_MS: float = Field(
399
  default=8000.0,
400
  ge=0.0,
401
  le=10000.0,
402
- description="Timeout to wait for assistant text before force-finalizing trace",
 
 
 
403
  )
404
  LANGFUSE_POST_TOOL_RESPONSE_TIMEOUT_MS: float = Field(
405
  default=30000.0,
 
198
  class STTSettings(CoreSettings):
199
  # Provider selection
200
  STT_PROVIDER: str = Field(
201
+ default="moonshine",
202
  description="STT provider: 'moonshine', 'nvidia', or 'deepgram'"
203
  )
204
 
 
395
  default=True,
396
  description="Mark emitted Langfuse traces as public for shareable URLs",
397
  )
398
+ LANGFUSE_ASSISTANT_TEXT_GRACE_TIMEOUT_MS: float = Field(
399
+ default=500.0,
400
+ ge=0.0,
401
+ le=10000.0,
402
+ description=(
403
+ "Short grace window to wait for assistant text on regular turns before "
404
+ "force-finalizing the trace"
405
+ ),
406
+ )
407
  LANGFUSE_TRACE_FINALIZE_TIMEOUT_MS: float = Field(
408
  default=8000.0,
409
  ge=0.0,
410
  le=10000.0,
411
+ description=(
412
+ "Legacy finalize timeout retained for compatibility; regular turns use "
413
+ "LANGFUSE_ASSISTANT_TEXT_GRACE_TIMEOUT_MS"
414
+ ),
415
  )
416
  LANGFUSE_POST_TOOL_RESPONSE_TIMEOUT_MS: float = Field(
417
  default=30000.0,
src/plugins/pocket_tts/tts.py CHANGED
@@ -8,7 +8,7 @@ import threading
8
  import time
9
  from collections.abc import AsyncIterator
10
  from dataclasses import dataclass
11
- from typing import Any, Protocol, cast
12
 
13
  import numpy as np
14
  from pocket_tts import TTSModel
@@ -35,14 +35,6 @@ _SENTENCE_SPLIT_RE = re.compile(r"(?<=[.!?])\s+")
35
  _WHITESPACE_RE = re.compile(r"\s+")
36
 
37
 
38
- class TTSMetricsCallback(Protocol):
39
- # duration is end-to-end wall-clock synth time, not model-only compute time.
40
- def __call__(self, *, ttfb: float, duration: float, audio_duration: float) -> None: ...
41
-
42
-
43
- OptionalTTSMetricsCallback = TTSMetricsCallback | None
44
-
45
-
46
  @dataclass
47
  class _GenerationError:
48
  error: Exception
@@ -61,7 +53,6 @@ class PocketTTS(tts.TTS):
61
  lsd_decode_steps: int = 1,
62
  sample_rate: int = NATIVE_SAMPLE_RATE,
63
  max_concurrent_generations: int = 1,
64
- metrics_callback: OptionalTTSMetricsCallback = None,
65
  ) -> None:
66
  """Create a new instance of Pocket TTS.
67
 
@@ -72,7 +63,6 @@ class PocketTTS(tts.TTS):
72
  sample_rate: Output sample rate. Only native 24kHz is supported.
73
  max_concurrent_generations: Maximum number of concurrent synthesis tasks
74
  for this PocketTTS instance.
75
- metrics_callback: Optional callback for per-segment generation metrics.
76
  """
77
  if max_concurrent_generations < 1:
78
  raise ValueError(
@@ -95,8 +85,6 @@ class PocketTTS(tts.TTS):
95
  self._temperature = temperature
96
  self._lsd_decode_steps = lsd_decode_steps
97
  self._max_concurrent_generations = max_concurrent_generations
98
- self._metrics_callback = metrics_callback
99
-
100
  self._model: Any = TTSModel.load_model(temp=temperature, lsd_decode_steps=lsd_decode_steps)
101
  self._voice_state: Any = self._load_voice_state(voice)
102
  self._generation_semaphore = asyncio.Semaphore(max_concurrent_generations)
@@ -336,13 +324,6 @@ class PocketChunkedStream(tts.ChunkedStream):
336
 
337
  output_emitter.flush()
338
 
339
- if pocket_tts._metrics_callback and first_chunk_ttfb >= 0:
340
- pocket_tts._metrics_callback(
341
- ttfb=first_chunk_ttfb,
342
- duration=total_synth_wall_time,
343
- audio_duration=audio_duration,
344
- )
345
-
346
 
347
  class PocketSynthesizeStream(tts.SynthesizeStream):
348
  def __init__(self, *, tts: PocketTTS, conn_options: APIConnectOptions) -> None:
@@ -398,13 +379,6 @@ class PocketSynthesizeStream(tts.SynthesizeStream):
398
  finally:
399
  output_emitter.end_segment()
400
 
401
- if pocket_tts._metrics_callback and first_chunk_ttfb >= 0:
402
- pocket_tts._metrics_callback(
403
- ttfb=first_chunk_ttfb,
404
- duration=total_synth_wall_time,
405
- audio_duration=audio_duration,
406
- )
407
-
408
  async def _synthesize_segment(
409
  self, text: str, output_emitter: tts.AudioEmitter
410
  ) -> tuple[float, float, float]:
 
8
  import time
9
  from collections.abc import AsyncIterator
10
  from dataclasses import dataclass
11
+ from typing import Any, cast
12
 
13
  import numpy as np
14
  from pocket_tts import TTSModel
 
35
  _WHITESPACE_RE = re.compile(r"\s+")
36
 
37
 
 
 
 
 
 
 
 
 
38
  @dataclass
39
  class _GenerationError:
40
  error: Exception
 
53
  lsd_decode_steps: int = 1,
54
  sample_rate: int = NATIVE_SAMPLE_RATE,
55
  max_concurrent_generations: int = 1,
 
56
  ) -> None:
57
  """Create a new instance of Pocket TTS.
58
 
 
63
  sample_rate: Output sample rate. Only native 24kHz is supported.
64
  max_concurrent_generations: Maximum number of concurrent synthesis tasks
65
  for this PocketTTS instance.
 
66
  """
67
  if max_concurrent_generations < 1:
68
  raise ValueError(
 
85
  self._temperature = temperature
86
  self._lsd_decode_steps = lsd_decode_steps
87
  self._max_concurrent_generations = max_concurrent_generations
 
 
88
  self._model: Any = TTSModel.load_model(temp=temperature, lsd_decode_steps=lsd_decode_steps)
89
  self._voice_state: Any = self._load_voice_state(voice)
90
  self._generation_semaphore = asyncio.Semaphore(max_concurrent_generations)
 
324
 
325
  output_emitter.flush()
326
 
 
 
 
 
 
 
 
327
 
328
  class PocketSynthesizeStream(tts.SynthesizeStream):
329
  def __init__(self, *, tts: PocketTTS, conn_options: APIConnectOptions) -> None:
 
379
  finally:
380
  output_emitter.end_segment()
381
 
 
 
 
 
 
 
 
382
  async def _synthesize_segment(
383
  self, text: str, output_emitter: tts.AudioEmitter
384
  ) -> tuple[float, float, float]:
tests/test_langfuse_turn_tracing.py CHANGED
@@ -161,6 +161,7 @@ def _make_tts_metrics(
161
  ttfb: float = 0.15,
162
  duration: float = 0.5,
163
  audio_duration: float = 1.3,
 
164
  ) -> metrics.TTSMetrics:
165
  return metrics.TTSMetrics(
166
  label="tts",
@@ -173,6 +174,7 @@ def _make_tts_metrics(
173
  characters_count=42,
174
  streamed=True,
175
  speech_id=speech_id,
 
176
  )
177
 
178
 
@@ -244,6 +246,34 @@ class _FakeTextMethodPart:
244
 
245
 
246
  class _FakeSpeechHandle:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
247
  def __init__(self, chat_items: list[Any], speech_id: str = "speech-fake") -> None:
248
  self.id = speech_id
249
  self.chat_items = chat_items
@@ -346,6 +376,8 @@ def test_turn_trace_has_required_metadata_and_spans(monkeypatch: pytest.MonkeyPa
346
  assert root.attributes["turn_id"]
347
  assert root.attributes["langfuse.trace.output"] == "hi, how can I help?"
348
  assert root.attributes["langfuse.trace.public"] is False
 
 
349
  assert root.attributes["latency_ms.eou_delay"] == pytest.approx(1100.0)
350
  assert root.attributes["latency_ms.stt_finalization"] == pytest.approx(250.0)
351
  assert root.attributes["latency_ms.stt_total"] == pytest.approx(1350.0)
@@ -377,17 +409,28 @@ def test_turn_trace_has_required_metadata_and_spans(monkeypatch: pytest.MonkeyPa
377
  assert llm_span.attributes["response_text"] == "hi, how can I help?"
378
  assert llm_span.attributes["ttft_ms"] > 0
379
  assert llm_span.attributes["llm_total_latency_ms"] > 0
 
 
 
380
  assert llm_span.attributes["input"] == "hello there"
381
  assert llm_span.attributes["output"] == "hi, how can I help?"
382
- assert llm_span.attributes["duration_ms"] > 0
 
 
383
  assert llm_span.attributes["prompt_tokens"] == 12
384
  assert llm_span.attributes["completion_tokens"] == 24
385
 
386
  assert tts_span.attributes["assistant_text"] == "hi, how can I help?"
387
  assert tts_span.attributes["ttfb_ms"] > 0
 
 
 
 
388
  assert tts_span.attributes["input"] == "hi, how can I help?"
389
  assert tts_span.attributes["output"] == "hi, how can I help?"
390
- assert tts_span.attributes["duration_ms"] > 0
 
 
391
  assert tts_span.attributes["characters_count"] == 42
392
  assert tts_span.attributes["streamed"] is True
393
 
@@ -1734,6 +1777,152 @@ def test_multiple_final_transcripts_are_merged_into_one_turn(
1734
  assert root.attributes["langfuse.trace.metadata.coalesced_turn_count"] == 0
1735
 
1736
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1737
  def test_immediate_continuation_coalesces_aborted_prior_turn(
1738
  monkeypatch: pytest.MonkeyPatch,
1739
  ) -> None:
@@ -2089,6 +2278,103 @@ def test_trace_waits_for_assistant_text_before_emit(monkeypatch: pytest.MonkeyPa
2089
  assert turn_spans[0].attributes["langfuse.trace.output"] == "hello there"
2090
 
2091
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2092
  def test_speech_created_done_callback_backfills_assistant_text(
2093
  monkeypatch: pytest.MonkeyPatch,
2094
  ) -> None:
@@ -2118,7 +2404,7 @@ def test_speech_created_done_callback_backfills_assistant_text(
2118
  await collector.wait_for_pending_trace_tasks()
2119
  assert not fake_tracer.spans
2120
 
2121
- handle = _FakeSpeechHandle(
2122
  chat_items=[_FakeChatItem(role="assistant", content=["fallback reply"])],
2123
  speech_id="speech-speech-created",
2124
  )
@@ -2135,6 +2421,56 @@ def test_speech_created_done_callback_backfills_assistant_text(
2135
  assert turn_spans[0].attributes["langfuse.trace.metadata.assistant_text_missing"] is False
2136
 
2137
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2138
  def test_speech_created_immediate_capture_backfills_assistant_text(
2139
  monkeypatch: pytest.MonkeyPatch,
2140
  ) -> None:
@@ -2221,6 +2557,8 @@ def test_trace_finalize_timeout_for_missing_assistant_text(
2221
  root = turn_spans[0]
2222
  assert root.attributes["langfuse.trace.metadata.assistant_text_missing"] is True
2223
  assert root.attributes["langfuse.trace.output"] == "[assistant text unavailable]"
 
 
2224
 
2225
 
2226
  def test_trace_finalize_timeout_uses_pending_assistant_transcript(
@@ -2570,7 +2908,7 @@ def test_stale_orphan_assistant_text_from_absorbed_turn_is_not_attached_to_conti
2570
  assert "stale reply from the absorbed turn" not in root.attributes["langfuse.trace.output"]
2571
 
2572
 
2573
- def test_audio_started_turn_is_finalized_separately_when_new_user_turn_arrives(
2574
  monkeypatch: pytest.MonkeyPatch,
2575
  ) -> None:
2576
  import src.agent.traces.metrics_collector as metrics_collector_module
@@ -2587,6 +2925,7 @@ def test_audio_started_turn_is_finalized_separately_when_new_user_turn_arrives(
2587
  participant_id="web-123",
2588
  langfuse_enabled=True,
2589
  )
 
2590
 
2591
  async def _run() -> None:
2592
  await collector.on_session_metadata(
@@ -2619,15 +2958,116 @@ def test_audio_started_turn_is_finalized_separately_when_new_user_turn_arrives(
2619
  await collector.on_metrics_collected(_make_tts_metrics("speech-separate-b"))
2620
  await collector.wait_for_pending_trace_tasks()
2621
 
 
 
 
 
 
 
 
2622
  asyncio.run(_run())
2623
 
2624
  turn_spans = [span for span in fake_tracer.spans if span.name == "turn"]
2625
  assert len(turn_spans) == 2
2626
- first, second = turn_spans
2627
- assert first.attributes["langfuse.trace.input"] == "first prompt"
 
 
 
 
 
 
 
 
2628
  assert first.attributes["langfuse.trace.metadata.interrupted"] is True
2629
  assert first.attributes["langfuse.trace.output"] == "[assistant text unavailable]"
2630
- assert second.attributes["langfuse.trace.input"] == "second prompt"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2631
  assert second.attributes["langfuse.trace.output"] == "second reply"
2632
 
2633
 
 
161
  ttfb: float = 0.15,
162
  duration: float = 0.5,
163
  audio_duration: float = 1.3,
164
+ metadata: Any = None,
165
  ) -> metrics.TTSMetrics:
166
  return metrics.TTSMetrics(
167
  label="tts",
 
174
  characters_count=42,
175
  streamed=True,
176
  speech_id=speech_id,
177
+ metadata=metadata,
178
  )
179
 
180
 
 
246
 
247
 
248
  class _FakeSpeechHandle:
249
+ def __init__(self, chat_items: list[Any], speech_id: str = "speech-fake") -> None:
250
+ self.id = speech_id
251
+ self.chat_items = chat_items
252
+ self._callbacks: list[Any] = []
253
+ self._item_added_callbacks: list[Any] = []
254
+
255
+ def add_done_callback(self, callback: Any) -> None:
256
+ self._callbacks.append(callback)
257
+
258
+ def _add_item_added_callback(self, callback: Any) -> None:
259
+ self._item_added_callbacks.append(callback)
260
+
261
+ def _remove_item_added_callback(self, callback: Any) -> None:
262
+ self._item_added_callbacks = [
263
+ registered for registered in self._item_added_callbacks if registered is not callback
264
+ ]
265
+
266
+ def add_chat_item(self, item: Any) -> None:
267
+ self.chat_items.append(item)
268
+ for callback in list(self._item_added_callbacks):
269
+ callback(item)
270
+
271
+ def trigger_done(self) -> None:
272
+ for callback in self._callbacks:
273
+ callback(self)
274
+
275
+
276
+ class _FakeSpeechHandleWithoutItemAddedHook:
277
  def __init__(self, chat_items: list[Any], speech_id: str = "speech-fake") -> None:
278
  self.id = speech_id
279
  self.chat_items = chat_items
 
376
  assert root.attributes["turn_id"]
377
  assert root.attributes["langfuse.trace.output"] == "hi, how can I help?"
378
  assert root.attributes["langfuse.trace.public"] is False
379
+ assert root.attributes["langfuse.trace.metadata.finalization_reason"] == "complete"
380
+ assert root.attributes["langfuse.trace.metadata.assistant_text_source"] == "conversation_item"
381
  assert root.attributes["latency_ms.eou_delay"] == pytest.approx(1100.0)
382
  assert root.attributes["latency_ms.stt_finalization"] == pytest.approx(250.0)
383
  assert root.attributes["latency_ms.stt_total"] == pytest.approx(1350.0)
 
409
  assert llm_span.attributes["response_text"] == "hi, how can I help?"
410
  assert llm_span.attributes["ttft_ms"] > 0
411
  assert llm_span.attributes["llm_total_latency_ms"] > 0
412
+ assert llm_span.attributes["total_duration_ms"] == pytest.approx(
413
+ llm_span.attributes["llm_total_latency_ms"]
414
+ )
415
  assert llm_span.attributes["input"] == "hello there"
416
  assert llm_span.attributes["output"] == "hi, how can I help?"
417
+ assert llm_span.attributes["duration_ms"] == pytest.approx(
418
+ llm_span.attributes["ttft_ms"]
419
+ )
420
  assert llm_span.attributes["prompt_tokens"] == 12
421
  assert llm_span.attributes["completion_tokens"] == 24
422
 
423
  assert tts_span.attributes["assistant_text"] == "hi, how can I help?"
424
  assert tts_span.attributes["ttfb_ms"] > 0
425
+ assert tts_span.attributes["tts_total_latency_ms"] > 0
426
+ assert tts_span.attributes["total_duration_ms"] == pytest.approx(
427
+ tts_span.attributes["tts_total_latency_ms"]
428
+ )
429
  assert tts_span.attributes["input"] == "hi, how can I help?"
430
  assert tts_span.attributes["output"] == "hi, how can I help?"
431
+ assert tts_span.attributes["duration_ms"] == pytest.approx(
432
+ tts_span.attributes["ttfb_ms"]
433
+ )
434
  assert tts_span.attributes["characters_count"] == 42
435
  assert tts_span.attributes["streamed"] is True
436
 
 
1777
  assert root.attributes["langfuse.trace.metadata.coalesced_turn_count"] == 0
1778
 
1779
 
1780
+ def test_same_speech_final_transcripts_keep_merging_after_eou_until_llm(
1781
+ monkeypatch: pytest.MonkeyPatch,
1782
+ ) -> None:
1783
+ import src.agent.traces.metrics_collector as metrics_collector_module
1784
+
1785
+ fake_tracer = _FakeTracer()
1786
+ monkeypatch.setattr(metrics_collector_module, "tracer", fake_tracer)
1787
+
1788
+ room = _FakeRoom()
1789
+ collector = MetricsCollector(
1790
+ room=room, # type: ignore[arg-type]
1791
+ model_name="moonshine",
1792
+ room_name=room.name,
1793
+ room_id="RM123",
1794
+ participant_id="web-123",
1795
+ langfuse_enabled=True,
1796
+ )
1797
+
1798
+ async def _run() -> None:
1799
+ await collector.on_session_metadata(
1800
+ session_id="session-same-speech-final-merge-after-eou",
1801
+ participant_id="web-123",
1802
+ )
1803
+ await collector.on_user_input_transcribed("Hello there.", is_final=True)
1804
+ await collector.on_metrics_collected(
1805
+ _make_eou_metrics("speech-final-merge-after-eou", delay=0.9, transcription_delay=0.2)
1806
+ )
1807
+ await collector.on_user_input_transcribed("I'm missing context.", is_final=True)
1808
+ await collector.on_metrics_collected(_make_llm_metrics("speech-final-merge-after-eou"))
1809
+ await collector.on_conversation_item_added(
1810
+ role="assistant",
1811
+ content="Hi there.",
1812
+ )
1813
+ await collector.on_metrics_collected(_make_tts_metrics("speech-final-merge-after-eou"))
1814
+ await collector.wait_for_pending_trace_tasks()
1815
+
1816
+ asyncio.run(_run())
1817
+
1818
+ turn_spans = [span for span in fake_tracer.spans if span.name == "turn"]
1819
+ assert len(turn_spans) == 1
1820
+ root = turn_spans[0]
1821
+ assert root.attributes["langfuse.trace.input"] == "Hello there. I'm missing context."
1822
+ assert root.attributes["langfuse.trace.output"] == "Hi there."
1823
+ assert root.attributes["langfuse.trace.metadata.assistant_text_missing"] is False
1824
+
1825
+
1826
+ def test_user_conversation_item_after_eou_merges_instead_of_replacing(
1827
+ monkeypatch: pytest.MonkeyPatch,
1828
+ ) -> None:
1829
+ import src.agent.traces.metrics_collector as metrics_collector_module
1830
+
1831
+ fake_tracer = _FakeTracer()
1832
+ monkeypatch.setattr(metrics_collector_module, "tracer", fake_tracer)
1833
+
1834
+ room = _FakeRoom()
1835
+ collector = MetricsCollector(
1836
+ room=room, # type: ignore[arg-type]
1837
+ model_name="moonshine",
1838
+ room_name=room.name,
1839
+ room_id="RM123",
1840
+ participant_id="web-123",
1841
+ langfuse_enabled=True,
1842
+ )
1843
+
1844
+ async def _run() -> None:
1845
+ await collector.on_session_metadata(
1846
+ session_id="session-user-item-merges-after-eou",
1847
+ participant_id="web-123",
1848
+ )
1849
+ await collector.on_user_input_transcribed("Hello there.", is_final=True)
1850
+ await collector.on_metrics_collected(
1851
+ _make_eou_metrics("speech-user-item-after-eou", delay=0.8, transcription_delay=0.2)
1852
+ )
1853
+ await collector.on_conversation_item_added(
1854
+ role="user",
1855
+ content="I'm missing the rest.",
1856
+ )
1857
+ await collector.on_metrics_collected(_make_llm_metrics("speech-user-item-after-eou"))
1858
+ await collector.on_conversation_item_added(
1859
+ role="assistant",
1860
+ content="Thanks for clarifying.",
1861
+ )
1862
+ await collector.on_metrics_collected(_make_tts_metrics("speech-user-item-after-eou"))
1863
+ await collector.wait_for_pending_trace_tasks()
1864
+
1865
+ asyncio.run(_run())
1866
+
1867
+ turn_spans = [span for span in fake_tracer.spans if span.name == "turn"]
1868
+ assert len(turn_spans) == 1
1869
+ root = turn_spans[0]
1870
+ assert root.attributes["langfuse.trace.input"] == "Hello there. I'm missing the rest."
1871
+ assert root.attributes["langfuse.trace.output"] == "Thanks for clarifying."
1872
+
1873
+
1874
+ def test_same_speech_fragmented_input_with_late_speech_done_keeps_full_trace(
1875
+ monkeypatch: pytest.MonkeyPatch,
1876
+ ) -> None:
1877
+ import src.agent.traces.metrics_collector as metrics_collector_module
1878
+
1879
+ fake_tracer = _FakeTracer()
1880
+ monkeypatch.setattr(metrics_collector_module, "tracer", fake_tracer)
1881
+
1882
+ room = _FakeRoom()
1883
+ collector = MetricsCollector(
1884
+ room=room, # type: ignore[arg-type]
1885
+ model_name="moonshine",
1886
+ room_name=room.name,
1887
+ room_id="RM123",
1888
+ participant_id="web-123",
1889
+ langfuse_enabled=True,
1890
+ )
1891
+ handle = _FakeSpeechHandle(chat_items=[], speech_id="speech-fragmented-late-done")
1892
+
1893
+ async def _run() -> None:
1894
+ await collector.on_session_metadata(
1895
+ session_id="session-fragmented-late-done",
1896
+ participant_id="web-123",
1897
+ )
1898
+ await collector.on_user_input_transcribed("Hello there.", is_final=True)
1899
+ await collector.on_metrics_collected(
1900
+ _make_eou_metrics("speech-fragmented-late-done", delay=0.8, transcription_delay=0.2)
1901
+ )
1902
+ await collector.on_user_input_transcribed("I'm missing context.", is_final=True)
1903
+ await collector.on_metrics_collected(_make_llm_metrics("speech-fragmented-late-done"))
1904
+ await collector.on_speech_created(handle)
1905
+ await collector.on_metrics_collected(_make_tts_metrics("speech-fragmented-late-done"))
1906
+ await collector.wait_for_pending_trace_tasks()
1907
+
1908
+ turn_spans = [span for span in fake_tracer.spans if span.name == "turn"]
1909
+ assert not turn_spans
1910
+
1911
+ handle.chat_items = [_FakeChatItem(role="assistant", content=["Hi there."])]
1912
+ handle.trigger_done()
1913
+ await asyncio.sleep(0)
1914
+ await collector.wait_for_pending_trace_tasks()
1915
+
1916
+ asyncio.run(_run())
1917
+
1918
+ turn_spans = [span for span in fake_tracer.spans if span.name == "turn"]
1919
+ assert len(turn_spans) == 1
1920
+ root = turn_spans[0]
1921
+ assert root.attributes["langfuse.trace.input"] == "Hello there. I'm missing context."
1922
+ assert root.attributes["langfuse.trace.output"] == "Hi there."
1923
+ assert root.attributes["langfuse.trace.metadata.assistant_text_missing"] is False
1924
+
1925
+
1926
  def test_immediate_continuation_coalesces_aborted_prior_turn(
1927
  monkeypatch: pytest.MonkeyPatch,
1928
  ) -> None:
 
2278
  assert turn_spans[0].attributes["langfuse.trace.output"] == "hello there"
2279
 
2280
 
2281
+ def test_tts_metric_metadata_assistant_text_emits_without_placeholder(
2282
+ monkeypatch: pytest.MonkeyPatch,
2283
+ ) -> None:
2284
+ import src.agent.traces.metrics_collector as metrics_collector_module
2285
+
2286
+ fake_tracer = _FakeTracer()
2287
+ monkeypatch.setattr(metrics_collector_module, "tracer", fake_tracer)
2288
+
2289
+ room = _FakeRoom()
2290
+ collector = MetricsCollector(
2291
+ room=room, # type: ignore[arg-type]
2292
+ model_name="moonshine",
2293
+ room_name=room.name,
2294
+ room_id="RM123",
2295
+ participant_id="web-123",
2296
+ langfuse_enabled=True,
2297
+ )
2298
+
2299
+ async def _run() -> None:
2300
+ await collector.on_session_metadata(
2301
+ session_id="session-tts-metadata",
2302
+ participant_id="web-123",
2303
+ )
2304
+ await collector.on_user_input_transcribed("hi", is_final=True)
2305
+ await collector.on_metrics_collected(_make_llm_metrics("speech-tts-metadata"))
2306
+ tts_metrics = _make_tts_metrics("speech-tts-metadata")
2307
+ tts_metrics.metadata = {
2308
+ "model_name": "pocket-tts",
2309
+ "model_provider": "Kyutai",
2310
+ "assistant_text": "assistant text from tts metadata",
2311
+ }
2312
+ await collector.on_metrics_collected(tts_metrics)
2313
+ await collector.wait_for_pending_trace_tasks()
2314
+
2315
+ asyncio.run(_run())
2316
+
2317
+ turn_spans = [span for span in fake_tracer.spans if span.name == "turn"]
2318
+ assert len(turn_spans) == 1
2319
+ root = turn_spans[0]
2320
+ assert root.attributes["langfuse.trace.output"] == "assistant text from tts metadata"
2321
+ assert root.attributes["langfuse.trace.metadata.assistant_text_missing"] is False
2322
+ assert root.attributes["langfuse.trace.metadata.assistant_text_source"] == "tts_metrics"
2323
+ assert root.attributes["langfuse.trace.metadata.finalization_reason"] == "complete"
2324
+
2325
+
2326
def test_speech_item_added_assistant_text_arriving_within_grace_emits_trace(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A turn with no assistant text at TTS time is held open, then finalized
    once the speech-item callback delivers the text within the grace window."""
    import src.agent.traces.metrics_collector as metrics_collector_module

    tracer_stub = _FakeTracer()
    monkeypatch.setattr(metrics_collector_module, "tracer", tracer_stub)

    fake_room = _FakeRoom()
    mc = MetricsCollector(
        room=fake_room,  # type: ignore[arg-type]
        model_name="moonshine",
        room_name=fake_room.name,
        room_id="RM123",
        participant_id="web-123",
        langfuse_enabled=True,
    )
    # Keep the grace window short so the test stays fast.
    mc._trace_finalize_timeout_sec = 0.05
    speech = _FakeSpeechHandle([], speech_id="speech-item-added")

    async def _scenario() -> None:
        await mc.on_session_metadata(
            session_id="session-speech-item-added",
            participant_id="web-123",
        )
        await mc.on_user_input_transcribed("hi", is_final=True)
        await mc.on_metrics_collected(_make_llm_metrics("speech-item-added"))
        await mc.on_speech_created(speech)
        await mc.on_metrics_collected(_make_tts_metrics("speech-item-added"))
        await mc.wait_for_pending_trace_tasks()
        # No span yet: the collector is still waiting for assistant text.
        assert not tracer_stub.spans

        # Assistant text arrives via the speech-item-added hook inside the grace window.
        speech.add_chat_item(
            _FakeChatItem(
                role="assistant",
                content=["assistant text from speech item callback"],
            )
        )
        await asyncio.sleep(0)
        await mc.wait_for_pending_trace_tasks()

    asyncio.run(_scenario())

    emitted_turns = [s for s in tracer_stub.spans if s.name == "turn"]
    assert len(emitted_turns) == 1
    trace_root = emitted_turns[0]
    assert trace_root.attributes["langfuse.trace.output"] == "assistant text from speech item callback"
    assert trace_root.attributes["langfuse.trace.metadata.assistant_text_missing"] is False
    assert trace_root.attributes["langfuse.trace.metadata.assistant_text_source"] == "speech_item_added"
    assert trace_root.attributes["langfuse.trace.metadata.finalization_reason"] == "complete"
2376
+
2377
+
2378
  def test_speech_created_done_callback_backfills_assistant_text(
2379
  monkeypatch: pytest.MonkeyPatch,
2380
  ) -> None:
 
2404
  await collector.wait_for_pending_trace_tasks()
2405
  assert not fake_tracer.spans
2406
 
2407
+ handle = _FakeSpeechHandleWithoutItemAddedHook(
2408
  chat_items=[_FakeChatItem(role="assistant", content=["fallback reply"])],
2409
  speech_id="speech-speech-created",
2410
  )
 
2421
  assert turn_spans[0].attributes["langfuse.trace.metadata.assistant_text_missing"] is False
2422
 
2423
 
2424
def test_speech_done_does_not_replace_speech_item_added_text(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Text captured by the speech-item-added hook wins over whatever the
    later speech-done callback reads from the handle's chat items."""
    import src.agent.traces.metrics_collector as metrics_collector_module

    tracer_stub = _FakeTracer()
    monkeypatch.setattr(metrics_collector_module, "tracer", tracer_stub)

    fake_room = _FakeRoom()
    mc = MetricsCollector(
        room=fake_room,  # type: ignore[arg-type]
        model_name="moonshine",
        room_name=fake_room.name,
        room_id="RM123",
        participant_id="web-123",
        langfuse_enabled=True,
    )
    speech = _FakeSpeechHandle([], speech_id="speech-speech-item-preferred")

    async def _scenario() -> None:
        await mc.on_session_metadata(
            session_id="session-speech-item-preferred",
            participant_id="web-123",
        )
        await mc.on_user_input_transcribed("hi there", is_final=True)
        await mc.on_metrics_collected(_make_llm_metrics("speech-speech-item-preferred"))
        await mc.on_speech_created(speech)
        # First source: the item-added hook delivers the preferred text.
        speech.add_chat_item(
            _FakeChatItem(role="assistant", content=["preferred reply from speech item"])
        )
        await asyncio.sleep(0)

        # Second source: stale text that speech-done would otherwise pick up.
        speech.chat_items.append(
            _FakeChatItem(role="assistant", content=["stale reply from speech done"])
        )
        speech.trigger_done()
        await asyncio.sleep(0)

        await mc.on_metrics_collected(_make_tts_metrics("speech-speech-item-preferred"))
        await mc.wait_for_pending_trace_tasks()

    asyncio.run(_scenario())

    emitted_turns = [s for s in tracer_stub.spans if s.name == "turn"]
    assert len(emitted_turns) == 1
    trace_root = emitted_turns[0]
    assert trace_root.attributes["langfuse.trace.output"] == "preferred reply from speech item"
    assert trace_root.attributes["langfuse.trace.metadata.assistant_text_source"] == "speech_item_added"
2473
+
2474
  def test_speech_created_immediate_capture_backfills_assistant_text(
2475
  monkeypatch: pytest.MonkeyPatch,
2476
  ) -> None:
 
2557
  root = turn_spans[0]
2558
  assert root.attributes["langfuse.trace.metadata.assistant_text_missing"] is True
2559
  assert root.attributes["langfuse.trace.output"] == "[assistant text unavailable]"
2560
+ assert root.attributes["langfuse.trace.metadata.assistant_text_source"] == "unavailable"
2561
+ assert root.attributes["langfuse.trace.metadata.finalization_reason"] == "assistant_text_grace_timeout"
2562
 
2563
 
2564
  def test_trace_finalize_timeout_uses_pending_assistant_transcript(
 
2908
  assert "stale reply from the absorbed turn" not in root.attributes["langfuse.trace.output"]
2909
 
2910
 
2911
+ def test_audio_started_turn_waits_for_timeout_before_placeholder_on_barge_in(
2912
  monkeypatch: pytest.MonkeyPatch,
2913
  ) -> None:
2914
  import src.agent.traces.metrics_collector as metrics_collector_module
 
2925
  participant_id="web-123",
2926
  langfuse_enabled=True,
2927
  )
2928
+ collector._trace_finalize_timeout_sec = 0.01
2929
 
2930
  async def _run() -> None:
2931
  await collector.on_session_metadata(
 
2958
  await collector.on_metrics_collected(_make_tts_metrics("speech-separate-b"))
2959
  await collector.wait_for_pending_trace_tasks()
2960
 
2961
+ turn_spans = [span for span in fake_tracer.spans if span.name == "turn"]
2962
+ assert len(turn_spans) == 1
2963
+ assert turn_spans[0].attributes["langfuse.trace.input"] == "second prompt"
2964
+
2965
+ await asyncio.sleep(0.03)
2966
+ await collector.wait_for_pending_trace_tasks()
2967
+
2968
  asyncio.run(_run())
2969
 
2970
  turn_spans = [span for span in fake_tracer.spans if span.name == "turn"]
2971
  assert len(turn_spans) == 2
2972
+ first = next(
2973
+ span
2974
+ for span in turn_spans
2975
+ if span.attributes["langfuse.trace.input"] == "first prompt"
2976
+ )
2977
+ second = next(
2978
+ span
2979
+ for span in turn_spans
2980
+ if span.attributes["langfuse.trace.input"] == "second prompt"
2981
+ )
2982
  assert first.attributes["langfuse.trace.metadata.interrupted"] is True
2983
  assert first.attributes["langfuse.trace.output"] == "[assistant text unavailable]"
2984
+ assert second.attributes["langfuse.trace.output"] == "second reply"
2985
+
2986
+
2987
def test_audio_started_turn_uses_late_speech_done_text_after_barge_in(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """After a barge-in, the interrupted first turn stays pending until its
    speech-done callback fires, and the late text is used for its output
    instead of the placeholder; the second turn finalizes normally."""
    import src.agent.traces.metrics_collector as metrics_collector_module

    tracer_stub = _FakeTracer()
    monkeypatch.setattr(metrics_collector_module, "tracer", tracer_stub)

    fake_room = _FakeRoom()
    mc = MetricsCollector(
        room=fake_room,  # type: ignore[arg-type]
        model_name="moonshine",
        room_name=fake_room.name,
        room_id="RM123",
        participant_id="web-123",
        langfuse_enabled=True,
    )
    mc._trace_finalize_timeout_sec = 0.05
    speech = _FakeSpeechHandle(chat_items=[], speech_id="speech-audio-started-late-done")

    async def _scenario() -> None:
        await mc.on_session_metadata(
            session_id="session-audio-started-late-done",
            participant_id="web-123",
        )
        # --- First turn: audio starts, but no assistant text arrives. ---
        await mc.on_user_input_transcribed("first prompt", is_final=True)
        await mc.on_metrics_collected(
            _make_eou_metrics(
                "speech-audio-started-late-done",
                delay=0.4,
                transcription_delay=0.1,
            )
        )
        await mc.on_metrics_collected(
            _make_llm_metrics("speech-audio-started-late-done")
        )
        await mc.on_speech_created(speech)
        await mc.on_agent_state_changed(
            old_state="thinking",
            new_state="speaking",
        )
        await mc.on_metrics_collected(
            _make_tts_metrics("speech-audio-started-late-done")
        )

        # --- Barge-in: second turn runs to completion with its own reply. ---
        await mc.on_user_input_transcribed("second prompt", is_final=True)
        await mc.on_metrics_collected(
            _make_eou_metrics("speech-separate-late-done-b", delay=0.5, transcription_delay=0.1)
        )
        await mc.on_metrics_collected(_make_llm_metrics("speech-separate-late-done-b"))
        await mc.on_conversation_item_added(
            role="assistant",
            content="second reply",
        )
        await mc.on_metrics_collected(_make_tts_metrics("speech-separate-late-done-b"))
        await mc.wait_for_pending_trace_tasks()

        # Only the completed second turn has been emitted so far.
        interim = [s for s in tracer_stub.spans if s.name == "turn"]
        assert len(interim) == 1
        assert interim[0].attributes["langfuse.trace.input"] == "second prompt"

        # The interrupted speech finishes late and recovers its text.
        speech.chat_items = [
            _FakeChatItem(role="assistant", content=["first reply recovered late"])
        ]
        speech.trigger_done()
        await asyncio.sleep(0)
        await mc.wait_for_pending_trace_tasks()

    asyncio.run(_scenario())

    emitted_turns = [s for s in tracer_stub.spans if s.name == "turn"]
    assert len(emitted_turns) == 2
    by_input = {
        s.attributes["langfuse.trace.input"]: s for s in emitted_turns
    }
    first = by_input["first prompt"]
    second = by_input["second prompt"]
    assert first.attributes["langfuse.trace.metadata.interrupted"] is True
    assert first.attributes["langfuse.trace.output"] == "first reply recovered late"
    assert second.attributes["langfuse.trace.output"] == "second reply"
3072
 
3073