import os

import librosa
import numpy as np
import torch
from fairseq import checkpoint_utils
from tqdm import tqdm


def load_hubert_model(hps):
    ckpt_path = hps.hubert_file
    print("Load Hubert Model...")

    models, saved_cfg, task = checkpoint_utils.load_model_ensemble_and_task(
        [ckpt_path],
        suffix="",
    )
    model = models[0]
    model.eval()

    if torch.cuda.is_available():
        model = model.cuda()

    return model

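# Usage sketch (hedged): `hps` only needs a `hubert_file` attribute pointing at a
# ContentVec/HuBERT fairseq checkpoint; the Namespace and checkpoint path below are
# placeholder assumptions, not names fixed by this module.
#
#   from argparse import Namespace
#   hps = Namespace(hubert_file="pretrained/contentvec/checkpoint_best_legacy_500.pt")
#   hubert = load_hubert_model(hps)
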
def get_hubert_content(hmodel, wav_16k_tensor):
    feats = wav_16k_tensor
    if feats.dim() == 2:  # average multi-channel input down to mono (channels in the last dim)
        feats = feats.mean(-1)
    assert feats.dim() == 1, feats.dim()
    feats = feats.view(1, -1)
    padding_mask = torch.BoolTensor(feats.shape).fill_(False)
    inputs = {
        "source": feats.to(wav_16k_tensor.device),
        "padding_mask": padding_mask.to(wav_16k_tensor.device),
        "output_layer": 9,
    }
    with torch.no_grad():
        logits = hmodel.extract_features(**inputs)
        feats = hmodel.final_proj(logits[0]).squeeze(0)

    return feats

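# Shape sketch (hedged): `extract_features(..., output_layer=9)` returns the layer-9
# hidden states and `final_proj` maps them to 256 dimensions, so the result is roughly
# one 256-dim frame per 20 ms of 16 kHz audio. The numbers below are an illustrative
# assumption, not exact frame counts.
#
#   wav = torch.randn(32000)                    # ~2 s of 16 kHz audio (dummy input)
#   feats = get_hubert_content(hubert, wav)     # ~[100, 256]
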
def content_vector_encoder(model, audio_path, default_sampling_rate=16000):
    """
    Extract ContentVec features from an audio file (default sampling rate: 16000).
    """
    wav16k, sr = librosa.load(audio_path, sr=default_sampling_rate)
    device = next(model.parameters()).device
    wav16k = torch.from_numpy(wav16k).to(device)

    # (frames, 256)
    content_feature = get_hubert_content(model, wav_16k_tensor=wav16k)

    return content_feature.cpu().detach().numpy()

def repeat_expand_2d(content, target_len):
    """
    content: [hubert_dim(256), src_len]
    target:  [hubert_dim(256), target_len]
    """
    src_len = content.shape[-1]
    target = torch.zeros([content.shape[0], target_len], dtype=torch.float).to(
        content.device
    )
    temp = torch.arange(src_len + 1) * target_len / src_len
    current_pos = 0
    for i in range(target_len):
        if i < temp[current_pos + 1]:
            target[:, i] = content[:, current_pos]
        else:
            current_pos += 1
            target[:, i] = content[:, current_pos]

    return target

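# Example (hedged sketch): step-wise expansion of a [256, 50] content matrix to 120
# target frames; every output column copies one source column (no interpolation).
# The sizes are arbitrary assumptions for illustration.
#
#   content = torch.randn(256, 50)
#   expanded = repeat_expand_2d(content, 120)   # -> shape [256, 120]
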
def get_mapped_features(raw_content_features, mapping_features):
    """
    Content Vector: frameshift = 20 ms, hop_size = 480 under a 24k sampling rate.

    Currently only used for mapping to BigVGAN's mels (sr = 24k, hop_size = 256,
    frameshift ~= 10.7 ms).
    """
    source_hop = 480
    target_hop = 256

    factor = np.gcd(source_hop, target_hop)
    source_hop //= factor
    target_hop //= factor
    print(
        "Mapping source's {} frames => target's {} frames".format(
            target_hop, source_hop
        )
    )

    results = []
    for index, mapping_feat in enumerate(tqdm(mapping_features)):
        # mapping_feat: (mels_frame_len, n_mels)
        target_len = len(mapping_feat)

        # raw_feats: (source_len, width)
        raw_feats = raw_content_features[index][0].cpu().numpy().T
        source_len, width = raw_feats.shape

        # const: the largest multiple of target_hop not exceeding source_len * source_hop
        const = source_len * source_hop // target_hop * target_hop

        # up-sample by repetition: (source_len * source_hop, width)
        up_sampling_feats = np.repeat(raw_feats, source_hop, axis=0)
        # down-sample by averaging every target_hop rows: (~target_len, width)
        down_sampling_feats = np.average(
            up_sampling_feats[:const].reshape(-1, target_hop, width), axis=1
        )

        err = abs(target_len - len(down_sampling_feats))
        if err > 3:
            print("index:", index)
            print("mels:", mapping_feat.shape)
            print("raw content vector:", raw_feats.shape)
            print("up_sampling:", up_sampling_feats.shape)
            print("down_sampling_feats:", down_sampling_feats.shape)
            exit()
        if len(down_sampling_feats) < target_len:
            # pad by repeating the last frame: (err, width)
            end = down_sampling_feats[-1][None, :].repeat(err, axis=0)
            down_sampling_feats = np.concatenate([down_sampling_feats, end], axis=0)

        # (target_len, width)
        feats = down_sampling_feats[:target_len]
        results.append(feats)

    return results

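# Worked example of the hop arithmetic above (hedged sketch): gcd(480, 256) = 32, so
# the reduced hops are source_hop = 15 and target_hop = 8. Each content frame is
# repeated 15 times and every 8 repeated rows are averaged, i.e. 8 source frames are
# redistributed over 15 mel frames. The array sizes below are illustrative only.
#
#   raw = np.random.randn(100, 256)                          # 100 content frames
#   up = np.repeat(raw, 15, axis=0)                          # (1500, 256)
#   down = up[: 100 * 15 // 8 * 8].reshape(-1, 8, 256).mean(axis=1)
#   print(down.shape)                                        # (187, 256) ~ 100 * 15 / 8
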
def extract_hubert_features_of_dataset(datasets, model, out_dir):
    for utt in tqdm(datasets):
        uid = utt["Uid"]
        audio_path = utt["Path"]

        content_vector_feature = content_vector_encoder(model, audio_path)

        save_path = os.path.join(out_dir, uid + ".npy")
        np.save(save_path, content_vector_feature)

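# Hedged end-to-end sketch (not an entry point defined by this module): the config
# object, checkpoint path, dataset entries, and output directory below are placeholder
# assumptions for illustration.
if __name__ == "__main__":
    from argparse import Namespace

    hps = Namespace(hubert_file="pretrained/contentvec/checkpoint_best_legacy_500.pt")
    hubert = load_hubert_model(hps)

    datasets = [{"Uid": "utt_0001", "Path": "data/wavs/utt_0001.wav"}]
    out_dir = "data/contentvec"
    os.makedirs(out_dir, exist_ok=True)
    extract_hubert_features_of_dataset(datasets, hubert, out_dir)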