swc2 committed on
Commit
ab3af29
·
1 Parent(s): 8c575ce
Files changed (2) hide show
  1. datahandler.py +22 -65
  2. decode.py +4 -22
datahandler.py CHANGED
@@ -30,9 +30,7 @@ class AudioMixer(object):
30
  mean_loudness=-24,
31
  var_loudness=20
32
  ):
33
- """
34
- 初始化一些参数、随机种子和响度计算工具等。
35
- """
36
  self.sample_rate = sample_rate
37
  self.mean_snr = mean_snr
38
  self.var_snr = var_snr
@@ -42,41 +40,37 @@ class AudioMixer(object):
42
  self.EPS = 1e-10
43
  self.MAX_AMP = 0.9
44
 
45
- # pyloudnorm 的 Meter,用于计算音频响度
46
  self.meter = pyloudnorm.Meter(self.sample_rate)
47
 
48
- # # 也可固定随机种子,保证每次混合一致(如果想要可复现)
49
  # self.seed = 1453
50
  # random.seed(self.seed)
51
  # np.random.seed(self.seed)
52
 
53
  def read_wav(self, wav_path):
54
- """
55
- 读取音频文件并返回 wave 数据和采样率
56
- """
57
  data, sr = sf.read(wav_path, dtype='float32')
58
- # 如果读到的是多通道,可只取其中一个通道
59
  if data.ndim > 1:
60
  data = data[:, 0]
61
  return data, sr
62
 
63
  def normalize(self, signal, is_noise=False):
64
- """
65
- 对输入的 signal 做响度归一化,并确保不会过载失真。
66
- """
67
  c_loudness = self.meter.integrated_loudness(signal)
68
  if is_noise:
69
- # 噪声的目标响度可以偏高一些或随便设置
70
  target_loudness = np.random.normal(self.MEAN_LOUNDNESS + 4, self.VAR_LOUNDNESS**0.5)
71
  else:
72
- # mix 或者语音的目标响度
73
  target_loudness = np.random.normal(self.MEAN_LOUNDNESS, self.VAR_LOUNDNESS**0.5)
74
 
75
  with warnings.catch_warnings():
76
  warnings.filterwarnings("error", category=RuntimeWarning)
77
  signal = pyloudnorm.normalize.loudness(signal, c_loudness, target_loudness)
78
 
79
- # # 再检查是否会 clipping
80
  # peak = np.max(np.abs(signal))
81
  # if peak >= 1.0:
82
  # signal = signal * self.MAX_AMP / peak
@@ -84,14 +78,11 @@ class AudioMixer(object):
84
  return signal
85
 
86
  def snr_norm(self, signal, noise, is_noise=True):
87
- """
88
- 根据预设的 mean_snr、var_snr 来随机决定一个目标 SNR,然后
89
- 以此对 noise 做缩放,得到与 signal 相匹配的噪声幅度。
90
- """
91
  if is_noise:
92
  desired_snr = np.random.normal(self.mean_snr, self.var_snr**0.5)
93
  else:
94
- # 如果你还有别的需求,比如想做正 SNR 范围,可以改这里
95
  desired_snr = np.random.uniform(2, 10)
96
 
