Spaces:
Sleeping
Sleeping
File size: 5,950 Bytes
c0055e3 a1d8504 c0055e3 a1d8504 c0055e3 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 | from __future__ import annotations
import asyncio
import contextlib
import random
from collections.abc import Callable, Sequence
from typing import Any
from livekit import rtc
from livekit.agents import AgentSession, BackgroundAudioPlayer, BuiltinAudioClip
from src.agent.prompts.runtime import (
DEFAULT_TOOL_FALLBACK_PHRASES,
TOOL_PRE_SPEECH_FALLBACK,
)
from src.core.logger import logger
DEFAULT_TYPING_CLIPS: tuple[BuiltinAudioClip, ...] = (
BuiltinAudioClip.KEYBOARD_TYPING,
BuiltinAudioClip.KEYBOARD_TYPING2,
)
DEFAULT_TYPING_TIMEOUT_SEC = 30.0
class ToolFeedbackController:
"""Controls pre-tool fallback speech selection and typing sound playback."""
def __init__(
self,
*,
enabled: bool,
fallback_phrases: Sequence[str] = DEFAULT_TOOL_FALLBACK_PHRASES,
typing_clips: Sequence[BuiltinAudioClip] = DEFAULT_TYPING_CLIPS,
typing_timeout_sec: float = DEFAULT_TYPING_TIMEOUT_SEC,
audio_player_factory: Callable[[], Any] = BackgroundAudioPlayer,
rng: random.Random | None = None,
) -> None:
self._enabled = enabled
self._fallback_phrases = tuple(p for p in fallback_phrases if p and p.strip())
self._typing_clips = tuple(typing_clips)
self._typing_timeout_sec = max(float(typing_timeout_sec), 0.0)
self._audio_player_factory = audio_player_factory
self._rng = rng or random.Random()
self._audio_player: Any | None = None
self._typing_handle: Any | None = None
self._typing_timeout_task: asyncio.Task[Any] | None = None
self._phrase_rotation: list[str] = []
self._clip_rotation: list[BuiltinAudioClip] = []
self._lock = asyncio.Lock()
@property
def enabled(self) -> bool:
return self._enabled
def next_fallback_phrase(self) -> str:
if not self._fallback_phrases:
return TOOL_PRE_SPEECH_FALLBACK
return self._next_rotation_value(
values=self._fallback_phrases,
bucket=self._phrase_rotation,
)
async def start(self, *, room: rtc.Room, session: AgentSession) -> None:
if not self._enabled:
return
async with self._lock:
if self._audio_player is not None:
return
player = self._audio_player_factory()
self._audio_player = player
try:
await player.start(room=room, agent_session=session)
except Exception as exc:
logger.warning("Tool feedback audio unavailable: failed to start background audio: %s", exc)
async with self._lock:
self._audio_player = None
with contextlib.suppress(Exception):
await player.aclose()
return
logger.info("Tool feedback audio started")
async def start_typing_sound(self) -> None:
if not self._enabled:
return
async with self._lock:
if self._audio_player is None or not self._typing_clips:
return
if self._typing_handle is not None and not self._is_handle_done(self._typing_handle):
return
clip = self._next_rotation_value(
values=self._typing_clips,
bucket=self._clip_rotation,
)
try:
self._typing_handle = self._audio_player.play(clip, loop=False)
except Exception as exc:
logger.warning("Failed to start typing sound: %s", exc)
return
self._restart_typing_timeout_task_locked()
logger.info("typing_sound_started clip=%s", clip.name)
async def stop_typing_sound(self, *, reason: str) -> None:
handle: Any | None = None
async with self._lock:
timeout_task = self._typing_timeout_task
self._typing_timeout_task = None
if timeout_task is not None:
timeout_task.cancel()
handle = self._typing_handle
self._typing_handle = None
if handle is None:
return
try:
if not self._is_handle_done(handle):
stop = getattr(handle, "stop", None)
if callable(stop):
stop()
except Exception as exc:
logger.warning("Failed to stop typing sound: %s", exc)
finally:
logger.info("typing_sound_stopped reason=%s", reason)
async def aclose(self) -> None:
await self.stop_typing_sound(reason="shutdown")
async with self._lock:
player = self._audio_player
self._audio_player = None
if player is not None:
with contextlib.suppress(Exception):
await player.aclose()
def _next_rotation_value(self, *, values: Sequence[Any], bucket: list[Any]) -> Any:
if not bucket:
bucket.extend(values)
self._rng.shuffle(bucket)
return bucket.pop()
def _restart_typing_timeout_task_locked(self) -> None:
if self._typing_timeout_task is not None:
self._typing_timeout_task.cancel()
if self._typing_timeout_sec <= 0:
self._typing_timeout_task = None
return
self._typing_timeout_task = asyncio.create_task(
self._typing_timeout_worker(),
name="tool-feedback-typing-timeout",
)
async def _typing_timeout_worker(self) -> None:
try:
await asyncio.sleep(self._typing_timeout_sec)
await self.stop_typing_sound(reason="timeout")
except asyncio.CancelledError:
return
@staticmethod
def _is_handle_done(handle: Any) -> bool:
done = getattr(handle, "done", None)
if callable(done):
try:
return bool(done())
except Exception:
return False
return False
|