Spaces:
Running
Running
| """ | |
| RVC Voice Conversion — HuggingFace Space | |
| Simple, fast, GPU/CPU auto-detected. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import os | |
| import queue | |
| import shutil | |
| import sys | |
| import tempfile | |
| import threading | |
| import time | |
| import uuid | |
| import zipfile | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from pathlib import Path | |
| import torch | |
# ── Path bootstrap ────────────────────────────────────────────────────────────
# Directory containing this file; the model and output folders live beside it.
BASE_DIR = Path(__file__).parent
# Put the app directory first on sys.path so it wins module resolution.
sys.path.insert(0, str(BASE_DIR))
MODELS_DIR = BASE_DIR / "rvc_models"
OUTPUT_DIR = BASE_DIR / "outputs"
MODELS_DIR.mkdir(exist_ok=True)
OUTPUT_DIR.mkdir(exist_ok=True)
# setdefault: an externally provided URVC_MODELS_DIR is left untouched.
# NOTE(review): presumably read by ultimate_rvc to locate its assets — confirm.
os.environ.setdefault("URVC_MODELS_DIR", str(MODELS_DIR / "urvc"))
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%H:%M:%S",
)
# Raise the threshold of chatty third-party loggers to WARNING.
for _noisy in ("httpx", "httpcore", "faiss", "faiss.loader", "transformers", "torch"):
    logging.getLogger(_noisy).setLevel(logging.WARNING)
# Application logger used by every function in this file.
logger = logging.getLogger("rvc_space")
# ── CPU threading ─────────────────────────────────────────────────────────────
try:
    # Size of the CPU set this process is allowed to run on.
    _NUM_CORES = len(os.sched_getaffinity(0))
except AttributeError:
    # os.sched_getaffinity does not exist on every platform.
    _NUM_CORES = os.cpu_count() or 1
torch.set_num_threads(_NUM_CORES)
torch.set_num_interop_threads(_NUM_CORES)
# NOTE(review): torch is already imported at this point, so these variables
# presumably only affect libraries initialised later — confirm they are needed.
os.environ["OMP_NUM_THREADS"] = str(_NUM_CORES)
os.environ["MKL_NUM_THREADS"] = str(_NUM_CORES)
os.environ["NUMEXPR_NUM_THREADS"] = str(_NUM_CORES)
os.environ["OPENBLAS_NUM_THREADS"] = str(_NUM_CORES)
torch.set_float32_matmul_precision("high")
torch.backends.mkldnn.enabled = True
logger.info("CPU threads: %d | matmul: high | oneDNN: enabled", _NUM_CORES)
# ── Device ────────────────────────────────────────────────────────────────────
# DEVICE is "cuda" when torch reports a usable GPU, otherwise "cpu".
# DEVICE_LABEL is the human-readable form shown in the UI and the logs.
if torch.cuda.is_available():
    DEVICE = "cuda"
    DEVICE_LABEL = f"π’ GPU Β· {torch.cuda.get_device_name(0)}"
else:
    DEVICE = "cpu"
    DEVICE_LABEL = f"π΅ CPU Β· {_NUM_CORES} cores"
logger.info("Device: %s", DEVICE_LABEL)
# ── Built-in models ───────────────────────────────────────────────────────────
# Each entry: "name" is the folder created under MODELS_DIR, "url" is the zip
# archive fetched by _download_model_entry() at startup.
BUILTIN_MODELS = [
    {
        "name": "Vestia Zeta v1",
        "url": "https://huggingface.co/megaaziib/my-rvc-models-collection/resolve/main/zeta.zip",
    },
    {
        "name": "Vestia Zeta v2",
        "url": "https://huggingface.co/megaaziib/my-rvc-models-collection/resolve/main/zetaTest.zip",
    },
    {
        "name": "Ayunda Risu",
        "url": "https://huggingface.co/megaaziib/my-rvc-models-collection/resolve/main/risu.zip",
    },
    {
        "name": "Gawr Gura",
        "url": "https://huggingface.co/Gigrig/GigrigRVC/resolve/41d46f087b9c7d70b93acf100f1cb9f7d25f3831/GawrGura_RVC_v2_Ov2Super_e275_s64075.zip",
    },
]
# Max input duration in seconds; convert() rejects longer inputs when the
# duration can be determined.
MAX_INPUT_DURATION = 300  # 5 minutes
# Output file TTL: files older than this are deleted at the start of each
# conversion job.
OUTPUT_TTL_SECONDS = 21600  # 6 hours
# Max job status records kept in memory.
MAX_JOBS = 50
# ── Lazy VoiceConverter ───────────────────────────────────────────────────────
_vc_instance = None


def _get_vc():
    """Return the process-wide VoiceConverter, constructing it on first use."""
    global _vc_instance
    if _vc_instance is not None:
        return _vc_instance
    logger.info("Loading VoiceConverterβ¦")
    # Imported here rather than at module import time, so the engine is only
    # loaded when the first conversion actually needs it.
    from ultimate_rvc.rvc.infer.infer import VoiceConverter

    _vc_instance = VoiceConverter()
    logger.info("VoiceConverter ready.")
    return _vc_instance
# ── Output file cleanup ───────────────────────────────────────────────────────
def _cleanup_old_outputs() -> None:
    """Delete files in OUTPUT_DIR last modified more than OUTPUT_TTL_SECONDS ago.

    Best-effort: a file that cannot be inspected or removed (for example
    because it vanished in the meantime) is skipped and logged at debug level.
    Sub-directories are never touched.
    """
    cutoff = time.time() - OUTPUT_TTL_SECONDS
    for entry in OUTPUT_DIR.iterdir():
        try:
            # is_file()/stat() sit inside the try as well: a failure on one
            # entry must not abort the conversion job that triggered cleanup.
            if not entry.is_file() or entry.stat().st_mtime >= cutoff:
                continue
            entry.unlink()
        except OSError as exc:
            logger.debug("Could not clean up %s: %s", entry.name, exc)
        else:
            logger.info("Cleaned up old output: %s", entry.name)
# ── Model helpers ─────────────────────────────────────────────────────────────
def list_models() -> list[str]:
    """Return the sorted names of MODELS_DIR sub-folders that hold a .pth file.

    An empty list is returned when MODELS_DIR does not exist.
    """
    if not MODELS_DIR.exists():
        return []
    names = []
    for candidate in MODELS_DIR.iterdir():
        if not candidate.is_dir():
            continue
        # A folder counts as a model as soon as one weights file is present.
        if any(candidate.glob("*.pth")):
            names.append(candidate.name)
    names.sort()
    return names
def _pth_and_index(name: str) -> tuple[str, str]:
    """Locate the weights and optional index file of model *name*.

    Returns:
        ``(pth_path, index_path)`` as strings; ``index_path`` is ``""`` when
        the model folder contains no ``.index`` file.

    Raises:
        FileNotFoundError: the model folder contains no ``.pth`` file.
    """
    model_dir = MODELS_DIR / name
    # glob() yields entries in arbitrary order; sorting makes the choice
    # deterministic when a folder holds several candidates.
    weights = sorted(model_dir.glob("*.pth"))
    if not weights:
        raise FileNotFoundError(f"No .pth file found in model '{name}'")
    indexes = sorted(model_dir.glob("*.index"))
    return str(weights[0]), (str(indexes[0]) if indexes else "")
def _extract_zip(zip_path: str | Path, dest_name: str) -> None:
    """Extract a model archive into ``MODELS_DIR / dest_name`` and flatten it.

    After extraction every ``.pth`` and ``.index`` file found anywhere below
    the destination is moved to the top level of the model folder.

    Args:
        zip_path: archive to extract.
        dest_name: model folder name; must be a single path component.

    Raises:
        ValueError: ``dest_name`` is empty, ``.``/``..``, or contains a path
            separator. Such names would place the model outside MODELS_DIR.
    """
    # The name can come straight from a user text box, so refuse anything
    # that is not a plain folder name before touching the filesystem.
    if dest_name in ("", ".", "..") or Path(dest_name).name != dest_name:
        raise ValueError(f"Invalid model name: {dest_name!r}")
    dest = MODELS_DIR / dest_name
    dest.mkdir(exist_ok=True)
    with zipfile.ZipFile(zip_path, "r") as zf:
        zf.extractall(dest)
    # Materialise the matches first: files are moved while being processed.
    nested_files = list(dest.rglob("*.pth")) + list(dest.rglob("*.index"))
    for nested in nested_files:
        target = dest / nested.name
        if nested != target:
            shutil.move(str(nested), str(target))
def _download_file(url: str, dest: Path) -> None:
    """Download *url* to *dest* unless *dest* already exists.

    The body is streamed into a temporary file next to *dest* and renamed
    into place only after the download completed, so *dest* never holds a
    partial file. The temporary file is removed when the download fails.

    Raises:
        requests.HTTPError: the server answered with an error status.
    """
    if dest.exists():
        return
    dest.parent.mkdir(parents=True, exist_ok=True)
    logger.info("Downloading %s β¦", dest.name)
    import requests

    tmp_path = None
    try:
        # Context manager releases the connection even on a failed download.
        with requests.get(url, stream=True, timeout=300) as r:
            r.raise_for_status()
            with tempfile.NamedTemporaryFile(delete=False, dir=dest.parent, suffix=".tmp") as tmp:
                tmp_path = tmp.name
                for chunk in r.iter_content(8192):
                    tmp.write(chunk)
        os.replace(tmp_path, dest)
        tmp_path = None  # renamed into place; nothing left to clean up
    finally:
        if tmp_path is not None:
            Path(tmp_path).unlink(missing_ok=True)
    logger.info("%s ready.", dest.name)
def _download_model_entry(model: dict) -> str:
    """Download and unpack one built-in model archive.

    Args:
        model: entry of BUILTIN_MODELS with the keys ``"name"`` and ``"url"``.

    Returns:
        The model name. Nothing is downloaded when the model folder already
        contains a ``.pth`` file.

    Raises:
        requests.HTTPError: the server answered with an error status.
    """
    import requests

    name = model["name"]
    dest = MODELS_DIR / name
    if dest.exists() and any(dest.glob("*.pth")):
        logger.info("Model already present: %s", name)
        return name
    logger.info("Downloading model: %s β¦", name)
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmp:
            tmp_path = tmp.name
            # Context manager releases the connection on every exit path.
            with requests.get(model["url"], stream=True, timeout=300) as r:
                r.raise_for_status()
                for chunk in r.iter_content(8192):
                    tmp.write(chunk)
        _extract_zip(tmp_path, name)
    finally:
        # Remove the archive whether or not download/extraction succeeded.
        if tmp_path is not None:
            Path(tmp_path).unlink(missing_ok=True)
    logger.info("Model ready: %s", name)
    return name
def _startup_downloads() -> str:
    """Fetch predictor, embedder and built-in model assets in one thread pool.

    A failed download is logged as a warning and does not stop the others.

    Returns:
        The name of the first entry of BUILTIN_MODELS, used as the default
        selection.
    """
    import requests  # noqa: F401 - ensure available before threads

    resources = "https://huggingface.co/JackismyShephard/ultimate-rvc/resolve/main/Resources"
    urvc_root = MODELS_DIR / "urvc" / "rvc"
    predictors_dir = urvc_root / "predictors"
    contentvec_dir = urvc_root / "embedders" / "contentvec"
    downloads = [
        (f"{resources}/predictors/rmvpe.pt", predictors_dir / "rmvpe.pt"),
        (f"{resources}/predictors/fcpe.pt", predictors_dir / "fcpe.pt"),
        (f"{resources}/embedders/contentvec/pytorch_model.bin", contentvec_dir / "pytorch_model.bin"),
        (f"{resources}/embedders/contentvec/config.json", contentvec_dir / "config.json"),
    ]
    with ThreadPoolExecutor(max_workers=8) as pool:
        # future -> label used in the warning when that download fails
        labels = {}
        for url, dest in downloads:
            labels[pool.submit(_download_file, url, dest)] = dest.name
        for entry in BUILTIN_MODELS:
            labels[pool.submit(_download_model_entry, entry)] = entry["name"]
        for finished in as_completed(labels):
            try:
                finished.result()
            except Exception as exc:
                logger.warning("Download failed (%s): %s", labels[finished], exc)
    return BUILTIN_MODELS[0]["name"]
# ── Upload handler ────────────────────────────────────────────────────────────
def upload_model(zip_file, model_name):
    """Install an uploaded model archive and refresh the model widgets.

    Args:
        zip_file: path of the uploaded archive (falsy when nothing was sent).
        model_name: optional folder name; the archive's stem is used when
            it is blank.

    Returns:
        ``(status_text, dropdown_update, table_update)``. On any failure the
        two updates are no-ops and the status text carries the error.
    """
    import gradio as gr

    if not zip_file:
        return "β οΈ No file provided.", gr.update(), gr.update()
    name = (model_name or "").strip() or Path(zip_file).stem
    try:
        # The name is user input: only a plain folder name is acceptable,
        # anything else could point outside MODELS_DIR.
        if name in (".", "..") or Path(name).name != name:
            raise ValueError(f"Invalid model name: {name!r}")
        _extract_zip(zip_file, name)
        models = list_models()
        if name not in models:
            # list_models() only reports folders holding a .pth file, so the
            # archive had no weights. Drop the useless folder instead of
            # announcing success and selecting a value the dropdown lacks.
            shutil.rmtree(MODELS_DIR / name, ignore_errors=True)
            raise FileNotFoundError(f"No .pth file found in model '{name}'")
        return (
            f"β Model **{name}** loaded successfully.",
            gr.update(choices=models, value=name),
            gr.update(value=[[m] for m in models]),
        )
    except Exception as exc:
        logger.exception("Model upload failed")
        return f"β Error: {exc}", gr.update(), gr.update()
# ── Refresh handler ───────────────────────────────────────────────────────────
def refresh_models():
    """Re-scan the model folder; return (table_update, dropdown_update)."""
    import gradio as gr

    available = list_models()
    table_rows = [[model] for model in available]
    table_update = gr.update(value=table_rows)
    dropdown_update = gr.update(choices=available)
    return table_update, dropdown_update
# ── Autotune visibility toggle ────────────────────────────────────────────────
def toggle_autotune(enabled):
    """Return an update that shows the target component when *enabled* is set."""
    import gradio as gr

    visibility_update = gr.update(visible=enabled)
    return visibility_update
# ── ffmpeg is pre-installed on HuggingFace Spaces ────────────────────────────
def _ffmpeg_bin() -> str:
    """Return the ffmpeg command name; the caller resolves it through PATH."""
    executable = "ffmpeg"
    return executable
# ── Reverb effect via pedalboard ─────────────────────────────────────────────
def _apply_reverb(audio_path: str, room_size: float, damping: float, wet_level: float) -> None:
    """Apply reverb in-place to an audio file using pedalboard.

    The processed audio is written to a temporary sibling file which then
    replaces *audio_path*. Best-effort: on any failure a warning is logged,
    the original file is left untouched and no temporary file remains.

    Args:
        audio_path: file to process; it is overwritten on success.
        room_size: passed to ``Reverb(room_size=...)``.
        damping: passed to ``Reverb(damping=...)``.
        wet_level: passed to ``Reverb(wet_level=...)``; the dry level is set
            to ``1.0 - wet_level``.
    """
    src = Path(audio_path)
    # Keep the original extension at the very end of the temporary name so
    # the result is encoded in the same container as the input. A fixed
    # ".wav" suffix would overwrite e.g. an .mp3 with WAV data.
    # NOTE(review): relies on pedalboard picking the codec from the file
    # extension — see the pedalboard.io.AudioFile documentation.
    tmp = src.with_name(f"{src.stem}.reverb.tmp{src.suffix}")
    try:
        from pedalboard import Pedalboard, Reverb
        from pedalboard.io import AudioFile

        board = Pedalboard([
            Reverb(
                room_size=room_size,
                damping=damping,
                wet_level=wet_level,
                dry_level=1.0 - wet_level,
                width=1.0,
            )
        ])
        with AudioFile(str(src)) as f:
            with AudioFile(str(tmp), "w", f.samplerate, f.num_channels) as out:
                # Process roughly one second per iteration; reset=False keeps
                # the reverb tail running across chunk boundaries.
                while f.tell() < f.frames:
                    chunk = f.read(f.samplerate)
                    out.write(board(chunk, f.samplerate, reset=False))
        os.replace(tmp, src)
        logger.info("Reverb applied (room=%.2f, damp=%.2f, wet=%.2f)", room_size, damping, wet_level)
    except Exception as exc:
        logger.warning("Reverb failed: %s", exc)
    finally:
        # After a successful replace the temp file is already gone; after a
        # failure this removes the partial result.
        try:
            tmp.unlink(missing_ok=True)
        except OSError:
            pass
# ── Upload to temp.sh ────────────────────────────────────────────────────────
def _upload_to_tempsh(file_path: str) -> str | None:
    """Send *file_path* to temp.sh with curl.

    Returns the download URL printed by the service, or ``None`` when the
    output is not an https URL or when running curl raised.
    """
    try:
        import subprocess

        curl_cmd = ["curl", "-s", "-F", f"file=@{file_path}", "https://temp.sh/upload"]
        proc = subprocess.run(
            curl_cmd,
            capture_output=True,
            text=True,
            timeout=120,
        )
        link = proc.stdout.strip()
        # Anything that is not an https URL is treated as a failed upload.
        if not link.startswith("https://"):
            logger.warning("temp.sh upload failed: %s", proc.stdout or proc.stderr)
            return None
        logger.info("Uploaded to temp.sh: %s", link)
        return link
    except Exception as exc:
        logger.warning("temp.sh upload error: %s", exc)
        return None
# ── Background job queue ─────────────────────────────────────────────────────
# Jobs submitted by convert() and consumed by the single _worker() thread.
_job_queue: queue.Queue = queue.Queue()
# Job status store: job_id -> {"status": str, "url": str|None,
# "file": str|None, "model": str}; the worker adds "elapsed" (str) once a job
# has finished or failed.
_jobs: dict[str, dict] = {}
# Guards _jobs, which is written by the worker thread and by convert().
_jobs_lock = threading.Lock()
def _format_elapsed(seconds: float) -> str:
    """Render a duration as whole seconds below one minute, else as minutes."""
    return f"{seconds:.0f}s" if seconds < 60 else f"{seconds/60:.1f}m"


def _update_job(job_id: str, **fields) -> None:
    """Merge *fields* into the status record of *job_id*, if it still exists.

    convert() evicts old records once MAX_JOBS is reached, so the record of
    a job may be gone by the time the worker reports on it; that is ignored.
    """
    with _jobs_lock:
        record = _jobs.get(job_id)
        if record is not None:
            record.update(fields)


def _run_job(job: dict) -> tuple[Path, str | None]:
    """Convert one job's audio; return ``(output_path, temp_sh_url_or_None)``."""
    model_path, index_path = _pth_and_index(job["model_name"])
    _cleanup_old_outputs()
    is_opus = job["output_format"].upper() == "OPUS"
    # OPUS is produced by ffmpeg below; the engine is asked for WAV instead.
    engine_format = "WAV" if is_opus else job["output_format"]
    ts = int(time.time())
    wav_path = OUTPUT_DIR / f"output-{ts}.wav"
    # NOTE(review): for MP3/FLAC the engine is still handed wav_path; this
    # assumes it derives the final file name from export_format — confirm.
    out_path = OUTPUT_DIR / (
        f"output-{ts}.opus" if is_opus
        else f"output-{ts}.{job['output_format'].lower()}"
    )
    vc = _get_vc()
    vc.convert_audio(
        audio_input_path=job["audio_input"],
        audio_output_path=str(wav_path),
        model_path=model_path,
        index_path=index_path,
        pitch=job["pitch"],
        f0_method=job["f0_method"],
        index_rate=job["index_rate"],
        volume_envelope=job["volume_envelope"],
        protect=job["protect"],
        split_audio=job["split_audio"],
        f0_autotune=job["autotune"],
        f0_autotune_strength=job["autotune_strength"],
        clean_audio=job["clean_audio"],
        clean_strength=job["clean_strength"],
        export_format=engine_format,
        filter_radius=job["filter_radius"],
    )
    if is_opus:
        import subprocess

        subprocess.run(
            [
                _ffmpeg_bin(), "-y",
                "-i", str(wav_path),
                "-c:a", "libopus",
                "-b:a", "64000",
                "-vbr", "off",
                "-ar", "48000",
                str(out_path),
            ],
            check=True, capture_output=True,
        )
        # The intermediate WAV is only needed as ffmpeg input.
        wav_path.unlink(missing_ok=True)
    # Apply reverb if enabled (operates on the final output file)
    if job.get("reverb"):
        _apply_reverb(
            str(out_path),
            room_size=job.get("reverb_room_size", 0.15),
            damping=job.get("reverb_damping", 0.7),
            wet_level=job.get("reverb_wet_level", 0.15),
        )
    # Upload to temp.sh
    return out_path, _upload_to_tempsh(str(out_path))


