from __future__ import annotations

import os
import shutil
import tempfile
from dataclasses import dataclass
from typing import Any

import numpy as np
import soundfile as sf
from faster_whisper.audio import decode_audio


@dataclass
class SilenceTrimOptions:
    """Settings that control silence detection and removal."""

    # When False, trim_audio returns its input unchanged.
    enabled: bool
    # Frames whose RMS is below this level (dB relative to amplitude 1.0)
    # are treated as silent.
    threshold_db: float
    # Silent runs shorter than this many seconds are ignored.
    min_silence_sec: float
    # Seconds of silence kept at each edge of a detected run.
    keep_padding_sec: float
    # Length of each RMS analysis frame, in milliseconds.
    analysis_window_ms: int


def seconds_from_samples(sample_count: int, sample_rate: int) -> float:
    """Convert a sample count to seconds at *sample_rate*."""
    return sample_count / float(sample_rate)


def round_sec(value: float) -> float:
    """Round a duration in seconds to four decimal places."""
    return round(value, 4)


def clamp(value: float, low: float, high: float) -> float:
    """Limit *value* to the closed interval ``[low, high]``."""
    return max(low, min(high, value))


def db_to_amplitude(db_value: float) -> float:
    """Convert a decibel level to a linear amplitude ratio (``10 ** (dB / 20)``)."""
    return float(10.0 ** (db_value / 20.0))


def resolve_trim_options(
    enabled: bool,
    threshold_db: float,
    min_silence_sec: float,
    keep_padding_sec: float,
    analysis_window_ms: int,
) -> SilenceTrimOptions:
    """Build a :class:`SilenceTrimOptions` with every value forced into range.

    Ranges applied:
        * ``threshold_db``: -80.0 .. -5.0
        * ``min_silence_sec``: 0.02 .. 10.0
        * ``keep_padding_sec``: 0.0 .. the clamped ``min_silence_sec``
        * ``analysis_window_ms``: 1 .. 250 (truncated to ``int``)
    """
    threshold_db = clamp(float(threshold_db), -80.0, -5.0)
    min_silence_sec = clamp(float(min_silence_sec), 0.02, 10.0)
    # Padding is bounded by the (already clamped) minimum silence length.
    keep_padding_sec = clamp(float(keep_padding_sec), 0.0, min_silence_sec)
    analysis_window_ms = int(max(1, min(250, analysis_window_ms)))
    return SilenceTrimOptions(
        enabled=bool(enabled),
        threshold_db=threshold_db,
        min_silence_sec=min_silence_sec,
        keep_padding_sec=keep_padding_sec,
        analysis_window_ms=analysis_window_ms,
    )


def ensure_wav(audio_path: str, sample_rate: int) -> tuple[str, np.ndarray]:
    """Decode *audio_path* and write it as a 16-bit PCM WAV in a new temp dir.

    Returns:
        ``(wav_path, samples)`` where ``wav_path`` is ``input.wav`` inside a
        freshly created ``voice-intel-*`` temporary directory and ``samples``
        is the decoded audio as a float32 array.

    The temporary directory is not removed on success; the caller owns it.
    If writing the WAV fails, the directory is removed before the error
    propagates so a failed call does not leave an orphaned directory behind.
    """
    audio = decode_audio(audio_path, sampling_rate=sample_rate)
    tmp_dir = tempfile.mkdtemp(prefix="voice-intel-")
    wav_path = os.path.join(tmp_dir, "input.wav")
    try:
        sf.write(wav_path, audio, sample_rate, subtype="PCM_16")
    except BaseException:
        # Nobody else knows about tmp_dir yet, so clean it up here.
        shutil.rmtree(tmp_dir, ignore_errors=True)
        raise
    return wav_path, np.asarray(audio, dtype=np.float32)


def save_wav(audio: np.ndarray, wav_path: str, sample_rate: int) -> None:
    """Write *audio* (converted to float32) to *wav_path* as 16-bit PCM."""
    sf.write(wav_path, np.asarray(audio, dtype=np.float32), sample_rate, subtype="PCM_16")


def detect_silence_runs(
    audio: np.ndarray,
    sample_rate: int,
    threshold_db: float,
    min_silence_sec: float,
    analysis_window_ms: int,
) -> list[tuple[int, int]]:
    """Find runs of consecutive low-energy frames in *audio*.

    The audio is split into frames of ``analysis_window_ms`` (at least one
    sample each); a frame is silent when its RMS is strictly below the
    amplitude equivalent of ``threshold_db``. Adjacent silent frames are
    merged into runs, and runs shorter than ``min_silence_sec`` are dropped.

    Returns:
        ``(start_sample, end_sample)`` pairs, end-exclusive, in ascending
        order. An empty input yields an empty list.
    """
    # Convert once up front instead of once per frame.
    samples = np.asarray(audio, dtype=np.float32)
    if samples.size == 0:
        return []

    threshold = db_to_amplitude(threshold_db)
    window_samples = max(1, int(round((analysis_window_ms / 1000.0) * sample_rate)))
    min_silence_samples = max(1, int(round(min_silence_sec * sample_rate)))
    total_samples = len(samples)

    runs: list[tuple[int, int]] = []
    run_start: int | None = None  # first sample of the currently open silent run

    for start in range(0, total_samples, window_samples):
        end = min(total_samples, start + window_samples)
        rms = float(np.sqrt(np.mean(np.square(samples[start:end]))))
        if rms < threshold:
            if run_start is None:
                run_start = start
            continue
        if run_start is not None:
            # The run ends where this first non-silent frame begins.
            if start - run_start >= min_silence_samples:
                runs.append((run_start, start))
            run_start = None

    # A run still open at the end extends to the last sample.
    if run_start is not None and total_samples - run_start >= min_silence_samples:
        runs.append((run_start, total_samples))
    return runs


