from __future__ import annotations

import io
import math
import wave
from collections.abc import Iterable

import numpy as np
from scipy.signal import resample_poly


def pcm16_bytes_to_float32(data: bytes) -> np.ndarray:
    """Decode raw 16-bit PCM bytes into float32 samples.

    The bytes are read as native-endian ``int16`` and divided by 32768,
    so the result lies in ``[-1.0, 1.0)``.

    Raises:
        ValueError: if ``len(data)`` is not a multiple of two bytes
            (raised by ``np.frombuffer``).
    """
    pcm = np.frombuffer(data, dtype=np.int16).astype(np.float32)
    return pcm / 32768.0


def rms(audio: np.ndarray) -> float:
    """Return the root-mean-square level of *audio* (0.0 when empty)."""
    if audio.size == 0:
        return 0.0
    return float(np.sqrt(np.mean(np.square(audio), dtype=np.float32)))


def peak(audio: np.ndarray) -> float:
    """Return the largest absolute sample value in *audio* (0.0 when empty)."""
    if audio.size == 0:
        return 0.0
    return float(np.max(np.abs(audio)))


def frame_duration_ms(frame: np.ndarray, sample_rate: int) -> float:
    """Return the duration of *frame* in milliseconds.

    A non-positive ``sample_rate`` yields 0.0 instead of raising.
    """
    if sample_rate <= 0:
        return 0.0
    return (len(frame) / sample_rate) * 1000.0


def chunk_audio(audio: np.ndarray, chunk_samples: int) -> Iterable[np.ndarray]:
    """Yield consecutive slices of *audio*, each ``chunk_samples`` long.

    The final slice is shorter when ``len(audio)`` is not a multiple of
    ``chunk_samples``. Slices are views produced by basic indexing.

    Raises:
        ValueError: if ``chunk_samples`` is not positive. As this is a
            generator, the error surfaces on first iteration.
    """
    # A negative step would make range() empty and silently drop every
    # sample; zero would fail with an unrelated-looking range() error.
    if chunk_samples <= 0:
        raise ValueError(f"chunk_samples must be positive, got {chunk_samples}")
    for start in range(0, len(audio), chunk_samples):
        yield audio[start : start + chunk_samples]


def resample_audio(audio: np.ndarray, src_rate: int, dst_rate: int) -> np.ndarray:
    """Resample *audio* from ``src_rate`` to ``dst_rate`` as float32.

    The input is returned unchanged (same object, same dtype) when it is
    empty, when either rate is non-positive, or when the rates are equal.
    """
    if audio.size == 0 or src_rate <= 0 or dst_rate <= 0 or src_rate == dst_rate:
        return audio
    # Reduce the ratio so the polyphase filter uses the smallest factors.
    gcd = math.gcd(src_rate, dst_rate)
    up = dst_rate // gcd
    down = src_rate // gcd
    return np.asarray(resample_poly(audio, up, down), dtype=np.float32)


def trim_silence(
    audio: np.ndarray,
    sample_rate: int,
    threshold: float,
    frame_ms: int = 20,
    keep_edge_ms: int = 0,
) -> np.ndarray:
    """Strip leading and trailing silence from *audio*.

    The signal is scanned in frames of ``frame_ms`` milliseconds, forward
    from the start and backward from the end; a frame is silent when its
    RMS is below ``threshold``. Up to ``keep_edge_ms`` milliseconds of the
    trimmed audio are restored on each side of the retained region.

    Returns a slice of *audio*. It is empty when the whole signal is
    silent. Empty input or a non-positive ``sample_rate`` returns *audio*
    unchanged.
    """
    if audio.size == 0 or sample_rate <= 0:
        return audio

    # Multiply before dividing: ``sample_rate * (frame_ms / 1000.0)`` can
    # land just below the exact value (e.g. 100 * 0.29 -> 28.999...) and
    # truncate to one sample too few.
    frame_samples = max(1, int(sample_rate * frame_ms / 1000))
    keep_edge_samples = max(0, int(sample_rate * keep_edge_ms / 1000))

    start = 0
    end = len(audio)

    # Drop silent frames from the front.
    while start + frame_samples <= end and rms(audio[start : start + frame_samples]) < threshold:
        start += frame_samples

    # Every full frame was silent. The remainder is shorter than a frame,
    # so neither scan would test it; check it here so an all-silent
    # signal trims to nothing instead of leaving a stub of silence.
    if end - start < frame_samples and rms(audio[start:end]) < threshold:
        return audio[end:end]

    # Drop silent frames from the back.
    while end - frame_samples >= start and rms(audio[end - frame_samples : end]) < threshold:
        end -= frame_samples

    if start >= end:
        return audio[start:end]

    start = max(0, start - keep_edge_samples)
    end = min(len(audio), end + keep_edge_samples)
    return audio[start:end]


def wav_bytes_from_float32(audio: np.ndarray, sample_rate: int) -> bytes:
    """Encode float samples as an in-memory mono 16-bit PCM WAV file.

    Samples are clipped to ``[-1.0, 1.0]``, scaled by 32767 and rounded to
    the nearest integer. NaN samples are written as silence and infinities
    saturate to full scale.
    """
    # NaN survives np.clip and has no defined int16 conversion.
    finite = np.nan_to_num(audio, nan=0.0, posinf=1.0, neginf=-1.0)
    clipped = np.clip(finite, -1.0, 1.0)
    # Round rather than truncate: truncation biases every sample toward
    # zero and costs one LSB on a PCM -> float -> PCM round trip.
    pcm16 = np.rint(clipped * 32767.0).astype(np.int16)

    buffer = io.BytesIO()
    with wave.open(buffer, "wb") as wav_file:
        wav_file.setnchannels(1)
        wav_file.setsampwidth(2)
        wav_file.setframerate(sample_rate)
        wav_file.writeframes(pcm16.tobytes())
    return buffer.getvalue()