import numpy as np from data.eq_utils import apply_random_eq from pedalboard import Pedalboard, Resample, Compressor, Distortion, Reverb, Limiter, MP3Compressor, HighpassFilter, LowpassFilter import torch from scipy.signal import butter, lfilter, sosfilt try: import pyroomacoustics as pra except Exception as e: print(f"[WARN] Failed to import pyroomacoustics. Reverb effects will be disabled. Reason: {e}") else: from encodec import EncodecModel from encodec.utils import convert_audio def fix_length_to_duration(target: np.ndarray, duration: float) -> np.ndarray: target_duration = target.shape[-1] if target_duration < duration: target = np.pad(target, ((0, 0), (0, int(duration - target_duration))), mode='constant') elif target_duration > duration: target = target[:, :int(duration)] return target def calculate_rms(audio: np.ndarray) -> float: return np.sqrt(np.mean(audio**2)) def apply_fm_effect(audio: np.ndarray, sample_rate: int) -> np.ndarray: cutoff_freq = np.random.uniform(8000, 14000) order = 5 noise_level = np.random.uniform(0.0005, 0.005) def butter_lowpass(cutoff, fs, order=5): nyq = 0.5 * fs normal_cutoff = cutoff / nyq b, a = butter(order, normal_cutoff, btype='low', analog=False) return b, a b, a = butter_lowpass(cutoff_freq, sample_rate, order=order) filtered_audio = np.array([lfilter(b, a, channel) for channel in audio]) noise = np.random.normal(0, 1, filtered_audio.shape) * noise_level fm_audio = filtered_audio + noise np.clip(fm_audio, -1.0, 1.0, out=fm_audio) return fm_audio def apply_random_room_reverb(audio, sr): C, L = audio.shape room_dim = np.random.uniform(3, 9, size=3) room = pra.ShoeBox(room_dim, fs=sr, max_order=np.random.randint(4, 7), absorption=np.random.uniform(0.2, 0.7)) mic_loc = np.array([ np.random.uniform(0.5, room_dim[0]-0.5), np.random.uniform(0.5, room_dim[1]-0.5), np.random.uniform(1.0, 2.0), ]) source_loc = np.array([ np.random.uniform(0.5, room_dim[0]-0.5), np.random.uniform(0.5, room_dim[1]-0.5), np.random.uniform(1.0, 2.0), ]) room.add_microphone(mic_loc) room.add_source(source_loc, signal=audio.mean(axis=0)) room.compute_rir() WET_LEVEL = np.random.uniform(0.1, 0.6) DRY_LEVEL = np.random.uniform(0.5, 1.0) wet_audio = np.vstack([ np.convolve(audio[ch], room.rir[0][0], mode="full")[:L] for ch in range(C) ]) wet_norm = np.max(np.abs(wet_audio)) + 1e-8 out = (audio * DRY_LEVEL) + (wet_audio * (WET_LEVEL / wet_norm)) max_out = np.max(np.abs(out)) + 1e-8 out_normalized = out / max_out return out_normalized def apply_live_dt4_simple(audio: np.ndarray, sample_rate: int, snr_db: float = 20.0) -> np.ndarray: audio = apply_random_room_reverb(audio, sample_rate) audio = _apply_phone_filter(audio, sample_rate) audio = _add_environmental_noise(audio, sample_rate, snr_db) return audio def _apply_phone_filter(audio: np.ndarray, sample_rate: int) -> np.ndarray: lowcut = 300.0 highcut = 3400.0 nyq = 0.5 * sample_rate low = lowcut / nyq high = highcut / nyq sos = butter(4, [low, high], btype='band', output='sos') filtered = np.array([sosfilt(sos, channel) for channel in audio]) return filtered def _add_environmental_noise(audio: np.ndarray, sample_rate: int, snr_db: float) -> np.ndarray: C, L = audio.shape noise = _generate_noise(L, sample_rate) if C > 1: noise = np.tile(noise, (C, 1)) signal_power = np.mean(audio ** 2) noise_power = np.mean(noise ** 2) if noise_power > 0: target_noise_power = signal_power / (10 ** (snr_db / 10)) scale = np.sqrt(target_noise_power / noise_power) noise = noise * scale mixed = audio + noise max_val = np.max(np.abs(mixed)) if max_val > 1.0: mixed = mixed / max_val return mixed def _generate_noise(length: int, sample_rate: int) -> np.ndarray: t = np.arange(length) / sample_rate noise = np.random.normal(0, 1, length) low_freq = np.random.uniform(50, 120) noise += 0.3 * np.sin(2 * np.pi * low_freq * t) mid_freq = np.random.uniform(200, 800) noise += 0.2 * np.sin(2 * np.pi * mid_freq * t + np.random.uniform(0, 2*np.pi)) b = [0.1, 0.2, 0.4, 0.2, 0.1] noise = lfilter(b, 1, noise) return noise class MasteringEnhancer: def __init__(self): pass def __call__(self, audio: np.ndarray, sr: int): board = Pedalboard() if np.random.rand() < 0.5: board.append(LowpassFilter(np.random.uniform(14000, 19000))) if np.random.rand() < 0.5: board.append(HighpassFilter(np.random.uniform(20, 60))) if np.random.rand() < 0.7: board.append(Compressor( threshold_db=np.random.uniform(-12, -6), ratio=np.random.uniform(1.2, 2.0), attack_ms=np.random.uniform(10, 30), release_ms=np.random.uniform(100, 300) )) if np.random.rand() < 0.6: board.append(Distortion(drive_db=np.random.uniform(0.5, 2.0))) board.append(Limiter(threshold_db=np.random.uniform(-3, -0.1))) return board(audio, sample_rate=sr) class StemAugmentation: def __init__(self): pass def apply(self, audio: np.ndarray, sample_rate: int = 44100) -> np.ndarray: if np.max(np.abs(audio)) == 0: return audio original_length = audio.shape[-1] original_rms = calculate_rms(audio) if original_rms == 0: return audio normalize_scale = np.max(np.abs(audio)) + 1e-6 audio = audio / normalize_scale do_eq, do_resample, do_compressor, do_distortion, do_reverb = np.random.randint(0, 2, 5) # 5 random choices if do_eq: audio = apply_random_eq(audio, sample_rate) # Assuming this preserves length board = Pedalboard() if do_resample: board.append(Resample(target_sample_rate=np.random.randint(8000, 32000))) if do_compressor: board.append(Compressor( threshold_db=np.random.uniform(-20, 0), ratio=np.random.uniform(1.5, 10.0), attack_ms=np.random.uniform(1, 10), release_ms=np.random.uniform(50, 200) )) if do_distortion: board.append(Distortion(drive_db=np.random.uniform(0, 5))) if do_reverb: board.append(Reverb( room_size=np.random.uniform(0.1, 1.0), damping=np.random.uniform(0.1, 1.0), wet_level=np.random.uniform(0.1, 0.5), width=np.random.uniform(0.1, 1.0) )) if len(board) > 0: audio = board(audio, sample_rate=sample_rate) audio = fix_length_to_duration(audio, original_length) new_rms = calculate_rms(audio) return audio * (original_rms / new_rms) class MixtureAugmentation: def __init__(self): self.encodec_model = EncodecModel.encodec_model_48khz() self.encodec_model.eval() self.encodec_available = True self.encodec_bandwidths = [3.0, 6.0, 12.0, 24.0] self.p_resample = 0 self.p_mastering = 0 self.p_mp3 = 0 self.p_fm = 0 self.p_live = 0 self.p_encodec = 1.0 self.is_cuda_initialized = False self.mastering = MasteringEnhancer() def apply(self, audio: np.ndarray, sample_rate: int = 44100) -> np.ndarray: if np.max(np.abs(audio)) == 0: return audio original_length = audio.shape[-1] original_rms = calculate_rms(audio) if original_rms == 0: return audio normalize_scale = np.max(np.abs(audio)) + 1e-6 audio = audio / normalize_scale board = Pedalboard() if np.random.rand() < self.p_resample: board.append(Resample(target_sample_rate=np.random.randint(16000, 44100))) if np.random.rand() < self.p_mastering: audio = self.mastering(audio, sample_rate) if np.random.rand() < self.p_mp3: board.append(MP3Compressor(vbr_quality=np.random.uniform(1.0, 9.0))) if np.random.rand() < self.p_fm: audio = apply_fm_effect(audio, sample_rate) if np.random.rand() < self.p_live: audio = apply_live_dt4_simple(audio, sample_rate) if np.random.rand() < self.p_encodec: device = 'cpu' model = self.encodec_model target_bw = np.random.choice(self.encodec_bandwidths) model.set_target_bandwidth(target_bw) wav_tensor = torch.from_numpy(audio).float().to(device) wav_processed = convert_audio(wav_tensor, sample_rate, model.sample_rate, model.channels) wav_input = wav_processed.unsqueeze(0) with torch.no_grad(): reconstructed_tensor = model(wav_input).squeeze(0) audio = reconstructed_tensor.cpu().numpy() sample_rate = model.sample_rate if len(board) > 0: audio = board(audio, sample_rate=sample_rate) audio = fix_length_to_duration(audio, original_length) new_rms = calculate_rms(audio) return audio * (original_rms / new_rms)