# RVC-Neko / app.py
# (HuggingFace Space page header: ozipoetra / commit b87649f β€” kept as comments
# so the file remains valid Python)
"""
RVC Voice Conversion – HuggingFace Space
Simple, fast, GPU/CPU auto-detected.
"""
from __future__ import annotations
import logging
import os
import queue
import shutil
import sys
import tempfile
import threading
import time
import uuid
import zipfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import torch
# ── Path bootstrap ────────────────────────────────────────────────────────────
BASE_DIR = Path(__file__).parent
sys.path.insert(0, str(BASE_DIR))  # make packages bundled next to app.py importable
MODELS_DIR = BASE_DIR / "rvc_models"  # one sub-directory per voice model
OUTPUT_DIR = BASE_DIR / "outputs"  # converted audio is written here
MODELS_DIR.mkdir(exist_ok=True)
OUTPUT_DIR.mkdir(exist_ok=True)
# Must be set before ultimate_rvc is imported so it caches its assets here.
os.environ.setdefault("URVC_MODELS_DIR", str(MODELS_DIR / "urvc"))
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%H:%M:%S",
)
# Silence chatty third-party loggers.
for _noisy in ("httpx", "httpcore", "faiss", "faiss.loader", "transformers", "torch"):
    logging.getLogger(_noisy).setLevel(logging.WARNING)
logger = logging.getLogger("rvc_space")
# ── CPU threading ─────────────────────────────────────────────────────────────
try:
    # sched_getaffinity respects container CPU limits (Linux only).
    _NUM_CORES = len(os.sched_getaffinity(0))
except AttributeError:
    _NUM_CORES = os.cpu_count() or 1
torch.set_num_threads(_NUM_CORES)
torch.set_num_interop_threads(_NUM_CORES)
# Propagate the thread budget to the common BLAS / OpenMP backends.
os.environ["OMP_NUM_THREADS"] = str(_NUM_CORES)
os.environ["MKL_NUM_THREADS"] = str(_NUM_CORES)
os.environ["NUMEXPR_NUM_THREADS"] = str(_NUM_CORES)
os.environ["OPENBLAS_NUM_THREADS"] = str(_NUM_CORES)
torch.set_float32_matmul_precision("high")  # allow faster (TF32-style) matmul where supported
torch.backends.mkldnn.enabled = True
logger.info("CPU threads: %d | matmul: high | oneDNN: enabled", _NUM_CORES)
# ── Device ────────────────────────────────────────────────────────────────────
if torch.cuda.is_available():
    DEVICE = "cuda"
    DEVICE_LABEL = f"🟒 GPU · {torch.cuda.get_device_name(0)}"
else:
    DEVICE = "cpu"
    DEVICE_LABEL = f"πŸ”΅ CPU Β· {_NUM_CORES} cores"
logger.info("Device: %s", DEVICE_LABEL)
# ── Built-in models ───────────────────────────────────────────────────────────
# Fetched in parallel at startup (see _startup_downloads); each zip is
# extracted into MODELS_DIR/<name> with its .pth/.index files flattened.
BUILTIN_MODELS = [
    {
        "name": "Vestia Zeta v1",
        "url": "https://huggingface.co/megaaziib/my-rvc-models-collection/resolve/main/zeta.zip",
    },
    {
        "name": "Vestia Zeta v2",
        "url": "https://huggingface.co/megaaziib/my-rvc-models-collection/resolve/main/zetaTest.zip",
    },
    {
        "name": "Ayunda Risu",
        "url": "https://huggingface.co/megaaziib/my-rvc-models-collection/resolve/main/risu.zip",
    },
    {
        "name": "Gawr Gura",
        "url": "https://huggingface.co/Gigrig/GigrigRVC/resolve/41d46f087b9c7d70b93acf100f1cb9f7d25f3831/GawrGura_RVC_v2_Ov2Super_e275_s64075.zip",
    },
]
# Max input duration in seconds (longer uploads are rejected in convert())
MAX_INPUT_DURATION = 300  # 5 minutes
# Output file TTL β€” delete files older than this on each conversion
OUTPUT_TTL_SECONDS = 21600  # 6 hours
# Max jobs to keep in memory (oldest entries are evicted in convert())
MAX_JOBS = 50
# ── Lazy VoiceConverter ───────────────────────────────────────────────────────
_vc_instance = None
def _get_vc():
    """Return the process-wide VoiceConverter, constructing it on first use.

    The heavy RVC import is deferred so the app can start (and download
    assets) before the inference stack is loaded.
    """
    global _vc_instance
    if _vc_instance is not None:
        return _vc_instance
    logger.info("Loading VoiceConverter…")
    from ultimate_rvc.rvc.infer.infer import VoiceConverter
    _vc_instance = VoiceConverter()
    logger.info("VoiceConverter ready.")
    return _vc_instance
