# Source: RVC-Neko / app.py (Hugging Face Space, author "ozipoetra", commit 0c86095)
# NOTE(review): the four lines above this file's docstring were raw Hugging Face
# web-page header text ("RVC-Neko / app.py" / "ozipoetra" / "up" / "0c86095"),
# which is not valid Python; preserved here as a comment so the module imports.
"""
RVC Voice Conversion – HuggingFace Space
Simple, fast, GPU/CPU auto-detected.
"""
from __future__ import annotations
import logging
import os
import shutil
import sys
import tempfile
import zipfile
from pathlib import Path
import torch
# ── Path bootstrap ────────────────────────────────────────────────────────────
# Resolve everything relative to this file so the Space works regardless of CWD.
BASE_DIR = Path(__file__).parent
# Make sibling modules in the Space repo importable.
sys.path.insert(0, str(BASE_DIR))
MODELS_DIR = BASE_DIR / "rvc_models"   # extracted voice models (one folder each)
OUTPUT_DIR = BASE_DIR / "outputs"      # converted audio files
MODELS_DIR.mkdir(exist_ok=True)
OUTPUT_DIR.mkdir(exist_ok=True)
# Point the ultimate_rvc engine at our models dir, unless the host already set it.
os.environ.setdefault("URVC_MODELS_DIR", str(MODELS_DIR / "urvc"))
# ── Logging ───────────────────────────────────────────────────────────────────
# ── In-memory log buffer (feeds the Logs tab in the UI) ──────────────────────
import collections
_LOG_BUFFER = collections.deque(maxlen=200)  # keep last 200 lines
class _UILogHandler(logging.Handler):
    """Logging handler that mirrors every formatted record into _LOG_BUFFER."""

    def emit(self, record):
        rendered = self.format(record)
        _LOG_BUFFER.append(rendered)
# UI handler: feeds the in-memory buffer shown in the Logs tab.
_ui_handler = _UILogHandler()
_ui_handler.setLevel(logging.INFO)
_ui_handler.setFormatter(logging.Formatter(
    fmt="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%H:%M:%S",
))
# Console handler uses the same format so terminal and UI logs match.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%H:%M:%S",
)
# Attach UI handler to root logger so all child loggers feed into it
logging.getLogger().addHandler(_ui_handler)
# Silence chatty third-party loggers down to WARNING.
for _noisy in ("httpx", "httpcore", "faiss", "faiss.loader", "transformers", "torch"):
    logging.getLogger(_noisy).setLevel(logging.WARNING)
logger = logging.getLogger("rvc_space")
# ── CPU threading β€” use all available cores ───────────────────────────────────
# Use sched_getaffinity to get cores actually allocated to this process
# (os.cpu_count() returns the host total which is wrong in containers)
try:
    _NUM_CORES = len(os.sched_getaffinity(0))
except AttributeError:
    # sched_getaffinity is Linux-only; fall back on macOS/Windows.
    _NUM_CORES = os.cpu_count() or 1
torch.set_num_threads(_NUM_CORES)
# NOTE(review): set_num_interop_threads raises RuntimeError if interop parallel
# work has already started in this process β€” assumed safe this early; confirm.
torch.set_num_interop_threads(_NUM_CORES)
# NOTE(review): OpenMP/MKL typically read these env vars at library load time;
# torch is already imported above, so these settings may arrive too late to
# take effect β€” consider exporting them before the torch import.
os.environ["OMP_NUM_THREADS"] = str(_NUM_CORES)
os.environ["MKL_NUM_THREADS"] = str(_NUM_CORES)
os.environ["NUMEXPR_NUM_THREADS"] = str(_NUM_CORES)
os.environ["OPENBLAS_NUM_THREADS"] = str(_NUM_CORES)
# "high" permits TF32/bfloat16-backed float32 matmul (small accuracy trade for
# speed); note "medium" is the fastest setting, "high" is a middle ground.
torch.set_float32_matmul_precision("high")
# Enable oneDNN fusion optimisations (fuses ops like conv+relu into one kernel)
torch.backends.mkldnn.enabled = True
logger.info("CPU threads: %d | matmul: high precision | oneDNN: enabled", _NUM_CORES)
# ── Device ────────────────────────────────────────────────────────────────────
# DEVICE/DEVICE_LABEL are informational for the UI; the inference engine
# selects its own device internally β€” TODO confirm against ultimate_rvc.
if torch.cuda.is_available():
    DEVICE = "cuda"
    DEVICE_LABEL = f"🟒 GPU · {torch.cuda.get_device_name(0)}"
