Spaces:
Sleeping
Sleeping
| """ | |
| Audio processing utilities for voice AI agent. | |
| Handles audio format conversion, validation, and preprocessing. | |
| """ | |
| import os | |
| import logging | |
| import wave | |
| import struct | |
| from typing import Optional, Tuple | |
| import numpy as np | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
def validate_audio_file(file_path: str) -> bool:
    """
    Validate that a path points to an existing file with a supported audio extension.

    Only the presence of the file and its extension are checked; the file
    contents are never opened or decoded.

    Args:
        file_path: Path to audio file

    Returns:
        True if the path is a regular file whose extension (case-insensitive)
        is one of .wav, .mp3, .flac, .ogg, .m4a or .webm, False otherwise
    """
    # isfile() rather than exists(): a directory that happens to be named
    # "clip.wav" must not be reported as a valid audio file.
    if not os.path.isfile(file_path):
        logger.error(f"File not found: {file_path}")
        return False

    valid_extensions = ('.wav', '.mp3', '.flac', '.ogg', '.m4a', '.webm')
    _, ext = os.path.splitext(file_path)
    if ext.lower() not in valid_extensions:
        logger.error(f"Unsupported audio format: {ext}")
        return False

    return True
def convert_to_wav(
    input_path: str,
    output_path: Optional[str] = None,
    sample_rate: int = 16000,
    channels: int = 1
) -> str:
    """
    Run ffmpeg to re-encode an audio file at the given sample rate and channel count.

    Args:
        input_path: Input audio file path
        output_path: Output file path; when None, "<input stem>_converted.wav"
            next to the input is used
        sample_rate: Target sample rate in Hz (passed to ffmpeg as -ar)
        channels: Number of audio channels (passed to ffmpeg as -ac;
            1 = mono, 2 = stereo)

    Returns:
        Path to the converted file

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status
        FileNotFoundError: the ffmpeg executable could not be launched
    """
    import subprocess

    if output_path is None:
        stem = os.path.splitext(input_path)[0]
        output_path = f"{stem}_converted.wav"

    logger.info(f"Converting {input_path} to WAV format")

    # '-y' makes ffmpeg overwrite an existing output file instead of prompting.
    ffmpeg_args = [
        'ffmpeg',
        '-i', input_path,
        '-ar', str(sample_rate),
        '-ac', str(channels),
        '-y',
        output_path,
    ]

    try:
        subprocess.run(ffmpeg_args, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as e:
        logger.error(f"Conversion failed: {e.stderr}")
        raise
    except FileNotFoundError:
        logger.error("ffmpeg not found. Please install ffmpeg.")
        raise

    logger.info(f"Conversion successful: {output_path}")
    return output_path
def get_audio_duration(file_path: str) -> float:
    """
    Get duration of audio file in seconds.

    The file is first read as a WAV with the standard ``wave`` module
    (frames / frame rate). If that fails for any reason, ``ffprobe`` is asked
    for the container duration instead.

    Args:
        file_path: Path to audio file

    Returns:
        Duration in seconds, or 0.0 if neither method could determine it
    """
    try:
        with wave.open(file_path, 'rb') as wf:
            return wf.getnframes() / float(wf.getframerate())
    except Exception as e:
        # This path is taken for every non-WAV input, and a fallback follows,
        # so it is reported as a warning rather than an error.
        logger.warning(f"Failed to get audio duration: {e}")

    # Fallback: use ffprobe
    import subprocess
    try:
        cmd = [
            'ffprobe',
            '-v', 'error',
            '-show_entries', 'format=duration',
            '-of', 'default=noprint_wrappers=1:nokey=1',
            file_path
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        return float(result.stdout.strip())
    except Exception as e:
        # Best effort: keep returning 0.0, but catch Exception instead of a
        # bare except so KeyboardInterrupt/SystemExit still propagate, and
        # record why the probe failed.
        logger.error(f"ffprobe could not determine audio duration: {e}")
        return 0.0
def normalize_audio(audio_data: np.ndarray) -> np.ndarray:
    """
    Peak-normalize audio so the largest absolute sample becomes 1.0.

    Args:
        audio_data: Audio samples as numpy array

    Returns:
        A new array equal to ``audio_data`` divided by its peak absolute
        value. When the input is empty or its peak is 0, the input array
        itself is returned unchanged.
    """
    # .max() raises on a zero-size array; there is nothing to scale anyway.
    if audio_data.size == 0:
        return audio_data

    if np.issubdtype(audio_data.dtype, np.integer):
        # abs() of the most negative signed integer (e.g. int16 -32768)
        # overflows and stays negative, so measure the peak in float.
        magnitudes = np.abs(audio_data.astype(np.float64))
    else:
        magnitudes = np.abs(audio_data)

    # A plain Python float keeps float32 input as float32 after the division.
    peak = float(magnitudes.max())
    if peak > 0:
        return audio_data / peak
    return audio_data
def trim_silence(
    file_path: str,
    output_path: Optional[str] = None,
    silence_threshold: float = 0.01,
    min_silence_duration: float = 0.5
) -> str:
    """
    Remove silence from beginning and end of a WAV file.

    Only 16-bit and 32-bit integer PCM WAV files are handled. Samples are
    scaled to [-1, 1] and, for multi-channel audio, the per-frame mean of the
    absolute channel values is compared against ``silence_threshold``.

    Args:
        file_path: Input audio file path
        output_path: Output file path ("<stem>_trimmed<ext>" if None)
        silence_threshold: Amplitude threshold for silence detection
        min_silence_duration: Minimum silence duration to trim (seconds);
            a leading or trailing silent run shorter than this is kept

    Returns:
        Path to trimmed audio file. The original ``file_path`` is returned
        instead when the whole file is silent or when trimming fails (the
        failure is logged, not raised).
    """
    if output_path is None:
        base, ext = os.path.splitext(file_path)
        output_path = f"{base}_trimmed{ext}"

    logger.info(f"Trimming silence from {file_path}")

    try:
        # Read WAV file
        with wave.open(file_path, 'rb') as wf:
            sample_rate = wf.getframerate()
            n_channels = wf.getnchannels()
            sample_width = wf.getsampwidth()
            frames = wf.readframes(wf.getnframes())

        # Convert to numpy array
        dtype_by_width = {2: np.int16, 4: np.int32}
        if sample_width not in dtype_by_width:
            raise ValueError(f"Unsupported sample width: {sample_width}")
        audio_data = np.frombuffer(frames, dtype=dtype_by_width[sample_width])

        # One row per frame, one column per channel
        if n_channels > 1:
            audio_data = audio_data.reshape(-1, n_channels)

        # Scale to [-1, 1] so the threshold is independent of the bit depth
        normalized = audio_data.astype(np.float32) / (2 ** (sample_width * 8 - 1))
        magnitude = np.abs(normalized)
        if n_channels > 1:
            magnitude = magnitude.mean(axis=1)

        non_silent = magnitude > silence_threshold
        if not non_silent.any():
            logger.warning("Entire audio is silent!")
            return file_path

        # First non-silent frame, and one past the last non-silent frame
        total_frames = len(non_silent)
        start_idx = int(np.argmax(non_silent))
        end_idx = total_frames - int(np.argmax(non_silent[::-1]))

        # Honour min_silence_duration: only cut an edge whose silent run is
        # at least that long.
        min_silence_frames = max(0, int(min_silence_duration * sample_rate))
        if start_idx < min_silence_frames:
            start_idx = 0
        if total_frames - end_idx < min_silence_frames:
            end_idx = total_frames

        # Slicing along axis 0 works for both mono and multi-channel data
        trimmed = audio_data[start_idx:end_idx]

        # Write output WAV
        with wave.open(output_path, 'wb') as wf:
            wf.setnchannels(n_channels)
            wf.setsampwidth(sample_width)
            wf.setframerate(sample_rate)
            wf.writeframes(trimmed.tobytes())

        logger.info(f"Silence trimmed: {output_path}")
        return output_path

    except Exception as e:
        # Best effort: callers fall back to the untrimmed input.
        logger.error(f"Failed to trim silence: {e}")
        return file_path
def resample_audio(
    file_path: str,
    target_rate: int = 16000,
    output_path: Optional[str] = None
) -> str:
    """
    Resample audio to target sample rate by delegating to convert_to_wav.

    Only the sample rate is forwarded; the channel count is left at
    convert_to_wav's own default.

    Args:
        file_path: Input audio file
        target_rate: Target sample rate in Hz
        output_path: Output file path ("<stem>_resampled<ext>" if None)

    Returns:
        Path to resampled audio
    """
    destination = output_path
    if destination is None:
        stem, suffix = os.path.splitext(file_path)
        destination = f"{stem}_resampled{suffix}"

    return convert_to_wav(
        input_path=file_path,
        output_path=destination,
        sample_rate=target_rate,
    )
def split_audio_chunks(
    file_path: str,
    chunk_duration: float = 30.0,
    overlap: float = 1.0
) -> list:
    """
    Split a WAV file into overlapping chunks for processing long files.

    Consecutive chunks start ``chunk_duration - overlap`` seconds apart.
    Splitting stops as soon as a chunk reaches the end of the file, so no
    extra chunk containing only already-covered overlap is produced.

    Args:
        file_path: Input audio file (must be readable by the ``wave`` module)
        chunk_duration: Duration of each chunk in seconds
        overlap: Overlap between chunks in seconds

    Returns:
        List of (start_time, end_time, chunk_data) tuples, where the times
        are in seconds and chunk_data is the raw frame bytes of the chunk

    Raises:
        ValueError: if chunk_duration is not positive, overlap is negative,
            or overlap is not smaller than chunk_duration
        Exception: any error raised while reading the file is logged and
            re-raised
    """
    logger.info(f"Splitting audio into {chunk_duration}s chunks")

    try:
        with wave.open(file_path, 'rb') as wf:
            sample_rate = wf.getframerate()
            total_frames = wf.getnframes()

            chunk_frames = int(chunk_duration * sample_rate)
            overlap_frames = int(overlap * sample_rate)
            step_frames = chunk_frames - overlap_frames

            # A non-positive step would never advance (or would seek
            # backwards), so reject it up front instead of looping forever.
            if chunk_frames <= 0 or overlap_frames < 0 or step_frames <= 0:
                raise ValueError(
                    "chunk_duration must be positive and overlap must satisfy "
                    "0 <= overlap < chunk_duration "
                    f"(got chunk_duration={chunk_duration}, overlap={overlap})"
                )

            chunks = []
            position = 0
            while position < total_frames:
                wf.setpos(position)
                chunk_end = min(position + chunk_frames, total_frames)
                frames = wf.readframes(chunk_end - position)

                chunks.append((position / sample_rate, chunk_end / sample_rate, frames))

                # This chunk already covers the tail of the file.
                if chunk_end >= total_frames:
                    break
                position += step_frames

        logger.info(f"Split into {len(chunks)} chunks")
        return chunks

    except Exception as e:
        logger.error(f"Failed to split audio: {e}")
        raise
if __name__ == "__main__":
    # Simple smoke banner: confirms the module imports and lists its public helpers.
    print("Audio utilities module loaded successfully!")
    print("Available functions:")
    print(
        *(
            f" - {helper}"
            for helper in (
                "validate_audio_file",
                "convert_to_wav",
                "get_audio_duration",
                "normalize_audio",
                "trim_silence",
                "resample_audio",
                "split_audio_chunks",
            )
        ),
        sep="\n",
    )