"""
LAVCO Gradio App for HuggingFace Spaces

A beautiful web interface for voice conversion using LAVCO (Llasa-VC).
"""

import json
import os
import re
import sys
import tempfile
import time
import traceback
from collections import Counter
from typing import List, Optional, Dict, Tuple

import gradio as gr
import librosa
import numpy as np
import soundfile as sf
import torch
import torch.nn as nn
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    WhisperModel,
    WhisperFeatureExtractor,
)

# Constants
XCODEC2_FRAME_RATE = 50
WHISPER_FRAME_RATE = 50
# Sample rate used for every waveform handled by this app (load, encode, write).
SAMPLE_RATE = 16000

# Model configuration
MODEL_ID = os.getenv("MODEL_ID", "AdoCleanCode/LAVCO-v3")
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Default audio files (will be in examples/ directory)
EXAMPLES_DIR = "examples"
DEFAULT_SOURCE_PATH = os.path.join(EXAMPLES_DIR, "sample1_source.wav")
DEFAULT_REFERENCE_PATH = os.path.join(EXAMPLES_DIR, "sample1_reference.wav")

# Matches an XCodec2 speech token such as "<|s_123|>" and captures the code.
_SPEECH_TOKEN_RE = re.compile(r"^<\|s_(\d+)\|>$")


def _resolve_default_audio(path: str, label: str) -> Optional[str]:
    """Return the absolute path of an example file, or None if it is missing.

    Args:
        path: Relative path of the example file.
        label: Word used in the log line ("source" or "reference").
    """
    if os.path.exists(path):
        resolved = os.path.abspath(path)
        print(f"✅ Found default {label} audio: {resolved}", flush=True)
        return resolved
    print(f"⚠️ Default {label} audio not found: {path}", flush=True)
    return None


# Check if files exist and use absolute paths
DEFAULT_SOURCE_AUDIO = _resolve_default_audio(DEFAULT_SOURCE_PATH, "source")
DEFAULT_REFERENCE_AUDIO = _resolve_default_audio(DEFAULT_REFERENCE_PATH, "reference")

# Global model and tokenizer (loaded once)
model = None
tokenizer = None


class SpeechOnlyLogitsProcessor:
    """Only allow XCodec2 speech tokens and custom EOS."""

    def __init__(self, tokenizer, eos_id: int):
        """Build the allow-mask from the tokenizer vocabulary.

        Args:
            tokenizer: Tokenizer providing ``__len__`` and ``get_vocab()``.
            eos_id: Token id that terminates generation; always allowed.
        """
        self.allowed = torch.zeros(len(tokenizer), dtype=torch.bool)
        for token, token_id in tokenizer.get_vocab().items():
            if _SPEECH_TOKEN_RE.match(token):
                self.allowed[token_id] = True
        self.allowed[eos_id] = True
        # (device, vocab width) -> mask; avoids a host->device copy per step.
        self._mask_cache = {}

    def _mask_for(self, scores: torch.Tensor) -> torch.Tensor:
        """Return the allow-mask on ``scores``' device, sized to its last dim."""
        width = scores.shape[-1]
        key = (scores.device, width)
        mask = self._mask_cache.get(key)
        if mask is None:
            # The logits may be wider than len(tokenizer) when the embedding
            # matrix is padded; the extra ids are never valid, so they stay
            # disallowed. A narrower width simply truncates the mask.
            mask = torch.zeros(width, dtype=torch.bool)
            shared = min(width, self.allowed.numel())
            mask[:shared] = self.allowed[:shared]
            mask = mask.to(scores.device)
            self._mask_cache[key] = mask
        return mask

    def __call__(self, input_ids, scores):
        """Set the score of every disallowed token to -inf.

        ``input_ids`` is accepted for logits-processor call compatibility and
        is not used.
        """
        return scores.masked_fill(~self._mask_for(scores), float("-inf"))


