|
|
import os
|
|
|
import re
|
|
|
import time
|
|
|
import torch
|
|
|
import librosa
|
|
|
import numpy as np
|
|
|
import json
|
|
|
from pathlib import Path
|
|
|
from typing import Generator
|
|
|
from huggingface_hub import snapshot_download
|
|
|
|
|
|
|
|
|
import neucodec.model
|
|
|
|
|
|
def _apply_robust_patch(target_cls):
|
|
|
"""Vá lỗi AssertionError và TypeError cho thư viện neucodec"""
|
|
|
orig_func = target_cls._from_pretrained
|
|
|
|
|
|
@classmethod
|
|
|
def _patched_func(cls, *args, **kwargs):
|
|
|
|
|
|
official_id = "neuphonic/distill-neucodec" if "distill" in str(cls).lower() else "neuphonic/neucodec"
|
|
|
|
|
|
|
|
|
if args:
|
|
|
kwargs["model_id"] = official_id
|
|
|
return orig_func(*args[1:], **kwargs)
|
|
|
else:
|
|
|
kwargs["model_id"] = official_id
|
|
|
return orig_func(**kwargs)
|
|
|
|
|
|
target_cls._from_pretrained = _patched_func
|
|
|
|
|
|
|
|
|
# Patch both codec classes *before* the public names are imported below, so
# every subsequent from_pretrained call goes through the redirected loader.
_apply_robust_patch(neucodec.model.NeuCodec)
_apply_robust_patch(neucodec.model.DistillNeuCodec)
|
|
|
|
|
|
|
|
|
from neucodec import NeuCodec, DistillNeuCodec
|
|
|
from transformers import AutoTokenizer, AutoModelForCausalLM
|
|
|
from utils.phonemize_text import phonemize_text, phonemize_with_dict
|
|
|
|
|
|
def _linear_overlap_add(frames: list[np.ndarray], stride: int) -> np.ndarray:
|
|
|
assert len(frames)
|
|
|
dtype = frames[0].dtype
|
|
|
shape = frames[0].shape[:-1]
|
|
|
total_size = max(stride * i + frame.shape[-1] for i, frame in enumerate(frames))
|
|
|
sum_weight = np.zeros(total_size, dtype=dtype)
|
|
|
out = np.zeros(*shape, total_size, dtype=dtype)
|
|
|
offset: int = 0
|
|
|
for frame in frames:
|
|
|
frame_length = frame.shape[-1]
|
|
|
t = np.linspace(0, 1, frame_length + 2, dtype=dtype)[1:-1]
|
|
|
weight = np.abs(0.5 - (t - 0.5))
|
|
|
out[..., offset : offset + frame_length] += weight * frame
|
|
|
sum_weight[offset : offset + frame_length] += weight
|
|
|
offset += stride
|
|
|
return out / sum_weight
|
|
|
|
|
|
class VoiceEngine:
    """Text-to-speech engine: an LLM backbone generates <|speech_N|> tokens
    that a NeuCodec codec decodes back into a 24 kHz waveform.

    Voice cloning works by prefixing the prompt with the reference
    utterance's codec tokens and its transcript (see infer /
    _apply_chat_template).
    """

    def __init__(self, backbone_repo="ktvoice/Backbone", backbone_device="cpu", codec_repo="ktvoice/Codec", codec_device="cpu"):
        # Output sample rate of the codec decoder, in Hz.
        self.sample_rate = 24_000
        # Hard cap on prompt + generated tokens passed to generate().
        self.max_context = 2048
        # True when the backbone is a GGUF model served by llama-cpp.
        self._is_quantized_model = False
        # Set only on the HF (non-GGUF) backbone path.
        self.tokenizer = None
        self._load_backbone(backbone_repo, backbone_device)
        self._load_codec(codec_repo, codec_device)

    def _load_backbone(self, repo, device):
        """Load the token-generating LM: llama-cpp for GGUF repos, HF otherwise."""
        print(f"Loading backbone from: {repo} on {device} ...")
        if "gguf" in repo.lower():
            from llama_cpp import Llama
            self.backbone = Llama.from_pretrained(repo_id=repo, filename="*.gguf", n_ctx=self.max_context)
            self._is_quantized_model = True
            # NOTE(review): this path leaves self.tokenizer as None, yet
            # infer() unconditionally uses self.tokenizer and an HF-style
            # self.backbone.generate — GGUF inference looks unsupported
            # there; confirm before relying on it.
        else:
            self.tokenizer = AutoTokenizer.from_pretrained(repo)
            self.backbone = AutoModelForCausalLM.from_pretrained(repo).to(torch.device(device))

    def _load_codec(self, repo, device):
        """Download the codec snapshot and instantiate the matching codec class."""
        print(f"Loading codec from your repo: {repo} ...")
        local_dir = snapshot_download(repo_id=repo)
        # neucodec's loader expects a config.json in the snapshot; write a
        # minimal one when the repo does not ship it.
        tmp_config = os.path.join(local_dir, "config.json")
        if not os.path.exists(tmp_config):
            with open(tmp_config, "w") as f: json.dump({"model_type": "neucodec"}, f)
        # The repo name decides between the distilled and the full codec.
        if "distill" in repo.lower():
            self.codec = DistillNeuCodec.from_pretrained(local_dir)
        else:
            self.codec = NeuCodec.from_pretrained(local_dir)
        self.codec.eval().to(device)

    def encode_reference(self, path):
        """Encode a reference audio file into codec token ids.

        The audio is loaded as 16 kHz mono; returns the encoder output with
        the batch/channel dims squeezed away.
        """
        wav, _ = librosa.load(path, sr=16000, mono=True)
        # (1, 1, T) layout for the codec encoder.
        wav_tensor = torch.from_numpy(wav).float().unsqueeze(0).unsqueeze(0)
        with torch.no_grad():
            return self.codec.encode_code(audio_or_path=wav_tensor).squeeze(0).squeeze(0)

    def infer(self, text, ref_codes, ref_text):
        """Synthesize `text` in the reference voice; returns a waveform array.

        ref_codes / ref_text are the codec tokens and transcript of the
        cloning reference (see encode_reference).
        """
        prompt_ids = self._apply_chat_template(ref_codes, ref_text, text)
        prompt_tensor = torch.tensor(prompt_ids).unsqueeze(0).to(self.backbone.device)
        with torch.no_grad():
            out = self.backbone.generate(prompt_tensor, max_length=self.max_context, do_sample=True, temperature=1)
        # Decode only the newly generated tail (prompt tokens stripped).
        # NOTE(review): add_special_tokens is not a decode() parameter
        # (decode takes skip_special_tokens) and is silently ignored; the
        # <|speech_N|> specials survive only because skip_special_tokens
        # defaults to False — confirm and clean up.
        tokens = self.tokenizer.decode(out[0, prompt_tensor.shape[-1]:], add_special_tokens=False)
        return self._decode(tokens)

    def _decode(self, codes_str):
        """Convert a string of <|speech_N|> tokens into audio samples (1-D array)."""
        speech_ids = [int(n) for n in re.findall(r"<\|speech_(\d+)\|>", codes_str)]
        with torch.no_grad():
            # (1, 1, T) layout for decode_code.
            # NOTE(review): assumes self.codec exposes a .device attribute —
            # plain nn.Modules do not; verify against the neucodec API.
            codes_tensor = torch.tensor(speech_ids, dtype=torch.long)[None, None, :].to(self.codec.device)
            return self.codec.decode_code(codes_tensor).cpu().numpy()[0, 0, :]

    def _apply_chat_template(self, ref_codes, ref_text, text):
        """Build backbone prompt ids: phonemized text plus reference speech tokens."""
        # Reference transcript comes first so the model continues in its voice.
        input_text = phonemize_with_dict(ref_text) + " " + phonemize_with_dict(text)
        chat = f"user: Convert the text to speech:<|TEXT_PROMPT_START|>{input_text}<|TEXT_PROMPT_END|>\nassistant:<|SPEECH_GENERATION_START|>"
        c_str = "".join([f"<|speech_{i}|>" for i in ref_codes])
        return self.tokenizer.encode(chat + c_str)