mvsepless_zero_gpu / vbach_lib /f0_extractor.py
noblebarkrr's picture
Updated to Dzeta
4f175c5
import pyworld
import numpy as np
import parselmouth
from pathlib import Path
import sys
SCRIPT_DIR = Path(__file__).resolve().parent
sys.path.append(str(SCRIPT_DIR.parent))
from functools import lru_cache
from scipy import signal
import librosa
import torch
import torchcrepe
from i18n import _i18n
from audio import read
from namer import Namer
import json
from extra_utils import dw_file, nuclear_clear_model, emergency_ram_clear, extra_clear_torch_cache, hf_spaces_gpu
from args_parser import parse_f0_extract
if __package__:
from .predictors.FCPE import FCPEF0Predictor
from .predictors.RMVPE import RMVPE0Predictor
from .predictors.HPA_RMVPE import HPA_RMVPE
else:
from predictors.FCPE import FCPEF0Predictor
from predictors.RMVPE import RMVPE0Predictor
from predictors.HPA_RMVPE import HPA_RMVPE
# Directory next to this module where the predictor checkpoints are stored.
# It is created on import so later download/load steps can rely on it.
BASE_DIR = Path(__file__).resolve().with_name("predictors")
BASE_DIR.mkdir(parents=True, exist_ok=True)

# Local checkpoint locations, one per neural predictor.
RMVPE_PATH = BASE_DIR.joinpath("rmvpe.pt")
HPA_RMVPE_PATH = BASE_DIR.joinpath("hpa_rmvpe.pt")
FCPE_PATH = BASE_DIR.joinpath("fcpe.pt")
# Every pitch-extraction backend name accepted by f0_extract. The first entry
# is the default method of f0_extract_and_write.
f0_methods = (
    "rmvpe+", "hpa-rmvpe", "fcpe",
    "mangio-crepe", "mangio-crepe-tiny",
    "harvest", "pm", "pyin",
)

# Subset of f0_methods whose f0_extract branch consumes the hop-length argument.
crepe_like_f0_methods = ("mangio-crepe", "mangio-crepe-tiny", "pyin")
class UnknownF0Method(Exception):
    """Raised by f0_extract when the requested method name is not handled."""
class F0CurveNotFound(Exception):
    """Raised by f0_import when the JSON file has no non-empty "freqs" entry."""
# Predictor checkpoints as [download URL, local destination] pairs.
requirements: list[list[str | Path]] = [
    [
        "https://huggingface.co/noblebarkrr/vbach_resources/resolve/main/predictors/rmvpe.pt?download=true",
        RMVPE_PATH,
    ],
    [
        "https://huggingface.co/noblebarkrr/vbach_resources/resolve/main/predictors/hpa_rmvpe.pt?download=true",
        HPA_RMVPE_PATH,
    ],
    [
        "https://huggingface.co/noblebarkrr/vbach_resources/resolve/main/predictors/fcpe.pt?download=true",
        FCPE_PATH,
    ],
]

# Runs at import time: fetch only the checkpoints that are not on disk yet.
for url, path in requirements:
    if path.exists():
        continue
    dw_file(url, path)
# Waveforms made available to get_harvest_f0, keyed by the identifier that is
# passed as its ``input_audio_path`` argument.
input_audio_path2wav = {}


@lru_cache(maxsize=128)
def get_harvest_f0(
    input_audio_path: str,
    fs: int,
    f0max: float,
    f0min: float,
    frame_period: float
) -> np.ndarray:
    """
    Estimate F0 with WORLD Harvest and refine it with StoneMask.

    The waveform is not passed in directly: it is looked up in the
    module-level ``input_audio_path2wav`` mapping. Results are memoized on
    the call arguments only, so a caller that stores a different waveform
    under an already-used key must call ``get_harvest_f0.cache_clear()``
    to avoid receiving the previous result.

    Args:
        input_audio_path: Key of the waveform in ``input_audio_path2wav``
        fs: Sampling rate
        f0max: Upper F0 bound (passed as ``f0_ceil``)
        f0min: Lower F0 bound (passed as ``f0_floor``)
        frame_period: Frame period handed to Harvest
    Returns:
        F0 array
    Raises:
        KeyError: If no waveform is registered under ``input_audio_path``
    """
    waveform = input_audio_path2wav[input_audio_path]
    coarse_f0, frame_times = pyworld.harvest(
        waveform,
        frame_period=frame_period,
        f0_floor=f0min,
        f0_ceil=f0max,
        fs=fs,
    )
    # StoneMask refines the coarse Harvest estimate on the same time axis.
    return pyworld.stonemask(waveform, coarse_f0, frame_times, fs)
