vynl / vynl_rvc.py
rlackey's picture
Add ZeroGPU support and Windows local startup
a971d5c
#!/usr/bin/env python3
"""
VYNL RVC Voice Cloning
Voice conversion using RVC (Retrieval-based Voice Conversion)
Train custom voice models and apply voice conversion.
Copyright (c) 2024-2026 Robert T. Lackey. All rights reserved.
"""
import os
import sys
import tempfile
import shutil
import subprocess
from pathlib import Path
from typing import Optional, Tuple, List, Dict
from datetime import datetime
import json
# ZeroGPU support for HuggingFace Spaces
try:
    import spaces
    HAS_ZEROGPU = True
except ImportError:
    HAS_ZEROGPU = False

    class spaces:
        """No-op stand-in used when the ``spaces`` package is not installed.

        Only the ``GPU`` decorator is provided, so GPU-decorated functions in
        this module can still be defined and called locally.
        """

        @staticmethod
        def GPU(func=None, duration=60):
            """Return the decorated function unchanged.

            Supports both decorator spellings:

            * ``@spaces.GPU`` - called with the function itself.
            * ``@spaces.GPU(duration=120)`` - called with options, returning
              a decorator.

            Args:
                func: The function being decorated when used without
                    parentheses; otherwise ``None``.
                duration: Accepted for signature compatibility and ignored.

            Returns:
                ``func`` itself, or a pass-through decorator.
            """
            if callable(func):
                # Bare ``@spaces.GPU`` usage: nothing to configure.
                return func

            def decorator(inner):
                return inner

            return decorator
# ============================================================================
# CONFIGURATION
# ============================================================================
# Root data directory; the VYNL_DIR environment variable overrides the
# default location under the user's home directory.
VYNL_DIR = Path(os.environ.get('VYNL_DIR', Path.home() / '.vynl_rvc'))
VYNL_DIR.mkdir(parents=True, exist_ok=True)

# Working sub-directories, all located directly under VYNL_DIR.
MODELS_DIR, TRAINING_DIR, DATASET_DIR, OUTPUT_DIR = (
    VYNL_DIR / subdir
    for subdir in ('models', 'training', 'datasets', 'output')
)

# Create every sub-directory up front so later code can write into them.
for d in (MODELS_DIR, TRAINING_DIR, DATASET_DIR, OUTPUT_DIR):
    d.mkdir(parents=True, exist_ok=True)
# ============================================================================
# RVC DETECTION
# ============================================================================
# Module-level detection state, updated by check_rvc_installation().
HAS_RVC = False
RVC_PATH = None


def check_rvc_installation() -> Tuple[bool, str]:
    """Detect an RVC backend and record the result in module globals.

    A source checkout (a directory containing ``infer_cli.py``) is looked for
    first in a fixed list of locations; if found, ``RVC_PATH`` is set to it.
    Otherwise an importable Python package is looked for.

    Returns:
        ``(True, description)`` when a backend is found, where the
        description is either the checkout path or ``"rvc-python package"``;
        ``(False, message)`` otherwise.
    """
    global HAS_RVC, RVC_PATH
    import importlib

    # Check common installation paths
    candidate_dirs = [
        Path.home() / 'Retrieval-based-Voice-Conversion-WebUI',
        Path.home() / 'RVC',
        Path('/opt/RVC'),
        Path('/app/RVC'),
        VYNL_DIR / 'rvc'
    ]
    for candidate in candidate_dirs:
        # The CLI entry point existing implies the directory exists too.
        if (candidate / 'infer_cli.py').exists():
            RVC_PATH = candidate
            HAS_RVC = True
            return True, str(candidate)

    # Check for an installed package. ``rvc_python`` is the module name the
    # conversion and training code in this file imports from; ``rvc`` is
    # kept as a fallback because it was the name probed previously.
    for module_name in ('rvc_python', 'rvc'):
        try:
            importlib.import_module(module_name)
        except ImportError:
            continue
        HAS_RVC = True
        return True, "rvc-python package"

    return False, "RVC not found. Please install RVC or rvc-python package."
# ============================================================================
# AUDIO PREPROCESSING
# ============================================================================
# Optional audio stack: when any of these imports fails, HAS_AUDIO is False
# and the audio helpers below return their input untouched.
try:
    import librosa
    import soundfile as sf
    import numpy as np
