File size: 9,311 Bytes
7344bef
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
import numpy as np


# Default longest burst of activity (seconds) that the transient helpers still treat as removable noise.
DEFAULT_TRANSIENT_MAX_SECONDS = 0.1
# Default length of silence (seconds) that must sit next to a burst before it counts as a transient.
DEFAULT_TRANSIENT_CONTEXT_SILENCE_SECONDS = 0.2
# Default length (seconds) of each RMS analysis window.
DEFAULT_TRANSIENT_WINDOW_SECONDS = 0.01
# Default RMS level; a window whose RMS is strictly above it is considered active (non-silent).
DEFAULT_TRANSIENT_SILENCE_THRESHOLD = 0.015


def _mono(audio_np: np.ndarray) -> np.ndarray:
    return audio_np.mean(axis=1) if audio_np.ndim == 2 else audio_np


def _window_settings(sample_rate: int, max_transient_seconds: float, context_silence_seconds: float, window_seconds: float) -> tuple[int, int, int]:
    window = max(1, int(window_seconds * sample_rate))
    max_noise_windows = max(1, int(np.ceil(max_transient_seconds * sample_rate / window)))
    min_silence_windows = max(1, int(np.ceil(context_silence_seconds * sample_rate / window)))
    return window, max_noise_windows, min_silence_windows


def _rms_windows(mono: np.ndarray, window: int, frame_count: int, offset: int = 0) -> np.ndarray:
    return np.array([np.sqrt(np.mean(mono[offset + i * window : offset + (i + 1) * window].astype(np.float64) ** 2)) for i in range(frame_count)])


def _debug_print(debug: bool, label: str, message: str) -> None:
    if debug:
        print(f"[{label}] {message}")


def trim_leading_transient_noise(
    audio_np: np.ndarray,
    sample_rate: int,
    *,
    max_transient_seconds: float = DEFAULT_TRANSIENT_MAX_SECONDS,
    context_silence_seconds: float = DEFAULT_TRANSIENT_CONTEXT_SILENCE_SECONDS,
    window_seconds: float = DEFAULT_TRANSIENT_WINDOW_SECONDS,
    threshold: float = DEFAULT_TRANSIENT_SILENCE_THRESHOLD,
    debug: bool = False,
    label: str = "Audio Cleaning",
) -> np.ndarray:
    """Cut a short noise burst off the very start of the clip.

    The clip is trimmed only when its first analysis window is active and,
    within ``max_transient_seconds`` of the start, a point is found that is
    followed by ``context_silence_seconds`` of uninterrupted silence.

    Args:
        audio_np: Audio samples; a 2-D array is averaged along axis 1 for analysis.
        sample_rate: Samples per second.
        max_transient_seconds: Longest burst that may be removed.
        context_silence_seconds: Silence required right after the burst.
        window_seconds: Length of each RMS analysis window.
        threshold: RMS level above which a window is active.
        debug: When true, print a message describing the trim.
        label: Prefix used in the debug message.

    Returns:
        ``audio_np`` sliced from the start of the silence, or ``audio_np``
        itself when the clip is too short or no such burst is found.
    """
    samples = _mono(audio_np)
    window, noise_limit, silence_span = _window_settings(sample_rate, max_transient_seconds, context_silence_seconds, window_seconds)
    # Only the head of the clip (burst budget + required silence) is inspected.
    needed_windows = noise_limit + silence_span
    if len(samples) // window < needed_windows:
        return audio_np
    is_loud = _rms_windows(samples, window, needed_windows) > threshold
    if not is_loud[0]:
        return audio_np
    # Earliest window index at which a full run of silent windows begins.
    silence_start = next(
        (start for start in range(1, noise_limit + 1) if not is_loud[start : start + silence_span].any()),
        None,
    )
    if silence_start is None:
        return audio_np
    cut = silence_start * window
    _debug_print(debug, label, f"Trimmed leading transient noise ({cut / sample_rate:.2f}s)")
    return audio_np[cut:]