def apply_repetition_penalty(
    logits: torch.Tensor,
    generated_ids: List[int],
    penalty: float = 1.2,
    window: int = 5,
):
    """Apply repetition penalty ONLY to recently repeated tokens.

    A token appearing ``count > 1`` times among the last ``window`` generated
    ids has its logit scaled by ``penalty ** (count - 1)`` (divided when the
    logit is positive, multiplied otherwise).

    Args:
        logits: Tensor indexed as ``logits[0, token_id]``; modified in place.
        generated_ids: Token ids produced so far.
        penalty: Penalty base; 1.0 disables the penalty.
        window: Number of trailing ids inspected; a non-positive value
            disables the penalty.

    Returns:
        The same ``logits`` tensor.
    """
    window = int(window)  # UI sliders may deliver floats; slicing needs an int
    if penalty == 1.0 or window <= 0 or len(generated_ids) < 2:
        return logits

    for token_id, count in Counter(generated_ids[-window:]).items():
        if count <= 1:
            continue
        effective_penalty = penalty ** (count - 1)
        if logits[0, token_id] > 0:
            logits[0, token_id] /= effective_penalty
        else:
            logits[0, token_id] *= effective_penalty
    return logits


def sample_with_temperature_and_top_p(
    logits: torch.Tensor,
    temperature: float = 1.0,
    top_p: float = 0.9,
):
    """Sample token with temperature scaling and nucleus (top-p) sampling.

    Args:
        logits: Tensor of shape (1, vocab).
        temperature: Softmax temperature. A non-positive value selects the
            arg-max token instead of dividing by zero.
        top_p: Cumulative probability mass kept; values >= 1.0 keep all tokens.

    Returns:
        The sampled token id as a Python int.
    """
    if temperature <= 0:
        return torch.argmax(logits, dim=-1).item()

    # Work in float32: the model emits bf16 logits, and cumulative sums in
    # bf16 are too coarse for a reliable top-p cutoff.
    logits = logits.float()
    if temperature != 1.0:
        logits = logits / temperature
    probs = torch.softmax(logits, dim=-1)

    if top_p < 1.0:
        sorted_probs, sorted_indices = torch.sort(probs, descending=True, dim=-1)
        cumulative_probs = torch.cumsum(sorted_probs, dim=-1)
        sorted_indices_to_remove = cumulative_probs > top_p
        # Shift right so the first token crossing the threshold is kept, and
        # never remove the most likely token.
        sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
        sorted_indices_to_remove[..., 0] = False
        indices_to_remove = sorted_indices_to_remove.scatter(
            1, sorted_indices, sorted_indices_to_remove
        )
        probs = probs.masked_fill(indices_to_remove, 0.0)
        probs = probs / probs.sum(dim=-1, keepdim=True)

    return torch.multinomial(probs, num_samples=1).item()


def greedy_generate_with_embeds(
    model,
    inputs_embeds: torch.Tensor,
    embed_layer,
    logits_processor,
    max_new_tokens: int,
    eos_token_id: int,
    pad_token_id: int = 0,
    verbose: bool = False,
    tokenizer=None,
    temperature: float = 1.0,
    repetition_penalty: float = 1.2,
    top_p: float = 0.9,
    repetition_window: int = 5,
) -> List[int]:
    """KV-cache enabled generation starting from inputs_embeds.

    The prompt embeddings are run once; afterwards only the embedding of the
    last generated token is fed, reusing ``past_key_values``. Decoding is
    arg-max only when ``temperature == 1.0 and top_p == 1.0``; otherwise
    tokens are sampled.

    Args:
        model: Causal LM called with ``inputs_embeds`` / ``past_key_values``.
        inputs_embeds: Prompt embeddings, shape (1, seq, hidden).
        embed_layer: Maps a (1, 1) id tensor to its embedding.
        logits_processor: Callable ``(input_ids, logits) -> logits``.
        max_new_tokens: Upper bound on generated tokens; <= 0 yields ``[]``.
        eos_token_id: Generation stops after this id is produced.
        pad_token_id, verbose, tokenizer: Accepted for interface
            compatibility; not used.
        temperature, top_p: Sampling parameters.
        repetition_penalty, repetition_window: See ``apply_repetition_penalty``.

    Returns:
        Generated token ids, including the EOS id when it was produced.
    """
    device = inputs_embeds.device
    generated: List[int] = []
    past_key_values = None
    step_embeds = inputs_embeds
    # Placeholder ids for the embedding-only prompt, extended with each
    # generated token, so the processor receives an ids tensor of full length.
    seen_ids = torch.zeros(1, inputs_embeds.shape[1], dtype=torch.long, device=device)
    use_argmax = temperature == 1.0 and top_p == 1.0

    with torch.no_grad():
        for _ in range(int(max_new_tokens)):
            outputs = model(
                inputs_embeds=step_embeds,
                past_key_values=past_key_values,
                use_cache=True,
                return_dict=True,
            )
            past_key_values = outputs.past_key_values
            logits = logits_processor(seen_ids, outputs.logits[:, -1, :])
            logits = apply_repetition_penalty(
                logits, generated, repetition_penalty, repetition_window
            )

            if use_argmax:
                next_token_id = torch.argmax(logits, dim=-1).item()
            else:
                next_token_id = sample_with_temperature_and_top_p(logits, temperature, top_p)

            generated.append(next_token_id)
            if next_token_id == eos_token_id:
                break

            next_token = torch.tensor([[next_token_id]], device=device)
            seen_ids = torch.cat([seen_ids, next_token], dim=1)
            step_embeds = embed_layer(next_token)

    return generated