except ImportError:
    HAS_AUDIO = False
else:
    HAS_AUDIO = True
def preprocess_audio(
    input_path: str,
    output_path: Optional[str] = None,
    target_sr: int = 16000,
    normalize: bool = True,
    trim_silence: bool = True,
    max_duration: Optional[float] = None
) -> str:
    """Preprocess audio for RVC training/inference.

    Steps, in order: load at the file's native rate, resample to
    ``target_sr``, optionally trim leading/trailing silence, optionally
    normalize, optionally cut to ``max_duration`` seconds, then write the
    result.

    Args:
        input_path: Path of the audio file to read.
        output_path: Where to write the result. When ``None`` a new
            temporary ``.wav`` file is created; the caller owns it and is
            responsible for deleting it.
        target_sr: Sample rate of the written file.
        normalize: Whether to apply ``librosa.util.normalize``.
        trim_silence: Whether to apply ``librosa.effects.trim`` (top_db=20).
        max_duration: Maximum length in seconds; falsy values disable the cut.

    Returns:
        The path of the written file, or ``input_path`` unchanged when the
        audio libraries are unavailable (nothing is written in that case).
    """
    if not HAS_AUDIO:
        return input_path

    # Load audio at its native rate so resampling happens exactly once.
    samples, source_sr = librosa.load(input_path, sr=None)

    # Resample if needed
    if source_sr != target_sr:
        samples = librosa.resample(samples, orig_sr=source_sr, target_sr=target_sr)

    # Trim silence
    if trim_silence:
        samples, _ = librosa.effects.trim(samples, top_db=20)

    # Normalize
    if normalize:
        samples = librosa.util.normalize(samples)

    # Trim to max duration (applied after silence trimming and normalization)
    if max_duration and len(samples) / target_sr > max_duration:
        samples = samples[:int(max_duration * target_sr)]

    # Save. mkstemp creates the file atomically, unlike the deprecated
    # mktemp, which only returns a name that another process could claim.
    created_temp = output_path is None
    if created_temp:
        fd, output_path = tempfile.mkstemp(suffix='.wav')
        os.close(fd)
    try:
        sf.write(output_path, samples, target_sr)
    except Exception:
        # Do not leave an empty temp file behind when writing fails.
        if created_temp:
            try:
                os.remove(output_path)
            except OSError:
                pass
        raise
    return output_path
def split_audio_for_training(
    input_path: str,
    output_dir: str,
    segment_duration: float = 10.0,
    overlap: float = 1.0,
    min_duration: float = 3.0
) -> List[str]:
    """Split audio into overlapping segments for training.

    The input is loaded at 16 kHz and written as ``segment_NNN.wav`` files
    into ``output_dir`` (created if missing).

    Args:
        input_path: Audio file to split.
        output_dir: Directory that receives the segment files.
        segment_duration: Length of each segment in seconds.
        overlap: Seconds shared between consecutive segments.
        min_duration: Segments shorter than this many seconds are dropped.
            A whole file shorter than this is written as a single segment.

    Returns:
        Paths of the written segments, or ``[input_path]`` when the audio
        libraries are unavailable.

    Raises:
        ValueError: If ``overlap`` is not smaller than ``segment_duration``
            (the window would never advance).
    """
    if not HAS_AUDIO:
        return [input_path]

    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    y, sr = librosa.load(input_path, sr=16000)
    total_duration = len(y) / sr

    if total_duration < min_duration:
        # File too short to split: keep it as one segment.
        out_path = output_dir / "segment_000.wav"
        sf.write(out_path, y, sr)
        return [str(out_path)]

    segment_samples = int(segment_duration * sr)
    hop_samples = int((segment_duration - overlap) * sr)
    if hop_samples <= 0:
        raise ValueError(
            "overlap must be smaller than segment_duration "
            f"(got overlap={overlap}, segment_duration={segment_duration})"
        )

    min_samples = int(min_duration * sr)
    # Last start position that still leaves at least ``min_samples``.
    # The +1 makes that position inclusive, so a clip exactly
    # ``min_duration`` long yields one segment instead of none.
    last_start = len(y) - min_samples

    segments = []
    for i, start in enumerate(range(0, last_start + 1, hop_samples)):
        segment = y[start:start + segment_samples]
        # Re-check in seconds: guards against rounding of ``min_samples``
        # and against segment_duration being shorter than min_duration.
        if len(segment) / sr >= min_duration:
            out_path = output_dir / f"segment_{i:03d}.wav"
            sf.write(out_path, segment, sr)
            segments.append(str(out_path))
    return segments
