"""F0 (fundamental frequency) extraction with several selectable backends.

The module can be imported for ``f0_extract`` / ``f0_extract_and_write`` /
``f0_import`` or executed as a script (see the ``__main__`` guard at the
bottom). Importing it downloads any missing predictor checkpoints.
"""

import json
import sys
from functools import lru_cache
from pathlib import Path

import librosa
import numpy as np
import parselmouth
import pyworld
import torch
import torchcrepe
from scipy import signal

# SCRIPT_DIR.parent is appended to sys.path *before* the project-local imports
# below; keep this ordering (those modules are presumably resolved through it).
SCRIPT_DIR = Path(__file__).resolve().parent
sys.path.append(str(SCRIPT_DIR.parent))

from i18n import _i18n
from audio import read
from namer import Namer
from extra_utils import (
    dw_file,
    nuclear_clear_model,
    emergency_ram_clear,
    extra_clear_torch_cache,
    hf_spaces_gpu,
)
from args_parser import parse_f0_extract

if __package__:
    from .predictors.FCPE import FCPEF0Predictor
    from .predictors.RMVPE import RMVPE0Predictor
    from .predictors.HPA_RMVPE import HPA_RMVPE
else:
    from predictors.FCPE import FCPEF0Predictor
    from predictors.RMVPE import RMVPE0Predictor
    from predictors.HPA_RMVPE import HPA_RMVPE

# Directory holding the predictor checkpoints; created on import if missing.
BASE_DIR = Path(__file__).resolve().parent / "predictors"
BASE_DIR.mkdir(parents=True, exist_ok=True)

RMVPE_PATH = BASE_DIR / "rmvpe.pt"
HPA_RMVPE_PATH = BASE_DIR / "hpa_rmvpe.pt"
FCPE_PATH = BASE_DIR / "fcpe.pt"

# Names accepted by ``f0_extract``; the first entry is the default method of
# ``f0_extract_and_write``.
f0_methods = (
    "rmvpe+",
    "hpa-rmvpe",
    "fcpe",
    "mangio-crepe",
    "mangio-crepe-tiny",
    "harvest",
    "pm",
    "pyin",
)

# Subset of ``f0_methods``: "mangio-crepe", "mangio-crepe-tiny" and "pyin".
crepe_like_f0_methods = (f0_methods[3], f0_methods[4], f0_methods[7])


class UnknownF0Method(Exception):
    """Raised by ``f0_extract`` when ``f0_method`` is not a supported name."""


class F0CurveNotFound(Exception):
    """Raised by ``f0_import`` when the JSON has no non-empty "freqs" entry."""


# (download URL, local destination) pairs for the predictor checkpoints.
requirements: list[list[str | Path]] = [
    [
        "https://huggingface.co/noblebarkrr/vbach_resources/resolve/main/predictors/rmvpe.pt?download=true",
        RMVPE_PATH,
    ],
    [
        "https://huggingface.co/noblebarkrr/vbach_resources/resolve/main/predictors/hpa_rmvpe.pt?download=true",
        HPA_RMVPE_PATH,
    ],
    [
        "https://huggingface.co/noblebarkrr/vbach_resources/resolve/main/predictors/fcpe.pt?download=true",
        FCPE_PATH,
    ],
]

# Import-time side effect: fetch every checkpoint that is not on disk yet.
for url, path in requirements:
    if not path.exists():
        dw_file(url, path)

# Maps a lookup key to the waveform ``get_harvest_f0`` should analyse. It is a
# module-level dict because the cached function can only take hashable args.
input_audio_path2wav = {}


@lru_cache(maxsize=128)
def get_harvest_f0(
    input_audio_path: str, fs: int, f0max: float, f0min: float, frame_period: float
) -> np.ndarray:
    """Compute F0 with the Harvest algorithm.

    The result is memoised on the argument tuple, NOT on the audio content:
    callers that reuse a key for different audio must call
    ``get_harvest_f0.cache_clear()`` first.

    Args:
        input_audio_path: Key into ``input_audio_path2wav`` selecting the audio.
        fs: Sampling rate.
        f0max: Maximum F0 frequency.
        f0min: Minimum F0 frequency.
        frame_period: Frame period (passed to ``pyworld.harvest`` unchanged).

    Returns:
        Array of F0 values, refined with ``pyworld.stonemask``.

    Raises:
        KeyError: If ``input_audio_path`` is not in ``input_audio_path2wav``.
    """
    audio = input_audio_path2wav[input_audio_path]
    f0, t = pyworld.harvest(
        audio,
        fs=fs,
        f0_ceil=f0max,
        f0_floor=f0min,
        frame_period=frame_period,
    )
    f0 = pyworld.stonemask(audio, f0, t, fs)
    return f0