def _fit_pitch_to_frames(pitch, p_len):
    """Linearly resample a pitch track to ``p_len`` frames.

    Values below 0.001 are marked as NaN before interpolation, and every NaN
    left in the interpolated result is replaced by 0.0.
    """
    source = np.array(pitch)
    source[source < 0.001] = np.nan
    target = np.interp(
        np.arange(0, len(source) * p_len, len(source)) / p_len,
        np.arange(0, len(source)),
        source,
    )
    return np.nan_to_num(target)


def f0_extract(
    x,
    sample_rate,
    p_len,
    f0_method,
    crepe_hop_length,
    window,
    device,
    time_step,
    is_half,
    f0_min=50,
    f0_max=1100,
):
    """Estimate the F0 contour of ``x`` with the selected method.

    Args:
        x: Audio samples as a NumPy array (assumed mono, 1-D — TODO confirm
            with callers).
        sample_rate: Sampling rate of ``x`` in Hz.
        p_len: Requested number of output frames. Used by the crepe, pyin,
            fcpe and pm branches, which derive it from the audio length when
            it is not supplied. The harvest and RMVPE branches ignore it.
        f0_method: One of "mangio-crepe", "mangio-crepe-tiny", "pyin",
            "fcpe", "harvest", "pm", "rmvpe+" or "hpa-rmvpe".
        crepe_hop_length: Hop size in samples for the crepe and pyin branches.
        window: Frame size in samples; only used to derive a missing
            ``p_len`` in the fcpe and pm branches.
        device: Torch device used by the neural predictors.
        time_step: Analysis step in milliseconds (pm branch only).
        is_half: Forwarded to the RMVPE predictors.
        f0_min: Lower bound of the pitch search range in Hz.
        f0_max: Upper bound of the pitch search range in Hz.

    Returns:
        The pitch curve produced by the chosen backend.

    Raises:
        UnknownF0Method: If ``f0_method`` is not one of the names above.
    """
    global input_audio_path2wav

    if f0_method in ("mangio-crepe", "mangio-crepe-tiny"):
        x = x.astype(np.float32)
        # Normalise by the 99.9th percentile of the magnitude. Skip it when
        # that value is not positive (e.g. silent input) so the signal is not
        # turned into NaN/inf by a division by zero.
        peak = np.quantile(np.abs(x), 0.999)
        if peak > 0:
            x /= peak
        # torchcrepe takes a (1, samples) batch.
        audio = torch.from_numpy(x).to(device, copy=True).unsqueeze(0)
        pitch_ = torchcrepe.predict(
            audio,
            sample_rate,
            crepe_hop_length,
            f0_min,
            f0_max,
            "tiny" if f0_method == "mangio-crepe-tiny" else "full",
            batch_size=crepe_hop_length * 2,
            device=device,
            pad=True,
        )
        p_len = p_len or x.shape[0] // crepe_hop_length
        f0 = _fit_pitch_to_frames(pitch_.squeeze(0).cpu().float().numpy(), p_len)
    elif f0_method == "pyin":
        f0, *_ = librosa.pyin(
            x.astype(np.float32),
            sr=sample_rate,
            fmin=f0_min,
            fmax=f0_max,
            hop_length=crepe_hop_length,
        )
        # Same fallback as the crepe branch, applied only when p_len is
        # absent so an explicit value keeps its previous meaning.
        if p_len is None:
            p_len = x.shape[0] // crepe_hop_length
        f0 = _fit_pitch_to_frames(f0, p_len)
    elif f0_method == "fcpe":
        model = FCPEF0Predictor(
            FCPE_PATH,
            f0_min=int(f0_min),
            f0_max=int(f0_max),
            dtype=torch.float32,
            device=device,
            sample_rate=sample_rate,
            threshold=0.03,
        )
        f0 = model.compute_f0(x, p_len=p_len or len(x) // window)
        del model
    elif f0_method == "harvest":
        input_audio_path2wav = {}
        input_audio_path2wav["test.mp3"] = x.astype(np.double)
        # get_harvest_f0 is memoized on its arguments, and the key below is
        # constant. Without clearing the cache, a second call with different
        # audio but the same rate/range would return the previous curve.
        get_harvest_f0.cache_clear()
        f0 = get_harvest_f0("test.mp3", sample_rate, f0_max, f0_min, 10)
        f0 = signal.medfilt(f0, 3)
    elif f0_method == "pm":
        f0 = (
            parselmouth.Sound(x, sample_rate)
            .to_pitch_ac(
                time_step=time_step / 1000,
                voicing_threshold=0.6,
                pitch_floor=f0_min,
                pitch_ceiling=f0_max,
            )
            .selected_array["frequency"]
        )
        # Same fallback as the fcpe branch when no frame count was given.
        if p_len is None:
            p_len = len(x) // window
        # Pad both sides with zeros when the track is shorter than p_len.
        pad_size: int = (p_len - len(f0) + 1) // 2
        if pad_size > 0 or p_len - len(f0) - pad_size > 0:
            f0 = np.pad(
                f0, [[pad_size, p_len - len(f0) - pad_size]], mode="constant"
            )
    elif f0_method == "rmvpe+":
        model = RMVPE0Predictor(
            RMVPE_PATH, is_half=is_half, device=device
        )
        f0 = model.infer_from_audio_with_pitch(
            x, thred=0.03, f0_min=f0_min, f0_max=f0_max
        )
        del model
    elif f0_method == "hpa-rmvpe":
        model = HPA_RMVPE(
            HPA_RMVPE_PATH, is_half=is_half, device=device
        )
        f0 = model.infer_from_audio_with_pitch(
            x, thred=0.03, f0_min=f0_min, f0_max=f0_max
        )
        del model
    else:
        raise UnknownF0Method(_i18n("unknown_f0_method", method=f0_method))
    return f0
@hf_spaces_gpu
def f0_extract_and_write(
    input_audio: str | Path,
    f0_method: str = f0_methods[0],
    f0_min: int = 50,
    f0_max: int = 1100,
    output_path: str | Path | None = None,
):
    """Extract the pitch curve of an audio file and save it as JSON.

    The audio is loaded through ``read`` with ``sr=16000, mono=True,
    flatten=True`` and analysed with a 160-sample window. The JSON document
    holds the keys "method", "sample_rate", "window", "p_len" and "freqs".

    Args:
        input_audio: Path of the audio file to analyse.
        f0_method: Pitch-extraction method name forwarded to ``f0_extract``.
        f0_min: Lower bound of the pitch search range in Hz.
        f0_max: Upper bound of the pitch search range in Hz.
        output_path: Destination JSON file. When empty, the name is derived
            from ``input_audio`` with a ".json" suffix via ``Namer.iter``.

    Returns:
        The output path as a POSIX-style string.
    """
    path = Path(input_audio)
    sample_rate: int = 16000
    audio, _ = read(path, sr=sample_rate, mono=True, flatten=True)
    if not output_path:
        output_path = Path(Namer.iter(path.with_suffix(".json")))
    else:
        output_path = Path(output_path)
    hop_length = 128
    window: int = 160
    time_step: float = window / sample_rate * 1000
    p_len = len(audio) // window
    try:
        f0 = f0_extract(
            x=audio,
            sample_rate=sample_rate,
            p_len=p_len,
            f0_method=f0_method,
            crepe_hop_length=hop_length,
            window=window,
            device="cuda" if torch.cuda.is_available() else "cpu",
            time_step=time_step,
            is_half=False,
            f0_min=f0_min,
            f0_max=f0_max,
        )
        f0_info = {
            "method": f0_method,
            "sample_rate": sample_rate,
            "window": window,
            "p_len": p_len,
            "freqs": f0.tolist(),
        }
        output_path.write_text(json.dumps(f0_info, indent=4), encoding="utf-8")
        # Drop the large objects before the cleanup helpers run.
        del f0_info, f0
    finally:
        # Run the cleanup even when extraction or writing fails, so a failed
        # request does not skip the memory-release helpers.
        extra_clear_torch_cache()
        nuclear_clear_model()
        emergency_ram_clear()
    return output_path.as_posix()
def f0_import(input_json: str | Path):
path = Path(input_json)
f0_info = json.loads(path.read_text("utf-8"))
f0_list = f0_info.get("freqs")
if f0_list:
return np.array(f0_list, dtype=np.float32)
else:
raise F0CurveNotFound(_i18n("f0_curve_not_found"))
if __name__ == "__main__":
    # Command-line entry point: parse the options and write the pitch JSON.
    cli_options = parse_f0_extract()
    f0_extract_and_write(
        input_audio=cli_options.input,
        f0_method=cli_options.f0_method,
        f0_min=cli_options.f0_min,
        f0_max=cli_options.f0_max,
        output_path=cli_options.output_path,
    )