""" Audio Processor Module ====================== Handles audio loading, preprocessing, and segmentation. """ from __future__ import annotations from dataclasses import dataclass from pathlib import Path from typing import List, Optional, Tuple, Union import numpy as np import torch import torchaudio from torchaudio.transforms import Resample try: import librosa LIBROSA_AVAILABLE = True except ImportError: LIBROSA_AVAILABLE = False @dataclass class AudioConfig: """Configuration for audio processing""" sample_rate: int = 16000 mono: bool = True normalize: bool = True trim_silence: bool = False silence_threshold_db: float = -40.0 max_duration_seconds: Optional[float] = None @dataclass class AudioInfo: """Information about loaded audio""" path: str duration_seconds: float sample_rate: int num_channels: int num_samples: int class AudioProcessor: """ Handles all audio preprocessing operations. Converts input audio to standardized format for downstream processing. Attributes: config: AudioConfig object with processing settings Example: >>> processor = AudioProcessor() >>> waveform, sr = processor.load_audio("meeting.wav") >>> print(f"Duration: {processor.get_duration(waveform, sr):.2f}s") """ SUPPORTED_FORMATS = {".wav", ".mp3", ".flac", ".ogg", ".m4a", ".wma", ".aac"} def __init__(self, config: Optional[AudioConfig] = None): """ Initialize AudioProcessor. Args: config: AudioConfig object (uses defaults if None) """ self.config = config or AudioConfig() self._resampler_cache: dict = {} def load_audio( self, audio_path: Union[str, Path], start_time: Optional[float] = None, end_time: Optional[float] = None, ) -> Tuple[torch.Tensor, int]: """ Load and preprocess audio file. Args: audio_path: Path to audio file start_time: Start time in seconds (optional) end_time: End time in seconds (optional) Returns: Tuple of (waveform tensor [1, T], sample_rate) Raises: FileNotFoundError: If audio file doesn't exist ValueError: If audio format is not supported """ audio_path = Path(audio_path) # Validate file exists if not audio_path.exists(): raise FileNotFoundError(f"Audio file not found: {audio_path}") # Validate format if audio_path.suffix.lower() not in self.SUPPORTED_FORMATS: raise ValueError( f"Unsupported audio format: {audio_path.suffix}. " f"Supported formats: {self.SUPPORTED_FORMATS}" ) # Load audio try: waveform, orig_sr = torchaudio.load(str(audio_path)) except Exception as e: # Fallback to librosa if torchaudio fails if LIBROSA_AVAILABLE: try: audio_np, orig_sr = librosa.load(str(audio_path), sr=None, mono=False) if audio_np.ndim == 1: audio_np = audio_np[np.newaxis, :] waveform = torch.from_numpy(audio_np).float() except Exception: # Try pydub (requires ffmpeg) as a robust fallback try: from pydub import AudioSegment seg = AudioSegment.from_file(str(audio_path)) orig_sr = seg.frame_rate samples = np.array(seg.get_array_of_samples()) if seg.channels > 1: samples = samples.reshape((-1, seg.channels)).T else: samples = samples[np.newaxis, :] # Normalize based on sample width max_val = float(1 << (8 * seg.sample_width - 1)) audio_np = samples.astype(np.float32) / max_val waveform = torch.from_numpy(audio_np).float() except Exception: # Try ffmpeg CLI (system binary) to decode to WAV in-memory (no extra Python packages required) try: import io import subprocess import soundfile as sf proc = subprocess.run( [ "ffmpeg", "-i", str(audio_path), "-f", "wav", "-ar", "16000", "-ac", "1", "pipe:1", ], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, check=True, ) out = proc.stdout audio_np, orig_sr = sf.read(io.BytesIO(out), dtype="float32") if audio_np.ndim == 1: audio_np = audio_np[np.newaxis, :] else: audio_np = audio_np.T waveform = torch.from_numpy(audio_np).float() except Exception: # Last resort: use ffmpeg-python to decode into WAV bytes and read via soundfile try: import io import ffmpeg import soundfile as sf out, _ = ( ffmpeg.input(str(audio_path)) .output("pipe:", format="wav", acodec="pcm_s16le") .run(capture_stdout=True, capture_stderr=True) ) audio_np, orig_sr = sf.read(io.BytesIO(out), dtype="float32") if audio_np.ndim == 1: audio_np = audio_np[np.newaxis, :] else: audio_np = audio_np.T waveform = torch.from_numpy(audio_np).float() except Exception: raise RuntimeError( "Format file tidak didukung atau backend decoding (ffmpeg) tidak tersedia. " "Silakan install ffmpeg (pastikan tersedia di PATH) atau gunakan format WAV/MP3 yang didukung." ) else: raise RuntimeError(f"Failed to load audio: {e}") # Trim to time range if specified if start_time is not None or end_time is not None: waveform = self._trim_to_range(waveform, orig_sr, start_time, end_time) # Convert to mono if needed if self.config.mono and waveform.shape[0] > 1: waveform = torch.mean(waveform, dim=0, keepdim=True) # Resample if needed if orig_sr != self.config.sample_rate: waveform = self._resample(waveform, orig_sr, self.config.sample_rate) # Normalize amplitude if self.config.normalize: waveform = self._normalize(waveform) # Trim silence if requested if self.config.trim_silence: waveform = self._trim_silence(waveform) # Enforce max duration if self.config.max_duration_seconds: max_samples = int(self.config.max_duration_seconds * self.config.sample_rate) if waveform.shape[-1] > max_samples: waveform = waveform[:, :max_samples] return waveform, self.config.sample_rate def get_audio_info(self, audio_path: Union[str, Path]) -> AudioInfo: """ Get information about audio file without loading full waveform. Args: audio_path: Path to audio file Returns: AudioInfo object with file details """ audio_path = Path(audio_path) if not audio_path.exists(): raise FileNotFoundError(f"Audio file not found: {audio_path}") info = torchaudio.info(str(audio_path)) return AudioInfo( path=str(audio_path), duration_seconds=info.num_frames / info.sample_rate, sample_rate=info.sample_rate, num_channels=info.num_channels, num_samples=info.num_frames, ) def _trim_to_range( self, waveform: torch.Tensor, sample_rate: int, start_time: Optional[float], end_time: Optional[float], ) -> torch.Tensor: """Trim waveform to specified time range""" start_sample = int((start_time or 0) * sample_rate) end_sample = int((end_time or waveform.shape[-1] / sample_rate) * sample_rate) start_sample = max(0, start_sample) end_sample = min(waveform.shape[-1], end_sample) return waveform[:, start_sample:end_sample] def _resample(self, waveform: torch.Tensor, orig_sr: int, target_sr: int) -> torch.Tensor: """Resample audio to target sample rate with caching""" cache_key = (orig_sr, target_sr) if cache_key not in self._resampler_cache: self._resampler_cache[cache_key] = Resample(orig_freq=orig_sr, new_freq=target_sr) return self._resampler_cache[cache_key](waveform) def _normalize(self, waveform: torch.Tensor) -> torch.Tensor: """Normalize waveform to [-1, 1] range""" max_val = torch.max(torch.abs(waveform)) if max_val > 0: waveform = waveform / max_val return waveform def _trim_silence(self, waveform: torch.Tensor) -> torch.Tensor: """Remove leading and trailing silence""" # Convert threshold from dB to amplitude threshold = 10 ** (self.config.silence_threshold_db / 20) # Find non-silent regions amplitude = torch.abs(waveform).squeeze() non_silent = amplitude > threshold if not non_silent.any(): return waveform # Find first and last non-silent sample non_silent_indices = torch.where(non_silent)[0] start_idx = non_silent_indices[0].item() end_idx = non_silent_indices[-1].item() + 1 return waveform[:, start_idx:end_idx] def get_duration(self, waveform: torch.Tensor, sample_rate: int) -> float: """Get duration of waveform in seconds""" return waveform.shape[-1] / sample_rate def cut_segment( self, waveform: torch.Tensor, start_sec: float, end_sec: float, sample_rate: int ) -> torch.Tensor: """ Extract a segment from waveform. Args: waveform: Input waveform [C, T] start_sec: Start time in seconds end_sec: End time in seconds sample_rate: Sample rate of waveform Returns: Segment waveform [C, t] """ start_sample = int(max(0, start_sec) * sample_rate) end_sample = int(min(end_sec * sample_rate, waveform.shape[-1])) return waveform[:, start_sample:end_sample] def split_into_chunks( self, waveform: torch.Tensor, chunk_duration: float, overlap: float = 0.0, sample_rate: Optional[int] = None, ) -> List[Tuple[torch.Tensor, float, float]]: """ Split waveform into overlapping chunks. Args: waveform: Input waveform chunk_duration: Duration of each chunk in seconds overlap: Overlap between chunks in seconds sample_rate: Sample rate (uses config if None) Returns: List of (chunk_waveform, start_sec, end_sec) """ sample_rate = sample_rate or self.config.sample_rate total_duration = self.get_duration(waveform, sample_rate) chunks = [] start = 0.0 while start < total_duration: end = min(start + chunk_duration, total_duration) chunk = self.cut_segment(waveform, start, end, sample_rate) chunks.append((chunk, start, end)) start += chunk_duration - overlap return chunks def add_noise( self, waveform: torch.Tensor, noise_level: float = 0.01, noise_type: str = "gaussian" ) -> torch.Tensor: """ Add noise to waveform (for data augmentation). Args: waveform: Input waveform noise_level: Noise amplitude (0-1) noise_type: Type of noise ("gaussian", "uniform") Returns: Waveform with added noise """ if noise_type == "gaussian": noise = torch.randn_like(waveform) * noise_level elif noise_type == "uniform": noise = (torch.rand_like(waveform) * 2 - 1) * noise_level else: raise ValueError(f"Unknown noise type: {noise_type}") return waveform + noise def save_audio( self, waveform: torch.Tensor, output_path: Union[str, Path], sample_rate: Optional[int] = None, ): """ Save waveform to audio file. Args: waveform: Waveform to save output_path: Output file path sample_rate: Sample rate (uses config if None) """ sample_rate = sample_rate or self.config.sample_rate torchaudio.save(str(output_path), waveform, sample_rate)