dvalle08 commited on
Commit
d9055df
·
1 Parent(s): 6d5c393

Add fallback session ID support in MetricsCollector: Introduce a mechanism to generate a console-prefixed fallback session ID when metadata is absent. Update session handling logic to utilize the fallback ID and enhance test coverage for fallback scenarios.

Browse files
src/agent/agent.py CHANGED
@@ -1,6 +1,7 @@
1
  import asyncio
2
  import base64
3
  import json
 
4
 
5
  from livekit import agents, rtc
6
  from livekit.agents import AgentServer, AgentSession, Agent, room_io
@@ -35,6 +36,13 @@ def _normalize_langfuse_host() -> str | None:
35
  return host.rstrip("/")
36
 
37
 
 
 
 
 
 
 
 
38
  def setup_langfuse_tracer() -> TracerProvider | None:
39
  """Configure LiveKit telemetry tracer to export traces to Langfuse."""
40
  global _langfuse_tracer_provider
@@ -160,6 +168,7 @@ async def session_handler(ctx: agents.JobContext) -> None:
160
  room_name=ctx.room.name,
161
  room_id=initial_room_id,
162
  participant_id=initial_participant_id,
 
163
  langfuse_enabled=trace_provider is not None,
164
  )
165
 
 
1
  import asyncio
2
  import base64
3
  import json
4
+ import sys
5
 
6
  from livekit import agents, rtc
7
  from livekit.agents import AgentServer, AgentSession, Agent, room_io
 
36
  return host.rstrip("/")
37
 
38
 
39
+ def _fallback_session_prefix() -> str | None:
40
+ """Use console-prefixed fallback session id when running `... console`."""
41
+ if any(arg == "console" for arg in sys.argv[1:]):
42
+ return "console"
43
+ return None
44
+
45
+
46
  def setup_langfuse_tracer() -> TracerProvider | None:
47
  """Configure LiveKit telemetry tracer to export traces to Langfuse."""
48
  global _langfuse_tracer_provider
 
168
  room_name=ctx.room.name,
169
  room_id=initial_room_id,
170
  participant_id=initial_participant_id,
171
+ fallback_session_prefix=_fallback_session_prefix(),
172
  langfuse_enabled=trace_provider is not None,
173
  )
174
 
src/agent/metrics_collector.py CHANGED
@@ -179,6 +179,7 @@ class MetricsCollector:
179
  room_name: str,
180
  room_id: Optional[str] = None,
181
  participant_id: Optional[str] = None,
 
182
  langfuse_enabled: bool = False,
183
  ) -> None:
184
  """Initialize metrics collector.
@@ -189,6 +190,8 @@ class MetricsCollector:
189
  room_name: LiveKit room name
190
  room_id: LiveKit room id (sid) when available
191
  participant_id: LiveKit participant identity when available
 
 
192
  langfuse_enabled: Enable one-trace-per-turn Langfuse traces
193
  """
194
  self._room = room
@@ -201,7 +204,10 @@ class MetricsCollector:
201
 
202
  self._room_name = room_name or self.UNKNOWN_ROOM_ID
203
  self._room_id = room_id or room_name or self.UNKNOWN_ROOM_ID
204
- self._session_id = self.UNKNOWN_SESSION_ID
 
 
 
205
  self._participant_id = participant_id or self.UNKNOWN_PARTICIPANT_ID
206
  self._langfuse_enabled = langfuse_enabled
207
  self._pending_trace_turns: deque[TraceTurn] = deque()
@@ -259,8 +265,9 @@ class MetricsCollector:
259
 
260
  for turn in self._pending_trace_turns:
261
  if (
262
- turn.session_id == self.UNKNOWN_SESSION_ID
263
- and self._session_id != self.UNKNOWN_SESSION_ID
 
264
  ):
265
  turn.session_id = self._session_id
266
  if (
@@ -1248,6 +1255,12 @@ class MetricsCollector:
1248
  normalized = value.strip()
1249
  return normalized or None
1250
 
 
 
 
 
 
 
1251
  async def _resolve_room_id(self) -> str:
1252
  if self._room_id and self._room_id != self._room_name:
1253
  return self._room_id
 
179
  room_name: str,
180
  room_id: Optional[str] = None,
181
  participant_id: Optional[str] = None,
182
+ fallback_session_prefix: Optional[str] = None,
183
  langfuse_enabled: bool = False,
184
  ) -> None:
185
  """Initialize metrics collector.
 
190
  room_name: LiveKit room name
191
  room_id: LiveKit room id (sid) when available
192
  participant_id: LiveKit participant identity when available
193
+ fallback_session_prefix: Prefix used for generated fallback session id
194
+ (e.g. "console" -> "console_<uuid>") when no metadata session id exists
195
  langfuse_enabled: Enable one-trace-per-turn Langfuse traces
196
  """
197
  self._room = room
 
204
 
205
  self._room_name = room_name or self.UNKNOWN_ROOM_ID
206
  self._room_id = room_id or room_name or self.UNKNOWN_ROOM_ID