def _worker() -> None:
    """Single background worker: processes one job at a time from the queue.

    Runs forever. A failing job is logged and marked as failed; nothing a job
    does may terminate the loop, since this is the only consumer of the queue.
    """
    while True:
        job = _job_queue.get()
        job_id = job["id"]
        started = time.time()
        try:
            _update_job(job_id, status="β³ Convertingβ¦")
            logger.info("[Job %s] Starting conversion (model: %s)", job_id, job["model_name"])
            out_path, temp_url = _run_job(job)
            elapsed_str = _format_elapsed(time.time() - started)
            result = {"elapsed": elapsed_str, "status": "β Done", "file": str(out_path)}
            if temp_url:
                result["url"] = temp_url
                logger.info("[Job %s] Complete in %s β %s", job_id, elapsed_str, temp_url)
            else:
                logger.info("[Job %s] Complete in %s (no temp.sh URL)", job_id, elapsed_str)
            _update_job(job_id, **result)
        except Exception as exc:
            elapsed_str = _format_elapsed(time.time() - started)
            logger.exception("[Job %s] Failed after %s: %s", job_id, elapsed_str, exc)
            # _update_job tolerates an evicted record; indexing _jobs directly
            # here could raise inside the handler and kill the worker thread.
            _update_job(job_id, status="β Failed", elapsed=elapsed_str, file=None)
        finally:
            _job_queue.task_done()
