| |
| """ |
| VYNL RVC Voice Cloning |
| Voice conversion using RVC (Retrieval-based Voice Conversion) |
| Train custom voice models and apply voice conversion. |
| |
| Copyright (c) 2024-2026 Robert T. Lackey. All rights reserved. |
| """ |
|
|
| import os |
| import sys |
| import tempfile |
| import shutil |
| import subprocess |
| from pathlib import Path |
| from typing import Optional, Tuple, List, Dict |
| from datetime import datetime |
| import json |
|
|
| |
# Optional ZeroGPU support: on Hugging Face Spaces the real `spaces`
# package supplies the GPU decorator; elsewhere install a no-op stub so
# the @spaces.GPU decorators below still work unchanged.
try:
    import spaces
    HAS_ZEROGPU = True
except ImportError:
    HAS_ZEROGPU = False

    class spaces:
        """No-op stand-in for the Hugging Face `spaces` module."""

        @staticmethod
        def GPU(duration=60):
            # Identity decorator: the wrapped function is returned as-is.
            return lambda func: func
|
|
| |
| |
| |
|
|
# Root working directory for VYNL (override via the VYNL_DIR env var).
VYNL_DIR = Path(os.environ.get('VYNL_DIR', Path.home() / '.vynl_rvc'))

# Sub-directories for each kind of artifact.
MODELS_DIR = VYNL_DIR / 'models'      # registered voice models
TRAINING_DIR = VYNL_DIR / 'training'  # in-progress training runs
DATASET_DIR = VYNL_DIR / 'datasets'   # per-user training datasets
OUTPUT_DIR = VYNL_DIR / 'output'      # converted audio output

# Ensure the whole directory tree exists up front.
for _dir in (VYNL_DIR, MODELS_DIR, TRAINING_DIR, DATASET_DIR, OUTPUT_DIR):
    _dir.mkdir(parents=True, exist_ok=True)
|
|
| |
| |
| |
|
|
HAS_RVC = False   # set True once an RVC backend has been located
RVC_PATH = None   # filesystem path to an RVC checkout, if one was found


def check_rvc_installation() -> Tuple[bool, str]:
    """Locate an RVC backend and cache the result in module globals.

    Searches a set of well-known filesystem locations for an RVC WebUI
    checkout (identified by infer_cli.py), then falls back to probing
    for an importable `rvc` package.

    Returns:
        (found, info) where info is the checkout path, the package
        name, or an error message when nothing was found.
    """
    global HAS_RVC, RVC_PATH

    candidates = (
        Path.home() / 'Retrieval-based-Voice-Conversion-WebUI',
        Path.home() / 'RVC',
        Path('/opt/RVC'),
        Path('/app/RVC'),
        VYNL_DIR / 'rvc',
    )
    for candidate in candidates:
        if candidate.exists() and (candidate / 'infer_cli.py').exists():
            RVC_PATH = candidate
            HAS_RVC = True
            return True, str(candidate)

    # Fall back to the importable package.
    # NOTE(review): this probes `rvc` while convert_voice imports
    # `rvc_python` — confirm which package name is actually intended.
    try:
        import rvc  # noqa: F401
    except ImportError:
        return False, "RVC not found. Please install RVC or rvc-python package."
    HAS_RVC = True
    return True, "rvc-python package"
|
|
| |
| |
| |
|
|
# Optional audio stack: preprocessing degrades to a pass-through when
# librosa / soundfile / numpy are unavailable.
try:
    import librosa
    import soundfile as sf
    import numpy as np
except ImportError:
    HAS_AUDIO = False
else:
    HAS_AUDIO = True