def _build_silence_payload(
    options: SilenceTrimOptions,
    sample_rate: int,
    raw_samples: int,
    processed_samples: int,
    detected_runs: list[tuple[int, int]],
    removed_runs: list[tuple[int, int]],
) -> dict[str, Any]:
    """Assemble the report dictionary returned by :func:`trim_audio`."""
    removed_samples = sum(end - start for start, end in removed_runs)
    return {
        "enabled": bool(options.enabled),
        "threshold_db": options.threshold_db,
        "min_silence_sec": options.min_silence_sec,
        "keep_padding_sec": options.keep_padding_sec,
        "analysis_window_ms": options.analysis_window_ms,
        "detected_runs": [_run_payload(start, end, sample_rate) for start, end in detected_runs],
        "removed_runs": [_run_payload(start, end, sample_rate) for start, end in removed_runs],
        "removed_silence_sec": round_sec(seconds_from_samples(removed_samples, sample_rate)),
        "raw_duration_sec": round_sec(seconds_from_samples(raw_samples, sample_rate)),
        "processed_duration_sec": round_sec(seconds_from_samples(processed_samples, sample_rate)),
    }


def trim_audio(
    audio: np.ndarray,
    sample_rate: int,
    options: SilenceTrimOptions,
) -> tuple[np.ndarray, dict[str, Any], list[tuple[int, int]]]:
    """Remove long silent stretches from *audio*.

    Each detected silent run is shrunk by ``options.keep_padding_sec`` at both
    edges and what remains is cut out of the signal.

    Returns:
        ``(processed_audio, payload, removed_intervals)``:

        * ``processed_audio`` - the input object itself when trimming is
          disabled or nothing was removed, otherwise a new array.
        * ``payload`` - report dict with the options used, the detected and
          removed runs, and the raw/processed durations.
        * ``removed_intervals`` - end-exclusive ``(start, end)`` sample ranges
          that were cut, relative to the original audio.
    """
    total_samples = len(audio)

    if not options.enabled:
        payload = _build_silence_payload(
            options, sample_rate, total_samples, total_samples, [], []
        )
        return audio, payload, []

    runs = detect_silence_runs(
        audio=audio,
        sample_rate=sample_rate,
        threshold_db=options.threshold_db,
        min_silence_sec=options.min_silence_sec,
        analysis_window_ms=options.analysis_window_ms,
    )

    # Options may be constructed without resolve_trim_options; a negative
    # padding would widen the cut beyond the detected run, so floor it at 0.
    keep_pad_samples = max(0, int(round(options.keep_padding_sec * sample_rate)))

    removed_intervals: list[tuple[int, int]] = []
    for start, end in runs:
        remove_start = min(end, start + keep_pad_samples)
        remove_end = max(start, end - keep_pad_samples)
        if remove_end > remove_start:
            removed_intervals.append((remove_start, remove_end))

    if not removed_intervals:
        payload = _build_silence_payload(
            options, sample_rate, total_samples, total_samples, runs, []
        )
        return audio, payload, []

    # Stitch together everything that lies between the removed intervals.
    chunks: list[np.ndarray] = []
    cursor = 0
    for start, end in removed_intervals:
        if start > cursor:
            chunks.append(audio[cursor:start])
        cursor = end
    if cursor < total_samples:
        chunks.append(audio[cursor:])
    trimmed_audio = np.concatenate(chunks) if chunks else np.zeros(0, dtype=np.float32)

    payload = _build_silence_payload(
        options, sample_rate, total_samples, len(trimmed_audio), runs, removed_intervals
    )
    return trimmed_audio, payload, removed_intervals


def _run_payload(start_sample: int, end_sample: int, sample_rate: int) -> dict[str, Any]:
    """Describe one sample interval in both samples and rounded seconds."""
    return {
        "start_sample": start_sample,
        "end_sample": end_sample,
        "start_sec": round_sec(seconds_from_samples(start_sample, sample_rate)),
        "end_sec": round_sec(seconds_from_samples(end_sample, sample_rate)),
        "duration_sec": round_sec(
            seconds_from_samples(max(0, end_sample - start_sample), sample_rate)
        ),
    }