# ============================================================================
# VOICE MODEL MANAGEMENT
# ============================================================================
class VoiceModelRegistry:
    """Registry for voice models.

    Model files are copied into one sub-directory per model under the models
    directory, and their metadata is persisted as ``registry.json`` in that
    same directory.
    """

    def __init__(self):
        # Directory holding registry.json and the per-model sub-directories.
        self.models_dir = MODELS_DIR
        self.registry_file = self.models_dir / 'registry.json'
        self.models = self._load_registry()

    def _load_registry(self) -> Dict:
        """Read the registry file; return ``{}`` if missing or unusable."""
        if self.registry_file.exists():
            try:
                loaded = json.loads(self.registry_file.read_text())
            except (OSError, ValueError):
                # Unreadable or malformed file (JSONDecodeError is a
                # ValueError): start from an empty registry.
                return {}
            # Anything other than a JSON object cannot be used as the
            # id -> metadata mapping the other methods rely on.
            if isinstance(loaded, dict):
                return loaded
        return {}

    def _save_registry(self):
        """Write the in-memory registry to disk."""
        self.registry_file.write_text(json.dumps(self.models, indent=2))

    def register_model(
        self,
        name: str,
        model_path: str,
        index_path: Optional[str] = None,
        description: str = "",
        voice_type: str = "custom",
        trained_by: str = "",
        sample_rate: int = 40000
    ) -> str:
        """Register a new voice model.

        Copies the model file (and the index file, when given and present)
        into a new directory for the model and saves the registry.

        Args:
            name: Display name of the model.
            model_path: Path to the model file to copy.
            index_path: Optional path to an index file to copy.
            description: Free-form description.
            voice_type: Stored under the ``'type'`` key.
            trained_by: Stored under the ``'trained_by'`` key.
            sample_rate: Stored under the ``'sample_rate'`` key.

        Returns:
            The generated model ID.

        Raises:
            FileNotFoundError: If ``model_path`` does not exist.
        """
        # Fail before creating anything so no empty directory is left behind.
        if not os.path.isfile(model_path):
            raise FileNotFoundError(f"Model file not found: {model_path}")

        # IDs are timestamp based with one-second resolution; add a numeric
        # suffix so two registrations in the same second do not overwrite
        # each other.
        base_id = f"voice_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        model_id = base_id
        attempt = 1
        while model_id in self.models or (self.models_dir / model_id).exists():
            model_id = f"{base_id}_{attempt}"
            attempt += 1

        # Copy model to models directory
        model_dest = self.models_dir / model_id
        model_dest.mkdir(parents=True, exist_ok=True)
        model_file = model_dest / 'model.pth'
        shutil.copy2(model_path, model_file)

        index_file = None
        if index_path and os.path.exists(index_path):
            index_file = model_dest / 'index.index'
            shutil.copy2(index_path, index_file)

        self.models[model_id] = {
            'id': model_id,
            'name': name,
            'description': description,
            'type': voice_type,
            'trained_by': trained_by,
            'model_path': str(model_file),
            'index_path': str(index_file) if index_file else None,
            'sample_rate': sample_rate,
            'created': datetime.now().isoformat()
        }
        self._save_registry()
        return model_id

    def get_model(self, model_id: str) -> Optional[Dict]:
        """Return the metadata for ``model_id``, or ``None`` if unknown."""
        return self.models.get(model_id)

    def list_models(self) -> List[Dict]:
        """Return the metadata of every registered model."""
        return list(self.models.values())

    def delete_model(self, model_id: str) -> bool:
        """Remove a model's files and registry entry.

        Returns:
            ``True`` if the model was registered and has been removed,
            ``False`` if the ID is unknown.
        """
        if model_id not in self.models:
            return False
        model_dir = self.models_dir / model_id
        if model_dir.exists():
            shutil.rmtree(model_dir)
        del self.models[model_id]
        self._save_registry()
        return True
