WeixuanYuan committed on
Commit 1ecb721 · verified · 1 Parent(s): cf4423e

Upload 8 files

metrics/FD.py ADDED
@@ -0,0 +1,293 @@
import json
import os

import librosa
import numpy as np
import torch
from tqdm import tqdm
from scipy.linalg import sqrtm

from metrics.pipelines import sample_pipeline, sample_pipeline_GAN
from metrics.pipelines_STFT import sample_pipeline_STFT, sample_pipeline_GAN_STFT
from tools import rms_normalize


def ASTaudio2feature(device, signal, processor, AST, sampling_rate):
    # The audio is decoded on the fly.
    inputs = processor(signal, sampling_rate=sampling_rate, return_tensors="pt").to(device)
    with torch.no_grad():
        outputs = AST(**inputs)

    # Use the [CLS] token embedding as the feature vector.
    last_hidden_states = outputs.last_hidden_state[:, 0, :].to("cpu").detach().numpy()
    return last_hidden_states


# Compute the mean and covariance matrix of an array of feature vectors.
def calculate_statistics(features):
    mu = np.mean(features, axis=0)
    sigma = np.cov(features, rowvar=False)
    return mu, sigma


# Compute the FID between two Gaussians (mu1, sigma1) and (mu2, sigma2).
def calculate_fid(mu1, sigma1, mu2, sigma2, eps=1e-6):
    # Add a small positive value to the covariance diagonals for numerical
    # stability (without mutating the caller's arrays).
    sigma1 = sigma1 + np.eye(sigma1.shape[0]) * eps
    sigma2 = sigma2 + np.eye(sigma2.shape[0]) * eps

    ssdiff = np.sum((mu1 - mu2) ** 2.0)
    covmean = sqrtm(sigma1.dot(sigma2))

    # Numerical issues can produce a complex matrix; keep only the real part.
    if np.iscomplexobj(covmean):
        covmean = covmean.real

    fid = ssdiff + np.trace(sigma1 + sigma2 - 2.0 * covmean)
    return fid


# Compute the FID from two {"mu": ..., "sigma": ...} dicts.
def calculate_fid_dict(dict1, dict2, eps=1e-6):
    mu1, sigma1 = dict1["mu"], dict1["sigma"]
    mu2, sigma2 = dict2["mu"], dict2["sigma"]
    # Add a small positive value to the covariance diagonals for numerical
    # stability (without mutating the dicts' arrays).
    sigma1 = sigma1 + np.eye(sigma1.shape[0]) * eps
    sigma2 = sigma2 + np.eye(sigma2.shape[0]) * eps

    ssdiff = np.sum((mu1 - mu2) ** 2.0)
    covmean = sqrtm(sigma1.dot(sigma2))

    # Numerical issues can produce a complex matrix; keep only the real part.
    if np.iscomplexobj(covmean):
        covmean = covmean.real

    fid = ssdiff + np.trace(sigma1 + sigma2 - 2.0 * covmean)
    return fid


def generate_features_with_AudioLDM_and_AST(device, processor, AST, AudioLDM_signals_directory_path, return_feature=False):

    diffuSynth_features = []

    # Step 1: Load all wav files in AudioLDM_signals_directory_path.
    AudioLDM_signals = []
    signal_lengths = set()
    target_length = 4 * 16000  # 4 seconds * 16000 samples per second

    for file_name in os.listdir(AudioLDM_signals_directory_path):
        if file_name.endswith('.wav') and not file_name.startswith('._'):
            file_path = os.path.join(AudioLDM_signals_directory_path, file_name)
            try:
                signal, sr = librosa.load(file_path, sr=16000)  # Load the audio file at a 16 kHz sampling rate.
                if len(signal) >= target_length:
                    signal = signal[:target_length]  # Keep only the first 4 seconds.
                else:
                    raise ValueError(f"The file {file_name} is shorter than 4 seconds.")
                # Normalize
                AudioLDM_signals.append(rms_normalize(signal))
                signal_lengths.add(len(signal))
            except Exception as e:
                print(f"Error loading {file_name}: {e}")

    # Step 2: Check that all signals have the same length.
    if len(signal_lengths) != 1:
        raise ValueError("Not all signals have the same length. Please ensure all audio files are of the same length.")

    # Step 3: Reshape into signal batches [number_batches, batch_size=8, signal_length].
    batch_size = 8
    signal_length = signal_lengths.pop()  # All lengths are the same; take one of them.

    # Create batches
    signal_batches = [AudioLDM_signals[i:i + batch_size] for i in range(0, len(AudioLDM_signals), batch_size)]

    for signal_batch in tqdm(signal_batches):
        features = ASTaudio2feature(device, signal_batch, processor, AST, sampling_rate=16000)
        diffuSynth_features.extend(features)

    if return_feature:
        return diffuSynth_features
    else:
        mu, sigma = calculate_statistics(diffuSynth_features)
        return {"mu": mu, "sigma": sigma}


def generate_features_with_diffuSynth_and_AST(device, uNet, VAE, mmm, CLAP_tokenizer, processor, AST, num_batches,
                                              positive_prompts, negative_prompts="", CFG=1, sample_steps=10, task="spectrograms", return_feature=False):
    diffuSynth_features = []

    if task == "spectrograms":
        pipe = sample_pipeline
    elif task == "STFT":
        pipe = sample_pipeline_STFT
    else:
        raise NotImplementedError

    for _ in tqdm(range(num_batches)):
        quantized_latent_representations, reconstruction_batch, signals = pipe(device, uNet, VAE, mmm,
                                                                               CLAP_tokenizer,
                                                                               positive_prompts=positive_prompts,
                                                                               negative_prompts=negative_prompts,
                                                                               batchsize=8,
                                                                               sample_steps=sample_steps,
                                                                               CFG=CFG, seed=None,
                                                                               return_latent=False)

        features = ASTaudio2feature(device, signals, processor, AST, sampling_rate=16000)
        diffuSynth_features.extend(features)

    if return_feature:
        return diffuSynth_features
    else:
        mu, sigma = calculate_statistics(diffuSynth_features)
        return {"mu": mu, "sigma": sigma}


def generate_features_with_GAN_and_AST(device, gan_generator, VAE, mmm, CLAP_tokenizer, processor, AST, num_batches,
                                       positive_prompts, negative_prompts="", CFG=1, sample_steps=10, task="spectrograms", return_feature=False):
    diffuSynth_features = []

    if task == "spectrograms":
        pipe = sample_pipeline_GAN
    elif task == "STFT":
        pipe = sample_pipeline_GAN_STFT
    else:
        raise NotImplementedError

    for _ in tqdm(range(num_batches)):
        quantized_latent_representations, reconstruction_batch, signals = pipe(device, gan_generator, VAE, mmm,
                                                                               CLAP_tokenizer,
                                                                               positive_prompts=positive_prompts,
                                                                               negative_prompts=negative_prompts,
                                                                               batchsize=8,
                                                                               sample_steps=sample_steps,
                                                                               CFG=CFG, seed=None,
                                                                               return_latent=False)

        features = ASTaudio2feature(device, signals, processor, AST, sampling_rate=16000)
        diffuSynth_features.extend(features)

    if return_feature:
        return diffuSynth_features
    else:
        mu, sigma = calculate_statistics(diffuSynth_features)
        return {"mu": mu, "sigma": sigma}


def get_FD(train_features, device, uNet, VAE, mmm, CLAP_tokenizer, processor, AST, num_batches, positive_prompts,
           negative_prompts="", CFG=1, sample_steps=10):
    # return_feature=True: the raw feature list (not the mu/sigma dict) is
    # needed for the calculate_statistics call below.
    diffuSynth_features = generate_features_with_diffuSynth_and_AST(device, uNet, VAE, mmm, CLAP_tokenizer, processor,
                                                                    AST, num_batches, positive_prompts,
                                                                    negative_prompts=negative_prompts, CFG=CFG,
                                                                    sample_steps=sample_steps, return_feature=True)

    mu_real, sigma_real = calculate_statistics(train_features)
    mu_gen, sigma_gen = calculate_statistics(diffuSynth_features)

    fid_score = calculate_fid(mu_real, sigma_real, mu_gen, sigma_gen)
    print('FID score:', fid_score)
    return fid_score


def get_fid_score(features1, features2):
    mu_real, sigma_real = calculate_statistics(features1)
    mu_gen, sigma_gen = calculate_statistics(features2)

    fid_score = calculate_fid(mu_real, sigma_real, mu_gen, sigma_gen)
    return fid_score


def calculate_fid_matrix(features_list_1, features_list_2, get_fid_score):
    # Initialize a matrix of FID scores with shape
    # len(features_list_1) x len(features_list_2).
    fid_scores = [[0 for _ in range(len(features_list_2))] for _ in range(len(features_list_1))]

    # Iterate over both lists and compute the FID score for every pair of feature sets.
    for i, feature1 in enumerate(features_list_1):
        for j, feature2 in enumerate(features_list_2):
            fid_scores[i][j] = get_fid_score(feature1, feature2)

    return fid_scores


def save_AST_feature(key, mu, sigma, path='results/AST_metric/pre_calculated_features/AST_features.json'):
    # Try to open and read an existing JSON file.
    try:
        with open(path, 'r') as file:
            data = json.load(file)
    except FileNotFoundError:
        # If the file does not exist, start a new dict.
        data = {}

    if isinstance(mu, np.ndarray):
        mu = mu.tolist()
    if isinstance(sigma, np.ndarray):
        sigma = sigma.tolist()

    # Add the new entry.
    data[key] = {"mu": mu, "sigma": sigma}

    # Write the updated data back to the file.
    with open(path, 'w') as file:
        json.dump(data, file, indent=4)


def read_AST_features(path='results/AST_metric/pre_calculated_features/AST_features.json'):
    try:
        # Try to open and read the JSON file.
        with open(path, 'r') as file:
            AST_features = json.load(file)

        for AST_feature_name in AST_features.keys():
            AST_features[AST_feature_name]["mu"] = np.array(AST_features[AST_feature_name]["mu"])
            AST_features[AST_feature_name]["sigma"] = np.array(AST_features[AST_feature_name]["sigma"])

        return AST_features
    except FileNotFoundError:
        # If the file does not exist, return an empty dict.
        print(f"File {path} not found.")
        return {}
    except json.JSONDecodeError:
        # If the file is not valid JSON, return an empty dict.
        print(f"File {path} is not valid JSON.")
        return {}
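
As a quick sanity check of calculate_statistics and calculate_fid, the sketch below (an illustration, not part of the repository; it needs only numpy and the functions above) compares features drawn from the same Gaussian against features from a mean-shifted one:

import numpy as np
from metrics.FD import calculate_statistics, calculate_fid

rng = np.random.default_rng(0)
feats_real = rng.normal(0.0, 1.0, size=(2000, 64))     # stand-in for AST features
feats_same = rng.normal(0.0, 1.0, size=(2000, 64))     # same distribution
feats_shifted = rng.normal(0.5, 1.0, size=(2000, 64))  # mean shifted by 0.5 per dimension

mu_r, sigma_r = calculate_statistics(feats_real)
mu_s, sigma_s = calculate_statistics(feats_same)
mu_d, sigma_d = calculate_statistics(feats_shifted)

print(calculate_fid(mu_r, sigma_r, mu_s, sigma_s))  # near 0
print(calculate_fid(mu_r, sigma_r, mu_d, sigma_d))  # roughly 64 * 0.5**2 = 16 plus covariance terms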
metrics/IS.py ADDED
@@ -0,0 +1,218 @@
import os

import librosa
import numpy as np
import torch
from tqdm import tqdm

from metrics.pipelines import sample_pipeline, inpaint_pipeline, sample_pipeline_GAN
from metrics.pipelines_STFT import sample_pipeline_STFT, sample_pipeline_GAN_STFT
from tools import rms_normalize, pad_STFT, encode_stft
from webUI.natural_language_guided.utils import InputBatch2Encode_STFT


def get_inception_score_for_AudioLDM(device, timbre_encoder, VAE, AudioLDM_signals_directory_path):
    VAE_encoder, VAE_quantizer, VAE_decoder = VAE._encoder, VAE._vq_vae, VAE._decoder

    diffuSynth_probabilities = []

    # Step 1: Load all wav files in AudioLDM_signals_directory_path.
    AudioLDM_signals = []
    signal_lengths = set()
    target_length = 4 * 16000  # 4 seconds * 16000 samples per second

    for file_name in os.listdir(AudioLDM_signals_directory_path):
        if file_name.endswith('.wav') and not file_name.startswith('._'):
            file_path = os.path.join(AudioLDM_signals_directory_path, file_name)
            signal, sr = librosa.load(file_path, sr=16000)  # Load the audio file at a 16 kHz sampling rate.
            if len(signal) >= target_length:
                signal = signal[:target_length]  # Keep only the first 4 seconds.
            else:
                raise ValueError(f"The file {file_name} is shorter than 4 seconds.")
            # Normalize
            AudioLDM_signals.append(rms_normalize(signal))
            signal_lengths.add(len(signal))

    # Step 2: Check that all signals have the same length.
    if len(signal_lengths) != 1:
        raise ValueError("Not all signals have the same length. Please ensure all audio files are of the same length.")

    # Encode each signal as a padded STFT image and stack them into one tensor.
    encoded_audios = []
    for origin_audio in AudioLDM_signals:
        D = librosa.stft(origin_audio, n_fft=1024, hop_length=256, win_length=1024)
        padded_D = pad_STFT(D)
        encoded_D = encode_stft(padded_D)
        encoded_audios.append(encoded_D)
    encoded_audios_np = np.array(encoded_audios)
    origin_spectrogram_batch_tensor = torch.from_numpy(encoded_audios_np).float().to(device)

    # Step 3: Split into spectrogram batches of size 8.
    batch_size = 8
    num_batches = int(np.ceil(origin_spectrogram_batch_tensor.shape[0] / batch_size))
    spectrogram_batches = []
    for i in range(num_batches):
        batch = origin_spectrogram_batch_tensor[i * batch_size:(i + 1) * batch_size]
        spectrogram_batches.append(batch)

    for spectrogram_batch in tqdm(spectrogram_batches):
        spectrogram_batch = spectrogram_batch.to(device)
        _, _, _, _, quantized_latent_representations = InputBatch2Encode_STFT(VAE_encoder, spectrogram_batch, quantizer=VAE_quantizer, squared=False)
        feature, instrument_logits, instrument_family_logits, velocity_logits, qualities = timbre_encoder(quantized_latent_representations)
        probabilities = torch.nn.functional.softmax(instrument_logits, dim=1)

        diffuSynth_probabilities.extend(probabilities.to("cpu").detach().numpy())

    return inception_score(np.array(diffuSynth_probabilities))


def get_inception_score(device, uNet, VAE, MMM, CLAP_tokenizer, timbre_encoder, num_batches, positive_prompts, negative_prompts="", CFG=1, sample_steps=10, task="spectrograms"):
    diffuSynth_probabilities = []

    if task == "spectrograms":
        pipe = sample_pipeline
    elif task == "STFT":
        pipe = sample_pipeline_STFT
    else:
        raise NotImplementedError

    for _ in tqdm(range(num_batches)):
        quantized_latent_representations = pipe(device, uNet, VAE, MMM, CLAP_tokenizer,
                                                positive_prompts=positive_prompts, negative_prompts=negative_prompts,
                                                batchsize=8, sample_steps=sample_steps, CFG=CFG, seed=None)

        quantized_latent_representations = quantized_latent_representations.to(device)
        feature, instrument_logits, instrument_family_logits, velocity_logits, qualities = timbre_encoder(quantized_latent_representations)
        probabilities = torch.nn.functional.softmax(instrument_logits, dim=1)

        diffuSynth_probabilities.extend(probabilities.to("cpu").detach().numpy())

    return inception_score(np.array(diffuSynth_probabilities))


def get_inception_score_GAN(device, gan_generator, VAE, MMM, CLAP_tokenizer, timbre_encoder, num_batches, positive_prompts, negative_prompts="", CFG=1, sample_steps=10, task="spectrograms"):
    diffuSynth_probabilities = []

    if task == "spectrograms":
        pipe = sample_pipeline_GAN
    elif task == "STFT":
        pipe = sample_pipeline_GAN_STFT
    else:
        raise NotImplementedError

    for _ in tqdm(range(num_batches)):
        quantized_latent_representations = pipe(device, gan_generator, VAE, MMM, CLAP_tokenizer,
                                                positive_prompts=positive_prompts, negative_prompts=negative_prompts,
                                                batchsize=8, sample_steps=sample_steps, CFG=CFG, seed=None)

        quantized_latent_representations = quantized_latent_representations.to(device)
        feature, instrument_logits, instrument_family_logits, velocity_logits, qualities = timbre_encoder(quantized_latent_representations)
        probabilities = torch.nn.functional.softmax(instrument_logits, dim=1)

        diffuSynth_probabilities.extend(probabilities.to("cpu").detach().numpy())

    return inception_score(np.array(diffuSynth_probabilities))


def predict_qualities_with_diffuSynth_sample(device, uNet, VAE, MMM, CLAP_tokenizer, timbre_encoder, num_batches, positive_prompts, negative_prompts="", CFG=6, sample_steps=10):
    diffuSynth_qualities = []
    for _ in tqdm(range(num_batches)):
        quantized_latent_representations = sample_pipeline(device, uNet, VAE, MMM, CLAP_tokenizer,
                                                           positive_prompts=positive_prompts, negative_prompts=negative_prompts,
                                                           batchsize=8, sample_steps=sample_steps, CFG=CFG, seed=None)

        quantized_latent_representations = quantized_latent_representations.to(device)
        feature, instrument_logits, instrument_family_logits, velocity_logits, qualities = timbre_encoder(quantized_latent_representations)
        qualities = qualities.to("cpu").detach().numpy()
        # qualities = np.where(qualities > 0.5, 1, 0)

        diffuSynth_qualities.extend(qualities)

    return np.mean(diffuSynth_qualities, axis=0)


def generate_probabilities_with_diffuSynth_inpaint(device, uNet, VAE, MMM, CLAP_tokenizer, timbre_encoder, num_batches, guidance, duration, use_dynamic_mask, noising_strength, positive_prompts, negative_prompts="", CFG=6, sample_steps=10):

    inpaint_probabilities, signals = [], []
    for _ in tqdm(range(num_batches)):
        quantized_latent_representations, _, rec_signals = inpaint_pipeline(device, uNet, VAE, MMM, CLAP_tokenizer,
                                                                            use_dynamic_mask=use_dynamic_mask, noising_strength=noising_strength, guidance=guidance,
                                                                            positive_prompts=positive_prompts, negative_prompts=negative_prompts, batchsize=8, sample_steps=sample_steps, CFG=CFG, seed=None, duration=duration, mask_flexivity=0.999,
                                                                            return_latent=False)

        quantized_latent_representations = quantized_latent_representations.to(device)
        feature, instrument_logits, instrument_family_logits, velocity_logits, qualities = timbre_encoder(quantized_latent_representations)
        probabilities = torch.nn.functional.softmax(instrument_logits, dim=1)

        inpaint_probabilities.extend(probabilities.to("cpu").detach().numpy())
        signals.extend(rec_signals)

    return np.array(inpaint_probabilities), signals


def inception_score(pred):
    # Conditional class distribution P(y|x) for each sample (row-normalize the predictions).
    pyx = pred / np.sum(pred, axis=1, keepdims=True)

    # Marginal class distribution P(y) over the whole set.
    py = np.mean(pyx, axis=0, keepdims=True)

    # KL divergence KL(P(y|x) || P(y)) per sample and class.
    kl_div = pyx * (np.log(pyx + 1e-11) - np.log(py + 1e-11))

    # Sum over classes, average over samples, then exponentiate.
    kl_div_sum = np.sum(kl_div, axis=1)
    score = np.exp(np.mean(kl_div_sum))
    return score
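
A quick illustration of how inception_score behaves at its two extremes (numpy only; the arrays are synthetic): confident, evenly spread class predictions drive the score toward the number of classes, while identical predictions for every sample give 1.

import numpy as np
from metrics.IS import inception_score

n_classes = 10
# Each sample is (nearly) one-hot and all classes occur equally often:
# KL(P(y|x) || P(y)) ~ log(n_classes), so the score approaches 10.
confident = np.eye(n_classes)[np.arange(1000) % n_classes] + 1e-6
# Every sample predicts the uniform distribution: KL = 0, score = 1.
constant = np.full((1000, n_classes), 1.0 / n_classes)

print(inception_score(confident))  # close to 10
print(inception_score(constant))   # close to 1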
metrics/P_C_T.py ADDED
@@ -0,0 +1,12 @@
import numpy as np
from metrics.precision_recall import knn_precision_recall_features


# Generate synthetic samples.
real_features = np.random.normal(0, 1, size=(1600, 512))
generated_features = np.random.normal(0, 1, size=(1600, 512))

state = knn_precision_recall_features(real_features, generated_features, nhood_sizes=[1, 2, 3, 4, 5, 10],
                                      row_batch_size=16, col_batch_size=16)

print(state)
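
For reference, the returned state maps 'precision' and 'recall' each to an array with one entry per neighborhood size in nhood_sizes (six values apiece here); since both feature sets are drawn from the same standard Gaussian, both metrics should come out high.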
metrics/get_reference_AST_features.py ADDED
@@ -0,0 +1,63 @@
import json
import librosa
import numpy as np
from tqdm import tqdm
from metrics.FD import ASTaudio2feature, calculate_statistics, save_AST_feature
from tools import rms_normalize
from transformers import AutoProcessor, ASTModel

device = "cpu"
processor = AutoProcessor.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593")
AST = ASTModel.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593").to(device)


data_split = "train"
with open(f'data/NSynth/{data_split}_examples.json') as f:
    data = json.load(f)


def read_signal(note_str):
    # Load the note at 16 kHz and fix its length to 4 seconds (64000 samples),
    # truncating long signals and zero-padding short ones.
    y, sr = librosa.load(f"data/NSynth/nsynth-{data_split}-52/audio/{note_str}.wav", sr=16000)
    if len(y) >= 64000:
        y = y[:64000]
    else:
        y_extend = np.zeros(64000)
        y_extend[:len(y)] = y
        y = y_extend

    return rms_normalize(y)


for quality in ["bright", "dark", "distortion", "fast_decay", "long_release", "multiphonic", "nonlinear_env", "percussive", "reverb", "tempo-synced"]:
    features = []
    for i, (note_str, attributes) in tqdm(enumerate(data.items())):
        if not attributes["pitch"] == 52:
            continue
        if not (quality in attributes['qualities_str']):
            continue

        signal = read_signal(note_str)
        feature_for_one_signal = ASTaudio2feature(device, [signal], processor, AST, sampling_rate=16000)[0]
        features.append(feature_for_one_signal)

    mu, sigma = calculate_statistics(features)
    print(np.shape(mu))
    print(np.shape(sigma))

    save_AST_feature(f'{data_split}_{quality}', mu.tolist(), sigma.tolist())

for instrument_name in ["bass", "brass", "flute", "guitar", "keyboard", "mallet", "organ", "reed", "string", "synth_lead", "vocal"]:
    features = []
    for i, (note_str, attributes) in tqdm(enumerate(data.items())):
        if not attributes["pitch"] == 52:
            continue
        if not (attributes["instrument_family_str"] == instrument_name):
            continue

        signal = read_signal(note_str)
        feature_for_one_signal = ASTaudio2feature(device, [signal], processor, AST, sampling_rate=16000)[0]
        features.append(feature_for_one_signal)

    mu, sigma = calculate_statistics(features)
    print(np.shape(mu))
    print(np.shape(sigma))

    save_AST_feature(f'{data_split}_{instrument_name}', mu.tolist(), sigma.tolist())
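
Once these statistics are saved, they can be loaded back and compared pairwise with the helpers from metrics/FD.py; a minimal sketch (assuming the script above has already populated the default JSON path for the 'train' split):

from metrics.FD import read_AST_features, calculate_fid_dict

AST_features = read_AST_features()  # default path under results/AST_metric/pre_calculated_features/
fid = calculate_fid_dict(AST_features["train_bright"], AST_features["train_dark"])
print(fid)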
metrics/pipelines.py ADDED
@@ -0,0 +1,144 @@
import numpy as np
import torch
from tqdm import tqdm

from tools import rms_normalize, nnData2Audio
from model.DiffSynthSampler import DiffSynthSampler


def sample_pipeline(device, uNet, VAE, MMM, CLAP_tokenizer,
                    positive_prompts, negative_prompts, batchsize, sample_steps, CFG, seed=None, duration=3.0,
                    freq_resolution=512, time_resolution=256, channels=4, VAE_scale=4, timesteps=1000, noise_strategy="repeat", sampler="ddim", return_latent=True):

    height = int(freq_resolution / VAE_scale)
    width = int(time_resolution / VAE_scale)
    VAE_encoder, VAE_quantizer, VAE_decoder = VAE._encoder, VAE._vq_vae, VAE._decoder

    # Embed the positive and negative prompts with the CLAP text encoder.
    text2sound_embedding = \
        MMM.get_text_features(**CLAP_tokenizer([positive_prompts], padding=True, return_tensors="pt"))[0].to(device)
    negative_condition = \
        MMM.get_text_features(**CLAP_tokenizer([negative_prompts], padding=True, return_tensors="pt"))[0].to(device)

    mySampler = DiffSynthSampler(timesteps, height=height, channels=channels, noise_strategy=noise_strategy, mute=True)
    mySampler.activate_classifier_free_guidance(CFG, negative_condition)

    # Respace the diffusion schedule to sample_steps evenly spaced timesteps.
    mySampler.respace(list(np.linspace(0, timesteps - 1, sample_steps, dtype=np.int32)))

    condition = text2sound_embedding.repeat(batchsize, 1)

    latent_representations, initial_noise = \
        mySampler.sample(model=uNet, shape=(batchsize, channels, height, width), seed=seed,
                         return_tensor=True, condition=condition, sampler=sampler)

    # Keep only the final denoising step.
    latent_representations = latent_representations[-1]

    quantized_latent_representations, _, (_, _, _) = VAE_quantizer(latent_representations)

    if return_latent:
        return quantized_latent_representations.detach()
    reconstruction_batch = VAE_decoder(quantized_latent_representations).to("cpu").detach().numpy()
    time_resolution = int(time_resolution * ((duration + 1) / 4))

    rec_signals = nnData2Audio(reconstruction_batch, resolution=(freq_resolution, time_resolution))
    rec_signals = [rms_normalize(rec_signal) for rec_signal in rec_signals]

    return quantized_latent_representations.detach(), reconstruction_batch, rec_signals


def sample_pipeline_GAN(device, gan_generator, VAE, MMM, CLAP_tokenizer,
                        positive_prompts, negative_prompts, batchsize, sample_steps, CFG, seed=None, duration=3.0,
                        freq_resolution=512, time_resolution=256, channels=4, VAE_scale=4, timesteps=1000, noise_strategy="repeat", sampler="ddim", return_latent=True):

    height = int(freq_resolution / VAE_scale)
    width = int(time_resolution / VAE_scale)
    VAE_encoder, VAE_quantizer, VAE_decoder = VAE._encoder, VAE._vq_vae, VAE._decoder

    text2sound_embedding = \
        MMM.get_text_features(**CLAP_tokenizer([positive_prompts], padding=True, return_tensors="pt"))[0].to(device)

    condition = text2sound_embedding.repeat(batchsize, 1)

    # The GAN generator maps noise plus the text condition directly to a latent.
    noise = torch.randn(batchsize, channels, height, width).to(device)
    latent_representations = gan_generator(noise, condition)

    quantized_latent_representations, _, (_, _, _) = VAE_quantizer(latent_representations)

    if return_latent:
        return quantized_latent_representations.detach()
    reconstruction_batch = VAE_decoder(quantized_latent_representations).to("cpu").detach().numpy()
    time_resolution = int(time_resolution * ((duration + 1) / 4))

    rec_signals = nnData2Audio(reconstruction_batch, resolution=(freq_resolution, time_resolution))
    rec_signals = [rms_normalize(rec_signal) for rec_signal in rec_signals]

    return quantized_latent_representations.detach(), reconstruction_batch, rec_signals


def inpaint_pipeline(device, uNet, VAE, MMM, CLAP_tokenizer, use_dynamic_mask, noising_strength, guidance,
                     positive_prompts, negative_prompts, batchsize, sample_steps, CFG, seed=None, duration=3.0, mask_flexivity=0.99,
                     freq_resolution=512, time_resolution=256, channels=4, VAE_scale=4, timesteps=1000, noise_strategy="repeat", sampler="ddim", return_latent=True):

    height = int(freq_resolution / VAE_scale)
    width = int(time_resolution * ((duration + 1) / 4) / VAE_scale)
    VAE_encoder, VAE_quantizer, VAE_decoder = VAE._encoder, VAE._vq_vae, VAE._decoder

    text2sound_embedding = \
        MMM.get_text_features(**CLAP_tokenizer([positive_prompts], padding=True, return_tensors="pt"))[0].to(device)
    negative_condition = \
        MMM.get_text_features(**CLAP_tokenizer([negative_prompts], padding=True, return_tensors="pt"))[0].to(device)

    mySampler = DiffSynthSampler(timesteps, height=height, channels=channels, noise_strategy=noise_strategy, mute=True)
    mySampler.activate_classifier_free_guidance(CFG, negative_condition)
    mySampler.respace(list(np.linspace(0, timesteps - 1, sample_steps, dtype=np.int32)))

    condition = text2sound_embedding.repeat(batchsize, 1)
    guidance = guidance.repeat(batchsize, 1, 1, 1).to(device)

    # mask = 1 freezes a region; here the final quarter of the time axis is frozen.
    latent_mask = torch.zeros((batchsize, 1, height, width), dtype=torch.float32).to(device)
    latent_mask[:, :, :, -int(time_resolution * (1 / 4) / VAE_scale):] = 1.0

    latent_representations, initial_noise = \
        mySampler.inpaint_sample(model=uNet, shape=(batchsize, channels, height, width),
                                 noising_strength=noising_strength,
                                 guide_img=guidance, mask=latent_mask, return_tensor=True,
                                 condition=condition, sampler=sampler,
                                 use_dynamic_mask=use_dynamic_mask,
                                 end_noise_level_ratio=0.0,
                                 mask_flexivity=mask_flexivity)

    latent_representations = latent_representations[-1]

    quantized_latent_representations, _, (_, _, _) = VAE_quantizer(latent_representations)

    if return_latent:
        return quantized_latent_representations.detach()
    reconstruction_batch = VAE_decoder(quantized_latent_representations).to("cpu").detach().numpy()
    time_resolution = int(time_resolution * ((duration + 1) / 4))

    rec_signals = nnData2Audio(reconstruction_batch, resolution=(freq_resolution, time_resolution))
    rec_signals = [rms_normalize(rec_signal) for rec_signal in rec_signals]

    return quantized_latent_representations.detach(), reconstruction_batch, rec_signals


def generate_audios_with_diffuSynth_sample(device, uNet, VAE, MMM, CLAP_tokenizer, num_batches, positive_prompts, negative_prompts="", CFG=6, sample_steps=10):
    diffuSynth_signals = []
    for _ in tqdm(range(num_batches)):
        _, _, signals = sample_pipeline(device, uNet, VAE, MMM, CLAP_tokenizer,
                                        positive_prompts=positive_prompts, negative_prompts=negative_prompts,
                                        batchsize=16, sample_steps=sample_steps, CFG=CFG, seed=None, return_latent=False)
        diffuSynth_signals.extend(signals)
    return np.array(diffuSynth_signals)


def generate_audios_with_diffuSynth_inpaint(device, uNet, VAE, MMM, CLAP_tokenizer, num_batches, guidance, duration, use_dynamic_mask, noising_strength, positive_prompts, negative_prompts="", CFG=6, sample_steps=10):

    diffuSynth_signals = []
    for _ in tqdm(range(num_batches)):
        _, _, signals = inpaint_pipeline(device, uNet, VAE, MMM, CLAP_tokenizer,
                                         use_dynamic_mask=use_dynamic_mask, noising_strength=noising_strength, guidance=guidance,
                                         positive_prompts=positive_prompts, negative_prompts=negative_prompts, batchsize=16, sample_steps=sample_steps, CFG=CFG, seed=None, duration=duration, mask_flexivity=0.999,
                                         return_latent=False)
        diffuSynth_signals.extend(signals)
    return np.array(diffuSynth_signals)
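
One detail worth seeing in isolation is the schedule respacing used by these pipelines: mySampler.respace(...) receives sample_steps evenly spaced indices out of the timesteps-step training schedule, so a 1000-step model is sampled in, e.g., 10 DDIM steps:

import numpy as np

timesteps, sample_steps = 1000, 10
print(list(np.linspace(0, timesteps - 1, sample_steps, dtype=np.int32)))
# [0, 111, 222, 333, 444, 555, 666, 777, 888, 999]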
metrics/pipelines_STFT.py ADDED
@@ -0,0 +1,100 @@
import librosa
import numpy as np
import torch
from tqdm import tqdm

from tools import rms_normalize, decode_stft, depad_STFT
from model.DiffSynthSampler import DiffSynthSampler


def sample_pipeline_STFT(device, uNet, VAE, MMM, CLAP_tokenizer,
                         positive_prompts, negative_prompts, batchsize, sample_steps, CFG, seed=None,
                         freq_resolution=512, time_resolution=256, channels=4, VAE_scale=4, timesteps=1000, noise_strategy="repeat", sampler="ddim", return_latent=True):
    """Sample fixed-length audio with a diffusion model, including 'ISTFT+' post-processing."""

    height = int(freq_resolution / VAE_scale)
    width = int(time_resolution / VAE_scale)
    VAE_encoder, VAE_quantizer, VAE_decoder = VAE._encoder, VAE._vq_vae, VAE._decoder

    text2sound_embedding = \
        MMM.get_text_features(**CLAP_tokenizer([positive_prompts], padding=True, return_tensors="pt"))[0].to(device)
    negative_condition = \
        MMM.get_text_features(**CLAP_tokenizer([negative_prompts], padding=True, return_tensors="pt"))[0].to(device)

    mySampler = DiffSynthSampler(timesteps, height=height, channels=channels, noise_strategy=noise_strategy, mute=True)
    mySampler.activate_classifier_free_guidance(CFG, negative_condition)

    mySampler.respace(list(np.linspace(0, timesteps - 1, sample_steps, dtype=np.int32)))

    condition = text2sound_embedding.repeat(batchsize, 1)

    latent_representations, initial_noise = \
        mySampler.sample(model=uNet, shape=(batchsize, channels, height, width), seed=seed,
                         return_tensor=True, condition=condition, sampler=sampler)

    latent_representations = latent_representations[-1]

    quantized_latent_representations, _, (_, _, _) = VAE_quantizer(latent_representations)

    if return_latent:
        return quantized_latent_representations.detach()

    reconstruction_batch = VAE_decoder(quantized_latent_representations).to("cpu").detach().numpy()

    # Decode each reconstructed STFT image back to a waveform.
    rec_signals = []
    for index, STFT in enumerate(reconstruction_batch):
        padded_D_rec = decode_stft(STFT)
        D_rec = depad_STFT(padded_D_rec)
        rec_signal = librosa.istft(D_rec, hop_length=256, win_length=1024)
        rec_signals.append(rms_normalize(rec_signal))

    return quantized_latent_representations.detach(), reconstruction_batch, rec_signals


def sample_pipeline_GAN_STFT(device, gan_generator, VAE, MMM, CLAP_tokenizer,
                             positive_prompts, negative_prompts, batchsize, sample_steps, CFG, seed=None,
                             freq_resolution=512, time_resolution=256, channels=4, VAE_scale=4, timesteps=1000, noise_strategy="repeat", sampler="ddim", return_latent=True):
    """Sample fixed-length audio with a GAN, including 'ISTFT+' post-processing."""

    height = int(freq_resolution / VAE_scale)
    width = int(time_resolution / VAE_scale)
    VAE_encoder, VAE_quantizer, VAE_decoder = VAE._encoder, VAE._vq_vae, VAE._decoder

    text2sound_embedding = \
        MMM.get_text_features(**CLAP_tokenizer([positive_prompts], padding=True, return_tensors="pt"))[0].to(device)

    condition = text2sound_embedding.repeat(batchsize, 1)

    noise = torch.randn(batchsize, channels, height, width).to(device)
    latent_representations = gan_generator(noise, condition)

    quantized_latent_representations, _, (_, _, _) = VAE_quantizer(latent_representations)

    if return_latent:
        return quantized_latent_representations.detach()
    reconstruction_batch = VAE_decoder(quantized_latent_representations).to("cpu").detach().numpy()

    # Decode each reconstructed STFT image back to a waveform.
    rec_signals = []
    for index, STFT in enumerate(reconstruction_batch):
        padded_D_rec = decode_stft(STFT)
        D_rec = depad_STFT(padded_D_rec)
        rec_signal = librosa.istft(D_rec, hop_length=256, win_length=1024)
        rec_signals.append(rms_normalize(rec_signal))

    return quantized_latent_representations.detach(), reconstruction_batch, rec_signals


def generate_audios_with_diffuSynth_sample(device, uNet, VAE, MMM, CLAP_tokenizer, num_batches, positive_prompts, negative_prompts="", CFG=6, sample_steps=10):
    """Sample audio with a diffusion model, including 'ISTFT+' post-processing."""

    diffuSynth_signals = []
    for _ in tqdm(range(num_batches)):
        _, _, signals = sample_pipeline_STFT(device, uNet, VAE, MMM, CLAP_tokenizer,
                                             positive_prompts=positive_prompts, negative_prompts=negative_prompts,
                                             batchsize=8, sample_steps=sample_steps, CFG=CFG, seed=None, return_latent=False)
        diffuSynth_signals.extend(signals)
    return np.array(diffuSynth_signals)
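
The ISTFT settings above (hop_length=256, win_length=1024) match the analysis parameters used elsewhere in these metrics (librosa.stft with n_fft=1024). A standalone round trip with plain librosa shows the transform pair is near-lossless; the repo's pad_STFT/encode_stft/decode_stft/depad_STFT wrappers are deliberately omitted here:

import librosa
import numpy as np

sr = 16000
y = np.sin(2 * np.pi * 440 * np.arange(sr) / sr).astype(np.float32)  # 1 s, 440 Hz tone
D = librosa.stft(y, n_fft=1024, hop_length=256, win_length=1024)
y_rec = librosa.istft(D, hop_length=256, win_length=1024, length=len(y))
print(np.max(np.abs(y - y_rec)))  # small (float32 round-off)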
metrics/precision_recall.py ADDED
@@ -0,0 +1,204 @@
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# This work is licensed under the Creative Commons Attribution-NonCommercial
# 4.0 International License. To view a copy of this license, visit
# http://creativecommons.org/licenses/by-nc/4.0/ or send a letter to
# Creative Commons, PO Box 1866, Mountain View, CA 94042, USA.

"""k-NN precision and recall."""

from time import time

import numpy as np
from tqdm import tqdm


# ----------------------------------------------------------------------------

def batch_pairwise_distances(U, V):
    """Compute pairwise squared Euclidean distances between two batches of feature vectors."""

    norm_u = np.sum(np.square(U), axis=1)
    norm_v = np.sum(np.square(V), axis=1)

    norm_u = np.reshape(norm_u, [-1, 1])
    norm_v = np.reshape(norm_v, [1, -1])

    # ||u - v||^2 = ||u||^2 - 2 u.v + ||v||^2, clamped at 0 to absorb rounding error.
    D = np.maximum(norm_u - 2 * np.dot(U, V.T) + norm_v, 0.0)
    return D


# ----------------------------------------------------------------------------

class DistanceBlock:
    """Wrapper around batched pairwise distance computation."""

    def __init__(self, num_features):
        self.num_features = num_features

    def pairwise_distances(self, U, V):
        return batch_pairwise_distances(U, V)


# ----------------------------------------------------------------------------

class ManifoldEstimator:
    """Estimates the manifold of given feature vectors."""

    def __init__(self, distance_block, features, row_batch_size=16, col_batch_size=16,
                 nhood_sizes=[3], clamp_to_percentile=None, eps=1e-5, mute=False):
        """Estimate the manifold of given feature vectors.

        Args:
            distance_block: DistanceBlock object that distributes pairwise distance
                calculation to multiple GPUs.
            features (np.array/tf.Tensor): Matrix of feature vectors whose manifold is estimated.
            row_batch_size (int): Row batch size to compute pairwise distances
                (parameter to trade off between memory usage and performance).
            col_batch_size (int): Column batch size to compute pairwise distances.
            nhood_sizes (list): Number of neighbors used to estimate the manifold.
            clamp_to_percentile (float): Prune hyperspheres that have a radius larger than
                the given percentile.
            eps (float): Small number for numerical stability.
            mute (bool): Suppress the progress bar.
        """
        num_images = features.shape[0]
        self.nhood_sizes = nhood_sizes
        self.num_nhoods = len(nhood_sizes)
        self.eps = eps
        self.row_batch_size = row_batch_size
        self.col_batch_size = col_batch_size
        self._ref_features = features
        self._distance_block = distance_block
        self.mute = mute

        # Estimate the manifold of the features by computing the distances to the k-NN of each sample.
        self.D = np.zeros([num_images, self.num_nhoods], dtype=np.float32)
        distance_batch = np.zeros([row_batch_size, num_images], dtype=np.float32)
        seq = np.arange(max(self.nhood_sizes) + 1, dtype=np.int32)

        row_range = range(0, num_images, row_batch_size)
        for begin1 in (row_range if mute else tqdm(row_range)):
            end1 = min(begin1 + row_batch_size, num_images)
            row_batch = features[begin1:end1]

            for begin2 in range(0, num_images, col_batch_size):
                end2 = min(begin2 + col_batch_size, num_images)
                col_batch = features[begin2:end2]

                # Compute distances between batches.
                distance_batch[0:end1 - begin1, begin2:end2] = self._distance_block.pairwise_distances(row_batch,
                                                                                                       col_batch)

            # Find the k-nearest neighbors from the current batch.
            self.D[begin1:end1, :] = np.partition(distance_batch[0:end1 - begin1, :], seq, axis=1)[:, self.nhood_sizes]

        if clamp_to_percentile is not None:
            max_distances = np.percentile(self.D, clamp_to_percentile, axis=0)
            self.D[self.D > max_distances] = 0

    def evaluate(self, eval_features, return_realism=False, return_neighbors=False):
        """Evaluate whether new feature vectors lie on the estimated manifold."""
        num_eval_images = eval_features.shape[0]
        num_ref_images = self.D.shape[0]
        distance_batch = np.zeros([self.row_batch_size, num_ref_images], dtype=np.float32)
        batch_predictions = np.zeros([num_eval_images, self.num_nhoods], dtype=np.int32)
        max_realism_score = np.zeros([num_eval_images, ], dtype=np.float32)
        nearest_indices = np.zeros([num_eval_images, ], dtype=np.int32)

        for begin1 in range(0, num_eval_images, self.row_batch_size):
            end1 = min(begin1 + self.row_batch_size, num_eval_images)
            feature_batch = eval_features[begin1:end1]

            for begin2 in range(0, num_ref_images, self.col_batch_size):
                end2 = min(begin2 + self.col_batch_size, num_ref_images)
                ref_batch = self._ref_features[begin2:end2]

                distance_batch[0:end1 - begin1, begin2:end2] = self._distance_block.pairwise_distances(feature_batch,
                                                                                                       ref_batch)

            # From the minibatch of new feature vectors, determine whether they are in the estimated manifold.
            # If a feature vector is inside a hypersphere of some reference sample, then
            # the new sample lies on the estimated manifold.
            # The radii of the hyperspheres are determined from distances of neighborhood size k.
            samples_in_manifold = distance_batch[0:end1 - begin1, :, None] <= self.D
            batch_predictions[begin1:end1] = np.any(samples_in_manifold, axis=1).astype(np.int32)

            max_realism_score[begin1:end1] = np.max(self.D[:, 0] / (distance_batch[0:end1 - begin1, :] + self.eps),
                                                    axis=1)
            nearest_indices[begin1:end1] = np.argmin(distance_batch[0:end1 - begin1, :], axis=1)

        if return_realism and return_neighbors:
            return batch_predictions, max_realism_score, nearest_indices
        elif return_realism:
            return batch_predictions, max_realism_score
        elif return_neighbors:
            return batch_predictions, nearest_indices

        return batch_predictions


# ----------------------------------------------------------------------------

def knn_precision_recall_features(ref_features, eval_features, nhood_sizes=[3],
                                  row_batch_size=10000, col_batch_size=50000, mute=False):
    """Calculates k-NN precision and recall for two sets of feature vectors.

    Args:
        ref_features (np.array/tf.Tensor): Feature vectors of reference images.
        eval_features (np.array/tf.Tensor): Feature vectors of generated images.
        nhood_sizes (list): Number of neighbors used to estimate the manifold.
        row_batch_size (int): Row batch size to compute pairwise distances
            (parameter to trade off between memory usage and performance).
        col_batch_size (int): Column batch size to compute pairwise distances.
        mute (bool): Suppress progress output.

    Returns:
        State (dict): Dict that contains the precision and recall calculated from
            ref_features and eval_features.
    """
    state = dict()
    num_images = ref_features.shape[0]
    num_features = ref_features.shape[1]

    # Initialize DistanceBlock and ManifoldEstimators.
    distance_block = DistanceBlock(num_features)
    ref_manifold = ManifoldEstimator(distance_block, ref_features, row_batch_size, col_batch_size, nhood_sizes, mute=mute)
    eval_manifold = ManifoldEstimator(distance_block, eval_features, row_batch_size, col_batch_size, nhood_sizes, mute=mute)

    # Evaluate precision and recall using k-nearest neighbors.
    if not mute:
        print('Evaluating k-NN precision and recall with %i samples...' % num_images)
    start = time()

    # Precision: how many points from eval_features lie on the ref_features manifold.
    precision = ref_manifold.evaluate(eval_features)
    state['precision'] = precision.mean(axis=0)

    # Recall: how many points from ref_features lie on the eval_features manifold.
    recall = eval_manifold.evaluate(ref_features)
    state['recall'] = recall.mean(axis=0)

    if not mute:
        print('Evaluated k-NN precision and recall in: %gs' % (time() - start))

    return state

# ----------------------------------------------------------------------------
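
batch_pairwise_distances expands ||u - v||^2 as ||u||^2 - 2 u.v + ||v||^2 and clamps at zero to absorb floating-point round-off; a quick check against the direct computation:

import numpy as np
from metrics.precision_recall import batch_pairwise_distances

rng = np.random.default_rng(0)
U = rng.normal(size=(4, 8))
V = rng.normal(size=(5, 8))

D = batch_pairwise_distances(U, V)
D_direct = np.sum((U[:, None, :] - V[None, :, :]) ** 2, axis=-1)
print(np.allclose(D, D_direct))  # True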
metrics/visualizations.py ADDED
@@ -0,0 +1,123 @@
import numpy as np
from matplotlib import pyplot as plt
from scipy.fft import fft
from scipy.signal import savgol_filter
from tools import rms_normalize

# Color-blind-friendly palette (RGB, 0-255).
colors = [
    # (0, 0, 0),        # Black
    # (86, 180, 233),   # Sky blue
    # (240, 228, 66),   # Yellow
    # (204, 121, 167),  # Reddish purple
    (213, 94, 0),     # Vermilion
    (0, 114, 178),    # Blue
    (230, 159, 0),    # Orange
    (0, 158, 115),    # Bluish green
]


def plot_psd_multiple_signals(signals_list, labels_list, sample_rate=16000, window_size=500,
                              figsize=(10, 6), save_path=None, normalize=False):
    """
    Plot the power spectral densities of multiple sets of audio signals on the same figure,
    using a logarithmic (base-2) amplitude axis with smoothing applied.

    Parameters:
    signals_list: List of sets of audio signals; each set is a numpy array with shape [sample_number, sample_length]
    labels_list: List of label strings, one per set of audio signals
    sample_rate: Sampling rate of the audio
    """

    # signals_list and labels_list must have the same length.
    assert len(signals_list) == len(labels_list), "Each set of signals must have a corresponding label."

    signals_list = [np.array([rms_normalize(signal) for signal in signals]) for signals in signals_list]

    # Prepare the figure.
    plt.figure(figsize=figsize)

    # Iterate over all sets of signals.
    i = 0
    for signal, label in zip(signals_list, labels_list):
        # Compute the FFT.
        fft_signal = fft(signal, axis=1)

        # Compute the mean power spectral density.
        psd_signal = np.mean(np.abs(fft_signal)**2, axis=0)

        # Compute the frequency axis.
        freqs = np.fft.fftfreq(signal.shape[1], 1/sample_rate)

        # Smooth with a Savitzky-Golay filter (window size window_size, polynomial order 3).
        psd_smoothed = savgol_filter(np.log2(psd_signal[:signal.shape[1] // 2] + 1), window_size, 3)

        # Normalize each curve if normalize is True
        if normalize:
            psd_smoothed /= np.mean(psd_smoothed)

        # Plot the PSD of this set of signals.
        plt.plot(freqs[:signal.shape[1] // 2], psd_smoothed, label=label, color=[x/255.0 for x in colors[i % len(colors)]], linewidth=1)
        i += 1

    # Label the axes.
    plt.xlabel('Frequency (Hz)')
    plt.ylabel('Mean Log-Amplitude')
    plt.legend()

    # Save the figure if save_path is given; otherwise display it.
    if save_path:
        plt.savefig(save_path)
    else:
        plt.show()


def plot_amplitude_over_time(signals_list, labels_list, sample_rate=16000, window_size=500,
                             figsize=(10, 6), save_path=None, normalize=False, start_time=0):
    """
    Plot the loudness of multiple sets of audio signals over time on the same graph,
    using a logarithmic scale for the loudness axis (base 2), with smoothing applied.

    Parameters:
    signals_list: List of sets of audio signals, each set is a numpy array with shape [sample_number, sample_length]
    labels_list: List of labels corresponding to each set of audio signals
    sample_rate: Sampling rate of the audio
    window_size: Window size for the Savitzky-Golay filter
    figsize: Figure size
    save_path: Path to save the figure; if None, the figure is displayed
    normalize: Whether to normalize each curve so that the sum of each curve is the same
    start_time: Time (in seconds) to start plotting; only data after this time is retained
    """
    assert len(signals_list) == len(labels_list), f"len(signals_list) != len(labels_list) for " \
                                                  f"len(signals_list) = {len(signals_list)} and len(labels_list) = {len(labels_list)}"

    # Compute starting sample index
    start_sample = int(start_time * sample_rate)

    # Normalize signals and truncate data
    signals_list = [np.array([rms_normalize(signal)[start_sample:] for signal in signals]) for signals in signals_list]
    time_axis = np.arange(start_sample, start_sample + signals_list[0].shape[1]) / sample_rate

    plt.figure(figsize=figsize)

    i = 0
    for signal, label in zip(signals_list, labels_list):
        amplitude_mean = np.mean(np.abs(signal), axis=0)

        amplitude_smoothed = savgol_filter(np.log2(amplitude_mean + 1), window_size, 3)

        # Normalize each curve if normalize is True
        if normalize:
            amplitude_smoothed /= np.mean(amplitude_smoothed)

        plt.plot(time_axis, amplitude_smoothed, label=label, color=[x/255.0 for x in colors[i % len(colors)]], linewidth=1)
        i += 1

    plt.xlabel('Time (seconds)')
    plt.ylabel('Mean Log-Amplitude')
    plt.legend()

    # Save or show the figure based on save_path parameter
    if save_path:
        plt.savefig(save_path)
    else:
        plt.show()
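
A minimal usage sketch for the PSD plot with synthetic signals (illustrative; it assumes tools.rms_normalize is importable, as the module above requires, and passes an odd Savitzky-Golay window size):

import numpy as np
from metrics.visualizations import plot_psd_multiple_signals

sr = 16000
t = np.arange(4 * sr) / sr
tones = np.stack([np.sin(2 * np.pi * 440 * t) for _ in range(8)])  # 8 identical 440 Hz notes
noise = np.random.normal(0, 1, size=(8, 4 * sr))                   # 8 white-noise clips

plot_psd_multiple_signals([tones, noise], ["440 Hz tone", "white noise"],
                          sample_rate=sr, window_size=501, save_path="psd_comparison.png")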