# chtrbx / chatterbox / vc.py
# Karay Akar
# f1
# 60660de
from pathlib import Path
import logging
import librosa
import torch
import perth
from huggingface_hub import hf_hub_download
from safetensors.torch import load_file
import torch.nn.functional as F
from .models.t3 import T3
from .models.s3tokenizer import S3_SR, drop_invalid_tokens
from .models.s3gen import S3GEN_SR, S3Gen
from .models.tokenizers import EnTokenizer
from .models.voice_encoder import VoiceEncoder
from .models.t3.modules.cond_enc import T3Cond
from .models.t3.llama_configs import LLAMA_CONFIGS
#from .models.t3.t3_config import T3Config
from .models.t3.modules.t3_config import T3Config
from chatterbox.tts import Conditionals
REPO_ID = "ResembleAI/chatterbox"
def smart_load_t3_model(t3_state_dict, device="cpu"):
    """
    Build a T3 model and load ``t3_state_dict`` into it, adapting
    ``T3Config.text_tokens_dict_size`` to the checkpoint when the default
    config does not fit.

    The default ``T3Config()`` is tried first. If ``load_state_dict`` raises a
    ``RuntimeError`` that reports a size mismatch involving
    ``text_emb.weight``, the first dimension of the checkpoint's
    ``text_emb.weight`` tensor is used as the corrected
    ``text_tokens_dict_size`` and loading is retried once with a fresh model.

    Args:
        t3_state_dict: Mapping of parameter names to tensors, with keys as
            expected by ``T3.load_state_dict``.
        device: Device the loaded model is moved to before being returned.

    Returns:
        The loaded T3 model, moved to ``device`` and switched to eval mode.

    Raises:
        RuntimeError: If loading fails for a reason other than a
            ``text_emb.weight`` size mismatch, if the checkpoint does not
            contain a 2-D ``text_emb.weight`` tensor, or if the retry with the
            corrected config fails as well.
    """
    logger = logging.getLogger(__name__)

    # Try loading with default config first
    config = T3Config()
    t3 = T3(config)
    try:
        # Only the state-dict load is guarded, so a RuntimeError raised by the
        # later device transfer is not misreported as a load failure.
        t3.load_state_dict(t3_state_dict)
    except RuntimeError as e:
        error_msg = str(e)
        logger.warning(f"Initial loading failed with default config: {error_msg}")

        # Only a size mismatch on the text embeddings is recoverable here.
        if not ("text_emb.weight" in error_msg and "size mismatch" in error_msg):
            logger.error(f"Non-size-mismatch error during loading: {error_msg}")
            raise

        # Read the vocabulary size straight from the checkpoint tensor instead
        # of parsing the error text: the message lists every mismatching
        # parameter, so an un-anchored pattern can pick up the shape of a
        # different parameter than text_emb.weight.
        text_emb = t3_state_dict.get("text_emb.weight")
        if text_emb is None or text_emb.dim() != 2:
            logger.error(
                f"Could not read text_emb.weight shape from checkpoint: {error_msg}"
            )
            raise
    else:
        logger.info(f"Successfully loaded T3 model with default config (text_tokens_dict_size={config.text_tokens_dict_size})")
        return t3.to(device).eval()

    correct_text_tokens_dict_size = int(text_emb.shape[0])
    logger.info(f"Detected correct text_tokens_dict_size from checkpoint: {correct_text_tokens_dict_size}")

    # Drop the default-sized model before allocating its replacement so both
    # are not held in memory at the same time.
    del t3

    # Create new config with correct size
    corrected_config = T3Config()
    corrected_config.text_tokens_dict_size = correct_text_tokens_dict_size

    # Create new model with corrected config
    t3_corrected = T3(corrected_config)
    try:
        t3_corrected.load_state_dict(t3_state_dict)
    except RuntimeError as retry_error:
        logger.error(f"Failed to load even with corrected config: {retry_error}")
        raise

    logger.info(f"Successfully loaded T3 model with corrected config (text_tokens_dict_size={correct_text_tokens_dict_size})")
    return t3_corrected.to(device).eval()