# ============================================================================
# PRESET VOICES
# ============================================================================
# Built-in voice presets, keyed by preset ID.
# ``pitch_shift`` is added to the caller's semitone shift by clone_voice().
# NOTE(review): ``formant_shift`` and ``effects`` are not read by any code
# visible in this file - confirm whether another consumer uses them.
PRESET_VOICES = {
    "male_tenor": dict(
        name="Male Tenor",
        description="Standard male tenor voice",
        pitch_shift=0,
        formant_shift=0,
    ),
    "male_bass": dict(
        name="Male Bass",
        description="Deep male bass voice",
        pitch_shift=-5,
        formant_shift=-2,
    ),
    "female_alto": dict(
        name="Female Alto",
        description="Standard female alto voice",
        pitch_shift=12,
        formant_shift=2,
    ),
    "female_soprano": dict(
        name="Female Soprano",
        description="High female soprano voice",
        pitch_shift=15,
        formant_shift=3,
    ),
    "child": dict(
        name="Child Voice",
        description="Young child voice",
        pitch_shift=18,
        formant_shift=4,
    ),
    "robot": dict(
        name="Robot",
        description="Robotic voice effect",
        pitch_shift=0,
        formant_shift=0,
        effects=["vocoder"],
    ),
}
# ============================================================================
# RVC INFERENCE
# ============================================================================
@spaces.GPU(duration=120)
def convert_voice(
    input_audio: str,
    model_id: str = None,
    model_path: str = None,
    index_path: str = None,
    pitch_shift: int = 0,
    index_rate: float = 0.5,
    filter_radius: int = 3,
    resample_sr: int = 0,
    rms_mix_rate: float = 0.25,
    protect: float = 0.33,
    progress_callback=None
) -> Tuple[Optional[str], str]:
    """
    Convert voice using RVC model (GPU accelerated).

    The ``rvc_python`` package is tried first; if it cannot be imported the
    RVC command-line script found by ``check_rvc_installation`` is used.

    Args:
        input_audio: Path to input audio file
        model_id: Model ID from registry (or use model_path). When given, it
            overrides ``model_path`` and ``index_path`` with the registry's.
        model_path: Direct path to model file
        index_path: Path to index file (optional)
        pitch_shift: Pitch shift in semitones
        index_rate: Index rate (0-1)
        filter_radius: Filter radius (forwarded to the CLI backend only)
        resample_sr: Resample rate (0 = auto). Accepted for interface
            compatibility; not forwarded to either backend by this function.
        rms_mix_rate: RMS mix rate (forwarded to the CLI backend only)
        protect: Protect voiceless consonants (forwarded to the CLI backend
            only)
        progress_callback: Optional ``callback(fraction, message)``.

    Returns:
        (output_path, status_message). ``output_path`` is a temporary
        ``.wav`` file owned by the caller, or ``None`` on failure, in which
        case the message describes the problem.
    """
    has_rvc, rvc_info = check_rvc_installation()
    if not has_rvc:
        return None, rvc_info

    # Get model paths
    if model_id:
        model_info = VoiceModelRegistry().get_model(model_id)
        if not model_info:
            return None, f"Model {model_id} not found"
        model_path = model_info.get('model_path')
        index_path = model_info.get('index_path')

    if not model_path or not os.path.exists(model_path):
        return None, "No valid model specified"

    # Fail early with a clear message instead of a backend-specific error.
    if not input_audio or not os.path.exists(input_audio):
        return None, f"Input audio not found: {input_audio}"

    temp_outputs = []  # temp files to delete if the conversion fails

    def _new_output() -> str:
        # mkstemp creates the file atomically, unlike the deprecated mktemp.
        fd, path = tempfile.mkstemp(suffix='.wav')
        os.close(fd)
        temp_outputs.append(path)
        return path

    def _discard_outputs() -> None:
        for path in temp_outputs:
            try:
                os.remove(path)
            except OSError:
                pass
        temp_outputs.clear()

    try:
        if progress_callback:
            progress_callback(0.1, "Loading model...")

        # Try using rvc-python package first.
        # NOTE(review): confirm rvc_python exposes ``RVC`` with this
        # constructor and ``convert`` signature.
        try:
            from rvc_python import RVC
            converter = RVC(model_path=model_path, index_path=index_path)
            if progress_callback:
                progress_callback(0.3, "Converting voice...")
            output_path = _new_output()
            converter.convert(
                input_path=input_audio,
                output_path=output_path,
                pitch_shift=pitch_shift,
                index_rate=index_rate
            )
            if progress_callback:
                progress_callback(1.0, "Done!")
            return output_path, "Voice conversion complete"
        except ImportError:
            # Package (or one of its own imports) unavailable: fall through
            # to the CLI backend, dropping any temp file created above.
            _discard_outputs()

        # Fall back to CLI
        if RVC_PATH:
            if progress_callback:
                progress_callback(0.2, "Using RVC CLI...")
            output_path = _new_output()
            cmd = [
                sys.executable,
                str(RVC_PATH / 'infer_cli.py'),
                '--input', input_audio,
                '--output', output_path,
                '--model', model_path,
                '--pitch', str(pitch_shift),
                '--index_rate', str(index_rate),
                '--filter_radius', str(filter_radius),
                '--rms_mix_rate', str(rms_mix_rate),
                '--protect', str(protect)
            ]
            if index_path:
                cmd.extend(['--index', index_path])
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode != 0:
                _discard_outputs()
                return None, f"RVC error: {result.stderr}"
            if progress_callback:
                progress_callback(1.0, "Done!")
            return output_path, "Voice conversion complete"

        return None, "No RVC backend available"
    except Exception as e:
        _discard_outputs()
        return None, f"Conversion error: {str(e)}"