207
+ self._fallback_session_id = self._build_fallback_session_id(
208
+ fallback_session_prefix
209
+ )
210
+ self._session_id = self._fallback_session_id or self.UNKNOWN_SESSION_ID
211
  self._participant_id = participant_id or self.UNKNOWN_PARTICIPANT_ID
212
  self._langfuse_enabled = langfuse_enabled
213
  self._pending_trace_turns: deque[TraceTurn] = deque()
 
265
 
266
  for turn in self._pending_trace_turns:
267
  if (
268
+ turn.session_id in {self.UNKNOWN_SESSION_ID, self._fallback_session_id}
269
+ and self._session_id
270
+ not in {self.UNKNOWN_SESSION_ID, self._fallback_session_id}
271
  ):
272
  turn.session_id = self._session_id
273
  if (
 
1255
  normalized = value.strip()
1256
  return normalized or None
1257
 
1258
+ def _build_fallback_session_id(self, prefix: Optional[str]) -> Optional[str]:
1259
+ normalized_prefix = self._normalize_optional_text(prefix)
1260
+ if not normalized_prefix:
1261
+ return None
1262
+ return f"{normalized_prefix}_{uuid.uuid4()}"
1263
+
1264
  async def _resolve_room_id(self) -> str:
1265
  if self._room_id and self._room_id != self._room_name:
1266
  return self._room_id
tests/test_langfuse_turn_tracing.py CHANGED
@@ -494,3 +494,75 @@ def test_trace_finalize_timeout_for_missing_assistant_text(
494
  root = turn_spans[0]
495
  assert root.attributes["langfuse.trace.metadata.assistant_text_missing"] is True
496
  assert root.attributes["langfuse.trace.output"] == "[assistant text unavailable]"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
494
  root = turn_spans[0]
495
  assert root.attributes["langfuse.trace.metadata.assistant_text_missing"] is True
496
  assert root.attributes["langfuse.trace.output"] == "[assistant text unavailable]"
497
+
498
+
499
+ def test_fallback_console_session_id_is_used_when_metadata_absent(
500
+ monkeypatch: pytest.MonkeyPatch,
501
+ ) -> None:
502
+ import src.agent.metrics_collector as metrics_collector_module
503
+
504
+ fake_tracer = _FakeTracer()
505
+ monkeypatch.setattr(metrics_collector_module, "tracer", fake_tracer)
506
+
507
+ room = _FakeRoom()
508
+ collector = MetricsCollector(
509
+ room=room, # type: ignore[arg-type]
510
+ model_name="moonshine",
511
+ room_name=room.name,
512
+ room_id="RM123",
513
+ participant_id="console-user",
514
+ fallback_session_prefix="console",
515
+ langfuse_enabled=True,
516
+ )
517
+
518
+ async def _run() -> None:
519
+ await collector.on_user_input_transcribed("console test", is_final=True)
520
+ await collector.on_metrics_collected(_make_llm_metrics("speech-console"))
521
+ await collector.on_conversation_item_added(role="assistant", content="console reply")
522
+ await collector.on_metrics_collected(_make_tts_metrics("speech-console"))
523
+ await collector.wait_for_pending_trace_tasks()
524
+
525
+ asyncio.run(_run())
526
+
527
+ turn_spans = [span for span in fake_tracer.spans if span.name == "turn"]
528
+ assert len(turn_spans) == 1
529
+ session_id = turn_spans[0].attributes["session_id"]
530
+ assert session_id.startswith("console_")
531
+ assert session_id != "unknown-session"
532
+
533
+
534
+ def test_real_session_metadata_overrides_fallback_for_pending_turns(
535
+ monkeypatch: pytest.MonkeyPatch,
536
+ ) -> None:
537
+ import src.agent.metrics_collector as metrics_collector_module
538
+
539
+ fake_tracer = _FakeTracer()
540
+ monkeypatch.setattr(metrics_collector_module, "tracer", fake_tracer)
541
+
542
+ room = _FakeRoom()
543
+ collector = MetricsCollector(
544
+ room=room, # type: ignore[arg-type]
545
+ model_name="moonshine",
546
+ room_name=room.name,
547
+ room_id="RM123",
548
+ participant_id="console-user",
549
+ fallback_session_prefix="console",
550
+ langfuse_enabled=True,
551
+ )
552
+
553
+ async def _run() -> None:
554
+ await collector.on_user_input_transcribed("override test", is_final=True)
555
+ await collector.on_metrics_collected(_make_llm_metrics("speech-override"))
556
+ await collector.on_metrics_collected(_make_tts_metrics("speech-override"))
557
+ await collector.on_session_metadata(
558
+ session_id="session-real",
559
+ participant_id="web-override",
560
+ )
561
+ await collector.on_conversation_item_added(role="assistant", content="reply")
562
+ await collector.wait_for_pending_trace_tasks()
563
+
564
+ asyncio.run(_run())
565
+
566
+ turn_spans = [span for span in fake_tracer.spans if span.name == "turn"]
567
+ assert len(turn_spans) == 1
568
+ assert turn_spans[0].attributes["session_id"] == "session-real"