| | import torch |
| | import librosa |
| | import json5 |
| | from huggingface_hub import hf_hub_download |
| | from transformers import SeamlessM4TFeatureExtractor, Wav2Vec2BertModel |
| | import safetensors |
| | import numpy as np |
| |
|
| | from indextts.utils.maskgct.models.codec.kmeans.repcodec_model import RepCodec |
| | from indextts.utils.maskgct.models.tts.maskgct.maskgct_s2a import MaskGCT_S2A |
| | from indextts.utils.maskgct.models.codec.amphion_codec.codec import CodecEncoder, CodecDecoder |
| | import time |
| |
|
| |
|
| | def _load_config(config_fn, lowercase=False): |
| | """Load configurations into a dictionary |
| | |
| | Args: |
| | config_fn (str): path to configuration file |
| | lowercase (bool, optional): whether changing keys to lower case. Defaults to False. |
| | |
| | Returns: |
| | dict: dictionary that stores configurations |
| | """ |
| | with open(config_fn, "r") as f: |
| | data = f.read() |
| | config_ = json5.loads(data) |
| | if "base_config" in config_: |
| | |
| | p_config_path = os.path.join(os.getenv("WORK_DIR"), config_["base_config"]) |
| | p_config_ = _load_config(p_config_path) |
| | config_ = override_config(p_config_, config_) |
| | if lowercase: |
| | |
| | config_ = get_lowercase_keys_config(config_) |
| | return config_ |
| |
|
| |
|
| | def load_config(config_fn, lowercase=False): |
| | """Load configurations into a dictionary |
| | |
| | Args: |
| | config_fn (str): path to configuration file |
| | lowercase (bool, optional): _description_. Defaults to False. |
| | |
| | Returns: |
| | JsonHParams: an object that stores configurations |
| | """ |
| | config_ = _load_config(config_fn, lowercase=lowercase) |
| | |
| | cfg = JsonHParams(**config_) |
| | return cfg |
| |
|
| |
|
| | class JsonHParams: |
| | def __init__(self, **kwargs): |
| | for k, v in kwargs.items(): |
| | if type(v) == dict: |
| | v = JsonHParams(**v) |
| | self[k] = v |
| |
|
| | def keys(self): |
| | return self.__dict__.keys() |
| |
|
| | def items(self): |
| | return self.__dict__.items() |
| |
|
| | def values(self): |
| | return self.__dict__.values() |
| |
|
| | def __len__(self): |
| | return len(self.__dict__) |
| |
|
| | def __getitem__(self, key): |
| | return getattr(self, key) |
| |
|
| | def __setitem__(self, key, value): |
| | return setattr(self, key, value) |
| |
|
| | def __contains__(self, key): |
| | return key in self.__dict__ |
| |
|
| | def __repr__(self): |
| | return self.__dict__.__repr__() |
| |
|
| |
|
| | def build_semantic_model(path_='./models/tts/maskgct/ckpt/wav2vec2bert_stats.pt'): |
| | semantic_model = Wav2Vec2BertModel.from_pretrained("facebook/w2v-bert-2.0") |
| | semantic_model.eval() |
| | stat_mean_var = torch.load(path_) |
| | semantic_mean = stat_mean_var["mean"] |
| | semantic_std = torch.sqrt(stat_mean_var["var"]) |
| | return semantic_model, semantic_mean, semantic_std |
| |
|
| |
|
| | def build_semantic_codec(cfg): |
| | semantic_codec = RepCodec(cfg=cfg) |
| | semantic_codec.eval() |
| | return semantic_codec |
| |
|
| |
|
| | def build_s2a_model(cfg, device): |
| | soundstorm_model = MaskGCT_S2A(cfg=cfg) |
| | soundstorm_model.eval() |
| | soundstorm_model.to(device) |
| | return soundstorm_model |
| |
|
| |
|
| | def build_acoustic_codec(cfg, device): |
| | codec_encoder = CodecEncoder(cfg=cfg.encoder) |
| | codec_decoder = CodecDecoder(cfg=cfg.decoder) |
| | codec_encoder.eval() |
| | codec_decoder.eval() |
| | codec_encoder.to(device) |
| | codec_decoder.to(device) |
| | return codec_encoder, codec_decoder |
| |
|
| |
|
| | class Inference_Pipeline(): |
| | def __init__( |
| | self, |
| | semantic_model, |
| | semantic_codec, |
| | semantic_mean, |
| | semantic_std, |
| | codec_encoder, |
| | codec_decoder, |
| | s2a_model_1layer, |
| | s2a_model_full, |
| | ): |
| | self.semantic_model = semantic_model |
| | self.semantic_codec = semantic_codec |
| | self.semantic_mean = semantic_mean |
| | self.semantic_std = semantic_std |
| |
|
| | self.codec_encoder = codec_encoder |
| | self.codec_decoder = codec_decoder |
| | self.s2a_model_1layer = s2a_model_1layer |
| | self.s2a_model_full = s2a_model_full |
| |
|
| | @torch.no_grad() |
| | def get_emb(self, input_features, attention_mask): |
| | vq_emb = self.semantic_model( |
| | input_features=input_features, |
| | attention_mask=attention_mask, |
| | output_hidden_states=True, |
| | ) |
| | feat = vq_emb.hidden_states[17] |
| | feat = (feat - self.semantic_mean.to(feat)) / self.semantic_std.to(feat) |
| | return feat |
| |
|
| | @torch.no_grad() |
| | def extract_acoustic_code(self, speech): |
| | vq_emb = self.codec_encoder(speech.unsqueeze(1)) |
| | _, vq, _, _, _ = self.codec_decoder.quantizer(vq_emb) |
| | acoustic_code = vq.permute(1, 2, 0) |
| | return acoustic_code |
| |
|
| | @torch.no_grad() |
| | def get_scode(self, inputs): |
| | semantic_code, feat = self.semantic_codec.quantize(inputs) |
| | |
| | |
| | return semantic_code |
| |
|
| | @torch.no_grad() |
| | def semantic2acoustic( |
| | self, |
| | combine_semantic_code, |
| | acoustic_code, |
| | n_timesteps=[25, 10, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], |
| | cfg=2.5, |
| | rescale_cfg=0.75, |
| | ): |
| | semantic_code = combine_semantic_code |
| |
|
| | cond = self.s2a_model_1layer.cond_emb(semantic_code) |
| | prompt = acoustic_code[:, :, :] |
| | predict_1layer = self.s2a_model_1layer.reverse_diffusion( |
| | cond=cond, |
| | prompt=prompt, |
| | temp=1.5, |
| | filter_thres=0.98, |
| | n_timesteps=n_timesteps[:1], |
| | cfg=cfg, |
| | rescale_cfg=rescale_cfg, |
| | ) |
| |
|
| | cond = self.s2a_model_full.cond_emb(semantic_code) |
| | prompt = acoustic_code[:, :, :] |
| | predict_full = self.s2a_model_full.reverse_diffusion( |
| | cond=cond, |
| | prompt=prompt, |
| | temp=1.5, |
| | filter_thres=0.98, |
| | n_timesteps=n_timesteps, |
| | cfg=cfg, |
| | rescale_cfg=rescale_cfg, |
| | gt_code=predict_1layer, |
| | ) |
| |
|
| | vq_emb = self.codec_decoder.vq2emb( |
| | predict_full.permute(2, 0, 1), n_quantizers=12 |
| | ) |
| | recovered_audio = self.codec_decoder(vq_emb) |
| | prompt_vq_emb = self.codec_decoder.vq2emb( |
| | prompt.permute(2, 0, 1), n_quantizers=12 |
| | ) |
| | recovered_prompt_audio = self.codec_decoder(prompt_vq_emb) |
| | recovered_prompt_audio = recovered_prompt_audio[0][0].cpu().numpy() |
| | recovered_audio = recovered_audio[0][0].cpu().numpy() |
| | combine_audio = np.concatenate([recovered_prompt_audio, recovered_audio]) |
| |
|
| | return combine_audio, recovered_audio |
| |
|
| | def s2a_inference( |
| | self, |
| | prompt_speech_path, |
| | combine_semantic_code, |
| | cfg=2.5, |
| | n_timesteps_s2a=[25, 10, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], |
| | cfg_s2a=2.5, |
| | rescale_cfg_s2a=0.75, |
| | ): |
| | speech = librosa.load(prompt_speech_path, sr=24000)[0] |
| | acoustic_code = self.extract_acoustic_code( |
| | torch.tensor(speech).unsqueeze(0).to(combine_semantic_code.device) |
| | ) |
| | _, recovered_audio = self.semantic2acoustic( |
| | combine_semantic_code, |
| | acoustic_code, |
| | n_timesteps=n_timesteps_s2a, |
| | cfg=cfg_s2a, |
| | rescale_cfg=rescale_cfg_s2a, |
| | ) |
| |
|
| | return recovered_audio |
| |
|
| | @torch.no_grad() |
| | def gt_inference( |
| | self, |
| | prompt_speech_path, |
| | combine_semantic_code, |
| | ): |
| | speech = librosa.load(prompt_speech_path, sr=24000)[0] |
| | ''' |
| | acoustic_code = self.extract_acoustic_code( |
| | torch.tensor(speech).unsqueeze(0).to(combine_semantic_code.device) |
| | ) |
| | prompt = acoustic_code[:, :, :] |
| | prompt_vq_emb = self.codec_decoder.vq2emb( |
| | prompt.permute(2, 0, 1), n_quantizers=12 |
| | ) |
| | ''' |
| |
|
| | prompt_vq_emb = self.codec_encoder(torch.tensor(speech).unsqueeze(0).unsqueeze(1).to(combine_semantic_code.device)) |
| | recovered_prompt_audio = self.codec_decoder(prompt_vq_emb) |
| | recovered_prompt_audio = recovered_prompt_audio[0][0].cpu().numpy() |
| | return recovered_prompt_audio |
| |
|