|
|
def preprocess_audio(
    input_path: str,
    output_path: Optional[str] = None,
    target_sr: int = 16000,
    normalize: bool = True,
    trim_silence: bool = True,
    max_duration: Optional[float] = None
) -> str:
    """Preprocess audio for RVC training/inference.

    Resamples to target_sr, optionally trims edge silence, peak-normalizes,
    and truncates to max_duration seconds.

    Args:
        input_path: Source audio file.
        output_path: Where to write the result; a temp .wav when None.
        target_sr: Target sample rate in Hz.
        normalize: Peak-normalize the signal.
        trim_silence: Trim edge silence below 20 dB.
        max_duration: Hard cap on duration in seconds, if set.

    Returns:
        Path to the processed file (input_path unchanged when the audio
        stack is unavailable).
    """
    if not HAS_AUDIO:
        return input_path

    y, sr = librosa.load(input_path, sr=None)

    if sr != target_sr:
        y = librosa.resample(y, orig_sr=sr, target_sr=target_sr)

    if trim_silence:
        y, _ = librosa.effects.trim(y, top_db=20)

    if normalize:
        y = librosa.util.normalize(y)

    if max_duration and len(y) / target_sr > max_duration:
        y = y[:int(max_duration * target_sr)]

    if output_path is None:
        # NamedTemporaryFile instead of the deprecated, race-prone
        # tempfile.mktemp; keep the file so sf.write can reuse the path.
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp:
            output_path = tmp.name

    sf.write(output_path, y, target_sr)
    return output_path
|
|
def split_audio_for_training(
    input_path: str,
    output_dir: str,
    segment_duration: float = 10.0,
    overlap: float = 1.0,
    min_duration: float = 3.0
) -> List[str]:
    """Split audio into overlapping segments for training.

    Args:
        input_path: Source audio file.
        output_dir: Directory to write segment .wav files into.
        segment_duration: Target segment length in seconds.
        overlap: Overlap between consecutive segments in seconds.
        min_duration: Segments shorter than this are discarded.

    Returns:
        Paths of the written segments ([input_path] unchanged when the
        audio stack is unavailable).
    """
    if not HAS_AUDIO:
        return [input_path]

    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    y, sr = librosa.load(input_path, sr=16000)
    total_duration = len(y) / sr

    if total_duration < min_duration:
        # Too short to split: keep the whole clip as a single segment.
        out_path = output_dir / "segment_000.wav"
        sf.write(out_path, y, sr)
        return [str(out_path)]

    segments: List[str] = []
    segment_samples = int(segment_duration * sr)
    hop_samples = int((segment_duration - overlap) * sr)
    min_samples = int(min_duration * sr)

    # Stop bound is +1 so a clip exactly min_duration long still yields
    # one segment (the original exclusive bound returned [] in that case).
    for i, start in enumerate(range(0, len(y) - min_samples + 1, hop_samples)):
        segment = y[start:start + segment_samples]
        if len(segment) >= min_samples:
            out_path = output_dir / f"segment_{i:03d}.wav"
            sf.write(out_path, segment, sr)
            segments.append(str(out_path))

    return segments