class LAVCOModel(nn.Module):
    """LAVCO model for voice conversion.

    Bundles the LLASA causal LM, a frozen Whisper encoder, a frozen XCodec2
    codec and a linear projection from Whisper features to LLASA's hidden size.
    """

    def __init__(self, load_dir_or_repo: str, device: str = "cuda", cache_dir: Optional[str] = None):
        """Load all sub-models from a local directory or a HuggingFace repo.

        Args:
            load_dir_or_repo: Local directory containing
                ``llasa_vc_config.json``, ``projection.pt`` and ``llasa/``, or
                a Hub repo id with the same layout.
            device: Accepted for interface compatibility; not used here.
            cache_dir: Optional download cache directory for Hub files.
        """
        super().__init__()
        # Imported here so the module can be imported without these packages.
        from huggingface_hub import hf_hub_download, snapshot_download
        from xcodec2.modeling_xcodec2 import XCodec2Model

        if os.path.isdir(load_dir_or_repo):
            config_path = os.path.join(load_dir_or_repo, "llasa_vc_config.json")
            proj_path = os.path.join(load_dir_or_repo, "projection.pt")
            llasa_path = os.path.join(load_dir_or_repo, "llasa")
        else:
            print(f"📥 Downloading from HuggingFace: {load_dir_or_repo}")
            config_path = hf_hub_download(
                repo_id=load_dir_or_repo,
                filename="llasa_vc_config.json",
                cache_dir=cache_dir,
            )
            proj_path = hf_hub_download(
                repo_id=load_dir_or_repo,
                filename="projection.pt",
                cache_dir=cache_dir,
            )
            snapshot_path = snapshot_download(
                repo_id=load_dir_or_repo,
                allow_patterns=["llasa/*"],
                cache_dir=cache_dir,
            )
            llasa_path = os.path.join(snapshot_path, "llasa")

        with open(config_path, "r") as f:
            config = json.load(f)

        print(f"📥 Loading LLASA from {llasa_path}...", flush=True)
        self.llasa = AutoModelForCausalLM.from_pretrained(
            llasa_path,
            trust_remote_code=True,
            torch_dtype=torch.bfloat16,
        )
        self.hidden_size = self.llasa.config.hidden_size
        print(f" ✅ LLASA loaded (hidden_size={self.hidden_size})", flush=True)

        print(f"📥 Loading Whisper encoder from {config['whisper_model']}...", flush=True)
        whisper_full = WhisperModel.from_pretrained(config["whisper_model"])
        self.whisper = whisper_full.encoder
        self.whisper_dim = self.whisper.config.d_model
        del whisper_full  # only the encoder is kept
        print(f" ✅ Whisper loaded (dim={self.whisper_dim})", flush=True)

        print(f"📥 Loading XCodec2 from {config['xcodec_model']}...", flush=True)
        self.xcodec = XCodec2Model.from_pretrained(config["xcodec_model"])
        self.xcodec.eval()
        print(" ✅ XCodec2 loaded", flush=True)

        print("📥 Loading Whisper processor...", flush=True)
        self.whisper_processor = WhisperFeatureExtractor.from_pretrained(config["whisper_model"])
        print(" ✅ Whisper processor loaded", flush=True)

        print("📥 Loading projection layer...", flush=True)
        # The checkpoint is a plain nn.Linear state dict, so the restricted
        # unpickler is sufficient and avoids executing arbitrary pickled code
        # from a downloaded file.
        proj_state = torch.load(proj_path, map_location="cpu", weights_only=True)
        self.projection = nn.Linear(self.whisper_dim, self.hidden_size)
        self.projection.load_state_dict(proj_state)
        print(" ✅ Projection layer loaded", flush=True)

        self.u_start_id = config.get("u_start_id")
        self.u_end_id = config.get("u_end_id")
        self.g_start_id = config["g_start_id"]
        self.g_end_id = config["g_end_id"]
        self.pad_id = config["pad_id"]

        # Whisper and XCodec2 are used as frozen feature extractor / codec.
        for frozen in (self.whisper, self.xcodec):
            for param in frozen.parameters():
                param.requires_grad = False
            frozen.eval()

        # Lazily built speech-token mask, reused across generate() calls.
        self._speech_processor = None
        self._speech_processor_tokenizer = None

    def set_special_token_ids(self, tokenizer):
        """Set special token IDs and instruction text embeddings.

        Overrides the ids read from the config with those of ``tokenizer`` and
        tokenizes the two fixed instruction fragments used to build the prompt.
        """
        self.tokenizer = tokenizer
        self.u_start_id = tokenizer.convert_tokens_to_ids("<|SPEECH_UNDERSTANDING_START|>")
        self.u_end_id = tokenizer.convert_tokens_to_ids("<|SPEECH_UNDERSTANDING_END|>")
        self.g_start_id = tokenizer.convert_tokens_to_ids("<|SPEECH_GENERATION_START|>")
        self.g_end_id = tokenizer.convert_tokens_to_ids("<|SPEECH_GENERATION_END|>")
        self.pad_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else 0

        prefix_text = "Convert "
        middle_text = " into speech using this speaker: "
        self.prefix_ids = tokenizer(
            prefix_text, add_special_tokens=False, return_tensors="pt"
        )["input_ids"]
        self.middle_ids = tokenizer(
            middle_text, add_special_tokens=False, return_tensors="pt"
        )["input_ids"]

    def _tokenizer_ids_to_xcodec_codes(self, tokenizer_ids: torch.Tensor) -> torch.Tensor:
        """Convert LLASA tokenizer IDs back to raw XCodec2 codes (0-65535).

        Ids that are not ``<|s_N|>`` speech tokens map to code 0.

        Args:
            tokenizer_ids: Integer tensor of shape (batch, seq).

        Returns:
            Tensor of the same shape and dtype holding the XCodec2 codes.
        """
        xcodec_codes = torch.zeros_like(tokenizer_ids)
        for row, ids in enumerate(tokenizer_ids.tolist()):
            for col, tok in enumerate(self.tokenizer.convert_ids_to_tokens(ids)):
                match = _SPEECH_TOKEN_RE.match(tok) if tok else None
                if match:
                    xcodec_codes[row, col] = int(match.group(1))
        return xcodec_codes

    def _get_speech_processor(self, tokenizer) -> SpeechOnlyLogitsProcessor:
        """Return a cached speech-only logits processor for ``tokenizer``."""
        # Building the mask scans the whole vocabulary, so reuse it while the
        # tokenizer object is unchanged.
        if self._speech_processor is None or self._speech_processor_tokenizer is not tokenizer:
            self._speech_processor = SpeechOnlyLogitsProcessor(tokenizer, self.g_end_id)
            self._speech_processor_tokenizer = tokenizer
        return self._speech_processor

    @torch.no_grad()
    def generate(
        self,
        wav_or_mel: np.ndarray,
        ref_ids: torch.Tensor,
        ref_length: int,
        max_new_tokens: int = 2000,
        tokenizer=None,
        temperature: float = 1.0,
        repetition_penalty: float = 1.2,
        top_p: float = 0.9,
        repetition_window: int = 5,
        verbose: bool = False,
    ) -> List[int]:
        """Generate voice conversion tokens.

        The prompt is built in embedding space as: prefix text, projected
        Whisper features of the source, middle text, understanding-start,
        reference speech tokens, understanding-end, generation-start.

        Args:
            wav_or_mel: Source waveform sampled at 16 kHz.
            ref_ids: Reference speech token ids, shape (1, seq); its device
                is used for all tensors created here.
            ref_length: Number of leading ``ref_ids`` entries to use.
            max_new_tokens: Upper bound on generated tokens.
            tokenizer: When given, decoding is restricted to speech tokens and
                uses the custom KV-cache loop; otherwise ``llasa.generate``
                is used without sampling.
            temperature, repetition_penalty, top_p, repetition_window:
                Decoding parameters for the custom loop.
            verbose: Forwarded to the custom loop.

        Returns:
            Generated token ids.

        Raises:
            RuntimeError: If ``set_special_token_ids`` has not been called.
        """
        if not hasattr(self, "prefix_ids"):
            raise RuntimeError("Call set_special_token_ids(tokenizer) before generate().")

        device = ref_ids.device
        model_dtype = next(self.llasa.parameters()).dtype

        mel = self.whisper_processor(
            wav_or_mel, sampling_rate=SAMPLE_RATE, return_tensors="pt"
        ).input_features.to(device)
        whisper_out = self.whisper(mel).last_hidden_state

        # Keep only the frames that cover the actual audio, capped at 1500.
        audio_dur = len(wav_or_mel) / SAMPLE_RATE
        num_frames = min(int(audio_dur * WHISPER_FRAME_RATE), 1500)
        soft_tokens = self.projection(whisper_out[:, :num_frames]).to(model_dtype)

        embed_layer = self.llasa.get_input_embeddings()

        def embed_id(token_id: int) -> torch.Tensor:
            return embed_layer(torch.tensor([[token_id]], device=device))

        inputs_embeds = torch.cat(
            [
                embed_layer(self.prefix_ids.to(device)),
                soft_tokens,
                embed_layer(self.middle_ids.to(device)),
                embed_id(self.u_start_id),
                embed_layer(ref_ids[:, :ref_length]),
                embed_id(self.u_end_id),
                embed_id(self.g_start_id),
            ],
            dim=1,
        ).to(model_dtype)

        if tokenizer is None:
            outputs = self.llasa.generate(
                inputs_embeds=inputs_embeds,
                max_new_tokens=max_new_tokens,
                pad_token_id=self.pad_id,
                eos_token_id=self.g_end_id,
                do_sample=False,
            )
            return outputs[0].tolist()

        return greedy_generate_with_embeds(
            model=self.llasa,
            inputs_embeds=inputs_embeds,
            embed_layer=embed_layer,
            logits_processor=self._get_speech_processor(tokenizer),
            max_new_tokens=max_new_tokens,
            eos_token_id=self.g_end_id,
            pad_token_id=self.pad_id,
            verbose=verbose,
            tokenizer=tokenizer,
            temperature=temperature,
            repetition_penalty=repetition_penalty,
            top_p=top_p,
            repetition_window=repetition_window,
        )


