Spaces:
Sleeping
Sleeping
| """ | |
| RVC Voice Conversion β HuggingFace Space | |
| Simple, fast, GPU/CPU auto-detected. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import os | |
| import shutil | |
| import sys | |
| import tempfile | |
| import zipfile | |
| from pathlib import Path | |
| import torch | |
# -- Path bootstrap -----------------------------------------------------------
# Make the app directory importable and create the folders the rest of the
# module assumes exist.
BASE_DIR = Path(__file__).parent
sys.path.insert(0, str(BASE_DIR))

MODELS_DIR = BASE_DIR / "rvc_models"  # one subfolder per voice model
OUTPUT_DIR = BASE_DIR / "outputs"     # converted audio files
MODELS_DIR.mkdir(exist_ok=True)
OUTPUT_DIR.mkdir(exist_ok=True)

# Resource cache location — presumably read by the ultimate_rvc package for
# its predictors/embedders (see download_* helpers below); confirm.
os.environ.setdefault("URVC_MODELS_DIR", str(MODELS_DIR / "urvc"))
# -- Logging ------------------------------------------------------------------
# -- In-memory log buffer (feeds the Logs tab in the UI) ----------------------
import collections

# Bounded ring buffer of formatted log lines; deque(maxlen=...) silently
# drops the oldest entries once full.
_LOG_BUFFER = collections.deque(maxlen=200)  # keep last 200 lines
class _UILogHandler(logging.Handler):
    """Logging handler that mirrors each formatted record into _LOG_BUFFER.

    The buffer is a bounded deque shared with the UI's Logs tab, so old
    lines are discarded automatically.
    """

    def emit(self, record: logging.LogRecord) -> None:
        line = self.format(record)
        _LOG_BUFFER.append(line)
# Handler instance feeding the in-memory buffer shown in the Logs tab.
_ui_handler = _UILogHandler()
_ui_handler.setLevel(logging.INFO)
_ui_handler.setFormatter(logging.Formatter(
    fmt="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%H:%M:%S",
))

# Console logging with the same format as the UI handler.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%H:%M:%S",
)

# Attach UI handler to root logger so all child loggers feed into it
logging.getLogger().addHandler(_ui_handler)

# Quiet chatty third-party loggers; their warnings/errors still come through.
for _noisy in ("httpx", "httpcore", "faiss", "faiss.loader", "transformers", "torch"):
    logging.getLogger(_noisy).setLevel(logging.WARNING)

# Module logger used throughout this file.
logger = logging.getLogger("rvc_space")
# -- CPU threading - use all available cores ----------------------------------
# Use sched_getaffinity to get cores actually allocated to this process
# (os.cpu_count() returns the host total which is wrong in containers)
try:
    _NUM_CORES = len(os.sched_getaffinity(0))
except AttributeError:
    # sched_getaffinity is Linux-only; fall back to the host core count.
    _NUM_CORES = os.cpu_count() or 1

torch.set_num_threads(_NUM_CORES)
torch.set_num_interop_threads(_NUM_CORES)

# NOTE(review): these env vars are usually read at library-init time; torch
# (and any BLAS it linked) was already imported above, so they may only
# affect libraries imported later — confirm.
os.environ["OMP_NUM_THREADS"] = str(_NUM_CORES)
os.environ["MKL_NUM_THREADS"] = str(_NUM_CORES)
os.environ["NUMEXPR_NUM_THREADS"] = str(_NUM_CORES)
os.environ["OPENBLAS_NUM_THREADS"] = str(_NUM_CORES)

# Use fastest matmul precision on CPU (trades tiny accuracy for speed)
torch.set_float32_matmul_precision("high")
# Enable oneDNN fusion optimisations (fuses ops like conv+relu into one kernel)
torch.backends.mkldnn.enabled = True

logger.info("CPU threads: %d | matmul: high precision | oneDNN: enabled", _NUM_CORES)
# -- Device -------------------------------------------------------------------
# DEVICE is informational here; DEVICE_LABEL is the human-readable badge
# shown in the UI header and Help tab.
if torch.cuda.is_available():
    DEVICE = "cuda"
    DEVICE_LABEL = f"π’ GPU Β· {torch.cuda.get_device_name(0)}"
else:
    DEVICE = "cpu"
    DEVICE_LABEL = f"π΅ CPU Β· {_NUM_CORES} cores"
logger.info("Device: %s", DEVICE_LABEL)
# -- Built-in models ----------------------------------------------------------
# Voice models fetched automatically at startup (see download_builtin_models).
# Each URL is a ZIP containing the model's .pth weights and optionally a
# FAISS .index; "name" becomes the folder name under MODELS_DIR.
BUILTIN_MODELS = [
    {
        "name": "Vestia Zeta v1",
        "url": "https://huggingface.co/megaaziib/my-rvc-models-collection/resolve/main/zeta.zip",
    },
    {
        "name": "Vestia Zeta v2",
        "url": "https://huggingface.co/megaaziib/my-rvc-models-collection/resolve/main/zetaTest.zip",
    },
    {
        "name": "Ayunda Risu",
        "url": "https://huggingface.co/megaaziib/my-rvc-models-collection/resolve/main/risu.zip",
    },
    {
        "name": "Gawr Gura",
        "url": "https://huggingface.co/Gigrig/GigrigRVC/resolve/41d46f087b9c7d70b93acf100f1cb9f7d25f3831/GawrGura_RVC_v2_Ov2Super_e275_s64075.zip",
    },
]
# -- Lazy VoiceConverter ------------------------------------------------------
# Constructed on first use so the app starts fast and the heavy engine
# import is deferred until a conversion is actually requested.
_vc_instance = None


def _get_vc():
    """Return the shared VoiceConverter, constructing it on first call."""
    global _vc_instance
    if _vc_instance is not None:
        return _vc_instance
    logger.info("Loading VoiceConverterβ¦")
    from ultimate_rvc.rvc.infer.infer import VoiceConverter
    _vc_instance = VoiceConverter()
    logger.info("VoiceConverter ready.")
    return _vc_instance
# -- Model helpers ------------------------------------------------------------
def list_models() -> list[str]:
    """Sorted names of MODELS_DIR subfolders that contain a .pth weight file."""
    if not MODELS_DIR.exists():
        return []
    found = [
        entry.name
        for entry in MODELS_DIR.iterdir()
        if entry.is_dir() and any(entry.glob("*.pth"))
    ]
    return sorted(found)
def _pth_and_index(name: str) -> tuple[str, str]:
    """Locate the weights file and optional FAISS index for model *name*.

    Returns ``(pth_path, index_path)``; ``index_path`` is ``""`` when the
    model folder has no .index file. Raises FileNotFoundError when the
    folder contains no .pth file.
    """
    model_dir = MODELS_DIR / name
    weight_files = list(model_dir.glob("*.pth"))
    index_files = list(model_dir.glob("*.index"))
    if not weight_files:
        raise FileNotFoundError(f"No .pth file found in model '{name}'")
    index_path = str(index_files[0]) if index_files else ""
    return str(weight_files[0]), index_path
def _extract_zip(zip_path: str | Path, dest_name: str) -> None:
    """Extract a model ZIP into MODELS_DIR/dest_name and flatten it.

    Any .pth / .index files buried in sub-directories are moved up to the
    model folder root, where _pth_and_index() expects to find them.
    """
    dest = MODELS_DIR / dest_name
    dest.mkdir(exist_ok=True)
    with zipfile.ZipFile(zip_path, "r") as archive:
        archive.extractall(dest)
    movable = list(dest.rglob("*.pth")) + list(dest.rglob("*.index"))
    for found in movable:
        target = dest / found.name
        if found != target:
            shutil.move(str(found), str(target))
def _download_file(url: str, dest: Path) -> None:
    """Download *url* to *dest* atomically, skipping if already present.

    The payload is streamed into a temporary file in the destination
    directory and then moved into place with os.replace(), so a partial
    download never leaves a corrupt *dest* behind.

    Fixes vs. the previous version: the HTTP response is closed via a
    context manager (no leaked connection), and the temporary file is
    removed if the download fails part-way.
    """
    if dest.exists():
        return
    dest.parent.mkdir(parents=True, exist_ok=True)
    logger.info("Downloading %s ...", dest.name)
    import requests
    tmp_path = None
    try:
        with requests.get(url, stream=True, timeout=300) as r:
            r.raise_for_status()
            with tempfile.NamedTemporaryFile(
                delete=False, dir=dest.parent, suffix=".tmp"
            ) as tmp:
                tmp_path = tmp.name
                for chunk in r.iter_content(8192):
                    tmp.write(chunk)
        os.replace(tmp_path, dest)
    except BaseException:
        # Don't leave orphaned *.tmp files behind on failure.
        if tmp_path is not None and os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
    logger.info("%s ready.", dest.name)
def download_predictors() -> None:
    """Fetch the rmvpe.pt and fcpe.pt pitch predictors used by all F0 methods."""
    base = "https://huggingface.co/JackismyShephard/ultimate-rvc/resolve/main/Resources/predictors"
    target_dir = MODELS_DIR / "urvc" / "rvc" / "predictors"
    for filename in ("rmvpe.pt", "fcpe.pt"):
        _download_file(f"{base}/{filename}", target_dir / filename)
def download_embedders() -> None:
    """Fetch the default contentvec embedder (pytorch_model.bin + config.json)."""
    base = "https://huggingface.co/JackismyShephard/ultimate-rvc/resolve/main/Resources/embedders"
    embedders_dir = MODELS_DIR / "urvc" / "rvc" / "embedders"
    wanted = (
        ("contentvec", "pytorch_model.bin"),
        ("contentvec", "config.json"),
    )
    for folder, filename in wanted:
        _download_file(f"{base}/{folder}/{filename}", embedders_dir / folder / filename)
def download_builtin_models() -> str:
    """Ensure every BUILTIN_MODELS entry is downloaded and extracted.

    Already-present models (folder with a .pth) are skipped. Returns the
    name of the first built-in model so the caller can use it as the
    default dropdown selection.

    Fixes vs. the previous version: the HTTP response is closed via a
    context manager, and the temporary ZIP is deleted even when the
    download or extraction raises.
    """
    import requests
    first = None
    for model in BUILTIN_MODELS:
        name = model["name"]
        dest = MODELS_DIR / name
        if dest.exists() and list(dest.glob("*.pth")):
            logger.info("Model already present: %s", name)
        else:
            logger.info("Downloading model: %s ...", name)
            # Reserve a temp path first; stream into it, then extract.
            with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmp:
                tmp_path = tmp.name
            try:
                with requests.get(model["url"], stream=True, timeout=300) as r:
                    r.raise_for_status()
                    with open(tmp_path, "wb") as fh:
                        for chunk in r.iter_content(8192):
                            fh.write(chunk)
                _extract_zip(tmp_path, name)
            finally:
                # Never leave the temp ZIP behind, success or failure.
                if os.path.exists(tmp_path):
                    os.unlink(tmp_path)
            logger.info("Model ready: %s", name)
        if first is None:
            first = name
    return first
# -- Upload handler -----------------------------------------------------------
def upload_model(zip_file, model_name):
    """Install a user-uploaded model ZIP and report UI updates.

    Returns a (status_markdown, dropdown_update, table_update) triple; the
    two update dicts are None when nothing was installed.
    """
    if not zip_file:
        return "β οΈ No file provided.", None, None
    chosen = (model_name or "").strip() or Path(zip_file).stem
    try:
        _extract_zip(zip_file, chosen)
        models = list_models()
    except Exception as exc:
        logger.exception("Model upload failed")
        return f"β Error: {exc}", None, None
    dropdown_update = {"choices": models, "value": chosen, "__type__": "update"}
    table_update = {"value": [[m] for m in models], "__type__": "update"}
    return f"β Model **{chosen}** loaded successfully.", dropdown_update, table_update
# -- Refresh handler ----------------------------------------------------------
def refresh_models():
    """Re-scan MODELS_DIR; returns update dicts for the table and dropdown."""
    models = list_models()
    table_update = {"value": [[m] for m in models], "__type__": "update"}
    dropdown_update = {"choices": models, "__type__": "update"}
    return table_update, dropdown_update
# -- Autotune visibility toggle -----------------------------------------------
def toggle_autotune(enabled):
    """Gradio update dict showing the strength slider only when *enabled*."""
    return {"__type__": "update", "visible": enabled}
# -- Conversion ---------------------------------------------------------------
def convert(
    audio_mic, audio_file, model_name,
    pitch, f0_method,
    index_rate, protect, volume_envelope,
    clean_audio, clean_strength,
    split_audio, autotune, autotune_strength,
    output_format,
):
    """Convert the recorded/uploaded audio with the selected voice model.

    Returns ``(status_message, output_path)``; ``output_path`` is ``None``
    on validation failure or error. Microphone input takes precedence over
    the uploaded file when both are present.

    Fixes vs. the previous version: ``import time`` instead of the
    ``__import__`` hack, corrected the Opus bitrate comment (64 kbps, not
    "4800 bps"), and the intermediate WAV is removed after a successful
    Opus re-encode so outputs/ doesn't accumulate both files.
    """
    audio_input = audio_mic or audio_file
    if audio_input is None:
        return "β οΈ Please record or upload audio first.", None
    if not model_name:
        return "β οΈ No model selected.", None
    try:
        model_path, index_path = _pth_and_index(model_name)
    except FileNotFoundError as exc:
        return f"β {exc}", None

    # Opus needs post-processing: generate WAV first, then re-encode below.
    is_opus = output_format.upper() == "OPUS"
    engine_format = "WAV" if is_opus else output_format

    import time
    ts = int(time.time())
    wav_path = str(OUTPUT_DIR / f"output-{ts}.wav")
    out_path = str(OUTPUT_DIR / (f"output-{ts}.opus" if is_opus else f"output-{ts}.{output_format.lower()}"))
    # NOTE(review): audio_output_path is always the .wav name, even for
    # MP3/FLAC; this assumes the engine rewrites the extension to match
    # export_format so that out_path exists afterwards — confirm.
    try:
        vc = _get_vc()
        vc.convert_audio(
            audio_input_path=audio_input,
            audio_output_path=wav_path,
            model_path=model_path,
            index_path=index_path,
            pitch=pitch,
            f0_method=f0_method,
            index_rate=index_rate,
            volume_envelope=volume_envelope,
            protect=protect,
            split_audio=split_audio,
            f0_autotune=autotune,
            f0_autotune_strength=autotune_strength,
            clean_audio=clean_audio,
            clean_strength=clean_strength,
            export_format=engine_format,
        )
        if is_opus:
            # Re-encode the WAV to Opus (64 kbps CBR @ 48 kHz) via ffmpeg.
            import subprocess
            subprocess.run(
                [
                    "ffmpeg", "-y",
                    "-i", wav_path,
                    "-c:a", "libopus",
                    "-b:a", "64000",
                    "-vbr", "off",
                    "-ar", "48000",
                    out_path,
                ],
                check=True,
                capture_output=True,
            )
            # Drop the intermediate WAV; only the .opus result is returned.
            try:
                os.unlink(wav_path)
            except OSError:
                pass
        return "β Conversion complete!", out_path
    except Exception as exc:
        logger.exception("Conversion error")
        return f"β Error: {exc}", None
# -- Startup ------------------------------------------------------------------
# Best-effort warm-up: fetch the pitch predictors, the contentvec embedder
# and the built-in voice models so the first conversion doesn't block on
# downloads. Failures (e.g. offline) are tolerated and surfaced in the UI
# via _startup_status.
_startup_status = ""
_default_model = ""
try:
    download_predictors()
    download_embedders()
    _default_model = download_builtin_models()
    _startup_status = f"β Ready Β· {DEVICE_LABEL}"
except Exception as _e:
    _startup_status = f"β οΈ Default model unavailable: {_e} Β· {DEVICE_LABEL}"
    logger.warning("Could not download default model: %s", _e)

_initial_models = list_models()
# Prefer the downloaded default; otherwise fall back to any available model.
_initial_value = _default_model if _default_model in _initial_models else (
    _initial_models[0] if _initial_models else None
)
# -- Gradio 6 UI --------------------------------------------------------------
def get_logs() -> str:
    """Return the buffered log lines as one newline-joined string."""
    if not _LOG_BUFFER:
        return "(no logs yet)"
    return "\n".join(_LOG_BUFFER)
import gradio as gr

# Custom CSS: centred header block, muted status line, hide Gradio's footer.
_CSS = """
#header { text-align: center; padding: 20px 0 8px; }
#header h1 { font-size: 2rem; margin: 0; }
#header p { opacity: .65; margin: 4px 0 0; }
#status { text-align: center; font-size: .82rem; opacity: .7; margin-bottom: 8px; }
footer { display: none !important; }
"""

# In Gradio 6, css goes back into gr.Blocks()
with gr.Blocks(title="RVC Voice Conversion", css=_CSS) as demo:
    # Header banner plus the startup status computed in the warm-up phase.
    gr.HTML(f"""
