|
|
""" |
|
|
LAVCO Gradio App for HuggingFace Spaces |
|
|
|
|
|
A beautiful web interface for voice conversion using LAVCO (Llasa-VC). |
|
|
""" |
|
|
|
|
|
import os |
|
|
import re |
|
|
import tempfile |
|
|
import gradio as gr |
|
|
import torch |
|
|
import torch.nn as nn |
|
|
import numpy as np |
|
|
import soundfile as sf |
|
|
import librosa |
|
|
from typing import List, Optional, Dict, Tuple |
|
|
from transformers import ( |
|
|
AutoModelForCausalLM, |
|
|
AutoTokenizer, |
|
|
WhisperModel, |
|
|
WhisperFeatureExtractor, |
|
|
) |
|
|
|
|
|
|
|
|
# Frame rates (tokens/frames per second of 16 kHz audio) for the two streams.
XCODEC2_FRAME_RATE = 50
WHISPER_FRAME_RATE = 50

# Model repo (overridable via env var) and compute device.
MODEL_ID = os.getenv("MODEL_ID", "AdoCleanCode/LAVCO-v3")
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Bundled example clips used to pre-fill the Gradio UI.
EXAMPLES_DIR = "examples"
DEFAULT_SOURCE_PATH = os.path.join(EXAMPLES_DIR, "sample1_source.wav")
DEFAULT_REFERENCE_PATH = os.path.join(EXAMPLES_DIR, "sample1_reference.wav")

# Absolute paths when present on disk, else None (UI hides the examples row).
DEFAULT_SOURCE_AUDIO = None
DEFAULT_REFERENCE_AUDIO = None

if os.path.exists(DEFAULT_SOURCE_PATH):
    DEFAULT_SOURCE_AUDIO = os.path.abspath(DEFAULT_SOURCE_PATH)
    print(f"✅ Found default source audio: {DEFAULT_SOURCE_AUDIO}", flush=True)
else:
    print(f"⚠️ Default source audio not found: {DEFAULT_SOURCE_PATH}", flush=True)

if os.path.exists(DEFAULT_REFERENCE_PATH):
    DEFAULT_REFERENCE_AUDIO = os.path.abspath(DEFAULT_REFERENCE_PATH)
    print(f"✅ Found default reference audio: {DEFAULT_REFERENCE_AUDIO}", flush=True)
else:
    print(f"⚠️ Default reference audio not found: {DEFAULT_REFERENCE_PATH}", flush=True)

# Populated lazily by load_model(); kept global so loading happens once.
model = None
tokenizer = None
|
|
|
|
|
|
|
|
class SpeechOnlyLogitsProcessor:
    """Restrict decoding to XCodec2 speech tokens plus one custom EOS id.

    A boolean vocabulary mask is built once at construction; every call
    pushes all other logits to -inf so only ``<|s_N|>`` tokens (and EOS)
    can ever be selected.
    """

    # Matches exactly one XCodec2 speech token such as <|s_1234|>.
    _SPEECH_TOKEN_RE = re.compile(r"^<\|s_\d+\|>$")

    def __init__(self, tokenizer, eos_id: int):
        allowed = torch.zeros(len(tokenizer), dtype=torch.bool)
        for token, token_id in tokenizer.get_vocab().items():
            if self._SPEECH_TOKEN_RE.match(token):
                allowed[token_id] = True
        allowed[eos_id] = True
        self.allowed = allowed

    def __call__(self, input_ids, scores):
        blocked = ~self.allowed.to(scores.device)
        return scores.masked_fill(blocked, float("-inf"))
|
|
|
|
|
|
|
|
def apply_repetition_penalty(logits: torch.Tensor, generated_ids: List[int], penalty: float = 1.2, window: int = 5):
    """Dampen logits of tokens repeated within the last ``window`` steps.

    A token seen k > 1 times recently is penalized by ``penalty ** (k - 1)``;
    tokens seen at most once are untouched. Modifies ``logits`` in place
    (batch dimension 0) and returns it.
    """
    if penalty == 1.0 or len(generated_ids) < 2:
        return logits

    # Negative-index slicing already caps at the list length for short lists.
    recent = generated_ids[-window:]
    for token_id in set(recent):
        occurrences = recent.count(token_id)
        if occurrences <= 1:
            continue
        factor = penalty ** (occurrences - 1)
        # Shrink positive logits, amplify negative ones - both lower probability.
        if logits[0, token_id] > 0:
            logits[0, token_id] /= factor
        else:
            logits[0, token_id] *= factor

    return logits
