# Source: HQ-SVC / utils/data_preprocessing.py
# Page-header residue kept for provenance: "shawnpi's picture";
# commit "Update utils/data_preprocessing.py" (96019b0, verified).
# support audio dataset with text prompt
import os
import librosa
import numpy as np
from concurrent.futures import ThreadPoolExecutor
import sys
from huggingface_hub import hf_hub_download
sys.path.append(os.path.join(os.path.dirname(__file__), '../utils'))
from ddsp.vocoder import F0_Extractor, Volume_Extractor
import torch
from typing import Union
from torch.nn import functional as F
from slicer import Slicer
from transformers import AutoTokenizer, AutoModel
# from ThreeD_Speaker.speakerlab.bin.get_spk_sim import build_model, get_spk_emb, get_spk_emb_t
def edge_padding(f0):
    """Extend every voiced region of an F0 contour by one frame on each side.

    A frame counts as voiced when its value is non-zero. For each voiced
    interior frame (indices ``1 .. len(f0) - 2``), any neighbouring frame that
    is zero in the *input* receives that frame's value. The first and last
    frames are never used as a source, only as possible targets. Because the
    neighbours are tested against the unmodified input, padding never
    cascades; when one unvoiced frame sits between two voiced frames, the
    later voiced frame's value wins.

    Args:
        f0: Per-frame F0 values supporting ``copy()`` and integer indexing
            (e.g. a 1-D ``np.ndarray``); 0 marks an unvoiced frame.

    Returns:
        A padded copy of ``f0``. The input itself is not modified.
    """
    padded = f0.copy()
    last = len(f0) - 1
    for idx in range(1, last):
        value = f0[idx]
        if value == 0:
            # Unvoiced frames are never a padding source.
            continue
        # Previous neighbour first, then next, matching the write order that
        # decides which value survives on a shared gap frame.
        for neighbour in (idx - 1, idx + 1):
            if f0[neighbour] == 0:
                padded[neighbour] = value
    return padded
