""" RVC Voice Conversion – HuggingFace Space Simple, fast, GPU/CPU auto-detected. """ from __future__ import annotations import logging import os import queue import shutil import sys import tempfile import threading import time import uuid import zipfile from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path import torch # ── Path bootstrap ──────────────────────────────────────────────────────────── BASE_DIR = Path(__file__).parent sys.path.insert(0, str(BASE_DIR)) MODELS_DIR = BASE_DIR / "rvc_models" OUTPUT_DIR = BASE_DIR / "outputs" MODELS_DIR.mkdir(exist_ok=True) OUTPUT_DIR.mkdir(exist_ok=True) os.environ.setdefault("URVC_MODELS_DIR", str(MODELS_DIR / "urvc")) logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", datefmt="%H:%M:%S", ) for _noisy in ("httpx", "httpcore", "faiss", "faiss.loader", "transformers", "torch"): logging.getLogger(_noisy).setLevel(logging.WARNING) logger = logging.getLogger("rvc_space") # ── CPU threading ───────────────────────────────────────────────────────────── try: _NUM_CORES = len(os.sched_getaffinity(0)) except AttributeError: _NUM_CORES = os.cpu_count() or 1 torch.set_num_threads(_NUM_CORES) torch.set_num_interop_threads(_NUM_CORES) os.environ["OMP_NUM_THREADS"] = str(_NUM_CORES) os.environ["MKL_NUM_THREADS"] = str(_NUM_CORES) os.environ["NUMEXPR_NUM_THREADS"] = str(_NUM_CORES) os.environ["OPENBLAS_NUM_THREADS"] = str(_NUM_CORES) torch.set_float32_matmul_precision("high") torch.backends.mkldnn.enabled = True logger.info("CPU threads: %d | matmul: high | oneDNN: enabled", _NUM_CORES) # ── Device ──────────────────────────────────────────────────────────────────── if torch.cuda.is_available(): DEVICE = "cuda" DEVICE_LABEL = f"🟢 GPU · {torch.cuda.get_device_name(0)}" else: DEVICE = "cpu" DEVICE_LABEL = f"🔵 CPU · {_NUM_CORES} cores" logger.info("Device: %s", DEVICE_LABEL) # ── Built-in models ─────────────────────────────────────────────────────────── 
BUILTIN_MODELS = [
    {
        "name": "Vestia Zeta v1",
        "url": "https://huggingface.co/megaaziib/my-rvc-models-collection/resolve/main/zeta.zip",
    },
    {
        "name": "Vestia Zeta v2",
        "url": "https://huggingface.co/megaaziib/my-rvc-models-collection/resolve/main/zetaTest.zip",
    },
    {
        "name": "Ayunda Risu",
        "url": "https://huggingface.co/megaaziib/my-rvc-models-collection/resolve/main/risu.zip",
    },
    {
        "name": "Gawr Gura",
        "url": "https://huggingface.co/Gigrig/GigrigRVC/resolve/41d46f087b9c7d70b93acf100f1cb9f7d25f3831/GawrGura_RVC_v2_Ov2Super_e275_s64075.zip",
    },
]

# Max input duration in seconds (longer inputs are rejected before queuing)
MAX_INPUT_DURATION = 300  # 5 minutes

# Output file TTL — delete files older than this on each conversion
OUTPUT_TTL_SECONDS = 21600  # 6 hours

# Max jobs to keep in memory
MAX_JOBS = 50

# Chunk size used when streaming downloads to disk
_DOWNLOAD_CHUNK_SIZE = 1 << 20  # 1 MiB


# ── Lazy VoiceConverter ───────────────────────────────────────────────────────
_vc_instance = None


def _get_vc():
    """Return the shared VoiceConverter, importing and creating it on first use."""
    global _vc_instance
    if _vc_instance is None:
        logger.info("Loading VoiceConverter…")
        from ultimate_rvc.rvc.infer.infer import VoiceConverter

        _vc_instance = VoiceConverter()
        logger.info("VoiceConverter ready.")
    return _vc_instance


# ── Output file cleanup ───────────────────────────────────────────────────────
def _cleanup_old_outputs() -> None:
    """Delete files in OUTPUT_DIR whose mtime is older than OUTPUT_TTL_SECONDS.

    Best-effort: a file that cannot be inspected or removed is logged and
    skipped so that cleanup never aborts a conversion.
    """
    cutoff = time.time() - OUTPUT_TTL_SECONDS
    for entry in OUTPUT_DIR.iterdir():
        try:
            if entry.is_file() and entry.stat().st_mtime < cutoff:
                entry.unlink()
                logger.info("Cleaned up old output: %s", entry.name)
        except OSError as exc:
            logger.warning("Could not clean up %s: %s", entry.name, exc)