|
|
|
|
|
|
|
|
def sample_with_temperature_and_top_p(logits: torch.Tensor, temperature: float = 1.0, top_p: float = 0.9):
    """Draw one token id using temperature scaling and nucleus (top-p) filtering."""
    scaled = logits if temperature == 1.0 else logits / temperature

    probs = torch.softmax(scaled, dim=-1)

    if top_p < 1.0:
        ranked, order = torch.sort(probs, descending=True, dim=-1)
        cumulative = torch.cumsum(ranked, dim=-1)
        drop = cumulative > top_p
        # Shift right so the first token crossing the threshold is still kept.
        drop[..., 1:] = drop[..., :-1].clone()
        drop[..., 0] = False
        # Map the drop mask back from sorted order to vocabulary order.
        drop_in_vocab_order = drop.scatter(1, order, drop)
        probs = probs.masked_fill(drop_in_vocab_order, 0.0)
        probs = probs / probs.sum(dim=-1, keepdim=True)

    return torch.multinomial(probs, num_samples=1).item()
|
|
|
|
|
|
|
|
def _select_next_token(
    logits: torch.Tensor,
    generated: List[int],
    temperature: float,
    repetition_penalty: float,
    top_p: float,
    repetition_window: int,
) -> int:
    """Pick the next token id from already-masked logits.

    Applies the windowed repetition penalty when enabled, then takes the
    argmax (pure greedy) or samples with temperature / nucleus filtering.
    """
    if repetition_penalty != 1.0:
        # apply_repetition_penalty is a no-op at penalty == 1.0, so guarding
        # the call is behavior-identical and skips a function call.
        logits = apply_repetition_penalty(logits, generated, repetition_penalty, repetition_window)
    if temperature == 1.0 and top_p == 1.0:
        return torch.argmax(logits, dim=-1).item()
    return sample_with_temperature_and_top_p(logits, temperature, top_p)


def greedy_generate_with_embeds(
    model,
    inputs_embeds: torch.Tensor,
    embed_layer,
    logits_processor,
    max_new_tokens: int,
    eos_token_id: int,
    pad_token_id: int = 0,
    verbose: bool = False,
    tokenizer=None,
    temperature: float = 1.0,
    repetition_penalty: float = 1.2,
    top_p: float = 0.9,
    repetition_window: int = 5,
) -> List[int]:
    """KV-cache enabled generation starting from pre-built input embeddings.

    The prompt is fed once as ``inputs_embeds``; afterwards only the
    embedding of each freshly generated token is passed together with the
    cached key/values, so every step costs a single-token forward pass.

    Args:
        model: causal LM accepting ``inputs_embeds`` / ``past_key_values``.
        inputs_embeds: (1, prompt_len, hidden) prompt embeddings.
        embed_layer: token-id -> embedding lookup for generated tokens.
        logits_processor: callable(input_ids, logits) -> masked logits.
        max_new_tokens: hard cap on generated tokens.
        eos_token_id: id that terminates generation (included in the output).
        pad_token_id, verbose, tokenizer: kept for interface compatibility
            (currently unused here).

    Returns:
        List of generated token ids, including EOS when produced in time.
    """
    device = inputs_embeds.device
    generated: List[int] = []

    # Logits processors expect an input_ids argument; the prompt positions
    # are zeros because the real prompt was supplied as embeddings.
    dummy_input_ids = torch.zeros(1, inputs_embeds.shape[1], dtype=torch.long, device=device)

    # Prime the KV cache with the full prompt.
    with torch.no_grad():
        outputs = model(
            inputs_embeds=inputs_embeds,
            use_cache=True,
            return_dict=True,
        )
        logits = outputs.logits[:, -1, :]
        past_key_values = outputs.past_key_values

    logits = logits_processor(dummy_input_ids, logits)
    next_token_id = _select_next_token(
        logits, generated, temperature, repetition_penalty, top_p, repetition_window
    )
    generated.append(next_token_id)
    if next_token_id == eos_token_id:
        return generated

    # Incremental decoding: one token embedding per step, reusing the cache.
    for _ in range(1, max_new_tokens):
        new_token_embed = embed_layer(torch.tensor([[next_token_id]], device=device))

        with torch.no_grad():
            outputs = model(
                inputs_embeds=new_token_embed,
                past_key_values=past_key_values,
                use_cache=True,
                return_dict=True,
            )
            logits = outputs.logits[:, -1, :]
            past_key_values = outputs.past_key_values

        dummy_input_ids = torch.cat(
            [dummy_input_ids, torch.tensor([[next_token_id]], device=device)], dim=1
        )
        logits = logits_processor(dummy_input_ids, logits)
        next_token_id = _select_next_token(
            logits, generated, temperature, repetition_penalty, top_p, repetition_window
        )
        generated.append(next_token_id)

        if next_token_id == eos_token_id:
            break

    return generated