# ── Output file cleanup ───────────────────────────────────────────────────────
def _cleanup_old_outputs() -> None:
    """Best-effort removal of output files older than OUTPUT_TTL_SECONDS."""
    cutoff = time.time() - OUTPUT_TTL_SECONDS
    for entry in OUTPUT_DIR.iterdir():
        if not entry.is_file():
            continue
        if entry.stat().st_mtime >= cutoff:
            continue  # still fresh β€” keep it
        try:
            entry.unlink()
            logger.info("Cleaned up old output: %s", entry.name)
        except Exception:
            # Deletion is best-effort; never fail a conversion over it.
            pass
# ── Model helpers ─────────────────────────────────────────────────────────────
def list_models() -> list[str]:
    """Names of MODELS_DIR sub-directories that contain at least one .pth file."""
    if not MODELS_DIR.exists():
        return []
    names = [
        entry.name
        for entry in MODELS_DIR.iterdir()
        if entry.is_dir() and any(entry.glob("*.pth"))
    ]
    return sorted(names)
def _pth_and_index(name: str) -> tuple[str, str]:
    """Return (pth_path, index_path) for a model; index is "" when absent.

    Raises FileNotFoundError when the model directory holds no .pth file.
    """
    model_dir = MODELS_DIR / name
    weight_files = list(model_dir.glob("*.pth"))
    if not weight_files:
        raise FileNotFoundError(f"No .pth file found in model '{name}'")
    index_files = list(model_dir.glob("*.index"))
    index_path = str(index_files[0]) if index_files else ""
    return str(weight_files[0]), index_path
def _extract_zip(zip_path: str | Path, dest_name: str) -> None:
    """Extract a model zip into MODELS_DIR/dest_name and flatten its layout.

    Any .pth / .index files that land in nested sub-directories are moved up
    to the model directory root so _pth_and_index can find them.
    """
    model_dir = MODELS_DIR / dest_name
    model_dir.mkdir(exist_ok=True)
    with zipfile.ZipFile(zip_path, "r") as archive:
        archive.extractall(model_dir)
    # Materialize the match list first so moves don't disturb the walk.
    found = [*model_dir.rglob("*.pth"), *model_dir.rglob("*.index")]
    for src in found:
        dst = model_dir / src.name
        if src != dst:
            shutil.move(str(src), str(dst))
def _download_file(url: str, dest: Path) -> None:
    """Download *url* to *dest* if not already present.

    Streams to a temporary file in the destination directory and atomically
    renames it into place, so a crashed download never leaves a partial
    *dest*. The HTTP response is closed via its context manager and the
    temporary file is removed on failure instead of being leaked.
    """
    if dest.exists():
        return
    dest.parent.mkdir(parents=True, exist_ok=True)
    logger.info("Downloading %s …", dest.name)
    import requests
    tmp_path = None
    try:
        with requests.get(url, stream=True, timeout=300) as r:
            r.raise_for_status()
            with tempfile.NamedTemporaryFile(
                delete=False, dir=dest.parent, suffix=".tmp"
            ) as tmp:
                tmp_path = tmp.name
                for chunk in r.iter_content(8192):
                    tmp.write(chunk)
        os.replace(tmp_path, dest)  # atomic on POSIX
    except BaseException:
        # Don't leave a stray .tmp file behind on any failure/cancellation.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
        raise
    logger.info("%s ready.", dest.name)
def _download_model_entry(model: dict) -> str:
    """Download and extract one built-in model zip; return the model name.

    Skips the download entirely when MODELS_DIR/<name> already contains a
    .pth file. The temporary zip is always deleted, even when the download
    or the extraction raises.
    """
    import requests
    name = model["name"]
    dest = MODELS_DIR / name
    if dest.exists() and list(dest.glob("*.pth")):
        logger.info("Model already present: %s", name)
        return name
    logger.info("Downloading model: %s …", name)
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmp:
            tmp_path = tmp.name
            with requests.get(model["url"], stream=True, timeout=300) as r:
                r.raise_for_status()
                for chunk in r.iter_content(8192):
                    tmp.write(chunk)
        _extract_zip(tmp_path, name)
    finally:
        # Never leak the temporary zip, regardless of success or failure.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
    logger.info("Model ready: %s", name)
    return name
