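"""Text-to-speech tools: Supertonic-66M (ONNX) and Kokoro-82M backends.

Contains the ONNX inference helpers for Supertonic (text processing, duration
prediction, latent estimation, vocoding), lazy initialization of both backends,
and the `Generate_Speech` tool plus its Gradio interface.
"""
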
from __future__ import annotations

import io
import json
import os
import re
import time
import uuid
import wave
from contextlib import contextmanager
from typing import Annotated, Optional
from unicodedata import normalize

import gradio as gr
import numpy as np
import onnxruntime as ort
import scipy.io.wavfile

from .File_System import ROOT_DIR
from app import _log_call_end, _log_call_start, _truncate_for_log
from ._docstrings import autodoc

try:
    import torch
except Exception:
    torch = None

try:
    from kokoro import KModel, KPipeline
except Exception:
    KModel = None
    KPipeline = None

try:
    from huggingface_hub import list_repo_files, snapshot_download
except ImportError:
    snapshot_download = None
    list_repo_files = None


class UnicodeProcessor:
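    """Convert raw text into padded token-ID arrays using a Unicode-codepoint indexer loaded from JSON."""
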
    def __init__(self, unicode_indexer_path: str):
        with open(unicode_indexer_path, "r") as f:
            self.indexer = json.load(f)

    def _preprocess_text(self, text: str) -> str:
        text = normalize("NFKD", text)
        return text

    def _get_text_mask(self, text_ids_lengths: np.ndarray) -> np.ndarray:
        text_mask = length_to_mask(text_ids_lengths)
        return text_mask

    def _text_to_unicode_values(self, text: str) -> np.ndarray:
        unicode_values = np.array([ord(char) for char in text], dtype=np.uint16)
        return unicode_values

    def __call__(self, text_list: list[str]) -> tuple[np.ndarray, np.ndarray]:
        text_list = [self._preprocess_text(t) for t in text_list]
        text_ids_lengths = np.array([len(text) for text in text_list], dtype=np.int64)
        text_ids = np.zeros((len(text_list), text_ids_lengths.max()), dtype=np.int64)
        for i, text in enumerate(text_list):
            unicode_vals = self._text_to_unicode_values(text)
            text_ids[i, : len(unicode_vals)] = np.array(
                [self.indexer[val] for val in unicode_vals], dtype=np.int64
            )
        text_mask = self._get_text_mask(text_ids_lengths)
        return text_ids, text_mask


class Style:
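    """Pair of style embeddings: `ttl` drives the text-to-latent model, `dp` the duration predictor."""
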
    def __init__(self, style_ttl_onnx: np.ndarray, style_dp_onnx: np.ndarray):
        self.ttl = style_ttl_onnx
        self.dp = style_dp_onnx


class TextToSpeech:
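    """Supertonic ONNX pipeline: predict durations, encode text, iteratively denoise latents, then vocode to audio."""
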
    def __init__(
        self,
        cfgs: dict,
        text_processor: UnicodeProcessor,
        dp_ort: ort.InferenceSession,
        text_enc_ort: ort.InferenceSession,
        vector_est_ort: ort.InferenceSession,
        vocoder_ort: ort.InferenceSession,
    ):
        self.cfgs = cfgs
        self.text_processor = text_processor
        self.dp_ort = dp_ort
        self.text_enc_ort = text_enc_ort
        self.vector_est_ort = vector_est_ort
        self.vocoder_ort = vocoder_ort
        self.sample_rate = cfgs["ae"]["sample_rate"]
        self.base_chunk_size = cfgs["ae"]["base_chunk_size"]
        self.chunk_compress_factor = cfgs["ttl"]["chunk_compress_factor"]
        self.ldim = cfgs["ttl"]["latent_dim"]

    def sample_noisy_latent(
        self, duration: np.ndarray
    ) -> tuple[np.ndarray, np.ndarray]:
        bsz = len(duration)
        wav_len_max = duration.max() * self.sample_rate
        wav_lengths = (duration * self.sample_rate).astype(np.int64)
        chunk_size = self.base_chunk_size * self.chunk_compress_factor
        latent_len = ((wav_len_max + chunk_size - 1) / chunk_size).astype(np.int32)
        latent_dim = self.ldim * self.chunk_compress_factor
        noisy_latent = np.random.randn(bsz, latent_dim, latent_len).astype(np.float32)
        latent_mask = get_latent_mask(
            wav_lengths, self.base_chunk_size, self.chunk_compress_factor
        )
        noisy_latent = noisy_latent * latent_mask
        return noisy_latent, latent_mask

    def _infer(
        self, text_list: list[str], style: Style, total_step: int, speed: float = 1.05
    ) -> tuple[np.ndarray, np.ndarray]:
        assert (
            len(text_list) == style.ttl.shape[0]
        ), "Number of texts must match number of style vectors"
        bsz = len(text_list)
        text_ids, text_mask = self.text_processor(text_list)
        dur_onnx, *_ = self.dp_ort.run(
            None, {"text_ids": text_ids, "style_dp": style.dp, "text_mask": text_mask}
        )
        dur_onnx = dur_onnx / speed
        text_emb_onnx, *_ = self.text_enc_ort.run(
            None,
            {"text_ids": text_ids, "style_ttl": style.ttl, "text_mask": text_mask},
        )
        xt, latent_mask = self.sample_noisy_latent(dur_onnx)
        total_step_np = np.array([total_step] * bsz, dtype=np.float32)
        for step in range(total_step):
            current_step = np.array([step] * bsz, dtype=np.float32)
            xt, *_ = self.vector_est_ort.run(
                None,
                {
                    "noisy_latent": xt,
                    "text_emb": text_emb_onnx,
                    "style_ttl": style.ttl,
                    "text_mask": text_mask,
                    "latent_mask": latent_mask,
                    "current_step": current_step,
                    "total_step": total_step_np,
                },
            )
        wav, *_ = self.vocoder_ort.run(None, {"latent": xt})
        return wav, dur_onnx

    def __call__(
        self,
        text: str,
        style: Style,
        total_step: int,
        speed: float = 1.05,
        silence_duration: float = 0.3,
        max_len: int = 300,
    ) -> tuple[np.ndarray, np.ndarray]:
        assert (
            style.ttl.shape[0] == 1
        ), "Single speaker text to speech only supports single style"
        text_list = chunk_text(text, max_len=max_len)
        wav_cat = None
        dur_cat = None
        for text in text_list:
            wav, dur_onnx = self._infer([text], style, total_step, speed)
            if wav_cat is None:
                wav_cat = wav
                dur_cat = dur_onnx
            else:
                silence = np.zeros(
                    (1, int(silence_duration * self.sample_rate)), dtype=np.float32
                )
                wav_cat = np.concatenate([wav_cat, silence, wav], axis=1)
                dur_cat += dur_onnx + silence_duration
        return wav_cat, dur_cat

    def stream(
        self,
        text: str,
        style: Style,
        total_step: int,
        speed: float = 1.05,
        silence_duration: float = 0.3,
        max_len: int = 300,
    ):
        assert (
            style.ttl.shape[0] == 1
        ), "Single speaker text to speech only supports single style"
        text_list = chunk_text(text, max_len=max_len)

        for i, text in enumerate(text_list):
            wav, _ = self._infer([text], style, total_step, speed)
            yield wav.flatten()

            if i < len(text_list) - 1:
                silence = np.zeros(
                    (int(silence_duration * self.sample_rate),), dtype=np.float32
                )
                yield silence

    def batch(
        self, text_list: list[str], style: Style, total_step: int, speed: float = 1.05
    ) -> tuple[np.ndarray, np.ndarray]:
        return self._infer(text_list, style, total_step, speed)


def length_to_mask(lengths: np.ndarray, max_len: Optional[int] = None) -> np.ndarray:
    """
    Convert lengths to binary mask.

    Args:
        lengths: (B,)
        max_len: int

    Returns:
        mask: (B, 1, max_len)
    """
    max_len = max_len or lengths.max()
    ids = np.arange(0, max_len)
    mask = (ids < np.expand_dims(lengths, axis=1)).astype(np.float32)
    return mask.reshape(-1, 1, max_len)
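

# For example, length_to_mask(np.array([2, 3])) returns a (2, 1, 3) mask whose
# rows are [1., 1., 0.] and [1., 1., 1.].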


def get_latent_mask(
    wav_lengths: np.ndarray, base_chunk_size: int, chunk_compress_factor: int
) -> np.ndarray:
    latent_size = base_chunk_size * chunk_compress_factor
    latent_lengths = (wav_lengths + latent_size - 1) // latent_size
    latent_mask = length_to_mask(latent_lengths)
    return latent_mask


def load_onnx(
    onnx_path: str, opts: ort.SessionOptions, providers: list[str]
) -> ort.InferenceSession:
    return ort.InferenceSession(onnx_path, sess_options=opts, providers=providers)


def load_onnx_all(
    onnx_dir: str, opts: ort.SessionOptions, providers: list[str]
) -> tuple[
    ort.InferenceSession,
    ort.InferenceSession,
    ort.InferenceSession,
    ort.InferenceSession,
]:
    dp_onnx_path = os.path.join(onnx_dir, "duration_predictor.onnx")
    text_enc_onnx_path = os.path.join(onnx_dir, "text_encoder.onnx")
    vector_est_onnx_path = os.path.join(onnx_dir, "vector_estimator.onnx")
    vocoder_onnx_path = os.path.join(onnx_dir, "vocoder.onnx")

    dp_ort = load_onnx(dp_onnx_path, opts, providers)
    text_enc_ort = load_onnx(text_enc_onnx_path, opts, providers)
    vector_est_ort = load_onnx(vector_est_onnx_path, opts, providers)
    vocoder_ort = load_onnx(vocoder_onnx_path, opts, providers)
    return dp_ort, text_enc_ort, vector_est_ort, vocoder_ort


def load_cfgs(onnx_dir: str) -> dict:
    cfg_path = os.path.join(onnx_dir, "tts.json")
    with open(cfg_path, "r") as f:
        cfgs = json.load(f)
    return cfgs


def load_text_processor(onnx_dir: str) -> UnicodeProcessor:
    unicode_indexer_path = os.path.join(onnx_dir, "unicode_indexer.json")
    text_processor = UnicodeProcessor(unicode_indexer_path)
    return text_processor


def load_text_to_speech(onnx_dir: str, use_gpu: bool = False) -> TextToSpeech:
    opts = ort.SessionOptions()
    if use_gpu:
        raise NotImplementedError("GPU mode is not fully tested")
    else:
        providers = ["CPUExecutionProvider"]
        print("Using CPU for inference")
    cfgs = load_cfgs(onnx_dir)
    dp_ort, text_enc_ort, vector_est_ort, vocoder_ort = load_onnx_all(
        onnx_dir, opts, providers
    )
    text_processor = load_text_processor(onnx_dir)
    return TextToSpeech(
        cfgs, text_processor, dp_ort, text_enc_ort, vector_est_ort, vocoder_ort
    )


def load_voice_style(voice_style_paths: list[str], verbose: bool = False) -> Style:
    bsz = len(voice_style_paths)

    with open(voice_style_paths[0], "r") as f:
        first_style = json.load(f)
    ttl_dims = first_style["style_ttl"]["dims"]
    dp_dims = first_style["style_dp"]["dims"]

    ttl_style = np.zeros([bsz, ttl_dims[1], ttl_dims[2]], dtype=np.float32)
    dp_style = np.zeros([bsz, dp_dims[1], dp_dims[2]], dtype=np.float32)

    for i, voice_style_path in enumerate(voice_style_paths):
        with open(voice_style_path, "r") as f:
            voice_style = json.load(f)

        ttl_data = np.array(
            voice_style["style_ttl"]["data"], dtype=np.float32
        ).flatten()
        ttl_style[i] = ttl_data.reshape(ttl_dims[1], ttl_dims[2])

        dp_data = np.array(
            voice_style["style_dp"]["data"], dtype=np.float32
        ).flatten()
        dp_style[i] = dp_data.reshape(dp_dims[1], dp_dims[2])

    if verbose:
        print(f"Loaded {bsz} voice styles")
    return Style(ttl_style, dp_style)
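

# A minimal usage sketch for the Supertonic helpers above (assuming the assets have
# already been downloaded to assets/supertonic, as done in _init_supertonic):
#
#     tts = load_text_to_speech("assets/supertonic/onnx")
#     style = load_voice_style(["assets/supertonic/voice_styles/F1.json"])
#     wav, dur = tts("Hello there!", style, total_step=5)
#     scipy.io.wavfile.write("out.wav", tts.sample_rate, _audio_np_to_int16(wav.flatten()))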


@contextmanager
def timer(name: str):
    start = time.time()
    print(f"{name}...")
    yield
    print(f" -> {name} completed in {time.time() - start:.2f} sec")


def sanitize_filename(text: str, max_len: int) -> str:
    """Sanitize filename by replacing non-alphanumeric characters with underscores."""
    prefix = text[:max_len]
    return re.sub(r"[^a-zA-Z0-9]", "_", prefix)


def chunk_text(text: str, max_len: int = 300) -> list[str]:
    """
    Split text into chunks by paragraphs and sentences.

    Args:
        text: Input text to chunk
        max_len: Maximum length of each chunk (default: 300)

    Returns:
        List of text chunks
    """
    paragraphs = [p.strip() for p in re.split(r"\n\s*\n+", text.strip()) if p.strip()]

    chunks = []

    for paragraph in paragraphs:
        paragraph = paragraph.strip()
        if not paragraph:
            continue

        # Split on sentence-ending punctuation, ignoring common abbreviations and single initials.
        pattern = r"(?<!Mr\.)(?<!Mrs\.)(?<!Ms\.)(?<!Dr\.)(?<!Prof\.)(?<!Sr\.)(?<!Jr\.)(?<!Ph\.D\.)(?<!etc\.)(?<!e\.g\.)(?<!i\.e\.)(?<!vs\.)(?<!Inc\.)(?<!Ltd\.)(?<!Co\.)(?<!Corp\.)(?<!St\.)(?<!Ave\.)(?<!Blvd\.)(?<!\b[A-Z]\.)(?<=[.!?])\s+"
        sentences = re.split(pattern, paragraph)

        current_chunk = ""

        for sentence in sentences:
            if len(current_chunk) + len(sentence) + 1 <= max_len:
                current_chunk += (" " if current_chunk else "") + sentence
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = sentence

        if current_chunk:
            chunks.append(current_chunk.strip())

    return chunks
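

# For example, chunk_text("First sentence. Second sentence.\n\nNew paragraph.", max_len=40)
# yields ["First sentence. Second sentence.", "New paragraph."]: paragraphs are kept
# separate and sentences are packed greedily up to max_len characters per chunk.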


_KOKORO_STATE = {
    "initialized": False,
    "device": "cpu",
    "model": None,
    "pipelines": {},
}

_SUPERTONIC_STATE = {
    "initialized": False,
    "tts": None,
    "assets_dir": None,
}


def _audio_np_to_int16(audio_np: np.ndarray) -> np.ndarray:
    audio_clipped = np.clip(audio_np, -1.0, 1.0)
    return (audio_clipped * 32767.0).astype(np.int16)


def get_kokoro_voices() -> list[str]:
    try:
        if list_repo_files:
            files = list_repo_files("hexgrad/Kokoro-82M")
            voice_files = [file for file in files if file.endswith(".pt") and file.startswith("voices/")]
            voices = [file.replace("voices/", "").replace(".pt", "") for file in voice_files]
            return sorted(voices) if voices else _get_fallback_voices()
        return _get_fallback_voices()
    except Exception:
        return _get_fallback_voices()


def _get_fallback_voices() -> list[str]:
    return [
        "af_alloy", "af_aoede", "af_bella", "af_heart", "af_jessica", "af_kore", "af_nicole", "af_nova", "af_river", "af_sarah", "af_sky",
        "am_adam", "am_echo", "am_eric", "am_fenrir", "am_liam", "am_michael", "am_onyx", "am_puck", "am_santa",
        "bf_alice", "bf_emma", "bf_isabella", "bf_lily",
        "bm_daniel", "bm_fable", "bm_george", "bm_lewis",
        "ef_dora", "em_alex", "em_santa",
        "ff_siwis",
        "hf_alpha", "hf_beta", "hm_omega", "hm_psi",
        "if_sara", "im_nicola",
        "jf_alpha", "jf_gongitsune", "jf_nezumi", "jf_tebukuro", "jm_kumo",
        "pf_dora", "pm_alex", "pm_santa",
        "zf_xiaobei", "zf_xiaoni", "zf_xiaoxiao", "zf_xiaoyi",
        "zm_yunjian", "zm_yunxi", "zm_yunxia", "zm_yunyang",
    ]


def _init_kokoro() -> None:
    if _KOKORO_STATE["initialized"]:
        return
    if KModel is None or KPipeline is None:
        raise RuntimeError("Kokoro is not installed. Please install the 'kokoro' package (>=0.9.4).")
    device = "cpu"
    if torch is not None:
        try:
            if torch.cuda.is_available():
                device = "cuda"
        except Exception:
            device = "cpu"
    model = KModel(repo_id="hexgrad/Kokoro-82M").to(device).eval()
    pipelines = {"a": KPipeline(lang_code="a", model=False, repo_id="hexgrad/Kokoro-82M")}
    try:
        pipelines["a"].g2p.lexicon.golds["kokoro"] = "kˈOkəɹO"
    except Exception:
        pass
    _KOKORO_STATE.update({"initialized": True, "device": device, "model": model, "pipelines": pipelines})


def _init_supertonic() -> None:
    if _SUPERTONIC_STATE["initialized"]:
        return

    if snapshot_download is None:
        raise RuntimeError("huggingface_hub is not installed.")

    base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    assets_dir = os.path.join(base_dir, "assets", "supertonic")

    if not os.path.exists(assets_dir):
        print(f"Downloading Supertonic models to {assets_dir}...")
        snapshot_download(repo_id="Supertone/supertonic", local_dir=assets_dir)

    onnx_dir = os.path.join(assets_dir, "onnx")
    tts = load_text_to_speech(onnx_dir, use_gpu=False)

    _SUPERTONIC_STATE.update({"initialized": True, "tts": tts, "assets_dir": assets_dir})


def get_supertonic_voices() -> list[str]:
    if not _SUPERTONIC_STATE["initialized"]:
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        assets_dir = os.path.join(base_dir, "assets", "supertonic")
        if not os.path.exists(assets_dir):
            return ["F1", "F2", "M1", "M2"]
    else:
        assets_dir = _SUPERTONIC_STATE["assets_dir"]

    voice_styles_dir = os.path.join(assets_dir, "voice_styles")
    if not os.path.exists(voice_styles_dir):
        return ["F1", "F2", "M1", "M2"]

    files = os.listdir(voice_styles_dir)
    voices = [f.replace(".json", "") for f in files if f.endswith(".json")]
    return sorted(voices)


def List_Kokoro_Voices() -> list[str]:
    return get_kokoro_voices()


def List_Supertonic_Voices() -> list[str]:
    return get_supertonic_voices()


TOOL_SUMMARY = (
    "Synthesize speech from text using Supertonic-66M (default) or Kokoro-82M. "
    "Supertonic: faster, supports steps/silence/chunking. "
    "Kokoro: slower, supports many languages/accents. "
    "Return the generated media to the user in this format ``."
)


@autodoc(
    summary=TOOL_SUMMARY,
)
def Generate_Speech(
    text: Annotated[str, "The text to synthesize (English)."],
    model: Annotated[str, "The TTS model to use: 'Supertonic' or 'Kokoro'."] = "Supertonic",
    speed: Annotated[float, "Speech speed multiplier in 0.5–2.0; 1.0 = normal speed."] = 1.3,
    steps: Annotated[int, "Supertonic only. Diffusion steps (1-50). Higher = better quality but slower."] = 5,
    voice: Annotated[str, "Voice identifier. Default 'F1' for Supertonic, 'af_heart' for Kokoro."] = "F1",
    silence_duration: Annotated[float, "Supertonic only. Silence duration between chunks (0.0-2.0s)."] = 0.3,
    max_chunk_size: Annotated[int, "Supertonic only. Max text chunk length (50-1000)."] = 300,
) -> str:
    _log_call_start("Generate_Speech", text=_truncate_for_log(text, 200), model=model, speed=speed, voice=voice)

    if not text or not text.strip():
        _log_call_end("Generate_Speech", "error=empty text")
        raise gr.Error("Please provide non-empty text to synthesize.")

    model_lower = model.lower()

    # Swap in the matching default voice when the caller changed the model but left the voice at the other model's default.
    if model_lower == "kokoro" and voice == "F1":
        voice = "af_heart"
    elif model_lower == "supertonic" and voice == "af_heart":
        voice = "F1"

    try:
        if model_lower == "kokoro":
            return _generate_kokoro(text, speed, voice)
        else:
            return _generate_supertonic(text, speed, voice, steps, silence_duration, max_chunk_size)
    except gr.Error as exc:
        _log_call_end("Generate_Speech", f"gr_error={str(exc)}")
        raise
    except Exception as exc:
        _log_call_end("Generate_Speech", f"error={str(exc)[:120]}")
        raise gr.Error(f"Error during speech generation: {exc}")


def _generate_kokoro(text: str, speed: float, voice: str) -> str:
    _init_kokoro()
    model = _KOKORO_STATE["model"]
    pipelines = _KOKORO_STATE["pipelines"]
    pipeline = pipelines.get("a")
    if pipeline is None:
        raise gr.Error("Kokoro English pipeline not initialized.")

    audio_segments = []
    pack = pipeline.load_voice(voice)

    segments = list(pipeline(text, voice, speed))
    total_segments = len(segments)
    for segment_idx, (text_chunk, ps, _) in enumerate(segments):
        ref_s = pack[len(ps) - 1]
        try:
            audio = model(ps, ref_s, float(speed))
            audio_segments.append(audio.detach().cpu().numpy())
            if total_segments > 10 and (segment_idx + 1) % 5 == 0:
                print(f"Progress: Generated {segment_idx + 1}/{total_segments} segments...")
        except Exception as exc:
            raise gr.Error(f"Error generating audio for segment {segment_idx + 1}: {exc}")

    if not audio_segments:
        raise gr.Error("No audio was generated (empty synthesis result).")

    if len(audio_segments) == 1:
        final_audio = audio_segments[0]
    else:
        final_audio = np.concatenate(audio_segments, axis=0)
        if total_segments > 1:
            duration = len(final_audio) / 24_000
            print(f"Completed: {total_segments} segments concatenated into {duration:.1f} seconds of audio")

    filename = f"speech_kokoro_{uuid.uuid4().hex[:8]}.wav"
    output_path = os.path.join(ROOT_DIR, filename)

    # Clip to [-1, 1] before int16 conversion to avoid wrap-around on loud segments.
    audio_int16 = _audio_np_to_int16(final_audio)
    scipy.io.wavfile.write(output_path, 24000, audio_int16)

    _log_call_end("Generate_Speech", f"saved_to={os.path.basename(output_path)} duration_sec={len(final_audio)/24_000:.2f}")
    return output_path


def _generate_supertonic(text: str, speed: float, voice: str, steps: int, silence_duration: float, max_chunk_size: int) -> str:
    _init_supertonic()
    tts = _SUPERTONIC_STATE["tts"]
    assets_dir = _SUPERTONIC_STATE["assets_dir"]

    voice_path = os.path.join(assets_dir, "voice_styles", f"{voice}.json")
    if not os.path.exists(voice_path):
        raise gr.Error(f"Voice style {voice} not found for Supertonic.")

    style = load_voice_style([voice_path])

    sr = tts.sample_rate

    wav_cat, _ = tts(text, style, steps, speed, silence_duration, max_chunk_size)

    if wav_cat is None or wav_cat.size == 0:
        raise gr.Error("No audio generated.")

    final_audio = wav_cat.flatten()

    filename = f"speech_supertonic_{uuid.uuid4().hex[:8]}.wav"
    output_path = os.path.join(ROOT_DIR, filename)

    audio_int16 = _audio_np_to_int16(final_audio)
    scipy.io.wavfile.write(output_path, sr, audio_int16)

    _log_call_end("Generate_Speech", f"saved_to={os.path.basename(output_path)} duration_sec={len(final_audio)/sr:.2f}")
    return output_path


def build_interface() -> gr.Interface:
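    """Build the Gradio interface for the Generate_Speech tool."""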
    kokoro_voices = get_kokoro_voices()
    supertonic_voices = get_supertonic_voices()
    all_voices = sorted(list(set(kokoro_voices + supertonic_voices)))

    return gr.Interface(
        fn=Generate_Speech,
        inputs=[
            gr.Textbox(label="Text", placeholder="Type text to synthesize…", lines=4, info="The text to synthesize (English)"),
            gr.Dropdown(label="Model", choices=["Supertonic", "Kokoro"], value="Supertonic", info="The TTS model to use"),
            gr.Slider(minimum=0.5, maximum=2.0, value=1.3, step=0.1, label="Speed", info="Speech speed multiplier (1.0 = normal)"),
            gr.Slider(minimum=1, maximum=50, value=5, step=1, label="Steps", info="Supertonic only: Diffusion steps (1-50)"),
            gr.Dropdown(
                label="Voice",
                choices=all_voices,
                value="F1",
                info="Select voice (F1/F2/M1/M2 for Supertonic, others for Kokoro)",
            ),
            gr.Slider(minimum=0.0, maximum=2.0, value=0.3, step=0.1, label="Silence Duration", info="Supertonic only: Silence duration between chunks"),
            gr.Slider(minimum=50, maximum=1000, value=300, step=10, label="Max Chunk Size", info="Supertonic only: Max text chunk length"),
        ],
        outputs=gr.Audio(label="Audio", type="filepath", format="wav"),
        title="Generate Speech",
        description=(
            "<div style=\"text-align:center\">Generate speech with Supertonic-66M or Kokoro-82M. Runs on CPU.</div>"
        ),
        api_description=TOOL_SUMMARY,
        flagging_mode="never",
    )


__all__ = ["Generate_Speech", "List_Kokoro_Voices", "List_Supertonic_Voices", "build_interface"]