# ============================================================================
# RVC TRAINING
# ============================================================================
@spaces.GPU(duration=600)
def train_voice_model(
    name: str,
    training_files: List[str],
    description: str = "",
    epochs: int = 100,
    batch_size: int = 8,
    sample_rate: int = 40000,
    user_email: str = "",
    progress_callback=None
) -> Tuple[Optional[str], str]:
    """
    Train a custom RVC voice model (GPU accelerated).

    Each existing training file is preprocessed, split into segments under a
    new timestamped training directory, and the ``rvc_python`` trainer is run
    on those segments. The resulting model is added to the model registry.

    Args:
        name: Name for the voice model
        training_files: List of audio files for training; paths that do not
            exist are skipped.
        description: Model description
        epochs: Number of training epochs
        batch_size: Training batch size
        sample_rate: Target sample rate
        user_email: User email for tracking
        progress_callback: Optional ``callback(fraction, message)``.

    Returns:
        (model_id, status_message). ``model_id`` is ``None`` on failure, in
        which case the message describes the problem.
    """
    has_rvc, rvc_info = check_rvc_installation()
    if not has_rvc:
        # No backend: report progress for the UI, but no model is produced.
        if progress_callback:
            progress_callback(0.5, "RVC not installed - simulating training...")
            progress_callback(1.0, "Training simulation complete")
        return None, f"RVC not installed: {rvc_info}\nInstall with: pip install rvc-python"

    try:
        # Create training directory
        train_id = f"train_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        train_dir = TRAINING_DIR / train_id
        train_dir.mkdir(parents=True, exist_ok=True)

        if progress_callback:
            progress_callback(0.1, "Preparing training data...")

        # Preprocess and copy training files
        processed_dir = train_dir / 'processed'
        processed_dir.mkdir(exist_ok=True)

        all_segments = []
        for i, tf in enumerate(training_files):
            if not os.path.exists(tf):
                continue
            # Preprocess (writes a temporary file when audio libraries are
            # available, otherwise returns ``tf`` itself).
            preprocessed = preprocess_audio(
                tf,
                target_sr=sample_rate,
                normalize=True,
                trim_silence=True
            )
            segments = []
            try:
                # Split into segments
                segments = split_audio_for_training(
                    preprocessed,
                    processed_dir / f"file_{i:03d}",
                    segment_duration=10.0
                )
            finally:
                # Remove the intermediate temp file once it has been split.
                # Never delete the caller's own file, nor a path that is
                # itself being used as a training segment.
                if preprocessed != tf and preprocessed not in segments:
                    try:
                        os.remove(preprocessed)
                    except OSError:
                        pass
            all_segments.extend(segments)

        if not all_segments:
            return None, "No valid training audio found"

        if progress_callback:
            progress_callback(0.2, f"Prepared {len(all_segments)} training segments")

        # Try rvc-python training
        try:
            from rvc_python import RVC

            if progress_callback:
                progress_callback(0.3, "Starting model training...")

            model_output = train_dir / 'model.pth'
            index_output = train_dir / 'index.index'

            # Note: Actual RVC training would happen here
            # This is a placeholder for the training process
            # NOTE(review): confirm rvc_python exposes ``RVC.train`` with
            # these arguments.
            trainer = RVC()
            trainer.train(
                dataset_path=str(processed_dir),
                model_name=name,
                epochs=epochs,
                batch_size=batch_size,
                output_path=str(model_output)
            )

            if progress_callback:
                progress_callback(0.9, "Registering model...")

            # Register the trained model
            registry = VoiceModelRegistry()
            model_id = registry.register_model(
                name=name,
                model_path=str(model_output),
                index_path=str(index_output) if index_output.exists() else None,
                description=description,
                voice_type="custom",
                trained_by=user_email,
                sample_rate=sample_rate
            )

            if progress_callback:
                progress_callback(1.0, "Training complete!")
            return model_id, f"Model trained and saved: {model_id}"
        except ImportError:
            return None, "RVC training requires rvc-python package"
    except Exception as e:
        return None, f"Training error: {str(e)}"
