Spaces:
Sleeping
Sleeping
| import os | |
| import traceback | |
| from concurrent.futures import ProcessPoolExecutor | |
| from typing import * | |
| import multiprocessing as mp | |
| import numpy as np | |
| import pyworld | |
| import torch | |
| import torchcrepe | |
| from torch import Tensor | |
| from tqdm import tqdm | |
| from lib.rvc.utils import load_audio | |
| def get_optimal_torch_device(index: int = 0) -> torch.device: | |
| # Get cuda device | |
| if torch.cuda.is_available(): | |
| return torch.device(f"cuda:{index % torch.cuda.device_count()}") # Very fast | |
| elif torch.backends.mps.is_available(): | |
| return torch.device("mps") | |
| # Insert an else here to grab "xla" devices if available. TO DO later. Requires the torch_xla.core.xla_model library | |
| # Else wise return the "cpu" as a torch device, | |
| return torch.device("cpu") | |
| def get_f0_official_crepe_computation( | |
| x, | |
| sr, | |
| f0_min, | |
| f0_max, | |
| model="full", | |
| ): | |
| batch_size = 512 | |
| torch_device = get_optimal_torch_device() | |
| audio = torch.tensor(np.copy(x))[None].float() | |
| f0, pd = torchcrepe.predict( | |
| audio, | |
| sr, | |
| 160, | |
| f0_min, | |
| f0_max, | |
| model, | |
| batch_size=batch_size, | |
| device=torch_device, | |
| return_periodicity=True, | |
| ) | |
| pd = torchcrepe.filter.median(pd, 3) | |
| f0 = torchcrepe.filter.mean(f0, 3) | |
| f0[pd < 0.1] = 0 | |
| f0 = f0[0].cpu().numpy() | |
| f0 = f0[1:] # Get rid of extra first frame | |
| return f0 | |
| def get_f0_crepe_computation( | |
| x, | |
| sr, | |
| f0_min, | |
| f0_max, | |
| hop_length=160, # 512 before. Hop length changes the speed that the voice jumps to a different dramatic pitch. Lower hop lengths means more pitch accuracy but longer inference time. | |
| model="full", # Either use crepe-tiny "tiny" or crepe "full". Default is full | |
| ): | |
| x = x.astype(np.float32) # fixes the F.conv2D exception. We needed to convert double to float. | |
| x /= np.quantile(np.abs(x), 0.999) | |
| torch_device = get_optimal_torch_device() | |
| audio = torch.from_numpy(x).to(torch_device, copy=True) | |
| audio = torch.unsqueeze(audio, dim=0) | |
| if audio.ndim == 2 and audio.shape[0] > 1: | |
| audio = torch.mean(audio, dim=0, keepdim=True).detach() | |
| audio = audio.detach() | |
| print("Initiating prediction with a crepe_hop_length of: " + str(hop_length)) | |
| pitch: Tensor = torchcrepe.predict( | |
| audio, | |
| sr, | |
| hop_length, | |
| f0_min, | |
| f0_max, | |
| model, | |
| batch_size=hop_length * 2, | |
| device=torch_device, | |
| pad=True | |
| ) | |
| p_len = x.shape[0] // hop_length | |
| # Resize the pitch for final f0 | |
| source = np.array(pitch.squeeze(0).cpu().float().numpy()) | |
| source[source < 0.001] = np.nan | |
| target = np.interp( | |
| np.arange(0, len(source) * p_len, len(source)) / p_len, | |
| np.arange(0, len(source)), | |
| source | |
| ) | |
| f0 = np.nan_to_num(target) | |
| f0 = f0[1:] # Get rid of extra first frame | |
| return f0 # Resized f0 | |
| def compute_f0( | |
| path: str, | |
| f0_method: str, | |
| fs: int, | |
| hop: int, | |
| f0_max: float, | |
| f0_min: float, | |
| ): | |
| x = load_audio(path, fs) | |
| if f0_method == "harvest": | |
| f0, t = pyworld.harvest( | |
| x.astype(np.double), | |
| fs=fs, | |
| f0_ceil=f0_max, | |
| f0_floor=f0_min, | |
| frame_period=1000 * hop / fs, | |
| ) | |
| f0 = pyworld.stonemask(x.astype(np.double), f0, t, fs) | |
| elif f0_method == "dio": | |
| f0, t = pyworld.dio( | |
| x.astype(np.double), | |
| fs=fs, | |
| f0_ceil=f0_max, | |
| f0_floor=f0_min, | |
| frame_period=1000 * hop / fs, | |
| ) | |
| f0 = pyworld.stonemask(x.astype(np.double), f0, t, fs) | |
| elif f0_method == "mangio-crepe": | |
| f0 = get_f0_crepe_computation(x, fs, f0_min, f0_max, 160, "full") | |
| elif f0_method == "crepe": | |
| f0 = get_f0_official_crepe_computation(x.astype(np.double), fs, f0_min, f0_max, "full") | |
| return f0 | |
| def coarse_f0(f0, f0_bin, f0_mel_min, f0_mel_max): | |
| f0_mel = 1127 * np.log(1 + f0 / 700) | |
| f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - f0_mel_min) * (f0_bin - 2) / ( | |
| f0_mel_max - f0_mel_min | |
| ) + 1 | |
| # use 0 or 1 | |
| f0_mel[f0_mel <= 1] = 1 | |
| f0_mel[f0_mel > f0_bin - 1] = f0_bin - 1 | |
| f0_coarse = np.rint(f0_mel).astype(np.int) | |
| assert f0_coarse.max() <= 255 and f0_coarse.min() >= 1, ( | |
| f0_coarse.max(), | |
| f0_coarse.min(), | |
| ) | |
| return f0_coarse | |
| def processor(paths, f0_method, samplerate=16000, hop_size=160, process_id=0): | |
| fs = samplerate | |
| hop = hop_size | |
| f0_bin = 256 | |
| f0_max = 1100.0 | |
| f0_min = 50.0 | |
| f0_mel_min = 1127 * np.log(1 + f0_min / 700) | |
| f0_mel_max = 1127 * np.log(1 + f0_max / 700) | |
| if len(paths) != 0: | |
| for idx, (inp_path, opt_path1, opt_path2) in enumerate( | |
| tqdm(paths, position=1 + process_id) | |
| ): | |
| try: | |
| if ( | |
| os.path.exists(opt_path1 + ".npy") == True | |
| and os.path.exists(opt_path2 + ".npy") == True | |
| ): | |
| continue | |
| featur_pit = compute_f0(inp_path, f0_method, fs, hop, f0_max, f0_min) | |
| np.save( | |
| opt_path2, | |
| featur_pit, | |
| allow_pickle=False, | |
| ) # nsf | |
| coarse_pit = coarse_f0(featur_pit, f0_bin, f0_mel_min, f0_mel_max) | |
| np.save( | |
| opt_path1, | |
| coarse_pit, | |
| allow_pickle=False, | |
| ) # ori | |
| except: | |
| print(f"f0 failed {idx}: {inp_path} {traceback.format_exc()}") | |
| def run(training_dir: str, num_processes: int, f0_method: str): | |
| paths = [] | |
| dataset_dir = os.path.join(training_dir, "1_16k_wavs") | |
| opt_dir_f0 = os.path.join(training_dir, "2a_f0") | |
| opt_dir_f0_nsf = os.path.join(training_dir, "2b_f0nsf") | |
| if os.path.exists(opt_dir_f0) and os.path.exists(opt_dir_f0_nsf): | |
| return | |
| os.makedirs(opt_dir_f0, exist_ok=True) | |
| os.makedirs(opt_dir_f0_nsf, exist_ok=True) | |
| names = [] | |
| for pathname in sorted(list(os.listdir(dataset_dir))): | |
| if os.path.isdir(os.path.join(dataset_dir, pathname)): | |
| for f in sorted(list(os.listdir(os.path.join(dataset_dir, pathname)))): | |
| if "spec" in f: | |
| continue | |
| names.append(os.path.join(pathname, f)) | |
| else: | |
| names.append(pathname) | |
| for name in names: # dataset_dir/{05d}/file.ext | |
| filepath = os.path.join(dataset_dir, name) | |
| if "spec" in filepath: | |
| continue | |
| opt_filepath_f0 = os.path.join(opt_dir_f0, name) | |
| opt_filepath_f0_nsf = os.path.join(opt_dir_f0_nsf, name) | |
| paths.append([filepath, opt_filepath_f0, opt_filepath_f0_nsf]) | |
| for dir in set([(os.path.dirname(p[1]), os.path.dirname(p[2])) for p in paths]): | |
| os.makedirs(dir[0], exist_ok=True) | |
| os.makedirs(dir[1], exist_ok=True) | |
| with ProcessPoolExecutor(mp_context=mp.get_context("spawn")) as executer: | |
| for i in range(num_processes): | |
| executer.submit(processor, paths[i::num_processes], f0_method, process_id=i) | |
| processor(paths, f0_method) | |