| import gradio as gr | |
| import os | |
| import sys | |
| import subprocess | |
| import shutil | |
| from pathlib import Path | |
| import zipfile | |
| import json | |
| class RealRVCTrainer: | |
| def __init__(self): | |
| self.rvc_dir = Path("./Retrieval-based-Voice-Conversion-WebUI") | |
| self.workspace = Path("./workspace") | |
| self.workspace.mkdir(exist_ok=True) | |
| self.setup_complete = False | |
| def install_rvc(self, progress=gr.Progress()): | |
| """Clone and setup official RVC repository""" | |
| try: | |
| progress(0.1, desc="Cloning RVC repository...") | |
| if self.rvc_dir.exists(): | |
| return "β RVC already installed!" | |
| subprocess.run([ | |
| "git", "clone", | |
| "https://github.com/RVC-Project/Retrieval-based-Voice-Conversion-WebUI.git" | |
| ], check=True) | |
| progress(0.3, desc="Installing dependencies...") | |
| core_packages = [ | |
| "torch", "torchaudio", "torchvision", "numpy", "scipy", | |
| "librosa", "soundfile", "faiss-cpu", "praat-parselmouth", | |
| "pyworld", "scikit-learn", "numba", "resampy", "pydub" | |
| ] | |
| for pkg in core_packages: | |
| try: | |
| subprocess.run([sys.executable, "-m", "pip", "install", "-q", pkg], timeout=60) | |
| except: | |
| pass | |
| progress(0.6, desc="Downloading pretrained models...") | |
| pretrained_dir = self.rvc_dir / "pretrained" | |
| pretrained_dir.mkdir(exist_ok=True) | |
| models_to_download = [ | |
| ("https://huggingface.co/lj1995/VoiceConversionWebUI/resolve/main/pretrained/f0G40k.pth", "f0G40k.pth"), | |
| ("https://huggingface.co/lj1995/VoiceConversionWebUI/resolve/main/pretrained/f0D40k.pth", "f0D40k.pth"), | |
| ("https://huggingface.co/lj1995/VoiceConversionWebUI/resolve/main/hubert_base.pt", "hubert_base.pt"), | |
| ] | |
| for idx, (url, filename) in enumerate(models_to_download): | |
| progress(0.6 + (idx / len(models_to_download)) * 0.3, desc=f"Downloading {filename}...") | |
| output_path = pretrained_dir / filename | |
| if not output_path.exists(): | |
| try: | |
| subprocess.run(["wget", "-q", "-O", str(output_path), url], timeout=300) | |
| except: | |
| try: | |
| subprocess.run(["curl", "-L", "-o", str(output_path), url], timeout=300) | |
| except: | |
| import requests | |
| response = requests.get(url, stream=True, timeout=300) | |
| with open(output_path, 'wb') as f: | |
| for chunk in response.iter_content(chunk_size=8192): | |
| f.write(chunk) | |
| self.setup_complete = True | |
| progress(1.0, desc="Setup complete!") | |
| return "β RVC Installation Complete!\n\nπ¦ Installed:\n- Official RVC codebase\n- Pre-trained models\n- All dependencies\n\nπ Ready to train!" | |
| except Exception as e: | |
| return f"β Installation failed: {str(e)}\n\nπ§ Try manual installation or use Google Colab." | |
| def prepare_dataset(self, audio_files, model_name, progress=gr.Progress()): | |
| """Prepare dataset in RVC format""" | |
| if not audio_files: | |
| return "β Please upload audio files" | |
| if not model_name: | |
| model_name = "my_model" | |
| try: | |
| progress(0.1, desc="Creating dataset structure...") | |
| dataset_path = self.rvc_dir / "dataset" / model_name | |
| dataset_path.mkdir(parents=True, exist_ok=True) | |
| progress(0.3, desc="Copying audio files...") | |
| for idx, audio_file in enumerate(audio_files): | |
| dest = dataset_path / f"{idx:04d}_{Path(audio_file.name).name}" | |
| shutil.copy2(audio_file.name, dest) | |
| progress(0.3 + (idx / len(audio_files)) * 0.6, desc=f"Copied {idx+1}/{len(audio_files)} files") | |
| progress(1.0, desc="Dataset ready!") | |
| return f"β Dataset Prepared!\n\nπ Location: {dataset_path}\nπ Files: {len(audio_files)}\nπ€ Model: {model_name}\n\nβ Ready for preprocessing!" | |
| except Exception as e: | |
| return f"β Error: {str(e)}" | |
| def preprocess_data(self, model_name, sample_rate, progress=gr.Progress()): | |
| """Run RVC preprocessing""" | |
| try: | |
| progress(0.1, desc="Starting preprocessing...") | |
| dataset_path = self.rvc_dir / "dataset" / model_name | |
| if not dataset_path.exists(): | |
| return "β Dataset not found. Please prepare dataset first." | |
| preprocess_script = self.rvc_dir / "infer" / "modules" / "train" / "preprocess.py" | |
| if not preprocess_script.exists(): | |
| preprocess_script = self.rvc_dir / "trainset_preprocess_pipeline_print.py" | |
| progress(0.3, desc="Preprocessing audio...") | |
| cmd = [sys.executable, str(preprocess_script), str(dataset_path), str(sample_rate), "2"] | |
| result = subprocess.run(cmd, capture_output=True, text=True) | |
| progress(1.0, desc="Preprocessing complete!") | |
| return f"β Preprocessing Complete!\n\nπ΅ Sample Rate: {sample_rate}Hz\nπ Features extracted\nπ Ready for training!" | |
| except Exception as e: | |
| return f"β Preprocessing failed: {str(e)}" | |
| def train_model(self, model_name, epochs, batch_size, sample_rate, progress=gr.Progress()): | |
| """Run actual RVC training""" | |
| try: | |
| progress(0.05, desc="Initializing training...") | |
| log_dir = self.rvc_dir / "logs" / model_name | |
| log_dir.mkdir(parents=True, exist_ok=True) | |
| progress(0.1, desc="Starting RVC training...") | |
| train_script = self.rvc_dir / "infer" / "modules" / "train" / "train.py" | |
| if not train_script.exists(): | |
| train_script = self.rvc_dir / "train_nsf_sim_cache_sid_load_pretrain.py" | |
| cmd = [ | |
| sys.executable, str(train_script), | |
| "-e", model_name, "-sr", str(sample_rate), | |
| "-f0", "1", "-bs", str(batch_size), | |
| "-g", "0", "-te", str(epochs), "-se", "10", | |
| "-pg", str(self.rvc_dir / "pretrained" / "f0G40k.pth"), | |
| "-pd", str(self.rvc_dir / "pretrained" / "f0D40k.pth"), | |
| "-l", "0", "-c", "0" | |
| ] | |
| progress(0.2, desc=f"Training {model_name}...") | |
| process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) | |
| for line in process.stdout: | |
| if "epoch" in line.lower(): | |
| progress(0.2 + 0.6, desc=f"Training: {line.strip()[:50]}") | |
| process.wait() | |
| progress(0.9, desc="Searching for model files...") | |
| possible_paths = [ | |
| log_dir / "weights", log_dir, | |
| self.rvc_dir / "weights" / model_name, | |
| self.rvc_dir / "logs" / model_name | |
| ] | |
| model_files = [] | |
| index_files = [] | |
| for path in possible_paths: | |
| if path.exists(): | |
| model_files.extend(list(path.glob("**/*.pth"))) | |
| index_files.extend(list(path.glob("**/*.index"))) | |
| if model_files or index_files: | |
| output_dir = self.workspace / model_name | |
| output_dir.mkdir(exist_ok=True) | |
| files_info = [] | |
| if model_files: | |
| latest_model = max(model_files, key=lambda p: p.stat().st_mtime) | |
| shutil.copy2(latest_model, output_dir / f"{model_name}.pth") | |
| model_size = latest_model.stat().st_size / (1024*1024) | |
| files_info.append(f"- {model_name}.pth ({model_size:.1f}MB)") | |
| if index_files: | |
| latest_index = max(index_files, key=lambda p: p.stat().st_mtime) | |
| shutil.copy2(latest_index, output_dir / latest_index.name) | |
| files_info.append(f"- {latest_index.name}") | |
| progress(1.0, desc="Training complete!") | |
| return f"β Training Complete!\n\nπ Model: {model_name}\nπ Epochs: {epochs}\n\nπΎ Model Files:\n{chr(10).join(files_info)}\n\nπ Location: {output_dir}\n\nπ Ready to download!" | |
| else: | |
| debug_info = [] | |
| if log_dir.exists(): | |
| debug_info.append(f"Log dir: {log_dir}") | |
| for item in log_dir.rglob("*"): | |
| debug_info.append(f" - {item.relative_to(log_dir)}") | |
| return f"β οΈ Training completed but model files not found.\n\nπ Searched in:\n{chr(10).join([f'- {p}' for p in possible_paths])}\n\nπ Debug:\n{chr(10).join(debug_info)}" | |
| except Exception as e: | |
| return f"β Training failed: {str(e)}" | |
| def package_model(self, model_name): | |
| """Package model for download""" | |
| try: | |
| output_dir = self.workspace / model_name | |
| if not output_dir.exists(): | |
| output_dir = self.rvc_dir / "logs" / model_name / "weights" | |
| if not output_dir.exists(): | |
| return None, "β Model not found" | |
| zip_path = self.workspace / f"{model_name}_RVC.zip" | |
| with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: | |
| for file in output_dir.rglob("*"): | |
| if file.is_file() and (file.suffix in ['.pth', '.index', '.json']): | |
| zipf.write(file, file.name) | |
| return str(zip_path), f"β Model packaged: {zip_path.name}" | |
| except Exception as e: | |
| return None, f"β Error: {str(e)}" | |
| trainer = RealRVCTrainer() | |
| with gr.Blocks(title="Real RVC Training") as demo: | |
| gr.Markdown("# π€ Real RVC Model Training\n### Using Official RVC-Project Implementation\n\nβ οΈ Uses REAL RVC training. Models work on weights.gg!") | |
| with gr.Tab("βοΈ Step 0: Install RVC"): | |
| gr.Markdown("Install official RVC codebase and pretrained models (~200MB)") | |
| install_btn = gr.Button("π¦ Install RVC Components", variant="primary", size="lg") | |
| install_output = gr.Textbox(label="Installation Status", lines=10) | |
| install_btn.click(fn=trainer.install_rvc, outputs=install_output) | |
| with gr.Tab("π Step 1: Prepare Dataset"): | |
| gr.Markdown("Upload voice audio files (10-30 min recommended, WAV/MP3/FLAC)") | |
| model_name_prep = gr.Textbox(label="Model Name", value="my_voice_model") | |
| audio_files = gr.File(label="Upload Audio Files", file_count="multiple", file_types=["audio"]) | |
| prep_btn = gr.Button("π Prepare Dataset", variant="primary") | |
| prep_output = gr.Textbox(label="Status", lines=8) | |
| prep_btn.click(fn=trainer.prepare_dataset, inputs=[audio_files, model_name_prep], outputs=prep_output) | |
| with gr.Tab("π§ Step 2: Preprocess"): | |
| gr.Markdown("Preprocess audio and extract features") | |
| model_name_process = gr.Textbox(label="Model Name", value="my_voice_model") | |
| sample_rate_process = gr.Radio(choices=["40000", "48000"], value="40000", label="Sample Rate") | |
| process_btn = gr.Button("π§ Preprocess Data", variant="primary") | |
| process_output = gr.Textbox(label="Status", lines=8) | |
| process_btn.click(fn=trainer.preprocess_data, inputs=[model_name_process, sample_rate_process], outputs=process_output) | |
| with gr.Tab("π Step 3: Train Model"): | |
| gr.Markdown("Train RVC model (β οΈ CPU training takes hours/days)") | |
| model_name_train = gr.Textbox(label="Model Name", value="my_voice_model") | |
| epochs_train = gr.Slider(minimum=10, maximum=500, value=100, step=10, label="Epochs") | |
| batch_size_train = gr.Slider(minimum=1, maximum=16, value=4, step=1, label="Batch Size") | |
| sample_rate_train = gr.Radio(choices=["40000", "48000"], value="40000", label="Sample Rate") | |
| train_btn = gr.Button("π Start Real Training", variant="primary") | |
| train_output = gr.Textbox(label="Training Status", lines=15) | |
| train_btn.click(fn=trainer.train_model, inputs=[model_name_train, epochs_train, batch_size_train, sample_rate_train], outputs=train_output) | |
| with gr.Tab("π₯ Step 4: Download"): | |
| gr.Markdown("Download your trained RVC model") | |
| model_name_download = gr.Textbox(label="Model Name", value="my_voice_model") | |
| download_btn = gr.Button("π¦ Package Model", variant="primary") | |
| download_file = gr.File(label="Download") | |
| download_status = gr.Textbox(label="Status") | |
| download_btn.click(fn=trainer.package_model, inputs=model_name_download, outputs=[download_file, download_status]) | |
| gr.Markdown("---\n### π Resources\n- [RVC Project](https://github.com/RVC-Project/Retrieval-based-Voice-Conversion-WebUI)\n- [Weights.gg](https://weights.gg/)\n\n### β οΈ Important\n- Uses REAL RVC training\n- Models work on weights.gg\n- CPU training is VERY slow\n- Recommended: Google Colab with GPU") | |
| if __name__ == "__main__": | |
| demo.launch() |