# Start the single background worker thread.
# daemon=True: the thread does not keep the interpreter alive at exit.
_worker_thread = threading.Thread(target=_worker, daemon=True)
_worker_thread.start()
logger.info("Background worker started.")
# ── Conversion ────────────────────────────────────────────────────────────────
def convert(
    audio_mic, audio_file, model_name,
    pitch, f0_method,
    index_rate, protect, volume_envelope,
    clean_audio, clean_strength,
    split_audio, autotune, autotune_strength,
    filter_radius,
    output_format,
    reverb=False,
    reverb_room_size=0.15,
    reverb_damping=0.7,
    reverb_wet_level=0.15,
):
    """Validate the request, queue it for the background worker and return.

    The microphone recording takes precedence over the uploaded file. The
    remaining parameters are stored in the job unchanged and consumed by the
    worker.

    Returns:
        ``(status_markdown, None)``. The second element is always ``None``
        because the audio is produced later by the worker. On a validation
        problem the status text describes it and nothing is queued.
    """
    audio_input = audio_mic or audio_file
    if audio_input is None:
        return "β οΈ Please record or upload audio first.", None
    if not model_name:
        return "β οΈ No model selected.", None
    # Check input duration upfront before queuing. Best-effort: when the
    # file cannot be inspected the job is queued anyway.
    try:
        import soundfile as sf

        duration = sf.info(audio_input).duration
        if duration > MAX_INPUT_DURATION:
            return (
                f"β οΈ Audio is {duration:.0f}s β max is {MAX_INPUT_DURATION//60} min. "
                f"Please trim your audio.", None
            )
        logger.info("Input duration: %.1fs", duration)
    except Exception as exc:
        logger.debug("Skipping duration check for %s: %s", audio_input, exc)
    # Validate model exists before queuing
    try:
        _pth_and_index(model_name)
    except FileNotFoundError as exc:
        return f"β {exc}", None
    job_id = uuid.uuid4().hex[:8]
    job = {
        "id": job_id,
        "audio_input": audio_input,
        "model_name": model_name,
        "pitch": pitch,
        "f0_method": f0_method,
        "index_rate": index_rate,
        "volume_envelope": volume_envelope,
        "protect": protect,
        "split_audio": split_audio,
        "autotune": autotune,
        "autotune_strength": autotune_strength,
        "clean_audio": clean_audio,
        "clean_strength": clean_strength,
        "filter_radius": filter_radius,
        "output_format": output_format,
        "reverb": reverb,
        "reverb_room_size": reverb_room_size,
        "reverb_damping": reverb_damping,
        "reverb_wet_level": reverb_wet_level,
    }
    with _jobs_lock:
        if len(_jobs) >= MAX_JOBS:
            # Prefer evicting the oldest finished job: the worker sets
            # "elapsed" only once a job is done or failed. Dropping a queued
            # or running record would lose that job's result. Fall back to
            # the oldest record when nothing has finished yet.
            oldest = next(
                (jid for jid, info in _jobs.items() if "elapsed" in info),
                next(iter(_jobs)),
            )
            del _jobs[oldest]
            logger.info("Removed oldest job %s (limit: %d)", oldest, MAX_JOBS)
        _jobs[job_id] = {"status": "π Queuedβ¦", "url": None, "file": None, "model": model_name}
    _job_queue.put(job)
    queue_size = _job_queue.qsize()
    logger.info("[Job %s] Queued (model: %s, queue depth: %d)", job_id, model_name, queue_size)
    msg = (
        "π Job **" + job_id + "** queued β you can close this tab.\n\n"
        "Check the **π Jobs** tab for your download link when done.\n\n"
        "_(Queue position: " + str(queue_size) + ")_"
    )
    return msg, None