def _startup_downloads() -> str:
    """
    Download all required assets in parallel at startup.
    Returns name of first built-in model as the default selection.

    Individual download failures are logged as warnings rather than raised,
    so one bad URL does not prevent the app from starting.
    """
    import requests  # noqa: F401 β€” ensure available before threads
    # Build task list: predictors + embedders + models all in one pool
    predictor_base = "https://huggingface.co/JackismyShephard/ultimate-rvc/resolve/main/Resources/predictors"
    embedder_base = "https://huggingface.co/JackismyShephard/ultimate-rvc/resolve/main/Resources/embedders"
    # Destination layout mirrors what ultimate_rvc expects under URVC_MODELS_DIR.
    predictors_dir = MODELS_DIR / "urvc" / "rvc" / "predictors"
    embedders_dir = MODELS_DIR / "urvc" / "rvc" / "embedders"
    file_tasks = [
        (f"{predictor_base}/rmvpe.pt", predictors_dir / "rmvpe.pt"),
        (f"{predictor_base}/fcpe.pt", predictors_dir / "fcpe.pt"),
        (f"{embedder_base}/contentvec/pytorch_model.bin", embedders_dir / "contentvec" / "pytorch_model.bin"),
        (f"{embedder_base}/contentvec/config.json", embedders_dir / "contentvec" / "config.json"),
    ]
    with ThreadPoolExecutor(max_workers=8) as pool:
        # Submit file downloads
        file_futures = {pool.submit(_download_file, url, dest): dest.name
                        for url, dest in file_tasks}
        # Submit model downloads
        model_futures = {pool.submit(_download_model_entry, m): m["name"]
                         for m in BUILTIN_MODELS}
        all_futures = {**file_futures, **model_futures}
        # Drain results as they finish; a failure only produces a warning.
        for future in as_completed(all_futures):
            try:
                future.result()
            except Exception as exc:
                logger.warning("Download failed (%s): %s", all_futures[future], exc)
    return BUILTIN_MODELS[0]["name"]
# ── Upload handler ────────────────────────────────────────────────────────────
def upload_model(zip_file, model_name):
    """Extract an uploaded model zip and refresh the UI model widgets.

    Returns (status_markdown, dropdown_update, table_update).
    """
    import gradio as gr
    if not zip_file:
        return "⚠️ No file provided.", gr.update(), gr.update()
    chosen = (model_name or "").strip()
    if not chosen:
        chosen = Path(zip_file).stem  # fall back to the zip's filename
    try:
        _extract_zip(zip_file, chosen)
        available = list_models()
        return (
            f"βœ… Model **{chosen}** loaded successfully.",
            gr.update(choices=available, value=chosen),
            gr.update(value=[[m] for m in available]),
        )
    except Exception as exc:
        logger.exception("Model upload failed")
        return f"❌ Error: {exc}", gr.update(), gr.update()
# ── Refresh handler ───────────────────────────────────────────────────────────
def refresh_models():
    """Re-scan MODELS_DIR and update both the table and the dropdown."""
    import gradio as gr
    available = list_models()
    table_rows = [[name] for name in available]
    return gr.update(value=table_rows), gr.update(choices=available)
# ── Autotune visibility toggle ────────────────────────────────────────────────
def toggle_autotune(enabled):
    """Show the autotune-strength slider only while autotune is enabled."""
    import gradio as gr
    visibility = gr.update(visible=enabled)
    return visibility
# ── ffmpeg is pre-installed on HuggingFace Spaces ────────────────────────────
def _ffmpeg_bin() -> str:
    """Name of the ffmpeg executable, resolved through PATH at run time."""
    return "ffmpeg"
# ── Reverb effect via pedalboard ─────────────────────────────────────────────
def _apply_reverb(audio_path: str, room_size: float, damping: float, wet_level: float) -> None:
    """Apply reverb in-place to a WAV file using pedalboard.

    Streams the file in one-second chunks into a sibling temp file, then
    replaces the original. Any failure (pedalboard missing, unreadable
    file, …) is logged and swallowed so the conversion job still succeeds
    without the effect. Uses the module-level ``shutil`` import (the old
    redundant local ``import tempfile, shutil`` was removed: tempfile was
    unused and shutil shadowed the top-level import).
    """
    try:
        from pedalboard import Pedalboard, Reverb
        from pedalboard.io import AudioFile
        tmp = audio_path + ".reverb.tmp.wav"
        board = Pedalboard([
            Reverb(
                room_size=room_size,
                damping=damping,
                wet_level=wet_level,
                dry_level=1.0 - wet_level,  # keep the overall level roughly constant
                width=1.0,
            )
        ])
        with AudioFile(audio_path) as f:
            with AudioFile(tmp, "w", f.samplerate, f.num_channels) as out:
                # One second of audio per chunk keeps memory bounded.
                while f.tell() < f.frames:
                    chunk = f.read(f.samplerate)
                    out.write(board(chunk, f.samplerate, reset=False))
        shutil.move(tmp, audio_path)
        logger.info("Reverb applied (room=%.2f, damp=%.2f, wet=%.2f)", room_size, damping, wet_level)
    except Exception as exc:
        logger.warning("Reverb failed: %s", exc)
# ── Upload to temp.sh ────────────────────────────────────────────────────────
def _upload_to_tempsh(file_path: str) -> str | None:
    """Upload a file to temp.sh and return the download URL, or None on failure."""
    try:
        import subprocess
        proc = subprocess.run(
            ["curl", "-s", "-F", f"file=@{file_path}", "https://temp.sh/upload"],
            capture_output=True,
            text=True,
            timeout=120,
        )
        link = proc.stdout.strip()
        if not link.startswith("https://"):
            # curl succeeded but didn't hand back a link β€” treat as failure.
            logger.warning("temp.sh upload failed: %s", proc.stdout or proc.stderr)
            return None
        logger.info("Uploaded to temp.sh: %s", link)
        return link
    except Exception as exc:
        logger.warning("temp.sh upload error: %s", exc)
        return None
