File size: 4,271 Bytes
7274ee2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
from dataclasses import dataclass, field
import time

import numpy as np


@dataclass
class UtteranceState:
    """Mutable per-utterance bookkeeping: buffered audio frames plus timing state.

    The class holds two frame buffers (``preroll_frames`` for audio seen
    before speech starts and ``speech_frames`` for the active utterance), a
    set of per-turn counters/flags, and two ``time.monotonic()`` deadlines
    that gate input while the assistant is active.

    Most per-turn fields are only *reset* by this class; whatever code
    increments or sets them lives outside this file.
    """

    # Not read by any method here; presumably the audio sample rate in Hz
    # -- confirm against callers.
    sample_rate: int

    # Frames belonging to the utterance currently being captured.
    speech_frames: list[np.ndarray] = field(default_factory=list)

    # Per-turn accumulators, zeroed by clear_active_input()/finish().
    silence_ms: float = 0.0
    pending_speech_ms: float = 0.0
    in_speech: bool = False

    # Frames retained before speech starts; moved into speech_frames by start().
    preroll_frames: list[np.ndarray] = field(default_factory=list)

    # Deadlines on the time.monotonic() clock (seconds). They are NOT touched
    # by clear_active_input()/finish(); only interrupt_assistant() zeroes them.
    assistant_ignore_until: float = 0.0
    assistant_barge_until: float = 0.0

    # More per-turn state, zeroed by clear_active_input()/finish().
    pending_barge_ms: float = 0.0
    turn_speech_ms: float = 0.0
    active_speech_ms: float = 0.0

    # Never reset inside this class (unlike backchannel_count below).
    last_backchannel_at: float = 0.0

    # Per-turn state, reset by clear_active_input()/finish().
    backchannel_count: int = 0
    last_partial_transcript_at: float = 0.0
    last_partial_transcript_text: str = ""
    last_partial_transcript_change_at: float = 0.0
    last_partial_response_text: str = ""
    dynamic_endpoint_target_ms: float = 0.0
    barge_in_active: bool = False
    recent_backchannels: list[str] = field(default_factory=list)

    # time.monotonic() timestamp written by start(); 0.0 when no turn is active.
    turn_started_at: float = 0.0

    def push_preroll(self, frame: np.ndarray, max_samples: int) -> None:
        """Append *frame* to the pre-roll buffer, evicting oldest frames.

        Whole frames are dropped from the front until the buffered sample
        count (sum of ``ndarray.size``) is at most *max_samples*. Frames are
        never split, so a single frame larger than *max_samples* is itself
        evicted and the buffer ends up empty.
        """
        self.preroll_frames.append(frame)
        buffered = sum(chunk.size for chunk in self.preroll_frames)
        while self.preroll_frames and buffered > max_samples:
            buffered -= self.preroll_frames.pop(0).size

    def start(self) -> None:
        """Mark the turn as started and move any pre-roll into the utterance."""
        self.in_speech = True
        self.turn_started_at = time.monotonic()
        if self.preroll_frames:
            # Pre-roll goes in front of whatever is appended afterwards.
            self.speech_frames.extend(self.preroll_frames)
            self.preroll_frames.clear()

    def append(self, frame: np.ndarray) -> None:
        """Append *frame* to the active utterance buffer."""
        self.speech_frames.append(frame)

    def reset_input(self) -> None:
        """Discard all per-turn state and both frame buffers."""
        self.clear_active_input(preserve_preroll=False)

    def clear_active_input(self, *, preserve_preroll: bool) -> None:
        """Reset every per-turn field and drop the buffered utterance.

        This is the single place where per-turn state is reset; ``finish()``
        and ``reset_input()`` both delegate here.

        Args:
            preserve_preroll: When true, ``preroll_frames`` is left intact;
                otherwise it is cleared as well.

        The assistant deadlines (``assistant_ignore_until``,
        ``assistant_barge_until``) and ``last_backchannel_at`` are left
        untouched.
        """
        self.in_speech = False
        self.silence_ms = 0.0
        self.pending_speech_ms = 0.0
        self.pending_barge_ms = 0.0
        self.turn_speech_ms = 0.0
        self.active_speech_ms = 0.0
        self.backchannel_count = 0
        self.last_partial_transcript_at = 0.0
        self.last_partial_transcript_text = ""
        self.last_partial_transcript_change_at = 0.0
        self.last_partial_response_text = ""
        self.dynamic_endpoint_target_ms = 0.0
        self.barge_in_active = False
        self.turn_started_at = 0.0
        # Cleared in place so existing references to the lists stay valid.
        self.recent_backchannels.clear()
        self.speech_frames.clear()
        if not preserve_preroll:
            self.preroll_frames.clear()

    def set_assistant_active(self, duration_ms: float, holdoff_ms: int) -> None:
        """Extend the input-ignore deadline to cover assistant activity.

        The deadline becomes ``now + duration_ms + holdoff_ms`` (converted to
        seconds) unless the current deadline is already later. Negative
        arguments are treated as zero.
        """
        now = time.monotonic()
        total_s = max(duration_ms, 0.0) / 1000.0 + max(holdoff_ms, 0) / 1000.0
        # max() means a call can only lengthen the window, never shorten it.
        self.assistant_ignore_until = max(self.assistant_ignore_until, now + total_s)

    def interrupt_assistant(self) -> None:
        """Zero both assistant deadlines so neither gate applies any more."""
        self.assistant_ignore_until = 0.0
        self.assistant_barge_until = 0.0

    def should_ignore_input(self) -> bool:
        """Return True while the input-ignore deadline is still in the future."""
        return time.monotonic() < self.assistant_ignore_until

    def set_barge_grace(self, grace_ms: int) -> None:
        """Extend the barge-in deadline to at least ``now + grace_ms``.

        Negative *grace_ms* is treated as zero; an existing later deadline is
        kept.
        """
        grace_s = max(grace_ms, 0) / 1000.0
        self.assistant_barge_until = max(self.assistant_barge_until, time.monotonic() + grace_s)

    def can_barge_in(self) -> bool:
        """Return True once the barge-in deadline has been reached or passed."""
        return time.monotonic() >= self.assistant_barge_until

    def finish(self) -> np.ndarray:
        """Return the buffered utterance as one array and reset per-turn state.

        Returns:
            The concatenation of ``speech_frames``, or an empty ``float32``
            array when nothing was buffered.

        Both frame buffers (including pre-roll) are cleared afterwards.
        """
        # Snapshot first: np.concatenate builds a new array, so clearing the
        # frame list below cannot affect the returned audio.
        audio = self.current_audio()
        self.clear_active_input(preserve_preroll=False)
        return audio

    def current_audio(self) -> np.ndarray:
        """Return the buffered utterance as one array without modifying state.

        Returns an empty ``float32`` array when ``speech_frames`` is empty.
        """
        if not self.speech_frames:
            return np.zeros(0, dtype=np.float32)
        return np.concatenate(self.speech_frames)