# ── Model helpers ─────────────────────────────────────────────────────────────
def list_models() -> list[str]:
    """Return the sorted names of MODELS_DIR sub-directories that hold a .pth file."""
    if not MODELS_DIR.exists():
        return []
    return sorted(
        p.name for p in MODELS_DIR.iterdir() if p.is_dir() and any(p.glob("*.pth"))
    )


def _pth_and_index(name: str) -> tuple[str, str]:
    """Return ``(pth_path, index_path)`` for model *name*.

    ``index_path`` is ``""`` when the model has no ``.index`` file. When several
    candidates exist the alphabetically first one is used, so the choice is
    stable between calls.

    Raises:
        FileNotFoundError: the model directory contains no ``.pth`` file.
    """
    model_dir = MODELS_DIR / name
    pths = sorted(model_dir.glob("*.pth"))
    idxs = sorted(model_dir.glob("*.index"))
    if not pths:
        raise FileNotFoundError(f"No .pth file found in model '{name}'")
    return str(pths[0]), str(idxs[0]) if idxs else ""


def _extract_zip(zip_path: str | Path, dest_name: str) -> None:
    """Extract *zip_path* into ``MODELS_DIR / dest_name`` and flatten model files.

    Any ``.pth`` / ``.index`` file found in a nested folder is moved to the top
    of the model directory, where the other helpers look for it.

    Raises:
        ValueError: *dest_name* is empty or is not a single path component.
    """
    # dest_name can come straight from a user text box: refuse anything that
    # would resolve outside MODELS_DIR (separators, "." or "..").
    if not dest_name or dest_name in {".", ".."} or Path(dest_name).name != dest_name:
        raise ValueError(f"Invalid model name: {dest_name!r}")

    dest = MODELS_DIR / dest_name
    dest.mkdir(exist_ok=True)
    with zipfile.ZipFile(zip_path, "r") as zf:
        zf.extractall(dest)
    for nested in list(dest.rglob("*.pth")) + list(dest.rglob("*.index")):
        target = dest / nested.name
        if nested != target:
            shutil.move(str(nested), str(target))


def _stream_to_temp(url: str, *, suffix: str, directory: Path | None = None) -> Path:
    """Stream *url* into a new temporary file and return its path.

    The temporary file is removed again if the request or the write fails, so a
    failed download leaves nothing behind.
    """
    import requests

    with tempfile.NamedTemporaryFile(delete=False, dir=directory, suffix=suffix) as tmp:
        tmp_path = Path(tmp.name)
        try:
            with requests.get(url, stream=True, timeout=300) as resp:
                resp.raise_for_status()
                for chunk in resp.iter_content(_DOWNLOAD_CHUNK_SIZE):
                    tmp.write(chunk)
        except BaseException:
            tmp.close()  # must be closed before it can be removed on Windows
            tmp_path.unlink(missing_ok=True)
            raise
    return tmp_path


def _download_file(url: str, dest: Path) -> None:
    """Download a single file if not already present."""
    if dest.exists():
        return
    dest.parent.mkdir(parents=True, exist_ok=True)
    logger.info("Downloading %s …", dest.name)
    # Download next to the destination so the final rename stays on one
    # filesystem and a partial file never appears under the real name.
    tmp_path = _stream_to_temp(url, suffix=".tmp", directory=dest.parent)
    try:
        os.replace(tmp_path, dest)
    except OSError:
        tmp_path.unlink(missing_ok=True)
        raise
    logger.info("%s ready.", dest.name)


def _download_model_entry(model: dict) -> str:
    """Download a single built-in model zip. Returns model name."""
    name = model["name"]
    dest = MODELS_DIR / name
    if dest.exists() and any(dest.glob("*.pth")):
        logger.info("Model already present: %s", name)
        return name
    logger.info("Downloading model: %s …", name)
    tmp_path = _stream_to_temp(model["url"], suffix=".zip")
    try:
        _extract_zip(tmp_path, name)
    finally:
        # The archive is only needed for extraction; drop it even on failure.
        tmp_path.unlink(missing_ok=True)
    logger.info("Model ready: %s", name)
    return name


