# voice-intelligence / audio.py
# unknownfriend00007's picture
# Upload 8 files
# 48c3b28 verified
from __future__ import annotations
import os
import tempfile
from dataclasses import dataclass
from typing import Any
import numpy as np
import soundfile as sf
from faster_whisper.audio import decode_audio
@dataclass
class SilenceTrimOptions:
    """Settings bundle describing how silence trimming should be performed."""

    # Master switch; when false, trimming returns the audio untouched.
    enabled: bool
    # Level in dB; converted to a linear amplitude and compared with frame RMS.
    threshold_db: float
    # Shortest quiet stretch, in seconds, that is reported as a silence run.
    min_silence_sec: float
    # Seconds of silence preserved at each edge of a detected run.
    keep_padding_sec: float
    # Length, in milliseconds, of each RMS analysis frame.
    analysis_window_ms: int
def seconds_from_samples(sample_count: int, sample_rate: int) -> float:
    """Convert a number of samples into a duration in seconds.

    Args:
        sample_count: Number of audio samples.
        sample_rate: Samples per second.

    Returns:
        ``sample_count`` divided by ``sample_rate`` as a float.

    Raises:
        ZeroDivisionError: If ``sample_rate`` is zero.
    """
    rate_hz = float(sample_rate)
    return sample_count / rate_hz
def round_sec(value: float) -> float:
    """Round a duration in seconds to four decimal places."""
    decimals = 4
    return round(value, decimals)
def clamp(value: float, low: float, high: float) -> float:
    """Limit ``value`` to the interval ``[low, high]``.

    The upper bound is applied first and the lower bound second, so if
    ``low`` is greater than ``high`` the result is ``low``.
    """
    capped = value if value < high else high
    return capped if capped > low else low
def db_to_amplitude(db_value: float) -> float:
    """Convert a decibel level to a linear amplitude ratio, ``10 ** (dB / 20)``."""
    exponent = db_value / 20.0
    return float(10.0 ** exponent)
def resolve_trim_options(
    enabled: bool,
    threshold_db: float,
    min_silence_sec: float,
    keep_padding_sec: float,
    analysis_window_ms: int,
) -> SilenceTrimOptions:
    """Coerce raw trimming settings into a range-checked ``SilenceTrimOptions``.

    Args:
        enabled: Whether trimming is requested; coerced with ``bool``.
        threshold_db: Silence threshold in dB, limited to ``[-80, -5]``.
        min_silence_sec: Minimum silence length, limited to ``[0.02, 10]``.
        keep_padding_sec: Padding kept around silence, limited to
            ``[0, min_silence_sec]`` using the already-limited minimum.
        analysis_window_ms: Analysis frame length, limited to ``[1, 250]``
            and truncated to an integer.

    Returns:
        A ``SilenceTrimOptions`` holding the sanitized values.
    """
    safe_threshold_db = clamp(float(threshold_db), -80.0, -5.0)
    safe_min_silence = clamp(float(min_silence_sec), 0.02, 10.0)
    # Padding can never exceed the shortest silence that would be detected.
    safe_padding = clamp(float(keep_padding_sec), 0.0, safe_min_silence)

    window_ms = min(250, analysis_window_ms)
    window_ms = int(max(1, window_ms))

    return SilenceTrimOptions(
        enabled=bool(enabled),
        threshold_db=safe_threshold_db,
        min_silence_sec=safe_min_silence,
        keep_padding_sec=safe_padding,
        analysis_window_ms=window_ms,
    )
def ensure_wav(audio_path: str, sample_rate: int) -> tuple[str, np.ndarray]:
    """Decode an audio file and store a 16-bit PCM copy in a new temp directory.

    The input is decoded with ``decode_audio`` at ``sample_rate`` and written
    as ``input.wav`` inside a freshly created ``voice-intel-*`` temporary
    directory. On success the directory is NOT removed here; the caller owns
    it and is responsible for deleting it when done.

    Args:
        audio_path: Path of the audio file to decode.
        sample_rate: Sampling rate passed to the decoder and to the writer.

    Returns:
        A ``(wav_path, samples)`` tuple: the path of the written file and the
        decoded audio as a ``float32`` array.

    Raises:
        Whatever ``decode_audio`` or ``sf.write`` raise. If the write fails,
        the temporary directory is removed before the error propagates.
    """
    import shutil  # local import: only needed for cleanup on failure

    audio = decode_audio(audio_path, sampling_rate=sample_rate)
    tmp_dir = tempfile.mkdtemp(prefix="voice-intel-")
    wav_path = os.path.join(tmp_dir, "input.wav")
    try:
        sf.write(wav_path, audio, sample_rate, subtype="PCM_16")
    except BaseException:
        # Nobody receives the path on failure, so don't leak the directory.
        shutil.rmtree(tmp_dir, ignore_errors=True)
        raise
    return wav_path, np.asarray(audio, dtype=np.float32)
def save_wav(audio: np.ndarray, wav_path: str, sample_rate: int) -> None:
    """Write ``audio`` to ``wav_path`` with the ``PCM_16`` subtype.

    The samples are converted to ``float32`` before being handed to
    ``sf.write``.
    """
    samples = np.asarray(audio, dtype=np.float32)
    sf.write(wav_path, samples, sample_rate, subtype="PCM_16")