|
|
|
|
|
|
|
|
class LAVCOModel(nn.Module):
    """LAVCO model for voice conversion.

    Bundles four components:
      * ``llasa``      - causal LM that generates XCodec2 speech tokens,
      * ``whisper``    - frozen Whisper encoder providing semantic features,
      * ``projection`` - linear map from Whisper features to LLASA hidden size,
      * ``xcodec``     - frozen XCodec2 codec used to encode/decode waveforms.
    """

    def __init__(self, load_dir_or_repo: str, device: str = "cuda", cache_dir: str = None):
        """Load all components from a local directory or a HuggingFace repo.

        Args:
            load_dir_or_repo: local checkpoint directory, or HF repo id.
            device: target device hint (the module is moved by the caller).
            cache_dir: optional HuggingFace download cache directory.
        """
        super().__init__()
        import json
        from huggingface_hub import hf_hub_download, snapshot_download
        from xcodec2.modeling_xcodec2 import XCodec2Model

        is_local = os.path.isdir(load_dir_or_repo)

        if is_local:
            config_path = os.path.join(load_dir_or_repo, "llasa_vc_config.json")
            proj_path = os.path.join(load_dir_or_repo, "projection.pt")
            llasa_path = os.path.join(load_dir_or_repo, "llasa")
        else:
            print(f"📥 Downloading from HuggingFace: {load_dir_or_repo}")
            config_path = hf_hub_download(
                repo_id=load_dir_or_repo,
                filename="llasa_vc_config.json",
                cache_dir=cache_dir,
            )
            proj_path = hf_hub_download(
                repo_id=load_dir_or_repo,
                filename="projection.pt",
                cache_dir=cache_dir,
            )
            # snapshot_download returns the snapshot root; the LLASA weights
            # live in its "llasa" subdirectory.
            llasa_path = snapshot_download(
                repo_id=load_dir_or_repo,
                allow_patterns=["llasa/*"],
                cache_dir=cache_dir,
            )
            llasa_path = os.path.join(llasa_path, "llasa")

        with open(config_path, "r") as f:
            config = json.load(f)

        import sys
        print(f"📥 Loading LLASA from {llasa_path}...", flush=True)
        sys.stdout.flush()
        self.llasa = AutoModelForCausalLM.from_pretrained(
            llasa_path,
            trust_remote_code=True,
            torch_dtype=torch.bfloat16,
        )
        self.hidden_size = self.llasa.config.hidden_size
        print(f" ✅ LLASA loaded (hidden_size={self.hidden_size})", flush=True)
        sys.stdout.flush()

        # Only the encoder half of Whisper is needed; drop the decoder.
        print(f"📥 Loading Whisper encoder from {config['whisper_model']}...", flush=True)
        sys.stdout.flush()
        whisper_full = WhisperModel.from_pretrained(config["whisper_model"])
        self.whisper = whisper_full.encoder
        self.whisper_dim = self.whisper.config.d_model
        del whisper_full
        print(f" ✅ Whisper loaded (dim={self.whisper_dim})", flush=True)
        sys.stdout.flush()

        print(f"📥 Loading XCodec2 from {config['xcodec_model']}...", flush=True)
        sys.stdout.flush()
        self.xcodec = XCodec2Model.from_pretrained(config["xcodec_model"])
        self.xcodec.eval()
        print(" ✅ XCodec2 loaded", flush=True)
        sys.stdout.flush()

        print("📥 Loading Whisper processor...", flush=True)
        sys.stdout.flush()
        self.whisper_processor = WhisperFeatureExtractor.from_pretrained(config["whisper_model"])
        print(" ✅ Whisper processor loaded", flush=True)
        sys.stdout.flush()

        print("📥 Loading projection layer...", flush=True)
        sys.stdout.flush()
        proj_state = torch.load(proj_path, map_location="cpu", weights_only=False)
        self.projection = nn.Linear(self.whisper_dim, self.hidden_size)
        self.projection.load_state_dict(proj_state)
        print(" ✅ Projection layer loaded", flush=True)
        sys.stdout.flush()

        # Special token ids from the checkpoint config; set_special_token_ids()
        # re-resolves them against the tokenizer later.
        self.u_start_id = config.get("u_start_id")
        self.u_end_id = config.get("u_end_id")
        self.g_start_id = config["g_start_id"]
        self.g_end_id = config["g_end_id"]
        self.pad_id = config["pad_id"]

        # Whisper and XCodec2 are frozen feature extractors, never trained here.
        for param in self.whisper.parameters():
            param.requires_grad = False
        self.whisper.eval()

        for param in self.xcodec.parameters():
            param.requires_grad = False
        self.xcodec.eval()

    def set_special_token_ids(self, tokenizer):
        """Resolve special token ids and pre-tokenize the instruction text."""
        self.tokenizer = tokenizer
        self.u_start_id = tokenizer.convert_tokens_to_ids("<|SPEECH_UNDERSTANDING_START|>")
        self.u_end_id = tokenizer.convert_tokens_to_ids("<|SPEECH_UNDERSTANDING_END|>")
        self.g_start_id = tokenizer.convert_tokens_to_ids("<|SPEECH_GENERATION_START|>")
        self.g_end_id = tokenizer.convert_tokens_to_ids("<|SPEECH_GENERATION_END|>")
        self.pad_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else 0

        # Fixed instruction halves that surround the source soft tokens.
        prefix_text = "Convert "
        middle_text = " into speech using this speaker: "

        self.prefix_ids = tokenizer(prefix_text, add_special_tokens=False, return_tensors="pt")["input_ids"]
        self.middle_ids = tokenizer(middle_text, add_special_tokens=False, return_tensors="pt")["input_ids"]

    def _tokenizer_ids_to_xcodec_codes(self, tokenizer_ids: torch.Tensor) -> torch.Tensor:
        """Convert LLASA tokenizer IDs back to raw XCodec2 codes (0-65535).

        Any position whose token is not a ``<|s_N|>`` speech token (or whose
        N cannot be parsed) maps to code 0.
        """
        batch_size, seq_len = tokenizer_ids.shape
        xcodec_codes = torch.zeros_like(tokenizer_ids)

        for i in range(batch_size):
            tokens = self.tokenizer.convert_ids_to_tokens(tokenizer_ids[i].tolist())
            for j, tok in enumerate(tokens):
                if tok and tok.startswith("<|s_") and tok.endswith("|>"):
                    try:
                        # Strip "<|s_" prefix and "|>" suffix to get the code.
                        code = int(tok[4:-2])
                        xcodec_codes[i, j] = code
                    except ValueError:
                        xcodec_codes[i, j] = 0
                else:
                    xcodec_codes[i, j] = 0

        return xcodec_codes

    def generate(
        self,
        wav_or_mel: np.ndarray,
        ref_ids: torch.Tensor,
        ref_length: int,
        max_new_tokens: int = 2000,
        tokenizer=None,
        temperature: float = 1.0,
        repetition_penalty: float = 1.2,
        top_p: float = 0.9,
        repetition_window: int = 5,
        verbose: bool = False,
    ) -> List[int]:
        """Generate voice-conversion speech tokens.

        Args:
            wav_or_mel: source waveform (assumed 16 kHz mono float array -
                duration is computed as len/16000).
            ref_ids: tokenizer ids of the reference speaker's speech tokens,
                shape (1, >= ref_length).
            ref_length: number of valid positions in ``ref_ids``.
            max_new_tokens: generation cap.
            tokenizer: when given, custom KV-cache decoding restricted to
                speech tokens is used; otherwise plain HF ``generate``.
            temperature / repetition_penalty / top_p / repetition_window:
                sampling knobs forwarded to the custom decoder.
            verbose: forwarded to the custom decoder.

        Returns:
            List of generated token ids (speech tokens plus EOS).
        """
        device = ref_ids.device
        model_dtype = next(self.llasa.parameters()).dtype

        # Semantic features of the source audio via the frozen Whisper encoder.
        mel = self.whisper_processor(wav_or_mel, sampling_rate=16000, return_tensors="pt").input_features.to(device)
        whisper_out = self.whisper(mel).last_hidden_state

        # Trim padded frames: Whisper pads to 30 s, i.e. 1500 frames at 50 fps.
        audio_dur = len(wav_or_mel) / 16000
        num_frames = min(int(audio_dur * WHISPER_FRAME_RATE), 1500)
        soft_tokens = self.projection(whisper_out[:, :num_frames]).to(model_dtype)

        embed_layer = self.llasa.get_input_embeddings()

        prefix_emb = embed_layer(self.prefix_ids.to(device))
        middle_emb = embed_layer(self.middle_ids.to(device))
        u_start_emb = embed_layer(torch.tensor([[self.u_start_id]], device=device))
        u_end_emb = embed_layer(torch.tensor([[self.u_end_id]], device=device))
        g_start_emb = embed_layer(torch.tensor([[self.g_start_id]], device=device))

        ref_embeds = embed_layer(ref_ids[:, :ref_length])

        # Prompt layout: "Convert " <soft tokens> " into speech using this
        # speaker: " <U_START> <reference tokens> <U_END> <G_START>
        inputs_embeds = torch.cat([
            prefix_emb,
            soft_tokens,
            middle_emb,
            u_start_emb,
            ref_embeds,
            u_end_emb,
            g_start_emb,
        ], dim=1).to(model_dtype)

        if tokenizer is not None:
            # Constrained decoding: only <|s_N|> tokens and G_END allowed.
            logits_processor = SpeechOnlyLogitsProcessor(tokenizer, self.g_end_id)

            generated = greedy_generate_with_embeds(
                model=self.llasa,
                inputs_embeds=inputs_embeds,
                embed_layer=embed_layer,
                logits_processor=logits_processor,
                max_new_tokens=max_new_tokens,
                eos_token_id=self.g_end_id,
                pad_token_id=self.pad_id,
                verbose=verbose,
                tokenizer=tokenizer,
                temperature=temperature,
                repetition_penalty=repetition_penalty,
                top_p=top_p,
                repetition_window=repetition_window,
            )
            return generated
        else:
            # Fallback: unconstrained greedy decoding via HF generate.
            outputs = self.llasa.generate(
                inputs_embeds=inputs_embeds,
                max_new_tokens=max_new_tokens,
                pad_token_id=self.pad_id,
                eos_token_id=self.g_end_id,
                do_sample=False,
            )
            return outputs[0].tolist()