# ── Background job queue ─────────────────────────────────────────────────────
# Jobs are processed strictly one at a time by the single _worker thread.
_job_queue: queue.Queue = queue.Queue()
# Job status store: job_id -> {"status": str, "url": str|None, "model": str}
# Every read/write of _jobs must hold _jobs_lock (UI callbacks run in other
# threads than the worker).
_jobs: dict[str, dict] = {}
_jobs_lock = threading.Lock()
def _worker() -> None:
    """Single background worker β€” processes one job at a time from the queue.

    Runs forever as a daemon thread: pulls job dicts built by convert(),
    performs the conversion, the optional OPUS transcode and reverb, uploads
    the result to temp.sh, and records progress in _jobs under _jobs_lock.
    """
    while True:
        job = _job_queue.get()  # blocks until convert() enqueues a job
        job_id = job["id"]
        try:
            _start_time = time.time()
            with _jobs_lock:
                _jobs[job_id]["status"] = "⏳ Converting…"
            logger.info("[Job %s] Starting conversion (model: %s)", job_id, job["model_name"])
            model_path, index_path = _pth_and_index(job["model_name"])
            _cleanup_old_outputs()
            # OPUS is produced by transcoding a WAV render with ffmpeg below,
            # so the engine itself is asked for WAV in that case.
            is_opus = job["output_format"].upper() == "OPUS"
            engine_format = "WAV" if is_opus else job["output_format"]
            ts = int(time.time())
            wav_path = OUTPUT_DIR / f"output-{ts}.wav"
            out_path = OUTPUT_DIR / (
                f"output-{ts}.opus" if is_opus
                else f"output-{ts}.{job['output_format'].lower()}"
            )
            vc = _get_vc()
            vc.convert_audio(
                audio_input_path=job["audio_input"],
                audio_output_path=str(wav_path),
                model_path=model_path,
                index_path=index_path,
                pitch=job["pitch"],
                f0_method=job["f0_method"],
                index_rate=job["index_rate"],
                volume_envelope=job["volume_envelope"],
                protect=job["protect"],
                split_audio=job["split_audio"],
                f0_autotune=job["autotune"],
                f0_autotune_strength=job["autotune_strength"],
                clean_audio=job["clean_audio"],
                clean_strength=job["clean_strength"],
                export_format=engine_format,
                filter_radius=job["filter_radius"],
            )
            if is_opus:
                import subprocess
                # 64 kbps CBR opus at 48 kHz β€” small files for chat apps.
                subprocess.run(
                    [
                        _ffmpeg_bin(), "-y",
                        "-i", str(wav_path),
                        "-c:a", "libopus",
                        "-b:a", "64000",
                        "-vbr", "off",
                        "-ar", "48000",
                        str(out_path),
                    ],
                    check=True, capture_output=True,
                )
                wav_path.unlink(missing_ok=True)
            # Apply reverb if enabled (operates on the final output file)
            if job.get("reverb"):
                _apply_reverb(
                    str(out_path),
                    room_size=job.get("reverb_room_size", 0.15),
                    damping=job.get("reverb_damping", 0.7),
                    wet_level=job.get("reverb_wet_level", 0.15),
                )
            # Upload to temp.sh (best effort β€” the local file is kept either way)
            temp_url = _upload_to_tempsh(str(out_path))
            _elapsed = time.time() - _start_time
            _elapsed_str = f"{_elapsed:.0f}s" if _elapsed < 60 else f"{_elapsed/60:.1f}m"
            with _jobs_lock:
                _jobs[job_id]["elapsed"] = _elapsed_str
                if temp_url:
                    _jobs[job_id]["status"] = "βœ… Done"
                    _jobs[job_id]["url"] = temp_url
                    _jobs[job_id]["file"] = str(out_path)
                    logger.info("[Job %s] Complete in %s β†’ %s", job_id, _elapsed_str, temp_url)
                else:
                    _jobs[job_id]["status"] = "βœ… Done"
                    _jobs[job_id]["file"] = str(out_path)
                    logger.info("[Job %s] Complete in %s (no temp.sh URL)", job_id, _elapsed_str)
        except Exception as exc:
            # NOTE(review): `"_start_time" in dir()` inspects the local scope;
            # since _start_time is the first statement of the try block it is
            # always bound here β€” `"_start_time" in locals()` would be the
            # clearer spelling.
            _elapsed = time.time() - _start_time if "_start_time" in dir() else 0
            _elapsed_str = f"{_elapsed:.0f}s" if _elapsed < 60 else f"{_elapsed/60:.1f}m"
            logger.exception("[Job %s] Failed after %s: %s", job_id, _elapsed_str, exc)
            with _jobs_lock:
                _jobs[job_id]["status"] = f"❌ Failed"
                _jobs[job_id]["elapsed"] = _elapsed_str
                _jobs[job_id]["file"] = None
        finally:
            _job_queue.task_done()
