Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import asyncio | |
| import contextlib | |
| import random | |
| from collections.abc import Callable, Sequence | |
| from typing import Any | |
| from livekit import rtc | |
| from livekit.agents import AgentSession, BackgroundAudioPlayer, BuiltinAudioClip | |
| from src.agent.prompts.runtime import ( | |
| DEFAULT_TOOL_FALLBACK_PHRASES, | |
| TOOL_PRE_SPEECH_FALLBACK, | |
| ) | |
| from src.core.logger import logger | |
| DEFAULT_TYPING_CLIPS: tuple[BuiltinAudioClip, ...] = ( | |
| BuiltinAudioClip.KEYBOARD_TYPING, | |
| BuiltinAudioClip.KEYBOARD_TYPING2, | |
| ) | |
| DEFAULT_TYPING_TIMEOUT_SEC = 30.0 | |
| class ToolFeedbackController: | |
| """Controls pre-tool fallback speech selection and typing sound playback.""" | |
| def __init__( | |
| self, | |
| *, | |
| enabled: bool, | |
| fallback_phrases: Sequence[str] = DEFAULT_TOOL_FALLBACK_PHRASES, | |
| typing_clips: Sequence[BuiltinAudioClip] = DEFAULT_TYPING_CLIPS, | |
| typing_timeout_sec: float = DEFAULT_TYPING_TIMEOUT_SEC, | |
| audio_player_factory: Callable[[], Any] = BackgroundAudioPlayer, | |
| rng: random.Random | None = None, | |
| ) -> None: | |
| self._enabled = enabled | |
| self._fallback_phrases = tuple(p for p in fallback_phrases if p and p.strip()) | |
| self._typing_clips = tuple(typing_clips) | |
| self._typing_timeout_sec = max(float(typing_timeout_sec), 0.0) | |
| self._audio_player_factory = audio_player_factory | |
| self._rng = rng or random.Random() | |
| self._audio_player: Any | None = None | |
| self._typing_handle: Any | None = None | |
| self._typing_timeout_task: asyncio.Task[Any] | None = None | |
| self._phrase_rotation: list[str] = [] | |
| self._clip_rotation: list[BuiltinAudioClip] = [] | |
| self._lock = asyncio.Lock() | |
| def enabled(self) -> bool: | |
| return self._enabled | |
| def next_fallback_phrase(self) -> str: | |
| if not self._fallback_phrases: | |
| return TOOL_PRE_SPEECH_FALLBACK | |
| return self._next_rotation_value( | |
| values=self._fallback_phrases, | |
| bucket=self._phrase_rotation, | |
| ) | |
| async def start(self, *, room: rtc.Room, session: AgentSession) -> None: | |
| if not self._enabled: | |
| return | |
| async with self._lock: | |
| if self._audio_player is not None: | |
| return | |
| player = self._audio_player_factory() | |
| self._audio_player = player | |
| try: | |
| await player.start(room=room, agent_session=session) | |
| except Exception as exc: | |
| logger.warning("Tool feedback audio unavailable: failed to start background audio: %s", exc) | |
| async with self._lock: | |
| self._audio_player = None | |
| with contextlib.suppress(Exception): | |
| await player.aclose() | |
| return | |
| logger.info("Tool feedback audio started") | |
| async def start_typing_sound(self) -> None: | |
| if not self._enabled: | |
| return | |
| async with self._lock: | |
| if self._audio_player is None or not self._typing_clips: | |
| return | |
| if self._typing_handle is not None and not self._is_handle_done(self._typing_handle): | |
| return | |
| clip = self._next_rotation_value( | |
| values=self._typing_clips, | |
| bucket=self._clip_rotation, | |
| ) | |
| try: | |
| self._typing_handle = self._audio_player.play(clip, loop=False) | |
| except Exception as exc: | |
| logger.warning("Failed to start typing sound: %s", exc) | |
| return | |
| self._restart_typing_timeout_task_locked() | |
| logger.info("typing_sound_started clip=%s", clip.name) | |
| async def stop_typing_sound(self, *, reason: str) -> None: | |
| handle: Any | None = None | |
| async with self._lock: | |
| timeout_task = self._typing_timeout_task | |
| self._typing_timeout_task = None | |
| if timeout_task is not None: | |
| timeout_task.cancel() | |
| handle = self._typing_handle | |
| self._typing_handle = None | |
| if handle is None: | |
| return | |
| try: | |
| if not self._is_handle_done(handle): | |
| stop = getattr(handle, "stop", None) | |
| if callable(stop): | |
| stop() | |
| except Exception as exc: | |
| logger.warning("Failed to stop typing sound: %s", exc) | |
| finally: | |
| logger.info("typing_sound_stopped reason=%s", reason) | |
| async def aclose(self) -> None: | |
| await self.stop_typing_sound(reason="shutdown") | |
| async with self._lock: | |
| player = self._audio_player | |
| self._audio_player = None | |
| if player is not None: | |
| with contextlib.suppress(Exception): | |
| await player.aclose() | |
| def _next_rotation_value(self, *, values: Sequence[Any], bucket: list[Any]) -> Any: | |
| if not bucket: | |
| bucket.extend(values) | |
| self._rng.shuffle(bucket) | |
| return bucket.pop() | |
| def _restart_typing_timeout_task_locked(self) -> None: | |
| if self._typing_timeout_task is not None: | |
| self._typing_timeout_task.cancel() | |
| if self._typing_timeout_sec <= 0: | |
| self._typing_timeout_task = None | |
| return | |
| self._typing_timeout_task = asyncio.create_task( | |
| self._typing_timeout_worker(), | |
| name="tool-feedback-typing-timeout", | |
| ) | |
| async def _typing_timeout_worker(self) -> None: | |
| try: | |
| await asyncio.sleep(self._typing_timeout_sec) | |
| await self.stop_typing_sound(reason="timeout") | |
| except asyncio.CancelledError: | |
| return | |
| def _is_handle_done(handle: Any) -> bool: | |
| done = getattr(handle, "done", None) | |
| if callable(done): | |
| try: | |
| return bool(done()) | |
| except Exception: | |
| return False | |
| return False | |