def _startup_downloads() -> str:
    """
    Download all required assets in parallel at startup.
    Returns name of first built-in model as the default selection.

    Individual download failures are logged as warnings and do not abort the
    remaining downloads.
    """
    import requests  # noqa: F401 — ensure available before threads

    # Build task list: predictors + embedders + models all in one pool
    predictor_base = "https://huggingface.co/JackismyShephard/ultimate-rvc/resolve/main/Resources/predictors"
    embedder_base = "https://huggingface.co/JackismyShephard/ultimate-rvc/resolve/main/Resources/embedders"
    predictors_dir = MODELS_DIR / "urvc" / "rvc" / "predictors"
    embedders_dir = MODELS_DIR / "urvc" / "rvc" / "embedders"

    file_tasks = [
        (f"{predictor_base}/rmvpe.pt", predictors_dir / "rmvpe.pt"),
        (f"{predictor_base}/fcpe.pt", predictors_dir / "fcpe.pt"),
        (f"{embedder_base}/contentvec/pytorch_model.bin", embedders_dir / "contentvec" / "pytorch_model.bin"),
        (f"{embedder_base}/contentvec/config.json", embedders_dir / "contentvec" / "config.json"),
    ]

    with ThreadPoolExecutor(max_workers=8) as pool:
        # Map every future to a label used in the failure log line.
        all_futures = {
            pool.submit(_download_file, url, dest): dest.name for url, dest in file_tasks
        }
        all_futures.update(
            {pool.submit(_download_model_entry, m): m["name"] for m in BUILTIN_MODELS}
        )

        for future in as_completed(all_futures):
            try:
                future.result()
            except Exception as exc:
                logger.warning("Download failed (%s): %s", all_futures[future], exc)

    return BUILTIN_MODELS[0]["name"]


# ── Upload handler ────────────────────────────────────────────────────────────
def upload_model(zip_file, model_name):
    """Install an uploaded model zip and refresh the model widgets.

    Returns a ``(status_text, dropdown_update, table_update)`` tuple. The model
    is stored under *model_name*, or under the zip's file stem when the name is
    blank. Errors are reported in the status text instead of being raised.
    """
    import gradio as gr

    if not zip_file:
        return "⚠️ No file provided.", gr.update(), gr.update()
    name = (model_name or "").strip() or Path(zip_file).stem
    try:
        _extract_zip(zip_file, name)
        models = list_models()
        return (
            f"✅ Model **{name}** loaded successfully.",
            gr.update(choices=models, value=name),
            gr.update(value=[[m] for m in models]),
        )
    except Exception as exc:
        logger.exception("Model upload failed")
        return f"❌ Error: {exc}", gr.update(), gr.update()


# ── Refresh handler ───────────────────────────────────────────────────────────
def refresh_models():
    """Return ``(table_update, dropdown_update)`` built from the models on disk."""
    import gradio as gr

    models = list_models()
    return gr.update(value=[[m] for m in models]), gr.update(choices=models)


# ── Autotune visibility toggle ────────────────────────────────────────────────
def toggle_autotune(enabled):
    """Show the autotune strength slider only while autotune is enabled."""
    import gradio as gr

    return gr.update(visible=enabled)


# ── ffmpeg is pre-installed on HuggingFace Spaces ────────────────────────────
def _ffmpeg_bin() -> str:
    """Return the ffmpeg executable name (resolved through PATH)."""
    return "ffmpeg"


# ── Reverb effect via pedalboard ─────────────────────────────────────────────
def _apply_reverb(audio_path: str, room_size: float, damping: float, wet_level: float) -> None:
    """Apply reverb in-place to an audio file using pedalboard.

    The processed audio is written to a sibling temporary file that keeps the
    source file's extension, so the result stays in the same container format,
    and is then moved over the original. Failures are logged and leave the
    original file untouched.
    """
    src = Path(audio_path)
    tmp = src.with_name(f"{src.stem}.reverb.tmp{src.suffix}")
    try:
        from pedalboard import Pedalboard, Reverb
        from pedalboard.io import AudioFile

        board = Pedalboard([
            Reverb(
                room_size=room_size,
                damping=damping,
                wet_level=wet_level,
                dry_level=1.0 - wet_level,
                width=1.0,
            )
        ])
        with AudioFile(str(src)) as f:
            with AudioFile(str(tmp), "w", f.samplerate, f.num_channels) as out:
                # Process about one second per iteration; reset=False keeps the
                # effect state from one chunk to the next.
                while f.tell() < f.frames:
                    chunk = f.read(f.samplerate)
                    out.write(board(chunk, f.samplerate, reset=False))
        os.replace(tmp, src)
        logger.info("Reverb applied (room=%.2f, damp=%.2f, wet=%.2f)", room_size, damping, wet_level)
    except Exception as exc:
        logger.warning("Reverb failed: %s", exc)
        tmp.unlink(missing_ok=True)


# ── Upload to temp.sh ────────────────────────────────────────────────────────
def _upload_to_tempsh(file_path: str) -> str | None:
    """Upload a file to temp.sh and return the download URL, or None on failure."""
    try:
        import subprocess

        result = subprocess.run(
            ["curl", "-s", "-F", f"file=@{file_path}", "https://temp.sh/upload"],
            capture_output=True,
            text=True,
            timeout=120,
        )
        url = result.stdout.strip()
        if url.startswith("https://"):
            logger.info("Uploaded to temp.sh: %s", url)
            return url
        logger.warning("temp.sh upload failed: %s", result.stdout or result.stderr)
        return None
    except Exception as exc:
        logger.warning("temp.sh upload error: %s", exc)
        return None