def load_model():
    """Load model once at startup.

    Returns:
        The ``(model, tokenizer)`` pair stored in the module globals.
    """
    global model, tokenizer
    if model is not None and tokenizer is not None:
        return model, tokenizer

    print(f"📥 Loading model: {MODEL_ID}", flush=True)
    start_time = time.time()

    # Build into locals and publish only on full success, so a failure in any
    # step leaves the globals unset and the next call retries from scratch.
    print(" → Loading LAVCO model components...", flush=True)
    loaded_model = LAVCOModel(MODEL_ID, device=DEVICE)
    print(f" → Moving model to {DEVICE}...", flush=True)
    loaded_model = loaded_model.to(DEVICE)
    loaded_model.eval()

    print(" → Loading tokenizer...", flush=True)
    loaded_tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
    print(" → Setting special tokens...", flush=True)
    loaded_model.set_special_token_ids(loaded_tokenizer)

    model, tokenizer = loaded_model, loaded_tokenizer
    elapsed = time.time() - start_time
    print(f"✅ Model loaded in {elapsed:.1f}s", flush=True)
    return model, tokenizer


def extract_xcodec2_from_generated(tokenizer, token_ids: list) -> list:
    """Extract XCodec2 token IDs from generated token IDs.

    Ids whose token text is not of the form ``<|s_N|>`` (e.g. the EOS token)
    are skipped.

    Args:
        tokenizer: Tokenizer providing ``convert_ids_to_tokens``.
        token_ids: Generated tokenizer ids.

    Returns:
        The integer codes ``N`` in generation order.
    """
    if not token_ids:
        return []
    # One batched lookup instead of one tokenizer call per id.
    tokens = tokenizer.convert_ids_to_tokens(list(token_ids))
    matches = (_SPEECH_TOKEN_RE.match(tok) if tok else None for tok in tokens)
    return [int(m.group(1)) for m in matches if m]


