# utils/models/content_encoder.py
# import os
# import numpy as np
# import yaml
import torch
# import torch.nn.functional as F
# import resampy
from transformers import HubertModel, Wav2Vec2FeatureExtractor
from fairseq import checkpoint_utils
from encoder.hubert.model import HubertSoft
from torch.nn.modules.utils import consume_prefix_in_state_dict_if_present
from torchaudio.transforms import Resample
# from .unit2control import Unit2Control
# from .core import frequency_filter, upsample, remove_above_fmax, MaskedAvgPool1d, MedianPool1d
# import time
# import librosa
CREPE_RESAMPLE_KERNEL = {}
F0_KERNEL = {}
class Units_Encoder:
    def __init__(self, encoder, encoder_ckpt, encoder_sample_rate=16000, encoder_hop_size=320,
                 device=None, cnhubertsoft_gate=10):
if device is None:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
self.device = device
        if encoder == 'hubertsoft':
            self.model = Audio2HubertSoft(encoder_ckpt).to(device)
        elif encoder == 'hubertbase':
            self.model = Audio2HubertBase(encoder_ckpt, device=device)
        elif encoder == 'hubertbase768':
            self.model = Audio2HubertBase768(encoder_ckpt, device=device)
        elif encoder == 'hubertbase768l12':
            self.model = Audio2HubertBase768L12(encoder_ckpt, device=device)
        elif encoder == 'hubertlarge1024l24':
            self.model = Audio2HubertLarge1024L24(encoder_ckpt, device=device)
        elif encoder == 'contentvec':
            self.model = Audio2ContentVec(encoder_ckpt, device=device)
        elif encoder == 'contentvec768':
            self.model = Audio2ContentVec768(encoder_ckpt, device=device)
        elif encoder == 'contentvec768l12':
            self.model = Audio2ContentVec768L12(encoder_ckpt, device=device)
        elif encoder == 'cnhubertsoftfish':
            self.model = CNHubertSoftFish(encoder_ckpt, device=device, gate_size=cnhubertsoft_gate)
        else:
            raise ValueError(f" [x] Unknown units encoder: {encoder}")
self.resample_kernel = {}
self.encoder_sample_rate = encoder_sample_rate
self.encoder_hop_size = encoder_hop_size
def encode(self,
audio, # B, T
sample_rate,
hop_size):
# resample
if sample_rate == self.encoder_sample_rate:
audio_res = audio
else:
key_str = str(sample_rate)
if key_str not in self.resample_kernel:
self.resample_kernel[key_str] = Resample(sample_rate, self.encoder_sample_rate, lowpass_filter_width = 128).to(self.device)
audio_res = self.resample_kernel[key_str](audio)
# encode
        # zero-pad very short inputs so the encoder sees at least 400 samples
        # (pad the resampled audio, not the original, which may have a different length)
        if audio_res.size(-1) < 400:
            audio_res = torch.nn.functional.pad(audio_res, (0, 400 - audio_res.size(-1)))
units = self.model(audio_res)
        # alignment: map the encoder's frame grid (encoder_hop_size / encoder_sample_rate
        # seconds per frame) onto the caller's grid (hop_size / sample_rate seconds per
        # frame) by nearest-neighbor index lookup, clamped to the last encoder frame
        n_frames = audio.size(-1) // hop_size + 1
        ratio = (hop_size / sample_rate) / (self.encoder_hop_size / self.encoder_sample_rate)
        index = torch.clamp(torch.round(ratio * torch.arange(n_frames).to(self.device)).long(), max=units.size(1) - 1)
        units_aligned = torch.gather(units, 1, index.unsqueeze(0).unsqueeze(-1).repeat([1, 1, units.size(-1)]))
        return units_aligned
def batch_encode(self,
audio, # B, T
sample_rate,
hop_size):
units_aligned_batch = []
        for i in range(audio.size(0)):
            # resample
            if sample_rate == self.encoder_sample_rate:
                audio_res = audio[i]
            else:
                key_str = str(sample_rate)
                if key_str not in self.resample_kernel:
                    self.resample_kernel[key_str] = Resample(sample_rate, self.encoder_sample_rate, lowpass_filter_width=128).to(self.device)
                audio_res = self.resample_kernel[key_str](audio[i])
            # encode (zero-pad the resampled audio to at least 400 samples, as in encode())
            if audio_res.size(-1) < 400:
                audio_res = torch.nn.functional.pad(audio_res, (0, 400 - audio_res.size(-1)))
units = self.model(audio_res)
# alignment
n_frames = audio.size(-1) // hop_size + 1
ratio = (hop_size / sample_rate) / (self.encoder_hop_size / self.encoder_sample_rate)
index = torch.clamp(torch.round(ratio * torch.arange(n_frames).to(self.device)).long(), max = units.size(1) - 1)
units_aligned = torch.gather(units, 1, index.unsqueeze(0).unsqueeze(-1).repeat([1, 1, units.size(-1)]))
units_aligned_batch.append(units_aligned.squeeze(0))
return torch.stack(units_aligned_batch, 0) # from list of tensor to tensor
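# Shape contract for Units_Encoder (illustrative note, not part of the original code):
# encode() takes a (B, T) waveform at any sample rate and returns (B, n_frames, C)
# units, where n_frames = T // hop_size + 1 and C depends on the encoder variant
# (typically 256 for 'contentvec'/'hubertbase' via final_proj, 768 for the *768*
# variants, 1024 for 'hubertlarge1024l24'). For example, a 1 s clip at 44.1 kHz with
# hop_size=512 yields 44100 // 512 + 1 = 87 frames.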
class Audio2HubertSoft(torch.nn.Module):
def __init__(self, path, h_sample_rate = 16000, h_hop_size = 320):
super().__init__()
print(' [Encoder Model] HuBERT Soft')
self.hubert = HubertSoft()
print(' [Loading] ' + path)
        checkpoint = torch.load(path, map_location='cpu')  # load on CPU; Units_Encoder moves the module to its device
consume_prefix_in_state_dict_if_present(checkpoint, "module.")
self.hubert.load_state_dict(checkpoint)
self.hubert.eval()
def forward(self,
audio): # B, T
with torch.inference_mode():
units = self.hubert.units(audio.unsqueeze(1))
return units
class Audio2ContentVec():
def __init__(self, path, h_sample_rate=16000, h_hop_size=320, device='cpu'):
self.device = device
print(' [Encoder Model] Content Vec')
print(' [Loading] ' + path)
self.models, self.saved_cfg, self.task = checkpoint_utils.load_model_ensemble_and_task([path], suffix="", )
self.hubert = self.models[0]
self.hubert = self.hubert.to(self.device)
self.hubert.eval()
def __call__(self,
audio): # B, T
# wav_tensor = torch.from_numpy(audio).to(self.device)
wav_tensor = audio
feats = wav_tensor.view(1, -1)
padding_mask = torch.BoolTensor(feats.shape).fill_(False)
inputs = {
"source": feats.to(wav_tensor.device),
"padding_mask": padding_mask.to(wav_tensor.device),
"output_layer": 9, # layer 9
}
with torch.no_grad():
logits = self.hubert.extract_features(**inputs)
feats = self.hubert.final_proj(logits[0])
units = feats # .transpose(2, 1)
return units
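# Note: the three ContentVec wrappers differ only in which features they return.
# Audio2ContentVec applies final_proj to layer-9 features (256-d with the common
# ContentVec checkpoint), Audio2ContentVec768 returns raw layer-9 features (768-d),
# and Audio2ContentVec768L12 returns raw layer-12 features (768-d). The HuBERT
# wrappers below follow the same naming pattern.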
class Audio2ContentVec768():
def __init__(self, path, h_sample_rate=16000, h_hop_size=320, device='cpu'):
self.device = device
print(' [Encoder Model] Content Vec')
print(' [Loading] ' + path)
self.models, self.saved_cfg, self.task = checkpoint_utils.load_model_ensemble_and_task([path], suffix="", )
self.hubert = self.models[0]
self.hubert = self.hubert.to(self.device)
self.hubert.eval()
def __call__(self,
audio): # B, T
# wav_tensor = torch.from_numpy(audio).to(self.device)
wav_tensor = audio
feats = wav_tensor.view(1, -1)
padding_mask = torch.BoolTensor(feats.shape).fill_(False)
inputs = {
"source": feats.to(wav_tensor.device),
"padding_mask": padding_mask.to(wav_tensor.device),
"output_layer": 9, # layer 9
}
with torch.no_grad():
logits = self.hubert.extract_features(**inputs)
feats = logits[0]
units = feats # .transpose(2, 1)
return units
class Audio2ContentVec768L12():
def __init__(self, path, h_sample_rate=16000, h_hop_size=320, device='cpu'):
self.device = device
print(' [Encoder Model] Content Vec')
print(' [Loading] ' + path)
self.models, self.saved_cfg, self.task = checkpoint_utils.load_model_ensemble_and_task([path], suffix="", )
self.hubert = self.models[0]
self.hubert = self.hubert.to(self.device)
self.hubert.eval()
def __call__(self,
audio): # B, T
# wav_tensor = torch.from_numpy(audio).to(self.device)
wav_tensor = audio
feats = wav_tensor.view(1, -1)
padding_mask = torch.BoolTensor(feats.shape).fill_(False)
inputs = {
"source": feats.to(wav_tensor.device),
"padding_mask": padding_mask.to(wav_tensor.device),
"output_layer": 12, # layer 12
}
with torch.no_grad():
logits = self.hubert.extract_features(**inputs)
feats = logits[0]
units = feats # .transpose(2, 1)
return units
class CNHubertSoftFish(torch.nn.Module):
def __init__(self, path, h_sample_rate=16000, h_hop_size=320, device='cpu', gate_size=10):
super().__init__()
self.device = device
self.gate_size = gate_size
self.feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(
"./pretrain/TencentGameMate/chinese-hubert-base")
self.model = HubertModel.from_pretrained("./pretrain/TencentGameMate/chinese-hubert-base")
self.proj = torch.nn.Sequential(torch.nn.Dropout(0.1), torch.nn.Linear(768, 256))
# self.label_embedding = nn.Embedding(128, 256)
state_dict = torch.load(path, map_location=device)
self.load_state_dict(state_dict)
@torch.no_grad()
def forward(self, audio):
input_values = self.feature_extractor(
audio, sampling_rate=16000, return_tensors="pt"
).input_values
input_values = input_values.to(self.model.device)
return self._forward(input_values[0])
@torch.no_grad()
def _forward(self, input_values):
features = self.model(input_values)
features = self.proj(features.last_hidden_state)
# Top-k gating
topk, indices = torch.topk(features, self.gate_size, dim=2)
features = torch.zeros_like(features).scatter(2, indices, topk)
features = features / features.sum(2, keepdim=True)
return features.to(self.device) # .transpose(1, 2)
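# The top-k gating above keeps only the gate_size largest activations per frame and
# renormalizes them to sum to 1. A minimal standalone sketch of the same operation
# (hypothetical toy tensor, gate size 2):
#
#   x = torch.tensor([[[0.1, 0.5, 0.3, 0.1]]])   # (B=1, T=1, C=4)
#   topk, idx = torch.topk(x, 2, dim=2)          # keeps 0.5 and 0.3
#   gated = torch.zeros_like(x).scatter(2, idx, topk)
#   gated = gated / gated.sum(2, keepdim=True)   # -> [[[0.0, 0.625, 0.375, 0.0]]]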
class Audio2HubertBase():
def __init__(self, path, h_sample_rate=16000, h_hop_size=320, device='cpu'):
self.device = device
print(' [Encoder Model] HuBERT Base')
print(' [Loading] ' + path)
self.models, self.saved_cfg, self.task = checkpoint_utils.load_model_ensemble_and_task([path], suffix="", )
self.hubert = self.models[0]
self.hubert = self.hubert.to(self.device)
self.hubert = self.hubert.float()
self.hubert.eval()
def __call__(self,
audio): # B, T
with torch.no_grad():
padding_mask = torch.BoolTensor(audio.shape).fill_(False)
inputs = {
"source": audio.to(self.device),
"padding_mask": padding_mask.to(self.device),
"output_layer": 9, # layer 9
}
logits = self.hubert.extract_features(**inputs)
units = self.hubert.final_proj(logits[0])
return units
class Audio2HubertBase768():
def __init__(self, path, h_sample_rate=16000, h_hop_size=320, device='cpu'):
self.device = device
print(' [Encoder Model] HuBERT Base')
print(' [Loading] ' + path)
self.models, self.saved_cfg, self.task = checkpoint_utils.load_model_ensemble_and_task([path], suffix="", )
self.hubert = self.models[0]
self.hubert = self.hubert.to(self.device)
self.hubert = self.hubert.float()
self.hubert.eval()
def __call__(self,
audio): # B, T
with torch.no_grad():
padding_mask = torch.BoolTensor(audio.shape).fill_(False)
inputs = {
"source": audio.to(self.device),
"padding_mask": padding_mask.to(self.device),
"output_layer": 9, # layer 9
}
logits = self.hubert.extract_features(**inputs)
units = logits[0]
return units
class Audio2HubertBase768L12():
def __init__(self, path, h_sample_rate=16000, h_hop_size=320, device='cpu'):
self.device = device
print(' [Encoder Model] HuBERT Base')
print(' [Loading] ' + path)
self.models, self.saved_cfg, self.task = checkpoint_utils.load_model_ensemble_and_task([path], suffix="", )
self.hubert = self.models[0]
self.hubert = self.hubert.to(self.device)
self.hubert = self.hubert.float()
self.hubert.eval()
def __call__(self,
audio): # B, T
with torch.no_grad():
padding_mask = torch.BoolTensor(audio.shape).fill_(False)
inputs = {
"source": audio.to(self.device),
"padding_mask": padding_mask.to(self.device),
"output_layer": 12, # layer 12
}
logits = self.hubert.extract_features(**inputs)
units = logits[0]
return units
class Audio2HubertLarge1024L24():
def __init__(self, path, h_sample_rate=16000, h_hop_size=320, device='cpu'):
self.device = device
        print(' [Encoder Model] HuBERT Large')
print(' [Loading] ' + path)
self.models, self.saved_cfg, self.task = checkpoint_utils.load_model_ensemble_and_task([path], suffix="", )
self.hubert = self.models[0]
self.hubert = self.hubert.to(self.device)
self.hubert = self.hubert.float()
self.hubert.eval()
def __call__(self,
audio): # B, T
with torch.no_grad():
padding_mask = torch.BoolTensor(audio.shape).fill_(False)
inputs = {
"source": audio.to(self.device),
"padding_mask": padding_mask.to(self.device),
"output_layer": 24, # layer 24
}
logits = self.hubert.extract_features(**inputs)
units = logits[0]
return units
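if __name__ == '__main__':
    # Minimal smoke test (sketch): exercises resampling, padding, and frame alignment
    # end to end. The checkpoint path below is an assumption for illustration only;
    # point it at a real ContentVec/HuBERT checkpoint before running.
    ckpt = 'pretrain/contentvec/checkpoint_best_legacy_500.pt'  # hypothetical path
    enc = Units_Encoder('contentvec768l12', ckpt)
    wav = torch.randn(1, 2 * 44100).to(enc.device)  # 2 s of noise at 44.1 kHz
    out = enc.encode(wav, sample_rate=44100, hop_size=512)
    print('units:', tuple(out.shape))  # expected: (1, 2 * 44100 // 512 + 1, 768)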