def poll_job(job_id: str) -> tuple[str, str | None]:
    """Check the status of a submitted job.

    Args:
        job_id: identifier returned by convert(); surrounding whitespace
            (e.g. from copy and paste) is ignored.

    Returns:
        ``(status_msg, file_path_or_None)``. The status message includes a
        download link when the job has a URL.
    """
    job_id = (job_id or "").strip()
    with _jobs_lock:
        record = _jobs.get(job_id)
        # Copy under the lock: the worker thread mutates the record in place.
        snapshot = dict(record) if record else None
    if not snapshot:
        return f"β Job {job_id} not found.", None
    status = snapshot["status"]
    url = snapshot.get("url")
    file = snapshot.get("file")
    if url:
        return f"{status} Β· π [Download link]({url}) Β· _(expires in 3 days)_", file
    return status, file
# ── Startup ───────────────────────────────────────────────────────────────────
# Text shown in the page header and the model preselected in the dropdown.
_startup_status = ""
_default_model = ""
try:
    # Runs at import time and blocks until every download finished or failed.
    # Individual download failures are logged inside _startup_downloads() and
    # do not reach the except branch below.
    _default_model = _startup_downloads()
    _startup_status = f"β Ready Β· {DEVICE_LABEL}"
except Exception as _e:
    _startup_status = f"β οΈ Some assets unavailable: {_e} Β· {DEVICE_LABEL}"
    logger.warning("Startup download issue: %s", _e)