|
|
| |
| |
| |
|
|
class VoiceModelRegistry:
    """JSON-backed registry of trained voice models stored under MODELS_DIR."""

    def __init__(self):
        # registry.json maps model_id -> metadata dict.
        self.registry_file = MODELS_DIR / 'registry.json'
        self.models = self._load_registry()

    def _load_registry(self) -> Dict:
        """Load the registry file; an unreadable or corrupt file yields {}."""
        if self.registry_file.exists():
            try:
                return json.loads(self.registry_file.read_text())
            except (OSError, json.JSONDecodeError):
                # Corrupt/unreadable registry: start fresh rather than crash
                # (narrowed from a bare except that hid real errors).
                pass
        return {}

    def _save_registry(self):
        """Persist the in-memory registry to disk."""
        self.registry_file.write_text(json.dumps(self.models, indent=2))

    def register_model(
        self,
        name: str,
        model_path: str,
        index_path: Optional[str] = None,
        description: str = "",
        voice_type: str = "custom",
        trained_by: str = "",
        sample_rate: int = 40000
    ) -> str:
        """Copy a trained model (and optional index) into MODELS_DIR and record it.

        Args:
            name: Human-readable model name.
            model_path: Path to the trained .pth file (copied in).
            index_path: Optional feature index file (copied in if it exists).
            description: Free-form description.
            voice_type: Category tag (default "custom").
            trained_by: User identifier for tracking.
            sample_rate: Model sample rate in Hz.

        Returns:
            The generated model id.
        """
        model_id = f"voice_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        model_dest = MODELS_DIR / model_id
        model_dest.mkdir(exist_ok=True)

        model_file = model_dest / 'model.pth'
        shutil.copy2(model_path, model_file)

        index_file = None
        if index_path and os.path.exists(index_path):
            index_file = model_dest / 'index.index'
            shutil.copy2(index_path, index_file)

        self.models[model_id] = {
            'id': model_id,
            'name': name,
            'description': description,
            'type': voice_type,
            'trained_by': trained_by,
            'model_path': str(model_file),
            'index_path': str(index_file) if index_file else None,
            'sample_rate': sample_rate,
            'created': datetime.now().isoformat()
        }

        self._save_registry()
        return model_id

    def get_model(self, model_id: str) -> Optional[Dict]:
        """Return metadata for model_id, or None if unknown."""
        return self.models.get(model_id)

    def list_models(self) -> List[Dict]:
        """Return metadata for all registered models."""
        return list(self.models.values())

    def delete_model(self, model_id: str) -> bool:
        """Remove a model's files and registry entry.

        Returns:
            True when the model existed and was removed, False otherwise.
        """
        if model_id not in self.models:
            return False
        model_dir = MODELS_DIR / model_id
        if model_dir.exists():
            shutil.rmtree(model_dir)
        del self.models[model_id]
        self._save_registry()
        return True