def _load_audio_16k(audio) -> np.ndarray:
    """Return ``audio`` as a mono float32 waveform at 16 kHz.

    Args:
        audio: A file path, or a ``(sample_rate, samples)`` pair as produced
            by a Gradio Audio component in numpy mode.
    """
    if isinstance(audio, (tuple, list)):
        sample_rate, samples = audio
        samples = np.asarray(samples)
        if np.issubdtype(samples.dtype, np.integer):
            # Integer PCM -> [-1, 1] floats.
            scale = float(np.iinfo(samples.dtype).max)
            samples = samples.astype(np.float32) / scale
        else:
            samples = samples.astype(np.float32)
        if samples.ndim > 1:
            # (samples, channels) -> mono
            samples = samples.mean(axis=1)
        if sample_rate != SAMPLE_RATE:
            samples = librosa.resample(samples, orig_sr=sample_rate, target_sr=SAMPLE_RATE)
        return samples.astype(np.float32)
    return librosa.load(audio, sr=SAMPLE_RATE)[0].astype(np.float32)


def _encode_to_tokenizer_ids(model, tokenizer, wav: np.ndarray, device) -> List[int]:
    """Encode a waveform with XCodec2 and return the matching tokenizer ids."""
    audio = torch.from_numpy(wav).float().unsqueeze(0).to(device)
    codes = model.xcodec.encode_code(input_waveform=audio)
    if isinstance(codes, torch.Tensor):
        codes_np = codes.cpu().numpy()
    else:
        codes_np = np.array(codes)
    token_str = "".join(f"<|s_{code}|>" for code in codes_np.flatten().astype(int).tolist())
    return tokenizer(token_str, add_special_tokens=False)["input_ids"]


