dvalle08 committed
Commit ff948a9 · 1 Parent(s): c37dfc2

Add agent state change handling and improve metrics collection: Introduce AgentStateChangedEvent to track agent state transitions. Update MetricsCollector to record timestamps for first assistant audio when entering the speaking state. Enhance test coverage for new functionality in agent state management.
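At its core, the change stamps first-assistant-audio time from the agent's state machine (the transition into "speaking") rather than from TTS metrics, which for long responses can arrive only after synthesis completes. A minimal, self-contained sketch of that bookkeeping, with simplified names standing in for the real MetricsCollector internals:

from collections import deque
from time import monotonic
from typing import Optional


class FirstAudioTimer:
    """Sketch: stamp first assistant audio when the agent enters 'speaking'."""

    def __init__(self) -> None:
        self._pending: deque[str] = deque()       # speech ids awaiting first audio
        self._first_audio: dict[str, float] = {}  # speech id -> monotonic stamp
        self._speech_end: dict[str, float] = {}   # speech id -> user speech end

    def on_user_speech_end(self, speech_id: str) -> None:
        self._speech_end[speech_id] = monotonic()

    def on_speech_created(self, speech_id: str) -> None:
        self._pending.append(speech_id)

    def on_agent_state_changed(self, new_state: str) -> None:
        if new_state != "speaking":
            return
        # Match the transition to the oldest speech not yet stamped.
        while self._pending:
            speech_id = self._pending.popleft()
            if speech_id not in self._first_audio:
                self._first_audio.setdefault(speech_id, monotonic())
                return

    def observed_latency(self, speech_id: str) -> Optional[float]:
        start = self._speech_end.get(speech_id)
        end = self._first_audio.get(speech_id)
        if start is None or end is None:
            return None
        return end - start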

src/agent/agent.py CHANGED
@@ -7,6 +7,7 @@ from livekit import agents, rtc
 from livekit.agents import AgentServer, AgentSession, Agent, room_io
 from livekit.agents.telemetry import set_tracer_provider
 from livekit.agents.voice.events import (
+    AgentStateChangedEvent,
     ConversationItemAddedEvent,
     MetricsCollectedEvent,
     SpeechCreatedEvent,
@@ -134,10 +135,19 @@ class Assistant(Agent):
             self._metrics_collector.on_speech_created(event.speech_handle)
         )

+        def agent_state_changed_wrapper(event: AgentStateChangedEvent) -> None:
+            asyncio.create_task(
+                self._metrics_collector.on_agent_state_changed(
+                    old_state=event.old_state,
+                    new_state=event.new_state,
+                )
+            )
+
         self.session.on("metrics_collected", metrics_wrapper)
         self.session.on("user_input_transcribed", transcript_wrapper)
         self.session.on("conversation_item_added", conversation_item_wrapper)
         self.session.on("speech_created", speech_created_wrapper)
+        self.session.on("agent_state_changed", agent_state_changed_wrapper)


 server = AgentServer(num_idle_processes=settings.livekit.LIVEKIT_NUM_IDLE_PROCESSES)
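A note on the wrapper shape above: the diff schedules the async collector method with asyncio.create_task rather than awaiting it, the usual pattern when an event emitter expects synchronous callbacks. A small stand-alone sketch of that pattern (FakeStateEvent is a stand-in, not the LiveKit type):

import asyncio


class FakeStateEvent:
    # Stand-in for AgentStateChangedEvent; field names match the diff.
    old_state = "thinking"
    new_state = "speaking"


async def on_agent_state_changed(*, old_state: str, new_state: str) -> None:
    print(f"agent state: {old_state} -> {new_state}")


def agent_state_changed_wrapper(event: FakeStateEvent) -> None:
    # Synchronous callback: schedule the coroutine on the running loop.
    asyncio.create_task(
        on_agent_state_changed(old_state=event.old_state, new_state=event.new_state)
    )


async def main() -> None:
    agent_state_changed_wrapper(FakeStateEvent())
    await asyncio.sleep(0)  # give the scheduled task a chance to run


asyncio.run(main())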
src/agent/metrics_collector.py CHANGED
@@ -219,6 +219,7 @@ class MetricsCollector:
         self._stt_finalization_delays: dict[str, float] = {}
         self._speech_end_monotonic_by_speech: dict[str, float] = {}
         self._first_audio_monotonic_by_speech: dict[str, float] = {}
+        self._pending_speech_ids_for_first_audio: deque[str] = deque()
         self._pending_transcripts: deque[str] = deque()
         self._pending_agent_transcripts: deque[str] = deque()
         self._latest_agent_speech_id: Optional[str] = None
@@ -353,6 +354,10 @@ class MetricsCollector:

     async def on_speech_created(self, speech_handle: Any) -> None:
         """Attach a done callback to capture assistant text when playout is complete."""
+        speech_id = self._normalize_optional_text(getattr(speech_handle, "id", None))
+        if speech_id:
+            self._pending_speech_ids_for_first_audio.append(speech_id)
+
         # Try immediate extraction first. Some pipelines do not preserve/trigger
         # done callbacks consistently for long responses.
         assistant_text = self._extract_text_from_chat_items(
@@ -367,6 +372,9 @@

         def _on_done(handle: Any) -> None:
             try:
+                done_speech_id = self._normalize_optional_text(getattr(handle, "id", None))
+                if done_speech_id:
+                    self._discard_pending_speech_id(done_speech_id)
                 assistant_text = self._extract_text_from_chat_items(
                     getattr(handle, "chat_items", [])
                 )
@@ -382,6 +390,37 @@
             except Exception:
                 return

+    async def on_agent_state_changed(
+        self,
+        *,
+        old_state: str,
+        new_state: str,
+    ) -> None:
+        """Record first assistant audio timestamp when agent enters speaking state."""
+        if new_state != "speaking":
+            return
+
+        speech_id: Optional[str] = None
+        while self._pending_speech_ids_for_first_audio:
+            candidate = self._pending_speech_ids_for_first_audio.popleft()
+            if candidate not in self._first_audio_monotonic_by_speech:
+                speech_id = candidate
+                break
+
+        if speech_id is None:
+            latest = self._latest_agent_speech_id
+            if latest and latest not in self._first_audio_monotonic_by_speech:
+                speech_id = latest
+
+        if speech_id:
+            self._record_first_assistant_audio_timestamp(speech_id)
+            logger.debug(
+                "First assistant audio recorded from state transition: speech_id=%s, old_state=%s, new_state=%s",
+                speech_id,
+                old_state,
+                new_state,
+            )
+
     async def _on_assistant_text(self, assistant_text: str) -> None:
         normalized = assistant_text.strip()
         if not normalized:
@@ -401,7 +440,6 @@
         if ttfb < 0:
             return
         speech_id = self._latest_agent_speech_id or f"tts-{uuid.uuid4()}"
-        self._record_first_assistant_audio_timestamp(speech_id)
         turn_metrics = self._get_or_create_turn(speech_id, role="agent")
         turn_metrics.tts = TTSMetrics(
             ttfb=ttfb,
@@ -509,7 +547,6 @@

         elif isinstance(collected_metrics, metrics.TTSMetrics):
             speech_id = collected_metrics.speech_id or collected_metrics.request_id
-            self._record_first_assistant_audio_timestamp(speech_id)
             turn_metrics = self._get_or_create_turn(speech_id, role="agent")
             turn_metrics.tts = TTSMetrics(
                 ttfb=collected_metrics.ttfb,
@@ -648,6 +685,7 @@
         self._stt_finalization_delays.pop(speech_id, None)
         self._speech_end_monotonic_by_speech.pop(speech_id, None)
         self._first_audio_monotonic_by_speech.pop(speech_id, None)
+        self._discard_pending_speech_id(speech_id)

     async def _publish_live_update(
         self,
@@ -921,6 +959,13 @@
     def _record_first_assistant_audio_timestamp(self, speech_id: str) -> None:
         self._first_audio_monotonic_by_speech.setdefault(speech_id, monotonic())

+    def _discard_pending_speech_id(self, speech_id: str) -> None:
+        if not self._pending_speech_ids_for_first_audio:
+            return
+        self._pending_speech_ids_for_first_audio = deque(
+            pending for pending in self._pending_speech_ids_for_first_audio if pending != speech_id
+        )
+
     def _observed_total_latency_seconds(self, speech_id: str) -> Optional[float]:
         start = self._speech_end_monotonic_by_speech.get(speech_id)
         end = self._first_audio_monotonic_by_speech.get(speech_id)
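The FIFO queue plus the discard helper exist so overlapping speeches cannot steal each other's timestamp: a speech that finishes or is interrupted before ever reaching "speaking" must not absorb the next transition. A compact, runnable illustration of that matching behavior, independent of the collector:

from collections import deque
from time import monotonic

pending: deque[str] = deque()
first_audio: dict[str, float] = {}


def discard_pending(speech_id: str) -> None:
    # Mirrors _discard_pending_speech_id: rebuild the deque without the id.
    global pending
    pending = deque(p for p in pending if p != speech_id)


pending.extend(["speech-1", "speech-2"])
discard_pending("speech-1")  # speech-1 finished/was interrupted before playout

# The 'speaking' transition now matches speech-2, not the stale speech-1.
while pending:
    candidate = pending.popleft()
    if candidate not in first_audio:
        first_audio[candidate] = monotonic()
        break

assert "speech-2" in first_audio and "speech-1" not in first_audio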
tests/test_langfuse_turn_tracing.py CHANGED
@@ -204,7 +204,8 @@ class _FakeTextMethodPart:


 class _FakeSpeechHandle:
-    def __init__(self, chat_items: list[Any]) -> None:
+    def __init__(self, chat_items: list[Any], speech_id: str = "speech-fake") -> None:
+        self.id = speech_id
         self.chat_items = chat_items
         self._callbacks: list[Any] = []

@@ -672,6 +673,7 @@ def test_long_response_latency_accounts_for_llm_generation_wait(
         session_id="session-long-gap",
         participant_id="web-123",
     )
+    await collector.on_speech_created(_FakeSpeechHandle(chat_items=[], speech_id=speech_id))
     await collector.on_user_input_transcribed("Explain neural networks", is_final=True)
     await collector.on_metrics_collected(
         _make_eou_metrics(speech_id, delay=0.0, transcription_delay=0.0)
@@ -684,6 +686,7 @@ def test_long_response_latency_accounts_for_llm_generation_wait(
         content="A neural network is a layered function approximator.",
     )
     await asyncio.sleep(0.2)
+    await collector.on_agent_state_changed(old_state="thinking", new_state="speaking")
     await collector.on_metrics_collected(
         _make_tts_metrics(speech_id, ttfb=0.01, duration=0.2, audio_duration=0.8)
     )
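Note the ordering in the updated test: the fake handle's new id seeds the pending queue via on_speech_created before any metrics arrive, and on_agent_state_changed fires ahead of the TTS metrics. This mirrors the runtime sequence, where the agent flips to "speaking" as playout begins rather than when synthesis metrics are reported.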