dvalle08 commited on
Commit
7fdafe4
·
1 Parent(s): a7654b2

codex fixes

Browse files
src/agent/runtime/assistant.py CHANGED
@@ -86,16 +86,12 @@ class Assistant(Agent):
86
  """Called when the agent enters the session. Set up metrics listeners."""
87
 
88
  def metrics_wrapper(event: MetricsCollectedEvent) -> None:
89
- asyncio.create_task(
90
- self._metrics_collector.on_metrics_collected(event.metrics)
91
- )
92
 
93
  def transcript_wrapper(event: UserInputTranscribedEvent) -> None:
94
- asyncio.create_task(
95
- self._metrics_collector.on_user_input_transcribed(
96
- event.transcript,
97
- is_final=event.is_final,
98
- )
99
  )
100
 
101
  def conversation_item_wrapper(event: ConversationItemAddedEvent) -> None:
@@ -103,27 +99,21 @@ class Assistant(Agent):
103
  role = getattr(item, "role", None)
104
  content = getattr(item, "content", None)
105
  item_created_at = getattr(item, "created_at", None)
106
- asyncio.create_task(
107
- self._metrics_collector.on_conversation_item_added(
108
- role=role,
109
- content=content,
110
- event_created_at=event.created_at,
111
- item_created_at=item_created_at,
112
- )
113
  )
114
 
115
  def speech_created_wrapper(event: SpeechCreatedEvent) -> None:
116
- asyncio.create_task(
117
- self._metrics_collector.on_speech_created(event.speech_handle)
118
- )
119
 
120
  def function_tools_executed_wrapper(event: FunctionToolsExecutedEvent) -> None:
121
- asyncio.create_task(
122
- self._metrics_collector.on_function_tools_executed(
123
- function_calls=event.function_calls,
124
- function_call_outputs=event.function_call_outputs,
125
- created_at=event.created_at,
126
- )
127
  )
128
  if self._tool_feedback is not None:
129
  asyncio.create_task(
@@ -131,11 +121,9 @@ class Assistant(Agent):
131
  )
132
 
133
  def agent_state_changed_wrapper(event: AgentStateChangedEvent) -> None:
134
- asyncio.create_task(
135
- self._metrics_collector.on_agent_state_changed(
136
- old_state=event.old_state,
137
- new_state=event.new_state,
138
- )
139
  )
140
 
141
  def error_wrapper(event: ErrorEvent) -> None:
 
86
  """Called when the agent enters the session. Set up metrics listeners."""
87
 
88
  def metrics_wrapper(event: MetricsCollectedEvent) -> None:
89
+ self._metrics_collector.submit_metrics_collected(event.metrics)
 
 
90
 
91
  def transcript_wrapper(event: UserInputTranscribedEvent) -> None:
92
+ self._metrics_collector.submit_user_input_transcribed(
93
+ event.transcript,
94
+ is_final=event.is_final,
 
 
95
  )
96
 
97
  def conversation_item_wrapper(event: ConversationItemAddedEvent) -> None:
 
99
  role = getattr(item, "role", None)
100
  content = getattr(item, "content", None)
101
  item_created_at = getattr(item, "created_at", None)
102
+ self._metrics_collector.submit_conversation_item_added(
103
+ role=role,
104
+ content=content,
105
+ event_created_at=event.created_at,
106
+ item_created_at=item_created_at,
 
 
107
  )
108
 
109
  def speech_created_wrapper(event: SpeechCreatedEvent) -> None:
110
+ self._metrics_collector.submit_speech_created(event.speech_handle)
 
 
111
 
112
  def function_tools_executed_wrapper(event: FunctionToolsExecutedEvent) -> None:
113
+ self._metrics_collector.submit_function_tools_executed(
114
+ function_calls=event.function_calls,
115
+ function_call_outputs=event.function_call_outputs,
116
+ created_at=event.created_at,
 
 
117
  )
118
  if self._tool_feedback is not None:
119
  asyncio.create_task(
 
121
  )
122
 
123
  def agent_state_changed_wrapper(event: AgentStateChangedEvent) -> None:
124
+ self._metrics_collector.submit_agent_state_changed(
125
+ old_state=event.old_state,
126
+ new_state=event.new_state,
 
 
127
  )
128
 
129
  def error_wrapper(event: ErrorEvent) -> None:
src/agent/runtime/session.py CHANGED
@@ -109,16 +109,6 @@ async def session_handler(ctx: agents.JobContext) -> None:
109
  startup_greeting_task: asyncio.Task[Any] | None = None
110
  tool_feedback = ToolFeedbackController(enabled=False)
111
 
112
- if trace_provider:
113
-
114
- async def flush_trace(_: str) -> None:
115
- try:
116
- trace_provider.force_flush()
117
- except Exception as exc:
118
- logger.warning(f"Failed to flush Langfuse traces: {exc}")
119
-
120
- ctx.add_shutdown_callback(flush_trace)
121
-
122
  async def cancel_startup_greeting(_: str) -> None:
123
  await cancel_task_for_shutdown(
124
  startup_greeting_task,
@@ -147,6 +137,22 @@ async def session_handler(ctx: agents.JobContext) -> None:
147
  langfuse_enabled=trace_provider is not None,
148
  )
149
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
150
  if isinstance(ctx.job.metadata, str) and ctx.job.metadata.strip():
151
  try:
152
  metadata = json.loads(ctx.job.metadata)
@@ -158,11 +164,9 @@ async def session_handler(ctx: agents.JobContext) -> None:
158
  metadata.get("participant_id"),
159
  ctx.room.name,
160
  )
161
- asyncio.create_task(
162
- metrics_collector.on_session_metadata(
163
- session_id=metadata.get("session_id"),
164
- participant_id=metadata.get("participant_id"),
165
- )
166
  )
167
 
168
  tts_engine = create_tts()
 
109
  startup_greeting_task: asyncio.Task[Any] | None = None
110
  tool_feedback = ToolFeedbackController(enabled=False)
111
 
 
 
 
 
 
 
 
 
 
 
112
  async def cancel_startup_greeting(_: str) -> None:
113
  await cancel_task_for_shutdown(
114
  startup_greeting_task,
 
137
  langfuse_enabled=trace_provider is not None,
138
  )
139
 
140
+ async def drain_pending_traces(_: str) -> None:
141
+ try:
142
+ await metrics_collector.drain_pending_traces()
143
+ except TimeoutError:
144
+ logger.warning("Timed out while draining pending Langfuse traces during shutdown")
145
+ except Exception as exc:
146
+ logger.warning(f"Failed to drain pending Langfuse traces: {exc}")
147
+ if trace_provider is None:
148
+ return
149
+ try:
150
+ trace_provider.force_flush()
151
+ except Exception as exc:
152
+ logger.warning(f"Failed to flush Langfuse traces: {exc}")
153
+
154
+ ctx.add_shutdown_callback(drain_pending_traces)
155
+
156
  if isinstance(ctx.job.metadata, str) and ctx.job.metadata.strip():
157
  try:
158
  metadata = json.loads(ctx.job.metadata)
 
164
  metadata.get("participant_id"),
165
  ctx.room.name,
166
  )
167
+ await metrics_collector.on_session_metadata(
168
+ session_id=metadata.get("session_id"),
169
+ participant_id=metadata.get("participant_id"),
 
 
170
  )
171
 
172
  tts_engine = create_tts()
src/agent/traces/metrics_collector.py CHANGED
@@ -12,7 +12,7 @@ import uuid
12
  from collections import deque
13
  from dataclasses import asdict, dataclass
14
  from time import monotonic, time
15
- from typing import Any, Optional, Sequence, Union
16
 
17
  from livekit import rtc
18
  from livekit.agents import metrics
@@ -213,6 +213,16 @@ class PendingUserUtterance:
213
  watchdog_id: Optional[str] = None
214
 
215
 
 
 
 
 
 
 
 
 
 
 
216
  # ------------------------------------------------------------------
217
  # Facade
218
  # ------------------------------------------------------------------
@@ -261,6 +271,9 @@ class MetricsCollector:
261
  self._latest_vad_metrics: Optional[VADMetrics] = None
262
  self._latest_vad_metric_attributes: Optional[dict[str, Any]] = None
263
  self._first_final_user_turn_logged = False
 
 
 
264
  self._llm_stall_timeout_sec = max(
265
  float(
266
  getattr(
@@ -271,6 +284,19 @@ class MetricsCollector:
271
  ),
272
  0.0,
273
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
274
 
275
  self._tracer = TurnTracer(
276
  publisher=self._publisher,
@@ -300,6 +326,61 @@ class MetricsCollector:
300
  def _trace_post_tool_response_timeout_sec(self, value: float) -> None:
301
  self._tracer._trace_post_tool_response_timeout_sec = value
302
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
303
  # ------------------------------------------------------------------
304
  # Public event handlers
305
  # ------------------------------------------------------------------
@@ -309,6 +390,18 @@ class MetricsCollector:
309
  *,
310
  session_id: Any,
311
  participant_id: Any,
 
 
 
 
 
 
 
 
 
 
 
 
312
  ) -> None:
313
  normalized_session = _normalize(session_id)
314
  normalized_participant = _normalize(participant_id)
@@ -322,6 +415,18 @@ class MetricsCollector:
322
  transcript: str,
323
  *,
324
  is_final: bool,
 
 
 
 
 
 
 
 
 
 
 
 
325
  ) -> None:
326
  if not is_final:
327
  return
@@ -364,6 +469,22 @@ class MetricsCollector:
364
  content: Any,
365
  event_created_at: Optional[float] = None,
366
  item_created_at: Optional[float] = None,
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
367
  ) -> None:
368
  if role not in {"user", "assistant"}:
369
  return
@@ -408,6 +529,20 @@ class MetricsCollector:
408
  function_calls: list[Any],
409
  function_call_outputs: list[Any],
410
  created_at: float,
 
 
 
 
 
 
 
 
 
 
 
 
 
 
411
  ) -> None:
412
  trace_turn = await self._tracer.attach_function_tools_executed(
413
  function_calls=function_calls,
@@ -418,12 +553,18 @@ class MetricsCollector:
418
  await self._tracer.maybe_finalize(trace_turn)
419
 
420
  async def on_tool_step_started(self) -> bool:
 
 
 
421
  trace_turn, should_announce = await self._tracer.attach_tool_step_started()
422
  await self._publish_partial_turn_pipeline_summary(trace_turn)
423
  await self._tracer.maybe_finalize(trace_turn)
424
  return should_announce
425
 
426
  async def on_speech_created(self, speech_handle: Any) -> None:
 
 
 
427
  speech_id = _normalize(getattr(speech_handle, "id", None))
428
  if speech_id:
429
  self._pending_speech_ids_for_first_audio.append(speech_id)
@@ -435,6 +576,7 @@ class MetricsCollector:
435
  await self._on_assistant_text(
436
  assistant_text,
437
  event_created_at=assistant_created_at,
 
438
  )
439
 
440
  add_done_callback = getattr(speech_handle, "add_done_callback", None)
@@ -444,18 +586,15 @@ class MetricsCollector:
444
  def _on_done(handle: Any) -> None:
445
  try:
446
  done_speech_id = _normalize(getattr(handle, "id", None))
447
- if done_speech_id:
448
- self._discard_pending_speech_id(done_speech_id)
449
  text, created_at = _extract_latest_assistant_chat_item(
450
  getattr(handle, "chat_items", [])
451
  )
452
- if text:
453
- asyncio.create_task(
454
- self._on_assistant_text(
455
- text,
456
- event_created_at=created_at,
457
- )
458
- )
459
  except Exception:
460
  return
461
 
@@ -464,11 +603,38 @@ class MetricsCollector:
464
  except Exception:
465
  return
466
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
467
  async def on_agent_state_changed(
468
  self,
469
  *,
470
  old_state: str,
471
  new_state: str,
 
 
 
 
 
 
 
 
 
 
 
 
472
  ) -> None:
473
  if new_state != "speaking":
474
  return
@@ -498,6 +664,10 @@ class MetricsCollector:
498
  old_state,
499
  new_state,
500
  )
 
 
 
 
501
 
502
  async def on_tts_synthesized(
503
  self,
@@ -505,6 +675,20 @@ class MetricsCollector:
505
  ttfb: float,
506
  duration: float,
507
  audio_duration: float,
 
 
 
 
 
 
 
 
 
 
 
 
 
 
508
  ) -> None:
509
  if ttfb < 0:
510
  return
@@ -558,6 +742,21 @@ class MetricsCollector:
558
  metrics.EOUMetrics,
559
  metrics.VADMetrics,
560
  ],
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
561
  ) -> None:
562
  speech_id = None
563
  turn_metrics = None
@@ -595,8 +794,6 @@ class MetricsCollector:
595
  speech_id = collected_metrics.speech_id or collected_metrics.request_id
596
  turn_metrics = self._get_or_create_turn(speech_id, role="agent")
597
  self._latest_agent_speech_id = speech_id
598
- if self._pending_agent_transcripts and not turn_metrics.transcript:
599
- turn_metrics.transcript = self._pending_agent_transcripts.popleft()
600
  turn_metrics.llm = LLMMetrics(
601
  type=collected_metrics.type,
602
  label=collected_metrics.label,
@@ -727,12 +924,98 @@ class MetricsCollector:
727
  await self._tracer.maybe_finalize(trace_turn)
728
 
729
  async def wait_for_pending_trace_tasks(self) -> None:
 
730
  await self._tracer.wait_for_pending_tasks()
731
 
 
 
 
 
 
 
 
 
732
  # ------------------------------------------------------------------
733
  # Internal helpers
734
  # ------------------------------------------------------------------
735
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
736
  def _get_or_create_state(self, speech_id: str) -> TurnState:
737
  if speech_id not in self._turns:
738
  self._turns[speech_id] = TurnState()
@@ -753,14 +1036,15 @@ class MetricsCollector:
753
  assistant_text: str,
754
  *,
755
  event_created_at: Optional[float] = None,
 
756
  ) -> None:
757
  normalized = assistant_text.strip()
758
  if not normalized:
759
  return
760
- _append_if_new(self._pending_agent_transcripts, normalized)
761
  trace_turn = await self._tracer.attach_assistant_text(
762
  normalized,
763
  event_created_at=event_created_at,
 
764
  )
765
  await self._tracer.maybe_finalize(trace_turn)
766
 
@@ -840,7 +1124,7 @@ class MetricsCollector:
840
 
841
  def _current_open_user_utterance(self) -> Optional[PendingUserUtterance]:
842
  utterance = self._latest_user_utterance()
843
- if utterance is None or utterance.committed:
844
  return None
845
  return utterance
846
 
 
12
  from collections import deque
13
  from dataclasses import asdict, dataclass
14
  from time import monotonic, time
15
+ from typing import Any, Awaitable, Callable, Optional, Sequence, Union
16
 
17
  from livekit import rtc
18
  from livekit.agents import metrics
 
213
  watchdog_id: Optional[str] = None
214
 
215
 
216
+ @dataclass
217
+ class QueuedCollectorEvent:
218
+ """FIFO collector event that must be processed in-order."""
219
+
220
+ handler: Callable[..., Awaitable[Any]]
221
+ args: tuple[Any, ...]
222
+ kwargs: dict[str, Any]
223
+ waiter: asyncio.Future[Any] | None = None
224
+
225
+
226
  # ------------------------------------------------------------------
227
  # Facade
228
  # ------------------------------------------------------------------
 
271
  self._latest_vad_metrics: Optional[VADMetrics] = None
272
  self._latest_vad_metric_attributes: Optional[dict[str, Any]] = None
273
  self._first_final_user_turn_logged = False
274
+ self._event_queue: deque[QueuedCollectorEvent] = deque()
275
+ self._event_worker_task: asyncio.Task[None] | None = None
276
+ self._event_loop: asyncio.AbstractEventLoop | None = None
277
  self._llm_stall_timeout_sec = max(
278
  float(
279
  getattr(
 
284
  ),
285
  0.0,
286
  )
287
+ self._shutdown_drain_timeout_sec = (
288
+ max(
289
+ float(
290
+ getattr(
291
+ settings.langfuse,
292
+ "LANGFUSE_SHUTDOWN_DRAIN_TIMEOUT_MS",
293
+ 3000.0,
294
+ )
295
+ ),
296
+ 0.0,
297
+ )
298
+ / 1000.0
299
+ )
300
 
301
  self._tracer = TurnTracer(
302
  publisher=self._publisher,
 
326
  def _trace_post_tool_response_timeout_sec(self, value: float) -> None:
327
  self._tracer._trace_post_tool_response_timeout_sec = value
328
 
329
+ def submit_metrics_collected(self, collected_metrics: Any) -> None:
330
+ self._submit_serialized(self._handle_metrics_collected, collected_metrics)
331
+
332
+ def submit_user_input_transcribed(self, transcript: str, *, is_final: bool) -> None:
333
+ self._submit_serialized(
334
+ self._handle_user_input_transcribed,
335
+ transcript,
336
+ is_final=is_final,
337
+ )
338
+
339
+ def submit_conversation_item_added(
340
+ self,
341
+ *,
342
+ role: Optional[str],
343
+ content: Any,
344
+ event_created_at: Optional[float] = None,
345
+ item_created_at: Optional[float] = None,
346
+ ) -> None:
347
+ self._submit_serialized(
348
+ self._handle_conversation_item_added,
349
+ role=role,
350
+ content=content,
351
+ event_created_at=event_created_at,
352
+ item_created_at=item_created_at,
353
+ )
354
+
355
+ def submit_speech_created(self, speech_handle: Any) -> None:
356
+ self._submit_serialized(self._handle_speech_created, speech_handle)
357
+
358
+ def submit_function_tools_executed(
359
+ self,
360
+ *,
361
+ function_calls: list[Any],
362
+ function_call_outputs: list[Any],
363
+ created_at: float,
364
+ ) -> None:
365
+ self._submit_serialized(
366
+ self._handle_function_tools_executed,
367
+ function_calls=function_calls,
368
+ function_call_outputs=function_call_outputs,
369
+ created_at=created_at,
370
+ )
371
+
372
+ def submit_agent_state_changed(
373
+ self,
374
+ *,
375
+ old_state: str,
376
+ new_state: str,
377
+ ) -> None:
378
+ self._submit_serialized(
379
+ self._handle_agent_state_changed,
380
+ old_state=old_state,
381
+ new_state=new_state,
382
+ )
383
+
384
  # ------------------------------------------------------------------
385
  # Public event handlers
386
  # ------------------------------------------------------------------
 
390
  *,
391
  session_id: Any,
392
  participant_id: Any,
393
+ ) -> None:
394
+ await self._call_serialized(
395
+ self._handle_session_metadata,
396
+ session_id=session_id,
397
+ participant_id=participant_id,
398
+ )
399
+
400
+ async def _handle_session_metadata(
401
+ self,
402
+ *,
403
+ session_id: Any,
404
+ participant_id: Any,
405
  ) -> None:
406
  normalized_session = _normalize(session_id)
407
  normalized_participant = _normalize(participant_id)
 
415
  transcript: str,
416
  *,
417
  is_final: bool,
418
+ ) -> None:
419
+ await self._call_serialized(
420
+ self._handle_user_input_transcribed,
421
+ transcript,
422
+ is_final=is_final,
423
+ )
424
+
425
+ async def _handle_user_input_transcribed(
426
+ self,
427
+ transcript: str,
428
+ *,
429
+ is_final: bool,
430
  ) -> None:
431
  if not is_final:
432
  return
 
469
  content: Any,
470
  event_created_at: Optional[float] = None,
471
  item_created_at: Optional[float] = None,
472
+ ) -> None:
473
+ await self._call_serialized(
474
+ self._handle_conversation_item_added,
475
+ role=role,
476
+ content=content,
477
+ event_created_at=event_created_at,
478
+ item_created_at=item_created_at,
479
+ )
480
+
481
+ async def _handle_conversation_item_added(
482
+ self,
483
+ *,
484
+ role: Optional[str],
485
+ content: Any,
486
+ event_created_at: Optional[float] = None,
487
+ item_created_at: Optional[float] = None,
488
  ) -> None:
489
  if role not in {"user", "assistant"}:
490
  return
 
529
  function_calls: list[Any],
530
  function_call_outputs: list[Any],
531
  created_at: float,
532
+ ) -> None:
533
+ await self._call_serialized(
534
+ self._handle_function_tools_executed,
535
+ function_calls=function_calls,
536
+ function_call_outputs=function_call_outputs,
537
+ created_at=created_at,
538
+ )
539
+
540
+ async def _handle_function_tools_executed(
541
+ self,
542
+ *,
543
+ function_calls: list[Any],
544
+ function_call_outputs: list[Any],
545
+ created_at: float,
546
  ) -> None:
547
  trace_turn = await self._tracer.attach_function_tools_executed(
548
  function_calls=function_calls,
 
553
  await self._tracer.maybe_finalize(trace_turn)
554
 
555
  async def on_tool_step_started(self) -> bool:
556
+ return await self._call_serialized(self._handle_tool_step_started)
557
+
558
+ async def _handle_tool_step_started(self) -> bool:
559
  trace_turn, should_announce = await self._tracer.attach_tool_step_started()
560
  await self._publish_partial_turn_pipeline_summary(trace_turn)
561
  await self._tracer.maybe_finalize(trace_turn)
562
  return should_announce
563
 
564
  async def on_speech_created(self, speech_handle: Any) -> None:
565
+ await self._call_serialized(self._handle_speech_created, speech_handle)
566
+
567
+ async def _handle_speech_created(self, speech_handle: Any) -> None:
568
  speech_id = _normalize(getattr(speech_handle, "id", None))
569
  if speech_id:
570
  self._pending_speech_ids_for_first_audio.append(speech_id)
 
576
  await self._on_assistant_text(
577
  assistant_text,
578
  event_created_at=assistant_created_at,
579
+ speech_id=speech_id,
580
  )
581
 
582
  add_done_callback = getattr(speech_handle, "add_done_callback", None)
 
586
  def _on_done(handle: Any) -> None:
587
  try:
588
  done_speech_id = _normalize(getattr(handle, "id", None))
 
 
589
  text, created_at = _extract_latest_assistant_chat_item(
590
  getattr(handle, "chat_items", [])
591
  )
592
+ self._submit_serialized(
593
+ self._handle_speech_done,
594
+ done_speech_id,
595
+ text,
596
+ created_at,
597
+ )
 
598
  except Exception:
599
  return
600
 
 
603
  except Exception:
604
  return
605
 
606
+ async def _handle_speech_done(
607
+ self,
608
+ speech_id: Optional[str],
609
+ assistant_text: str,
610
+ event_created_at: Optional[float],
611
+ ) -> None:
612
+ if speech_id:
613
+ self._discard_pending_speech_id(speech_id)
614
+ if assistant_text:
615
+ await self._on_assistant_text(
616
+ assistant_text,
617
+ event_created_at=event_created_at,
618
+ speech_id=speech_id,
619
+ )
620
+
621
  async def on_agent_state_changed(
622
  self,
623
  *,
624
  old_state: str,
625
  new_state: str,
626
+ ) -> None:
627
+ await self._call_serialized(
628
+ self._handle_agent_state_changed,
629
+ old_state=old_state,
630
+ new_state=new_state,
631
+ )
632
+
633
+ async def _handle_agent_state_changed(
634
+ self,
635
+ *,
636
+ old_state: str,
637
+ new_state: str,
638
  ) -> None:
639
  if new_state != "speaking":
640
  return
 
664
  old_state,
665
  new_state,
666
  )
667
+ await self._tracer.mark_first_audio_started(
668
+ speech_id=speech_id,
669
+ started_at=time(),
670
+ )
671
 
672
  async def on_tts_synthesized(
673
  self,
 
675
  ttfb: float,
676
  duration: float,
677
  audio_duration: float,
678
+ ) -> None:
679
+ await self._call_serialized(
680
+ self._handle_tts_synthesized,
681
+ ttfb=ttfb,
682
+ duration=duration,
683
+ audio_duration=audio_duration,
684
+ )
685
+
686
+ async def _handle_tts_synthesized(
687
+ self,
688
+ *,
689
+ ttfb: float,
690
+ duration: float,
691
+ audio_duration: float,
692
  ) -> None:
693
  if ttfb < 0:
694
  return
 
742
  metrics.EOUMetrics,
743
  metrics.VADMetrics,
744
  ],
745
+ ) -> None:
746
+ await self._call_serialized(
747
+ self._handle_metrics_collected,
748
+ collected_metrics,
749
+ )
750
+
751
+ async def _handle_metrics_collected(
752
+ self,
753
+ collected_metrics: Union[
754
+ metrics.STTMetrics,
755
+ metrics.LLMMetrics,
756
+ metrics.TTSMetrics,
757
+ metrics.EOUMetrics,
758
+ metrics.VADMetrics,
759
+ ],
760
  ) -> None:
761
  speech_id = None
762
  turn_metrics = None
 
794
  speech_id = collected_metrics.speech_id or collected_metrics.request_id
795
  turn_metrics = self._get_or_create_turn(speech_id, role="agent")
796
  self._latest_agent_speech_id = speech_id
 
 