# ============================================================================
# VOICE DATASET MANAGEMENT
# ============================================================================
class VoiceDataset:
    """Manage voice training datasets.

    Each user gets a directory under the datasets directory containing a
    ``datasets.json`` index and one sub-directory per dataset.
    """

    def __init__(self, user_email: str = "demo"):
        self.user_email = user_email
        self.user_dir = DATASET_DIR / self._safe_dirname(user_email)
        self.user_dir.mkdir(parents=True, exist_ok=True)
        self.index_file = self.user_dir / 'datasets.json'
        self.datasets = self._load_index()

    @staticmethod
    def _safe_dirname(user_email: str) -> str:
        """Turn a user identifier into a single, safe directory name.

        ``@`` and ``.`` are mapped as before; path separators, drive colons
        and NUL are additionally replaced so the value can never point
        outside the datasets directory.
        """
        name = user_email.replace('@', '_at_').replace('.', '_')
        for unsafe in ('/', '\\', ':', '\0'):
            name = name.replace(unsafe, '_')
        return name

    def _load_index(self) -> Dict:
        """Read the dataset index; return ``{}`` if missing or unusable."""
        if self.index_file.exists():
            try:
                loaded = json.loads(self.index_file.read_text())
            except (OSError, ValueError):
                # Unreadable or malformed index: start empty.
                return {}
            # Only a JSON object can serve as the id -> dataset mapping.
            if isinstance(loaded, dict):
                return loaded
        return {}

    def _save_index(self):
        """Write the in-memory dataset index to disk."""
        self.index_file.write_text(json.dumps(self.datasets, indent=2))

    def create_dataset(self, name: str, description: str = "") -> str:
        """Create a new, empty voice dataset and return its ID."""
        # IDs have one-second resolution; add a suffix on collision so two
        # datasets created in the same second stay separate.
        base_id = f"ds_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        dataset_id = base_id
        attempt = 1
        while dataset_id in self.datasets or (self.user_dir / dataset_id).exists():
            dataset_id = f"{base_id}_{attempt}"
            attempt += 1

        dataset_dir = self.user_dir / dataset_id
        dataset_dir.mkdir(parents=True, exist_ok=True)

        self.datasets[dataset_id] = {
            'id': dataset_id,
            'name': name,
            'description': description,
            'created': datetime.now().isoformat(),
            'files': [],
            'total_duration': 0
        }
        self._save_index()
        return dataset_id

    def add_audio(self, dataset_id: str, audio_path: str) -> bool:
        """Add an audio file to a dataset.

        The file is preprocessed into the dataset directory under its
        original file name; when preprocessing is unavailable it is copied
        unchanged instead.

        Returns:
            ``True`` on success, ``False`` if ``dataset_id`` is unknown.
        """
        if dataset_id not in self.datasets:
            return False

        dataset_dir = self.user_dir / dataset_id
        dataset_dir.mkdir(parents=True, exist_ok=True)
        filename = Path(audio_path).name
        dest = dataset_dir / filename

        # Preprocess and copy
        preprocessed = preprocess_audio(audio_path, str(dest))
        if preprocessed != str(dest):
            # preprocess_audio returned its input untouched (audio libraries
            # missing), so nothing was written to ``dest`` - copy the raw
            # file so the recorded path actually exists.
            try:
                shutil.copy2(audio_path, dest)
            except shutil.SameFileError:
                pass
            preprocessed = str(dest)

        # Get duration
        duration = 0
        if HAS_AUDIO:
            y, sr = librosa.load(preprocessed, sr=None)
            duration = len(y) / sr

        self.datasets[dataset_id]['files'].append({
            'filename': filename,
            'path': str(dest),
            'duration': duration,
            'added': datetime.now().isoformat()
        })
        self.datasets[dataset_id]['total_duration'] += duration
        self._save_index()
        return True

    def get_dataset(self, dataset_id: str) -> Optional[Dict]:
        """Return the dataset record for ``dataset_id``, or ``None``."""
        return self.datasets.get(dataset_id)

    def list_datasets(self) -> List[Dict]:
        """Return every dataset record for this user."""
        return list(self.datasets.values())

    def get_training_files(self, dataset_id: str) -> List[str]:
        """Get list of file paths for training (only files that exist)."""
        ds = self.datasets.get(dataset_id)
        if not ds:
            return []
        return [f['path'] for f in ds['files'] if os.path.exists(f['path'])]
