"""
Audio processing utilities for voice AI agent.

Handles audio format conversion, validation, and preprocessing.
"""

import logging
import os
import struct
import subprocess
import wave
from typing import List, Optional, Tuple

import numpy as np

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# File extensions accepted by validate_audio_file (compared lower-cased).
_VALID_EXTENSIONS = frozenset(
    {'.wav', '.mp3', '.flac', '.ogg', '.m4a', '.webm'}
)

# WAV PCM sample width in bytes -> NumPy dtype. WAV data is little-endian,
# so the byte order is spelled out instead of relying on the host's.
_PCM_DTYPES = {2: np.dtype('<i2'), 4: np.dtype('<i4')}


def validate_audio_file(file_path: str) -> bool:
    """
    Validate that a path points to an existing file with an audio extension.

    Only the existence of a regular file and its extension are checked; the
    file contents are not inspected.

    Args:
        file_path: Path to audio file

    Returns:
        True if valid, False otherwise
    """
    # isfile (not exists) so that a directory named e.g. "foo.wav" is rejected.
    if not os.path.isfile(file_path):
        logger.error(f"File not found: {file_path}")
        return False

    _, ext = os.path.splitext(file_path)
    if ext.lower() not in _VALID_EXTENSIONS:
        logger.error(f"Unsupported audio format: {ext}")
        return False

    return True


