import numpy as np DEFAULT_TRANSIENT_MAX_SECONDS = 0.1 DEFAULT_TRANSIENT_CONTEXT_SILENCE_SECONDS = 0.2 DEFAULT_TRANSIENT_WINDOW_SECONDS = 0.01 DEFAULT_TRANSIENT_SILENCE_THRESHOLD = 0.015 def _mono(audio_np: np.ndarray) -> np.ndarray: return audio_np.mean(axis=1) if audio_np.ndim == 2 else audio_np def _window_settings(sample_rate: int, max_transient_seconds: float, context_silence_seconds: float, window_seconds: float) -> tuple[int, int, int]: window = max(1, int(window_seconds * sample_rate)) max_noise_windows = max(1, int(np.ceil(max_transient_seconds * sample_rate / window))) min_silence_windows = max(1, int(np.ceil(context_silence_seconds * sample_rate / window))) return window, max_noise_windows, min_silence_windows def _rms_windows(mono: np.ndarray, window: int, frame_count: int, offset: int = 0) -> np.ndarray: return np.array([np.sqrt(np.mean(mono[offset + i * window : offset + (i + 1) * window].astype(np.float64) ** 2)) for i in range(frame_count)]) def _debug_print(debug: bool, label: str, message: str) -> None: if debug: print(f"[{label}] {message}") def trim_leading_transient_noise( audio_np: np.ndarray, sample_rate: int, *, max_transient_seconds: float = DEFAULT_TRANSIENT_MAX_SECONDS, context_silence_seconds: float = DEFAULT_TRANSIENT_CONTEXT_SILENCE_SECONDS, window_seconds: float = DEFAULT_TRANSIENT_WINDOW_SECONDS, threshold: float = DEFAULT_TRANSIENT_SILENCE_THRESHOLD, debug: bool = False, label: str = "Audio Cleaning", ) -> np.ndarray: mono = _mono(audio_np) window, max_noise_windows, min_silence_windows = _window_settings(sample_rate, max_transient_seconds, context_silence_seconds, window_seconds) frame_count = min(len(mono) // window, max_noise_windows + min_silence_windows) if frame_count < max_noise_windows + min_silence_windows: return audio_np active = _rms_windows(mono, window, frame_count) > threshold if not active[0]: return audio_np for silence_start in range(1, max_noise_windows + 1): if not active[silence_start : silence_start + min_silence_windows].any(): trim_end = silence_start * window _debug_print(debug, label, f"Trimmed leading transient noise ({trim_end / sample_rate:.2f}s)") return audio_np[trim_end:] return audio_np def trim_trailing_transient_noise( audio_np: np.ndarray, sample_rate: int, *, max_transient_seconds: float = DEFAULT_TRANSIENT_MAX_SECONDS, context_silence_seconds: float = DEFAULT_TRANSIENT_CONTEXT_SILENCE_SECONDS, window_seconds: float = DEFAULT_TRANSIENT_WINDOW_SECONDS, threshold: float = DEFAULT_TRANSIENT_SILENCE_THRESHOLD, debug: bool = False, label: str = "Audio Cleaning", ) -> np.ndarray: mono = _mono(audio_np) window, max_noise_windows, min_silence_windows = _window_settings(sample_rate, max_transient_seconds, context_silence_seconds, window_seconds) frame_count = min(len(mono) // window, max_noise_windows + min_silence_windows) if frame_count < max_noise_windows + min_silence_windows: return audio_np offset = len(mono) - frame_count * window active = _rms_windows(mono, window, frame_count, offset=offset) > threshold if not active[-1]: return audio_np for noise_start in range(frame_count - 1, frame_count - max_noise_windows - 1, -1): if not active[noise_start - min_silence_windows : noise_start].any(): trim_start = offset + noise_start * window if trim_start <= 0 or trim_start >= len(audio_np): return audio_np _debug_print(debug, label, f"Trimmed trailing transient noise ({(len(audio_np) - trim_start) / sample_rate:.2f}s)") return audio_np[:trim_start] return audio_np def mute_isolated_transient_noise( audio_np: np.ndarray, sample_rate: int, *, max_transient_seconds: float = DEFAULT_TRANSIENT_MAX_SECONDS, context_silence_seconds: float = DEFAULT_TRANSIENT_CONTEXT_SILENCE_SECONDS, window_seconds: float = DEFAULT_TRANSIENT_WINDOW_SECONDS, threshold: float = DEFAULT_TRANSIENT_SILENCE_THRESHOLD, debug: bool = False, label: str = "Audio Cleaning", ) -> np.ndarray: mono = _mono(audio_np) window, max_noise_windows, min_silence_windows = _window_settings(sample_rate, max_transient_seconds, context_silence_seconds, window_seconds) frame_count = len(mono) // window if frame_count < max_noise_windows + 2 * min_silence_windows: return audio_np active = _rms_windows(mono, window, frame_count) > threshold muted = audio_np.copy() active_start = None muted_count = 0 for idx, is_active in enumerate(active): if is_active and active_start is None: active_start = idx elif not is_active and active_start is not None: if idx - active_start <= max_noise_windows: prev_start = max(0, active_start - min_silence_windows) next_end = min(frame_count, idx + min_silence_windows) if not active[prev_start:active_start].any() and not active[idx:next_end].any() and active_start > 0 and idx < frame_count: muted[active_start * window : idx * window] = 0 muted_count += 1 active_start = None _debug_print(debug and muted_count > 0, label, f"Muted {muted_count} isolated transient noise segment(s)") return muted def trim_leading_noise_before_speech( audio_np: np.ndarray, sample_rate: int, *, speech_threshold: float = 0.03, max_leading_seconds: float = 1.0, keep_silence_seconds: float = 0.1, window_seconds: float = DEFAULT_TRANSIENT_WINDOW_SECONDS, debug: bool = False, label: str = "Audio Cleaning", ) -> np.ndarray: mono = _mono(audio_np) window = max(1, int(window_seconds * sample_rate)) frame_count = min(len(mono) // window, max(1, int(max_leading_seconds * sample_rate / window))) if frame_count == 0: return audio_np strong_windows = np.where(_rms_windows(mono, window, frame_count) > speech_threshold)[0] if len(strong_windows) == 0: return audio_np first_speech = int(strong_windows[0]) * window keep_samples = int(keep_silence_seconds * sample_rate) trim_end = max(0, first_speech - keep_samples) if trim_end <= 0: return audio_np _debug_print(debug, label, f"Trimmed leading low-level noise before speech ({trim_end / sample_rate:.2f}s)") return audio_np[trim_end:] def ensure_trailing_silence(audio_np: np.ndarray, sample_rate: int, min_silence_seconds: float, *, threshold: float = DEFAULT_TRANSIENT_SILENCE_THRESHOLD) -> np.ndarray: if min_silence_seconds <= 0: return audio_np mono = _mono(audio_np) window = max(1, int(DEFAULT_TRANSIENT_WINDOW_SECONDS * sample_rate)) frame_count = len(mono) // window if frame_count == 0: return audio_np active = _rms_windows(mono, window, frame_count) > threshold active_windows = np.where(active)[0] if len(active_windows) == 0: return audio_np last_active_end = min(len(audio_np), (int(active_windows[-1]) + 1) * window) existing_silence = len(audio_np) - last_active_end target_silence = int(min_silence_seconds * sample_rate) missing_silence = target_silence - existing_silence if missing_silence <= 0: return audio_np pad_shape = (missing_silence,) if audio_np.ndim == 1 else (missing_silence, audio_np.shape[1]) return np.concatenate([audio_np, np.zeros(pad_shape, dtype=audio_np.dtype)], axis=0) def trim_after_silence_boundary( audio_np: np.ndarray, sample_rate: int, earliest_seconds: float, *, search_seconds: float = 2.0, min_silence_seconds: float = 0.18, keep_silence_seconds: float = 0.12, window_seconds: float = DEFAULT_TRANSIENT_WINDOW_SECONDS, threshold: float = DEFAULT_TRANSIENT_SILENCE_THRESHOLD, debug: bool = False, label: str = "Audio Cleaning", ) -> np.ndarray: if earliest_seconds <= 0 or search_seconds <= 0 or len(audio_np) == 0: return audio_np mono = _mono(audio_np) window = max(1, int(window_seconds * sample_rate)) earliest_sample = max(0, int(earliest_seconds * sample_rate)) if earliest_sample >= len(mono): return audio_np offset = (earliest_sample // window) * window search_end = min(len(mono), earliest_sample + int(search_seconds * sample_rate)) frame_count = max(0, (search_end - offset) // window) min_silence_windows = max(1, int(np.ceil(min_silence_seconds * sample_rate / window))) if frame_count < min_silence_windows: return audio_np active = _rms_windows(mono, window, frame_count, offset=offset) > threshold for idx in range(0, len(active) - min_silence_windows + 1): if not active[idx : idx + min_silence_windows].any(): cut_sample = min(len(audio_np), offset + idx * window + int(keep_silence_seconds * sample_rate)) if cut_sample <= 0 or cut_sample >= len(audio_np): return audio_np _debug_print(debug, label, f"Trimmed chunk tail at silence boundary ({cut_sample / sample_rate:.2f}s)") return audio_np[:cut_sample] return audio_np