def convert_voice(source_audio, reference_audio, temperature, repetition_penalty,
                  top_p, repetition_window, max_tokens, progress=gr.Progress()):
    """Convert source voice to reference voice using LAVCO.

    Args:
        source_audio: Path (or ``(sample_rate, samples)`` pair) of the speech
            whose content is converted.
        reference_audio: Path (or pair) of the target-voice speech.
        temperature, repetition_penalty, top_p, repetition_window: Decoding
            parameters forwarded to ``LAVCOModel.generate``.
        max_tokens: Maximum number of tokens to generate.
        progress: Gradio progress tracker.

    Returns:
        ``(output_path, status)``; ``output_path`` is None on failure and
        ``status`` then carries the error text.
    """
    if source_audio is None:
        return None, "❌ Please provide source audio"
    if reference_audio is None:
        return None, "❌ Please provide reference audio"

    try:
        # Sliders may deliver floats; range() and slicing need ints.
        max_tokens = int(max_tokens)
        repetition_window = int(repetition_window)

        progress(0.1, desc="Loading model...")
        model, tokenizer = load_model()

        progress(0.2, desc="Loading audio files...")
        source_wav = _load_audio_16k(source_audio)
        reference_wav = _load_audio_16k(reference_audio)

        progress(0.4, desc="Encoding audio...")
        with torch.no_grad():
            xcodec_device = next(model.xcodec.parameters()).device

            ref_tokenizer_ids = _encode_to_tokenizer_ids(
                model, tokenizer, reference_wav, xcodec_device
            )
            ref_ids = torch.tensor(
                ref_tokenizer_ids, dtype=torch.long, device=DEVICE
            ).unsqueeze(0)
            ref_length = len(ref_tokenizer_ids)

            source_tokenizer_ids = _encode_to_tokenizer_ids(
                model, tokenizer, source_wav, xcodec_device
            )
            seedvc_ids = torch.tensor(
                source_tokenizer_ids, dtype=torch.long, device=DEVICE
            ).unsqueeze(0)
            seedvc_length = len(source_tokenizer_ids)

            # Round-trip the source through the codec and feed the decoded
            # waveform (trimmed to the token count) to the model.
            xcodec_codes = model._tokenizer_ids_to_xcodec_codes(seedvc_ids)
            codes = xcodec_codes.unsqueeze(1).to(xcodec_device)
            wav = model.xcodec.decode_code(codes)
            if wav.dim() == 3:
                wav = wav.squeeze(1)
            num_samples_audio = min(
                int(seedvc_length / XCODEC2_FRAME_RATE * SAMPLE_RATE), wav.shape[-1]
            )
            source_wav_processed = wav[0, :num_samples_audio].float().cpu().numpy()

        progress(0.7, desc="Generating voice conversion...")
        generated_token_ids = model.generate(
            source_wav_processed,
            ref_ids,
            ref_length,
            max_new_tokens=max_tokens,
            tokenizer=tokenizer,
            verbose=False,
            temperature=temperature,
            repetition_penalty=repetition_penalty,
            top_p=top_p,
            repetition_window=repetition_window,
        )

        progress(0.9, desc="Decoding audio...")
        gen_xcodec_ids = extract_xcodec2_from_generated(tokenizer, generated_token_ids)
        if not gen_xcodec_ids:
            return None, "❌ No audio tokens generated!"

        codes = torch.tensor(gen_xcodec_ids, device=xcodec_device).unsqueeze(0).unsqueeze(0)
        with torch.no_grad():
            output_wav = model.xcodec.decode_code(codes)
        if output_wav.dim() == 3:
            output_wav = output_wav[0, 0, :]
        elif output_wav.dim() == 2:
            output_wav = output_wav[0, :]
        output_wav = output_wav.float().cpu().numpy()

        # Reserve a path, close the handle, then write: writing to a file that
        # is still open elsewhere fails on some platforms.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
            output_path = tmp_file.name
        sf.write(output_path, output_wav, SAMPLE_RATE)

        progress(1.0, desc="Complete!")
        return output_path, (
            f"✅ Generated {len(gen_xcodec_ids)} tokens "
            f"({len(gen_xcodec_ids)/XCODEC2_FRAME_RATE:.2f}s)"
        )
    except Exception as e:
        # Top-level UI boundary: report the failure in the status box instead
        # of crashing the request.
        error_msg = f"❌ Error: {str(e)}\n{traceback.format_exc()}"
        return None, error_msg


