File size: 5,950 Bytes
c0055e3
 
 
 
 
 
 
 
 
 
 
a1d8504
 
 
 
c0055e3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a1d8504
c0055e3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
from __future__ import annotations

import asyncio
import contextlib
import random
from collections.abc import Callable, Sequence
from typing import Any

from livekit import rtc
from livekit.agents import AgentSession, BackgroundAudioPlayer, BuiltinAudioClip

from src.agent.prompts.runtime import (
    DEFAULT_TOOL_FALLBACK_PHRASES,
    TOOL_PRE_SPEECH_FALLBACK,
)
from src.core.logger import logger

DEFAULT_TYPING_CLIPS: tuple[BuiltinAudioClip, ...] = (
    BuiltinAudioClip.KEYBOARD_TYPING,
    BuiltinAudioClip.KEYBOARD_TYPING2,
)
DEFAULT_TYPING_TIMEOUT_SEC = 30.0


class ToolFeedbackController:
    """Controls pre-tool fallback speech selection and typing sound playback."""

    def __init__(
        self,
        *,
        enabled: bool,
        fallback_phrases: Sequence[str] = DEFAULT_TOOL_FALLBACK_PHRASES,
        typing_clips: Sequence[BuiltinAudioClip] = DEFAULT_TYPING_CLIPS,
        typing_timeout_sec: float = DEFAULT_TYPING_TIMEOUT_SEC,
        audio_player_factory: Callable[[], Any] = BackgroundAudioPlayer,
        rng: random.Random | None = None,
    ) -> None:
        self._enabled = enabled
        self._fallback_phrases = tuple(p for p in fallback_phrases if p and p.strip())
        self._typing_clips = tuple(typing_clips)
        self._typing_timeout_sec = max(float(typing_timeout_sec), 0.0)
        self._audio_player_factory = audio_player_factory
        self._rng = rng or random.Random()

        self._audio_player: Any | None = None
        self._typing_handle: Any | None = None
        self._typing_timeout_task: asyncio.Task[Any] | None = None

        self._phrase_rotation: list[str] = []
        self._clip_rotation: list[BuiltinAudioClip] = []
        self._lock = asyncio.Lock()

    @property
    def enabled(self) -> bool:
        return self._enabled

    def next_fallback_phrase(self) -> str:
        if not self._fallback_phrases:
            return TOOL_PRE_SPEECH_FALLBACK
        return self._next_rotation_value(
            values=self._fallback_phrases,
            bucket=self._phrase_rotation,
        )

    async def start(self, *, room: rtc.Room, session: AgentSession) -> None:
        if not self._enabled:
            return

        async with self._lock:
            if self._audio_player is not None:
                return
            player = self._audio_player_factory()
            self._audio_player = player

        try:
            await player.start(room=room, agent_session=session)
        except Exception as exc:
            logger.warning("Tool feedback audio unavailable: failed to start background audio: %s", exc)
            async with self._lock:
                self._audio_player = None
            with contextlib.suppress(Exception):
                await player.aclose()
            return

        logger.info("Tool feedback audio started")

    async def start_typing_sound(self) -> None:
        if not self._enabled:
            return

        async with self._lock:
            if self._audio_player is None or not self._typing_clips:
                return

            if self._typing_handle is not None and not self._is_handle_done(self._typing_handle):
                return

            clip = self._next_rotation_value(
                values=self._typing_clips,
                bucket=self._clip_rotation,
            )
            try:
                self._typing_handle = self._audio_player.play(clip, loop=False)
            except Exception as exc:
                logger.warning("Failed to start typing sound: %s", exc)
                return

            self._restart_typing_timeout_task_locked()
            logger.info("typing_sound_started clip=%s", clip.name)

    async def stop_typing_sound(self, *, reason: str) -> None:
        handle: Any | None = None
        async with self._lock:
            timeout_task = self._typing_timeout_task
            self._typing_timeout_task = None
            if timeout_task is not None:
                timeout_task.cancel()

            handle = self._typing_handle
            self._typing_handle = None

        if handle is None:
            return

        try:
            if not self._is_handle_done(handle):
                stop = getattr(handle, "stop", None)
                if callable(stop):
                    stop()
        except Exception as exc:
            logger.warning("Failed to stop typing sound: %s", exc)
        finally:
            logger.info("typing_sound_stopped reason=%s", reason)

    async def aclose(self) -> None:
        await self.stop_typing_sound(reason="shutdown")

        async with self._lock:
            player = self._audio_player
            self._audio_player = None

        if player is not None:
            with contextlib.suppress(Exception):
                await player.aclose()

    def _next_rotation_value(self, *, values: Sequence[Any], bucket: list[Any]) -> Any:
        if not bucket:
            bucket.extend(values)
            self._rng.shuffle(bucket)
        return bucket.pop()

    def _restart_typing_timeout_task_locked(self) -> None:
        if self._typing_timeout_task is not None:
            self._typing_timeout_task.cancel()

        if self._typing_timeout_sec <= 0:
            self._typing_timeout_task = None
            return

        self._typing_timeout_task = asyncio.create_task(
            self._typing_timeout_worker(),
            name="tool-feedback-typing-timeout",
        )

    async def _typing_timeout_worker(self) -> None:
        try:
            await asyncio.sleep(self._typing_timeout_sec)
            await self.stop_typing_sound(reason="timeout")
        except asyncio.CancelledError:
            return

    @staticmethod
    def _is_handle_done(handle: Any) -> bool:
        done = getattr(handle, "done", None)
        if callable(done):
            try:
                return bool(done())
            except Exception:
                return False
        return False