# ============================================================================
# HIGH-LEVEL API
# ============================================================================
def clone_voice(
    source_audio: str,
    target_voice: str,
    pitch_shift: int = 0,
    progress_callback=None
) -> Tuple[Optional[str], str]:
    """
    Clone voice from target and apply to source audio.

    Preset names are handled locally with a plain pitch shift; any other
    value is treated as a registry model ID and passed to ``convert_voice``.

    Args:
        source_audio: Audio to convert
        target_voice: Voice model ID or preset name
        pitch_shift: Pitch adjustment in semitones (added to the preset's
            own shift when a preset is used)
        progress_callback: Progress callback (used on the RVC model path)

    Returns:
        (output_path, status_message). ``output_path`` is a temporary
        ``.wav`` file owned by the caller, or ``None`` on failure.
    """
    # Check for preset voice
    if target_voice in PRESET_VOICES:
        preset = PRESET_VOICES[target_voice]
        pitch_shift += preset.get('pitch_shift', 0)

        # Presets have no model behind them, so they cannot be served by
        # RVC; report the real cause instead of a "model not found" error.
        if not HAS_AUDIO:
            return None, "Preset voices require the librosa and soundfile packages"

        # For presets without models, just apply pitch shift
        output_path = None
        try:
            y, sr = librosa.load(source_audio, sr=None)
            y_shifted = librosa.effects.pitch_shift(y, sr=sr, n_steps=pitch_shift)
            # mkstemp creates the file atomically, unlike deprecated mktemp.
            fd, output_path = tempfile.mkstemp(suffix='.wav')
            os.close(fd)
            sf.write(output_path, y_shifted, sr)
        except Exception as e:
            # Same error convention as convert_voice: (None, message).
            if output_path is not None:
                try:
                    os.remove(output_path)
                except OSError:
                    pass
            return None, f"Conversion error: {str(e)}"
        return output_path, f"Applied {preset['name']} preset"

    # Use RVC model
    return convert_voice(
        input_audio=source_audio,
        model_id=target_voice,
        pitch_shift=pitch_shift,
        progress_callback=progress_callback
    )
# ============================================================================
# SINGLETON INSTANCES
# ============================================================================
# Lazily created shared registry; filled in by get_model_registry().
_model_registry = None
_voice_dataset = None


def get_model_registry() -> VoiceModelRegistry:
    """Return the shared VoiceModelRegistry, creating it on first use."""
    global _model_registry
    if _model_registry is not None:
        return _model_registry
    _model_registry = VoiceModelRegistry()
    return _model_registry


def get_voice_dataset(user_email: str = "demo") -> VoiceDataset:
    """Return a new VoiceDataset for ``user_email`` (built on every call)."""
    dataset = VoiceDataset(user_email)
    return dataset
# ============================================================================
# CLI
# ============================================================================
def _main() -> None:
    """Print RVC availability, the preset voices and the registered models."""
    print("VYNL RVC Voice Cloning")
    print("=" * 50)

    available, details = check_rvc_installation()
    print(f"RVC available: {available}")
    print(f"Info: {details}")

    print("\nPreset voices:")
    for preset_key, preset_info in PRESET_VOICES.items():
        print(f" - {preset_key}: {preset_info['name']}")

    print("\nRegistered models:")
    for entry in get_model_registry().list_models():
        print(f" - {entry['id']}: {entry['name']}")


if __name__ == "__main__":
    _main()