# Start the single background worker thread (daemon: never blocks shutdown)
_worker_thread = threading.Thread(target=_worker, daemon=True)
_worker_thread.start()
logger.info("Background worker started.")
# ── Conversion ────────────────────────────────────────────────────────────────
def convert(
    audio_mic, audio_file, model_name,
    pitch, f0_method,
    index_rate, protect, volume_envelope,
    clean_audio, clean_strength,
    split_audio, autotune, autotune_strength,
    filter_radius,
    output_format,
    reverb=False,
    reverb_room_size=0.15,
    reverb_damping=0.7,
    reverb_wet_level=0.15,
):
    """Submit a job to the background worker and return immediately."""
    audio_input = audio_mic or audio_file
    if audio_input is None:
        return "⚠️ Please record or upload audio first.", None
    if not model_name:
        return "⚠️ No model selected.", None
    # Reject over-long inputs before queuing (best effort β€” unreadable
    # files are let through and fail later in the worker).
    try:
        import soundfile as sf
        duration = sf.info(audio_input).duration
        if duration > MAX_INPUT_DURATION:
            return (
                f"⚠️ Audio is {duration:.0f}s β€” max is {MAX_INPUT_DURATION//60} min. "
                f"Please trim your audio.", None
            )
        logger.info("Input duration: %.1fs", duration)
    except Exception:
        pass
    # Make sure the model has a .pth before accepting the job.
    try:
        _pth_and_index(model_name)
    except FileNotFoundError as exc:
        return f"❌ {exc}", None
    job_id = uuid.uuid4().hex[:8]
    job = dict(
        id=job_id,
        audio_input=audio_input,
        model_name=model_name,
        pitch=pitch,
        f0_method=f0_method,
        index_rate=index_rate,
        volume_envelope=volume_envelope,
        protect=protect,
        split_audio=split_audio,
        autotune=autotune,
        autotune_strength=autotune_strength,
        clean_audio=clean_audio,
        clean_strength=clean_strength,
        filter_radius=filter_radius,
        output_format=output_format,
        reverb=reverb,
        reverb_room_size=reverb_room_size,
        reverb_damping=reverb_damping,
        reverb_wet_level=reverb_wet_level,
    )
    with _jobs_lock:
        if len(_jobs) >= MAX_JOBS:
            oldest = next(iter(_jobs))  # dicts preserve insertion order
            del _jobs[oldest]
            logger.info("Removed oldest job %s (limit: %d)", oldest, MAX_JOBS)
        _jobs[job_id] = {"status": "πŸ• Queued…", "url": None, "file": None, "model": model_name}
    _job_queue.put(job)
    queue_size = _job_queue.qsize()
    logger.info("[Job %s] Queued (model: %s, queue depth: %d)", job_id, model_name, queue_size)
    msg = (
        f"πŸ• Job **{job_id}** queued β€” you can close this tab.\n\n"
        "Check the **πŸ“‹ Jobs** tab for your download link when done.\n\n"
        f"_(Queue position: {queue_size})_"
    )
    return msg, None
def poll_job(job_id: str) -> tuple[str, str | None]:
    """Check status of a submitted job. Returns (status_msg, file_path_or_None)."""
    with _jobs_lock:
        info = _jobs.get(job_id)
    if info is None:
        return f"❌ Job {job_id} not found.", None
    status = info["status"]
    url = info.get("url")
    file = info.get("file")
    if not url:
        return status, file
    return f"{status} Β· πŸ”— [Download link]({url}) Β· _(expires in 3 days)_", file
# ── Startup ───────────────────────────────────────────────────────────────────
# Block on asset downloads before building the UI so the model dropdown is
# populated on first render; failures degrade to a warning banner.
_startup_status = ""
_default_model = ""
try:
    _default_model = _startup_downloads()
    _startup_status = f"βœ… Ready &nbsp;Β·&nbsp; {DEVICE_LABEL}"
except Exception as _e:
    _startup_status = f"⚠️ Some assets unavailable: {_e} &nbsp;·&nbsp; {DEVICE_LABEL}"
    logger.warning("Startup download issue: %s", _e)
_initial_models = list_models()
# Fall back to the first available model when the preferred default is missing.
_initial_value = _default_model if _default_model in _initial_models else (
    _initial_models[0] if _initial_models else None
)
# ── Log helpers ───────────────────────────────────────────────────────────────
def get_jobs_table() -> list[list]:
    """Return job list as rows: [ID, Model, Status, Time, Download Link]."""
    with _jobs_lock:
        snapshot = list(_jobs.items())
    if not snapshot:
        return [["β€”", "β€”", "No jobs yet", "β€”", "β€”"]]
    table = []
    # Newest jobs first.
    for jid, info in reversed(snapshot):
        link_url = info.get("url")
        table.append([
            jid,
            info.get("model", ""),
            info.get("status", ""),
            info.get("elapsed", "β€”"),
            f"[⬇️]({link_url})" if link_url else "β€”",
        ])
    return table
