# Preprocessing: extract HuBERT content features, f0, and spectrograms for training.
| from __future__ import annotations | |
| from logging import getLogger | |
| from pathlib import Path | |
| from random import shuffle | |
| from typing import Iterable, Literal | |
| import librosa | |
| import numpy as np | |
| import torch | |
| import torchaudio | |
| from joblib import Parallel, cpu_count, delayed | |
| from tqdm import tqdm | |
| from transformers import HubertModel | |
| import so_vits_svc_fork.f0 | |
| from so_vits_svc_fork import utils | |
| from ..hparams import HParams | |
| from ..modules.mel_processing import spec_to_mel_torch, spectrogram_torch | |
| from ..utils import get_optimal_device, get_total_gpu_memory | |
| from .preprocess_utils import check_hubert_min_duration | |
| LOG = getLogger(__name__) | |
| HUBERT_MEMORY = 2900 | |
| HUBERT_MEMORY_CREPE = 3900 | |
def _process_one(
    *,
    filepath: Path,
    content_model: HubertModel,
    device: torch.device | str = get_optimal_device(),
    f0_method: Literal["crepe", "crepe-tiny", "parselmouth", "dio", "harvest"] = "dio",
    force_rebuild: bool = False,
    hps: HParams,
) -> None:
    """Preprocess one audio file into a ``<name>.data.pt`` tensor bundle.

    Computes f0 (with unvoiced-frame interpolation and a voiced/unvoiced
    mask), HuBERT content features, and linear/mel spectrograms, trims all
    features to a common frame count, and saves them together with the raw
    audio and the speaker id (derived from the parent directory name).

    Returns early when the output file already exists (unless
    *force_rebuild*) or when the clip is shorter than the HuBERT minimum
    duration.
    """
    data_path = filepath.parent / (filepath.name + ".data.pt")
    # Fix: check for an existing output BEFORE decoding audio, so re-runs
    # over an already-processed dataset do not pay librosa.load + resample
    # for files that will be skipped anyway.
    if data_path.exists() and not force_rebuild:
        return
    audio, sr = librosa.load(filepath, sr=hps.data.sampling_rate, mono=True)
    if not check_hubert_min_duration(audio, sr):
        LOG.info(f"Skip {filepath} because it is too short.")
        return
    # Compute f0; interpolate_f0 fills unvoiced gaps and returns the uv mask.
    f0 = so_vits_svc_fork.f0.compute_f0(
        audio, sampling_rate=sr, hop_length=hps.data.hop_length, method=f0_method
    )
    f0, uv = so_vits_svc_fork.f0.interpolate_f0(f0)
    f0 = torch.from_numpy(f0).float()
    uv = torch.from_numpy(uv).float()
    # Compute HuBERT content features, then stretch them to one vector per f0 frame.
    audio = torch.from_numpy(audio).float().to(device)
    c = utils.get_content(
        content_model,
        audio,
        device,
        sr=sr,
        legacy_final_proj=hps.data.get("contentvec_final_proj", True),
    )
    c = utils.repeat_expand_2d(c.squeeze(0), f0.shape[0])
    torch.cuda.empty_cache()
    # Compute spectrograms; reload via torchaudio (native sample rate/channels).
    audio, sr = torchaudio.load(filepath)
    spec = spectrogram_torch(audio, hps).squeeze(0)
    mel_spec = spec_to_mel_torch(spec, hps)
    torch.cuda.empty_cache()
    # Trim every feature to the shortest common frame count so they stay aligned.
    lmin = min(spec.shape[1], mel_spec.shape[1], f0.shape[0], uv.shape[0], c.shape[1])
    spec, mel_spec, f0, uv, c = (
        spec[:, :lmin],
        mel_spec[:, :lmin],
        f0[:lmin],
        uv[:lmin],
        c[:, :lmin],
    )
    # Speaker id: dataset layout is one sub-directory per speaker.
    spk_name = filepath.parent.name
    spk = hps.spk.__dict__[spk_name]
    spk = torch.tensor(spk).long()
    assert (
        spec.shape[1] == mel_spec.shape[1] == f0.shape[0] == uv.shape[0] == c.shape[1]
    ), (spec.shape, mel_spec.shape, f0.shape, uv.shape, c.shape)
    data = {
        "spec": spec,
        "mel_spec": mel_spec,
        "f0": f0,
        "uv": uv,
        "content": c,
        "audio": audio,
        "spk": spk,
    }
    # Move everything to CPU so the saved bundle is device-independent.
    data = {k: v.cpu() for k, v in data.items()}
    with data_path.open("wb") as f:
        torch.save(data, f)
def _process_batch(filepaths: Iterable[Path], pbar_position: int, **kwargs):
    """Sequentially preprocess one chunk of files, sharing a single HuBERT model.

    The content model is loaded once per worker; all per-file options arrive
    via *kwargs* and are forwarded unchanged to :func:`_process_one`.
    """
    hps = kwargs["hps"]
    content_model = utils.get_hubert_model(
        get_optimal_device(), hps.data.get("contentvec_final_proj", True)
    )
    for path in tqdm(filepaths, position=pbar_position):
        _process_one(filepath=path, content_model=content_model, **kwargs)
def preprocess_hubert_f0(
    input_dir: Path | str,
    config_path: Path | str,
    n_jobs: int | None = None,
    f0_method: Literal["crepe", "crepe-tiny", "parselmouth", "dio", "harvest"] = "dio",
    force_rebuild: bool = False,
):
    """Preprocess every ``*.wav`` under *input_dir* in parallel batches.

    When *n_jobs* is None it is derived from total GPU memory (crepe needs a
    larger per-worker budget) and capped by the CPU count; it is then further
    reduced so each worker receives a reasonably sized chunk of files.
    """
    input_dir = Path(input_dir)
    config_path = Path(config_path)
    hps = utils.get_hparams(config_path)
    if n_jobs is None:
        # add cpu_count() to avoid SIGKILL
        memory = get_total_gpu_memory("total")
        per_job = HUBERT_MEMORY_CREPE if f0_method == "crepe" else HUBERT_MEMORY
        jobs_by_memory = memory // per_job if memory is not None else 1
        n_jobs = min(max(jobs_by_memory, 1), cpu_count())
        LOG.info(f"n_jobs automatically set to {n_jobs}, memory: {memory} MiB")
    filepaths = list(input_dir.rglob("*.wav"))
    # Keep at least ~16 files per worker; shuffle to balance chunk workloads.
    n_jobs = min(len(filepaths) // 16 + 1, n_jobs)
    shuffle(filepaths)
    filepath_chunks = np.array_split(filepaths, n_jobs)
    Parallel(n_jobs=n_jobs)(
        delayed(_process_batch)(
            filepaths=chunk,
            pbar_position=position,
            f0_method=f0_method,
            force_rebuild=force_rebuild,
            hps=hps,
        )
        for position, chunk in enumerate(filepath_chunks)
    )