|
|
|
|
|
|
|
|
def load_model():
    """Load the LAVCO model and tokenizer once; reuse them on later calls.

    Populates the module-level ``model`` and ``tokenizer`` globals so the
    slow download/initialization happens at most once per process.

    Returns:
        (model, tokenizer) tuple, already moved to ``DEVICE`` and in eval mode.
    """
    global model, tokenizer

    if model is None:
        import sys
        import time

        print(f"📥 Loading model: {MODEL_ID}", flush=True)
        sys.stdout.flush()

        start_time = time.time()
        print("  → Loading LAVCO model components...", flush=True)
        model = LAVCOModel(MODEL_ID, device=DEVICE)
        print(f"  → Moving model to {DEVICE}...", flush=True)
        model = model.to(DEVICE)
        model.eval()
        print("  → Loading tokenizer...", flush=True)
        tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
        print("  → Setting special tokens...", flush=True)
        # Re-resolve special token ids against the freshly loaded tokenizer.
        model.set_special_token_ids(tokenizer)

        elapsed = time.time() - start_time
        print(f"✅ Model loaded in {elapsed:.1f}s", flush=True)
        sys.stdout.flush()

    return model, tokenizer
|
|
|
|
|
|
|
|
def extract_xcodec2_from_generated(tokenizer, token_ids: list) -> list:
    """Extract raw XCodec2 code values from generated tokenizer ids.

    Only tokens of the form ``<|s_N|>`` contribute; anything else
    (including unparsable N) is silently skipped.
    """
    codes = []
    for token_id in token_ids:
        tok = tokenizer.convert_ids_to_tokens(token_id)
        if not (tok and tok.startswith("<|s_") and tok.endswith("|>")):
            continue
        try:
            codes.append(int(tok[4:-2]))
        except ValueError:
            continue
    return codes
