🎤 LAVCO: Voice Conversion
Convert speech to match any reference voice using semantic/acoustic interleaving
""" LAVCO Gradio App for HuggingFace Spaces A beautiful web interface for voice conversion using LAVCO (Llasa-VC). """ import os import re import tempfile import gradio as gr import torch import torch.nn as nn import numpy as np import soundfile as sf import librosa from typing import List, Optional, Dict, Tuple from transformers import ( AutoModelForCausalLM, AutoTokenizer, WhisperModel, WhisperFeatureExtractor, ) # Constants XCODEC2_FRAME_RATE = 50 WHISPER_FRAME_RATE = 50 # Model configuration MODEL_ID = os.getenv("MODEL_ID", "AdoCleanCode/LAVCO-v3") DEVICE = "cuda" if torch.cuda.is_available() else "cpu" # Default audio files (will be in examples/ directory) EXAMPLES_DIR = "examples" DEFAULT_SOURCE_PATH = os.path.join(EXAMPLES_DIR, "sample1_source.wav") DEFAULT_REFERENCE_PATH = os.path.join(EXAMPLES_DIR, "sample1_reference.wav") # Check if files exist and use absolute paths DEFAULT_SOURCE_AUDIO = None DEFAULT_REFERENCE_AUDIO = None if os.path.exists(DEFAULT_SOURCE_PATH): DEFAULT_SOURCE_AUDIO = os.path.abspath(DEFAULT_SOURCE_PATH) print(f"✅ Found default source audio: {DEFAULT_SOURCE_AUDIO}", flush=True) else: print(f"⚠️ Default source audio not found: {DEFAULT_SOURCE_PATH}", flush=True) if os.path.exists(DEFAULT_REFERENCE_PATH): DEFAULT_REFERENCE_AUDIO = os.path.abspath(DEFAULT_REFERENCE_PATH) print(f"✅ Found default reference audio: {DEFAULT_REFERENCE_AUDIO}", flush=True) else: print(f"⚠️ Default reference audio not found: {DEFAULT_REFERENCE_PATH}", flush=True) # Global model and tokenizer (loaded once) model = None tokenizer = None class SpeechOnlyLogitsProcessor: """Only allow XCodec2 speech tokens and custom EOS.""" def __init__(self, tokenizer, eos_id: int): self.allowed = torch.zeros(len(tokenizer), dtype=torch.bool) vocab = tokenizer.get_vocab() pat = re.compile(r"^<\|s_\d+\|>$") for t, tid in vocab.items(): if pat.match(t): self.allowed[tid] = True self.allowed[eos_id] = True def __call__(self, input_ids, scores): mask = self.allowed.to(scores.device) return scores.masked_fill(~mask, float("-inf")) def apply_repetition_penalty(logits: torch.Tensor, generated_ids: List[int], penalty: float = 1.2, window: int = 5): """Apply repetition penalty ONLY to recently repeated tokens.""" if penalty == 1.0 or len(generated_ids) < 2: return logits recent_tokens = generated_ids[-window:] if len(generated_ids) >= window else generated_ids token_counts = {} for token_id in recent_tokens: token_counts[token_id] = token_counts.get(token_id, 0) + 1 for token_id, count in token_counts.items(): if count > 1: effective_penalty = penalty ** (count - 1) if logits[0, token_id] > 0: logits[0, token_id] /= effective_penalty else: logits[0, token_id] *= effective_penalty return logits def sample_with_temperature_and_top_p(logits: torch.Tensor, temperature: float = 1.0, top_p: float = 0.9): """Sample token with temperature scaling and nucleus (top-p) sampling.""" if temperature != 1.0: logits = logits / temperature probs = torch.softmax(logits, dim=-1) if top_p < 1.0: sorted_probs, sorted_indices = torch.sort(probs, descending=True, dim=-1) cumulative_probs = torch.cumsum(sorted_probs, dim=-1) sorted_indices_to_remove = cumulative_probs > top_p sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone() sorted_indices_to_remove[..., 0] = False indices_to_remove = sorted_indices_to_remove.scatter(1, sorted_indices, sorted_indices_to_remove) probs = probs.masked_fill(indices_to_remove, 0.0) probs = probs / probs.sum(dim=-1, keepdim=True) next_token_id = torch.multinomial(probs, num_samples=1).item() return next_token_id def greedy_generate_with_embeds( model, inputs_embeds: torch.Tensor, embed_layer, logits_processor, max_new_tokens: int, eos_token_id: int, pad_token_id: int = 0, verbose: bool = False, tokenizer=None, temperature: float = 1.0, repetition_penalty: float = 1.2, top_p: float = 0.9, repetition_window: int = 5, ) -> List[int]: """KV-cache enabled greedy generation starting from inputs_embeds.""" device = inputs_embeds.device generated = [] past_key_values = None cur_embeds = inputs_embeds dummy_input_ids = torch.zeros(1, inputs_embeds.shape[1], dtype=torch.long, device=device) with torch.no_grad(): outputs = model( inputs_embeds=cur_embeds, use_cache=True, return_dict=True, ) logits = outputs.logits[:, -1, :] past_key_values = outputs.past_key_values logits = logits_processor(dummy_input_ids, logits) logits = apply_repetition_penalty(logits, generated, repetition_penalty, repetition_window) if temperature == 1.0 and top_p == 1.0: next_token_id = torch.argmax(logits, dim=-1).item() else: next_token_id = sample_with_temperature_and_top_p(logits, temperature, top_p) generated.append(next_token_id) if next_token_id == eos_token_id: return generated for step in range(1, max_new_tokens): new_token_embed = embed_layer(torch.tensor([[next_token_id]], device=device)) with torch.no_grad(): outputs = model( inputs_embeds=new_token_embed, past_key_values=past_key_values, use_cache=True, return_dict=True, ) logits = outputs.logits[:, -1, :] past_key_values = outputs.past_key_values dummy_input_ids = torch.cat([ dummy_input_ids, torch.tensor([[next_token_id]], device=device) ], dim=1) logits = logits_processor(dummy_input_ids, logits) logits = apply_repetition_penalty(logits, generated, repetition_penalty, repetition_window) if temperature == 1.0 and top_p == 1.0: next_token_id = torch.argmax(logits, dim=-1).item() else: next_token_id = sample_with_temperature_and_top_p(logits, temperature, top_p) generated.append(next_token_id) if next_token_id == eos_token_id: break return generated class LAVCOModel(nn.Module): """LAVCO model for voice conversion.""" def __init__(self, load_dir_or_repo: str, device: str = "cuda", cache_dir: str = None): super().__init__() import json from huggingface_hub import hf_hub_download, snapshot_download from xcodec2.modeling_xcodec2 import XCodec2Model is_local = os.path.isdir(load_dir_or_repo) if is_local: config_path = os.path.join(load_dir_or_repo, "llasa_vc_config.json") proj_path = os.path.join(load_dir_or_repo, "projection.pt") llasa_path = os.path.join(load_dir_or_repo, "llasa") else: print(f"📥 Downloading from HuggingFace: {load_dir_or_repo}") config_path = hf_hub_download( repo_id=load_dir_or_repo, filename="llasa_vc_config.json", cache_dir=cache_dir, ) proj_path = hf_hub_download( repo_id=load_dir_or_repo, filename="projection.pt", cache_dir=cache_dir, ) llasa_path = snapshot_download( repo_id=load_dir_or_repo, allow_patterns=["llasa/*"], cache_dir=cache_dir, ) llasa_path = os.path.join(llasa_path, "llasa") with open(config_path, "r") as f: config = json.load(f) import sys print(f"📥 Loading LLASA from {llasa_path}...", flush=True) sys.stdout.flush() self.llasa = AutoModelForCausalLM.from_pretrained( llasa_path, trust_remote_code=True, torch_dtype=torch.bfloat16, ) self.hidden_size = self.llasa.config.hidden_size print(f" ✅ LLASA loaded (hidden_size={self.hidden_size})", flush=True) sys.stdout.flush() print(f"📥 Loading Whisper encoder from {config['whisper_model']}...", flush=True) sys.stdout.flush() whisper_full = WhisperModel.from_pretrained(config["whisper_model"]) self.whisper = whisper_full.encoder self.whisper_dim = self.whisper.config.d_model del whisper_full print(f" ✅ Whisper loaded (dim={self.whisper_dim})", flush=True) sys.stdout.flush() print(f"📥 Loading XCodec2 from {config['xcodec_model']}...", flush=True) sys.stdout.flush() self.xcodec = XCodec2Model.from_pretrained(config["xcodec_model"]) self.xcodec.eval() print(f" ✅ XCodec2 loaded", flush=True) sys.stdout.flush() print(f"📥 Loading Whisper processor...", flush=True) sys.stdout.flush() self.whisper_processor = WhisperFeatureExtractor.from_pretrained(config["whisper_model"]) print(f" ✅ Whisper processor loaded", flush=True) sys.stdout.flush() print(f"📥 Loading projection layer...", flush=True) sys.stdout.flush() proj_state = torch.load(proj_path, map_location="cpu", weights_only=False) self.projection = nn.Linear(self.whisper_dim, self.hidden_size) self.projection.load_state_dict(proj_state) print(f" ✅ Projection layer loaded", flush=True) sys.stdout.flush() self.u_start_id = config.get("u_start_id") self.u_end_id = config.get("u_end_id") self.g_start_id = config["g_start_id"] self.g_end_id = config["g_end_id"] self.pad_id = config["pad_id"] for param in self.whisper.parameters(): param.requires_grad = False self.whisper.eval() for param in self.xcodec.parameters(): param.requires_grad = False self.xcodec.eval() def set_special_token_ids(self, tokenizer): """Set special token IDs and instruction text embeddings.""" self.tokenizer = tokenizer self.u_start_id = tokenizer.convert_tokens_to_ids("<|SPEECH_UNDERSTANDING_START|>") self.u_end_id = tokenizer.convert_tokens_to_ids("<|SPEECH_UNDERSTANDING_END|>") self.g_start_id = tokenizer.convert_tokens_to_ids("<|SPEECH_GENERATION_START|>") self.g_end_id = tokenizer.convert_tokens_to_ids("<|SPEECH_GENERATION_END|>") self.pad_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else 0 prefix_text = "Convert " middle_text = " into speech using this speaker: " self.prefix_ids = tokenizer(prefix_text, add_special_tokens=False, return_tensors="pt")["input_ids"] self.middle_ids = tokenizer(middle_text, add_special_tokens=False, return_tensors="pt")["input_ids"] def _tokenizer_ids_to_xcodec_codes(self, tokenizer_ids: torch.Tensor) -> torch.Tensor: """Convert LLASA tokenizer IDs back to raw XCodec2 codes (0-65535).""" batch_size, seq_len = tokenizer_ids.shape xcodec_codes = torch.zeros_like(tokenizer_ids) for i in range(batch_size): tokens = self.tokenizer.convert_ids_to_tokens(tokenizer_ids[i].tolist()) for j, tok in enumerate(tokens): if tok and tok.startswith("<|s_") and tok.endswith("|>"): try: code = int(tok[4:-2]) xcodec_codes[i, j] = code except ValueError: xcodec_codes[i, j] = 0 else: xcodec_codes[i, j] = 0 return xcodec_codes def generate( self, wav_or_mel: np.ndarray, ref_ids: torch.Tensor, ref_length: int, max_new_tokens: int = 2000, tokenizer=None, temperature: float = 1.0, repetition_penalty: float = 1.2, top_p: float = 0.9, repetition_window: int = 5, verbose: bool = False, ) -> List[int]: """Generate voice conversion tokens.""" device = ref_ids.device model_dtype = next(self.llasa.parameters()).dtype mel = self.whisper_processor(wav_or_mel, sampling_rate=16000, return_tensors="pt").input_features.to(device) whisper_out = self.whisper(mel).last_hidden_state audio_dur = len(wav_or_mel) / 16000 num_frames = min(int(audio_dur * WHISPER_FRAME_RATE), 1500) soft_tokens = self.projection(whisper_out[:, :num_frames]).to(model_dtype) embed_layer = self.llasa.get_input_embeddings() prefix_emb = embed_layer(self.prefix_ids.to(device)) middle_emb = embed_layer(self.middle_ids.to(device)) u_start_emb = embed_layer(torch.tensor([[self.u_start_id]], device=device)) u_end_emb = embed_layer(torch.tensor([[self.u_end_id]], device=device)) g_start_emb = embed_layer(torch.tensor([[self.g_start_id]], device=device)) ref_embeds = embed_layer(ref_ids[:, :ref_length]) inputs_embeds = torch.cat([ prefix_emb, soft_tokens, middle_emb, u_start_emb, ref_embeds, u_end_emb, g_start_emb, ], dim=1).to(model_dtype) if tokenizer is not None: logits_processor = SpeechOnlyLogitsProcessor(tokenizer, self.g_end_id) generated = greedy_generate_with_embeds( model=self.llasa, inputs_embeds=inputs_embeds, embed_layer=embed_layer, logits_processor=logits_processor, max_new_tokens=max_new_tokens, eos_token_id=self.g_end_id, pad_token_id=self.pad_id, verbose=verbose, tokenizer=tokenizer, temperature=temperature, repetition_penalty=repetition_penalty, top_p=top_p, repetition_window=repetition_window, ) return generated else: outputs = self.llasa.generate( inputs_embeds=inputs_embeds, max_new_tokens=max_new_tokens, pad_token_id=self.pad_id, eos_token_id=self.g_end_id, do_sample=False, ) return outputs[0].tolist() def load_model(): """Load model once at startup.""" global model, tokenizer if model is None: import sys import time print(f"📥 Loading model: {MODEL_ID}", flush=True) sys.stdout.flush() start_time = time.time() print(" → Loading LAVCO model components...", flush=True) model = LAVCOModel(MODEL_ID, device=DEVICE) print(f" → Moving model to {DEVICE}...", flush=True) model = model.to(DEVICE) model.eval() print(f" → Loading tokenizer...", flush=True) tokenizer = AutoTokenizer.from_pretrained(MODEL_ID) print(f" → Setting special tokens...", flush=True) model.set_special_token_ids(tokenizer) elapsed = time.time() - start_time print(f"✅ Model loaded in {elapsed:.1f}s", flush=True) sys.stdout.flush() return model, tokenizer def extract_xcodec2_from_generated(tokenizer, token_ids: list) -> list: """Extract XCodec2 token IDs from generated token IDs.""" xcodec2_ids = [] for tid in token_ids: token = tokenizer.convert_ids_to_tokens(tid) if token and token.startswith("<|s_") and token.endswith("|>"): try: xcodec2_ids.append(int(token[4:-2])) except ValueError: pass return xcodec2_ids def convert_voice(source_audio, reference_audio, temperature, repetition_penalty, top_p, repetition_window, max_tokens, progress=gr.Progress()): """Convert source voice to reference voice using LAVCO.""" if source_audio is None: return None, "❌ Please provide source audio" if reference_audio is None: return None, "❌ Please provide reference audio" try: progress(0.1, desc="Loading model...") model, tokenizer = load_model() progress(0.2, desc="Loading audio files...") if isinstance(source_audio, tuple): source_path = source_audio[1] else: source_path = source_audio if isinstance(reference_audio, tuple): reference_path = reference_audio[1] else: reference_path = reference_audio source_wav = librosa.load(source_path, sr=16000)[0].astype(np.float32) reference_wav = librosa.load(reference_path, sr=16000)[0].astype(np.float32) progress(0.4, desc="Encoding audio...") with torch.no_grad(): xcodec_device = next(model.xcodec.parameters()).device ref_tensor_audio = torch.from_numpy(reference_wav).float().unsqueeze(0).to(xcodec_device) ref_codes = model.xcodec.encode_code(input_waveform=ref_tensor_audio) if isinstance(ref_codes, torch.Tensor): ref_codes_np = ref_codes.cpu().numpy() else: ref_codes_np = np.array(ref_codes) ref_xcodec_ids = ref_codes_np.flatten().astype(int).tolist() ref_token_str = "".join([f"<|s_{rid}|>" for rid in ref_xcodec_ids]) ref_tokenizer_ids = tokenizer(ref_token_str, add_special_tokens=False)["input_ids"] ref_ids = torch.tensor(ref_tokenizer_ids, dtype=torch.long, device=DEVICE).unsqueeze(0) ref_length = len(ref_tokenizer_ids) source_tensor_audio = torch.from_numpy(source_wav).float().unsqueeze(0).to(xcodec_device) source_codes = model.xcodec.encode_code(input_waveform=source_tensor_audio) if isinstance(source_codes, torch.Tensor): source_codes_np = source_codes.cpu().numpy() else: source_codes_np = np.array(source_codes) source_xcodec_ids = source_codes_np.flatten().astype(int).tolist() source_token_str = "".join([f"<|s_{rid}|>" for rid in source_xcodec_ids]) source_tokenizer_ids = tokenizer(source_token_str, add_special_tokens=False)["input_ids"] seedvc_ids = torch.tensor(source_tokenizer_ids, dtype=torch.long, device=DEVICE).unsqueeze(0) seedvc_length = len(source_tokenizer_ids) xcodec_codes = model._tokenizer_ids_to_xcodec_codes(seedvc_ids) codes = xcodec_codes.unsqueeze(1).to(xcodec_device) wav = model.xcodec.decode_code(codes) if len(wav.shape) == 3: wav = wav.squeeze(1) num_samples_audio = int(seedvc_length / XCODEC2_FRAME_RATE * 16000) num_samples_audio = min(num_samples_audio, wav.shape[-1]) source_wav_processed = wav[0, :num_samples_audio].cpu().numpy() progress(0.7, desc="Generating voice conversion...") import inspect gen_sig = inspect.signature(model.generate) gen_params = gen_sig.parameters gen_kwargs = { 'max_new_tokens': max_tokens, 'tokenizer': tokenizer, 'verbose': False, } if 'temperature' in gen_params: gen_kwargs['temperature'] = temperature if 'repetition_penalty' in gen_params: gen_kwargs['repetition_penalty'] = repetition_penalty if 'top_p' in gen_params: gen_kwargs['top_p'] = top_p if 'repetition_window' in gen_params: gen_kwargs['repetition_window'] = repetition_window generated_token_ids = model.generate( source_wav_processed, ref_ids, ref_length, **gen_kwargs ) progress(0.9, desc="Decoding audio...") gen_xcodec_ids = extract_xcodec2_from_generated(tokenizer, generated_token_ids) if not gen_xcodec_ids: return None, "❌ No audio tokens generated!" codes = torch.tensor(gen_xcodec_ids, device=xcodec_device).unsqueeze(0).unsqueeze(0) output_wav = model.xcodec.decode_code(codes) if len(output_wav.shape) == 3: output_wav = output_wav[0, 0, :].cpu().numpy() elif len(output_wav.shape) == 2: output_wav = output_wav[0, :].cpu().numpy() else: output_wav = output_wav.cpu().numpy() with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file: sf.write(tmp_file.name, output_wav, 16000) output_path = tmp_file.name progress(1.0, desc="Complete!") return output_path, f"✅ Generated {len(gen_xcodec_ids)} tokens ({len(gen_xcodec_ids)/XCODEC2_FRAME_RATE:.2f}s)" except Exception as e: import traceback error_msg = f"❌ Error: {str(e)}\n{traceback.format_exc()}" return None, error_msg # Custom CSS for beautiful UI css = """ .gradio-container { font-family: 'Inter', -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif; } .main-header { text-align: center; padding: 2rem 0; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; border-radius: 10px; margin-bottom: 2rem; } """ # Create Gradio interface with gr.Blocks(css=css, theme=gr.themes.Soft()) as demo: gr.Markdown("""
Convert speech to match any reference voice using semantic/acoustic interleaving