97
  current_snr = 10 * np.log10(
@@ -101,7 +92,6 @@ class AudioMixer(object):
101
 
102
  scaled_noise = noise * scale_factor
103
 
104
- # # 防止噪声自身 clipping
105
  # peak = np.max(np.abs(scaled_noise))
106
  # if peak >= 1.0:
107
  # scaled_noise = scaled_noise * self.MAX_AMP / peak
@@ -109,16 +99,14 @@ class AudioMixer(object):
109
  return scaled_noise
110
 
111
  def _mix(self, sources_list):
112
- """
113
- 将多路音频进行叠加,防止溢出。
114
- """
115
- # 假设 sources_list[0] 是 mix 音频,sources_list[1] 是已拼好长度的 noise
116
  mix_length = len(sources_list[0])
117
  mixture = np.zeros(mix_length, dtype=np.float32)
118
  for s in sources_list:
119
  mixture += s[:mix_length] # 仅叠加到 mix 的长度
120
 
121
- # 再做一次峰值校正,避免溢出
122
  peak = np.max(np.abs(mixture))
123
  if peak >= 1.0:
124
  mixture = mixture * self.MAX_AMP / peak
@@ -126,30 +114,16 @@ class AudioMixer(object):
126
  return mixture
127
 
128
  def _prepare_noise_for_mix(self, noise_files, mix_length):
129
- """
130
- 传入一组 noise 文件路径,先对它们打乱,再依次读取、拼接。
131
- 如果总长度还不够覆盖 mix_length,可以再次拼接自己(循环)。
132
-
133
- - noise_files: 存储多个噪声文件路径的列表
134
- - mix_length: 需要的总长度(采样点数)
135
-
136
- 返回: 拼接后的 noise 波形
137
- """
138
- # 先随机打乱
139
  random.shuffle(noise_files)
140
 
141
- # 依次读取并拼接
142
  noise_all = []
143
  total_len = 0
144
 
145
- # 第一次先拼完所有 noise 文件,如果还不够,就重复拼接
146
  while total_len < mix_length:
147
  for nf in noise_files:
148
  noise_data, _ = self.read_wav(nf)
149
 
150
- # 可选:对每条 noise 做一次 normalize,提升多样性
151
- # (或者只在外部做一次统一的 normalize)
152
- #noise_data = self.normalize(noise_data, is_noise=True)
153
 
154
  noise_all.append(noise_data)
155
  total_len += len(noise_data)
@@ -157,24 +131,12 @@ class AudioMixer(object):
157
  if total_len >= mix_length:
158
  break
159
 
160
- # 如果已经拼完一轮,可能还不够,就继续 while 循环再拼一轮
161
-
162
- # 拼接后截断到 mix_length
163
  concatenated_noise = np.concatenate(noise_all)[:mix_length]
164
  return concatenated_noise
165
 
166
  def mix_with_noise_folder(self, mix_wave,sr_mix,noise_folder):
167
- """
168
- 读取一条 mix 文件和一个 noise 文件夹,做如下处理:
169
- 1. 读取 mix wave,并做响度归一化
170
- 2. 根据 mix 的长度,在 noise 文件夹中随机打乱全部 wav,依次拼接满足同长度
171
- 3. 对最终拼好的 noise 做 snr_norm
172
- 4. 叠加输出
173
- """
174
- # 1. 读取 mix
175
- # mix_wave, sr_mix = self.read_wav(mix_path)
176
-
177
- # 如果文件夹下找不到任何 noise 文件,就直接返回原音频
178
  noise_files = sorted(glob.glob(os.path.join(noise_folder, "*.wav")))
179
  if not noise_files:
180
  raise RuntimeError(f"噪声文件夹 {noise_folder} 内未发现 .wav 文件")
@@ -182,35 +144,30 @@ class AudioMixer(object):
182
  mix_wave = self.normalize(mix_wave, is_noise=False)
183
  mix_length = len(mix_wave)
184
 
185
- # 2. 先把 noise 文件拼接到 match mix_length
186
- # (会将 noise_files 打乱后依次读、拼接)
187
  noise_ready = self._prepare_noise_for_mix(noise_files, mix_length)
188
 
189
- # 3. SNR 调整
190
  noise_ready = self.snr_norm(mix_wave, noise_ready, is_noise=True)
191
 
192
- # 4. 叠加
193
  mixture = self._mix([mix_wave, noise_ready])
194
 
195
- out_noisy = "temp_noisy.wav" # 可以理解为把输入的混合音频直接另存为
196
 
197
- # 返回混合后的音频以及采样率
198
  sf.write(out_noisy, mixture, sr_mix)
199
 
200
  return out_noisy
201
 
202
 
203
  if __name__ == "__main__":
204
- # 假设你有一个 mix.wav 以及一个 noise 文件夹(含若干个 .wav 噪声文件)
205
  mix_path_test = "test_mix.wav"
206
  mix_wave, sr_mix = self.read_wav(mix_path_test)
207
- noise_folder_test = "noises/" # 比如里面有 10 条 noise*.wav
208
 
209
  mixer = AudioMixer()
210
 
211
- # 执行混合
212
  mixed_wav, sr = mixer.mix_with_noise_folder(mix_wave, sr_mix, noise_folder_test)
213
 
214
- # 这里你可以选择把结果写回本地文件,或直接返回 numpy 数组做后续处理
215
  sf.write("test_output_mixture.wav", mixed_wav, sr)
216
  print("混合完成,已输出到 test_output_mixture.wav")
 
30
  mean_loudness=-24,
31
  var_loudness=20
32
  ):