def detect_silence_runs(
    audio: np.ndarray,
    sample_rate: int,
    threshold_db: float,
    min_silence_sec: float,
    analysis_window_ms: int,
) -> list[tuple[int, int]]:
    """Find stretches of consecutive quiet frames in ``audio``.

    The signal is cut into back-to-back frames of ``analysis_window_ms``
    (the last frame may be shorter). A frame is silent when its RMS is
    strictly below the linear amplitude equivalent of ``threshold_db``.
    Adjacent silent frames are merged into a run, and a run is reported only
    if it spans at least ``min_silence_sec`` worth of samples.

    Args:
        audio: Samples to analyse.
        sample_rate: Samples per second, used to convert durations to samples.
        threshold_db: Silence threshold in dB.
        min_silence_sec: Minimum run length in seconds.
        analysis_window_ms: Frame length in milliseconds.

    Returns:
        ``(start_sample, end_sample)`` pairs in ascending order, with
        ``end_sample`` exclusive. Empty when ``audio`` has no samples.
    """
    if audio.size == 0:
        return []

    threshold = db_to_amplitude(threshold_db)
    # Both lengths are floored at one sample so the loop always advances.
    window_samples = max(1, int(round((analysis_window_ms / 1000.0) * sample_rate)))
    min_silence_samples = max(1, int(round(min_silence_sec * sample_rate)))
    total_samples = len(audio)

    runs: list[tuple[int, int]] = []
    run_start: int | None = None  # first sample of the currently open silent run

    for start in range(0, total_samples, window_samples):
        end = min(total_samples, start + window_samples)
        frame = audio[start:end].astype(np.float32)
        rms = float(np.sqrt(np.mean(np.square(frame))))

        if rms < threshold:
            if run_start is None:
                run_start = start
            continue

        # A loud frame ends the open run exactly where this frame begins.
        if run_start is not None:
            if start - run_start >= min_silence_samples:
                runs.append((run_start, start))
            run_start = None

    # A run still open after the last frame extends to the end of the audio.
    if run_start is not None and total_samples - run_start >= min_silence_samples:
        runs.append((run_start, total_samples))

    return runs
def trim_audio(
    audio: np.ndarray,
    sample_rate: int,
    options: SilenceTrimOptions,
) -> tuple[np.ndarray, dict[str, Any], list[tuple[int, int]]]:
    """Cut long silences out of ``audio`` and report what was removed.

    Silence runs are located with ``detect_silence_runs``. For every run,
    ``options.keep_padding_sec`` of silence is kept at both edges and only
    the interior is removed; runs too short to leave an interior are
    detected but not removed.

    Args:
        audio: Samples to trim.
        sample_rate: Samples per second.
        options: Trimming settings.

    Returns:
        A ``(audio, payload, removed_intervals)`` tuple. ``audio`` is the
        input array itself when trimming is disabled or nothing was removed,
        otherwise a new array without the removed intervals. ``payload`` is a
        summary dict (settings, detected/removed runs, durations).
        ``removed_intervals`` lists the removed ``(start, end)`` sample
        ranges in original-audio coordinates, or ``[]`` if none.
    """
    raw_duration_sec = round_sec(seconds_from_samples(len(audio), sample_rate))

    def build_payload(
        enabled: bool,
        detected: list[tuple[int, int]],
        removed: list[tuple[int, int]],
        processed: np.ndarray,
    ) -> dict[str, Any]:
        # Single place that defines the report schema for every return path.
        removed_samples = sum(end - start for start, end in removed)
        return {
            "enabled": enabled,
            "threshold_db": options.threshold_db,
            "min_silence_sec": options.min_silence_sec,
            "keep_padding_sec": options.keep_padding_sec,
            "analysis_window_ms": options.analysis_window_ms,
            "detected_runs": [_run_payload(start, end, sample_rate) for start, end in detected],
            "removed_runs": [_run_payload(start, end, sample_rate) for start, end in removed],
            "removed_silence_sec": round_sec(seconds_from_samples(removed_samples, sample_rate)),
            "raw_duration_sec": raw_duration_sec,
            "processed_duration_sec": round_sec(seconds_from_samples(len(processed), sample_rate)),
        }

    if not options.enabled:
        return audio, build_payload(False, [], [], audio), []

    runs = detect_silence_runs(
        audio=audio,
        sample_rate=sample_rate,
        threshold_db=options.threshold_db,
        min_silence_sec=options.min_silence_sec,
        analysis_window_ms=options.analysis_window_ms,
    )

    keep_pad_samples = int(round(options.keep_padding_sec * sample_rate))
    removed_intervals: list[tuple[int, int]] = []
    for start, end in runs:
        # Shrink the run by the padding on both sides, never crossing its edges.
        remove_start = min(end, start + keep_pad_samples)
        remove_end = max(start, end - keep_pad_samples)
        if remove_end > remove_start:
            removed_intervals.append((remove_start, remove_end))

    if not removed_intervals:
        return audio, build_payload(True, runs, [], audio), []

    # Stitch together everything that lies between the removed intervals.
    chunks: list[np.ndarray] = []
    cursor = 0
    for start, end in removed_intervals:
        if start > cursor:
            chunks.append(audio[cursor:start])
        cursor = end
    if cursor < len(audio):
        chunks.append(audio[cursor:])

    trimmed_audio = np.concatenate(chunks) if chunks else np.zeros(0, dtype=np.float32)
    silence_payload = build_payload(True, runs, removed_intervals, trimmed_audio)
    return trimmed_audio, silence_payload, removed_intervals
def _run_payload(start_sample: int, end_sample: int, sample_rate: int) -> dict[str, Any]:
    """Describe a sample range in both samples and rounded seconds.

    The reported duration is never negative: a reversed range yields zero.
    """
    span_samples = max(0, end_sample - start_sample)
    start_sec = round_sec(seconds_from_samples(start_sample, sample_rate))
    end_sec = round_sec(seconds_from_samples(end_sample, sample_rate))
    duration_sec = round_sec(seconds_from_samples(span_samples, sample_rate))
    return {
        "start_sample": start_sample,
        "end_sample": end_sample,
        "start_sec": start_sec,
        "end_sec": end_sec,
        "duration_sec": duration_sec,
    }