def convert_to_wav(
    input_path: str,
    output_path: Optional[str] = None,
    sample_rate: int = 16000,
    channels: int = 1
) -> str:
    """
    Convert audio file to WAV format using ffmpeg.

    Args:
        input_path: Input audio file path
        output_path: Output WAV file path (auto-generated as
            "<input base>_converted.wav" if None)
        sample_rate: Target sample rate in Hz
        channels: Number of audio channels (1 = mono, 2 = stereo)

    Returns:
        Path to converted WAV file

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
        FileNotFoundError: If the ffmpeg executable cannot be found.
    """
    if output_path is None:
        base, _ = os.path.splitext(input_path)
        output_path = f"{base}_converted.wav"

    logger.info(f"Converting {input_path} to WAV format")

    # Argument list (no shell) so paths with spaces or shell metacharacters
    # are passed through verbatim.
    cmd = [
        'ffmpeg',
        '-i', input_path,
        '-ar', str(sample_rate),
        '-ac', str(channels),
        '-y',  # Overwrite output file
        output_path
    ]

    try:
        subprocess.run(cmd, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as e:
        logger.error(f"Conversion failed: {e.stderr}")
        raise
    except FileNotFoundError:
        logger.error("ffmpeg not found. Please install ffmpeg.")
        raise

    logger.info(f"Conversion successful: {output_path}")
    return output_path


def get_audio_duration(file_path: str) -> float:
    """
    Get duration of audio file in seconds.

    The file is first read with the ``wave`` module; if that fails for any
    reason, ``ffprobe`` is invoked as a fallback.

    Args:
        file_path: Path to audio file

    Returns:
        Duration in seconds, or 0.0 if neither method could determine it.
    """
    try:
        with wave.open(file_path, 'rb') as wf:
            frames = wf.getnframes()
            rate = wf.getframerate()
            return frames / float(rate)
    except Exception as e:
        # Best-effort first attempt: any failure (not a WAV file, truncated
        # header, zero frame rate, ...) falls through to ffprobe, so this is
        # a warning rather than an error.
        logger.warning(f"Failed to get audio duration: {e}")

    # Fallback: use ffprobe
    cmd = [
        'ffprobe',
        '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        file_path
    ]
    try:
        result = subprocess.run(
            cmd, capture_output=True, text=True, check=True
        )
        return float(result.stdout.strip())
    except (OSError, subprocess.SubprocessError, ValueError) as e:
        # OSError: ffprobe missing or not executable; SubprocessError:
        # non-zero exit; ValueError: output was not a number. Anything else
        # (e.g. KeyboardInterrupt) is deliberately allowed to propagate.
        logger.error(f"ffprobe could not determine audio duration: {e}")
        return 0.0


def normalize_audio(audio_data: np.ndarray) -> np.ndarray:
    """
    Normalize audio data to [-1, 1] range by dividing by the peak magnitude.

    Args:
        audio_data: Audio samples as numpy array

    Returns:
        Normalized audio data. Empty or all-zero input is returned unchanged.
    """
    samples = np.asarray(audio_data)
    if samples.size == 0:
        # .max() on an empty array raises; there is nothing to scale.
        return audio_data

    # Integer PCM must be widened before abs(): the absolute value of the
    # most negative integer (e.g. int16 -32768) overflows and stays negative,
    # which would yield a wrong peak.
    if samples.dtype.kind in 'iub':
        samples = samples.astype(np.float64)

    peak = np.abs(samples).max()
    if peak > 0:
        return samples / peak
    return audio_data


def trim_silence(
    file_path: str,
    output_path: Optional[str] = None,
    silence_threshold: float = 0.01,
    min_silence_duration: float = 0.5
) -> str:
    """
    Remove silence from beginning and end of a WAV file.

    Samples are scaled to [-1, 1) and compared against ``silence_threshold``;
    for multi-channel audio the mean magnitude across channels is used.
    Everything before the first and after the last above-threshold frame is
    dropped. Only 16-bit and 32-bit sample widths are supported.

    Args:
        file_path: Input audio file path
        output_path: Output file path (auto-generated as
            "<input base>_trimmed<ext>" if None)
        silence_threshold: Amplitude threshold for silence detection
        min_silence_duration: Minimum silence duration to trim (seconds).
            NOTE: currently accepted but not applied - leading/trailing
            silence is trimmed regardless of its duration.

    Returns:
        Path to trimmed audio file. If the audio is entirely silent or any
        error occurs, the original ``file_path`` is returned instead.
    """
    if output_path is None:
        base, ext = os.path.splitext(file_path)
        output_path = f"{base}_trimmed{ext}"

    logger.info(f"Trimming silence from {file_path}")

    try:
        # Read WAV file
        with wave.open(file_path, 'rb') as wf:
            sample_rate = wf.getframerate()
            n_channels = wf.getnchannels()
            sample_width = wf.getsampwidth()
            frames = wf.readframes(wf.getnframes())

        # Convert to numpy array
        dtype = _PCM_DTYPES.get(sample_width)
        if dtype is None:
            raise ValueError(f"Unsupported sample width: {sample_width}")
        audio_data = np.frombuffer(frames, dtype=dtype)

        # One row per frame, one column per channel
        if n_channels > 1:
            audio_data = audio_data.reshape(-1, n_channels)

        # Scale by the full-scale value for this sample width
        normalized = (
            audio_data.astype(np.float32) / (2 ** (sample_width * 8 - 1))
        )

        # Per-frame magnitude
        if n_channels > 1:
            magnitude = np.abs(normalized).mean(axis=1)
        else:
            magnitude = np.abs(normalized)

        non_silent = magnitude > silence_threshold
        if not non_silent.any():
            logger.warning("Entire audio is silent!")
            return file_path

        # argmax on a boolean array gives the index of the first True; on the
        # reversed array it gives the distance of the last True from the end.
        start_idx = int(np.argmax(non_silent))
        end_idx = len(non_silent) - int(np.argmax(non_silent[::-1]))

        # Slicing along axis 0 works for both mono and multi-channel data.
        trimmed = audio_data[start_idx:end_idx]

        # Write output WAV
        with wave.open(output_path, 'wb') as wf:
            wf.setnchannels(n_channels)
            wf.setsampwidth(sample_width)
            wf.setframerate(sample_rate)
            wf.writeframes(trimmed.tobytes())

        logger.info(f"Silence trimmed: {output_path}")
        return output_path

    except Exception as e:
        # Deliberate best-effort: on any failure hand back the untouched
        # input so callers can continue with the original audio.
        logger.error(f"Failed to trim silence: {e}")
        return file_path


def resample_audio(
    file_path: str,
    target_rate: int = 16000,
    output_path: Optional[str] = None
) -> str:
    """
    Resample audio to target sample rate by delegating to convert_to_wav.

    Args:
        file_path: Input audio file
        target_rate: Target sample rate in Hz
        output_path: Output file path (auto-generated as
            "<input base>_resampled<ext>", keeping the input extension,
            if None)

    Returns:
        Path to resampled audio

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
        FileNotFoundError: If the ffmpeg executable cannot be found.
    """
    if output_path is None:
        base, ext = os.path.splitext(file_path)
        output_path = f"{base}_resampled{ext}"

    return convert_to_wav(
        input_path=file_path,
        output_path=output_path,
        sample_rate=target_rate
    )


def split_audio_chunks(
    file_path: str,
    chunk_duration: float = 30.0,
    overlap: float = 1.0
) -> List[Tuple[float, float, bytes]]:
    """
    Split a WAV file into overlapping chunks for processing long files.

    Consecutive chunks start ``chunk_duration - overlap`` seconds apart.
    Splitting stops with the first chunk that reaches the end of the file,
    so no chunk is ever wholly contained in the previous one.

    Args:
        file_path: Input audio file
        chunk_duration: Duration of each chunk in seconds (must be > 0)
        overlap: Overlap between chunks in seconds (must be >= 0 and
            smaller than chunk_duration)

    Returns:
        List of (start_time, end_time, chunk_data) tuples, where the times
        are in seconds and chunk_data is the raw frame bytes.

    Raises:
        ValueError: If chunk_duration/overlap would not advance through
            the file (which would otherwise loop forever).
        Exception: Any error from opening or reading the file is logged and
            re-raised.
    """
    if chunk_duration <= 0:
        raise ValueError(
            f"chunk_duration must be positive, got {chunk_duration}"
        )
    if overlap < 0 or overlap >= chunk_duration:
        raise ValueError(
            "overlap must be >= 0 and smaller than chunk_duration, "
            f"got overlap={overlap}, chunk_duration={chunk_duration}"
        )

    logger.info(f"Splitting audio into {chunk_duration}s chunks")

    try:
        with wave.open(file_path, 'rb') as wf:
            sample_rate = wf.getframerate()
            total_frames = wf.getnframes()

            chunk_frames = int(chunk_duration * sample_rate)
            overlap_frames = int(overlap * sample_rate)
            step_frames = chunk_frames - overlap_frames
            # Rounding to whole frames can still collapse the step to zero
            # (e.g. sub-sample durations); refuse rather than spin forever.
            if step_frames <= 0:
                raise ValueError(
                    "chunk_duration and overlap yield a non-positive step "
                    f"of {step_frames} frames at {sample_rate} Hz"
                )

            total_seconds = total_frames / sample_rate if total_frames else 0.0
            chunks = []
            position = 0

            while position < total_frames:
                wf.setpos(position)
                frames = wf.readframes(
                    min(chunk_frames, total_frames - position)
                )

                start_time = position / sample_rate
                end_time = min(
                    (position + chunk_frames) / sample_rate, total_seconds
                )
                chunks.append((start_time, end_time, frames))

                # This chunk already covers the tail of the file; another
                # step would only re-emit audio contained in it.
                if position + chunk_frames >= total_frames:
                    break
                position += step_frames

        logger.info(f"Split into {len(chunks)} chunks")
        return chunks

    except Exception as e:
        logger.error(f"Failed to split audio: {e}")
        raise


if __name__ == "__main__":
    print("Audio utilities module loaded successfully!")
    print("Available functions:")
    for _name in (
        "validate_audio_file",
        "convert_to_wav",
        "get_audio_duration",
        "normalize_audio",
        "trim_silence",
        "resample_audio",
        "split_audio_chunks",
    ):
        print(f"  - {_name}")