797
  turn_metrics.llm = LLMMetrics(
798
  type=collected_metrics.type,
799
  label=collected_metrics.label,
 
924
  await self._tracer.maybe_finalize(trace_turn)
925
 
926
  async def wait_for_pending_trace_tasks(self) -> None:
927
+ await self._wait_for_event_queue_idle()
928
  await self._tracer.wait_for_pending_tasks()
929
 
930
+ async def drain_pending_traces(self) -> None:
931
+ if self._shutdown_drain_timeout_sec <= 0.0:
932
+ return
933
+ await asyncio.wait_for(
934
+ self._drain_pending_traces_once(),
935
+ timeout=self._shutdown_drain_timeout_sec,
936
+ )
937
+
938
  # ------------------------------------------------------------------
939
  # Internal helpers
940
  # ------------------------------------------------------------------
941
 
942
+ async def _drain_pending_traces_once(self) -> None:
943
+ await self._wait_for_event_queue_idle()
944
+ await self._tracer.drain_pending_turns()
945
+ await self._tracer.wait_for_pending_tasks()
946
+
947
+ async def _call_serialized(
948
+ self,
949
+ handler: Callable[..., Awaitable[Any]],
950
+ *args: Any,
951
+ **kwargs: Any,
952
+ ) -> Any:
953
+ loop = asyncio.get_running_loop()
954
+ waiter: asyncio.Future[Any] = loop.create_future()
955
+ self._enqueue_serialized(handler, args=args, kwargs=kwargs, waiter=waiter)
956
+ return await waiter
957
+
958
+ def _submit_serialized(
959
+ self,
960
+ handler: Callable[..., Awaitable[Any]],
961
+ *args: Any,
962
+ **kwargs: Any,
963
+ ) -> None:
964
+ self._enqueue_serialized(handler, args=args, kwargs=kwargs, waiter=None)
965
+
966
+ def _enqueue_serialized(
967
+ self,
968
+ handler: Callable[..., Awaitable[Any]],
969
+ *,
970
+ args: tuple[Any, ...],
971
+ kwargs: dict[str, Any],
972
+ waiter: asyncio.Future[Any] | None,
973
+ ) -> None:
974
+ loop = asyncio.get_running_loop()
975
+ if self._event_loop is None:
976
+ self._event_loop = loop
977
+ elif self._event_loop is not loop:
978
+ raise RuntimeError("MetricsCollector cannot be shared across event loops")
979
+
980
+ self._event_queue.append(
981
+ QueuedCollectorEvent(
982
+ handler=handler,
983
+ args=args,
984
+ kwargs=kwargs,
985
+ waiter=waiter,
986
+ )
987
+ )
988
+ if self._event_worker_task is None:
989
+ self._event_worker_task = loop.create_task(self._run_event_worker())
990
+
991
+ async def _run_event_worker(self) -> None:
992
+ while True:
993
+ if not self._event_queue:
994
+ self._event_worker_task = None
995
+ return
996
+
997
+ event = self._event_queue.popleft()
998
+ try:
999
+ result = await event.handler(*event.args, **event.kwargs)
1000
+ except Exception as exc:
1001
+ if event.waiter is not None and not event.waiter.done():
1002
+ event.waiter.set_exception(exc)
1003
+ else:
1004
+ logger.exception(
1005
+ "Metrics collector event processing failed: handler=%s",
1006
+ getattr(event.handler, "__name__", repr(event.handler)),
1007
+ )
1008
+ else:
1009
+ if event.waiter is not None and not event.waiter.done():
1010
+ event.waiter.set_result(result)
1011
+
1012
+ async def _wait_for_event_queue_idle(self) -> None:
1013
+ while self._event_worker_task is not None:
1014
+ task = self._event_worker_task
1015
+ await asyncio.gather(task, return_exceptions=True)
1016
+ if self._event_worker_task is task:
1017
+ break
1018
+
1019
  def _get_or_create_state(self, speech_id: str) -> TurnState:
1020
  if speech_id not in self._turns:
1021
  self._turns[speech_id] = TurnState()
 
1036
  assistant_text: str,
1037
  *,
1038
  event_created_at: Optional[float] = None,
1039
+ speech_id: Optional[str] = None,
1040
  ) -> None:
1041
  normalized = assistant_text.strip()
1042
  if not normalized:
1043
  return
 
1044
  trace_turn = await self._tracer.attach_assistant_text(
1045
  normalized,
1046
  event_created_at=event_created_at,
1047
+ speech_id=speech_id,
1048
  )
1049
  await self._tracer.maybe_finalize(trace_turn)
1050
 
 
1124
 
1125
  def _current_open_user_utterance(self) -> Optional[PendingUserUtterance]:
1126
  utterance = self._latest_user_utterance()
1127
+ if utterance is None or utterance.committed or utterance.llm_started:
1128
  return None
1129
  return utterance
1130
 
src/agent/traces/turn_tracer.py CHANGED
@@ -76,6 +76,11 @@ class TraceTurn:
76
  tts_updated_order: Optional[int] = None
77
  event_counter: int = 0
78
  tool_post_response_missing: bool = False
 
 
 
 
 
79
  coalesced_turn_ids: list[str] = field(default_factory=list)
80
  coalesced_user_transcripts: list[str] = field(default_factory=list)
81
  coalesced_fragment_count: int = 0
@@ -130,6 +135,14 @@ class TTSCallTrace:
130
  first_audio_at: Optional[float] = None
131
 
132
 
 
 
 
 
 
 
 
 
133
  @dataclass
134
  class TimelineEvent:
135
  """Ordered event for building trace phases."""
@@ -193,6 +206,10 @@ class TurnTracer:
193
  self._pending_agent_transcripts = pending_agent_transcripts
194
 
195
  self._pending_trace_turns: deque[TraceTurn] = deque()
 
 
 
 
196
  self._trace_lock = asyncio.Lock()
197
  self._trace_emit_tasks: set[asyncio.Task[None]] = set()
198
  self._trace_finalize_tasks: dict[str, asyncio.Task[None]] = {}
@@ -305,33 +322,38 @@ class TurnTracer:
305
  # ------------------------------------------------------------------
306
 
307
  async def create_turn(self, *, user_transcript: str, room_id: str) -> None:
 
308
  async with self._trace_lock:
309
  normalized = user_transcript.strip()
310
  if not normalized:
311
  return
312
 
 
 
313
  current_turn = self._latest_turn_where(lambda c: not c.user_turn_committed)
314
  if current_turn is not None:
315
  self._update_user_turn_text(current_turn, normalized)
316
- return
 
 
 
 
 
 
 
 
 
317
 
318
- new_turn = TraceTurn(
319
- turn_id=str(uuid.uuid4()),
320
- session_id=self._session_id,
321
- room_id=room_id,
322
- participant_id=self._participant_id,
323
- user_transcript=normalized,
324
- prompt_text=normalized,
325
- )
326
- new_turn.user_transcript_updated_at = new_turn.created_at
327
 
328
- coalesced_turn = self._coalesced_turn_candidate()
329
- if coalesced_turn is not None:
330
- self._absorb_coalesced_turn_metadata(new_turn, coalesced_turn)
331
- self._pending_trace_turns.remove(coalesced_turn)
332
- self._cancel_finalize_timeout(coalesced_turn.turn_id)
333
 
334
- self._pending_trace_turns.append(new_turn)
 
335
 
336
  async def attach_user_text(
337
  self,
@@ -450,6 +472,8 @@ class TurnTracer:
450
  turn.prompt_text = turn.user_transcript
451
  if normalized_speech_id and turn.speech_id is None:
452
  turn.speech_id = normalized_speech_id
 
 
453
 
454
  llm_attrs = _sanitize_component_attributes(metric_attributes)
455
  order = self._next_event_order(turn)
@@ -496,6 +520,8 @@ class TurnTracer:
496
 
497
  if normalized_speech_id and turn.speech_id is None:
498
  turn.speech_id = normalized_speech_id
 
 
499
 
500
  tts_attrs = _sanitize_component_attributes(metric_attributes)
501
  order = self._next_event_order(turn)
@@ -555,31 +581,47 @@ class TurnTracer:
555
  assistant_text: str,
556
  *,
557
  event_created_at: Optional[float] = None,
 
558
  ) -> Optional[TraceTurn]:
559
  async with self._trace_lock:
560
- turn = self._latest_turn_where(lambda c: bool(c.llm_calls))
561
- if not turn:
562
- turn = self._latest_turn_where(lambda _: True)
563
- if not turn:
564
- return None
565
  normalized_text = assistant_text.strip()
566
  if not normalized_text:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
567
  return turn
568
- previous_assistant_text = turn.assistant_text or turn.response_text
569
- order = self._next_event_order(turn)
570
- turn.assistant_text = normalized_text
571
- turn.response_text = normalized_text
572
- assistant_event_created_at = _to_optional_float(event_created_at)
573
- turn.assistant_text_updated_at = _resolved_event_timestamp(
574
- assistant_event_created_at
575
  )
576
- turn.assistant_text_updated_order = order
577
- _reconcile_assistant_text_with_tts_calls(
578
- turn=turn,
579
- assistant_text=normalized_text,
580
- previous_assistant_text=previous_assistant_text,
 
 
 
 
 
 
581
  )
582
- self._maybe_close_tool_phase(turn)
583
  return turn
584
 
585
  async def attach_tool_step_started(self) -> tuple[Optional[TraceTurn], bool]:
@@ -651,6 +693,55 @@ class TurnTracer:
651
  self._maybe_close_tool_phase(turn)
652
  return turn
653
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
654
  # ------------------------------------------------------------------
655
  # Finalization
656
  # ------------------------------------------------------------------
@@ -725,6 +816,8 @@ class TurnTracer:
725
  return False
726
  if not turn.user_transcript.strip():
727
  return False
 
 
728
  if turn.assistant_text.strip() or turn.response_text.strip():
729
  return False
730
  if turn.tool_step_announced or turn.tool_executions or turn.last_tool_event_order is not None:
@@ -768,6 +861,13 @@ class TurnTracer:
768
  new_turn.coalesced_fragment_count = (
769
  absorbed_turn.coalesced_fragment_count + 1
770
  )
 
 
 
 
 
 
 
771
 