|
|
| |
| |
| |
|
|
# Built-in voice presets, used by clone_voice when the target is not a
# trained model id. pitch_shift is in semitones (positive = higher pitch).
# NOTE(review): formant_shift and the "effects" list are not consumed
# anywhere in this module — confirm their intended consumer and units.
PRESET_VOICES = {
    "male_tenor": {
        "name": "Male Tenor",
        "description": "Standard male tenor voice",
        "pitch_shift": 0,
        "formant_shift": 0
    },
    "male_bass": {
        "name": "Male Bass",
        "description": "Deep male bass voice",
        "pitch_shift": -5,
        "formant_shift": -2
    },
    "female_alto": {
        "name": "Female Alto",
        "description": "Standard female alto voice",
        "pitch_shift": 12,
        "formant_shift": 2
    },
    "female_soprano": {
        "name": "Female Soprano",
        "description": "High female soprano voice",
        "pitch_shift": 15,
        "formant_shift": 3
    },
    "child": {
        "name": "Child Voice",
        "description": "Young child voice",
        "pitch_shift": 18,
        "formant_shift": 4
    },
    "robot": {
        "name": "Robot",
        "description": "Robotic voice effect",
        "pitch_shift": 0,
        "formant_shift": 0,
        "effects": ["vocoder"]
    }
}
|
|
| |
| |
| |
|
|
@spaces.GPU(duration=120)
def convert_voice(
    input_audio: str,
    model_id: Optional[str] = None,
    model_path: Optional[str] = None,
    index_path: Optional[str] = None,
    pitch_shift: int = 0,
    index_rate: float = 0.5,
    filter_radius: int = 3,
    resample_sr: int = 0,
    rms_mix_rate: float = 0.25,
    protect: float = 0.33,
    progress_callback=None
) -> Tuple[Optional[str], str]:
    """
    Convert voice using RVC model (GPU accelerated).

    Tries the rvc_python package first, then falls back to invoking a
    local RVC checkout's infer_cli.py as a subprocess.

    Args:
        input_audio: Path to input audio file
        model_id: Model ID from registry (or use model_path)
        model_path: Direct path to model file
        index_path: Path to index file (optional)
        pitch_shift: Pitch shift in semitones
        index_rate: Index rate (0-1)
        filter_radius: Filter radius
        resample_sr: Resample rate (0 = auto); NOTE(review): currently
            not forwarded to either backend — confirm intent.
        rms_mix_rate: RMS mix rate
        protect: Protect voiceless consonants
        progress_callback: Optional fn(fraction, message) progress hook

    Returns:
        (output_path, status_message); output_path is None on failure.
    """
    has_rvc, rvc_info = check_rvc_installation()

    if not has_rvc:
        return None, rvc_info

    # Resolve a registry model id into concrete file paths.
    if model_id:
        registry = VoiceModelRegistry()
        model_info = registry.get_model(model_id)
        if model_info:
            model_path = model_info.get('model_path')
            index_path = model_info.get('index_path')
        else:
            return None, f"Model {model_id} not found"

    if not model_path or not os.path.exists(model_path):
        return None, "No valid model specified"

    try:
        if progress_callback:
            progress_callback(0.1, "Loading model...")

        # Preferred backend: rvc_python package.
        try:
            from rvc_python import RVC
            rvc = RVC(model_path=model_path, index_path=index_path)

            if progress_callback:
                progress_callback(0.3, "Converting voice...")

            # NamedTemporaryFile instead of the deprecated, race-prone
            # tempfile.mktemp.
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp:
                output_path = tmp.name
            rvc.convert(
                input_path=input_audio,
                output_path=output_path,
                pitch_shift=pitch_shift,
                index_rate=index_rate
            )

            if progress_callback:
                progress_callback(1.0, "Done!")

            return output_path, "Voice conversion complete"

        except ImportError:
            pass  # package missing: fall through to the CLI backend

        # Fallback backend: local RVC checkout's CLI.
        if RVC_PATH:
            if progress_callback:
                progress_callback(0.2, "Using RVC CLI...")

            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp:
                output_path = tmp.name

            cmd = [
                sys.executable,
                str(RVC_PATH / 'infer_cli.py'),
                '--input', input_audio,
                '--output', output_path,
                '--model', model_path,
                '--pitch', str(pitch_shift),
                '--index_rate', str(index_rate),
                '--filter_radius', str(filter_radius),
                '--rms_mix_rate', str(rms_mix_rate),
                '--protect', str(protect)
            ]

            if index_path:
                cmd.extend(['--index', index_path])

            # Argument list with shell=False (default): no shell injection risk.
            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode != 0:
                return None, f"RVC error: {result.stderr}"

            if progress_callback:
                progress_callback(1.0, "Done!")

            return output_path, "Voice conversion complete"

        return None, "No RVC backend available"

    except Exception as e:
        return None, f"Conversion error: {str(e)}"