<div id="header">
  <h1>ποΈ RVC Voice Conversion</h1>
  <p>Retrieval-Based Voice Conversion Β· record or upload Β· custom models Β· GPU/CPU auto</p>
</div>
<p id="status">{_startup_status}</p>
""")
    with gr.Tabs():
        # -- TAB 1: Convert ---------------------------------------------------
        with gr.Tab("π€ Convert"):
            with gr.Row():
                # Left column: input audio, model choice, basic settings.
                with gr.Column(scale=1):
                    gr.Markdown("### π Input Audio")
                    with gr.Tabs():
                        with gr.Tab("ποΈ Microphone"):
                            inp_mic = gr.Audio(
                                sources=["microphone"],
                                type="filepath",
                                label="Record",
                            )
                        with gr.Tab("π Upload File"):
                            inp_file = gr.Audio(
                                sources=["upload"],
                                type="filepath",
                                label="Upload audio (wav / mp3 / flac / ogg β¦)",
                            )
                    gr.Markdown("### π€ Model")
                    model_dd = gr.Dropdown(
                        choices=_initial_models,
                        value=_initial_value,
                        label="Active Voice Model",
                        interactive=True,
                    )
                    gr.Markdown("### ποΈ Basic Settings")
                    pitch_sl = gr.Slider(
                        minimum=-24, maximum=24, value=0, step=1,
                        label="Pitch Shift (semitones)",
                        info="0 = unchanged Β· positive = higher Β· negative = lower",
                    )
                    f0_radio = gr.Radio(
                        choices=["rmvpe", "fcpe", "crepe", "crepe-tiny"],
                        value="rmvpe",
                        label="Pitch Extraction Method",
                        info="rmvpe = fastest & accurate Β· crepe = highest quality (slower)",
                    )
                # Right column: advanced settings, convert button, output.
                with gr.Column(scale=1):
                    gr.Markdown("### βοΈ Advanced Settings")
                    with gr.Accordion("Expand advanced options", open=False):
                        index_rate_sl = gr.Slider(
                            0.0, 1.0, value=0.75, step=0.05,
                            label="Index Rate",
                            info="How strongly the FAISS index influences timbre (0 = off)",
                        )
                        protect_sl = gr.Slider(
                            0.0, 0.5, value=0.5, step=0.01,
                            label="Protect Consonants",
                            info="Protects unvoiced consonants β 0.5 = max protection",
                        )
                        vol_env_sl = gr.Slider(
                            0.0, 1.0, value=1.0, step=0.1,
                            label="Volume Envelope Mix",
                            info="1 = use input volume shape Β· 0 = use model output volume",
                        )
                        with gr.Row():
                            clean_cb = gr.Checkbox(value=False, label="Noise Reduction")
                            clean_sl = gr.Slider(
                                0.0, 1.0, value=0.5, step=0.05,
                                label="Reduction Strength",
                            )
                        with gr.Row():
                            split_cb = gr.Checkbox(value=False, label="Split Long Audio")
                            autotune_cb = gr.Checkbox(value=False, label="Autotune")
                        # Hidden until the Autotune checkbox is ticked
                        # (see the .change wiring just below).
                        autotune_sl = gr.Slider(
                            0.0, 1.0, value=1.0, step=0.05,
                            label="Autotune Strength",
                            visible=False,
                        )
                        autotune_cb.change(
                            fn=toggle_autotune,
                            inputs=autotune_cb,
                            outputs=autotune_sl,
                        )
                        fmt_radio = gr.Radio(
                            choices=["WAV", "MP3", "FLAC", "OPUS"],
                            value="WAV",
                            label="Output Format",
                            info="OPUS = small file size (~64 kbps, Telegram/Discord quality)",
                        )
                    convert_btn = gr.Button("π Convert Voice", variant="primary")
                    gr.Markdown("### π§ Output")
                    out_audio = gr.Audio(label="Result", type="filepath", interactive=False)
                    out_status = gr.Textbox(label="Status", interactive=False, lines=1)
                    # convert_btn wired below after all tabs
        # -- TAB 2: Models ----------------------------------------------------
        with gr.Tab("π¦ Models"):
            gr.Markdown("""