else:
    DEVICE = "cpu"
    DEVICE_LABEL = f"πŸ”΅ CPU Β· {_NUM_CORES} cores"
logger.info("Device: %s", DEVICE_LABEL)
# ── Built-in models ──────────────────────────────────────────────────────────
# Models auto-downloaded at startup. Each "url" points at a zip expected to
# contain a .pth weights file (plus an optional FAISS .index); "name" becomes
# the folder name under MODELS_DIR and the label shown in the UI dropdown.
BUILTIN_MODELS = [
    {
        "name": "Vestia Zeta v1",
        "url": "https://huggingface.co/megaaziib/my-rvc-models-collection/resolve/main/zeta.zip",
    },
    {
        "name": "Vestia Zeta v2",
        "url": "https://huggingface.co/megaaziib/my-rvc-models-collection/resolve/main/zetaTest.zip",
    },
    {
        "name": "Ayunda Risu",
        "url": "https://huggingface.co/megaaziib/my-rvc-models-collection/resolve/main/risu.zip",
    },
    {
        "name": "Gawr Gura",
        "url": "https://huggingface.co/Gigrig/GigrigRVC/resolve/41d46f087b9c7d70b93acf100f1cb9f7d25f3831/GawrGura_RVC_v2_Ov2Super_e275_s64075.zip",
    },
]
# ── Lazy VoiceConverter ───────────────────────────────────────────────────────
_vc_instance = None


def _get_vc():
    """Return the process-wide VoiceConverter, constructing it on first call.

    The ultimate_rvc import is deferred so the heavy engine is only loaded
    once a conversion is actually requested.
    """
    global _vc_instance
    if _vc_instance is not None:
        return _vc_instance
    logger.info("Loading VoiceConverter…")
    from ultimate_rvc.rvc.infer.infer import VoiceConverter
    _vc_instance = VoiceConverter()
    logger.info("VoiceConverter ready.")
    return _vc_instance
# ── Model helpers ─────────────────────────────────────────────────────────────
def list_models() -> list[str]:
    """Return the sorted names of model folders that contain a .pth file."""
    if not MODELS_DIR.exists():
        return []
    usable = [
        entry.name
        for entry in MODELS_DIR.iterdir()
        if entry.is_dir() and any(entry.glob("*.pth"))
    ]
    return sorted(usable)
def _pth_and_index(name: str) -> tuple[str, str]:
    """Resolve (weights_path, index_path) for model *name*.

    The index path is "" when the model ships without a FAISS index.
    Raises FileNotFoundError when no .pth file exists in the model folder.
    """
    model_dir = MODELS_DIR / name
    weight_files = list(model_dir.glob("*.pth"))
    if not weight_files:
        raise FileNotFoundError(f"No .pth file found in model '{name}'")
    index_files = list(model_dir.glob("*.index"))
    index_path = str(index_files[0]) if index_files else ""
    return str(weight_files[0]), index_path