_initial_models = list_models()
# Fall back to the first installed model (or None) when the default built-in
# model is not available on disk.
_initial_value = _default_model if _default_model in _initial_models else (
    _initial_models[0] if _initial_models else None
)
# ── Log helpers ───────────────────────────────────────────────────────────────
def get_jobs_table() -> list[list]:
    """Build the jobs table, newest first.

    Each row is ``[ID, Model, Status, Time, Download Link]``; a single
    placeholder row is returned while no job has been submitted.
    """
    with _jobs_lock:
        snapshot = list(_jobs.items())
    if not snapshot:
        return [["β", "β", "No jobs yet", "β", "β"]]

    def _download_cell(record: dict) -> str:
        # Markdown link when the job has a URL, placeholder otherwise.
        url = record.get("url")
        return f"[β¬οΈ]({url})" if url else "β"

    return [
        [
            job_id,
            record.get("model", ""),
            record.get("status", ""),
            record.get("elapsed", "β"),
            _download_cell(record),
        ]
        for job_id, record in reversed(snapshot)
    ]
def get_queue_info() -> str:
    """Return a one-line Markdown summary of the queue and job counters."""
    qs = _job_queue.qsize()
    # Snapshot under the lock: _jobs is modified by the worker thread and by
    # convert(); iterating it unguarded can fail while it changes size.
    with _jobs_lock:
        statuses = [j.get("status", "") for j in _jobs.values()]
    total = len(statuses)
    running = sum(1 for s in statuses if s.startswith("β³"))
    done = sum(1 for s in statuses if s.startswith("β "))
    failed = sum(1 for s in statuses if s.startswith("β"))
    return (
        f"**Queue:** {qs} waiting Β· "
        f"**Running:** {running} Β· "
        f"**Done:** {done} Β· "
        f"**Failed:** {failed} Β· "
        f"**Total:** {total}"
    )