33
+
 
 
34
  self.sample_rate = sample_rate
35
  self.mean_snr = mean_snr
36
  self.var_snr = var_snr
 
40
  self.EPS = 1e-10
41
  self.MAX_AMP = 0.9
42
 
43
+
44
  self.meter = pyloudnorm.Meter(self.sample_rate)
45
 
46
+
47
  # self.seed = 1453
48
  # random.seed(self.seed)
49
  # np.random.seed(self.seed)
50
 
51
  def read_wav(self, wav_path):
52
+
 
 
53
  data, sr = sf.read(wav_path, dtype='float32')
54
+
55
  if data.ndim > 1:
56
  data = data[:, 0]
57
  return data, sr
58
 
59
  def normalize(self, signal, is_noise=False):
60
+
 
 
61
  c_loudness = self.meter.integrated_loudness(signal)
62
  if is_noise:
63
+
64
  target_loudness = np.random.normal(self.MEAN_LOUNDNESS + 4, self.VAR_LOUNDNESS**0.5)
65
  else:
66
+
67
  target_loudness = np.random.normal(self.MEAN_LOUNDNESS, self.VAR_LOUNDNESS**0.5)
68
 
69
  with warnings.catch_warnings():
70
  warnings.filterwarnings("error", category=RuntimeWarning)
71
  signal = pyloudnorm.normalize.loudness(signal, c_loudness, target_loudness)
72
 
73
+
74
  # peak = np.max(np.abs(signal))
75
  # if peak >= 1.0:
76
  # signal = signal * self.MAX_AMP / peak
 
78
  return signal
79
 
80
  def snr_norm(self, signal, noise, is_noise=True):
81
+
 
 
 
82
  if is_noise:
83
  desired_snr = np.random.normal(self.mean_snr, self.var_snr**0.5)
84
  else:
85
+
86
  desired_snr = np.random.uniform(2, 10)
87
 