### Upload a Custom RVC Model
Provide a **`.zip`** containing:
- **`model.pth`** β weights (required)
- **`model.index`** β FAISS index (optional, improves voice matching)
Built-in models pre-downloaded on startup: **Vestia Zeta v1**, **Vestia Zeta v2**, **Ayunda Risu**, **Gawr Gura**.
""")
            with gr.Row():
                with gr.Column(scale=1):
                    up_zip = gr.File(label="Model ZIP", file_types=[".zip"])
                    up_name = gr.Textbox(
                        label="Model Name",
                        placeholder="Leave blank to use zip filename",
                    )
                    up_btn = gr.Button("π€ Load Model", variant="primary")
                    up_status = gr.Textbox(label="Status", interactive=False, lines=2)
                with gr.Column(scale=1):
                    gr.Markdown("### Loaded Models")
                    models_table = gr.Dataframe(
                        col_count=(1, "fixed"),
                        value=[[m] for m in _initial_models],
                        interactive=False,
                        label="",
                    )
                    refresh_btn = gr.Button("π Refresh")
            # Upload updates the status box, the Convert tab's dropdown and
            # the table of loaded models in one go.
            up_btn.click(
                fn=upload_model,
                inputs=[up_zip, up_name],
                outputs=[up_status, model_dd, models_table],
            )
            refresh_btn.click(
                fn=refresh_models,
                outputs=[models_table, model_dd],
            )
        # -- TAB 3: Logs ------------------------------------------------------
        with gr.Tab("π Logs"):
            gr.Markdown("Live log output from the conversion engine. Click **Refresh** to update.")
            logs_box = gr.Textbox(
                value=get_logs,
                label="",
                lines=20,
                max_lines=20,
                interactive=False,
                autoscroll=True,
            )
            with gr.Row():
                refresh_logs_btn = gr.Button("π Refresh Logs")
                clear_logs_btn = gr.Button("ποΈ Clear")
            refresh_logs_btn.click(fn=get_logs, outputs=logs_box)

            def clear_logs():
                # Empty the shared ring buffer and blank the textbox.
                _LOG_BUFFER.clear()
                return ""

            clear_logs_btn.click(fn=clear_logs, outputs=logs_box)
        # -- TAB 4: Help ------------------------------------------------------
        with gr.Tab("βΉοΈ Help"):
            gr.Markdown(f"""