# ── Gradio UI ─────────────────────────────────────────────────────────────────
import gradio as gr
# Stylesheet for the header and status line; also hides the page footer.
# It is handed to demo.launch(css=...) at the bottom of the file.
_CSS = """
#header { text-align: center; padding: 20px 0 8px; }
#header h1 { font-size: 2rem; margin: 0; }
#header p { opacity: .65; margin: 4px 0 0; }
#status { text-align: center; font-size: .82rem; opacity: .7; margin-bottom: 8px; }
footer { display: none !important; }
"""
| with gr.Blocks(title="RVC Voice Conversion", delete_cache=(3600, 3600)) as demo: | |
| gr.HTML(f""" | |
| <div id="header"> | |
| <h1>ποΈ RVC Voice Conversion</h1> | |
| <p>Retrieval-Based Voice Conversion Β· record or upload Β· custom models Β· GPU/CPU auto</p> | |
| </div> | |
| <p id="status">{_startup_status}</p> | |
| """) | |
| with gr.Tabs(): | |
| # ββ TAB 1: Convert ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| with gr.Tab("π€ Convert"): | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| gr.Markdown("### π Input Audio") | |
| with gr.Tabs(): | |
| with gr.Tab("ποΈ Microphone"): | |
| inp_mic = gr.Audio( | |
| sources=["microphone"], | |
| type="filepath", | |
| label="Record", | |
| ) | |
| with gr.Tab("π Upload File"): | |
| inp_file = gr.Audio( | |
| sources=["upload"], | |
| type="filepath", | |
| label="Upload audio (wav / mp3 / flac / ogg β¦)", | |
| ) | |
| gr.Markdown("### π€ Model") | |
| model_dd = gr.Dropdown( | |
| choices=_initial_models, | |
| value=_initial_value, | |
| label="Active Voice Model", | |
| interactive=True, | |
| ) | |
| gr.Markdown("### ποΈ Basic Settings") | |
| pitch_sl = gr.Slider( | |
| minimum=-24, maximum=24, value=0, step=1, | |
| label="Pitch Shift (semitones)", | |
| info="0 = unchanged Β· positive = higher Β· negative = lower", | |
| ) | |
| f0_radio = gr.Radio( | |
| choices=["rmvpe", "fcpe", "crepe", "crepe-tiny"], | |
| value="rmvpe", | |
| label="Pitch Extraction Method", | |
| info="rmvpe = fastest & accurate Β· crepe = highest quality (slower)", | |
| ) | |
| with gr.Column(scale=1): | |
| gr.Markdown("### βοΈ Advanced Settings") | |
| with gr.Accordion("Expand advanced options", open=False): | |
| index_rate_sl = gr.Slider( | |
| 0.0, 1.0, value=0.75, step=0.05, | |
| label="Index Rate", | |
| info="How strongly the FAISS index influences timbre (0 = off)", | |
| ) | |
| protect_sl = gr.Slider( | |
| 0.0, 0.5, value=0.5, step=0.01, | |
| label="Protect Consonants", | |
| info="Protects unvoiced consonants β 0.5 = max protection", | |
| ) | |
| filter_radius_sl = gr.Slider( | |
| 0, 7, value=3, step=1, | |
| label="Respiration Filter Radius", | |
| info="Median filter on pitch β higher = smoother, reduces breath noise", | |
| ) | |
| vol_env_sl = gr.Slider( | |
| 0.0, 1.0, value=0.25, step=0.05, | |
| label="Volume Envelope Mix", | |
| info="0.25 = natural blend Β· 1 = preserve input loudness Β· 0 = model output", | |
| ) | |
| with gr.Row(): | |
| clean_cb = gr.Checkbox(value=False, label="Noise Reduction") | |
| clean_sl = gr.Slider( | |
| 0.0, 1.0, value=0.5, step=0.05, | |
| label="Reduction Strength", | |
| ) | |
| with gr.Row(): | |
| split_cb = gr.Checkbox(value=False, label="Split Long Audio") | |
| autotune_cb = gr.Checkbox(value=False, label="Autotune") | |
| autotune_sl = gr.Slider( | |
| 0.0, 1.0, value=1.0, step=0.05, | |
| label="Autotune Strength", | |
| visible=False, | |
| ) | |
| autotune_cb.change( | |
| fn=toggle_autotune, | |
| inputs=autotune_cb, | |
| outputs=autotune_sl, | |
| ) | |
| gr.Markdown("**ποΈ Reverb**") | |
| reverb_cb = gr.Checkbox(value=False, label="Enable Reverb") | |
| with gr.Group(visible=False) as reverb_group: | |
| reverb_room_sl = gr.Slider( | |
| 0.0, 1.0, value=0.15, step=0.05, | |
| label="Room Size", | |
| info="Larger = bigger sounding space", | |
| ) | |
| reverb_damp_sl = gr.Slider( | |
| 0.0, 1.0, value=0.7, step=0.05, | |
| label="Damping", | |
| info="Higher = more absorption, less echo tail", | |
| ) | |
| reverb_wet_sl = gr.Slider( | |
| 0.0, 1.0, value=0.15, step=0.05, | |
| label="Wet Level", | |
| info="How much reverb is mixed in (0.15 = subtle)", | |
| ) | |
| reverb_cb.change( | |
| fn=lambda v: gr.update(visible=v), | |
| inputs=reverb_cb, | |
| outputs=reverb_group, | |
| ) | |
| fmt_radio = gr.Radio( | |
| choices=["WAV", "MP3", "FLAC", "OPUS"], | |
| value="WAV", | |
| label="Output Format", | |
| info="OPUS = small file (~64 kbps, Telegram/Discord quality)", | |
| ) | |
| convert_btn = gr.Button( | |
| "π Convert Voice", | |
| variant="primary", | |
| ) | |
| gr.Markdown("### π§ Output") | |
| out_status = gr.Markdown(value="") | |
| out_audio = gr.Audio(label="Result (if still on page)", type="filepath", interactive=False) | |
| gr.Markdown("#### π Check Job Status") | |
| with gr.Row(): | |
| job_id_box = gr.Textbox( | |
| label="Job ID", | |
| placeholder="e.g. a3f2b1c9", | |
| scale=3, | |
| ) | |
| poll_btn = gr.Button("π Check", scale=1) | |
| poll_status = gr.Markdown(value="") | |
| poll_audio = gr.Audio(label="Result", type="filepath", interactive=False) | |
| # ββ TAB 2: Models βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| with gr.Tab("π¦ Models"): | |
| gr.Markdown(""" | |
| ### Upload a Custom RVC Model | |
| Provide a **`.zip`** containing: | |
| - **`model.pth`** β weights (required) | |
| - **`model.index`** β FAISS index (optional, improves voice matching) | |
| **Built-in models** (pre-downloaded on startup): | |
| Vestia Zeta v1 Β· Vestia Zeta v2 Β· Ayunda Risu Β· Gawr Gura | |
| """) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| up_zip = gr.File(label="Model ZIP", file_types=[".zip"]) | |
| up_name = gr.Textbox( | |
| label="Model Name", | |
| placeholder="Leave blank to use zip filename", | |
| ) | |
| up_btn = gr.Button("π€ Load Model", variant="primary") | |
| up_status = gr.Textbox(label="Status", interactive=False, lines=2) | |
| with gr.Column(scale=1): | |
| gr.Markdown("### Loaded Models") | |
| models_table = gr.Dataframe( | |
| col_count=(1, "fixed"), | |
| value=[[m] for m in _initial_models], | |
| interactive=False, | |
| label="", | |
| ) | |
| refresh_btn = gr.Button("π Refresh") | |
| up_btn.click( | |
| fn=upload_model, | |
| inputs=[up_zip, up_name], | |
| outputs=[up_status, model_dd, models_table], | |
| ) | |
| refresh_btn.click( | |
| fn=refresh_models, | |
| outputs=[models_table, model_dd], | |
| ) | |
| # ββ TAB 3: Jobs βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| with gr.Tab("π Jobs"): | |
| gr.Markdown("All submitted jobs, newest first. Click **Refresh** to update.") | |
| queue_status = gr.Markdown(value=get_queue_info, every=10) | |
| jobs_table = gr.Dataframe( | |
| headers=["Job ID", "Model", "Status", "Time", "Download"], | |
| col_count=(5, "fixed"), | |
| value=get_jobs_table, | |
| interactive=False, | |
| wrap=True, | |
| datatype=["str", "str", "str", "str", "markdown"], | |
| every=10, | |
| ) | |
| refresh_jobs_btn = gr.Button("π Refresh") | |
def _refresh_jobs():
    """Return the current (queue summary, jobs table) pair for the Jobs tab."""
    summary = get_queue_info()
    rows = get_jobs_table()
    return summary, rows