88
  current_snr = 10 * np.log10(
 
92
 
93
  scaled_noise = noise * scale_factor
94
 
 
95
  # peak = np.max(np.abs(scaled_noise))
96
  # if peak >= 1.0:
97
  # scaled_noise = scaled_noise * self.MAX_AMP / peak
 
99
  return scaled_noise
100
 
101
  def _mix(self, sources_list):
102
+
103
+
 
 
104
  mix_length = len(sources_list[0])
105
  mixture = np.zeros(mix_length, dtype=np.float32)
106
  for s in sources_list:
107
  mixture += s[:mix_length] # 仅叠加到 mix 的长度
108
 
109
+
110
  peak = np.max(np.abs(mixture))
111
  if peak >= 1.0:
112
  mixture = mixture * self.MAX_AMP / peak
 
114
  return mixture
115
 
116
  def _prepare_noise_for_mix(self, noise_files, mix_length):
117
+
 
 
 
 
 
 
 
 
 
118
  random.shuffle(noise_files)
119
 
 
120
  noise_all = []
121
  total_len = 0
122
 
 
123
  while total_len < mix_length:
124
  for nf in noise_files:
125
  noise_data, _ = self.read_wav(nf)
126
 
 
 
 
127
 
128
  noise_all.append(noise_data)
129
  total_len += len(noise_data)
 
131
  if total_len >= mix_length:
132
  break
133
 
 
 
 
134
  concatenated_noise = np.concatenate(noise_all)[:mix_length]
135
  return concatenated_noise
136
 
137
  def mix_with_noise_folder(self, mix_wave,sr_mix,noise_folder):
138
+
139
+
 
 
 
 
 
 
 
 
 
140
  noise_files = sorted(glob.glob(os.path.join(noise_folder, "*.wav")))
141
  if not noise_files:
142
  raise RuntimeError(f"噪声文件夹 {noise_folder} 内未发现 .wav 文件")
 
144
  mix_wave = self.normalize(mix_wave, is_noise=False)
145
  mix_length = len(mix_wave)
146
 
147
+
 
148
  noise_ready = self._prepare_noise_for_mix(noise_files, mix_length)
149
 
 
150
  noise_ready = self.snr_norm(mix_wave, noise_ready, is_noise=True)
151
 
 
152
  mixture = self._mix([mix_wave, noise_ready])
153
 
154
+ out_noisy = "temp_noisy.wav"
155
 
 
156
  sf.write(out_noisy, mixture, sr_mix)
157
 
158
  return out_noisy
159
 
160
 
161
  if __name__ == "__main__":
162
+
163
  mix_path_test = "test_mix.wav"
164
  mix_wave, sr_mix = self.read_wav(mix_path_test)
165
+ noise_folder_test = "noises/"
166
 
167
  mixer = AudioMixer()
168
 
169
+
170
  mixed_wav, sr = mixer.mix_with_noise_folder(mix_wave, sr_mix, noise_folder_test)
171
 
 
172
  sf.write("test_output_mixture.wav", mixed_wav, sr)
173
  print("混合完成,已输出到 test_output_mixture.wav")
decode.py CHANGED
@@ -10,7 +10,6 @@ from omegaconf import OmegaConf
10
 
11
 
12
 
13
- # ================ 网络推理类 ================
14
  class NnetComputer(object):
15
  def __init__(self, cpt_dir, gpuid, nnet_conf):
16
  self.device = th.device(f"cuda:{gpuid}") if gpuid >= 0 else th.device("cpu")
@@ -37,41 +36,24 @@ class NnetComputer(object):
37
  return sp_samps
38
 
39
  class InferencePipeline:
40
- """
41
- 外部只需传入 config,即可完成:
42
- 1) 模型实例化 (含 hydra.instantiate 逻辑)
43
- 2) 加载 checkpoint
44
- 3) 推理
45
- """
46
  def __init__(self, config):
47
- """
48
- 在构造时就把所有初始化做好,包括:
49
- - hydra.instantiate(config.model) -> 得到一个 nn.Module
50
- - 用 NnetComputer(...) 封装
51
- """
52
- # 如果 config.model 里含有 _target_ 字段,可以用 hydra.instantiate
53
- # 注意: hydra.instantiate 需要在这里显式地导入 hydra.utils
54
-
55
- # 1. 根据 config.model 构建模型
56
  model_inst = hydra.utils.instantiate(config.model)
57
 
58
  self.computer_ = NnetComputer(config.test.checkpoint,config.test.gpu, model_inst)
59
 
60
  def run_inference(self, input_audio_path: str, enroll_audio_path: str) -> str:
61
- """
62
- 给定混合音频 + enroll 音频,执行推理并返回输出文件路径。
63
- """
64
- # 1. 读取音频
65
  mix_samps, sr = sf.read(input_audio_path)
66
  aux_samps, sr2 = sf.read(enroll_audio_path)
67
 
68
- # 2. 调用底层 compute
69
  samps = self.computer_.compute(mix_samps, aux_samps, len(aux_samps))
70
  norm = np.linalg.norm(mix_samps, np.inf)
71
  samps = samps[:mix_samps.size]
72
  samps = samps * norm / np.max(np.abs(samps))
73
 
74
- # 3. 写到临时文件
75
  out_wav = "temp_extracted.wav"
76
  sf.write(out_wav, samps, sr)
77
  return out_wav
 
10
 
11
 
12
 
 
13
  class NnetComputer(object):
14
  def __init__(self, cpt_dir, gpuid, nnet_conf):
15
  self.device = th.device(f"cuda:{gpuid}") if gpuid >= 0 else th.device("cpu")
 
36
  return sp_samps
37
 
38
  class InferencePipeline:
39
+
 
 
 
 
 
40
  def __init__(self, config):
41
+
 
 
 
 
 
 
 
 
42
  model_inst = hydra.utils.instantiate(config.model)
43
 
44
  self.computer_ = NnetComputer(config.test.checkpoint,config.test.gpu, model_inst)
45
 
46
  def run_inference(self, input_audio_path: str, enroll_audio_path: str) -> str:
47
+
 
 
 
48
  mix_samps, sr = sf.read(input_audio_path)
49
  aux_samps, sr2 = sf.read(enroll_audio_path)
50
 
 
51
  samps = self.computer_.compute(mix_samps, aux_samps, len(aux_samps))
52
  norm = np.linalg.norm(mix_samps, np.inf)
53
  samps = samps[:mix_samps.size]
54
  samps = samps * norm / np.max(np.abs(samps))
55
 
56
+
57
  out_wav = "temp_extracted.wav"
58
  sf.write(out_wav, samps, sr)
59
  return out_wav