def _resample_f0(pitch, p_len):
    """Linearly resample a pitch track to ``p_len`` frames.

    Values below 0.001 are replaced with NaN before interpolation and every
    NaN left in the interpolated result is turned into 0.
    """
    source = np.array(pitch)  # copy, so the caller's data is never modified
    source[source < 0.001] = np.nan
    target = np.interp(
        np.arange(0, len(source) * p_len, len(source)) / p_len,
        np.arange(0, len(source)),
        source,
    )
    return np.nan_to_num(target)


def f0_extract(
    x,
    sample_rate,
    p_len,
    f0_method,
    crepe_hop_length,
    window,
    device,
    time_step,
    is_half,
    f0_min=50,
    f0_max=1100,
):
    """Extract an F0 curve from ``x`` with the selected method.

    Args:
        x: Audio samples as a NumPy array (presumably mono, 1-D - the only
            caller in this file passes the flattened mono output of ``read``).
        sample_rate: Sampling rate of ``x``.
        p_len: Target number of F0 frames. Used by the crepe, pyin, fcpe and
            pm branches; a falsy value falls back to a length derived from
            ``x`` in the crepe, pyin and fcpe branches.
        f0_method: One of the names in ``f0_methods``.
        crepe_hop_length: Hop length, in samples, for the crepe and pyin
            branches.
        window: Frame size in samples; only used for the fcpe ``p_len``
            fallback.
        device: Torch device used by the neural predictors.
        time_step: Frame step in milliseconds; only used by the "pm" branch.
        is_half: Forwarded to the RMVPE predictors (presumably enables fp16 -
            confirm in the predictor classes).
        f0_min: Lower bound of the F0 search range.
        f0_max: Upper bound of the F0 search range.

    Returns:
        The F0 curve produced by the chosen backend.

    Raises:
        UnknownF0Method: If ``f0_method`` is not supported.
    """
    global input_audio_path2wav

    if f0_method in ["mangio-crepe", "mangio-crepe-tiny"]:
        x = x.astype(np.float32)
        # Normalise by the 0.999 quantile of |x|. Skip the division when that
        # quantile is not positive (e.g. silent input), which would otherwise
        # turn the whole signal into NaN/inf.
        peak = np.quantile(np.abs(x), 0.999)
        if peak > 0:
            x /= peak
        audio = torch.from_numpy(x).to(device, copy=True).unsqueeze(0)
        if audio.ndim == 2 and audio.shape[0] > 1:
            audio = torch.mean(audio, dim=0, keepdim=True)
        pitch_ = torchcrepe.predict(
            audio,
            sample_rate,
            crepe_hop_length,
            f0_min,
            f0_max,
            "tiny" if f0_method == "mangio-crepe-tiny" else "full",
            batch_size=crepe_hop_length * 2,
            device=device,
            pad=True,
        )
        p_len = p_len or x.shape[0] // crepe_hop_length
        f0 = _resample_f0(pitch_.squeeze(0).cpu().float().numpy(), p_len)

    elif f0_method == "pyin":
        f0, *_ = librosa.pyin(
            x.astype(np.float32),
            sr=sample_rate,
            fmin=f0_min,
            fmax=f0_max,
            hop_length=crepe_hop_length,
        )
        # Same fallback as the crepe branch, so a falsy p_len no longer crashes.
        p_len = p_len or x.shape[0] // crepe_hop_length
        f0 = _resample_f0(f0, p_len)

    elif f0_method == "fcpe":
        model = FCPEF0Predictor(
            FCPE_PATH,
            f0_min=int(f0_min),
            f0_max=int(f0_max),
            dtype=torch.float32,
            device=device,
            sample_rate=sample_rate,
            threshold=0.03,
        )
        f0 = model.compute_f0(x, p_len=p_len or len(x) // window)
        del model

    elif f0_method == "harvest":
        input_audio_path2wav = {"test.mp3": x.astype(np.double)}
        # get_harvest_f0 is memoised on its arguments and the key below is a
        # constant, so without this a later call with different audio would
        # silently receive the F0 curve of an earlier one.
        get_harvest_f0.cache_clear()
        f0 = get_harvest_f0("test.mp3", sample_rate, f0_max, f0_min, 10)
        f0 = signal.medfilt(f0, 3)

    elif f0_method == "pm":
        f0 = (
            parselmouth.Sound(x, sample_rate)
            .to_pitch_ac(
                time_step=time_step / 1000,  # milliseconds -> seconds
                voicing_threshold=0.6,
                pitch_floor=f0_min,
                pitch_ceiling=f0_max,
            )
            .selected_array["frequency"]
        )
        # Zero-pad both ends so the curve is p_len frames long; when the
        # missing amount is odd, the extra frame goes to the front.
        pad_size: int = (p_len - len(f0) + 1) // 2
        if pad_size > 0 or p_len - len(f0) - pad_size > 0:
            f0 = np.pad(
                f0, [[pad_size, p_len - len(f0) - pad_size]], mode="constant"
            )

    elif f0_method == "rmvpe+":
        model = RMVPE0Predictor(
            RMVPE_PATH,
            is_half=is_half,
            device=device
        )
        f0 = model.infer_from_audio_with_pitch(
            x, thred=0.03, f0_min=f0_min, f0_max=f0_max
        )
        del model

    elif f0_method == "hpa-rmvpe":
        model = HPA_RMVPE(
            HPA_RMVPE_PATH,
            is_half=is_half,
            device=device
        )
        f0 = model.infer_from_audio_with_pitch(
            x, thred=0.03, f0_min=f0_min, f0_max=f0_max
        )
        del model

    else:
        raise UnknownF0Method(_i18n("unknown_f0_method", method=f0_method))

    return f0


@hf_spaces_gpu
def f0_extract_and_write(
    input_audio: str | Path,
    f0_method: str = f0_methods[0],
    f0_min: int = 50,
    f0_max: int = 1100,
    output_path: str | Path | None = None,
):
    """Extract F0 from an audio file and save it as a JSON document.

    The audio is read at 16 kHz mono, analysed with a 160-sample window
    (hop length 128 for the crepe/pyin methods) on CUDA when available, and
    written with the keys "method", "sample_rate", "window", "p_len" and
    "freqs".

    Args:
        input_audio: Path of the audio file to analyse.
        f0_method: One of the names in ``f0_methods``.
        f0_min: Lower bound of the F0 search range.
        f0_max: Upper bound of the F0 search range.
        output_path: Destination JSON file. When falsy, the name is derived
            from ``input_audio`` with a ".json" suffix via ``Namer.iter``
            (presumably to avoid overwriting an existing file - confirm).

    Returns:
        The POSIX-style path of the written JSON file.
    """
    path = Path(input_audio)
    sample_rate: int = 16000
    audio, _ = read(path, sr=sample_rate, mono=True, flatten=True)

    if not output_path:
        output_path = Path(Namer.iter(path.with_suffix(".json")))
    else:
        output_path = Path(output_path)

    hop_length = 128
    window: int = 160
    time_step: float = window / sample_rate * 1000  # window length in ms
    p_len = len(audio) // window
    device = "cuda" if torch.cuda.is_available() else "cpu"

    f0 = f0_extract(
        audio,
        sample_rate,
        p_len,
        f0_method,
        hop_length,
        window,
        device,
        time_step,
        False,
        f0_min,
        f0_max,
    )

    f0_info = {
        "method": f0_method,
        "sample_rate": sample_rate,
        "window": window,
        "p_len": p_len,
        "freqs": f0.tolist(),
    }
    output_path.write_text(json.dumps(f0_info, indent=4), encoding="utf-8")

    # Drop the large objects before running the project's cleanup helpers.
    del f0_info, f0
    extra_clear_torch_cache()
    nuclear_clear_model()
    emergency_ram_clear()
    return output_path.as_posix()


def f0_import(input_json: str | Path):
    """Load an F0 curve from a JSON file written by ``f0_extract_and_write``.

    Args:
        input_json: Path of the JSON file.

    Returns:
        The "freqs" entry as a ``float32`` NumPy array.

    Raises:
        F0CurveNotFound: If "freqs" is missing or empty.
    """
    path = Path(input_json)
    f0_info = json.loads(path.read_text(encoding="utf-8"))
    f0_list = f0_info.get("freqs")
    if not f0_list:
        raise F0CurveNotFound(_i18n("f0_curve_not_found"))
    return np.array(f0_list, dtype=np.float32)


if __name__ == "__main__":
    args = parse_f0_extract()
    f0_extract_and_write(
        input_audio=args.input,
        f0_method=args.f0_method,
        f0_min=args.f0_min,
        f0_max=args.f0_max,
        output_path=args.output_path
    )