## How it works
RVC (Retrieval-Based Voice Conversion) transforms a voice recording to sound
like a target speaker using only that speaker's model file.
---
## Quick Guide
1. Open the **Convert** tab
2. **Record** via microphone or **upload** an audio file (wav, mp3, flac, ogg β¦)
3. Choose a **model** β *zetaTest* is pre-loaded automatically
4. Set **Pitch Shift** if needed (e.g. male β female: try +12 semitones)
5. Click **π Convert Voice**
---
## Pitch Extraction Methods
| Method | Speed | Quality | Best for |
|---|---|---|---|
| **rmvpe** | β‘β‘β‘ | β β β β | General use |
| **fcpe** | β‘β‘ | β β β β | Singing |
| **crepe** | β‘ | β β β β β | Highest quality |
| **crepe-tiny** | β‘β‘ | β β β | Low resource |
---
## Advanced Settings
| Setting | Description |
|---|---|
| **Index Rate** | Influence of FAISS index on output timbre (0.75 recommended) |
| **Protect Consonants** | Prevents artefacts on consonants (0.5 = max) |
| **Volume Envelope Mix** | 1 = preserve input loudness Β· 0 = model controls loudness |
| **Noise Reduction** | Removes background noise before conversion |
| **Split Long Audio** | Chunks audio for recordings > 60 s |
| **Autotune** | Snaps pitch to nearest musical note |
---
**Device:** `{DEVICE_LABEL}`
---
## Credits
Engine: [Ultimate RVC](https://github.com/JackismyShephard/ultimate-rvc)
Default model: [zetaTest by megaaziib](https://huggingface.co/megaaziib/my-rvc-models-collection)
""")
    # Wire convert button here - after all tabs so logs_box is defined.
    # The lambda appends the current log text to convert()'s (status, audio)
    # tuple so the Logs tab content also refreshes after each conversion.
    convert_btn.click(
        fn=lambda *a: (*convert(*a), get_logs()),
        inputs=[
            inp_mic, inp_file, model_dd,
            pitch_sl, f0_radio,
            index_rate_sl, protect_sl, vol_env_sl,
            clean_cb, clean_sl,
            split_cb, autotune_cb, autotune_sl,
            fmt_radio,
        ],
        outputs=[out_status, out_audio, logs_box],
    )
# -- Launch -------------------------------------------------------------------
if __name__ == "__main__":
    # PORT is injected by the hosting platform; default to Gradio's 7860.
    listen_port = int(os.getenv("PORT", 7860))
    demo.launch(server_name="0.0.0.0", server_port=listen_port)