# import os
# import numpy as np
# import yaml
import torch
# import torch.nn.functional as F
# import resampy
from transformers import HubertModel, Wav2Vec2FeatureExtractor
from fairseq import checkpoint_utils
from encoder.hubert.model import HubertSoft
from torch.nn.modules.utils import consume_prefix_in_state_dict_if_present
from torchaudio.transforms import Resample
# from .unit2control import Unit2Control
# from .core import frequency_filter, upsample, remove_above_fmax, MaskedAvgPool1d, MedianPool1d
# import time
# import librosa
# import torch.nn.functional as F

CREPE_RESAMPLE_KERNEL = {}
F0_KERNEL = {}


class Units_Encoder:
    def __init__(self,
                 encoder,
                 encoder_ckpt,
                 encoder_sample_rate=16000,
                 encoder_hop_size=320,
                 device=None,
                 cnhubertsoft_gate=10):
        if device is None:
            device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.device = device

        is_loaded_encoder = False
        if encoder == 'hubertsoft':
            self.model = Audio2HubertSoft(encoder_ckpt).to(device)
            is_loaded_encoder = True
        if encoder == 'hubertbase':
            self.model = Audio2HubertBase(encoder_ckpt, device=device)
            is_loaded_encoder = True
        if encoder == 'hubertbase768':
            self.model = Audio2HubertBase768(encoder_ckpt, device=device)
            is_loaded_encoder = True
        if encoder == 'hubertbase768l12':
            self.model = Audio2HubertBase768L12(encoder_ckpt, device=device)
            is_loaded_encoder = True
        if encoder == 'hubertlarge1024l24':
            self.model = Audio2HubertLarge1024L24(encoder_ckpt, device=device)
            is_loaded_encoder = True
        if encoder == 'contentvec':
            self.model = Audio2ContentVec(encoder_ckpt, device=device)
            is_loaded_encoder = True
        if encoder == 'contentvec768':
            self.model = Audio2ContentVec768(encoder_ckpt, device=device)
            is_loaded_encoder = True
        if encoder == 'contentvec768l12':
            self.model = Audio2ContentVec768L12(encoder_ckpt, device=device)
            is_loaded_encoder = True
        if encoder == 'cnhubertsoftfish':
            self.model = CNHubertSoftFish(encoder_ckpt, device=device, gate_size=cnhubertsoft_gate)
            is_loaded_encoder = True
        if not is_loaded_encoder:
            raise ValueError(f" [x] Unknown units encoder: {encoder}")

        self.resample_kernel = {}
        self.encoder_sample_rate = encoder_sample_rate
        self.encoder_hop_size = encoder_hop_size

    def encode(self,
               audio,  # B, T
               sample_rate,
               hop_size):
        # resample to the encoder's native sample rate (kernels are cached per input rate)
        if sample_rate == self.encoder_sample_rate:
            audio_res = audio
        else:
            key_str = str(sample_rate)
            if key_str not in self.resample_kernel:
                self.resample_kernel[key_str] = Resample(sample_rate, self.encoder_sample_rate,
                                                         lowpass_filter_width=128).to(self.device)
            audio_res = self.resample_kernel[key_str](audio)

        # encode (pad clips shorter than 400 samples, the encoder's minimum input length)
        if audio_res.size(-1) < 400:
            audio_res = torch.nn.functional.pad(audio_res, (0, 400 - audio_res.size(-1)))
        units = self.model(audio_res)

        # alignment: map each target frame to the nearest encoder frame
        n_frames = audio.size(-1) // hop_size + 1
        ratio = (hop_size / sample_rate) / (self.encoder_hop_size / self.encoder_sample_rate)
        index = torch.clamp(torch.round(ratio * torch.arange(n_frames).to(self.device)).long(),
                            max=units.size(1) - 1)
        units_aligned = torch.gather(units, 1, index.unsqueeze(0).unsqueeze(-1).repeat([1, 1, units.size(-1)]))
        return units_aligned

    def batch_encode(self,
                     audio,  # B, T
                     sample_rate,
                     hop_size):
        units_aligned_batch = []
        for i in range(audio.size(0)):
            # resample to the encoder's native sample rate
            if sample_rate == self.encoder_sample_rate:
                audio_res = audio[i]
            else:
                key_str = str(sample_rate)
                if key_str not in self.resample_kernel:
                    self.resample_kernel[key_str] = Resample(sample_rate, self.encoder_sample_rate,
                                                             lowpass_filter_width=128).to(self.device)
                audio_res = self.resample_kernel[key_str](audio[i])

            # encode (pad clips shorter than 400 samples)
            if audio_res.size(-1) < 400:
                audio_res = torch.nn.functional.pad(audio_res, (0, 400 - audio_res.size(-1)))
            units = self.model(audio_res)

            # alignment: same nearest-neighbor indexing as in encode
            n_frames = audio.size(-1) // hop_size + 1
            ratio = (hop_size / sample_rate) / (self.encoder_hop_size / self.encoder_sample_rate)
            index = torch.clamp(torch.round(ratio * torch.arange(n_frames).to(self.device)).long(),
                                max=units.size(1) - 1)
            units_aligned = torch.gather(units, 1, index.unsqueeze(0).unsqueeze(-1).repeat([1, 1, units.size(-1)]))
            units_aligned_batch.append(units_aligned.squeeze(0))
        return torch.stack(units_aligned_batch, 0)  # list of tensors -> tensor
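

# --- Illustrative sketch (not part of the original module) ---
# How the alignment step above maps target frames onto encoder frames: with
# sample_rate=44100 and hop_size=512 the target frame period is 512/44100
# (about 11.6 ms) while the encoder frame period is 320/16000 = 20 ms, so the
# ratio is about 0.58 and several target frames may map to the same (nearest)
# encoder frame. Runs on no external data.
def _alignment_demo():
    sample_rate, hop_size = 44100, 512
    encoder_sample_rate, encoder_hop_size = 16000, 320
    n_frames = 44100 // hop_size + 1  # target frames for 1 s of audio
    ratio = (hop_size / sample_rate) / (encoder_hop_size / encoder_sample_rate)
    index = torch.round(ratio * torch.arange(n_frames)).long()
    return index  # begins tensor([0, 1, 1, 2, 2, 3, ...]): nearest encoder frame per target frame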

class Audio2HubertSoft(torch.nn.Module):
    def __init__(self, path, h_sample_rate=16000, h_hop_size=320):
        super().__init__()
        print(' [Encoder Model] HuBERT Soft')
        self.hubert = HubertSoft()
        print(' [Loading] ' + path)
        checkpoint = torch.load(path)
        consume_prefix_in_state_dict_if_present(checkpoint, "module.")
        self.hubert.load_state_dict(checkpoint)
        self.hubert.eval()

    def forward(self,
                audio):  # B, T
        with torch.inference_mode():
            units = self.hubert.units(audio.unsqueeze(1))
            return units


class Audio2ContentVec():
    def __init__(self, path, h_sample_rate=16000, h_hop_size=320, device='cpu'):
        self.device = device
        print(' [Encoder Model] Content Vec')
        print(' [Loading] ' + path)
        self.models, self.saved_cfg, self.task = checkpoint_utils.load_model_ensemble_and_task([path], suffix="")
        self.hubert = self.models[0]
        self.hubert = self.hubert.to(self.device)
        self.hubert.eval()

    def __call__(self,
                 audio):  # B, T
        # wav_tensor = torch.from_numpy(audio).to(self.device)
        wav_tensor = audio
        feats = wav_tensor.view(1, -1)
        padding_mask = torch.BoolTensor(feats.shape).fill_(False)
        inputs = {
            "source": feats.to(wav_tensor.device),
            "padding_mask": padding_mask.to(wav_tensor.device),
            "output_layer": 9,  # layer 9
        }
        with torch.no_grad():
            logits = self.hubert.extract_features(**inputs)
            feats = self.hubert.final_proj(logits[0])
        units = feats  # .transpose(2, 1)
        return units


class Audio2ContentVec768():
    def __init__(self, path, h_sample_rate=16000, h_hop_size=320, device='cpu'):
        self.device = device
        print(' [Encoder Model] Content Vec')
        print(' [Loading] ' + path)
        self.models, self.saved_cfg, self.task = checkpoint_utils.load_model_ensemble_and_task([path], suffix="")
        self.hubert = self.models[0]
        self.hubert = self.hubert.to(self.device)
        self.hubert.eval()

    def __call__(self,
                 audio):  # B, T
        # wav_tensor = torch.from_numpy(audio).to(self.device)
        wav_tensor = audio
        feats = wav_tensor.view(1, -1)
        padding_mask = torch.BoolTensor(feats.shape).fill_(False)
        inputs = {
            "source": feats.to(wav_tensor.device),
            "padding_mask": padding_mask.to(wav_tensor.device),
            "output_layer": 9,  # layer 9
        }
        with torch.no_grad():
            logits = self.hubert.extract_features(**inputs)
            feats = logits[0]
        units = feats  # .transpose(2, 1)
        return units


class Audio2ContentVec768L12():
    def __init__(self, path, h_sample_rate=16000, h_hop_size=320, device='cpu'):
        self.device = device
        print(' [Encoder Model] Content Vec')
        print(' [Loading] ' + path)
        self.models, self.saved_cfg, self.task = checkpoint_utils.load_model_ensemble_and_task([path], suffix="")
        self.hubert = self.models[0]
        self.hubert = self.hubert.to(self.device)
        self.hubert.eval()

    def __call__(self,
                 audio):  # B, T
        # wav_tensor = torch.from_numpy(audio).to(self.device)
        wav_tensor = audio
        feats = wav_tensor.view(1, -1)
        padding_mask = torch.BoolTensor(feats.shape).fill_(False)
        inputs = {
            "source": feats.to(wav_tensor.device),
            "padding_mask": padding_mask.to(wav_tensor.device),
            "output_layer": 12,  # layer 12
        }
        with torch.no_grad():
            logits = self.hubert.extract_features(**inputs)
            feats = logits[0]
        units = feats  # .transpose(2, 1)
        return units
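

# --- Illustrative sketch (not part of the original module) ---
# The three ContentVec wrappers above differ only in which hidden layer they tap
# and whether final_proj is applied: Audio2ContentVec (layer 9 + final_proj,
# 256-dim), Audio2ContentVec768 (layer 9, 768-dim), Audio2ContentVec768L12
# (layer 12, 768-dim). The default checkpoint path is an assumption; substitute
# the ContentVec checkpoint you actually have.
def _compare_contentvec_dims(ckpt='pretrain/contentvec/checkpoint_best_legacy_500.pt'):
    audio = torch.randn(1, 16000)  # 1 s at 16 kHz -> roughly 50 encoder frames
    for cls in (Audio2ContentVec, Audio2ContentVec768, Audio2ContentVec768L12):
        enc = cls(ckpt, device='cpu')
        print(cls.__name__, tuple(enc(audio).shape))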

class CNHubertSoftFish(torch.nn.Module):
    def __init__(self, path, h_sample_rate=16000, h_hop_size=320, device='cpu', gate_size=10):
        super().__init__()
        self.device = device
        self.gate_size = gate_size

        self.feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(
            "./pretrain/TencentGameMate/chinese-hubert-base")
        self.model = HubertModel.from_pretrained("./pretrain/TencentGameMate/chinese-hubert-base")
        self.proj = torch.nn.Sequential(torch.nn.Dropout(0.1), torch.nn.Linear(768, 256))
        # self.label_embedding = nn.Embedding(128, 256)

        state_dict = torch.load(path, map_location=device)
        self.load_state_dict(state_dict)

    @torch.no_grad()
    def forward(self, audio):
        input_values = self.feature_extractor(
            audio, sampling_rate=16000, return_tensors="pt"
        ).input_values
        input_values = input_values.to(self.model.device)
        return self._forward(input_values[0])

    @torch.no_grad()
    def _forward(self, input_values):
        features = self.model(input_values)
        features = self.proj(features.last_hidden_state)

        # Top-k gating: keep the gate_size largest activations per frame,
        # then renormalize each frame to sum to 1
        topk, indices = torch.topk(features, self.gate_size, dim=2)
        features = torch.zeros_like(features).scatter(2, indices, topk)
        features = features / features.sum(2, keepdim=True)
        return features.to(self.device)  # .transpose(1, 2)


class Audio2HubertBase():
    def __init__(self, path, h_sample_rate=16000, h_hop_size=320, device='cpu'):
        self.device = device
        print(' [Encoder Model] HuBERT Base')
        print(' [Loading] ' + path)
        self.models, self.saved_cfg, self.task = checkpoint_utils.load_model_ensemble_and_task([path], suffix="")
        self.hubert = self.models[0]
        self.hubert = self.hubert.to(self.device)
        self.hubert = self.hubert.float()
        self.hubert.eval()

    def __call__(self,
                 audio):  # B, T
        with torch.no_grad():
            padding_mask = torch.BoolTensor(audio.shape).fill_(False)
            inputs = {
                "source": audio.to(self.device),
                "padding_mask": padding_mask.to(self.device),
                "output_layer": 9,  # layer 9
            }
            logits = self.hubert.extract_features(**inputs)
            units = self.hubert.final_proj(logits[0])
            return units


class Audio2HubertBase768():
    def __init__(self, path, h_sample_rate=16000, h_hop_size=320, device='cpu'):
        self.device = device
        print(' [Encoder Model] HuBERT Base')
        print(' [Loading] ' + path)
        self.models, self.saved_cfg, self.task = checkpoint_utils.load_model_ensemble_and_task([path], suffix="")
        self.hubert = self.models[0]
        self.hubert = self.hubert.to(self.device)
        self.hubert = self.hubert.float()
        self.hubert.eval()

    def __call__(self,
                 audio):  # B, T
        with torch.no_grad():
            padding_mask = torch.BoolTensor(audio.shape).fill_(False)
            inputs = {
                "source": audio.to(self.device),
                "padding_mask": padding_mask.to(self.device),
                "output_layer": 9,  # layer 9
            }
            logits = self.hubert.extract_features(**inputs)
            units = logits[0]
            return units


class Audio2HubertBase768L12():
    def __init__(self, path, h_sample_rate=16000, h_hop_size=320, device='cpu'):
        self.device = device
        print(' [Encoder Model] HuBERT Base')
        print(' [Loading] ' + path)
        self.models, self.saved_cfg, self.task = checkpoint_utils.load_model_ensemble_and_task([path], suffix="")
        self.hubert = self.models[0]
        self.hubert = self.hubert.to(self.device)
        self.hubert = self.hubert.float()
        self.hubert.eval()

    def __call__(self,
                 audio):  # B, T
        with torch.no_grad():
            padding_mask = torch.BoolTensor(audio.shape).fill_(False)
            inputs = {
                "source": audio.to(self.device),
                "padding_mask": padding_mask.to(self.device),
                "output_layer": 12,  # layer 12
            }
            logits = self.hubert.extract_features(**inputs)
            units = logits[0]
            return units
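

# --- Illustrative sketch (not part of the original module) ---
# Standalone demo of the top-k gating used in CNHubertSoftFish._forward above:
# scatter the gate_size largest activations per frame back into a zero tensor,
# then renormalize each frame to sum to 1. Runs on random tensors; no
# checkpoint needed.
def _topk_gate_demo(gate_size=10):
    features = torch.randn(1, 50, 256)                      # B, n_frames, dims
    topk, indices = torch.topk(features, gate_size, dim=2)  # k largest per frame
    gated = torch.zeros_like(features).scatter(2, indices, topk)
    gated = gated / gated.sum(2, keepdim=True)
    assert torch.allclose(gated.sum(2), torch.ones(1, 50))
    return gated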

class Audio2HubertLarge1024L24():
    def __init__(self, path, h_sample_rate=16000, h_hop_size=320, device='cpu'):
        self.device = device
        print(' [Encoder Model] HuBERT Large')
        print(' [Loading] ' + path)
        self.models, self.saved_cfg, self.task = checkpoint_utils.load_model_ensemble_and_task([path], suffix="")
        self.hubert = self.models[0]
        self.hubert = self.hubert.to(self.device)
        self.hubert = self.hubert.float()
        self.hubert.eval()

    def __call__(self,
                 audio):  # B, T
        with torch.no_grad():
            padding_mask = torch.BoolTensor(audio.shape).fill_(False)
            inputs = {
                "source": audio.to(self.device),
                "padding_mask": padding_mask.to(self.device),
                "output_layer": 24,  # layer 24
            }
            logits = self.hubert.extract_features(**inputs)
            units = logits[0]
            return units
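

# --- Illustrative smoke test (not part of the original module) ---
# The encoder name and checkpoint path are assumptions; point them at whatever
# units encoder and checkpoint you actually use.
if __name__ == '__main__':
    encoder = Units_Encoder('contentvec768l12', 'pretrain/contentvec/checkpoint_best_legacy_500.pt')
    audio = torch.randn(2, 44100)  # batch of 2 clips, 1 s at 44.1 kHz
    units = encoder.batch_encode(audio, sample_rate=44100, hop_size=512)
    print('units:', tuple(units.shape))  # expected: (2, 87, 768)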