def _extract_zip(zip_path: str | Path, dest_name: str) -> None:
    """Extract a model zip into MODELS_DIR/dest_name and flatten its layout.

    Any .pth / .index files nested in subfolders are moved up to the model
    root so the lookup helpers can find them with a shallow glob.
    """
    target_dir = MODELS_DIR / dest_name
    target_dir.mkdir(exist_ok=True)
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(target_dir)
    relevant = [*target_dir.rglob("*.pth"), *target_dir.rglob("*.index")]
    for found in relevant:
        flattened = target_dir / found.name
        if found != flattened:
            shutil.move(str(found), str(flattened))
def _download_file(url: str, dest: Path) -> None:
    """Download *url* to *dest* atomically; no-op if *dest* already exists.

    Streams into a temp file in the destination directory, then promotes it
    with os.replace() (atomic on the same filesystem) so a partially written
    file never appears at *dest*. Fix: the temp file is now removed when the
    download fails or is interrupted β€” previously NamedTemporaryFile with
    delete=False leaked a stray ``.tmp`` file on every failed download.
    """
    if dest.exists():
        return
    dest.parent.mkdir(parents=True, exist_ok=True)
    logger.info("Downloading %s ...", dest.name)
    import requests
    r = requests.get(url, stream=True, timeout=300)
    r.raise_for_status()
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, dir=dest.parent, suffix=".tmp") as tmp:
            tmp_path = tmp.name
            for chunk in r.iter_content(8192):
                tmp.write(chunk)
        os.replace(tmp_path, dest)
    except BaseException:
        # Don't leave a stray .tmp behind on a failed/interrupted download.
        if tmp_path is not None and os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
    logger.info("%s ready.", dest.name)
def download_predictors() -> None:
    """Download rmvpe.pt and fcpe.pt needed by all F0 methods (skips existing)."""
    base = "https://huggingface.co/JackismyShephard/ultimate-rvc/resolve/main/Resources/predictors"
    target = MODELS_DIR / "urvc" / "rvc" / "predictors"
    checkpoints = ("rmvpe.pt", "fcpe.pt")
    for checkpoint in checkpoints:
        _download_file(f"{base}/{checkpoint}", target / checkpoint)
def download_embedders() -> None:
    """Fetch the default contentvec embedder files (weights + config) if missing."""
    base = "https://huggingface.co/JackismyShephard/ultimate-rvc/resolve/main/Resources/embedders"
    embedders_dir = MODELS_DIR / "urvc" / "rvc" / "embedders"
    # Only the contentvec embedder is needed by the default pipeline.
    for fname in ("pytorch_model.bin", "config.json"):
        _download_file(f"{base}/contentvec/{fname}", embedders_dir / "contentvec" / fname)
def download_builtin_models() -> str:
    """Download and extract every BUILTIN_MODELS entry that is missing.

    Returns the name of the first built-in model, used as the UI default
    (None only if BUILTIN_MODELS is empty). Propagates network/extraction
    errors to the caller (startup wraps this in a try/except).

    Fix: the temporary zip is now always deleted via try/finally β€” previously
    a failed download or extraction leaked the NamedTemporaryFile created
    with delete=False. The HTTP request is also issued (and validated) before
    the temp file is created, so no temp file exists for a failed request.
    """
    import requests
    first = None
    for model in BUILTIN_MODELS:
        name = model["name"]
        dest = MODELS_DIR / name
        if dest.exists() and list(dest.glob("*.pth")):
            logger.info("Model already present: %s", name)
        else:
            logger.info("Downloading model: %s ...", name)
            r = requests.get(model["url"], stream=True, timeout=300)
            r.raise_for_status()
            tmp_path = None
            try:
                with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmp:
                    tmp_path = tmp.name
                    for chunk in r.iter_content(8192):
                        tmp.write(chunk)
                _extract_zip(tmp_path, name)
            finally:
                # Always reclaim the temp zip, even when download/extract fails.
                if tmp_path is not None and os.path.exists(tmp_path):
                    os.unlink(tmp_path)
            logger.info("Model ready: %s", name)
        if first is None:
            first = name
    return first