772
  def _next_turn_where(
773
  self,
@@ -813,6 +913,132 @@ class TurnTracer:
813
  return matched_without_id
814
  return self._latest_turn_where(lambda c: bool(c.llm_calls))
815
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
816
  def _maybe_update_perceived_second_audio_latency(
817
  self,
818
  turn: TraceTurn,
@@ -985,6 +1211,7 @@ class TurnTracer:
985
  min_observed_order: Optional[int] = None,
986
  include_pending_agent_transcripts: bool = True,
987
  ) -> str:
 
988
  if turn.assistant_text.strip():
989
  if (
990
  min_observed_order is None
@@ -1003,6 +1230,16 @@ class TurnTracer:
1003
  )
1004
  ):
1005
  return turn.response_text.strip()
 
 
 
 
 
 
 
 
 
 
1006
  for tts_call in reversed(turn.tts_calls):
1007
  if (
1008
  min_observed_order is not None
@@ -1011,10 +1248,44 @@ class TurnTracer:
1011
  continue
1012
  if tts_call.assistant_text.strip():
1013
  return tts_call.assistant_text.strip()
 
 
 
1014
  if include_pending_agent_transcripts and self._pending_agent_transcripts:
1015
  return self._pending_agent_transcripts.popleft().strip()
1016
  return ""
1017
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1018
  # ------------------------------------------------------------------
1019
  # Timeout scheduling
1020
  # ------------------------------------------------------------------
@@ -2155,6 +2426,9 @@ def _set_root_attributes(
2155
  "langfuse.trace.metadata.tool_phase_announced": turn.tool_step_announced,
2156
  "langfuse.trace.metadata.tool_post_response_missing": turn.tool_post_response_missing,
2157
  "langfuse.trace.metadata.user_turn_committed": turn.user_turn_committed,
 
 
 
2158
  "langfuse.trace.metadata.coalesced_turn_count": len(turn.coalesced_turn_ids),
2159
  "langfuse.trace.metadata.coalesced_fragment_count": turn.coalesced_fragment_count,
2160
  "langfuse.trace.metadata.coalesced_turn_ids": turn.coalesced_turn_ids,
 
76
  tts_updated_order: Optional[int] = None
77
  event_counter: int = 0
78
  tool_post_response_missing: bool = False
79
+ assistant_audio_started: bool = False
80
+ assistant_audio_started_at: Optional[float] = None
81
+ interrupted: bool = False
82
+ interrupted_reason: Optional[str] = None
83
+ orphan_assistant_cutoff_at: Optional[float] = None
84
  coalesced_turn_ids: list[str] = field(default_factory=list)
85
  coalesced_user_transcripts: list[str] = field(default_factory=list)
86
  coalesced_fragment_count: int = 0
 
135
  first_audio_at: Optional[float] = None
136
 
137
 
138
@dataclass
class AssistantTextRecord:
    """Assistant text buffered until it can be safely correlated with a turn.

    Records are queued (per speech id, or in the orphan queue) when no
    uniquely matching turn exists yet, and replayed once one appears.
    """

    # Normalized (stripped) assistant utterance awaiting attachment.
    text: str
    # Unix timestamp of the producing event; None when the source had none.
    event_created_at: Optional[float] = None
146
  @dataclass
147
  class TimelineEvent:
148
  """Ordered event for building trace phases."""
 
206
  self._pending_agent_transcripts = pending_agent_transcripts
207
 
208
  self._pending_trace_turns: deque[TraceTurn] = deque()
209
+ self._pending_agent_transcripts_by_speech_id: dict[
210
+ str, deque[AssistantTextRecord]
211
+ ] = {}
212
+ self._orphan_assistant_text_records: deque[AssistantTextRecord] = deque()
213
  self._trace_lock = asyncio.Lock()
214
  self._trace_emit_tasks: set[asyncio.Task[None]] = set()
215
  self._trace_finalize_tasks: dict[str, asyncio.Task[None]] = {}
 
322
  # ------------------------------------------------------------------
323
 
324
  async def create_turn(self, *, user_transcript: str, room_id: str) -> None:
325
+ completed_turns: list[TraceTurn] = []
326
  async with self._trace_lock:
327
  normalized = user_transcript.strip()
328
  if not normalized:
329
  return
330
 
331
+ completed_turns = self._finalize_interrupted_turns_before_new_user_turn_locked()
332
+
333
  current_turn = self._latest_turn_where(lambda c: not c.user_turn_committed)
334
  if current_turn is not None:
335
  self._update_user_turn_text(current_turn, normalized)
336
+ else:
337
+ new_turn = TraceTurn(
338
+ turn_id=str(uuid.uuid4()),
339
+ session_id=self._session_id,
340
+ room_id=room_id,
341
+ participant_id=self._participant_id,
342
+ user_transcript=normalized,
343
+ prompt_text=normalized,
344
+ )
345
+ new_turn.user_transcript_updated_at = new_turn.created_at
346
 
347
+ coalesced_turn = self._coalesced_turn_candidate()
348
+ if coalesced_turn is not None:
349
+ self._absorb_coalesced_turn_metadata(new_turn, coalesced_turn)
350
+ self._pending_trace_turns.remove(coalesced_turn)
351
+ self._cancel_finalize_timeout(coalesced_turn.turn_id)
 
 
 
 
352
 
353
+ self._pending_trace_turns.append(new_turn)
 
 
 
 
354
 
355
+ for completed_turn in completed_turns:
356
+ self._schedule_trace_emit(completed_turn)
357
 
358
  async def attach_user_text(
359
  self,
 
472
  turn.prompt_text = turn.user_transcript
473
  if normalized_speech_id and turn.speech_id is None:
474
  turn.speech_id = normalized_speech_id
475
+ if normalized_speech_id:
476
+ self._apply_buffered_assistant_text_for_speech_id(turn)
477
 
478
  llm_attrs = _sanitize_component_attributes(metric_attributes)
479
  order = self._next_event_order(turn)
 
520
 
521
  if normalized_speech_id and turn.speech_id is None:
522
  turn.speech_id = normalized_speech_id
523
+ if normalized_speech_id:
524
+ self._apply_buffered_assistant_text_for_speech_id(turn)
525
 
526
  tts_attrs = _sanitize_component_attributes(metric_attributes)
527
  order = self._next_event_order(turn)
 
581
  assistant_text: str,
582
  *,
583
  event_created_at: Optional[float] = None,
584
+ speech_id: Optional[str] = None,
585
  ) -> Optional[TraceTurn]:
586
  async with self._trace_lock:
 
 
 
 
 
587
  normalized_text = assistant_text.strip()
588
  if not normalized_text:
589
+ return None
590
+
591
+ normalized_speech_id = _normalize_optional_str(speech_id)
592
+ resolved_event_created_at = _to_optional_float(event_created_at)
593
+
594
+ if normalized_speech_id:
595
+ turn = self._resolve_turn_for_exact_speech_id(normalized_speech_id)
596
+ if turn is None:
597
+ self._buffer_assistant_text(
598
+ normalized_text,
599
+ event_created_at=resolved_event_created_at,
600
+ speech_id=normalized_speech_id,
601
+ )
602
+ return None
603
+ self._apply_assistant_text_to_turn(
604
+ turn,
605
+ normalized_text,
606
+ event_created_at=resolved_event_created_at,
607
+ )
608
  return turn
609
+
610
+ turn = self._select_turn_for_orphan_assistant_text(
611
+ event_created_at=resolved_event_created_at
 
 
 
 
612
  )
613
+ if turn is None:
614
+ self._buffer_assistant_text(
615
+ normalized_text,
616
+ event_created_at=resolved_event_created_at,
617
+ )
618
+ return None
619
+
620
+ self._apply_assistant_text_to_turn(
621
+ turn,
622
+ normalized_text,
623
+ event_created_at=resolved_event_created_at,
624
  )
 
625
  return turn
626
 
627
  async def attach_tool_step_started(self) -> tuple[Optional[TraceTurn], bool]:
 
693
  self._maybe_close_tool_phase(turn)
694
  return turn
695
 
696
+ async def mark_first_audio_started(
697
+ self,
698
+ *,
699
+ speech_id: str,
700
+ started_at: Optional[float] = None,
701
+ ) -> Optional[TraceTurn]:
702
+ async with self._trace_lock:
703
+ normalized_speech_id = _normalize_optional_str(speech_id)
704
+ if not normalized_speech_id:
705
+ return None
706
+ turn = self._resolve_turn_for_exact_speech_id(normalized_speech_id)
707
+ if turn is None:
708
+ return None
709
+ turn.assistant_audio_started = True
710
+ turn.assistant_audio_started_at = _resolved_event_timestamp(
711
+ _to_optional_float(started_at)
712
+ )
713
+ return turn
714
+
715
+ async def drain_pending_turns(self) -> None:
716
+ completed_turns: list[TraceTurn] = []
717
+ async with self._trace_lock:
718
+ for turn in list(self._pending_trace_turns):
719
+ self._apply_buffered_assistant_text_for_speech_id(turn)
720
+ self._try_attach_latest_usable_orphan_assistant_text(turn)
721
+ if not (turn.user_transcript and turn.llm_calls and turn.tts_calls):
722
+ continue
723
+ requires_post_tool_response = self._requires_post_tool_follow_up(turn)
724
+ missing_post_tool_assistant = bool(
725
+ requires_post_tool_response
726
+ and not self._post_tool_assistant_observed(turn)
727
+ )
728
+ if turn.interrupted_reason is None:
729
+ turn.interrupted = True
730
+ turn.interrupted_reason = "shutdown_drain"
731
+ completed_turns.append(
732
+ self._finalize_locked(
733
+ turn,
734
+ missing_assistant_fallback=(
735
+ missing_post_tool_assistant or not bool(turn.assistant_text)
736
+ ),
737
+ tool_post_response_missing=requires_post_tool_response,
738
+ drop_assistant_text=missing_post_tool_assistant,
739
+ )
740
+ )
741
+
742
+ for completed_turn in completed_turns:
743
+ self._schedule_trace_emit(completed_turn)
744
+
745
  # ------------------------------------------------------------------
746
  # Finalization
747
  # ------------------------------------------------------------------
 
816
  return False
817
  if not turn.user_transcript.strip():
818
  return False
819
+ if turn.assistant_audio_started:
820
+ return False
821
  if turn.assistant_text.strip() or turn.response_text.strip():
822
  return False
823
  if turn.tool_step_announced or turn.tool_executions or turn.last_tool_event_order is not None:
 
861
  new_turn.coalesced_fragment_count = (
862
  absorbed_turn.coalesced_fragment_count + 1
863
  )
864
+ absorbed_recent_activity = self._turn_recent_activity_at(absorbed_turn)
865
+ existing_cutoff = new_turn.orphan_assistant_cutoff_at
866
+ if absorbed_recent_activity is not None:
867
+ new_turn.orphan_assistant_cutoff_at = max(
868
+ existing_cutoff or absorbed_recent_activity,
869
+ absorbed_recent_activity,
870
+ )
871
 
872
  def _next_turn_where(
873
  self,
 
913
  return matched_without_id
914
  return self._latest_turn_where(lambda c: bool(c.llm_calls))
915
 
916
+ def _resolve_turn_for_exact_speech_id(self, speech_id: str) -> Optional[TraceTurn]:
917
+ matched = self._latest_turn_where(lambda c: c.speech_id == speech_id)
918
+ if matched is not None:
919
+ return matched
920
+ candidates = [
921
+ turn
922
+ for turn in self._pending_trace_turns
923
+ if turn.speech_id is None and bool(turn.llm_calls or turn.tts_calls)
924
+ ]
925
+ if len(candidates) != 1:
926
+ return None
927
+ turn = candidates[0]
928
+ turn.speech_id = speech_id
929
+ return turn
930
+
931
+ def _select_turn_for_orphan_assistant_text(
932
+ self,
933
+ *,
934
+ event_created_at: Optional[float],
935
+ ) -> Optional[TraceTurn]:
936
+ candidates = [
937
+ turn
938
+ for turn in self._pending_trace_turns
939
+ if bool(turn.llm_calls or turn.tts_calls or turn.tool_phase_open)
940
+ ]
941
+ if len(candidates) != 1:
942
+ return None
943
+ turn = candidates[0]
944
+ cutoff = turn.orphan_assistant_cutoff_at
945
+ if cutoff is not None:
946
+ if event_created_at is None:
947
+ return None
948
+ if event_created_at < cutoff:
949
+ return None
950
+ return turn
951
+
952
+ def _apply_assistant_text_to_turn(
953
+ self,
954
+ turn: TraceTurn,
955
+ assistant_text: str,
956
+ *,
957
+ event_created_at: Optional[float],
958
+ ) -> None:
959
+ previous_assistant_text = turn.assistant_text or turn.response_text
960
+ order = self._next_event_order(turn)
961
+ turn.assistant_text = assistant_text
962
+ turn.response_text = assistant_text
963
+ turn.assistant_text_updated_at = _resolved_event_timestamp(event_created_at)
964
+ turn.assistant_text_updated_order = order
965
+ _reconcile_assistant_text_with_tts_calls(
966
+ turn=turn,
967
+ assistant_text=assistant_text,
968
+ previous_assistant_text=previous_assistant_text,
969
+ )
970
+ self._maybe_close_tool_phase(turn)
971
+
972
+ def _buffer_assistant_text(
973
+ self,
974
+ assistant_text: str,
975
+ *,
976
+ event_created_at: Optional[float],
977
+ speech_id: Optional[str] = None,
978
+ ) -> None:
979
+ normalized = assistant_text.strip()
980
+ if not normalized:
981
+ return
982
+ record = AssistantTextRecord(
983
+ text=normalized,
984
+ event_created_at=_to_optional_float(event_created_at),
985
+ )
986
+ normalized_speech_id = _normalize_optional_str(speech_id)
987
+ if normalized_speech_id:
988
+ queue = self._pending_agent_transcripts_by_speech_id.setdefault(
989
+ normalized_speech_id,
990
+ deque(),
991
+ )
992
+ if queue and queue[-1].text == normalized:
993
+ return
994
+ queue.append(record)
995
+ return
996
+ if self._orphan_assistant_text_records and self._orphan_assistant_text_records[-1].text == normalized:
997
+ return
998
+ self._orphan_assistant_text_records.append(record)
999
+
1000
+ def _apply_buffered_assistant_text_for_speech_id(self, turn: TraceTurn) -> None:
1001
+ speech_id = _normalize_optional_str(turn.speech_id)
1002
+ if not speech_id:
1003
+ return
1004
+ queue = self._pending_agent_transcripts_by_speech_id.get(speech_id)
1005
+ if not queue:
1006
+ return
1007
+ while queue:
1008
+ record = queue.popleft()
1009
+ self._apply_assistant_text_to_turn(
1010
+ turn,
1011
+ record.text,
1012
+ event_created_at=record.event_created_at,
1013
+ )
1014
+ if not queue:
1015
+ self._pending_agent_transcripts_by_speech_id.pop(speech_id, None)
1016
+
1017
+ def _try_attach_latest_usable_orphan_assistant_text(
1018
+ self,
1019
+ turn: TraceTurn,
1020
+ ) -> bool:
1021
+ if not self._orphan_assistant_text_records:
1022
+ return False
1023
+ if self._select_turn_for_orphan_assistant_text(
1024
+ event_created_at=self._orphan_assistant_text_records[-1].event_created_at
1025
+ ) is not turn:
1026
+ return False
1027
+ for index in range(len(self._orphan_assistant_text_records) - 1, -1, -1):
1028
+ record = self._orphan_assistant_text_records[index]
1029
+ if self._select_turn_for_orphan_assistant_text(
1030
+ event_created_at=record.event_created_at
1031
+ ) is not turn:
1032
+ continue
1033
+ del self._orphan_assistant_text_records[index]
1034
+ self._apply_assistant_text_to_turn(
1035
+ turn,
1036
+ record.text,
1037
+ event_created_at=record.event_created_at,
1038
+ )
1039
+ return True
1040
+ return False
1041
+
1042
  def _maybe_update_perceived_second_audio_latency(
1043
  self,
1044
  turn: TraceTurn,
 
1211
  min_observed_order: Optional[int] = None,
1212
  include_pending_agent_transcripts: bool = True,
1213
  ) -> str:
1214
+ speech_id = _normalize_optional_str(turn.speech_id)
1215
  if turn.assistant_text.strip():
1216
  if (
1217
  min_observed_order is None
 
1230
  )
1231
  ):
1232
  return turn.response_text.strip()
1233
+ if speech_id:
1234
+ buffered_exact = self._pending_agent_transcripts_by_speech_id.get(speech_id)
1235
+ if buffered_exact:
1236
+ while buffered_exact:
1237
+ record = buffered_exact.popleft()
1238
+ if not record.text.strip():
1239
+ continue
1240
+ if not buffered_exact:
1241
+ self._pending_agent_transcripts_by_speech_id.pop(speech_id, None)
1242
+ return record.text.strip()
1243
  for tts_call in reversed(turn.tts_calls):
1244
  if (
1245
  min_observed_order is not None
 
1248
  continue
1249
  if tts_call.assistant_text.strip():
1250
  return tts_call.assistant_text.strip()
1251
+ if self._try_attach_latest_usable_orphan_assistant_text(turn):
1252
+ if turn.assistant_text.strip():
1253
+ return turn.assistant_text.strip()
1254
  if include_pending_agent_transcripts and self._pending_agent_transcripts:
1255
  return self._pending_agent_transcripts.popleft().strip()
1256
  return ""
1257
 
1258
+ def _finalize_interrupted_turns_before_new_user_turn_locked(self) -> list[TraceTurn]:
1259
+ completed_turns: list[TraceTurn] = []
1260
+ for turn in list(self._pending_trace_turns):
1261
+ if not (turn.user_transcript and turn.llm_calls and turn.tts_calls):
1262
+ continue
1263
+ if not turn.assistant_audio_started:
1264
+ continue
1265
+ turn.interrupted = True
1266
+ if not turn.interrupted_reason:
1267
+ turn.interrupted_reason = "user_barge_in_after_audio_started"
1268
+ requires_post_tool_response = self._requires_post_tool_follow_up(turn)
1269
+ missing_post_tool_assistant = bool(
1270
+ requires_post_tool_response and not self._post_tool_assistant_observed(turn)
1271
+ )
1272
+ completed_turns.append(
1273
+ self._finalize_locked(
1274
+ turn,
1275
+ missing_assistant_fallback=(
1276
+ missing_post_tool_assistant or not bool(turn.assistant_text)
1277
+ ),
1278
+ tool_post_response_missing=requires_post_tool_response,
1279
+ drop_assistant_text=missing_post_tool_assistant,
1280
+ )
1281
+ )
1282
+ return completed_turns
1283
+
1284
+ def _requires_post_tool_follow_up(self, turn: TraceTurn) -> bool:
1285
+ if turn.last_tool_event_order is None:
1286
+ return False
1287
+ return self._requires_post_tool_response(turn)
1288
+
1289
  # ------------------------------------------------------------------
1290
  # Timeout scheduling
1291
  # ------------------------------------------------------------------
 
2426
  "langfuse.trace.metadata.tool_phase_announced": turn.tool_step_announced,
2427
  "langfuse.trace.metadata.tool_post_response_missing": turn.tool_post_response_missing,
2428
  "langfuse.trace.metadata.user_turn_committed": turn.user_turn_committed,
2429
+ "langfuse.trace.metadata.assistant_audio_started": turn.assistant_audio_started,
2430
+ "langfuse.trace.metadata.interrupted": turn.interrupted,
2431
+ "langfuse.trace.metadata.interrupted_reason": turn.interrupted_reason,
2432
  "langfuse.trace.metadata.coalesced_turn_count": len(turn.coalesced_turn_ids),
2433
  "langfuse.trace.metadata.coalesced_fragment_count": turn.coalesced_fragment_count,
2434
  "langfuse.trace.metadata.coalesced_turn_ids": turn.coalesced_turn_ids,
src/core/settings.py CHANGED
@@ -422,6 +422,15 @@ class LangfuseSettings(CoreSettings):
422
  le=10000.0,
423
  description="Best-effort tracer flush timeout in milliseconds",
424
  )
 
 
 
 
 
 
 
 
 
425
  LANGFUSE_CONTINUATION_COALESCE_WINDOW_MS: float = Field(
426
  default=1500.0,
427
  ge=0.0,
 
422
  le=10000.0,
423
  description="Best-effort tracer flush timeout in milliseconds",
424
  )
425
+ LANGFUSE_SHUTDOWN_DRAIN_TIMEOUT_MS: float = Field(
426
+ default=3000.0,
427
+ ge=0.0,
428
+ le=15000.0,
429
+ description=(
430
+ "Maximum time to drain pending Langfuse trace finalization/emission work "
431
+ "during shutdown before exiting"
432
+ ),
433
+ )
434
  LANGFUSE_CONTINUATION_COALESCE_WINDOW_MS: float = Field(
435
  default=1500.0,
436
  ge=0.0,
tests/test_langfuse_turn_tracing.py CHANGED
@@ -1758,6 +1758,7 @@ def test_immediate_continuation_coalesces_aborted_prior_turn(
1758
  )
1759
 
1760
  async def _run() -> None:
 
1761
  await collector.on_session_metadata(
1762
  session_id="session-coalesce",
1763
  participant_id="web-123",
@@ -1779,9 +1780,16 @@ def test_immediate_continuation_coalesces_aborted_prior_turn(
1779
  await collector.on_conversation_item_added(
1780
  role="user",
1781
  content="What the difference between speech to text and speech recognition?",
 
 
1782
  )
1783
  await collector.on_metrics_collected(_make_llm_metrics("speech-b"))
1784
- await collector.on_conversation_item_added(role="assistant", content="Speech to text writes words down.")
 
 
 
 
 
1785
  await collector.on_metrics_collected(_make_tts_metrics("speech-b"))
1786
  await collector.wait_for_pending_trace_tasks()
1787
 
@@ -2110,7 +2118,10 @@ def test_speech_created_done_callback_backfills_assistant_text(
2110
  await collector.wait_for_pending_trace_tasks()
2111
  assert not fake_tracer.spans
2112
 
2113
- handle = _FakeSpeechHandle(chat_items=[_FakeChatItem(role="assistant", content=["fallback reply"])])
 
 
 
2114
  await collector.on_speech_created(handle)
2115
  handle.trigger_done()
2116
  await asyncio.sleep(0)
@@ -2159,7 +2170,8 @@ def test_speech_created_immediate_capture_backfills_assistant_text(
2159
  role="assistant",
2160
  content=[_FakeTextMethodPart("immediate fallback reply")],
2161
  )
2162
- ]
 
2163
  )
2164
  await collector.on_speech_created(handle)
2165
  await collector.wait_for_pending_trace_tasks()
@@ -2480,3 +2492,239 @@ def test_real_participant_metadata_overrides_fallback_for_pending_turns(
2480
  assert len(turn_spans) == 1
2481
  assert turn_spans[0].attributes["session_id"] == "session-real-participant"
2482
  assert turn_spans[0].attributes["participant_id"] == "web-real-participant"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1758
  )
1759
 
1760
  async def _run() -> None:
1761
+ base_time = time.time()
1762
  await collector.on_session_metadata(
1763
  session_id="session-coalesce",
1764
  participant_id="web-123",
 
1780
  await collector.on_conversation_item_added(
1781
  role="user",
1782
  content="What the difference between speech to text and speech recognition?",
1783
+ event_created_at=base_time + 0.2,
1784
+ item_created_at=base_time + 0.2,
1785
  )
1786
  await collector.on_metrics_collected(_make_llm_metrics("speech-b"))
1787
+ await collector.on_conversation_item_added(
1788
+ role="assistant",
1789
+ content="Speech to text writes words down.",
1790
+ event_created_at=base_time + 0.4,
1791
+ item_created_at=base_time + 0.4,
1792
+ )
1793
  await collector.on_metrics_collected(_make_tts_metrics("speech-b"))
1794
  await collector.wait_for_pending_trace_tasks()
1795
 
 
2118
  await collector.wait_for_pending_trace_tasks()
2119
  assert not fake_tracer.spans
2120
 
2121
+ handle = _FakeSpeechHandle(
2122
+ chat_items=[_FakeChatItem(role="assistant", content=["fallback reply"])],
2123
+ speech_id="speech-speech-created",
2124
+ )
2125
  await collector.on_speech_created(handle)
2126
  handle.trigger_done()
2127
  await asyncio.sleep(0)
 
2170
  role="assistant",
2171
  content=[_FakeTextMethodPart("immediate fallback reply")],
2172
  )
2173
+ ],
2174
+ speech_id="speech-speech-created-immediate",
2175
  )
2176
  await collector.on_speech_created(handle)
2177
  await collector.wait_for_pending_trace_tasks()
 
2492
  assert len(turn_spans) == 1
2493
  assert turn_spans[0].attributes["session_id"] == "session-real-participant"
2494
  assert turn_spans[0].attributes["participant_id"] == "web-real-participant"
2495
+
2496
+
2497
def test_stale_orphan_assistant_text_from_absorbed_turn_is_not_attached_to_continuation(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Assistant text timestamped before the coalesce cutoff must not leak into the merged turn."""
    import src.agent.traces.metrics_collector as metrics_collector_module

    fake_tracer = _FakeTracer()
    monkeypatch.setattr(metrics_collector_module, "tracer", fake_tracer)

    room = _FakeRoom()
    collector = MetricsCollector(
        room=room,  # type: ignore[arg-type]
        model_name="moonshine",
        room_name=room.name,
        room_id="RM123",
        participant_id="web-123",
        langfuse_enabled=True,
    )

    base_time = time.time()
    final_input = "What is the difference between speech to text and speech recognition?"
    final_output = (
        "Speech recognition detects spoken words. Speech to text writes them down."
    )

    async def _run() -> None:
        await collector.on_session_metadata(
            session_id="session-stale-orphan-after-coalesce",
            participant_id="web-123",
        )
        # First fragment: aborted mid-utterance, later absorbed by coalescing.
        await collector.on_user_input_transcribed("What", is_final=True)
        await collector.on_metrics_collected(
            _make_eou_metrics("speech-a", delay=0.7, transcription_delay=0.2)
        )
        await collector.on_metrics_collected(_make_llm_metrics("speech-a"))
        await collector.on_metrics_collected(_make_tts_metrics("speech-a"))

        # Continuation fragment completes the user's question.
        await collector.on_user_input_transcribed(
            "is the difference between speech to text and speech recognition?",
            is_final=True,
        )
        await collector.on_metrics_collected(
            _make_eou_metrics("speech-b", delay=0.7, transcription_delay=0.2)
        )
        await collector.on_conversation_item_added(
            role="user",
            content=final_input,
            event_created_at=base_time + 0.25,
            item_created_at=base_time + 0.25,
        )
        await collector.on_metrics_collected(_make_llm_metrics("speech-b"))

        # Stale reply: item_created_at predates the absorbed turn's activity.
        await collector.on_conversation_item_added(
            role="assistant",
            content="stale reply from the absorbed turn",
            event_created_at=base_time + 1.0,
            item_created_at=base_time + 0.1,
        )
        await collector.on_conversation_item_added(
            role="assistant",
            content=final_output,
            event_created_at=base_time + 1.2,
            item_created_at=base_time + 1.2,
        )
        await collector.on_metrics_collected(_make_tts_metrics("speech-b"))
        await collector.wait_for_pending_trace_tasks()

    asyncio.run(_run())

    turn_spans = [span for span in fake_tracer.spans if span.name == "turn"]
    assert len(turn_spans) == 1
    root = turn_spans[0]
    assert root.attributes["langfuse.trace.input"] == final_input
    assert root.attributes["langfuse.trace.output"] == final_output
    assert "stale reply from the absorbed turn" not in root.attributes["langfuse.trace.output"]
+
2572
+
2573
def test_audio_started_turn_is_finalized_separately_when_new_user_turn_arrives(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A barge-in after audio playback started must produce two separate turn traces."""
    import src.agent.traces.metrics_collector as metrics_collector_module

    fake_tracer = _FakeTracer()
    monkeypatch.setattr(metrics_collector_module, "tracer", fake_tracer)

    room = _FakeRoom()
    collector = MetricsCollector(
        room=room,  # type: ignore[arg-type]
        model_name="moonshine",
        room_name=room.name,
        room_id="RM123",
        participant_id="web-123",
        langfuse_enabled=True,
    )

    async def _run() -> None:
        await collector.on_session_metadata(
            session_id="session-audio-started-no-coalesce",
            participant_id="web-123",
        )
        # First turn: LLM done, speech created, playback begins.
        await collector.on_user_input_transcribed("first prompt", is_final=True)
        await collector.on_metrics_collected(
            _make_eou_metrics("speech-audio-started", delay=0.4, transcription_delay=0.1)
        )
        await collector.on_metrics_collected(_make_llm_metrics("speech-audio-started"))
        await collector.on_speech_created(
            _FakeSpeechHandle(chat_items=[], speech_id="speech-audio-started")
        )
        await collector.on_agent_state_changed(
            old_state="thinking",
            new_state="speaking",
        )
        await collector.on_metrics_collected(_make_tts_metrics("speech-audio-started"))

        # User barges in with a second, unrelated prompt.
        await collector.on_user_input_transcribed("second prompt", is_final=True)
        await collector.on_metrics_collected(
            _make_eou_metrics("speech-separate-b", delay=0.5, transcription_delay=0.1)
        )
        await collector.on_metrics_collected(_make_llm_metrics("speech-separate-b"))
        await collector.on_conversation_item_added(
            role="assistant",
            content="second reply",
        )
        await collector.on_metrics_collected(_make_tts_metrics("speech-separate-b"))
        await collector.wait_for_pending_trace_tasks()

    asyncio.run(_run())

    turn_spans = [span for span in fake_tracer.spans if span.name == "turn"]
    assert len(turn_spans) == 2
    first, second = turn_spans
    assert first.attributes["langfuse.trace.input"] == "first prompt"
    assert first.attributes["langfuse.trace.metadata.interrupted"] is True
    assert first.attributes["langfuse.trace.output"] == "[assistant text unavailable]"
    assert second.attributes["langfuse.trace.input"] == "second prompt"
    assert second.attributes["langfuse.trace.output"] == "second reply"
+
2633
+
2634
def test_interrupted_pretool_leadin_keeps_own_output_and_next_turn_stays_separate(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """An interrupted pre-tool lead-in keeps its own output; the next turn stays separate."""
    import src.agent.traces.metrics_collector as metrics_collector_module

    fake_tracer = _FakeTracer()
    monkeypatch.setattr(metrics_collector_module, "tracer", fake_tracer)

    room = _FakeRoom()
    collector = MetricsCollector(
        room=room,  # type: ignore[arg-type]
        model_name="moonshine",
        room_name=room.name,
        room_id="RM123",
        participant_id="web-123",
        langfuse_enabled=True,
    )

    async def _run() -> None:
        await collector.on_session_metadata(
            session_id="session-interrupted-pretool-leadin",
            participant_id="web-123",
        )
        # First turn: tool phase opens, lead-in speech begins playing.
        await collector.on_user_input_transcribed("search papers", is_final=True)
        await collector.on_metrics_collected(_make_llm_metrics("speech-pretool-interrupted"))
        await collector.on_tool_step_started()
        await collector.on_conversation_item_added(
            role="assistant",
            content="Let me check that.",
        )
        await collector.on_speech_created(
            _FakeSpeechHandle(chat_items=[], speech_id="speech-pretool-interrupted")
        )
        await collector.on_agent_state_changed(
            old_state="thinking",
            new_state="speaking",
        )
        await collector.on_metrics_collected(_make_tts_metrics("speech-pretool-interrupted"))

        # User interrupts before the tool finishes.
        await collector.on_user_input_transcribed("never mind", is_final=True)
        await collector.on_metrics_collected(_make_llm_metrics("speech-pretool-next"))
        await collector.on_conversation_item_added(
            role="assistant",
            content="Okay, stopping here.",
        )
        await collector.on_metrics_collected(_make_tts_metrics("speech-pretool-next"))
        await collector.wait_for_pending_trace_tasks()

    asyncio.run(_run())

    turn_spans = [span for span in fake_tracer.spans if span.name == "turn"]
    assert len(turn_spans) == 2
    first, second = turn_spans
    assert first.attributes["langfuse.trace.input"] == "search papers"
    assert first.attributes["langfuse.trace.metadata.interrupted"] is True
    assert first.attributes["langfuse.trace.output"] == "Let me check that."
    assert second.attributes["langfuse.trace.input"] == "never mind"
    assert second.attributes["langfuse.trace.output"] == "Okay, stopping here."
+
2693
+
2694
def test_drain_pending_traces_finalizes_without_manual_sleep(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """drain_pending_traces must emit the incomplete turn immediately, tagged shutdown_drain."""
    import src.agent.traces.metrics_collector as metrics_collector_module

    fake_tracer = _FakeTracer()
    monkeypatch.setattr(metrics_collector_module, "tracer", fake_tracer)

    room = _FakeRoom()
    collector = MetricsCollector(
        room=room,  # type: ignore[arg-type]
        model_name="moonshine",
        room_name=room.name,
        room_id="RM123",
        participant_id="web-123",
        langfuse_enabled=True,
    )

    async def _run() -> None:
        await collector.on_session_metadata(
            session_id="session-drain-pending-traces",
            participant_id="web-123",
        )
        # Turn has user text + LLM + TTS but no assistant transcript yet.
        await collector.on_user_input_transcribed("drain pending", is_final=True)
        await collector.on_metrics_collected(_make_llm_metrics("speech-drain-pending"))
        await collector.on_metrics_collected(_make_tts_metrics("speech-drain-pending"))
        await collector.drain_pending_traces()

    asyncio.run(_run())

    turn_spans = [span for span in fake_tracer.spans if span.name == "turn"]
    assert len(turn_spans) == 1
    root = turn_spans[0]
    assert root.attributes["langfuse.trace.input"] == "drain pending"
    assert root.attributes["langfuse.trace.output"] == "[assistant text unavailable]"
    assert root.attributes["langfuse.trace.metadata.interrupted"] is True
    assert root.attributes["langfuse.trace.metadata.interrupted_reason"] == "shutdown_drain"
tests/test_runtime_settings.py CHANGED
@@ -56,6 +56,7 @@ def test_langfuse_runtime_tuning_defaults_are_declared() -> None:
56
 
57
  assert fields["LANGFUSE_TRACE_FINALIZE_TIMEOUT_MS"].default == 8000.0
58
  assert fields["LANGFUSE_POST_TOOL_RESPONSE_TIMEOUT_MS"].default == 30000.0
 
59
  assert fields["LANGFUSE_CONTINUATION_COALESCE_WINDOW_MS"].default == 1500.0
60
 
61
 
 
56
 
57
  assert fields["LANGFUSE_TRACE_FINALIZE_TIMEOUT_MS"].default == 8000.0
58
  assert fields["LANGFUSE_POST_TOOL_RESPONSE_TIMEOUT_MS"].default == 30000.0
59
+ assert fields["LANGFUSE_SHUTDOWN_DRAIN_TIMEOUT_MS"].default == 3000.0
60
  assert fields["LANGFUSE_CONTINUATION_COALESCE_WINDOW_MS"].default == 1500.0
61
 
62