open-voice-agent / src /agent /tools /feedback.py
dvalle08's picture
refactor: Restructure agent components and remove unused files
a1d8504
from __future__ import annotations
import asyncio
import contextlib
import random
from collections.abc import Callable, Sequence
from typing import Any
from livekit import rtc
from livekit.agents import AgentSession, BackgroundAudioPlayer, BuiltinAudioClip
from src.agent.prompts.runtime import (
DEFAULT_TOOL_FALLBACK_PHRASES,
TOOL_PRE_SPEECH_FALLBACK,
)
from src.core.logger import logger
DEFAULT_TYPING_CLIPS: tuple[BuiltinAudioClip, ...] = (
BuiltinAudioClip.KEYBOARD_TYPING,
BuiltinAudioClip.KEYBOARD_TYPING2,
)
DEFAULT_TYPING_TIMEOUT_SEC = 30.0
class ToolFeedbackController:
"""Controls pre-tool fallback speech selection and typing sound playback."""
def __init__(
self,
*,
enabled: bool,
fallback_phrases: Sequence[str] = DEFAULT_TOOL_FALLBACK_PHRASES,
typing_clips: Sequence[BuiltinAudioClip] = DEFAULT_TYPING_CLIPS,
typing_timeout_sec: float = DEFAULT_TYPING_TIMEOUT_SEC,
audio_player_factory: Callable[[], Any] = BackgroundAudioPlayer,
rng: random.Random | None = None,
) -> None:
self._enabled = enabled
self._fallback_phrases = tuple(p for p in fallback_phrases if p and p.strip())
self._typing_clips = tuple(typing_clips)
self._typing_timeout_sec = max(float(typing_timeout_sec), 0.0)
self._audio_player_factory = audio_player_factory
self._rng = rng or random.Random()
self._audio_player: Any | None = None
self._typing_handle: Any | None = None
self._typing_timeout_task: asyncio.Task[Any] | None = None
self._phrase_rotation: list[str] = []
self._clip_rotation: list[BuiltinAudioClip] = []
self._lock = asyncio.Lock()
@property
def enabled(self) -> bool:
return self._enabled
def next_fallback_phrase(self) -> str:
if not self._fallback_phrases:
return TOOL_PRE_SPEECH_FALLBACK
return self._next_rotation_value(
values=self._fallback_phrases,
bucket=self._phrase_rotation,
)
async def start(self, *, room: rtc.Room, session: AgentSession) -> None:
if not self._enabled:
return
async with self._lock:
if self._audio_player is not None:
return
player = self._audio_player_factory()
self._audio_player = player
try:
await player.start(room=room, agent_session=session)
except Exception as exc:
logger.warning("Tool feedback audio unavailable: failed to start background audio: %s", exc)
async with self._lock:
self._audio_player = None
with contextlib.suppress(Exception):
await player.aclose()
return
logger.info("Tool feedback audio started")
async def start_typing_sound(self) -> None:
if not self._enabled:
return
async with self._lock:
if self._audio_player is None or not self._typing_clips:
return
if self._typing_handle is not None and not self._is_handle_done(self._typing_handle):
return
clip = self._next_rotation_value(
values=self._typing_clips,
bucket=self._clip_rotation,
)
try:
self._typing_handle = self._audio_player.play(clip, loop=False)
except Exception as exc:
logger.warning("Failed to start typing sound: %s", exc)
return
self._restart_typing_timeout_task_locked()
logger.info("typing_sound_started clip=%s", clip.name)
async def stop_typing_sound(self, *, reason: str) -> None:
handle: Any | None = None
async with self._lock:
timeout_task = self._typing_timeout_task
self._typing_timeout_task = None
if timeout_task is not None:
timeout_task.cancel()
handle = self._typing_handle
self._typing_handle = None
if handle is None:
return
try:
if not self._is_handle_done(handle):
stop = getattr(handle, "stop", None)
if callable(stop):
stop()
except Exception as exc:
logger.warning("Failed to stop typing sound: %s", exc)
finally:
logger.info("typing_sound_stopped reason=%s", reason)
async def aclose(self) -> None:
await self.stop_typing_sound(reason="shutdown")
async with self._lock:
player = self._audio_player
self._audio_player = None
if player is not None:
with contextlib.suppress(Exception):
await player.aclose()
def _next_rotation_value(self, *, values: Sequence[Any], bucket: list[Any]) -> Any:
if not bucket:
bucket.extend(values)
self._rng.shuffle(bucket)
return bucket.pop()
def _restart_typing_timeout_task_locked(self) -> None:
if self._typing_timeout_task is not None:
self._typing_timeout_task.cancel()
if self._typing_timeout_sec <= 0:
self._typing_timeout_task = None
return
self._typing_timeout_task = asyncio.create_task(
self._typing_timeout_worker(),
name="tool-feedback-typing-timeout",
)
async def _typing_timeout_worker(self) -> None:
try:
await asyncio.sleep(self._typing_timeout_sec)
await self.stop_typing_sound(reason="timeout")
except asyncio.CancelledError:
return
@staticmethod
def _is_handle_done(handle: Any) -> bool:
done = getattr(handle, "done", None)
if callable(done):
try:
return bool(done())
except Exception:
return False
return False