import json
import os
import random

import librosa
import numpy as np
import torch
from torch.nn.utils.rnn import pad_sequence

# The star import supplies load_content_feature_path, align_length,
# align_content_feature_length, transpose_key, and pitch_shift_to_target
# used below.
from utils.data_utils import *
from processors.acoustic_extractor import cal_normalized_mel, load_mel_extrema
from processors.content_extractor import (
    ContentvecExtractor,
    WhisperExtractor,
    WenetExtractor,
)
from models.base.base_dataset import (
    BaseOfflineDataset,
    BaseOfflineCollator,
    BaseOnlineDataset,
    BaseOnlineCollator,
)
from models.base.new_dataset import BaseTestDataset

EPS = 1.0e-12


class SVCOfflineDataset(BaseOfflineDataset):
    def __init__(self, cfg, dataset, is_valid=False):
        BaseOfflineDataset.__init__(self, cfg, dataset, is_valid=is_valid)

        cfg = self.cfg

        if cfg.model.condition_encoder.use_whisper:
            self.whisper_aligner = WhisperExtractor(self.cfg)
            self.utt2whisper_path = load_content_feature_path(
                self.metadata, cfg.preprocess.processed_dir, cfg.preprocess.whisper_dir
            )

        if cfg.model.condition_encoder.use_contentvec:
            self.contentvec_aligner = ContentvecExtractor(self.cfg)
            self.utt2contentVec_path = load_content_feature_path(
                self.metadata,
                cfg.preprocess.processed_dir,
                cfg.preprocess.contentvec_dir,
            )

        if cfg.model.condition_encoder.use_mert:
            self.utt2mert_path = load_content_feature_path(
                self.metadata, cfg.preprocess.processed_dir, cfg.preprocess.mert_dir
            )

        if cfg.model.condition_encoder.use_wenet:
            self.wenet_aligner = WenetExtractor(self.cfg)
            self.utt2wenet_path = load_content_feature_path(
                self.metadata, cfg.preprocess.processed_dir, cfg.preprocess.wenet_dir
            )

    def __getitem__(self, index):
        single_feature = BaseOfflineDataset.__getitem__(self, index)

        utt_info = self.metadata[index]
        dataset = utt_info["Dataset"]
        uid = utt_info["Uid"]
        utt = "{}_{}".format(dataset, uid)

        # Align every frame-level content feature to the acoustic frame length.
        if self.cfg.model.condition_encoder.use_whisper:
            assert "target_len" in single_feature
            aligned_whisper_feat = (
                self.whisper_aligner.offline_resolution_transformation(
                    np.load(self.utt2whisper_path[utt]), single_feature["target_len"]
                )
            )
            single_feature["whisper_feat"] = aligned_whisper_feat

        if self.cfg.model.condition_encoder.use_contentvec:
            assert "target_len" in single_feature
            aligned_contentvec = (
                self.contentvec_aligner.offline_resolution_transformation(
                    np.load(self.utt2contentVec_path[utt]), single_feature["target_len"]
                )
            )
            single_feature["contentvec_feat"] = aligned_contentvec

        if self.cfg.model.condition_encoder.use_mert:
            assert "target_len" in single_feature
            aligned_mert_feat = align_content_feature_length(
                np.load(self.utt2mert_path[utt]),
                single_feature["target_len"],
                source_hop=self.cfg.preprocess.mert_hop_size,
            )
            single_feature["mert_feat"] = aligned_mert_feat

        if self.cfg.model.condition_encoder.use_wenet:
            assert "target_len" in single_feature
            aligned_wenet_feat = self.wenet_aligner.offline_resolution_transformation(
                np.load(self.utt2wenet_path[utt]), single_feature["target_len"]
            )
            single_feature["wenet_feat"] = aligned_wenet_feat

        return self.clip_if_too_long(single_feature)
    def __len__(self):
        return len(self.metadata)

    def random_select(self, feature_seq_len, max_seq_len, ending_ts=2812):
        """
        ending_ts: to avoid invalid whisper features for audios longer than 30s,
            2812 = 30 * 24000 // 256 (30 seconds at a 24 kHz sample rate with a
            256-sample hop size)
        """
        ts = max(feature_seq_len - max_seq_len, 0)
        ts = min(ts, ending_ts - max_seq_len)

        start = random.randint(0, ts)
        end = start + max_seq_len
        return start, end

    def clip_if_too_long(self, sample, max_seq_len=512):
        """
        sample :
            {
                'spk_id': (1,),
                'target_len': int,
                'mel': (seq_len, dim),
                'frame_pitch': (seq_len,),
                'frame_energy': (seq_len,),
                'content_vector_feat': (seq_len, dim),
            }
        """
        if sample["target_len"] <= max_seq_len:
            return sample

        start, end = self.random_select(sample["target_len"], max_seq_len)
        sample["target_len"] = end - start

        for k in sample.keys():
            if k == "audio":
                # The raw waveform is indexed in samples, not frames.
                sample[k] = sample[k][
                    start
                    * self.cfg.preprocess.hop_size : end
                    * self.cfg.preprocess.hop_size
                ]
            elif k == "audio_len":
                sample[k] = (end - start) * self.cfg.preprocess.hop_size
            elif k not in ["spk_id", "target_len"]:
                sample[k] = sample[k][start:end]

        return sample
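

# A minimal sketch (illustration only, not used by the pipeline) of the
# clipping arithmetic in clip_if_too_long: frame-level features are sliced
# with frame indices, while the raw waveform is sliced with sample indices
# derived from the hop size, so the two stay aligned. The 256-sample hop is
# an assumed example value; 2812 frames is the 30 s limit from random_select.
def _demo_clip_alignment():
    hop_size = 256
    start, end = 100, 612  # a 512-frame window
    mel = np.zeros((2812, 80))  # (n_frames, n_mels)
    audio = np.zeros(2812 * hop_size)  # (n_samples,)
    clipped_mel = mel[start:end]
    clipped_audio = audio[start * hop_size : end * hop_size]
    # One frame of features always corresponds to hop_size waveform samples.
    assert len(clipped_audio) == clipped_mel.shape[0] * hop_size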


class SVCOnlineDataset(BaseOnlineDataset):
    def __init__(self, cfg, dataset, is_valid=False):
        super().__init__(cfg, dataset, is_valid=is_valid)

        # Collect every sample rate required by the enabled content extractors.
        self.all_sample_rates = {self.sample_rate}
        if self.cfg.model.condition_encoder.use_whisper:
            self.all_sample_rates.add(self.cfg.preprocess.whisper_sample_rate)
        if self.cfg.model.condition_encoder.use_contentvec:
            self.all_sample_rates.add(self.cfg.preprocess.contentvec_sample_rate)
        if self.cfg.model.condition_encoder.use_wenet:
            self.all_sample_rates.add(self.cfg.preprocess.wenet_sample_rate)

        self.highest_sample_rate = max(self.all_sample_rates)

        # Clip utterances to at most max_duration seconds. Note that
        # max_n_frames counts waveform samples at the highest sample rate.
        self.max_duration = 6.0
        self.max_n_frames = int(self.max_duration * self.highest_sample_rate)

    def random_select(self, wav, duration, wav_path):
        """
        wav: (T,)
        """
        if duration <= self.max_duration:
            return wav

        ts_frame = int((duration - self.max_duration) * self.highest_sample_rate)
        start = random.randint(0, ts_frame)
        end = start + self.max_n_frames

        if (wav[start:end] == 0).all():
            print("*" * 20)
            print("Warning! The wav file {} has a lot of silence.".format(wav_path))

            # Fall back to the first non-silent sample.
            assert (wav != 0).any()
            start = np.where(wav != 0)[0][0]
            end = start + self.max_n_frames

        return wav[start:end]

    def __getitem__(self, index):
        """
        single_feature: dict,
            wav: (T,)
            wav_len: int
            target_len: int
            mask: (n_frames, 1)
            spk_id

            wav_{sr}: (T,)
            wav_{sr}_len: int
        """
        single_feature = dict()

        utt_item = self.metadata[index]
        wav_path = utt_item["Path"]

        # Load the waveform once, at the highest required sample rate.
        highest_sr_wav, _ = librosa.load(wav_path, sr=self.highest_sample_rate)
        highest_sr_wav = self.random_select(
            highest_sr_wav, utt_item["Duration"], wav_path
        )

        # Derive a copy of the waveform at every required sample rate.
        for sr in self.all_sample_rates:
            if sr != self.highest_sample_rate:
                wav_sr = librosa.resample(
                    highest_sr_wav, orig_sr=self.highest_sample_rate, target_sr=sr
                )
            else:
                wav_sr = highest_sr_wav

            wav_sr = torch.as_tensor(wav_sr, dtype=torch.float32)
            single_feature["wav_{}".format(sr)] = wav_sr
            single_feature["wav_{}_len".format(sr)] = len(wav_sr)

            # The training sample rate also provides the canonical
            # wav/target_len/mask entries.
            if sr == self.sample_rate:
                wav_len = len(wav_sr)
                frame_len = wav_len // self.hop_size

                single_feature["wav"] = wav_sr
                single_feature["wav_len"] = wav_len
                single_feature["target_len"] = frame_len
                single_feature["mask"] = torch.ones(frame_len, 1, dtype=torch.long)

        # Speaker ID
        if self.cfg.preprocess.use_spkid:
            utt = "{}_{}".format(utt_item["Dataset"], utt_item["Uid"])
            single_feature["spk_id"] = torch.tensor(
                [self.spk2id[self.utt2spk[utt]]], dtype=torch.int32
            )

        return single_feature

    def __len__(self):
        return len(self.metadata)
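

# A minimal sketch of the multi-sample-rate strategy above: decode the audio
# once at the highest rate any extractor needs, then derive the lower-rate
# copies with librosa.resample. The 24 kHz / 16 kHz values are assumed
# examples (e.g. 24 kHz acoustic features, 16 kHz Whisper/ContentVec input).
def _demo_multi_rate_loading(wav_path):
    all_sample_rates = {24000, 16000}
    highest_sr = max(all_sample_rates)
    wav_highest, _ = librosa.load(wav_path, sr=highest_sr)
    return {
        sr: (
            wav_highest
            if sr == highest_sr
            else librosa.resample(wav_highest, orig_sr=highest_sr, target_sr=sr)
        )
        for sr in all_sample_rates
    }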


class SVCOfflineCollator(BaseOfflineCollator):
    def __init__(self, cfg):
        super().__init__(cfg)

    def __call__(self, batch):
        parsed_batch_features = super().__call__(batch)
        return parsed_batch_features


class SVCOnlineCollator(BaseOnlineCollator):
    def __init__(self, cfg):
        super().__init__(cfg)

    def __call__(self, batch):
        """
        SVCOnlineDataset.__getitem__:
            wav: (T,)
            wav_len: int
            target_len: int
            mask: (n_frames, 1)
            spk_id: (1)

            wav_{sr}: (T,)
            wav_{sr}_len: int

        Returns:
            wav: (B, T), torch.float32
            wav_len: (B), torch.long
            target_len: (B), torch.long
            mask: (B, n_frames, 1), torch.long
            spk_id: (B, 1), torch.int32

            wav_{sr}: (B, T)
            wav_{sr}_len: (B), torch.long
        """
        packed_batch_features = dict()

        for key in batch[0].keys():
            if "_len" in key:
                # Scalar lengths are batched into a LongTensor.
                packed_batch_features[key] = torch.LongTensor([b[key] for b in batch])
            else:
                # Variable-length tensors are zero-padded to the batch maximum.
                packed_batch_features[key] = pad_sequence(
                    [b[key] for b in batch], batch_first=True, padding_value=0
                )
        return packed_batch_features
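

# A minimal sketch of the collation rule above: scalar "*_len" entries are
# batched into a LongTensor, everything else is zero-padded to the longest
# item in the batch. The lengths and shapes are illustrative.
def _demo_online_collate():
    batch = [
        {"wav": torch.randn(24000), "wav_len": 24000},
        {"wav": torch.randn(18000), "wav_len": 18000},
    ]
    wav = pad_sequence([b["wav"] for b in batch], batch_first=True, padding_value=0)
    wav_len = torch.LongTensor([b["wav_len"] for b in batch])
    assert wav.shape == (2, 24000)  # shorter item padded with zeros
    assert wav_len.tolist() == [24000, 18000]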


class SVCTestDataset(BaseTestDataset):
    def __init__(self, args, cfg, infer_type):
        BaseTestDataset.__init__(self, args, cfg, infer_type)
        self.metadata = self.get_metadata()

        target_singer = args.target_singer
        self.cfg = cfg
        self.trans_key = args.trans_key
        assert isinstance(target_singer, str)

        self.target_singer = target_singer.split("_")[-1]
        self.target_dataset = target_singer.replace(
            "_{}".format(self.target_singer), ""
        )
        if cfg.preprocess.mel_min_max_norm:
            if self.cfg.preprocess.features_extraction_mode == "online":
                # NOTE: for online feature extraction, the "vctk" mel extrema
                # are hard-coded here.
                self.target_mel_extrema = load_mel_extrema(cfg.preprocess, "vctk")
            else:
                self.target_mel_extrema = load_mel_extrema(
                    cfg.preprocess, self.target_dataset
                )

            self.target_mel_extrema = (
                torch.as_tensor(self.target_mel_extrema[0]),
                torch.as_tensor(self.target_mel_extrema[1]),
            )

        # Speaker-id mapping: "{Dataset}_{Singer}" -> integer id.
        if cfg.preprocess.use_spkid:
            spk2id_path = os.path.join(args.acoustics_dir, cfg.preprocess.spk2id)
            with open(spk2id_path, "r", encoding="utf-8") as f:
                self.spk2id = json.load(f)

        if cfg.preprocess.use_uv:
            self.utt2uv_path = {
                f'{utt_info["Dataset"]}_{utt_info["Uid"]}': os.path.join(
                    cfg.preprocess.processed_dir,
                    utt_info["Dataset"],
                    cfg.preprocess.uv_dir,
                    utt_info["Uid"] + ".npy",
                )
                for utt_info in self.metadata
            }

        if cfg.preprocess.use_frame_pitch:
            self.utt2frame_pitch_path = {
                f'{utt_info["Dataset"]}_{utt_info["Uid"]}': os.path.join(
                    cfg.preprocess.processed_dir,
                    utt_info["Dataset"],
                    cfg.preprocess.pitch_dir,
                    utt_info["Uid"] + ".npy",
                )
                for utt_info in self.metadata
            }

            # The F0 median of the target singer, used for pitch shifting.
            target_f0_statistics_path = os.path.join(
                cfg.preprocess.processed_dir,
                self.target_dataset,
                cfg.preprocess.pitch_dir,
                "statistics.json",
            )
            with open(target_f0_statistics_path, "r", encoding="utf-8") as f:
                self.target_pitch_median = json.load(f)[
                    f"{self.target_dataset}_{self.target_singer}"
                ]["voiced_positions"]["median"]

            # The F0 median of the source audio (only available when
            # inferring from a file).
            if infer_type == "from_file":
                source_audio_name = cfg.inference.source_audio_name
                source_f0_statistics_path = os.path.join(
                    cfg.preprocess.processed_dir,
                    source_audio_name,
                    cfg.preprocess.pitch_dir,
                    "statistics.json",
                )
                with open(source_f0_statistics_path, "r", encoding="utf-8") as f:
                    self.source_pitch_median = json.load(f)[
                        f"{source_audio_name}_{source_audio_name}"
                    ]["voiced_positions"]["median"]
            else:
                self.source_pitch_median = None

        if cfg.preprocess.use_frame_energy:
            self.utt2frame_energy_path = {
                f'{utt_info["Dataset"]}_{utt_info["Uid"]}': os.path.join(
                    cfg.preprocess.processed_dir,
                    utt_info["Dataset"],
                    cfg.preprocess.energy_dir,
                    utt_info["Uid"] + ".npy",
                )
                for utt_info in self.metadata
            }

        if cfg.preprocess.use_mel:
            self.utt2mel_path = {
                f'{utt_info["Dataset"]}_{utt_info["Uid"]}': os.path.join(
                    cfg.preprocess.processed_dir,
                    utt_info["Dataset"],
                    cfg.preprocess.mel_dir,
                    utt_info["Uid"] + ".npy",
                )
                for utt_info in self.metadata
            }

        # Content features and their aligners.
        if cfg.model.condition_encoder.use_whisper:
            self.whisper_aligner = WhisperExtractor(cfg)
            self.utt2whisper_path = load_content_feature_path(
                self.metadata, cfg.preprocess.processed_dir, cfg.preprocess.whisper_dir
            )

        if cfg.model.condition_encoder.use_contentvec:
            self.contentvec_aligner = ContentvecExtractor(cfg)
            self.utt2contentVec_path = load_content_feature_path(
                self.metadata,
                cfg.preprocess.processed_dir,
                cfg.preprocess.contentvec_dir,
            )

        if cfg.model.condition_encoder.use_mert:
            self.utt2mert_path = load_content_feature_path(
                self.metadata, cfg.preprocess.processed_dir, cfg.preprocess.mert_dir
            )

        if cfg.model.condition_encoder.use_wenet:
            self.wenet_aligner = WenetExtractor(cfg)
            self.utt2wenet_path = load_content_feature_path(
                self.metadata, cfg.preprocess.processed_dir, cfg.preprocess.wenet_dir
            )

    def __getitem__(self, index):
        single_feature = {}

        utt_info = self.metadata[index]
        dataset = utt_info["Dataset"]
        uid = utt_info["Uid"]
        utt = "{}_{}".format(dataset, uid)

        source_dataset = self.metadata[index]["Dataset"]

        if self.cfg.preprocess.use_spkid:
            single_feature["spk_id"] = np.array(
                [self.spk2id[f"{self.target_dataset}_{self.target_singer}"]],
                dtype=np.int32,
            )

        # Mel-spectrogram
        if self.cfg.preprocess.use_mel:
            mel = np.load(self.utt2mel_path[utt])  # (n_mels, n_frames)
            assert mel.shape[0] == self.cfg.preprocess.n_mel
            if self.cfg.preprocess.use_min_max_norm_mel:
                # Normalize with the source dataset's mel extrema.
                mel = cal_normalized_mel(mel, source_dataset, self.cfg.preprocess)

            if "target_len" not in single_feature:
                single_feature["target_len"] = mel.shape[1]
            single_feature["mel"] = mel.T  # (n_frames, n_mels)

        # F0
        if self.cfg.preprocess.use_frame_pitch:
            frame_pitch_path = self.utt2frame_pitch_path[utt]
            frame_pitch = np.load(frame_pitch_path)

            if self.trans_key:
                try:
                    self.trans_key = int(self.trans_key)
                except (TypeError, ValueError):
                    pass
                if isinstance(self.trans_key, int):
                    # Transpose by a fixed key shift.
                    frame_pitch = transpose_key(frame_pitch, self.trans_key)
                elif self.trans_key:
                    # Otherwise, shift the source F0 median to the target
                    # singer's median.
                    assert self.target_singer
                    frame_pitch = pitch_shift_to_target(
                        frame_pitch, self.target_pitch_median, self.source_pitch_median
                    )

            if "target_len" not in single_feature:
                single_feature["target_len"] = len(frame_pitch)
            aligned_frame_pitch = align_length(
                frame_pitch, single_feature["target_len"]
            )
            single_feature["frame_pitch"] = aligned_frame_pitch

            # Voiced/unvoiced flags
            if self.cfg.preprocess.use_uv:
                frame_uv_path = self.utt2uv_path[utt]
                frame_uv = np.load(frame_uv_path)
                aligned_frame_uv = align_length(frame_uv, single_feature["target_len"])
                # Invert the stored 0/1 flags.
                aligned_frame_uv = np.array(
                    [0 if v else 1 for v in aligned_frame_uv]
                )
                single_feature["frame_uv"] = aligned_frame_uv

        # Energy
        if self.cfg.preprocess.use_frame_energy:
            frame_energy_path = self.utt2frame_energy_path[utt]
            frame_energy = np.load(frame_energy_path)
            if "target_len" not in single_feature:
                single_feature["target_len"] = len(frame_energy)
            aligned_frame_energy = align_length(
                frame_energy, single_feature["target_len"]
            )
            single_feature["frame_energy"] = aligned_frame_energy

        # Content features, aligned to the acoustic frame length.
        if self.cfg.model.condition_encoder.use_whisper:
            assert "target_len" in single_feature
            aligned_whisper_feat = (
                self.whisper_aligner.offline_resolution_transformation(
                    np.load(self.utt2whisper_path[utt]), single_feature["target_len"]
                )
            )
            single_feature["whisper_feat"] = aligned_whisper_feat

        if self.cfg.model.condition_encoder.use_contentvec:
            assert "target_len" in single_feature
            aligned_contentvec = (
                self.contentvec_aligner.offline_resolution_transformation(
                    np.load(self.utt2contentVec_path[utt]), single_feature["target_len"]
                )
            )
            single_feature["contentvec_feat"] = aligned_contentvec

        if self.cfg.model.condition_encoder.use_mert:
            assert "target_len" in single_feature
            aligned_mert_feat = align_content_feature_length(
                np.load(self.utt2mert_path[utt]),
                single_feature["target_len"],
                source_hop=self.cfg.preprocess.mert_hop_size,
            )
            single_feature["mert_feat"] = aligned_mert_feat

        if self.cfg.model.condition_encoder.use_wenet:
            assert "target_len" in single_feature
            aligned_wenet_feat = self.wenet_aligner.offline_resolution_transformation(
                np.load(self.utt2wenet_path[utt]), single_feature["target_len"]
            )
            single_feature["wenet_feat"] = aligned_wenet_feat

        return single_feature

    def __len__(self):
        return len(self.metadata)
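

# A minimal sketch of the two pitch-adjustment modes handled in __getitem__
# above. The formulas below are the standard semitone shift and median-ratio
# rescaling; the actual implementations live in utils.data_utils
# (transpose_key, pitch_shift_to_target) and may differ in detail.
def _demo_pitch_shift():
    f0 = np.array([220.0, 0.0, 246.9])  # 0.0 marks unvoiced frames
    # Integer trans_key: shift every voiced frame by N semitones.
    shifted = f0 * 2 ** (2 / 12)
    # Non-integer trans_key: rescale so the source F0 median matches the
    # target singer's median (unvoiced zeros stay zero under both formulas).
    source_median, target_median = 220.0, 330.0
    rescaled = f0 * target_median / source_median
    return shifted, rescaled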


class SVCTestCollator:
    """Zero-pads model inputs and targets based on the number of frames per step."""

    def __init__(self, cfg):
        self.cfg = cfg

    def __call__(self, batch):
        packed_batch_features = dict()

        for key in batch[0].keys():
            if key == "target_len":
                packed_batch_features["target_len"] = torch.LongTensor(
                    [b["target_len"] for b in batch]
                )
                # Build a 0/1 mask of shape (B, max_target_len, 1) from the lengths.
                masks = [
                    torch.ones((b["target_len"], 1), dtype=torch.long) for b in batch
                ]
                packed_batch_features["mask"] = pad_sequence(
                    masks, batch_first=True, padding_value=0
                )
            else:
                values = [torch.from_numpy(b[key]) for b in batch]
                packed_batch_features[key] = pad_sequence(
                    values, batch_first=True, padding_value=0
                )

        return packed_batch_features
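

# A minimal sketch of the mask construction above: each item contributes a
# column of ones of length target_len; padding the stack to the longest item
# yields a (B, max_target_len, 1) 0/1 mask.
def _demo_test_mask():
    target_lens = [3, 5]
    masks = [torch.ones((t, 1), dtype=torch.long) for t in target_lens]
    mask = pad_sequence(masks, batch_first=True, padding_value=0)
    assert mask.shape == (2, 5, 1)
    assert mask[0, :, 0].tolist() == [1, 1, 1, 0, 0]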