| refresh_jobs_btn.click(fn=_refresh_jobs, outputs=[queue_status, jobs_table]) | |
| # ββ TAB 4: Help βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| with gr.Tab("βΉοΈ Help"): | |
| gr.Markdown(f""" | |
| ## How it works | |
| RVC (Retrieval-Based Voice Conversion) transforms a voice recording to sound | |
| like a target speaker using only that speaker's model file. | |
| --- | |
| ## Quick Guide | |
| 1. Open the **Convert** tab | |
| 2. **Record** via microphone or **upload** an audio file (wav, mp3, flac, ogg β¦) | |
| 3. Choose a **model** from the dropdown β 4 models are pre-loaded on startup | |
| 4. Set **Pitch Shift** if needed (e.g. male β female: try +12 semitones) | |
| 5. Click **π Convert Voice** and wait for the result | |
| --- | |
| ## Built-in Models | |
| | Model | Description | | |
| |---|---| | |
| | **Vestia Zeta v1** | Hololive ID VTuber, v1 model | | |
| | **Vestia Zeta v2** | Hololive ID VTuber, v2 model (recommended) | | |
| | **Ayunda Risu** | Hololive ID VTuber | | |
| | **Gawr Gura** | Hololive EN VTuber | | |
| --- | |
| ## Pitch Extraction Methods | |
| | Method | Speed | Quality | Best for | | |
| |---|---|---|---| | |
| | **rmvpe** | β‘β‘β‘ | β β β β | General use (default) | | |
| | **fcpe** | β‘β‘ | β β β β | Singing | | |
| | **crepe** | β‘ | β β β β β | Highest quality, slow | | |
| | **crepe-tiny** | β‘β‘ | β β β | Low resource | | |
| --- | |
| ## Advanced Settings | |
| | Setting | Description | | |
| |---|---| | |
| | **Index Rate** | Influence of FAISS index on output timbre (0.75 recommended) | | |
| | **Protect Consonants** | Prevents artefacts on consonants (0.5 = max) | | |
| | **Respiration Filter Radius** | Smooths pitch curve β higher reduces breath noise (0β7, default 3) | | |
| | **Volume Envelope Mix** | 0.25 = natural blend Β· 1 = preserve input loudness | | |
| | **Noise Reduction** | Removes background noise before conversion | | |
| | **Split Long Audio** | Chunks audio for recordings > 60 s | | |
| | **Autotune** | Snaps pitch to nearest musical note | | |
| --- | |
| ## Output Formats | |
| | Format | Size | Quality | | |
| |---|---|---| | |
| | **WAV** | Large | Lossless | | |
| | **FLAC** | Medium | Lossless compressed | | |
| | **MP3** | Small | Lossy | | |
| | **OPUS** | Tiny (~64 kbps) | Telegram/Discord quality | | |
| --- | |
| **Device:** `{DEVICE_LABEL}` | |
| **Max input duration:** {MAX_INPUT_DURATION // 60} minutes | |
| --- | |
| ## Credits | |
| Engine: [Ultimate RVC](https://github.com/JackismyShephard/ultimate-rvc) | |
| """) | |
| # Wire convert button after all tabs so jobs_table is defined | |
def _submit_and_extract_id(*args):
    """Submit a conversion and pre-fill the job ID box from the status text.

    Returns:
        ``(status, audio, job_id, queue_summary, jobs_table)``; ``job_id`` is
        ``""`` when the request was not queued.
    """
    import re

    status, audio = convert(*args)
    # Match only the "Job **<id>**" marker of the queued message. A bare
    # 8-hex-digit search could also hit text inside an error message (for
    # example a model name) and present it as a job ID.
    match = re.search(r"Job \*\*([a-f0-9]{8})\*\*", status or "")
    job_id = match.group(1) if match else ""
    return status, audio, job_id, get_queue_info(), get_jobs_table()