|
|
|
|
|
|
|
|
def convert_voice(source_audio, reference_audio, temperature, repetition_penalty, top_p, repetition_window, max_tokens, progress=gr.Progress()):
    """Convert source voice to reference voice using LAVCO.

    Args:
        source_audio: filepath of the content audio (Gradio may also pass a
            tuple, in which case element 1 is used).
        reference_audio: filepath (or tuple) of the target-voice audio.
        temperature, repetition_penalty, top_p, repetition_window: sampling
            knobs forwarded to the model when its generate() accepts them.
        max_tokens: cap on generated speech tokens.
        progress: Gradio progress reporter.

    Returns:
        (output_wav_path, status_message); the path is None on failure.
    """
    if source_audio is None:
        return None, "❌ Please provide source audio"

    if reference_audio is None:
        return None, "❌ Please provide reference audio"

    try:
        progress(0.1, desc="Loading model...")
        model, tokenizer = load_model()

        progress(0.2, desc="Loading audio files...")
        # Gradio may hand back a plain filepath or a tuple whose second
        # element is the payload.
        if isinstance(source_audio, tuple):
            source_path = source_audio[1]
        else:
            source_path = source_audio

        if isinstance(reference_audio, tuple):
            reference_path = reference_audio[1]
        else:
            reference_path = reference_audio

        # Both clips resampled to the 16 kHz rate the pipeline expects.
        source_wav = librosa.load(source_path, sr=16000)[0].astype(np.float32)
        reference_wav = librosa.load(reference_path, sr=16000)[0].astype(np.float32)

        progress(0.4, desc="Encoding audio...")
        with torch.no_grad():
            xcodec_device = next(model.xcodec.parameters()).device

            # Reference speaker -> XCodec2 codes -> <|s_N|> tokenizer ids.
            ref_tensor_audio = torch.from_numpy(reference_wav).float().unsqueeze(0).to(xcodec_device)
            ref_codes = model.xcodec.encode_code(input_waveform=ref_tensor_audio)

            if isinstance(ref_codes, torch.Tensor):
                ref_codes_np = ref_codes.cpu().numpy()
            else:
                ref_codes_np = np.array(ref_codes)
            ref_xcodec_ids = ref_codes_np.flatten().astype(int).tolist()

            ref_token_str = "".join([f"<|s_{rid}|>" for rid in ref_xcodec_ids])
            ref_tokenizer_ids = tokenizer(ref_token_str, add_special_tokens=False)["input_ids"]
            ref_ids = torch.tensor(ref_tokenizer_ids, dtype=torch.long, device=DEVICE).unsqueeze(0)
            ref_length = len(ref_tokenizer_ids)

            # The source audio is round-tripped through XCodec2 so the
            # Whisper encoder sees the same codec artifacts as in training.
            source_tensor_audio = torch.from_numpy(source_wav).float().unsqueeze(0).to(xcodec_device)
            source_codes = model.xcodec.encode_code(input_waveform=source_tensor_audio)

            if isinstance(source_codes, torch.Tensor):
                source_codes_np = source_codes.cpu().numpy()
            else:
                source_codes_np = np.array(source_codes)
            source_xcodec_ids = source_codes_np.flatten().astype(int).tolist()

            source_token_str = "".join([f"<|s_{rid}|>" for rid in source_xcodec_ids])
            source_tokenizer_ids = tokenizer(source_token_str, add_special_tokens=False)["input_ids"]
            seedvc_ids = torch.tensor(source_tokenizer_ids, dtype=torch.long, device=DEVICE).unsqueeze(0)
            seedvc_length = len(source_tokenizer_ids)

            xcodec_codes = model._tokenizer_ids_to_xcodec_codes(seedvc_ids)
            codes = xcodec_codes.unsqueeze(1).to(xcodec_device)
            wav = model.xcodec.decode_code(codes)
            if len(wav.shape) == 3:
                wav = wav.squeeze(1)
            # Trim decoder padding to the duration implied by the code count.
            num_samples_audio = int(seedvc_length / XCODEC2_FRAME_RATE * 16000)
            num_samples_audio = min(num_samples_audio, wav.shape[-1])
            source_wav_processed = wav[0, :num_samples_audio].cpu().numpy()

        progress(0.7, desc="Generating voice conversion...")
        # Forward sampling kwargs only if this model version's generate()
        # accepts them (keeps compatibility with older checkpoints).
        import inspect
        gen_sig = inspect.signature(model.generate)
        gen_params = gen_sig.parameters

        gen_kwargs = {
            'max_new_tokens': max_tokens,
            'tokenizer': tokenizer,
            'verbose': False,
        }

        if 'temperature' in gen_params:
            gen_kwargs['temperature'] = temperature
        if 'repetition_penalty' in gen_params:
            gen_kwargs['repetition_penalty'] = repetition_penalty
        if 'top_p' in gen_params:
            gen_kwargs['top_p'] = top_p
        if 'repetition_window' in gen_params:
            gen_kwargs['repetition_window'] = repetition_window

        generated_token_ids = model.generate(
            source_wav_processed,
            ref_ids,
            ref_length,
            **gen_kwargs
        )

        progress(0.9, desc="Decoding audio...")
        gen_xcodec_ids = extract_xcodec2_from_generated(tokenizer, generated_token_ids)

        if not gen_xcodec_ids:
            return None, "❌ No audio tokens generated!"

        # Decode generated codes back to a waveform: shape (1, 1, T_codes).
        codes = torch.tensor(gen_xcodec_ids, device=xcodec_device).unsqueeze(0).unsqueeze(0)
        output_wav = model.xcodec.decode_code(codes)

        if len(output_wav.shape) == 3:
            output_wav = output_wav[0, 0, :].cpu().numpy()
        elif len(output_wav.shape) == 2:
            output_wav = output_wav[0, :].cpu().numpy()
        else:
            output_wav = output_wav.cpu().numpy()

        # delete=False: Gradio needs the file to persist after this handler.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
            sf.write(tmp_file.name, output_wav, 16000)
            output_path = tmp_file.name

        progress(1.0, desc="Complete!")
        return output_path, f"✅ Generated {len(gen_xcodec_ids)} tokens ({len(gen_xcodec_ids)/XCODEC2_FRAME_RATE:.2f}s)"

    except Exception as e:
        # Surface the full traceback in the UI status box for debuggability.
        import traceback
        error_msg = f"❌ Error: {str(e)}\n{traceback.format_exc()}"
        return None, error_msg