# ── Background job queue ─────────────────────────────────────────────────────
_job_queue: queue.Queue = queue.Queue()

# Job status store: job_id -> {"status": str, "url": str|None, "file": str|None,
# "model": str, "elapsed": str (set once the job finishes)}
_jobs: dict[str, dict] = {}
_jobs_lock = threading.Lock()


def _format_elapsed(seconds: float) -> str:
    """Format a duration as whole seconds below one minute, else as minutes."""
    return f"{seconds:.0f}s" if seconds < 60 else f"{seconds / 60:.1f}m"


def _update_job(job_id: str, **fields) -> bool:
    """Merge *fields* into the record of *job_id*; return False if it is gone.

    A record can disappear while its job is still queued or running because
    convert() trims the store to MAX_JOBS entries. That must never raise in
    the worker thread.
    """
    with _jobs_lock:
        record = _jobs.get(job_id)
        if record is None:
            return False
        record.update(fields)
        return True


def _process_job(job: dict) -> Path:
    """Run one conversion job and return the path of the final output file."""
    import subprocess

    model_path, index_path = _pth_and_index(job["model_name"])
    _cleanup_old_outputs()

    output_format = job["output_format"]
    is_opus = output_format.upper() == "OPUS"
    # The engine is asked for WAV when OPUS is requested; the OPUS file is
    # produced afterwards with ffmpeg.
    engine_format = "WAV" if is_opus else output_format
    ts = int(time.time())
    wav_path = OUTPUT_DIR / f"output-{ts}.wav"
    out_path = OUTPUT_DIR / (
        f"output-{ts}.opus" if is_opus else f"output-{ts}.{output_format.lower()}"
    )

    vc = _get_vc()
    vc.convert_audio(
        audio_input_path=job["audio_input"],
        audio_output_path=str(wav_path),
        model_path=model_path,
        index_path=index_path,
        pitch=job["pitch"],
        f0_method=job["f0_method"],
        index_rate=job["index_rate"],
        volume_envelope=job["volume_envelope"],
        protect=job["protect"],
        split_audio=job["split_audio"],
        f0_autotune=job["autotune"],
        f0_autotune_strength=job["autotune_strength"],
        clean_audio=job["clean_audio"],
        clean_strength=job["clean_strength"],
        export_format=engine_format,
        filter_radius=job["filter_radius"],
    )

    reverb_kwargs = None
    if job.get("reverb"):
        reverb_kwargs = {
            "room_size": job.get("reverb_room_size", 0.15),
            "damping": job.get("reverb_damping", 0.7),
            "wet_level": job.get("reverb_wet_level", 0.15),
        }

    if is_opus:
        # Reverb has to be applied to the intermediate WAV, before encoding:
        # the reverb step reads and writes through a WAV temp file.
        if reverb_kwargs is not None:
            _apply_reverb(str(wav_path), **reverb_kwargs)
        try:
            subprocess.run(
                [
                    _ffmpeg_bin(), "-y",
                    "-i", str(wav_path),
                    "-c:a", "libopus",
                    "-b:a", "64000",
                    "-vbr", "off",
                    "-ar", "48000",
                    str(out_path),
                ],
                check=True,
                capture_output=True,
            )
        finally:
            # The intermediate WAV is not needed whether encoding worked or not.
            wav_path.unlink(missing_ok=True)
    elif reverb_kwargs is not None:
        # Apply reverb to the final output file.
        _apply_reverb(str(out_path), **reverb_kwargs)

    return out_path


def _worker() -> None:
    """Single background worker — processes one job at a time from the queue."""
    while True:
        job = _job_queue.get()
        job_id = job["id"]
        started = time.time()
        try:
            _update_job(job_id, status="⏳ Converting…")
            logger.info("[Job %s] Starting conversion (model: %s)", job_id, job["model_name"])

            out_path = _process_job(job)

            # Upload to temp.sh (None when the upload fails)
            temp_url = _upload_to_tempsh(str(out_path))

            elapsed_str = _format_elapsed(time.time() - started)
            _update_job(
                job_id,
                status="✅ Done",
                url=temp_url,
                file=str(out_path),
                elapsed=elapsed_str,
            )
            if temp_url:
                logger.info("[Job %s] Complete in %s → %s", job_id, elapsed_str, temp_url)
            else:
                logger.info("[Job %s] Complete in %s (no temp.sh URL)", job_id, elapsed_str)

        except Exception as exc:
            elapsed_str = _format_elapsed(time.time() - started)
            logger.exception("[Job %s] Failed after %s: %s", job_id, elapsed_str, exc)
            _update_job(job_id, status="❌ Failed", elapsed=elapsed_str, file=None)
        finally:
            _job_queue.task_done()