def trim_trailing_transient_noise(
    audio_np: np.ndarray,
    sample_rate: int,
    *,
    max_transient_seconds: float = DEFAULT_TRANSIENT_MAX_SECONDS,
    context_silence_seconds: float = DEFAULT_TRANSIENT_CONTEXT_SILENCE_SECONDS,
    window_seconds: float = DEFAULT_TRANSIENT_WINDOW_SECONDS,
    threshold: float = DEFAULT_TRANSIENT_SILENCE_THRESHOLD,
    debug: bool = False,
    label: str = "Audio Cleaning",
) -> np.ndarray:
    """Cut a short noise burst off the very end of the clip.

    Analysis windows are aligned to the end of the clip. The clip is
    trimmed only when its last window is active and the burst (at most
    ``max_transient_seconds`` long) is immediately preceded by
    ``context_silence_seconds`` of uninterrupted silence.

    Args:
        audio_np: Audio samples; a 2-D array is averaged along axis 1 for analysis.
        sample_rate: Samples per second.
        max_transient_seconds: Longest burst that may be removed.
        context_silence_seconds: Silence required right before the burst.
        window_seconds: Length of each RMS analysis window.
        threshold: RMS level above which a window is active.
        debug: When true, print a message describing the trim.
        label: Prefix used in the debug message.

    Returns:
        ``audio_np`` sliced up to the start of the burst, or ``audio_np``
        itself when the clip is too short or no such burst is found.
    """
    samples = _mono(audio_np)
    window, noise_limit, silence_span = _window_settings(sample_rate, max_transient_seconds, context_silence_seconds, window_seconds)
    # Only the tail of the clip (required silence + burst budget) is inspected.
    needed_windows = noise_limit + silence_span
    if len(samples) // window < needed_windows:
        return audio_np
    tail_offset = len(samples) - needed_windows * window
    is_loud = _rms_windows(samples, window, needed_windows, offset=tail_offset) > threshold
    if not is_loud[-1]:
        return audio_np
    total = len(audio_np)
    # Walk backwards from the last window: the first hit is the shortest burst
    # that has a full run of silence directly in front of it.
    for burst_start in reversed(range(silence_span, needed_windows)):
        if is_loud[burst_start - silence_span : burst_start].any():
            continue
        cut = tail_offset + burst_start * window
        if cut <= 0 or cut >= total:
            return audio_np
        _debug_print(debug, label, f"Trimmed trailing transient noise ({(total - cut) / sample_rate:.2f}s)")
        return audio_np[:cut]
    return audio_np


def mute_isolated_transient_noise(
    audio_np: np.ndarray,
    sample_rate: int,
    *,
    max_transient_seconds: float = DEFAULT_TRANSIENT_MAX_SECONDS,
    context_silence_seconds: float = DEFAULT_TRANSIENT_CONTEXT_SILENCE_SECONDS,
    window_seconds: float = DEFAULT_TRANSIENT_WINDOW_SECONDS,
    threshold: float = DEFAULT_TRANSIENT_SILENCE_THRESHOLD,
    debug: bool = False,
    label: str = "Audio Cleaning",
) -> np.ndarray:
    """Zero out short bursts of activity that are surrounded by silence.

    A run of active windows is muted when it is at most
    ``max_transient_seconds`` long, does not start in the first window,
    and has no active window within ``context_silence_seconds`` on either
    side (the context is clipped at the clip boundaries). A run that is
    still active in the last analysis window is never muted.

    Args:
        audio_np: Audio samples; a 2-D array is averaged along axis 1 for analysis.
        sample_rate: Samples per second.
        max_transient_seconds: Longest burst that may be muted.
        context_silence_seconds: Silence required on each side of the burst.
        window_seconds: Length of each RMS analysis window.
        threshold: RMS level above which a window is active.
        debug: When true, print how many segments were muted (if any).
        label: Prefix used in the debug message.

    Returns:
        ``audio_np`` itself when the clip is too short to analyse,
        otherwise a copy of it with the isolated bursts set to zero.
    """
    samples = _mono(audio_np)
    window, noise_limit, silence_span = _window_settings(sample_rate, max_transient_seconds, context_silence_seconds, window_seconds)
    frame_count = len(samples) // window
    if frame_count < noise_limit + 2 * silence_span:
        return audio_np
    is_loud = _rms_windows(samples, window, frame_count) > threshold
    cleaned = audio_np.copy()
    segments_muted = 0
    run_start = None
    for frame, loud in enumerate(is_loud):
        if loud:
            # Remember where the current run of active windows began.
            if run_start is None:
                run_start = frame
            continue
        if run_start is None:
            continue
        # A run of active windows has just ended at `frame` (exclusive).
        begin, run_start = run_start, None
        if frame - begin > noise_limit:
            continue
        if begin <= 0:
            continue
        before = is_loud[max(0, begin - silence_span) : begin]
        after = is_loud[frame : min(frame_count, frame + silence_span)]
        if before.any() or after.any():
            continue
        cleaned[begin * window : frame * window] = 0
        segments_muted += 1
    _debug_print(debug and segments_muted > 0, label, f"Muted {segments_muted} isolated transient noise segment(s)")
    return cleaned