|
|
|
|
|
|
|
|
|
|
|
# Custom CSS: global font stack plus the gradient banner used by the
# "main-header" markdown block in the UI below.
css = """
.gradio-container {
    font-family: 'Inter', -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif;
}
.main-header {
    text-align: center;
    padding: 2rem 0;
    background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
    color: white;
    border-radius: 10px;
    margin-bottom: 2rem;
}
"""
|
|
|
|
|
|
|
|
# Gradio UI: header banner, a two-column row (inputs | sampling parameters),
# the convert button, and an output row (audio + status text).
with gr.Blocks(css=css, theme=gr.themes.Soft()) as demo:
    # Banner styled via the .main-header rule in `css`.
    gr.Markdown("""
    <div class="main-header">
    <h1>π€ LAVCO: Voice Conversion</h1>
    <p>Convert speech to match any reference voice using semantic/acoustic interleaving</p>
    </div>
    """)

    with gr.Row():
        with gr.Column():
            gr.Markdown("### π₯ Input Audio")
            # Speech content to convert (upload or live microphone recording).
            source_audio = gr.Audio(
                label="Source Audio (content to convert)",
                type="filepath",
                sources=["upload", "microphone"]
            )
            # Target speaker identity to mimic.
            reference_audio = gr.Audio(
                label="Reference Audio (target voice)",
                type="filepath",
                sources=["upload", "microphone"]
            )

            # Bundled example pair, shown only when both files were found
            # at startup (see DEFAULT_SOURCE_AUDIO / DEFAULT_REFERENCE_AUDIO).
            if DEFAULT_SOURCE_AUDIO and DEFAULT_REFERENCE_AUDIO:
                gr.Examples(
                    examples=[[DEFAULT_SOURCE_AUDIO, DEFAULT_REFERENCE_AUDIO]],
                    inputs=[source_audio, reference_audio],
                    label="π Example Audio Files (Click to load)",
                )

        with gr.Column():
            gr.Markdown("### βοΈ Generation Parameters")
            # Sampling knobs forwarded to convert_voice().
            temperature = gr.Slider(
                minimum=0.5,
                maximum=2.0,
                value=1.0,
                step=0.1,
                label="Temperature",
                info="Higher = more diverse, lower = more deterministic"
            )
            repetition_penalty = gr.Slider(
                minimum=1.0,
                maximum=2.0,
                value=1.3,
                step=0.1,
                label="Repetition Penalty",
                info="Penalize repeated tokens (1.0 = off)"
            )
            top_p = gr.Slider(
                minimum=0.5,
                maximum=1.0,
                value=0.9,
                step=0.05,
                label="Top-P (Nucleus Sampling)",
                info="Sample from top P probability mass"
            )
            repetition_window = gr.Slider(
                minimum=3,
                maximum=10,
                value=5,
                step=1,
                label="Repetition Window",
                info="Look at last N tokens for repetition"
            )
            max_tokens = gr.Slider(
                minimum=100,
                maximum=2000,
                value=2000,
                step=100,
                label="Max Tokens",
                info="Maximum tokens to generate"
            )

    convert_btn = gr.Button("π― Convert Voice", variant="primary", size="lg")

    with gr.Row():
        # Converted result; autoplay so the user hears it immediately.
        output_audio = gr.Audio(
            label="Converted Audio",
            type="filepath",
            autoplay=True
        )
        # Status / error report returned by convert_voice().
        status_text = gr.Textbox(
            label="Status",
            interactive=False
        )

    # Static usage help.
    gr.Markdown("""
    ### π How to Use

    1. **Upload or record** your source audio (the speech you want to convert)
       - Click the microphone icon to record directly from your microphone
       - Or upload an audio file (WAV, MP3, etc.)
    2. **Upload or record** your reference audio (the voice you want to mimic)
       - Click the microphone icon to record the target voice
       - Or upload a reference audio file
    3. Adjust generation parameters if needed (defaults work well)
    4. Click **Convert Voice** and wait for the result

    ### π‘ Tips

    - Keep audio clips under 30 seconds for best results
    - Reference audio should be clear speech (1+ seconds recommended)
    - When recording, speak clearly and minimize background noise
    - Higher repetition penalty helps avoid repetitive outputs
    - Lower temperature = more stable, higher = more creative
    """)

    # Wire the button to the conversion handler.
    convert_btn.click(
        fn=convert_voice,
        inputs=[
            source_audio,
            reference_audio,
            temperature,
            repetition_penalty,
            top_p,
            repetition_window,
            max_tokens,
        ],
        outputs=[output_audio, status_text]
    )