# Start the single background worker thread
_worker_thread = threading.Thread(target=_worker, daemon=True)
_worker_thread.start()
logger.info("Background worker started.")


# ── Conversion ────────────────────────────────────────────────────────────────
def convert(
    audio_mic,
    audio_file,
    model_name,
    pitch,
    f0_method,
    index_rate,
    protect,
    volume_envelope,
    clean_audio,
    clean_strength,
    split_audio,
    autotune,
    autotune_strength,
    filter_radius,
    output_format,
    reverb=False,
    reverb_room_size=0.15,
    reverb_damping=0.7,
    reverb_wet_level=0.15,
):
    """Submit a job to the background worker and return immediately.

    Returns ``(status_markdown, None)``. The second element is always None
    because the converted audio is only available once the job has finished.
    The microphone recording takes precedence over the uploaded file.
    """
    audio_input = audio_mic or audio_file
    if not audio_input:
        return "⚠️ Please record or upload audio first.", None
    if not model_name:
        return "⚠️ No model selected.", None

    # Check input duration upfront before queuing
    try:
        import soundfile as sf

        info = sf.info(audio_input)
        duration = info.duration
        if duration > MAX_INPUT_DURATION:
            return (
                f"⚠️ Audio is {duration:.0f}s — max is {MAX_INPUT_DURATION//60} min. "
                f"Please trim your audio.",
                None,
            )
        logger.info("Input duration: %.1fs", duration)
    except Exception as exc:
        # Best-effort check: if the file cannot be probed, queue the job anyway.
        logger.debug("Could not determine input duration: %s", exc)

    # Validate model exists before queuing
    try:
        _pth_and_index(model_name)
    except FileNotFoundError as exc:
        return f"❌ {exc}", None

    job_id = uuid.uuid4().hex[:8]
    job = {
        "id": job_id,
        "audio_input": audio_input,
        "model_name": model_name,
        "pitch": pitch,
        "f0_method": f0_method,
        "index_rate": index_rate,
        "volume_envelope": volume_envelope,
        "protect": protect,
        "split_audio": split_audio,
        "autotune": autotune,
        "autotune_strength": autotune_strength,
        "clean_audio": clean_audio,
        "clean_strength": clean_strength,
        "filter_radius": filter_radius,
        "output_format": output_format,
        "reverb": reverb,
        "reverb_room_size": reverb_room_size,
        "reverb_damping": reverb_damping,
        "reverb_wet_level": reverb_wet_level,
    }

    with _jobs_lock:
        if len(_jobs) >= MAX_JOBS:
            # Prefer dropping the oldest finished job so that queued or running
            # jobs keep their status entry; fall back to the oldest entry.
            oldest = next(
                (
                    jid
                    for jid, rec in _jobs.items()
                    if rec.get("status", "").startswith(("✅", "❌"))
                ),
                None,
            )
            if oldest is None:
                oldest = next(iter(_jobs))
            del _jobs[oldest]
            logger.info("Removed oldest job %s (limit: %d)", oldest, MAX_JOBS)
        _jobs[job_id] = {"status": "🕐 Queued…", "url": None, "file": None, "model": model_name}

    _job_queue.put(job)
    queue_size = _job_queue.qsize()
    logger.info("[Job %s] Queued (model: %s, queue depth: %d)", job_id, model_name, queue_size)

    msg = (
        f"🕐 Job **{job_id}** queued — you can close this tab.\n\n"
        "Check the **📋 Jobs** tab for your download link when done.\n\n"
        f"_(Queue position: {queue_size})_"
    )
    return msg, None


def poll_job(job_id: str) -> tuple[str, str | None]:
    """Check status of a submitted job. Returns (status_msg, file_path_or_None)."""
    job_id = (job_id or "").strip()
    if not job_id:
        return "⚠️ Please enter a job ID.", None
    with _jobs_lock:
        record = _jobs.get(job_id)
        # Copy under the lock: the worker updates records in place.
        job = dict(record) if record else None
    if not job:
        return f"❌ Job {job_id} not found.", None
    status = job["status"]
    url = job.get("url")
    file = job.get("file")
    if url:
        return f"{status} · 🔗 [Download link]({url}) · _(expires in 3 days)_", file
    return status, file


# ── Startup ───────────────────────────────────────────────────────────────────
_startup_status = ""
_default_model = ""
try:
    _default_model = _startup_downloads()
    _startup_status = f"✅ Ready  ·  {DEVICE_LABEL}"
except Exception as _e:
    _startup_status = f"⚠️ Some assets unavailable: {_e}  ·  {DEVICE_LABEL}"
    logger.warning("Startup download issue: %s", _e)

