|
|
|
|
|
import warnings
from pathlib import Path
from typing import Any, Dict, Optional, Tuple, Union

import librosa
import numpy as np
import soundfile as sf
import torch
import torchaudio

from config import SAMPLE_RATE, SUPPORTED_FORMATS, MAX_FILE_SIZE_MB
|
|
|
|
|
# Suppress every warning for the whole process from import time onward: no
# category, message or module filter is given, so nothing is exempt.
# NOTE(review): presumably meant to quiet noisy audio-library warnings; it
# also hides deprecation warnings raised by unrelated code - confirm intent.
warnings.filterwarnings("ignore")
|
|
|
|
|
class AudioProcessor:
    """Validate, load, save and inspect audio files.

    Attributes:
        sample_rate: Default sample rate, initialised from ``SAMPLE_RATE``,
            used by ``load_audio`` and ``save_audio`` when the caller does
            not pass a rate explicitly.
    """

    def __init__(self):
        self.sample_rate = SAMPLE_RATE

    def validate_audio_file(self, file_path: Union[str, Path]) -> bool:
        """Check that an audio file exists, is a supported type and fits the size cap.

        Args:
            file_path: Location of the file to check.

        Returns:
            ``True`` when every check passes.

        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
            ValueError: If the lower-cased suffix is not in
                ``SUPPORTED_FORMATS``, or the file size (bytes / 1024**2)
                exceeds ``MAX_FILE_SIZE_MB``.
        """
        file_path = Path(file_path)

        if not file_path.exists():
            raise FileNotFoundError(f"Audio file not found: {file_path}")

        if file_path.suffix.lower() not in SUPPORTED_FORMATS:
            raise ValueError(f"Unsupported format: {file_path.suffix}. Supported: {SUPPORTED_FORMATS}")

        file_size_mb = file_path.stat().st_size / (1024 * 1024)
        if file_size_mb > MAX_FILE_SIZE_MB:
            raise ValueError(f"File too large: {file_size_mb:.1f}MB > {MAX_FILE_SIZE_MB}MB limit")

        return True

    def load_audio(
        self,
        file_path: Union[str, Path],
        target_sr: Optional[int] = None,
    ) -> Tuple[torch.Tensor, int]:
        """Load an audio file as a peak-normalised, channels-first float tensor.

        The file is validated, decoded and resampled by ``librosa``. Mono
        input is duplicated into two identical channels; input with more
        than two channels keeps only the first two.

        Args:
            file_path: Location of the audio file.
            target_sr: Sample rate to resample to. Defaults to
                ``self.sample_rate``.

        Returns:
            ``(waveform, sample_rate)``. ``waveform`` is a float tensor with
            channels on the first axis, scaled so its peak magnitude is
            (just under) 1. ``sample_rate`` is the rate reported by
            ``librosa``, which equals ``target_sr`` whenever one is in
            effect.

        Raises:
            Exception: On any failure, including validation errors. The
                original exception is attached as ``__cause__``.
        """
        if target_sr is None:
            target_sr = self.sample_rate

        try:
            self.validate_audio_file(file_path)

            # With mono=False librosa yields (samples,) for mono files and
            # (channels, samples) otherwise, so the layout is already
            # channels-first and must not be "fixed" by comparing axis
            # lengths (that would flip clips shorter than two samples).
            waveform, sr = librosa.load(file_path, sr=target_sr, mono=False)

            if waveform.ndim == 1:
                # Mono -> duplicate into two identical channels.
                waveform = np.stack([waveform, waveform])
            elif waveform.ndim == 2 and waveform.shape[0] > 2:
                # Surround / multi-track -> keep the first two channels.
                waveform = waveform[:2, :]

            if waveform.size == 0:
                # Fail with a readable message instead of an obscure
                # reduction error from max() on an empty tensor.
                raise ValueError("Audio file contains no samples")

            waveform = torch.FloatTensor(waveform)
            # Peak-normalise; the epsilon keeps silent input from dividing by zero.
            waveform = waveform / (waveform.abs().max() + 1e-8)

            # Return the rate librosa actually used so the value is
            # meaningful even if no target rate was configured.
            return waveform, sr

        except Exception as e:
            raise Exception(f"Error loading audio {file_path}: {str(e)}") from e

    def save_audio(
        self,
        waveform: Union[torch.Tensor, np.ndarray],
        file_path: Union[str, Path],
        sample_rate: Optional[int] = None,
    ) -> bool:
        """Write a waveform to disk as 16-bit PCM via ``soundfile``.

        The data is peak-normalised and scaled to 0.95 of full scale before
        writing. Missing parent directories are created.

        Args:
            waveform: Samples to write, as a tensor or NumPy array. A 2-D
                input whose first axis has length 2 is treated as
                channels-first and transposed before writing; any other
                shape is passed to ``soundfile`` unchanged.
            file_path: Destination path.
            sample_rate: Rate to record in the file. Defaults to
                ``self.sample_rate``.

        Returns:
            ``True`` once the file has been written.

        Raises:
            Exception: On any failure. The original exception is attached
                as ``__cause__``.
        """
        if sample_rate is None:
            sample_rate = self.sample_rate

        try:
            if isinstance(waveform, torch.Tensor):
                # detach + cpu so tensors tracked by autograd or living on
                # another device can be converted to NumPy.
                waveform = waveform.detach().cpu().numpy()

            if waveform.size == 0:
                raise ValueError("Cannot save an empty waveform")

            if waveform.ndim == 2 and waveform.shape[0] == 2:
                # (2, samples) -> (samples, 2) for soundfile.
                waveform = waveform.T

            # Peak-normalise, then leave 5% headroom; the clip is a final
            # guard for the PCM_16 conversion.
            waveform = waveform / (np.max(np.abs(waveform)) + 1e-8)
            waveform = np.clip(waveform * 0.95, -1, 1)

            Path(file_path).parent.mkdir(parents=True, exist_ok=True)

            sf.write(file_path, waveform, sample_rate, subtype='PCM_16')
            return True

        except Exception as e:
            raise Exception(f"Error saving audio {file_path}: {str(e)}") from e

    def get_audio_info(self, file_path: Union[str, Path]) -> Dict[str, Any]:
        """Return basic metadata for an audio file.

        Args:
            file_path: Location of the audio file.

        Returns:
            A dict with keys ``'duration'``, ``'sample_rate'``,
            ``'channels'`` and ``'format'``, taken from ``soundfile.info``.

        Raises:
            Exception: On any failure, including validation errors. The
                original exception is attached as ``__cause__``.
        """
        try:
            self.validate_audio_file(file_path)
            info = sf.info(file_path)
            return {
                'duration': info.duration,
                'sample_rate': info.samplerate,
                'channels': info.channels,
                'format': info.format
            }
        except Exception as e:
            raise Exception(f"Error getting audio info: {str(e)}") from e