# ── Upload handler ────────────────────────────────────────────────────────────
def upload_model(zip_file, model_name):
    """Install a user-uploaded model zip.

    Returns (status_markdown, dropdown_update_or_None, table_update_or_None).
    The model name defaults to the zip's filename stem when left blank.
    """
    if not zip_file:
        return "⚠️ No file provided.", None, None
    chosen = (model_name or "").strip()
    if not chosen:
        chosen = Path(zip_file).stem
    try:
        _extract_zip(zip_file, chosen)
        models = list_models()
    except Exception as exc:
        logger.exception("Model upload failed")
        return f"❌ Error: {exc}", None, None
    dropdown_update = {"choices": models, "value": chosen, "__type__": "update"}
    table_update = {"value": [[m] for m in models], "__type__": "update"}
    return (
        f"βœ… Model **{chosen}** loaded successfully.",
        dropdown_update,
        table_update,
    )
# ── Refresh handler ───────────────────────────────────────────────────────────
def refresh_models():
    """Re-scan disk and return (table_update, dropdown_update) for the UI."""
    available = list_models()
    table_update = {"value": [[m] for m in available], "__type__": "update"}
    dropdown_update = {"choices": available, "__type__": "update"}
    return table_update, dropdown_update
# ── Autotune visibility toggle ────────────────────────────────────────────────
def toggle_autotune(enabled):
    """Gradio update: show the strength slider only while autotune is enabled."""
    update = {"__type__": "update", "visible": enabled}
    return update
# ── Conversion ────────────────────────────────────────────────────────────────
def convert(
    audio_mic, audio_file, model_name,
    pitch, f0_method,
    index_rate, protect, volume_envelope,
    clean_audio, clean_strength,
    split_audio, autotune, autotune_strength,
    output_format,
):
    """Run RVC inference on the given audio.

    Microphone input takes precedence over an uploaded file. Returns a
    user-facing status string and the output file path (None on error).

    Fixes: replaces the ``__import__("time")`` hack with a normal import,
    corrects the Opus bitrate comment (the ffmpeg call encodes 64 kbps, not
    4800 bps), and deletes the intermediate WAV after a successful Opus
    re-encode so outputs/ doesn't accumulate duplicates.
    """
    import time

    audio_input = audio_mic or audio_file
    if audio_input is None:
        return "⚠️ Please record or upload audio first.", None
    if not model_name:
        return "⚠️ No model selected.", None
    try:
        model_path, index_path = _pth_and_index(model_name)
    except FileNotFoundError as exc:
        return f"❌ {exc}", None
    # Opus needs post-processing β€” always generate WAV first, then re-encode
    is_opus = output_format.upper() == "OPUS"
    engine_format = "WAV" if is_opus else output_format
    ts = int(time.time())
    wav_path = str(OUTPUT_DIR / f"output-{ts}.wav")
    out_path = str(OUTPUT_DIR / (f"output-{ts}.opus" if is_opus else f"output-{ts}.{output_format.lower()}"))
    try:
        vc = _get_vc()
        vc.convert_audio(
            audio_input_path=audio_input,
            audio_output_path=wav_path,
            model_path=model_path,
            index_path=index_path,
            pitch=pitch,
            f0_method=f0_method,
            index_rate=index_rate,
            volume_envelope=volume_envelope,
            protect=protect,
            split_audio=split_audio,
            f0_autotune=autotune,
            f0_autotune_strength=autotune_strength,
            clean_audio=clean_audio,
            clean_strength=clean_strength,
            export_format=engine_format,
        )
        if is_opus:
            # Re-encode to Opus at 64 kbps CBR / 48 kHz via ffmpeg.
            import subprocess
            subprocess.run(
                [
                    "ffmpeg", "-y",
                    "-i", wav_path,
                    "-c:a", "libopus",
                    "-b:a", "64000",
                    "-vbr", "off",
                    "-ar", "48000",
                    out_path,
                ],
                check=True,
                capture_output=True,
            )
            # The WAV was only an intermediate for the Opus encode; drop it.
            if os.path.exists(wav_path):
                os.unlink(wav_path)
        return "βœ… Conversion complete!", out_path
    except Exception as exc:
        logger.exception("Conversion error")
        return f"❌ Error: {exc}", None