|
|
| |
| |
| |
|
|
@spaces.GPU(duration=600)
def train_voice_model(
    name: str,
    training_files: List[str],
    description: str = "",
    epochs: int = 100,
    batch_size: int = 8,
    sample_rate: int = 40000,
    user_email: str = "",
    progress_callback=None
) -> Tuple[Optional[str], str]:
    """
    Train a custom RVC voice model (GPU accelerated).

    Pipeline: preprocess each input file (resample/trim/normalize), split
    it into ~10 s segments, hand the segment directory to the rvc-python
    trainer, then register the resulting model.

    Args:
        name: Name for the voice model
        training_files: List of audio files for training
        description: Model description
        epochs: Number of training epochs
        batch_size: Training batch size
        sample_rate: Target sample rate
        user_email: User email for tracking
        progress_callback: Progress callback fn(fraction, message)

    Returns:
        (model_id, status_message); model_id is None on failure.
    """
    has_rvc, rvc_info = check_rvc_installation()

    if not has_rvc:
        # No backend available: report "simulated" progress and bail out.
        if progress_callback:
            progress_callback(0.5, "RVC not installed - simulating training...")
            progress_callback(1.0, "Training simulation complete")
        return None, f"RVC not installed: {rvc_info}\nInstall with: pip install rvc-python"

    try:
        # Each run gets its own timestamped working directory.
        train_id = f"train_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        train_dir = TRAINING_DIR / train_id
        train_dir.mkdir(parents=True, exist_ok=True)

        if progress_callback:
            progress_callback(0.1, "Preparing training data...")

        # Segment output directory for all preprocessed files.
        processed_dir = train_dir / 'processed'
        processed_dir.mkdir(exist_ok=True)

        all_segments = []
        for i, tf in enumerate(training_files):
            if os.path.exists(tf):
                # Resample to the model rate, trim silence, normalize.
                # NOTE(review): split_audio_for_training reloads at a fixed
                # 16 kHz, discarding this target_sr — confirm intended rate.
                preprocessed = preprocess_audio(
                    tf,
                    target_sr=sample_rate,
                    normalize=True,
                    trim_silence=True
                )

                # Split into ~10 s training segments under file_<i>/.
                segments = split_audio_for_training(
                    preprocessed,
                    processed_dir / f"file_{i:03d}",
                    segment_duration=10.0
                )
                all_segments.extend(segments)

        if not all_segments:
            return None, "No valid training audio found"

        if progress_callback:
            progress_callback(0.2, f"Prepared {len(all_segments)} training segments")

        try:
            from rvc_python import RVC

            if progress_callback:
                progress_callback(0.3, "Starting model training...")

            model_output = train_dir / 'model.pth'
            index_output = train_dir / 'index.index'

            # Train on the prepared segment directory. The index file is
            # registered below only if the trainer actually produced one.
            rvc = RVC()
            rvc.train(
                dataset_path=str(processed_dir),
                model_name=name,
                epochs=epochs,
                batch_size=batch_size,
                output_path=str(model_output)
            )

            if progress_callback:
                progress_callback(0.9, "Registering model...")

            # Persist the trained model into the shared registry.
            registry = VoiceModelRegistry()
            model_id = registry.register_model(
                name=name,
                model_path=str(model_output),
                index_path=str(index_output) if index_output.exists() else None,
                description=description,
                voice_type="custom",
                trained_by=user_email,
                sample_rate=sample_rate
            )

            if progress_callback:
                progress_callback(1.0, "Training complete!")

            return model_id, f"Model trained and saved: {model_id}"

        except ImportError:
            return None, "RVC training requires rvc-python package"

    except Exception as e:
        return None, f"Training error: {str(e)}"
|
|
| |
| |
| |
|
|
class VoiceDataset:
    """Per-user collection of voice training datasets stored under DATASET_DIR."""

    def __init__(self, user_email: str = "demo"):
        self.user_email = user_email
        # Sanitize the email into a filesystem-safe directory name.
        self.user_dir = DATASET_DIR / user_email.replace('@', '_at_').replace('.', '_')
        self.user_dir.mkdir(parents=True, exist_ok=True)
        self.index_file = self.user_dir / 'datasets.json'
        self.datasets = self._load_index()

    def _load_index(self) -> Dict:
        """Load datasets.json; an unreadable or corrupt index yields {}."""
        if self.index_file.exists():
            try:
                return json.loads(self.index_file.read_text())
            except (OSError, json.JSONDecodeError):
                # Corrupt/unreadable index: start fresh rather than crash
                # (narrowed from a bare except that hid real errors).
                pass
        return {}

    def _save_index(self):
        """Persist the in-memory dataset index to disk."""
        self.index_file.write_text(json.dumps(self.datasets, indent=2))

    def create_dataset(self, name: str, description: str = "") -> str:
        """Create a new, empty voice dataset and return its id."""
        dataset_id = f"ds_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        dataset_dir = self.user_dir / dataset_id
        dataset_dir.mkdir(exist_ok=True)

        self.datasets[dataset_id] = {
            'id': dataset_id,
            'name': name,
            'description': description,
            'created': datetime.now().isoformat(),
            'files': [],
            'total_duration': 0
        }
        self._save_index()
        return dataset_id

    def add_audio(self, dataset_id: str, audio_path: str) -> bool:
        """Preprocess an audio file into the dataset.

        Returns:
            False when dataset_id is unknown, True otherwise.
        """
        if dataset_id not in self.datasets:
            return False

        dataset_dir = self.user_dir / dataset_id
        filename = Path(audio_path).name
        dest = dataset_dir / filename

        # Preprocess (resample/trim/normalize) directly into the dataset dir.
        preprocessed = preprocess_audio(audio_path, str(dest))

        # Measure duration only when the audio stack is available.
        duration = 0
        if HAS_AUDIO:
            y, sr = librosa.load(preprocessed, sr=None)
            duration = len(y) / sr

        self.datasets[dataset_id]['files'].append({
            'filename': filename,
            'path': str(dest),
            'duration': duration,
            'added': datetime.now().isoformat()
        })
        self.datasets[dataset_id]['total_duration'] += duration
        self._save_index()

        return True

    def get_dataset(self, dataset_id: str) -> Optional[Dict]:
        """Return the dataset record, or None if unknown."""
        return self.datasets.get(dataset_id)

    def list_datasets(self) -> List[Dict]:
        """Return all dataset records for this user."""
        return list(self.datasets.values())

    def get_training_files(self, dataset_id: str) -> List[str]:
        """Return paths of dataset files that still exist on disk."""
        ds = self.datasets.get(dataset_id)
        if ds:
            return [f['path'] for f in ds['files'] if os.path.exists(f['path'])]
        return []