def split(audio, sample_rate, hop_size, db_thresh = -40, min_len = 5000):
    """Cut ``audio`` into segments reported by ``Slicer`` and align them to hops.

    Each chunk returned by ``Slicer.slice`` is expected to carry a
    ``"split_time"`` string of the form ``"start,end"``. Both values are
    parsed as integers and floored to a multiple of ``hop_size``; they are
    presumably sample offsets, since the result is used to index ``audio``
    (confirm against ``Slicer``).

    Args:
        audio: Waveform to slice; indexed with sample offsets.
        sample_rate: Sampling rate forwarded to ``Slicer`` as ``sr``.
        hop_size: Frame hop in samples, used to convert samples to frames.
        db_thresh: Threshold forwarded to ``Slicer`` as ``threshold``.
        min_len: Value forwarded to ``Slicer`` as ``min_length``.

    Returns:
        list of ``(start_frame, segment)`` tuples, where ``segment`` is
        ``audio[start_frame * hop_size : end_frame * hop_size]``. Chunks whose
        start and end coincide, or that span less than one hop, are dropped.
    """
    # The original bound the slicer to a misspelled name, so the call below
    # raised NameError on every invocation.
    slicer = Slicer(
        sr=sample_rate,
        threshold=db_thresh,
        min_length=min_len)
    chunks = dict(slicer.slice(audio))
    result = []
    for chunk in chunks.values():
        tag = chunk["split_time"].split(",")
        if tag[0] == tag[1]:
            continue
        start_frame = int(int(tag[0]) // hop_size)
        end_frame = int(int(tag[1]) // hop_size)
        if end_frame > start_frame:
            result.append((
                start_frame,
                audio[int(start_frame * hop_size) : int(end_frame * hop_size)]))
    return result
def wav_pad(wav, multiple=200):
    """Stretch ``wav`` so that its length becomes a multiple of ``multiple``.

    The target length is ``wav.shape[0]`` rounded up to the next multiple.
    The extra samples are produced by ``repeat_expand`` (interpolation with
    its default mode), not by appending padding at the end.

    Args:
        wav: 1-D waveform (tensor or array) accepted by ``repeat_expand``.
        multiple: Required divisor of the output length.

    Returns:
        The resized waveform returned by ``repeat_expand``.
    """
    target_len = (wav.shape[0] + multiple - 1) // multiple * multiple
    return repeat_expand(wav, target_len)
def repeat_expand(
    content: Union[torch.Tensor, np.ndarray], target_len: int, mode: str = "nearest"
):
    """Resize the last axis of ``content`` to ``target_len``.

    This is a wrapper around ``torch.nn.functional.interpolate`` that accepts
    1-D, 2-D or 3-D input and returns a result with the same number of
    dimensions and the same container type as the input.

    Args:
        content (torch.Tensor | np.ndarray): data whose last axis is resized.
        target_len (int): length of the last axis in the result.
        mode (str, optional): interpolation mode. Defaults to "nearest".

    Returns:
        torch.Tensor | np.ndarray: resized data; an ``np.ndarray`` when the
        input was one, otherwise a ``torch.Tensor``.

    Raises:
        AssertionError: if ``content`` does not have 1, 2 or 3 dimensions.
    """
    is_np = isinstance(content, np.ndarray)
    tensor = torch.from_numpy(content) if is_np else content

    ndim = tensor.ndim
    assert 1 <= ndim <= 3, f"content must have 1 to 3 dimensions, got {ndim}"

    # interpolate() needs (batch, channel, length): add leading singleton axes.
    for _ in range(3 - ndim):
        tensor = tensor[None]

    results = torch.nn.functional.interpolate(tensor, size=target_len, mode=mode)

    # Strip exactly the axes added above. 3-D input is returned as-is; the
    # original fell through and returned None in that case.
    for _ in range(3 - ndim):
        results = results[0]

    if is_np:
        results = results.numpy()
    return results
def repeat_expand_2d(content, target_len, mode = 'left'):
    """Resize a ``[h, t]`` tensor along ``t`` to ``target_len``.

    Args:
        content: 2-D tensor laid out as ``[h, t]``.
        target_len: Desired size of the last axis.
        mode: ``'left'`` selects ``repeat_expand_2d_left``; any other value is
            forwarded to ``repeat_expand_2d_other`` as its interpolation mode.

    Returns:
        The resized ``[h, target_len]`` tensor.
    """
    if mode == 'left':
        return repeat_expand_2d_left(content, target_len)
    return repeat_expand_2d_other(content, target_len, mode)
def repeat_expand_2d_left(content, target_len):
    """Expand a ``[h, t]`` tensor to ``[h, target_len]`` by repeating columns.

    Source column ``k`` owns the output positions below boundary
    ``(k + 1) * target_len / t``. The source index advances by at most one
    per output position, so columns are emitted in order.

    Args:
        content: 2-D tensor laid out as ``[h, t]``.
        target_len: Number of output columns.

    Returns:
        A new ``torch.float`` tensor of shape ``[h, target_len]`` on
        ``content.device``.
    """
    n_src = content.shape[-1]
    expanded = torch.zeros([content.shape[0], target_len], dtype=torch.float).to(content.device)
    # boundaries[k + 1] is the first output position that no longer belongs
    # to source column k.
    boundaries = torch.arange(n_src + 1) * target_len / n_src
    src_idx = 0
    for dst_idx in range(target_len):
        if dst_idx >= boundaries[src_idx + 1]:
            src_idx += 1
        expanded[:, dst_idx] = content[:, src_idx]
    return expanded
def repeat_expand_2d_other(content, target_len, mode = 'nearest'):
    """Resize a ``[h, t]`` tensor along ``t`` with ``F.interpolate``.

    Args:
        content: 2-D tensor laid out as ``[h, t]``.
        target_len: Desired size of the last axis.
        mode: Interpolation mode passed to ``F.interpolate``. The tensor is
            given a batch axis and interpolated as a 3-D input, for which
            PyTorch accepts 'nearest', 'linear', 'area' and 'nearest-exact';
            'bilinear', 'bicubic' and 'trilinear' need higher-rank input.

    Returns:
        Tensor of shape ``[h, target_len]``.
    """
    batched = content.unsqueeze(0)  # [1, h, t], the layout interpolate expects
    resized = F.interpolate(batched, size=target_len, mode=mode)
    return resized[0]
def align_data(data, max_len):
    """Zero-pad or truncate the last axis of ``data`` to exactly ``max_len``.

    Args:
        data: ``torch.Tensor`` whose last axis is the one being aligned.
        max_len: Required size of the last axis.

    Returns:
        ``data`` itself when the length already matches, a right-zero-padded
        tensor when it is shorter, or a view truncated to the first
        ``max_len`` entries of the last axis when it is longer.
    """
    data_len = data.shape[-1]
    if data_len < max_len:
        data = F.pad(data, (0, max_len - data_len))
    elif data_len > max_len:
        # Truncate the same (last) axis that is measured and padded above.
        # The original sliced the first axis, which is only equivalent for
        # 1-D input.
        data = data[..., :max_len]
    return data
def adjust_length(feature, target_len):
    """Linearly resample a ``(current_len, dim)`` tensor to ``target_len`` rows.

    Args:
        feature: 2-D tensor laid out as ``(current_len, dim)``.
        target_len: Number of rows wanted in the result.

    Returns:
        ``feature`` itself when it already has ``target_len`` rows, otherwise
        a new ``(target_len, dim)`` tensor produced by linear interpolation
        with ``align_corners=False``.
    """
    # Nothing to do when the length already matches.
    if feature.shape[0] == target_len:
        return feature

    # F.interpolate resizes the last axis of a (batch, channels, length)
    # tensor, so move time to the end and add a batch axis:
    # (current_len, dim) -> (dim, current_len) -> (1, dim, current_len).
    batched = feature.t().unsqueeze(0)
    resized = F.interpolate(batched, size=target_len, mode='linear', align_corners=False)
    # Undo the layout change: (1, dim, target_len) -> (target_len, dim).
    return resized.squeeze(0).t()
def load_bert_model(model_name, device):
    """Load a Hugging Face tokenizer/model pair and move the model to ``device``.

    Args:
        model_name: Identifier or path handed to ``from_pretrained`` for both
            ``AutoTokenizer`` and ``AutoModel``.
        device: Target device passed to the model's ``.to()``.

    Returns:
        tuple: ``(tokenizer, model)``.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModel.from_pretrained(model_name)
    model = model.to(device)
    return tokenizer, model
def get_style_embed(style_prompt, tokenizer, model):
    """Encode a text prompt and return the last element of the model output.

    The prompt is tokenized to PyTorch tensors, moved to ``model.device`` and
    fed through ``model``. No ``torch.no_grad()`` is applied here, so wrap the
    call if gradients are not wanted.

    Args:
        style_prompt: Text (or texts) accepted by ``tokenizer``.
        tokenizer: Callable tokenizer supporting ``return_tensors="pt"``.
        model: Model exposing ``.device`` and accepting the tokenizer output
            as keyword arguments.

    Returns:
        ``outputs[-1]`` of the model call. NOTE(review): for a BERT-style
        ``AutoModel`` this is presumably the pooled output; confirm for the
        model actually used.
    """
    encoded = tokenizer(style_prompt, return_tensors="pt")
    encoded = encoded.to(model.device)
    return model(**encoded)[-1]
def load_facodec(device):
    """Build the FACodec v2 encoder/decoder and load their local checkpoints.

    The checkpoints are read from ``utils/pretrain/`` relative to the current
    working directory. The commented-out ``hf_hub_download`` calls show where
    they can be fetched from.

    Args:
        device: Device the two modules are moved to after loading.

    Returns:
        tuple: ``(fa_encoder, fa_decoder)``, both on ``device`` and in eval
        mode.
    """
    # Imported lazily so the module can be used without Amphion installed
    # unless FACodec is actually requested.
    from Amphion.models.codec.ns3_codec import FACodecEncoderV2, FACodecDecoderV2
    fa_encoder = FACodecEncoderV2(
        ngf=32,
        up_ratios=[2, 4, 5, 5],
        out_channels=256,
    )
    fa_decoder = FACodecDecoderV2(
        in_channels=256,
        upsample_initial_channel=1024,
        ngf=32,
        up_ratios=[5, 5, 4, 2],
        vq_num_q_c=2,
        vq_num_q_p=1,
        vq_num_q_r=3,
        vq_dim=256,
        codebook_dim=8,
        codebook_size_prosody=10,
        codebook_size_content=10,
        codebook_size_residual=10,
        use_gr_x_timbre=True,
        use_gr_residual_f0=True,
        use_gr_residual_phone=True,
    )
    # encoder_ckpt = hf_hub_download(repo_id="amphion/naturalspeech3_facodec", filename="ns3_facodec_encoder_v2.bin", local_dir="utils/pretrain")
    # decoder_ckpt = hf_hub_download(repo_id="amphion/naturalspeech3_facodec", filename="ns3_facodec_decoder_v2.bin", local_dir="utils/pretrain")
    encoder_ckpt = "utils/pretrain/ns3_facodec_encoder_v2.bin"
    decoder_ckpt = "utils/pretrain/ns3_facodec_decoder_v2.bin"
    # Deserialize onto the CPU first: without map_location, torch.load puts
    # tensors back on the device they were saved from, which fails on hosts
    # that lack it. The modules are moved to `device` right after.
    fa_encoder.load_state_dict(torch.load(encoder_ckpt, map_location="cpu"))
    fa_decoder.load_state_dict(torch.load(decoder_ckpt, map_location="cpu"))
    fa_encoder = fa_encoder.to(device).eval()
    fa_decoder = fa_decoder.to(device).eval()
    return fa_encoder, fa_decoder
def load_f0_extractor(args):
    """Create an ``F0_Extractor`` from ``args``, defaulting attributes set to None.

    Args:
        args: Object exposing ``f0_extractor``, ``sr``, ``block_size``,
            ``f0_min`` and ``f0_max``. Any of them may be ``None``, in which
            case 'rmvpe', 44100, 512, 60 and 1200 are used respectively.

    Returns:
        The constructed ``F0_Extractor``.
    """
    def _or_default(value, fallback):
        # Only None triggers the fallback; other falsy values are kept.
        return fallback if value is None else value

    return F0_Extractor(
        _or_default(args.f0_extractor, 'rmvpe'),
        _or_default(args.sr, 44100),
        _or_default(args.block_size, 512),
        _or_default(args.f0_min, 60),
        _or_default(args.f0_max, 1200),
    )
def load_volume_extractor(args):
    """Create a ``Volume_Extractor`` using ``args.block_size`` (512 when None).

    Args:
        args: Object exposing a ``block_size`` attribute, which may be None.

    Returns:
        The constructed ``Volume_Extractor``.
    """
    block_size = 512 if args.block_size is None else args.block_size
    return Volume_Extractor(block_size)
def load_audio(input_path, sr):
    """Load an audio file with librosa at sampling rate ``sr`` as mono.

    Args:
        input_path: Path of the audio file.
        sr: Target sampling rate passed to ``librosa.load``.

    Returns:
        The loaded waveform array. If the array has more than one dimension
        it is down-mixed with ``librosa.to_mono``.
    """
    waveform, _ = librosa.load(input_path, sr=sr)
    # Safety net: down-mix only when a multi-channel array comes back.
    is_multichannel = len(waveform.shape) > 1
    return librosa.to_mono(waveform) if is_multichannel else waveform
def resample_and_normalize(audio, max_gain=0.6):
    """Peak-normalize ``audio`` and convert it to 16-bit PCM.

    Despite the name, no resampling is performed. The signal is scaled so
    that its peak sits at ``max_gain`` of full scale (32767) and is then cast
    to ``int16``.

    Args:
        audio: Array-like floating-point waveform.
        max_gain: Fraction of int16 full scale the peak is mapped to.

    Returns:
        np.ndarray: ``int16`` array with the same shape as ``audio``. Silent
        (all-zero) and empty inputs yield all-zero / empty output instead of
        the NaN values or ``ValueError`` the unguarded division produced.
    """
    audio = np.asarray(audio)
    if audio.size == 0:
        # max() of an empty array raises; there is nothing to scale anyway.
        return audio.astype(np.int16)

    peak = np.abs(audio).max()
    if peak > 0:
        # Skip for pure silence: 0 / 0 would turn every sample into NaN.
        audio = audio / peak * max_gain
    audio = audio / max(0.01, np.max(np.abs(audio))) * 32767 * max_gain
    return audio.astype(np.int16)
def get_processed_file(input_path, sr, encoder_sr, mel_extractor, volume_extractor, f0_extractor,
                       fa_encoder=None, fa_decoder=None, content_encoder=None, spk_encoder=None,
                       device='cuda', max_sec=None, f0_interpolate_mode='full'):
    """Load one audio file and extract its F0, volume, mel and codec features.

    The file is loaded twice, at ``sr`` and at ``encoder_sr``. F0, volume,
    mel and (optionally) FACodec embeddings are then computed concurrently in
    a thread pool and aligned to the number of mel frames.

    Args:
        input_path: Path of the audio file.
        sr: Sampling rate used for F0, volume and mel extraction.
        encoder_sr: Sampling rate of the audio fed to the FACodec encoder.
        mel_extractor: Object with ``extract(audio_tensor, sr)``.
        volume_extractor: Object with ``extract(audio)``.
        f0_extractor: Object with ``extract(audio, uv_interp=False)``.
        fa_encoder: FACodec encoder; embeddings are computed only when both
            ``fa_encoder`` and ``fa_decoder`` are given.
        fa_decoder: FACodec decoder, see ``fa_encoder``.
        content_encoder: Currently unused.
        spk_encoder: Currently unused.
        device: Device for the tensors handed to the mel extractor and codec.
        max_sec: Optional maximum duration in seconds; longer audio is
            truncated. Fractional values are allowed.
        f0_interpolate_mode: ``'full'`` interpolates F0 over all unvoiced
            frames, ``'part'`` applies ``edge_padding``; any other value
            leaves F0 untouched.

    Returns:
        ``None`` when the file is missing, loading fails, an extractor
        returns ``None``, or (in ``'full'`` mode) no frame is voiced.
        Otherwise a dict with keys ``vq_post``, ``spk``, ``f0``,
        ``f0_origin``, ``vol``, ``name`` and ``mel``. ``vq_post`` and ``spk``
        are ``None`` when no FACodec encoder/decoder pair was supplied.
    """
    # Sample limits are used as slice bounds, so they must be integers even
    # when max_sec is fractional; float bounds raised TypeError, which was
    # then misreported as an audio-load failure.
    if max_sec is not None:
        max_audio_44k_len = int(sr * max_sec)
        max_audio_len = int(encoder_sr * max_sec)

    # 1. Load the audio serially (features cannot be extracted without it).
    if not os.path.exists(input_path):
        print(f'\n[Error] {input_path} does not exist!')
        return None

    try:
        # Text before the first '.' of the last '/'-separated path component.
        name = input_path.split('/')[-1].split('.')[0]
        audio_44k = load_audio(input_path, sr)
        audio = load_audio(input_path, encoder_sr)

        if max_sec is not None and max_audio_44k_len > 0:
            audio_44k = audio_44k[:min(len(audio_44k), max_audio_44k_len)]
            audio = audio[:min(len(audio), max_audio_len)]

        # Tensor copy of the audio for the tasks that run on `device`.
        audio_44k_t = torch.from_numpy(audio_44k).float().to(device).unsqueeze(0)
    except Exception as e:
        print(f'\n[Error] Failed to load audio. Error: {e}')
        return None

    # 2. Feature extraction, one sub-task per feature.
    def task_f0():
        return f0_extractor.extract(audio_44k, uv_interp=False)

    def task_volume():
        return volume_extractor.extract(audio_44k)

    def task_mel():
        return mel_extractor.extract(audio_44k_t, sr).squeeze()

    def task_encoder():
        # FACodec content/speaker embeddings; (None, None) without a codec.
        with torch.no_grad():
            if fa_encoder is not None and fa_decoder is not None:
                audio_t = torch.from_numpy(wav_pad(audio)).unsqueeze(0).unsqueeze(0).to(device)
                enc_out = fa_encoder(audio_t)
                prosody = fa_encoder.get_prosody_feature(audio_t)
                content_emb_t, _, _, _, spk_emb_t = fa_decoder(enc_out, prosody, eval_vq=False, vq=True)
                return content_emb_t.squeeze(0), spk_emb_t
            return None, None

    # Run the sub-tasks in a thread pool. Original author's note: despite the
    # GIL, PyTorch and C++ extensions (such as F0 extraction) release it, so
    # the tasks can overlap.
    with ThreadPoolExecutor(max_workers=4) as executor:
        future_f0 = executor.submit(task_f0)
        future_vol = executor.submit(task_volume)
        future_mel = executor.submit(task_mel)
        future_enc = executor.submit(task_encoder)

        # Block until every task has finished; task exceptions propagate.
        f0 = future_f0.result()
        volume = future_vol.result()
        mel_t = future_mel.result()
        content_emb_t, spk_emb_t = future_enc.result()

    # 3. Post-processing, which depends on all results above.
    if f0 is None or volume is None or mel_t is None:
        return None

    # The first mel axis is taken as the frame count
    # (assumes mel is (frames, bins) after squeeze - TODO confirm).
    seq_len = mel_t.shape[0]
    volume_t = align_data(torch.from_numpy(volume).float(), seq_len)

    # Align the codec embeddings to the mel length. Keyed on the embeddings
    # themselves so it agrees with task_encoder: without a complete FACodec
    # pair there is nothing to align, where the original crashed on None.
    if content_emb_t is not None:
        content_emb_t = repeat_expand_2d(content_emb_t, seq_len).T

    # F0 interpolation and post-processing.
    f0_origin = f0.copy()
    if f0_interpolate_mode == 'full':
        uv = (f0 == 0)
        if len(f0[~uv]) > 0:
            f0[uv] = np.interp(np.where(uv)[0], np.where(~uv)[0], f0[~uv])
        else:
            # No voiced frame at all: nothing to interpolate from.
            return None
    elif f0_interpolate_mode == 'part':
        f0 = edge_padding(f0)

    f0_t = align_data(torch.from_numpy(f0).float(), seq_len)

    return dict(
        vq_post=content_emb_t,
        spk=spk_emb_t,
        f0=f0_t,
        f0_origin=f0_origin,
        vol=volume_t,
        name=name,
        mel=mel_t
    )