def trim_leading_noise_before_speech(
    audio_np: np.ndarray,
    sample_rate: int,
    *,
    speech_threshold: float = 0.03,
    max_leading_seconds: float = 1.0,
    keep_silence_seconds: float = 0.1,
    window_seconds: float = DEFAULT_TRANSIENT_WINDOW_SECONDS,
    debug: bool = False,
    label: str = "Audio Cleaning",
) -> np.ndarray:
    """Drop low-level audio that precedes the first strong window.

    Only the first ``max_leading_seconds`` of the clip are searched for a
    window whose RMS exceeds ``speech_threshold``. Everything before that
    window is removed except for the last ``keep_silence_seconds``.

    Args:
        audio_np: Audio samples; a 2-D array is averaged along axis 1 for analysis.
        sample_rate: Samples per second.
        speech_threshold: RMS level above which a window counts as strong.
        max_leading_seconds: How far into the clip to search.
        keep_silence_seconds: Lead-in retained before the first strong window.
        window_seconds: Length of each RMS analysis window.
        debug: When true, print a message describing the trim.
        label: Prefix used in the debug message.

    Returns:
        The trimmed slice of ``audio_np``, or ``audio_np`` itself when no
        strong window is found or nothing would be removed.
    """
    samples = _mono(audio_np)
    window = max(1, int(window_seconds * sample_rate))
    search_windows = max(1, int(max_leading_seconds * sample_rate / window))
    frame_count = min(len(samples) // window, search_windows)
    if frame_count == 0:
        return audio_np
    is_strong = _rms_windows(samples, window, frame_count) > speech_threshold
    if not is_strong.any():
        return audio_np
    # argmax on a boolean array yields the index of the first True entry.
    first_speech = int(np.argmax(is_strong)) * window
    trim_end = first_speech - int(keep_silence_seconds * sample_rate)
    if trim_end <= 0:
        return audio_np
    _debug_print(debug, label, f"Trimmed leading low-level noise before speech ({trim_end / sample_rate:.2f}s)")
    return audio_np[trim_end:]


def ensure_trailing_silence(
    audio_np: np.ndarray,
    sample_rate: int,
    min_silence_seconds: float,
    *,
    threshold: float = DEFAULT_TRANSIENT_SILENCE_THRESHOLD,
    window_seconds: float = DEFAULT_TRANSIENT_WINDOW_SECONDS,
) -> np.ndarray:
    """Pad the clip with zeros so enough silence follows the last active audio.

    The clip is split into RMS windows starting at sample 0. The samples
    left over after the last full window are measured as well, so loud
    audio at the very end of the clip (or a clip shorter than one window)
    is not mistaken for silence.

    Args:
        audio_np: Audio samples; a 2-D array is averaged along axis 1 for analysis.
        sample_rate: Samples per second.
        min_silence_seconds: Silence required after the last active audio.
            Values ``<= 0`` disable the function.
        threshold: RMS level above which audio is considered active.
        window_seconds: Length of each RMS analysis window.

    Returns:
        ``audio_np`` itself when no padding is needed or no active audio is
        found; otherwise a new array with zero samples of the same dtype
        appended along axis 0.
    """
    if min_silence_seconds <= 0:
        return audio_np
    mono = _mono(audio_np)
    window = max(1, int(window_seconds * sample_rate))
    frame_count = len(mono) // window
    # Samples after the last full window; _rms_windows never looks at them.
    remainder = mono[frame_count * window :]
    if remainder.size and np.sqrt(np.mean(remainder.astype(np.float64) ** 2)) > threshold:
        # The clip is active right up to its final sample.
        last_active_end = len(audio_np)
    else:
        active_windows = np.where(_rms_windows(mono, window, frame_count) > threshold)[0]
        if len(active_windows) == 0:
            return audio_np
        last_active_end = (int(active_windows[-1]) + 1) * window
    existing_silence = len(audio_np) - last_active_end
    missing_silence = int(min_silence_seconds * sample_rate) - existing_silence
    if missing_silence <= 0:
        return audio_np
    # Keep any trailing dimensions (e.g. channels) of the input in the padding.
    pad_shape = (missing_silence,) + audio_np.shape[1:]
    return np.concatenate([audio_np, np.zeros(pad_shape, dtype=audio_np.dtype)], axis=0)


def trim_after_silence_boundary(
    audio_np: np.ndarray,
    sample_rate: int,
    earliest_seconds: float,
    *,
    search_seconds: float = 2.0,
    min_silence_seconds: float = 0.18,
    keep_silence_seconds: float = 0.12,
    window_seconds: float = DEFAULT_TRANSIENT_WINDOW_SECONDS,
    threshold: float = DEFAULT_TRANSIENT_SILENCE_THRESHOLD,
    debug: bool = False,
    label: str = "Audio Cleaning",
) -> np.ndarray:
    """Cut the clip at the first silent stretch found after ``earliest_seconds``.

    Starting at the analysis window that contains ``earliest_seconds``, up
    to ``search_seconds`` of audio are scanned for the first run of
    ``min_silence_seconds`` of silent windows. The clip is cut
    ``keep_silence_seconds`` after the start of that run.

    Args:
        audio_np: Audio samples; a 2-D array is averaged along axis 1 for analysis.
        sample_rate: Samples per second.
        earliest_seconds: Position from which the search starts.
        search_seconds: Length of audio to search.
        min_silence_seconds: Length of silence that qualifies as a boundary.
        keep_silence_seconds: Silence retained after the boundary start.
        window_seconds: Length of each RMS analysis window.
        threshold: RMS level above which a window is active.
        debug: When true, print a message describing the cut.
        label: Prefix used in the debug message.

    Returns:
        ``audio_np`` sliced up to the cut point, or ``audio_np`` itself when
        the inputs disable the search, no boundary is found, or the cut
        would fall outside the clip.
    """
    if earliest_seconds <= 0 or search_seconds <= 0:
        return audio_np
    total = len(audio_np)
    if total == 0:
        return audio_np
    samples = _mono(audio_np)
    window = max(1, int(window_seconds * sample_rate))
    earliest_sample = max(0, int(earliest_seconds * sample_rate))
    if earliest_sample >= len(samples):
        return audio_np
    # Align the search start down to a window boundary.
    offset = earliest_sample - earliest_sample % window
    search_end = min(len(samples), earliest_sample + int(search_seconds * sample_rate))
    frame_count = max(0, (search_end - offset) // window)
    silence_span = max(1, int(np.ceil(min_silence_seconds * sample_rate / window)))
    if frame_count < silence_span:
        return audio_np
    is_loud = _rms_windows(samples, window, frame_count, offset=offset) > threshold
    # First window index that opens a full run of silent windows.
    quiet_start = next(
        (start for start in range(len(is_loud) - silence_span + 1) if not is_loud[start : start + silence_span].any()),
        None,
    )
    if quiet_start is None:
        return audio_np
    cut_sample = min(total, offset + quiet_start * window + int(keep_silence_seconds * sample_rate))
    if cut_sample <= 0 or cut_sample >= total:
        return audio_np
    _debug_print(debug, label, f"Trimmed chunk tail at silence boundary ({cut_sample / sample_rate:.2f}s)")
    return audio_np[:cut_sample]