Spaces:
Running
Running
| import os | |
| import random | |
| import warnings | |
| import numpy as np | |
| import soundfile as sf | |
| import pyloudnorm | |
| import glob | |
| import librosa | |
def fix_audio_format(audio_path, out_sr=16000):
    """Load an audio file through librosa and return only its samples.

    The file is loaded with ``sr=out_sr`` and ``mono=True``, i.e. librosa is
    asked to deliver a single-channel waveform at the requested sample rate.

    Args:
        audio_path: Path of the audio file to load.
        out_sr: Sample rate requested from ``librosa.load``.

    Returns:
        The sample data returned by ``librosa.load``; the sample rate that
        librosa reports alongside it is discarded.
    """
    loaded = librosa.load(audio_path, sr=out_sr, mono=True)
    return loaded[0]
class AudioMixer:
    """Mix a recording with background noise at a randomly drawn SNR.

    The recording is first loudness-normalized with pyloudnorm, a noise track
    of matching length is assembled from a folder of ``.wav`` files, the noise
    is rescaled to hit a random target SNR, and both are summed.

    No RNG seed is set here, so the random draws differ between runs unless
    the caller seeds ``random`` and ``numpy.random`` beforehand.  Peak
    limiting is applied only to the final mixture (see ``_mix``), not to the
    individual signals.
    """

    def __init__(
        self,
        sample_rate=16000,
        mean_snr=-3,
        var_snr=8,
        mean_loudness=-24,
        var_loudness=10,
    ):
        """Store the mixing parameters and build the loudness meter.

        Args:
            sample_rate: Sample rate handed to ``pyloudnorm.Meter``.
            mean_snr: Mean of the normal distribution the target SNR (dB)
                is drawn from.
            var_snr: Variance of that distribution (its square root is used
                as the standard deviation).
            mean_loudness: Mean of the normal distribution the target
                loudness is drawn from (pyloudnorm works in LUFS per its
                documentation).
            var_loudness: Variance of the target-loudness distribution.
        """
        self.sample_rate = sample_rate
        self.mean_snr = mean_snr
        self.var_snr = var_snr
        # The "LOUNDNESS" spelling is kept so existing attribute access
        # keeps working.
        self.MEAN_LOUNDNESS = mean_loudness
        self.VAR_LOUNDNESS = var_loudness
        self.EPS = 1e-10  # guards the divisions / log in snr_norm
        self.MAX_AMP = 0.9  # peak the mixture is scaled to when it would clip
        self.meter = pyloudnorm.Meter(self.sample_rate)

    def read_wav(self, wav_path):
        """Read an audio file as float32 and keep only its first channel.

        Args:
            wav_path: Path of the file to read with ``soundfile``.

        Returns:
            ``(data, sr)``: the 1-D sample array and the file's sample rate.
        """
        data, sr = sf.read(wav_path, dtype='float32')
        if data.ndim > 1:
            # Multi-channel input: use channel 0 only (no down-mix).
            data = data[:, 0]
        return data, sr

    def normalize(self, signal, is_noise=False):
        """Loudness-normalize ``signal`` to a randomly drawn target.

        The target is drawn from a normal distribution with standard
        deviation ``sqrt(VAR_LOUNDNESS)`` and mean ``MEAN_LOUNDNESS``
        (``MEAN_LOUNDNESS + 4`` when ``is_noise`` is true).

        Args:
            signal: Samples to normalize.
            is_noise: Select the noise target mean instead of the default.

        Returns:
            The normalized samples as returned by pyloudnorm.

        Raises:
            RuntimeWarning: Any ``RuntimeWarning`` emitted while pyloudnorm
                applies the gain is escalated to an exception.
        """
        c_loudness = self.meter.integrated_loudness(signal)
        mean_target = self.MEAN_LOUNDNESS + 4 if is_noise else self.MEAN_LOUNDNESS
        target_loudness = np.random.normal(mean_target, self.VAR_LOUNDNESS ** 0.5)
        with warnings.catch_warnings():
            # Turn RuntimeWarnings into hard failures for this call only.
            warnings.filterwarnings("error", category=RuntimeWarning)
            signal = pyloudnorm.normalize.loudness(signal, c_loudness, target_loudness)
        return signal

    def snr_norm(self, signal, noise, is_noise=True):
        """Rescale ``noise`` so that signal-to-noise ratio hits a random target.

        Args:
            signal: Reference samples whose power defines the SNR numerator.
            noise: Samples to rescale.
            is_noise: When true the target SNR (dB) is drawn from
                ``Normal(mean_snr, sqrt(var_snr))``; otherwise from
                ``Uniform(2, 10)``.

        Returns:
            ``noise`` multiplied by the gain that moves the current SNR to
            the drawn target.
        """
        if is_noise:
            desired_snr = np.random.normal(self.mean_snr, self.var_snr ** 0.5)
        else:
            desired_snr = np.random.uniform(2, 10)

        signal_power = np.mean(signal ** 2)
        noise_power = np.mean(noise ** 2)
        # EPS keeps both the division and the logarithm finite for silence.
        current_snr = 10 * np.log10(signal_power / (noise_power + self.EPS) + self.EPS)
        # SNR is a power ratio, the gain is an amplitude ratio: hence / 20.
        scale_factor = 10 ** ((current_snr - desired_snr) / 20)
        return noise * scale_factor

    def _mix(self, sources_list):
        """Sum the sources over the length of the first one.

        Sources longer than the first are truncated; shorter ones contribute
        only to the leading samples they cover (the remainder is treated as
        silence) instead of failing or being broadcast across the mix.

        Args:
            sources_list: Non-empty sequence of 1-D sample arrays; the first
                entry defines the output length.

        Returns:
            float32 mixture, rescaled to a peak of ``MAX_AMP`` if its
            absolute peak would reach 1.0.
        """
        mix_length = len(sources_list[0])
        mixture = np.zeros(mix_length, dtype=np.float32)
        for source in sources_list:
            # Only add over the span both arrays actually cover.
            overlap = min(len(source), mix_length)
            mixture[:overlap] += source[:overlap]

        if mixture.size:  # np.max is undefined on an empty array
            peak = np.max(np.abs(mixture))
            if peak >= 1.0:
                mixture = mixture * self.MAX_AMP / peak
        return mixture

    def _prepare_noise_for_mix(self, noise_files, mix_length):
        """Build a noise track of exactly ``mix_length`` samples.

        Files are read in a random order and concatenated until enough
        samples are available.  If all files together are still too short,
        the concatenation is repeated cyclically, which yields the same
        samples as re-reading the files in the same order without touching
        the disk again.

        Args:
            noise_files: Paths of candidate noise files.  The list itself is
                not modified.
            mix_length: Number of samples required.

        Returns:
            1-D array holding ``mix_length`` noise samples.

        Raises:
            RuntimeError: If the files provide no samples at all, which
                would otherwise make the fill loop spin forever.

        NOTE(review): the sample rate of each noise file is ignored; the
        files are assumed to share the recording's rate - confirm.
        """
        if mix_length <= 0:
            return np.zeros(0, dtype=np.float32)

        # Shuffle a private copy so the caller's list keeps its order.
        shuffled = list(noise_files)
        random.shuffle(shuffled)

        chunks = []
        total_len = 0
        for path in shuffled:
            noise_data, _ = self.read_wav(path)
            chunks.append(noise_data)
            total_len += len(noise_data)
            if total_len >= mix_length:
                break

        if total_len == 0:
            raise RuntimeError(
                "noise files contain no samples; cannot build a noise track"
            )

        pool = np.concatenate(chunks)
        if total_len < mix_length:
            # np.resize fills the larger array with repeated copies of pool.
            pool = np.resize(pool, mix_length)
        return pool[:mix_length]

    def mix_with_noise_folder(self, mix_path_test, noise_folder, out_path="temp_noisy.wav"):
        """Add noise from ``noise_folder`` to a recording and write the result.

        Args:
            mix_path_test: Path of the recording to be corrupted with noise.
            noise_folder: Directory searched (non-recursively) for ``*.wav``
                noise files.
            out_path: Destination of the noisy file; defaults to the
                historical ``temp_noisy.wav`` in the working directory.

        Returns:
            The path the noisy mixture was written to.

        Raises:
            RuntimeError: If ``noise_folder`` contains no ``.wav`` file.

        NOTE(review): the loudness meter is built for ``self.sample_rate``
        while the recording is processed at its own rate; presumably inputs
        are converted beforehand - confirm.
        """
        mix_wave, sr_mix = self.read_wav(mix_path_test)

        noise_files = sorted(glob.glob(os.path.join(noise_folder, "*.wav")))
        if not noise_files:
            raise RuntimeError(f"噪声文件夹 {noise_folder} 内未发现 .wav 文件")

        mix_wave = self.normalize(mix_wave, is_noise=False)
        noise_ready = self._prepare_noise_for_mix(noise_files, len(mix_wave))
        noise_ready = self.snr_norm(mix_wave, noise_ready, is_noise=True)
        mixture = self._mix([mix_wave, noise_ready])

        # Written at the recording's own sample rate.
        sf.write(out_path, mixture, sr_mix)
        return out_path
if __name__ == "__main__":
    # Manual smoke test: corrupt one recording with noise from a folder.
    mix_path_test = "test_mix.wav"
    noise_folder_test = "noises/"
    mixer = AudioMixer()
    mixed_wav_path = mixer.mix_with_noise_folder(mix_path_test, noise_folder_test)
    # Report the path that was actually written rather than a fixed name
    # that does not match the file produced by the mixer.
    print(f"混合完成,已输出到 {mixed_wav_path}")