# Target-speaker-extraction / datahandler.py
# swc2's picture
# change to v3.0
# 956c248
import os
import random
import warnings
import numpy as np
import soundfile as sf
import pyloudnorm
import glob
import librosa
def fix_audio_format(audio_path, out_sr=16000):
    """Load an audio file through librosa and return only its samples.

    Args:
        audio_path: Path of the audio file handed to ``librosa.load``.
        out_sr: Sample rate requested from ``librosa.load`` (``sr=`` argument).

    Returns:
        The sample array produced by ``librosa.load`` with ``mono=True``;
        the sample rate it also returns is discarded.
    """
    samples, _ = librosa.load(audio_path, sr=out_sr, mono=True)
    return samples
class AudioMixer(object):
    """Add background noise to a recording at a randomly drawn SNR.

    The typical entry point is :meth:`mix_with_noise_folder`, which loudness
    normalises the input, assembles a noise track from a folder of ``.wav``
    files, scales that noise to a random signal-to-noise ratio and writes the
    peak-limited sum to disk.

    NOTE(review): the loudness meter is built from ``sample_rate`` while the
    audio files are read at whatever rate they were stored with, and the
    sample rate of the noise files is never checked -- confirm that all
    inputs share ``sample_rate``.
    """

    def __init__(
        self,
        sample_rate=16000,
        mean_snr=-3,
        var_snr=8,
        mean_loudness=-24,
        var_loudness=10,
    ):
        """Store the mixing parameters and build the loudness meter.

        Args:
            sample_rate: Rate handed to ``pyloudnorm.Meter``.
            mean_snr: Mean of the normal distribution the noise SNR (dB) is
                drawn from.
            var_snr: Variance of that distribution (its square root is used
                as the standard deviation).
            mean_loudness: Mean of the normal distribution the target
                loudness is drawn from.
            var_loudness: Variance of that distribution.
        """
        self.sample_rate = sample_rate
        self.mean_snr = mean_snr
        self.var_snr = var_snr
        # The misspelt attribute names are kept as-is: they are part of the
        # object's public surface.
        self.MEAN_LOUNDNESS = mean_loudness
        self.VAR_LOUNDNESS = var_loudness
        self.EPS = 1e-10  # guards the divisions / logarithm in snr_norm
        self.MAX_AMP = 0.9  # peak the mixture is rescaled to when it would clip
        self.meter = pyloudnorm.Meter(self.sample_rate)

    def read_wav(self, wav_path):
        """Read a wav file as float32 and keep only its first channel.

        Args:
            wav_path: Path handed to ``soundfile.read``.

        Returns:
            A ``(samples, sample_rate)`` tuple; ``samples`` is one-dimensional.
        """
        data, sr = sf.read(wav_path, dtype='float32')
        if data.ndim > 1:
            # Multi-channel file: use channel 0 only.
            data = data[:, 0]
        return data, sr

    def normalize(self, signal, is_noise=False):
        """Rescale ``signal`` to a randomly drawn target loudness.

        The target is sampled from a normal distribution centred on
        ``MEAN_LOUNDNESS`` (shifted by +4 when ``is_noise`` is true) with
        standard deviation ``sqrt(VAR_LOUNDNESS)``.

        Args:
            signal: Samples to normalise.
            is_noise: Select the noise target (mean + 4) instead of the
                speech target.

        Returns:
            The gain-adjusted samples. No peak limiting is applied here.

        Raises:
            RuntimeWarning: Raised as an exception if the gain step emits
                one, because such warnings are promoted to errors below.
        """
        c_loudness = self.meter.integrated_loudness(signal)
        target_mean = self.MEAN_LOUNDNESS + 4 if is_noise else self.MEAN_LOUNDNESS
        target_loudness = np.random.normal(target_mean, self.VAR_LOUNDNESS ** 0.5)
        with warnings.catch_warnings():
            # Surface numerical problems in the gain step as hard errors
            # instead of silently producing a damaged signal.
            warnings.filterwarnings("error", category=RuntimeWarning)
            signal = pyloudnorm.normalize.loudness(signal, c_loudness, target_loudness)
        return signal

    def snr_norm(self, signal, noise, is_noise=True):
        """Scale ``noise`` so that ``signal`` / ``noise`` hits a random SNR.

        Args:
            signal: Reference samples; left untouched.
            noise: Samples to rescale.
            is_noise: When true the desired SNR (dB) is drawn from
                ``N(mean_snr, var_snr)``; otherwise it is drawn uniformly
                from ``[2, 10)``.

        Returns:
            ``noise`` multiplied by the gain that yields the desired SNR.
            No peak limiting is applied here.
        """
        if is_noise:
            desired_snr = np.random.normal(self.mean_snr, self.var_snr ** 0.5)
        else:
            desired_snr = np.random.uniform(2, 10)

        signal_power = np.mean(signal ** 2)
        noise_power = np.mean(noise ** 2)
        # EPS keeps both the division and the logarithm finite for silence.
        current_snr = 10 * np.log10(signal_power / (noise_power + self.EPS) + self.EPS)

        # Amplitude gain, hence the division by 20 rather than 10.
        scale_factor = 10 ** ((current_snr - desired_snr) / 20)
        return noise * scale_factor

    def _mix(self, sources_list):
        """Sum the sources over the length of the first one and limit the peak.

        Sources longer than the first are truncated; shorter ones are treated
        as zero beyond their end instead of raising a broadcasting error.

        Args:
            sources_list: Non-empty sequence of 1-D sample arrays. The first
                entry defines the output length.

        Returns:
            A float32 array. If its absolute peak would reach 1.0 it is
            rescaled so that the peak equals ``MAX_AMP``.
        """
        mix_length = len(sources_list[0])
        mixture = np.zeros(mix_length, dtype=np.float32)
        for source in sources_list:
            overlap = min(len(source), mix_length)
            mixture[:overlap] += source[:overlap]

        if mix_length == 0:
            # np.max has no identity on an empty array; nothing to limit.
            return mixture

        peak = np.max(np.abs(mixture))
        if peak >= 1.0:
            mixture = mixture * self.MAX_AMP / peak
        return mixture

    def _prepare_noise_for_mix(self, noise_files, mix_length):
        """Build a noise track of exactly ``mix_length`` samples.

        The files are visited in a random order and concatenated; if one pass
        over all of them is still too short, the same sequence is repeated.

        Args:
            noise_files: Paths of the candidate noise files. The list itself
                is not modified.
            mix_length: Number of samples required.

        Returns:
            A 1-D array with ``mix_length`` samples.

        Raises:
            RuntimeError: If none of the files contains any samples, which
                would otherwise make the fill loop spin forever.
        """
        if mix_length <= 0:
            return np.zeros(0, dtype=np.float32)

        # Shuffle a copy so the caller's list keeps its order.
        order = list(noise_files)
        random.shuffle(order)

        chunks = []
        total_len = 0
        for noise_path in order:
            noise_data, _ = self.read_wav(noise_path)
            if len(noise_data) == 0:
                continue
            chunks.append(noise_data)
            total_len += len(noise_data)
            if total_len >= mix_length:
                break

        if total_len == 0:
            raise RuntimeError("noise files contain no samples; cannot build a noise track")

        noise_track = np.concatenate(chunks)
        if total_len < mix_length:
            # Every file was used once and it is still too short: repeat the
            # already-loaded sequence rather than re-reading it from disk.
            repeats = -(-mix_length // total_len)  # ceiling division
            noise_track = np.tile(noise_track, repeats)
        return noise_track[:mix_length]

    def mix_with_noise_folder(self, mix_path_test, noise_folder, out_path="temp_noisy.wav"):
        """Add noise from ``noise_folder`` to a recording and write the result.

        Args:
            mix_path_test: Path of the input wav file.
            noise_folder: Directory searched (non-recursively) for ``*.wav``
                noise files.
            out_path: Where the noisy mixture is written. Defaults to
                ``"temp_noisy.wav"`` in the current working directory.

        Returns:
            ``out_path``.

        Raises:
            RuntimeError: If ``noise_folder`` holds no ``.wav`` files.
        """
        mix_wave, sr_mix = self.read_wav(mix_path_test)

        noise_files = sorted(glob.glob(os.path.join(noise_folder, "*.wav")))
        if not noise_files:
            raise RuntimeError(f"噪声文件夹 {noise_folder} 内未发现 .wav 文件")

        mix_wave = self.normalize(mix_wave, is_noise=False)
        noise_ready = self._prepare_noise_for_mix(noise_files, len(mix_wave))
        noise_ready = self.snr_norm(mix_wave, noise_ready, is_noise=True)
        mixture = self._mix([mix_wave, noise_ready])

        # Written at the input file's own sample rate.
        sf.write(out_path, mixture, sr_mix)
        return out_path
if __name__ == "__main__":
    # Manual smoke test: add noise from ./noises/ to ./test_mix.wav.
    mix_path_test = "test_mix.wav"
    noise_folder_test = "noises/"
    mixer = AudioMixer()
    mixed_wav_path = mixer.mix_with_noise_folder(mix_path_test, noise_folder_test)
    # Report the path that was actually written; the message used to name a
    # file this script never creates.
    print(f"混合完成,已输出到 {mixed_wav_path}")