# Custom CSS for beautiful UI
css = """
.gradio-container {
    font-family: 'Inter', -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif;
}
.main-header {
    text-align: center;
    padding: 2rem 0;
    background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
    color: white;
    border-radius: 10px;
    margin-bottom: 2rem;
}
"""

# Markdown is kept at column 0 so indentation is never read as a code block.
HEADER_MARKDOWN = """

🎤 LAVCO: Voice Conversion

Convert speech to match any reference voice using semantic/acoustic interleaving

"""

HOW_TO_MARKDOWN = """
### 📖 How to Use

1. **Upload or record** your source audio (the speech you want to convert)
   - Click the microphone icon to record directly from your microphone
   - Or upload an audio file (WAV, MP3, etc.)
2. **Upload or record** your reference audio (the voice you want to mimic)
   - Click the microphone icon to record the target voice
   - Or upload a reference audio file
3. Adjust generation parameters if needed (defaults work well)
4. Click **Convert Voice** and wait for the result

### 💡 Tips

- Keep audio clips under 30 seconds for best results
- Reference audio should be clear speech (1+ seconds recommended)
- When recording, speak clearly and minimize background noise
- Higher repetition penalty helps avoid repetitive outputs
- Lower temperature = more stable, higher = more creative
"""

# Create Gradio interface
with gr.Blocks(css=css, theme=gr.themes.Soft()) as demo:
    gr.Markdown(HEADER_MARKDOWN)

    with gr.Row():
        with gr.Column():
            gr.Markdown("### 📥 Input Audio")
            source_audio = gr.Audio(
                label="Source Audio (content to convert)",
                type="filepath",
                sources=["upload", "microphone"],
            )
            reference_audio = gr.Audio(
                label="Reference Audio (target voice)",
                type="filepath",
                sources=["upload", "microphone"],
            )

            # Add examples if default files exist
            if DEFAULT_SOURCE_AUDIO and DEFAULT_REFERENCE_AUDIO:
                gr.Examples(
                    examples=[[DEFAULT_SOURCE_AUDIO, DEFAULT_REFERENCE_AUDIO]],
                    inputs=[source_audio, reference_audio],
                    label="📁 Example Audio Files (Click to load)",
                )

        with gr.Column():
            gr.Markdown("### ⚙️ Generation Parameters")
            temperature = gr.Slider(
                minimum=0.5,
                maximum=2.0,
                value=1.0,
                step=0.1,
                label="Temperature",
                info="Higher = more diverse, lower = more deterministic",
            )
            repetition_penalty = gr.Slider(
                minimum=1.0,
                maximum=2.0,
                value=1.3,
                step=0.1,
                label="Repetition Penalty",
                info="Penalize repeated tokens (1.0 = off)",
            )
            top_p = gr.Slider(
                minimum=0.5,
                maximum=1.0,
                value=0.9,
                step=0.05,
                label="Top-P (Nucleus Sampling)",
                info="Sample from top P probability mass",
            )
            repetition_window = gr.Slider(
                minimum=3,
                maximum=10,
                value=5,
                step=1,
                label="Repetition Window",
                info="Look at last N tokens for repetition",
            )
            max_tokens = gr.Slider(
                minimum=100,
                maximum=2000,
                value=2000,
                step=100,
                label="Max Tokens",
                info="Maximum tokens to generate",
            )

    convert_btn = gr.Button("🎯 Convert Voice", variant="primary", size="lg")

    with gr.Row():
        output_audio = gr.Audio(
            label="Converted Audio",
            type="filepath",
            autoplay=True,
        )
        status_text = gr.Textbox(
            label="Status",
            interactive=False,
        )

    gr.Markdown(HOW_TO_MARKDOWN)

    convert_btn.click(
        fn=convert_voice,
        inputs=[
            source_audio,
            reference_audio,
            temperature,
            repetition_penalty,
            top_p,
            repetition_window,
            max_tokens,
        ],
        outputs=[output_audio, status_text],
    )