# ── Startup ───────────────────────────────────────────────────────────────────
# Pre-fetch predictors, embedders and built-in models so first conversion is
# fast. Failures are non-fatal: the app still starts and users can upload
# their own models.
_startup_status = ""   # banner text shown under the UI header
_default_model = ""    # model preselected in the dropdown
try:
    download_predictors()
    download_embedders()
    _default_model = download_builtin_models()
    _startup_status = f"βœ… Ready  Β·  {DEVICE_LABEL}"
except Exception as _e:
    _startup_status = f"⚠️ Default model unavailable: {_e}  Β·  {DEVICE_LABEL}"
    logger.warning("Could not download default model: %s", _e)
_initial_models = list_models()
# Fall back to the first model on disk when the preferred default is missing.
_initial_value = _default_model if _default_model in _initial_models else (
    _initial_models[0] if _initial_models else None
)
# ── Gradio 6 UI ───────────────────────────────────────────────────────────────
def get_logs() -> str:
    """Return buffered log lines as a single string for display."""
    if not _LOG_BUFFER:
        return "(no logs yet)"
    return "\n".join(_LOG_BUFFER)
import gradio as gr

# Inline CSS: center the header, mute the status line, hide Gradio's footer.
_CSS = """
#header { text-align: center; padding: 20px 0 8px; }
#header h1 { font-size: 2rem; margin: 0; }
#header p { opacity: .65; margin: 4px 0 0; }
#status { text-align: center; font-size: .82rem; opacity: .7; margin-bottom: 8px; }
footer { display: none !important; }
"""
# In Gradio 6, css goes back into gr.Blocks()
# Build the whole UI inside one Blocks context; component objects created here
# are wired to the handler functions defined above.
with gr.Blocks(title="RVC Voice Conversion", css=_CSS) as demo:
    # Header plus the startup banner (device label + readiness/error message).
    gr.HTML(f"""
    <div id="header">
        <h1>πŸŽ™οΈ RVC Voice Conversion</h1>
        <p>Retrieval-Based Voice Conversion Β· record or upload Β· custom models Β· GPU/CPU auto</p>
    </div>
    <p id="status">{_startup_status}</p>
    """)
    with gr.Tabs():
        # ── TAB 1: Convert ────────────────────────────────────────────────────
        with gr.Tab("🎀 Convert"):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("### πŸ”Š Input Audio")
                    # Two input paths: microphone recording or file upload;
                    # convert() prefers the microphone when both are set.
                    with gr.Tabs():
                        with gr.Tab("πŸŽ™οΈ Microphone"):
                            inp_mic = gr.Audio(
                                sources=["microphone"],
                                type="filepath",
                                label="Record",
                            )
                        with gr.Tab("πŸ“ Upload File"):
                            inp_file = gr.Audio(
                                sources=["upload"],
                                type="filepath",
                                label="Upload audio (wav / mp3 / flac / ogg …)",
                            )
                    gr.Markdown("### πŸ€– Model")
                    model_dd = gr.Dropdown(
                        choices=_initial_models,
                        value=_initial_value,
                        label="Active Voice Model",
                        interactive=True,
                    )
                    gr.Markdown("### 🎚️ Basic Settings")
                    pitch_sl = gr.Slider(
                        minimum=-24, maximum=24, value=0, step=1,
                        label="Pitch Shift (semitones)",
                        info="0 = unchanged Β· positive = higher Β· negative = lower",
                    )
                    f0_radio = gr.Radio(
                        choices=["rmvpe", "fcpe", "crepe", "crepe-tiny"],
                        value="rmvpe",
                        label="Pitch Extraction Method",
                        info="rmvpe = fastest & accurate Β· crepe = highest quality (slower)",
                    )
                with gr.Column(scale=1):
                    gr.Markdown("### βš™οΈ Advanced Settings")
                    with gr.Accordion("Expand advanced options", open=False):
                        index_rate_sl = gr.Slider(
                            0.0, 1.0, value=0.75, step=0.05,
                            label="Index Rate",
                            info="How strongly the FAISS index influences timbre (0 = off)",
                        )
                        protect_sl = gr.Slider(
                            0.0, 0.5, value=0.5, step=0.01,
                            label="Protect Consonants",
                            info="Protects unvoiced consonants β€” 0.5 = max protection",
                        )
                        vol_env_sl = gr.Slider(
                            0.0, 1.0, value=1.0, step=0.1,
                            label="Volume Envelope Mix",
                            info="1 = use input volume shape Β· 0 = use model output volume",
                        )
                        with gr.Row():
                            clean_cb = gr.Checkbox(value=False, label="Noise Reduction")
                            clean_sl = gr.Slider(
                                0.0, 1.0, value=0.5, step=0.05,
                                label="Reduction Strength",
                            )
                        with gr.Row():
                            split_cb = gr.Checkbox(value=False, label="Split Long Audio")
                            autotune_cb = gr.Checkbox(value=False, label="Autotune")
                        # Hidden until the Autotune checkbox is ticked (see
                        # the .change wiring just below).
                        autotune_sl = gr.Slider(
                            0.0, 1.0, value=1.0, step=0.05,
                            label="Autotune Strength",
                            visible=False,
                        )
                        autotune_cb.change(
                            fn=toggle_autotune,
                            inputs=autotune_cb,
                            outputs=autotune_sl,
                        )
                    fmt_radio = gr.Radio(
                        choices=["WAV", "MP3", "FLAC", "OPUS"],
                        value="WAV",
                        label="Output Format",
                        info="OPUS = small file size (~64 kbps, Telegram/Discord quality)",
                    )
                    convert_btn = gr.Button("πŸš€ Convert Voice", variant="primary")
                    gr.Markdown("### 🎧 Output")
                    out_audio = gr.Audio(label="Result", type="filepath", interactive=False)
                    out_status = gr.Textbox(label="Status", interactive=False, lines=1)
                    # convert_btn wired below after all tabs
        # ── TAB 2: Models ─────────────────────────────────────────────────────
        with gr.Tab("πŸ“¦ Models"):
            gr.Markdown("""
            ### Upload a Custom RVC Model
            Provide a **`.zip`** containing:
            - **`model.pth`** β€” weights (required)
            - **`model.index`** β€” FAISS index (optional, improves voice matching)
            Built-in models pre-downloaded on startup: **Vestia Zeta v1**, **Vestia Zeta v2**, **Ayunda Risu**, **Gawr Gura**.
            """)
            with gr.Row():
                with gr.Column(scale=1):
                    up_zip = gr.File(label="Model ZIP", file_types=[".zip"])
                    up_name = gr.Textbox(
                        label="Model Name",
                        placeholder="Leave blank to use zip filename",
                    )
                    up_btn = gr.Button("πŸ“€ Load Model", variant="primary")
                    up_status = gr.Textbox(label="Status", interactive=False, lines=2)
                with gr.Column(scale=1):
                    gr.Markdown("### Loaded Models")
                    models_table = gr.Dataframe(
                        col_count=(1, "fixed"),
                        value=[[m] for m in _initial_models],
                        interactive=False,
                        label="",
                    )
                    refresh_btn = gr.Button("πŸ”„ Refresh")
            # upload_model also refreshes the dropdown and the table.
            up_btn.click(
                fn=upload_model,
                inputs=[up_zip, up_name],
                outputs=[up_status, model_dd, models_table],
            )
            refresh_btn.click(
                fn=refresh_models,
                outputs=[models_table, model_dd],
            )
        # ── TAB 3: Logs ───────────────────────────────────────────────────────
        with gr.Tab("πŸ“‹ Logs"):
            gr.Markdown("Live log output from the conversion engine. Click **Refresh** to update.")
            # value=get_logs (a callable) re-evaluates the buffer on page load.
            logs_box = gr.Textbox(
                value=get_logs,
                label="",
                lines=20,
                max_lines=20,
                interactive=False,
                autoscroll=True,
            )
            with gr.Row():
                refresh_logs_btn = gr.Button("πŸ”„ Refresh Logs")
                clear_logs_btn = gr.Button("πŸ—‘οΈ Clear")
            refresh_logs_btn.click(fn=get_logs, outputs=logs_box)

            def clear_logs():
                # Empty the shared buffer and blank the textbox.
                _LOG_BUFFER.clear()
                return ""

            clear_logs_btn.click(fn=clear_logs, outputs=logs_box)
        # ── TAB 4: Help ───────────────────────────────────────────────────────
        with gr.Tab("ℹ️ Help"):
            gr.Markdown(f"""
            ## How it works
            RVC (Retrieval-Based Voice Conversion) transforms a voice recording to sound
            like a target speaker using only that speaker's model file.
            ---
            ## Quick Guide
            1. Open the **Convert** tab
            2. **Record** via microphone or **upload** an audio file (wav, mp3, flac, ogg …)
            3. Choose a **model** β€” *zetaTest* is pre-loaded automatically
            4. Set **Pitch Shift** if needed (e.g. male β†’ female: try +12 semitones)
            5. Click **πŸš€ Convert Voice**
            ---
            ## Pitch Extraction Methods
            | Method | Speed | Quality | Best for |
            |---|---|---|---|
            | **rmvpe** | ⚑⚑⚑ | β˜…β˜…β˜…β˜… | General use |
            | **fcpe** | ⚑⚑ | β˜…β˜…β˜…β˜… | Singing |
            | **crepe** | ⚑ | β˜…β˜…β˜…β˜…β˜… | Highest quality |
            | **crepe-tiny** | ⚑⚑ | β˜…β˜…β˜… | Low resource |
            ---
            ## Advanced Settings
            | Setting | Description |
            |---|---|
            | **Index Rate** | Influence of FAISS index on output timbre (0.75 recommended) |
            | **Protect Consonants** | Prevents artefacts on consonants (0.5 = max) |
            | **Volume Envelope Mix** | 1 = preserve input loudness Β· 0 = model controls loudness |
            | **Noise Reduction** | Removes background noise before conversion |
            | **Split Long Audio** | Chunks audio for recordings > 60 s |
            | **Autotune** | Snaps pitch to nearest musical note |
            ---
            **Device:** `{DEVICE_LABEL}`
            ---
            ## Credits
            Engine: [Ultimate RVC](https://github.com/JackismyShephard/ultimate-rvc)
            Default model: [zetaTest by megaaziib](https://huggingface.co/megaaziib/my-rvc-models-collection)
            """)
    # Wire convert button here β€” after all tabs so logs_box is defined
    # The lambda appends the current log buffer to convert()'s two outputs so
    # the Logs tab content updates alongside the conversion result.
    convert_btn.click(
        fn=lambda *a: (*convert(*a), get_logs()),
        inputs=[
            inp_mic, inp_file, model_dd,
            pitch_sl, f0_radio,
            index_rate_sl, protect_sl, vol_env_sl,
            clean_cb, clean_sl,
            split_cb, autotune_cb, autotune_sl,
            fmt_radio,
        ],
        outputs=[out_status, out_audio, logs_box],
    )
# ── Launch ────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Bind on all interfaces; the hosting platform injects PORT (7860 default).
    demo.launch(
        server_name="0.0.0.0",
        server_port=int(os.getenv("PORT", 7860)),
    )