| convert_btn.click( | |
| fn=_submit_and_extract_id, | |
| inputs=[ | |
| inp_mic, inp_file, model_dd, | |
| pitch_sl, f0_radio, | |
| index_rate_sl, protect_sl, vol_env_sl, | |
| clean_cb, clean_sl, | |
| split_cb, autotune_cb, autotune_sl, | |
| filter_radius_sl, | |
| fmt_radio, | |
| reverb_cb, reverb_room_sl, reverb_damp_sl, reverb_wet_sl, | |
| ], | |
| outputs=[out_status, out_audio, job_id_box, queue_status, jobs_table], | |
| ) | |
def _poll_and_refresh(job_id):
    """Look up one job and return it together with refreshed queue widgets."""
    job_status, job_file = poll_job(job_id)
    summary = get_queue_info()
    rows = get_jobs_table()
    return job_status, job_file, summary, rows
| poll_btn.click( | |
| fn=_poll_and_refresh, | |
| inputs=[job_id_box], | |
| outputs=[poll_status, poll_audio, queue_status, jobs_table], | |
| ) | |
# ── Launch ────────────────────────────────────────────────────────────────────
# Only start the server when the file is executed as a script.
if __name__ == "__main__":
    demo.queue(default_concurrency_limit=5)
    demo.launch(
        # Listen on all interfaces.
        server_name="0.0.0.0",
        # PORT from the environment, falling back to 7860.
        server_port=int(os.getenv("PORT", 7860)),
        max_threads=10,
        ssr_mode=False,
        css=_CSS,
    )