# HuBERT-based Retrieval-based Voice Conversion (RVC) sample (ailia-models, models_onnx)
import sys
import time
from logging import getLogger
import numpy as np
import scipy.signal as signal
from PIL import Image
import librosa
import soundfile as sf
import ailia
# import original modules
sys.path.append('../../util')
sys.path.append('../crepe')
from microphone_utils import start_microphone_input # noqa
from model_utils import check_and_download_models # noqa
from arg_utils import get_base_parser, get_savepath, update_parser # noqa
flg_ffmpeg = False
if flg_ffmpeg:
import ffmpeg
logger = getLogger(__name__)
# ======================
# Parameters
# ======================
WEIGHT_HUBERT_PATH = "hubert_base.onnx"
MODEL_HUBERT_PATH = "hubert_base.onnx.prototxt"
WEIGHT_VC_PATH = "AISO-HOWATTO.onnx"
MODEL_VC_PATH = "AISO-HOWATTO.onnx.prototxt"
REMOTE_PATH = 'https://storage.googleapis.com/ailia-models/rvc/'
SAMPLE_RATE = 16000
WAV_PATH = 'booth.wav'
SAVE_WAV_PATH = 'output.wav'
# ======================
# Argument Parser Config
# ======================
parser = get_base_parser(
'Retrieval-based-Voice-Conversion', WAV_PATH, SAVE_WAV_PATH, input_ftype='audio'
)
parser.add_argument(
'--tgt_sr', metavar="SR", type=int, default=40000,
help='VC model sampling rate.',
)
parser.add_argument(
'--f0', type=int, default=0, choices=(0, 1),
    help='Whether the VC model uses f0 (pitch): 0 = no, 1 = yes.',
)
parser.add_argument(
'--sid', type=int, default=0,
    help='Select the speaker/singer ID.',
)
parser.add_argument(
'--f0_up_key', metavar="N", type=int, default=0,
help='Transpose (number of semitones, raise by an octave: 12, lower by an octave: -12)',
)
parser.add_argument(
'--f0_method', default="pm", choices=("pm", "harvest", "crepe", "crepe_tiny"),
help='Select the pitch extraction algorithm',
)
parser.add_argument(
'--file_index', metavar="FILE", type=str, default=None,
help='Path to the feature index file.',
)
parser.add_argument(
'--index_rate', metavar="RATIO", type=float, default=0.75,
    help='Search feature ratio (controls accent strength; too high causes artifacts).',
)
parser.add_argument(
'--filter_radius', metavar="N", type=int, default=3,
    help='If >=3, apply median filtering to the harvested pitch results; this can reduce breathiness.',
)
parser.add_argument(
'--resample_sr', metavar="SR", type=int, default=0,
help='Resample the output audio. Set to 0 for no resampling.',
)
parser.add_argument(
'--rms_mix_rate', metavar="RATE", type=float, default=0.25,
help='Adjust the volume envelope scaling.',
)
parser.add_argument(
'--protect', metavar="N", type=float, default=0.33,
help='Protect voiceless consonants and breath sounds'
' to prevent artifacts such as tearing in electronic music.'
' Set to 0.5 to disable',
)
parser.add_argument(
    '-m', '--model_file', default=WEIGHT_VC_PATH,
    help='Specify the VC model .onnx file.'
)
parser.add_argument(
    '--version', default=1, choices=[1, 2], type=int,
    help='Specify the RVC model version.'
)
parser.add_argument(
    '--onnx',
    action='store_true',
    help='Run with onnxruntime instead of ailia.'
)
args = update_parser(parser)
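# Example invocation (illustrative; the input/output flag names follow ailia's
# shared get_base_parser conventions, and the script filename is an assumption):
#   python3 hubert.py --input booth.wav --savepath output.wav \
#       --f0 1 --f0_method harvest --f0_up_key 12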
class VCParam(object):
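    """Timing parameters for segmenting audio before conversion (values in samples)."""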
def __init__(self, tgt_sr):
self.x_pad, self.x_query, self.x_center, self.x_max = (
3, 10, 60, 65
)
        self.sr = 16000  # HuBERT input sample rate
        self.window = 160  # samples per frame (10 ms at 16 kHz)
        self.t_pad = self.sr * self.x_pad  # reflect padding before/after each segment (samples)
        self.t_pad_tgt = tgt_sr * self.x_pad  # same padding at the VC model's output rate
        self.t_pad2 = self.t_pad * 2
        self.t_query = self.sr * self.x_query  # search window around each cut point (samples)
        self.t_center = self.sr * self.x_center  # spacing between candidate cut points (samples)
        self.t_max = self.sr * self.x_max  # below this length, skip the cut-point search (samples)
# ======================
# Secondary Functions
# ======================
def load_audio(file: str, sr: int = SAMPLE_RATE):
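    """Load an audio file and return a float32 mono waveform resampled to `sr`."""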
if flg_ffmpeg:
# https://github.com/openai/whisper/blob/main/whisper/audio.py#L26
# This launches a subprocess to decode audio while down-mixing and resampling as necessary.
# Requires the ffmpeg CLI and `ffmpeg-python` package to be installed.
out, _ = ffmpeg.input(file, threads=0) \
.output("-", format="f32le", acodec="pcm_f32le", ac=1, ar=sr) \
.run(cmd=["ffmpeg", "-nostdin"], capture_stdout=True, capture_stderr=True)
audio = np.frombuffer(out, np.float32).flatten()
else:
# prepare input data
audio, source_sr = librosa.load(file, sr=None)
# Resample the wav if needed
if source_sr is not None and source_sr != sr:
audio = librosa.resample(audio, orig_sr=source_sr, target_sr=sr)
return audio
def change_rms(data1, sr1, data2, sr2, rate):  # data1: input audio, data2: output audio, rate: weight of data2
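    # Blend the output's RMS envelope toward the input's:
    # data2 *= rms1**(1 - rate) * rms2**(rate - 1), so rate=1 keeps the
    # converted volume and rate=0 imposes the input's volume envelope.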
rms1 = librosa.feature.rms(
y=data1, frame_length=sr1 // 2 * 2, hop_length=sr1 // 2
    )  # one RMS value per half second
rms2 = librosa.feature.rms(y=data2, frame_length=sr2 // 2 * 2, hop_length=sr2 // 2)
rms1 = np.array(Image.fromarray(rms1).resize((data2.shape[0], 1), Image.Resampling.BILINEAR))
rms1 = rms1.flatten()
rms2 = np.array(Image.fromarray(rms2).resize((data2.shape[0], 1), Image.Resampling.BILINEAR))
rms2 = rms2.flatten()
r = np.zeros(rms2.shape) + 1e-6
rms2 = np.where(rms2 > r, rms2, r)
data2 *= np.power(rms1, 1 - rate) * np.power(rms2, rate - 1)
return data2
# ======================
# Main functions
# ======================
def get_f0(
vc_param,
x,
p_len,
f0_up_key,
f0_method,
filter_radius,
inp_f0=None):
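    # Estimate f0 with the selected algorithm, transpose by f0_up_key semitones,
    # optionally splice in a user-supplied f0 curve (inp_f0), then quantize to
    # 255 coarse mel-spaced bins (1 = unvoiced).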
time_step = vc_param.window / vc_param.sr * 1000
f0_min = 50
f0_max = 1100
f0_mel_min = 1127 * np.log(1 + f0_min / 700)
f0_mel_max = 1127 * np.log(1 + f0_max / 700)
if f0_method == "pm":
import parselmouth
f0 = (
parselmouth.Sound(x, vc_param.sr).to_pitch_ac(
time_step=time_step / 1000,
voicing_threshold=0.6,
pitch_floor=f0_min,
pitch_ceiling=f0_max,
).selected_array["frequency"]
)
pad_size = (p_len - len(f0) + 1) // 2
if pad_size > 0 or p_len - len(f0) - pad_size > 0:
f0 = np.pad(
f0, [[pad_size, p_len - len(f0) - pad_size]], mode="constant"
)
elif f0_method == "harvest":
import pyworld
audio = x.astype(np.double)
fs = vc_param.sr
frame_period = 10
f0, t = pyworld.harvest(
audio,
fs=fs,
f0_ceil=f0_max,
f0_floor=f0_min,
frame_period=frame_period,
)
f0 = pyworld.stonemask(audio, f0, t, fs)
if filter_radius > 2:
f0 = signal.medfilt(f0, 3)
    elif f0_method in ("crepe", "crepe_tiny"):
import mod_crepe
        # Pick a batch size that doesn't cause memory errors on your GPU
batch_size = 512
audio = np.copy(x)[None]
f0, pd = mod_crepe.predict(
audio,
vc_param.sr,
vc_param.window,
f0_min,
f0_max,
batch_size=batch_size,
return_periodicity=True,
)
pd = mod_crepe.median(pd, 3)
f0 = mod_crepe.mean(f0, 3)
f0[pd < 0.1] = 0
f0 = f0[0]
else:
        raise ValueError("unknown f0_method: %s" % f0_method)
f0 *= pow(2, f0_up_key / 12)
    tf0 = vc_param.sr // vc_param.window  # f0 values per second
if inp_f0 is not None:
delta_t = np.round(
(inp_f0[:, 0].max() - inp_f0[:, 0].min()) * tf0 + 1
).astype("int16")
replace_f0 = np.interp(
list(range(delta_t)), inp_f0[:, 0] * 100, inp_f0[:, 1]
)
shape = f0[vc_param.x_pad * tf0: vc_param.x_pad * tf0 + len(replace_f0)].shape[0]
f0[vc_param.x_pad * tf0: vc_param.x_pad * tf0 + len(replace_f0)] = \
replace_f0[:shape]
f0bak = f0.copy()
f0_mel = 1127 * np.log(1 + f0 / 700)
f0_mel[f0_mel > 0] = \
(f0_mel[f0_mel > 0] - f0_mel_min) * 254 \
/ (f0_mel_max - f0_mel_min) + 1
f0_mel[f0_mel <= 1] = 1
f0_mel[f0_mel > 255] = 255
f0_coarse = np.rint(f0_mel).astype(int)
    return f0_coarse, f0bak  # coarse bins (1-255) and raw f0 in Hz
def vc(
hubert,
net_g,
sid,
audio0,
pitch,
pitchf,
vc_param,
index,
big_npy,
index_rate,
protect):
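    # Convert one padded segment: extract HuBERT content features, optionally
    # blend them with nearest neighbors from the faiss index (index_rate), 2x
    # upsample to the generator's frame rate, and synthesize with net_g.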
feats = audio0.reshape(1, -1).astype(np.float32)
padding_mask = np.zeros(feats.shape, dtype=bool)
# feedforward
if not args.onnx:
output = hubert.predict([feats, padding_mask])
else:
output = hubert.run(None, {'source': feats, 'padding_mask': padding_mask})
if args.version == 1:
feats = output[0] # v1 : 256
elif args.version == 2:
feats = hubert.get_blob_data(hubert.find_blob_index_by_name("/encoder/Slice_5_output_0")) # v2 : 768
if protect < 0.5 and pitch is not None and pitchf is not None:
feats0 = np.copy(feats)
    if index is not None and big_npy is not None and index_rate > 0:
x = feats[0]
score, ix = index.search(x, k=8)
weight = np.square(1 / score)
weight /= weight.sum(axis=1, keepdims=True)
x = np.sum(big_npy[ix] * np.expand_dims(weight, axis=2), axis=1)
feats = (
np.expand_dims(x, axis=0) * index_rate
+ (1 - index_rate) * feats
)
    # 2x upsample features along time by duplicating each frame
new_feats = np.zeros((feats.shape[0], feats.shape[1] * 2, feats.shape[2]), dtype=np.float32)
for i in range(feats.shape[1]):
new_feats[:, i * 2 + 0, :] = feats[:, i, :]
new_feats[:, i * 2 + 1, :] = feats[:, i, :]
feats = new_feats
if protect < 0.5 and pitch is not None and pitchf is not None:
        # 2x upsample feats0 (pre-index features kept for protection) the same way
new_feats = np.zeros((feats0.shape[0], feats0.shape[1] * 2, feats0.shape[2]), dtype=np.float32)
for i in range(feats0.shape[1]):
new_feats[:, i * 2 + 0, :] = feats0[:, i, :]
new_feats[:, i * 2 + 1, :] = feats0[:, i, :]
feats0 = new_feats
p_len = audio0.shape[0] // vc_param.window
if feats.shape[1] < p_len:
p_len = feats.shape[1]
if pitch is not None and pitchf is not None:
pitch = pitch[:, :p_len]
pitchf = pitchf[:, :p_len]
if protect < 0.5 and pitch is not None and pitchf is not None:
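        # On unvoiced frames (pitchf == 0), pull features back toward the
        # un-indexed feats0 with weight `protect` to preserve consonants/breaths.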
pitchff = np.copy(pitchf)
pitchff[pitchf > 0] = 1
pitchff[pitchf < 1] = protect
pitchff = np.expand_dims(pitchff, axis=-1)
feats = feats * pitchff + feats0 * (1 - pitchff)
p_len = np.array([p_len], dtype=int)
# feedforward
    rnd = np.random.randn(1, 192, p_len[0]).astype(np.float32) * 0.66666  # noise input (with a random factor)
if pitch is not None and pitchf is not None:
if not args.onnx:
output = net_g.predict([feats, p_len, pitch, pitchf, sid, rnd])
else:
output = net_g.run(None, {
'phone': feats, 'phone_lengths': p_len,
'pitch': pitch, 'pitchf': pitchf,
'ds': sid, 'rnd': rnd
})
else:
if not args.onnx:
output = net_g.predict([feats, p_len, sid, rnd])
else:
output = net_g.run(None, {
'phone': feats, 'phone_lengths': p_len, 'ds': sid, 'rnd': rnd
})
audio1 = output[0][0, 0]
return audio1
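# 5th-order Butterworth high-pass at 48 Hz: removes DC offset and low-frequency
# rumble from the 16 kHz input before conversion.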
bh, ah = signal.butter(N=5, Wn=48, btype="high", fs=16000)
def predict(audio, models, tgt_sr=40000, if_f0=0):
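    # Full pipeline: peak-normalize, high-pass, split long inputs at low-energy
    # points, extract f0 if the model uses it, convert each segment with vc(),
    # then post-process (RMS mixing, optional resampling, int16 conversion).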
audio_max = np.abs(audio).max() / 0.95
if audio_max > 1:
audio /= audio_max
sid = args.sid
file_index = args.file_index
index_rate = args.index_rate
resample_sr = args.resample_sr
rms_mix_rate = args.rms_mix_rate
protect = args.protect
f0_up_key = args.f0_up_key
f0_method = args.f0_method
filter_radius = args.filter_radius
inp_f0 = None
vc_param = VCParam(tgt_sr)
index = big_npy = None
if file_index and index_rate > 0:
import faiss
try:
index = faiss.read_index(file_index)
big_npy = index.reconstruct_n(0, index.ntotal)
except Exception as e:
logger.exception(e)
audio = signal.filtfilt(bh, ah, audio)
audio_pad = np.pad(audio, (vc_param.window // 2, vc_param.window // 2), mode="reflect")
opt_ts = []
if audio_pad.shape[0] > vc_param.t_max:
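        # Long input: build a sliding-window sum as an energy proxy and place a
        # cut point at the quietest sample near every t_center mark.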
audio_sum = np.zeros_like(audio)
for i in range(vc_param.window):
audio_sum += audio_pad[i: i - vc_param.window]
for t in range(vc_param.t_center, audio.shape[0], vc_param.t_center):
opt_ts.append(
t - vc_param.t_query
+ np.where(
np.abs(audio_sum[t - vc_param.t_query: t + vc_param.t_query])
== np.abs(audio_sum[t - vc_param.t_query: t + vc_param.t_query]).min()
)[0][0]
)
s = 0
audio_opt = []
t = None
audio_pad = np.pad(audio, (vc_param.t_pad, vc_param.t_pad), mode="reflect")
p_len = audio_pad.shape[0] // vc_param.window
pitch, pitchf = None, None
if if_f0 == 1:
pitch, pitchf = get_f0(
vc_param,
audio_pad,
p_len,
f0_up_key,
f0_method,
filter_radius,
inp_f0,
)
pitch = pitch[:p_len]
pitchf = pitchf[:p_len]
pitch = np.expand_dims(pitch, axis=0)
pitchf = np.expand_dims(pitchf, axis=0)
pitchf = pitchf.astype(np.float32)
sid = np.array([sid], dtype=int)
for t in opt_ts:
t = t // vc_param.window * vc_param.window
audio1 = vc(
models["hubert"],
models["net_g"],
sid,
audio_pad[s: t + vc_param.t_pad2 + vc_param.window],
pitch[:, s // vc_param.window: (t + vc_param.t_pad2) // vc_param.window]
if if_f0 == 1 else None,
pitchf[:, s // vc_param.window: (t + vc_param.t_pad2) // vc_param.window]
if if_f0 == 1 else None,
vc_param,
index,
big_npy,
index_rate,
protect,
)
audio_opt.append(audio1[vc_param.t_pad_tgt: -vc_param.t_pad_tgt])
s = t
audio1 = vc(
models["hubert"],
models["net_g"],
sid,
audio_pad[t:],
(pitch[:, t // vc_param.window:] if t is not None else pitch)
if if_f0 == 1 else None,
(pitchf[:, t // vc_param.window:] if t is not None else pitchf)
if if_f0 == 1 else None,
vc_param,
index,
big_npy,
index_rate,
protect,
)
audio_opt.append(audio1[vc_param.t_pad_tgt: -vc_param.t_pad_tgt])
audio_opt = np.concatenate(audio_opt)
audio_opt = audio_opt.astype(np.float32)
if rms_mix_rate < 1:
audio_opt = change_rms(audio, 16000, audio_opt, tgt_sr, rms_mix_rate)
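    # Chained comparison: resample only if resample_sr >= 16000 and it differs
    # from the VC model's output rate.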
if 16000 <= resample_sr != tgt_sr:
audio_opt = librosa.resample(
audio_opt, orig_sr=tgt_sr, target_sr=resample_sr
)
tgt_sr = resample_sr
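    # Convert to int16 with ~1% headroom, attenuating only if peaks would clip.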
audio_max = np.abs(audio_opt).max() / 0.99
max_int16 = 32768
if audio_max > 1:
max_int16 /= audio_max
audio_opt = (audio_opt * max_int16).astype(np.int16)
return audio_opt, tgt_sr
def recognize_from_audio(models):
    # Depends on the voice model
tgt_sr = args.tgt_sr
if_f0 = args.f0
# input audio loop
for audio_path in args.input:
logger.info(audio_path)
# prepare input data
audio = load_audio(audio_path, SAMPLE_RATE)
# inference
logger.info('Start inference...')
if args.benchmark:
logger.info('BENCHMARK mode')
start = int(round(time.time() * 1000))
output, sr = predict(audio, models, tgt_sr, if_f0)
end = int(round(time.time() * 1000))
estimation_time = (end - start)
logger.info(f'\ttotal processing time {estimation_time} ms')
else:
output, sr = predict(audio, models, tgt_sr, if_f0)
# save result
savepath = get_savepath(args.savepath, audio_path, ext='.wav')
logger.info(f'saved at : {savepath}')
sf.write(savepath, output, sr)
logger.info('Script finished successfully.')
def main():
WEIGHT_VC_PATH = args.model_file
MODEL_VC_PATH = WEIGHT_VC_PATH.replace(".onnx", ".onnx.prototxt")
check_and_download_models(WEIGHT_HUBERT_PATH, MODEL_HUBERT_PATH, REMOTE_PATH)
check_and_download_models(WEIGHT_VC_PATH, MODEL_VC_PATH, REMOTE_PATH)
    if args.f0 == 1 and args.f0_method in ("crepe", "crepe_tiny"):
from mod_crepe import WEIGHT_CREPE_PATH, MODEL_CREPE_PATH, WEIGHT_CREPE_TINY_PATH, MODEL_CREPE_TINY_PATH
if args.f0_method == "crepe_tiny":
check_and_download_models(WEIGHT_CREPE_TINY_PATH, MODEL_CREPE_TINY_PATH, REMOTE_PATH)
else:
check_and_download_models(WEIGHT_CREPE_PATH, MODEL_CREPE_PATH, REMOTE_PATH)
env_id = args.env_id
# initialize
if not args.onnx:
hubert = ailia.Net(MODEL_HUBERT_PATH, WEIGHT_HUBERT_PATH, env_id=env_id)
net_g = ailia.Net(MODEL_VC_PATH, WEIGHT_VC_PATH, env_id=env_id)
if args.profile:
hubert.set_profile_mode(True)
net_g.set_profile_mode(True)
else:
import onnxruntime
providers = ["CPUExecutionProvider", "CUDAExecutionProvider"]
hubert = onnxruntime.InferenceSession(WEIGHT_HUBERT_PATH, providers=providers)
net_g = onnxruntime.InferenceSession(WEIGHT_VC_PATH, providers=providers)
    if args.f0 == 1 and args.f0_method in ("crepe", "crepe_tiny"):
import mod_crepe
f0_model = mod_crepe.load_model(env_id, args.onnx, args.f0_method == "crepe_tiny")
if args.profile:
f0_model.set_profile_mode(True)
else:
f0_model = None
models = {
"hubert": hubert,
"net_g": net_g,
}
recognize_from_audio(models)
if args.profile and not args.onnx:
print("--- profile hubert")
print(hubert.get_summary())
print("")
print("--- profile net_g")
print(net_g.get_summary())
print("")
        if f0_model is not None:
print("--- profile f0_model")
print(f0_model.get_summary())
print("")
if __name__ == '__main__':
main()