_initial_models = list_models()
_initial_value = _default_model if _default_model in _initial_models else (
    _initial_models[0] if _initial_models else None
)


# ── Log helpers ───────────────────────────────────────────────────────────────
def get_jobs_table() -> list[list]:
    """Return job list as rows: [ID, Model, Status, Time, Download Link]."""
    with _jobs_lock:
        jobs = [(job_id, dict(info)) for job_id, info in _jobs.items()]
    if not jobs:
        return [["—", "—", "No jobs yet", "—", "—"]]
    rows = []
    # Newest first: the store keeps insertion order.
    for job_id, info in reversed(jobs):
        url = info.get("url")
        link = f"[⬇️]({url})" if url else "—"
        rows.append([
            job_id,
            info.get("model", ""),
            info.get("status", ""),
            info.get("elapsed", "—"),
            link,
        ])
    return rows


def get_queue_info() -> str:
    """Return a short queue status string."""
    qs = _job_queue.qsize()
    # Snapshot under the lock; the worker and convert() mutate the store from
    # other threads and iterating it unlocked can raise RuntimeError.
    with _jobs_lock:
        statuses = [j.get("status", "") for j in _jobs.values()]
    total = len(statuses)
    running = sum(1 for s in statuses if s.startswith("⏳"))
    done = sum(1 for s in statuses if s.startswith("✅"))
    failed = sum(1 for s in statuses if s.startswith("❌"))
    return (
        f"**Queue:** {qs} waiting · "
        f"**Running:** {running} · "
        f"**Done:** {done} · "
        f"**Failed:** {failed} · "
        f"**Total:** {total}"
    )


# ── Gradio UI ─────────────────────────────────────────────────────────────────
import gradio as gr

_CSS = """
#header { text-align: center; padding: 20px 0 8px; }
#header h1 { font-size: 2rem; margin: 0; }
#header p { opacity: .65; margin: 4px 0 0; }
#status { text-align: center; font-size: .82rem; opacity: .7; margin-bottom: 8px; }
footer { display: none !important; }
"""