def main() -> None:
    """Print startup diagnostics, pre-load the model and launch the web UI."""
    print("=" * 60, flush=True)
    print("🚀 Starting LAVCO Gradio App", flush=True)
    print("=" * 60, flush=True)
    print(f"Device: {DEVICE}", flush=True)
    print(f"Model: {MODEL_ID}", flush=True)
    print("\n📁 Checking for default audio files...", flush=True)
    print(f" Examples directory: {os.path.abspath(EXAMPLES_DIR)}", flush=True)
    print(f" Source audio: {DEFAULT_SOURCE_AUDIO or 'Not found'}", flush=True)
    print(f" Reference audio: {DEFAULT_REFERENCE_AUDIO or 'Not found'}", flush=True)

    # Pre-load model at startup (so first user doesn't wait)
    print("\n⏳ Pre-loading model (this may take a few minutes)...", flush=True)
    try:
        load_model()
        print("✅ Model ready! Starting Gradio interface...", flush=True)
    except Exception as e:
        # Deliberately non-fatal: load_model() retries on the first request.
        print(f"⚠️ Model pre-loading failed: {e}", flush=True)
        print(" Model will load on first use instead.", flush=True)
        traceback.print_exc()
        sys.stdout.flush()

    print("\n🌐 Launching web interface...", flush=True)
    demo.launch(
        server_name="0.0.0.0",  # Listen on all network interfaces
        server_port=7860,  # The default port HF expects
        share=False,  # Don't create a public share link (HF handles this)
    )


if __name__ == "__main__":
    main()