|
|
| |
| |
| |
|
|
def clone_voice(
    source_audio: str,
    target_voice: str,
    pitch_shift: int = 0,
    progress_callback=None
) -> Tuple[Optional[str], str]:
    """
    Clone voice from target and apply to source audio.

    Args:
        source_audio: Audio to convert
        target_voice: Voice model ID or preset name
        pitch_shift: Pitch adjustment in semitones
        progress_callback: Progress callback

    Returns:
        (output_path, status_message)
    """
    # Preset voices: apply a simple local pitch shift, no model needed.
    if target_voice in PRESET_VOICES:
        preset = PRESET_VOICES[target_voice]
        pitch_shift += preset.get('pitch_shift', 0)

        if HAS_AUDIO:
            y, sr = librosa.load(source_audio, sr=None)
            y_shifted = librosa.effects.pitch_shift(y, sr=sr, n_steps=pitch_shift)
            # NamedTemporaryFile instead of the deprecated, race-prone
            # tempfile.mktemp.
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp:
                output_path = tmp.name
            sf.write(output_path, y_shifted, sr)
            return output_path, f"Applied {preset['name']} preset"
        # NOTE(review): with a preset but no audio stack we fall through to
        # convert_voice, which reports the preset name as an unknown model
        # id — confirm whether an explicit error message is preferred.

    # Custom model: delegate to the RVC conversion path.
    return convert_voice(
        input_audio=source_audio,
        model_id=target_voice,
        pitch_shift=pitch_shift,
        progress_callback=progress_callback
    )
|
|
| |
| |
| |
|
|
# Lazily-created module singletons.
_model_registry = None
_voice_dataset = None  # NOTE(review): currently unused — kept for API stability


def get_model_registry() -> VoiceModelRegistry:
    """Return the process-wide VoiceModelRegistry, creating it on first use."""
    global _model_registry
    if _model_registry is not None:
        return _model_registry
    _model_registry = VoiceModelRegistry()
    return _model_registry


def get_voice_dataset(user_email: str = "demo") -> VoiceDataset:
    """Return a VoiceDataset scoped to user_email (a fresh instance per call)."""
    return VoiceDataset(user_email)
|
|
| |
| |
| |
|
|
def _main():
    """Print a quick status report: RVC availability, presets, models."""
    print("VYNL RVC Voice Cloning")
    print("=" * 50)

    has_rvc, info = check_rvc_installation()
    print(f"RVC available: {has_rvc}")
    print(f"Info: {info}")

    print("\nPreset voices:")
    for key, preset in PRESET_VOICES.items():
        print(f" - {key}: {preset['name']}")

    print("\nRegistered models:")
    registry = get_model_registry()
    for model in registry.list_models():
        print(f" - {model['id']}: {model['name']}")


if __name__ == "__main__":
    _main()
|
|