|
|
|
|
|
if __name__ == "__main__":
    import sys

    print("=" * 60, flush=True)
    print("🚀 Starting LAVCO Gradio App", flush=True)
    print("=" * 60, flush=True)
    print(f"Device: {DEVICE}", flush=True)
    print(f"Model: {MODEL_ID}", flush=True)
    print("\n🔍 Checking for default audio files...", flush=True)
    print(f"  Examples directory: {os.path.abspath(EXAMPLES_DIR)}", flush=True)
    print(f"  Source audio: {DEFAULT_SOURCE_AUDIO or 'Not found'}", flush=True)
    print(f"  Reference audio: {DEFAULT_REFERENCE_AUDIO or 'Not found'}", flush=True)
    sys.stdout.flush()

    # Eagerly load the model so the first request does not time out;
    # fall back to lazy loading in convert_voice() if this fails.
    print("\n⏳ Pre-loading model (this may take a few minutes)...", flush=True)
    sys.stdout.flush()
    try:
        load_model()
        print("✅ Model ready! Starting Gradio interface...", flush=True)
        sys.stdout.flush()
    except Exception as e:
        print(f"⚠️ Model pre-loading failed: {e}", flush=True)
        print("   Model will load on first use instead.", flush=True)
        import traceback
        traceback.print_exc()
        sys.stdout.flush()

    print("\n🌐 Launching web interface...", flush=True)
    sys.stdout.flush()
    # Bind to 0.0.0.0:7860, the standard configuration for HF Spaces.
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False
    )
|
|
|