with gr.Blocks(title="RVC Voice Conversion", delete_cache=(3600, 3600)) as demo:

    # The #header / #status ids are the ones styled by _CSS.
    gr.HTML(f"""
<div id="header">
  <h1>RVC Voice Conversion</h1>
  <p>Simple, fast, GPU/CPU auto-detected.</p>
</div>
<div id="status">{_startup_status}</div>
""")

    with gr.Tabs():

        # ── TAB 1: Convert ────────────────────────────────────────────────────
        with gr.Tab("🎤 Convert"):
            with gr.Row():

                with gr.Column(scale=1):
                    gr.Markdown("### 🔊 Input Audio")
                    with gr.Tabs():
                        with gr.Tab("🎙️ Microphone"):
                            inp_mic = gr.Audio(
                                sources=["microphone"],
                                type="filepath",
                                label="Record",
                            )
                        with gr.Tab("📁 Upload File"):
                            inp_file = gr.Audio(
                                sources=["upload"],
                                type="filepath",
                                label="Upload audio (wav / mp3 / flac / ogg …)",
                            )

                    gr.Markdown("### 🤖 Model")
                    model_dd = gr.Dropdown(
                        choices=_initial_models,
                        value=_initial_value,
                        label="Active Voice Model",
                        interactive=True,
                    )

                    gr.Markdown("### 🎚️ Basic Settings")
                    pitch_sl = gr.Slider(
                        minimum=-24, maximum=24, value=0, step=1,
                        label="Pitch Shift (semitones)",
                        info="0 = unchanged · positive = higher · negative = lower",
                    )
                    f0_radio = gr.Radio(
                        choices=["rmvpe", "fcpe", "crepe", "crepe-tiny"],
                        value="rmvpe",
                        label="Pitch Extraction Method",
                        info="rmvpe = fastest & accurate · crepe = highest quality (slower)",
                    )

                with gr.Column(scale=1):
                    gr.Markdown("### ⚙️ Advanced Settings")
                    with gr.Accordion("Expand advanced options", open=False):
                        index_rate_sl = gr.Slider(
                            0.0, 1.0, value=0.75, step=0.05,
                            label="Index Rate",
                            info="How strongly the FAISS index influences timbre (0 = off)",
                        )
                        protect_sl = gr.Slider(
                            0.0, 0.5, value=0.5, step=0.01,
                            label="Protect Consonants",
                            info="Protects unvoiced consonants — 0.5 = max protection",
                        )
                        filter_radius_sl = gr.Slider(
                            0, 7, value=3, step=1,
                            label="Respiration Filter Radius",
                            info="Median filter on pitch — higher = smoother, reduces breath noise",
                        )
                        vol_env_sl = gr.Slider(
                            0.0, 1.0, value=0.25, step=0.05,
                            label="Volume Envelope Mix",
                            info="0.25 = natural blend · 1 = preserve input loudness · 0 = model output",
                        )
                        with gr.Row():
                            clean_cb = gr.Checkbox(value=False, label="Noise Reduction")
                            clean_sl = gr.Slider(
                                0.0, 1.0, value=0.5, step=0.05,
                                label="Reduction Strength",
                            )
                        with gr.Row():
                            split_cb = gr.Checkbox(value=False, label="Split Long Audio")
                            autotune_cb = gr.Checkbox(value=False, label="Autotune")
                        autotune_sl = gr.Slider(
                            0.0, 1.0, value=1.0, step=0.05,
                            label="Autotune Strength",
                            visible=False,
                        )
                        autotune_cb.change(
                            fn=toggle_autotune,
                            inputs=autotune_cb,
                            outputs=autotune_sl,
                        )

                        gr.Markdown("**🎛️ Reverb**")
                        reverb_cb = gr.Checkbox(value=False, label="Enable Reverb")
                        with gr.Group(visible=False) as reverb_group:
                            reverb_room_sl = gr.Slider(
                                0.0, 1.0, value=0.15, step=0.05,
                                label="Room Size",
                                info="Larger = bigger sounding space",
                            )
                            reverb_damp_sl = gr.Slider(
                                0.0, 1.0, value=0.7, step=0.05,
                                label="Damping",
                                info="Higher = more absorption, less echo tail",
                            )
                            reverb_wet_sl = gr.Slider(
                                0.0, 1.0, value=0.15, step=0.05,
                                label="Wet Level",
                                info="How much reverb is mixed in (0.15 = subtle)",
                            )
                        reverb_cb.change(
                            fn=lambda v: gr.update(visible=v),
                            inputs=reverb_cb,
                            outputs=reverb_group,
                        )

                    fmt_radio = gr.Radio(
                        choices=["WAV", "MP3", "FLAC", "OPUS"],
                        value="WAV",
                        label="Output Format",
                        info="OPUS = small file (~64 kbps, Telegram/Discord quality)",
                    )
                    convert_btn = gr.Button(
                        "🚀 Convert Voice",
                        variant="primary",
                    )

                    gr.Markdown("### 🎧 Output")
                    out_status = gr.Markdown(value="")
                    out_audio = gr.Audio(label="Result (if still on page)", type="filepath", interactive=False)

                    gr.Markdown("#### 🔍 Check Job Status")
                    with gr.Row():
                        job_id_box = gr.Textbox(
                            label="Job ID",
                            placeholder="e.g. a3f2b1c9",
                            scale=3,
                        )
                        poll_btn = gr.Button("🔄 Check", scale=1)
                    poll_status = gr.Markdown(value="")
                    poll_audio = gr.Audio(label="Result", type="filepath", interactive=False)

        # ── TAB 2: Models ─────────────────────────────────────────────────────
        with gr.Tab("📦 Models"):
            gr.Markdown("""
### Upload a Custom RVC Model
Provide a **`.zip`** containing:
- **`model.pth`** — weights (required)
- **`model.index`** — FAISS index (optional, improves voice matching)

**Built-in models** (pre-downloaded on startup):
Vestia Zeta v1 · Vestia Zeta v2 · Ayunda Risu · Gawr Gura
""")
            with gr.Row():
                with gr.Column(scale=1):
                    up_zip = gr.File(label="Model ZIP", file_types=[".zip"])
                    up_name = gr.Textbox(
                        label="Model Name",
                        placeholder="Leave blank to use zip filename",
                    )
                    up_btn = gr.Button("📤 Load Model", variant="primary")
                    up_status = gr.Textbox(label="Status", interactive=False, lines=2)
                with gr.Column(scale=1):
                    gr.Markdown("### Loaded Models")
                    models_table = gr.Dataframe(
                        col_count=(1, "fixed"),
                        value=[[m] for m in _initial_models],
                        interactive=False,
                        label="",
                    )
                    refresh_btn = gr.Button("🔄 Refresh")

            up_btn.click(
                fn=upload_model,
                inputs=[up_zip, up_name],
                outputs=[up_status, model_dd, models_table],
            )
            refresh_btn.click(
                fn=refresh_models,
                outputs=[models_table, model_dd],
            )

        # ── TAB 3: Jobs ───────────────────────────────────────────────────────
        with gr.Tab("📋 Jobs"):
            gr.Markdown("All submitted jobs, newest first. Click **Refresh** to update.")
            queue_status = gr.Markdown(value=get_queue_info, every=10)
            jobs_table = gr.Dataframe(
                headers=["Job ID", "Model", "Status", "Time", "Download"],
                col_count=(5, "fixed"),
                value=get_jobs_table,
                interactive=False,
                wrap=True,
                datatype=["str", "str", "str", "str", "markdown"],
                every=10,
            )
            refresh_jobs_btn = gr.Button("🔄 Refresh")

            def _refresh_jobs():
                """Return fresh values for the queue summary and the jobs table."""
                return get_queue_info(), get_jobs_table()

            refresh_jobs_btn.click(fn=_refresh_jobs, outputs=[queue_status, jobs_table])

        # ── TAB 4: Help ───────────────────────────────────────────────────────
        with gr.Tab("ℹ️ Help"):
            gr.Markdown(f"""
## How it works

RVC (Retrieval-Based Voice Conversion) transforms a voice recording to sound
like a target speaker using only that speaker's model file.

---

## Quick Guide

1. Open the **Convert** tab
2. **Record** via microphone or **upload** an audio file (wav, mp3, flac, ogg …)
3. Choose a **model** from the dropdown — 4 models are pre-loaded on startup
4. Set **Pitch Shift** if needed (e.g. male → female: try +12 semitones)
5. Click **🚀 Convert Voice** and wait for the result

---

## Built-in Models

| Model | Description |
|---|---|
| **Vestia Zeta v1** | Hololive ID VTuber, v1 model |
| **Vestia Zeta v2** | Hololive ID VTuber, v2 model (recommended) |
| **Ayunda Risu** | Hololive ID VTuber |
| **Gawr Gura** | Hololive EN VTuber |

---

## Pitch Extraction Methods

| Method | Speed | Quality | Best for |
|---|---|---|---|
| **rmvpe** | ⚡⚡⚡ | ★★★★ | General use (default) |
| **fcpe** | ⚡⚡ | ★★★★ | Singing |
| **crepe** | ⚡ | ★★★★★ | Highest quality, slow |
| **crepe-tiny** | ⚡⚡ | ★★★ | Low resource |

---

## Advanced Settings

| Setting | Description |
|---|---|
| **Index Rate** | Influence of FAISS index on output timbre (0.75 recommended) |
| **Protect Consonants** | Prevents artefacts on consonants (0.5 = max) |
| **Respiration Filter Radius** | Smooths pitch curve — higher reduces breath noise (0–7, default 3) |
| **Volume Envelope Mix** | 0.25 = natural blend · 1 = preserve input loudness |
| **Noise Reduction** | Removes background noise before conversion |
| **Split Long Audio** | Chunks audio for recordings > 60 s |
| **Autotune** | Snaps pitch to nearest musical note |

---

## Output Formats

| Format | Size | Quality |
|---|---|---|
| **WAV** | Large | Lossless |
| **FLAC** | Medium | Lossless compressed |
| **MP3** | Small | Lossy |
| **OPUS** | Tiny (~64 kbps) | Telegram/Discord quality |

---

**Device:** `{DEVICE_LABEL}`
**Max input duration:** {MAX_INPUT_DURATION // 60} minutes

---

## Credits

Engine: [Ultimate RVC](https://github.com/JackismyShephard/ultimate-rvc)
""")

    # Wire convert button after all tabs so jobs_table is defined
    def _submit_and_extract_id(*args):
        """Queue a conversion and pre-fill the job-ID box with the new job's ID."""
        import re

        status, audio = convert(*args)
        # Only the "Job **<id>** queued" message carries an ID; warnings and
        # errors may contain other hex-looking text (for example a model name).
        match = re.search(r"Job \*\*([a-f0-9]{8})\*\*", status or "")
        job_id = match.group(1) if match else ""
        return status, audio, job_id, get_queue_info(), get_jobs_table()

    convert_btn.click(
        fn=_submit_and_extract_id,
        inputs=[
            inp_mic, inp_file, model_dd,
            pitch_sl, f0_radio,
            index_rate_sl, protect_sl, vol_env_sl,
            clean_cb, clean_sl,
            split_cb, autotune_cb, autotune_sl,
            filter_radius_sl,
            fmt_radio,
            reverb_cb, reverb_room_sl, reverb_damp_sl, reverb_wet_sl,
        ],
        outputs=[out_status, out_audio, job_id_box, queue_status, jobs_table],
    )

    def _poll_and_refresh(job_id):
        """Look up one job and refresh the queue summary and jobs table as well."""
        status, file = poll_job(job_id)
        return status, file, get_queue_info(), get_jobs_table()

    poll_btn.click(
        fn=_poll_and_refresh,
        inputs=[job_id_box],
        outputs=[poll_status, poll_audio, queue_status, jobs_table],
    )


# ── Launch ────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    demo.queue(default_concurrency_limit=5)
    demo.launch(
        server_name="0.0.0.0",
        server_port=int(os.getenv("PORT", 7860)),
        max_threads=10,
        ssr_mode=False,
        css=_CSS,
    )