class ChatterboxVC:
    """
    Voice-conversion wrapper around an ``S3Gen`` model.

    An instance holds the ``S3Gen`` model, the target device, and an optional
    reference dictionary (``ref_dict``) describing the target voice. Source
    audio is tokenized with ``s3gen.tokenizer`` and re-synthesised with
    ``s3gen.inference`` using that reference.
    """

    ENC_COND_LEN = 6 * S3_SR
    DEC_COND_LEN = 10 * S3GEN_SR

    def __init__(
        self,
        s3gen: S3Gen,
        device: str,
        ref_dict: dict = None,
    ):
        """
        Args:
            s3gen: The S3Gen model used for tokenization and synthesis.
            device: Device that tensors in ``ref_dict`` and the input audio
                are moved to.
            ref_dict: Optional target-voice reference. Tensor values are moved
                to ``device``; other values are kept as they are.
        """
        self.sr = S3GEN_SR
        self.s3gen = s3gen
        self.device = device
        self.watermarker = perth.PerthImplicitWatermarker()
        if ref_dict is None:
            self.ref_dict = None
        else:
            self.ref_dict = {
                k: v.to(device) if torch.is_tensor(v) else v
                for k, v in ref_dict.items()
            }

    @classmethod
    def _from_files(cls, s3gen_path, conds_path, device) -> 'ChatterboxVC':
        """
        Build an instance from an S3Gen safetensors file and an optional
        conditionals file (shared by all the public constructors).

        Args:
            s3gen_path: Path to the S3Gen weights, read with ``load_file``.
            conds_path: Path to a ``torch.save``-d file whose ``'gen'`` entry
                is used as the built-in reference voice. May be ``None`` or
                point to a missing file, in which case no reference is set.
            device: Device the model and reference are moved to.
        """
        # Always load to CPU first for non-CUDA devices to handle CUDA-saved models
        if device in ["cpu", "mps"]:
            map_location = torch.device('cpu')
        else:
            map_location = None

        ref_dict = None
        if conds_path is not None and Path(conds_path).exists():
            # NOTE(review): torch.load may unpickle arbitrary objects depending
            # on the installed torch version's `weights_only` default; only
            # point this at trusted checkpoint files.
            states = torch.load(Path(conds_path), map_location=map_location)
            ref_dict = states['gen']

        s3gen = S3Gen()
        # strict=False: keys that do not line up between file and model are
        # tolerated rather than raising.
        s3gen.load_state_dict(load_file(s3gen_path), strict=False)
        s3gen.to(device).eval()

        return cls(s3gen, device, ref_dict=ref_dict)

    @classmethod
    def from_specified(
        cls,
        voice_encoder_path,
        t3_path,
        s3gen_path,
        tokenizer_path,
        conds_path,
        device
    ):
        """
        Build an instance from individually specified checkpoint files.

        Only ``s3gen_path``, ``conds_path`` and ``device`` are used: this class
        stores nothing but the S3Gen model and its reference dictionary.
        ``voice_encoder_path``, ``t3_path`` and ``tokenizer_path`` are accepted
        so the signature stays unchanged for existing callers, but the files
        are not read.
        """
        return cls._from_files(s3gen_path, conds_path, device)

    @classmethod
    def from_specifiedVC(
        cls,
        voice_encoder_path,
        t3_path,
        s3gen_path,
        tokenizer_path,
        conds_path,
        device
    ):
        """
        Build an instance from individually specified checkpoint files.

        Behaves like ``from_specified``: the model is loaded from
        ``s3gen_path`` and the built-in reference voice from ``conds_path``.
        ``voice_encoder_path``, ``t3_path`` and ``tokenizer_path`` are accepted
        for signature compatibility only and are not read.
        """
        return cls._from_files(s3gen_path, conds_path, device)

    @classmethod
    def from_local(cls, ckpt_dir, device) -> 'ChatterboxVC':
        """
        Build an instance from a checkpoint directory containing
        ``s3gen.safetensors`` and, optionally, ``conds.pt``.
        """
        ckpt_dir = Path(ckpt_dir)
        print(f"ckpt_dir:{ckpt_dir}")
        return cls._from_files(
            ckpt_dir / "s3gen.safetensors",
            ckpt_dir / "conds.pt",
            device,
        )

    @classmethod
    def from_pretrained(cls, device) -> 'ChatterboxVC':
        """
        Download ``s3gen.safetensors`` and ``conds.pt`` from the ``REPO_ID``
        hub repository and build an instance from the download directory.

        If ``device`` is ``"mps"`` but MPS is unavailable, a message is printed
        and the CPU is used instead.
        """
        # Check if MPS is available on macOS
        if device == "mps" and not torch.backends.mps.is_available():
            if not torch.backends.mps.is_built():
                print("MPS not available because the current PyTorch install was not built with MPS enabled.")
            else:
                print("MPS not available because the current MacOS version is not 12.3+ and/or you do not have an MPS-enabled device on this machine.")
            device = "cpu"

        # The directory of the last downloaded file is handed to from_local,
        # which looks for both files there.
        for fpath in ["s3gen.safetensors", "conds.pt"]:
            local_path = hf_hub_download(repo_id=REPO_ID, filename=fpath)
        print(f"Path(local_path).parent:{Path(local_path).parent}")

        return cls.from_local(Path(local_path).parent, device)

    def set_target_voice(self, wav_fpath):
        """
        Load a reference recording and store its embedding as the target voice.

        The file is loaded at ``S3GEN_SR``, truncated to ``DEC_COND_LEN``
        samples, and passed to ``s3gen.embed_ref``; the result replaces
        ``self.ref_dict``.
        """
        ## Load reference wav
        print(f"ref:{wav_fpath}")
        s3gen_ref_wav, _sr = librosa.load(wav_fpath, sr=S3GEN_SR)

        s3gen_ref_wav = s3gen_ref_wav[:self.DEC_COND_LEN]
        self.ref_dict = self.s3gen.embed_ref(s3gen_ref_wav, S3GEN_SR, device=self.device)

    def generate(
        self,
        audio,
        target_voice_path=None,
    ):
        """
        Convert ``audio`` to the target voice.

        Args:
            audio: Source audio in any form accepted by ``librosa.load``.
            target_voice_path: Optional reference recording. When given, it
                replaces the current target voice via ``set_target_voice``.

        Returns:
            A torch tensor holding the generated waveform with a leading
            dimension of size 1 added.

        Raises:
            AssertionError: If no ``target_voice_path`` is given and no
                reference voice has been set.
        """
        if target_voice_path:
            self.set_target_voice(target_voice_path)
        elif self.ref_dict is None:
            # Raised explicitly (rather than via `assert`) so the check is not
            # stripped under `python -O`; the exception type is kept for
            # callers that already catch AssertionError.
            raise AssertionError(
                "Please call `set_target_voice` first or specify `target_voice_path`"
            )

        with torch.inference_mode():
            audio_16, _ = librosa.load(audio, sr=S3_SR)
            audio_16 = torch.from_numpy(audio_16).float().to(self.device)[None, ]

            s3_tokens, _ = self.s3gen.tokenizer(audio_16)
            wav, _ = self.s3gen.inference(
                speech_tokens=s3_tokens,
                ref_dict=self.ref_dict,
            )
            wav = wav.squeeze(0).detach().cpu().numpy()
            # NOTE: watermarking with
            # `self.watermarker.apply_watermark(wav, sample_rate=self.sr)` is
            # currently disabled; the unwatermarked waveform is returned.
        return torch.from_numpy(wav).unsqueeze(0)