def get_queue_info() -> str:
    """Return a one-line Markdown summary of queue depth and job states.

    Takes a snapshot of the job store while holding _jobs_lock so the
    counters are mutually consistent; the previous version iterated _jobs
    unlocked while the worker thread mutates it.
    """
    qs = _job_queue.qsize()
    with _jobs_lock:
        statuses = [j.get("status", "") for j in _jobs.values()]
    running = sum(1 for s in statuses if s.startswith("⏳"))
    done = sum(1 for s in statuses if s.startswith("βœ…"))
    failed = sum(1 for s in statuses if s.startswith("❌"))
    return (
        f"**Queue:** {qs} waiting Β· "
        f"**Running:** {running} Β· "
        f"**Done:** {done} Β· "
        f"**Failed:** {failed} Β· "
        f"**Total:** {len(statuses)}"
    )
# ── Gradio UI ─────────────────────────────────────────────────────────────────
import gradio as gr
# Custom CSS: centered header/status banner; hide the default Gradio footer.
_CSS = """
#header { text-align: center; padding: 20px 0 8px; }
#header h1 { font-size: 2rem; margin: 0; }
#header p { opacity: .65; margin: 4px 0 0; }
#status { text-align: center; font-size: .82rem; opacity: .7; margin-bottom: 8px; }
footer { display: none !important; }
"""
with gr.Blocks(title="RVC Voice Conversion", delete_cache=(3600, 3600)) as demo:
    # Header + startup banner (_startup_status is computed at import time).
    gr.HTML(f"""
<div id="header">
<h1>πŸŽ™οΈ RVC Voice Conversion</h1>
<p>Retrieval-Based Voice Conversion Β· record or upload Β· custom models Β· GPU/CPU auto</p>
</div>
<p id="status">{_startup_status}</p>
""")
    with gr.Tabs():
        # ── TAB 1: Convert ────────────────────────────────────────────────────
        with gr.Tab("🎀 Convert"):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("### πŸ”Š Input Audio")
                    with gr.Tabs():
                        with gr.Tab("πŸŽ™οΈ Microphone"):
                            inp_mic = gr.Audio(
                                sources=["microphone"],
                                type="filepath",
                                label="Record",
                            )
                        with gr.Tab("πŸ“ Upload File"):
                            inp_file = gr.Audio(
                                sources=["upload"],
                                type="filepath",
                                label="Upload audio (wav / mp3 / flac / ogg …)",
                            )
                    gr.Markdown("### πŸ€– Model")
                    model_dd = gr.Dropdown(
                        choices=_initial_models,
                        value=_initial_value,
                        label="Active Voice Model",
                        interactive=True,
                    )
                    gr.Markdown("### 🎚️ Basic Settings")
                    pitch_sl = gr.Slider(
                        minimum=-24, maximum=24, value=0, step=1,
                        label="Pitch Shift (semitones)",
                        info="0 = unchanged Β· positive = higher Β· negative = lower",
                    )
                    f0_radio = gr.Radio(
                        choices=["rmvpe", "fcpe", "crepe", "crepe-tiny"],
                        value="rmvpe",
                        label="Pitch Extraction Method",
                        info="rmvpe = fastest & accurate Β· crepe = highest quality (slower)",
                    )
                with gr.Column(scale=1):
                    gr.Markdown("### βš™οΈ Advanced Settings")
                    with gr.Accordion("Expand advanced options", open=False):
                        index_rate_sl = gr.Slider(
                            0.0, 1.0, value=0.75, step=0.05,
                            label="Index Rate",
                            info="How strongly the FAISS index influences timbre (0 = off)",
                        )
                        protect_sl = gr.Slider(
                            0.0, 0.5, value=0.5, step=0.01,
                            label="Protect Consonants",
                            info="Protects unvoiced consonants β€” 0.5 = max protection",
                        )
                        filter_radius_sl = gr.Slider(
                            0, 7, value=3, step=1,
                            label="Respiration Filter Radius",
                            info="Median filter on pitch β€” higher = smoother, reduces breath noise",
                        )
                        vol_env_sl = gr.Slider(
                            0.0, 1.0, value=0.25, step=0.05,
                            label="Volume Envelope Mix",
                            info="0.25 = natural blend Β· 1 = preserve input loudness Β· 0 = model output",
                        )
                        with gr.Row():
                            clean_cb = gr.Checkbox(value=False, label="Noise Reduction")
                            clean_sl = gr.Slider(
                                0.0, 1.0, value=0.5, step=0.05,
                                label="Reduction Strength",
                            )
                        with gr.Row():
                            split_cb = gr.Checkbox(value=False, label="Split Long Audio")
                            autotune_cb = gr.Checkbox(value=False, label="Autotune")
                        # Hidden until the Autotune checkbox is ticked (see toggle_autotune).
                        autotune_sl = gr.Slider(
                            0.0, 1.0, value=1.0, step=0.05,
                            label="Autotune Strength",
                            visible=False,
                        )
                        autotune_cb.change(
                            fn=toggle_autotune,
                            inputs=autotune_cb,
                            outputs=autotune_sl,
                        )
                        gr.Markdown("**πŸŽ›οΈ Reverb**")
                        reverb_cb = gr.Checkbox(value=False, label="Enable Reverb")
                        with gr.Group(visible=False) as reverb_group:
                            reverb_room_sl = gr.Slider(
                                0.0, 1.0, value=0.15, step=0.05,
                                label="Room Size",
                                info="Larger = bigger sounding space",
                            )
                            reverb_damp_sl = gr.Slider(
                                0.0, 1.0, value=0.7, step=0.05,
                                label="Damping",
                                info="Higher = more absorption, less echo tail",
                            )
                            reverb_wet_sl = gr.Slider(
                                0.0, 1.0, value=0.15, step=0.05,
                                label="Wet Level",
                                info="How much reverb is mixed in (0.15 = subtle)",
                            )
                        reverb_cb.change(
                            fn=lambda v: gr.update(visible=v),
                            inputs=reverb_cb,
                            outputs=reverb_group,
                        )
                    fmt_radio = gr.Radio(
                        choices=["WAV", "MP3", "FLAC", "OPUS"],
                        value="WAV",
                        label="Output Format",
                        info="OPUS = small file (~64 kbps, Telegram/Discord quality)",
                    )
                    convert_btn = gr.Button(
                        "πŸš€ Convert Voice",
                        variant="primary",
                    )
                    gr.Markdown("### 🎧 Output")
                    out_status = gr.Markdown(value="")
                    out_audio = gr.Audio(label="Result (if still on page)", type="filepath", interactive=False)
                    gr.Markdown("#### πŸ” Check Job Status")
                    with gr.Row():
                        job_id_box = gr.Textbox(
                            label="Job ID",
                            placeholder="e.g. a3f2b1c9",
                            scale=3,
                        )
                        poll_btn = gr.Button("πŸ”„ Check", scale=1)
                    poll_status = gr.Markdown(value="")
                    poll_audio = gr.Audio(label="Result", type="filepath", interactive=False)
        # ── TAB 2: Models ─────────────────────────────────────────────────────
        with gr.Tab("πŸ“¦ Models"):
            gr.Markdown("""
### Upload a Custom RVC Model
Provide a **`.zip`** containing:
- **`model.pth`** β€” weights (required)
- **`model.index`** β€” FAISS index (optional, improves voice matching)
**Built-in models** (pre-downloaded on startup):
Vestia Zeta v1 Β· Vestia Zeta v2 Β· Ayunda Risu Β· Gawr Gura
""")
            with gr.Row():
                with gr.Column(scale=1):
                    up_zip = gr.File(label="Model ZIP", file_types=[".zip"])
                    up_name = gr.Textbox(
                        label="Model Name",
                        placeholder="Leave blank to use zip filename",
                    )
                    up_btn = gr.Button("πŸ“€ Load Model", variant="primary")
                    up_status = gr.Textbox(label="Status", interactive=False, lines=2)
                with gr.Column(scale=1):
                    gr.Markdown("### Loaded Models")
                    models_table = gr.Dataframe(
                        col_count=(1, "fixed"),
                        value=[[m] for m in _initial_models],
                        interactive=False,
                        label="",
                    )
                    refresh_btn = gr.Button("πŸ”„ Refresh")
            up_btn.click(
                fn=upload_model,
                inputs=[up_zip, up_name],
                outputs=[up_status, model_dd, models_table],
            )
            refresh_btn.click(
                fn=refresh_models,
                outputs=[models_table, model_dd],
            )
        # ── TAB 3: Jobs ───────────────────────────────────────────────────────
        with gr.Tab("πŸ“‹ Jobs"):
            gr.Markdown("All submitted jobs, newest first. Click **Refresh** to update.")
            # `every=10` re-invokes the value callable every 10 s while visible.
            queue_status = gr.Markdown(value=get_queue_info, every=10)
            jobs_table = gr.Dataframe(
                headers=["Job ID", "Model", "Status", "Time", "Download"],
                col_count=(5, "fixed"),
                value=get_jobs_table,
                interactive=False,
                wrap=True,
                datatype=["str", "str", "str", "str", "markdown"],
                every=10,
            )
            refresh_jobs_btn = gr.Button("πŸ”„ Refresh")
            def _refresh_jobs():
                # Manual refresh: summary line plus the full table rows.
                return get_queue_info(), get_jobs_table()
            refresh_jobs_btn.click(fn=_refresh_jobs, outputs=[queue_status, jobs_table])
        # ── TAB 4: Help ───────────────────────────────────────────────────────
        with gr.Tab("ℹ️ Help"):
            gr.Markdown(f"""
## How it works
RVC (Retrieval-Based Voice Conversion) transforms a voice recording to sound
like a target speaker using only that speaker's model file.
---
## Quick Guide
1. Open the **Convert** tab
2. **Record** via microphone or **upload** an audio file (wav, mp3, flac, ogg …)
3. Choose a **model** from the dropdown β€” 4 models are pre-loaded on startup
4. Set **Pitch Shift** if needed (e.g. male β†’ female: try +12 semitones)
5. Click **πŸš€ Convert Voice** and wait for the result
---
## Built-in Models
| Model | Description |
|---|---|
| **Vestia Zeta v1** | Hololive ID VTuber, v1 model |
| **Vestia Zeta v2** | Hololive ID VTuber, v2 model (recommended) |
| **Ayunda Risu** | Hololive ID VTuber |
| **Gawr Gura** | Hololive EN VTuber |
---
## Pitch Extraction Methods
| Method | Speed | Quality | Best for |
|---|---|---|---|
| **rmvpe** | ⚑⚑⚑ | β˜…β˜…β˜…β˜… | General use (default) |
| **fcpe** | ⚑⚑ | β˜…β˜…β˜…β˜… | Singing |
| **crepe** | ⚑ | β˜…β˜…β˜…β˜…β˜… | Highest quality, slow |
| **crepe-tiny** | ⚑⚑ | β˜…β˜…β˜… | Low resource |
---
## Advanced Settings
| Setting | Description |
|---|---|
| **Index Rate** | Influence of FAISS index on output timbre (0.75 recommended) |
| **Protect Consonants** | Prevents artefacts on consonants (0.5 = max) |
| **Respiration Filter Radius** | Smooths pitch curve β€” higher reduces breath noise (0–7, default 3) |
| **Volume Envelope Mix** | 0.25 = natural blend Β· 1 = preserve input loudness |
| **Noise Reduction** | Removes background noise before conversion |
| **Split Long Audio** | Chunks audio for recordings > 60 s |
| **Autotune** | Snaps pitch to nearest musical note |
---
## Output Formats
| Format | Size | Quality |
|---|---|---|
| **WAV** | Large | Lossless |
| **FLAC** | Medium | Lossless compressed |
| **MP3** | Small | Lossy |
| **OPUS** | Tiny (~64 kbps) | Telegram/Discord quality |
---
**Device:** `{DEVICE_LABEL}`
**Max input duration:** {MAX_INPUT_DURATION // 60} minutes
---
## Credits
Engine: [Ultimate RVC](https://github.com/JackismyShephard/ultimate-rvc)
""")
    # Wire convert button after all tabs so jobs_table is defined
    def _submit_and_extract_id(*args):
        # Submit the job, then parse the 8-hex-char job id out of the status
        # message so it pre-fills the "Check Job Status" box.
        status, audio = convert(*args)
        import re
        match = re.search(r"[a-f0-9]{8}", status or "")
        job_id = match.group(0) if match else ""
        return status, audio, job_id, get_queue_info(), get_jobs_table()
    convert_btn.click(
        fn=_submit_and_extract_id,
        inputs=[
            inp_mic, inp_file, model_dd,
            pitch_sl, f0_radio,
            index_rate_sl, protect_sl, vol_env_sl,
            clean_cb, clean_sl,
            split_cb, autotune_cb, autotune_sl,
            filter_radius_sl,
            fmt_radio,
            reverb_cb, reverb_room_sl, reverb_damp_sl, reverb_wet_sl,
        ],
        outputs=[out_status, out_audio, job_id_box, queue_status, jobs_table],
    )
    def _poll_and_refresh(job_id):
        # Look up the job and refresh the queue widgets in one round trip.
        status, file = poll_job(job_id)
        return status, file, get_queue_info(), get_jobs_table()
    poll_btn.click(
        fn=_poll_and_refresh,
        inputs=[job_id_box],
        outputs=[poll_status, poll_audio, queue_status, jobs_table],
    )
# ── Launch ────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Up to 5 simultaneous Gradio event handlers; actual conversions are
    # still serialized by the single background worker thread.
    demo.queue(default_concurrency_limit=5)
    demo.launch(
        server_name="0.0.0.0",  # bind all interfaces (required on HF Spaces)
        server_port=int(os.getenv("PORT", 7860)),
        max_threads=10,
        ssr_mode=False,
        # NOTE(review): `css=` is documented as a gr.Blocks(...) argument;
        # confirm the pinned gradio version accepts it on launch(), otherwise
        # _CSS is never applied (or launch() raises a TypeError).
        css=_CSS,
    )