| from __future__ import annotations |
|
|
| import json |
| import os |
| import time |
| from contextlib import contextmanager |
| from typing import Optional, Annotated |
| from unicodedata import normalize |
| import re |
| import uuid |
| import io |
| import wave |
|
|
| import numpy as np |
| import onnxruntime as ort |
| import scipy.io.wavfile |
| import gradio as gr |
|
|
| from .File_System import ROOT_DIR |
| from app import _log_call_end, _log_call_start, _truncate_for_log |
| from ._docstrings import autodoc |
|
|
| try: |
| import torch |
| except Exception: |
| torch = None |
|
|
| try: |
| from kokoro import KModel, KPipeline |
| except Exception: |
| KModel = None |
| KPipeline = None |
|
|
| try: |
| from huggingface_hub import snapshot_download, list_repo_files |
| except ImportError: |
| snapshot_download = None |
| list_repo_files = None |
|
|
|
|
| |
|
|
| class UnicodeProcessor: |
| def __init__(self, unicode_indexer_path: str): |
| with open(unicode_indexer_path, "r") as f: |
| self.indexer = json.load(f) |
|
|
| def _preprocess_text(self, text: str) -> str: |
| |
| text = normalize("NFKD", text) |
| return text |
|
|
| def _get_text_mask(self, text_ids_lengths: np.ndarray) -> np.ndarray: |
| text_mask = length_to_mask(text_ids_lengths) |
| return text_mask |
|
|
| def _text_to_unicode_values(self, text: str) -> np.ndarray: |
| unicode_values = np.array( |
| [ord(char) for char in text], dtype=np.uint16 |
| ) |
| return unicode_values |
|
|
| def __call__(self, text_list: list[str]) -> tuple[np.ndarray, np.ndarray]: |
| text_list = [self._preprocess_text(t) for t in text_list] |
| text_ids_lengths = np.array([len(text) for text in text_list], dtype=np.int64) |
| text_ids = np.zeros((len(text_list), text_ids_lengths.max()), dtype=np.int64) |
| for i, text in enumerate(text_list): |
| unicode_vals = self._text_to_unicode_values(text) |
| text_ids[i, : len(unicode_vals)] = np.array( |
| [self.indexer[val] for val in unicode_vals], dtype=np.int64 |
| ) |
| text_mask = self._get_text_mask(text_ids_lengths) |
| return text_ids, text_mask |
|
|
|
|
| class Style: |
| def __init__(self, style_ttl_onnx: np.ndarray, style_dp_onnx: np.ndarray): |
| self.ttl = style_ttl_onnx |
| self.dp = style_dp_onnx |
|
|
|
|
| class TextToSpeech: |
| def __init__( |
| self, |
| cfgs: dict, |
| text_processor: UnicodeProcessor, |
| dp_ort: ort.InferenceSession, |
| text_enc_ort: ort.InferenceSession, |
| vector_est_ort: ort.InferenceSession, |
| vocoder_ort: ort.InferenceSession, |
| ): |
| self.cfgs = cfgs |
| self.text_processor = text_processor |
| self.dp_ort = dp_ort |
| self.text_enc_ort = text_enc_ort |
| self.vector_est_ort = vector_est_ort |
| self.vocoder_ort = vocoder_ort |
| self.sample_rate = cfgs["ae"]["sample_rate"] |
| self.base_chunk_size = cfgs["ae"]["base_chunk_size"] |
| self.chunk_compress_factor = cfgs["ttl"]["chunk_compress_factor"] |
| self.ldim = cfgs["ttl"]["latent_dim"] |
|
|
| def sample_noisy_latent( |
| self, duration: np.ndarray |
| ) -> tuple[np.ndarray, np.ndarray]: |
| bsz = len(duration) |
| wav_len_max = duration.max() * self.sample_rate |
| wav_lengths = (duration * self.sample_rate).astype(np.int64) |
| chunk_size = self.base_chunk_size * self.chunk_compress_factor |
| latent_len = ((wav_len_max + chunk_size - 1) / chunk_size).astype(np.int32) |
| latent_dim = self.ldim * self.chunk_compress_factor |
| noisy_latent = np.random.randn(bsz, latent_dim, latent_len).astype(np.float32) |
| latent_mask = get_latent_mask( |
| wav_lengths, self.base_chunk_size, self.chunk_compress_factor |
| ) |
|
|
| noisy_latent = noisy_latent * latent_mask |
| return noisy_latent, latent_mask |
|
|
| def _infer( |
| self, text_list: list[str], style: Style, total_step: int, speed: float = 1.05 |
| ) -> tuple[np.ndarray, np.ndarray]: |
| assert ( |
| len(text_list) == style.ttl.shape[0] |
| ), "Number of texts must match number of style vectors" |
| bsz = len(text_list) |
| text_ids, text_mask = self.text_processor(text_list) |
| dur_onnx, *_ = self.dp_ort.run( |
| None, {"text_ids": text_ids, "style_dp": style.dp, "text_mask": text_mask} |
| ) |
| dur_onnx = dur_onnx / speed |
| text_emb_onnx, *_ = self.text_enc_ort.run( |
| None, |
| {"text_ids": text_ids, "style_ttl": style.ttl, "text_mask": text_mask}, |
| ) |
| xt, latent_mask = self.sample_noisy_latent(dur_onnx) |
| total_step_np = np.array([total_step] * bsz, dtype=np.float32) |
| for step in range(total_step): |
| current_step = np.array([step] * bsz, dtype=np.float32) |
| xt, *_ = self.vector_est_ort.run( |
| None, |
| { |
| "noisy_latent": xt, |
| "text_emb": text_emb_onnx, |
| "style_ttl": style.ttl, |
| "text_mask": text_mask, |
| "latent_mask": latent_mask, |
| "current_step": current_step, |
| "total_step": total_step_np, |
| }, |
| ) |
| wav, *_ = self.vocoder_ort.run(None, {"latent": xt}) |
| return wav, dur_onnx |
|
|
| def __call__( |
| self, |
| text: str, |
| style: Style, |
| total_step: int, |
| speed: float = 1.05, |
| silence_duration: float = 0.3, |
| max_len: int = 300, |
| ) -> tuple[np.ndarray, np.ndarray]: |
| assert ( |
| style.ttl.shape[0] == 1 |
| ), "Single speaker text to speech only supports single style" |
| text_list = chunk_text(text, max_len=max_len) |
| wav_cat = None |
| dur_cat = None |
| for text in text_list: |
| wav, dur_onnx = self._infer([text], style, total_step, speed) |
| if wav_cat is None: |
| wav_cat = wav |
| dur_cat = dur_onnx |
| else: |
| silence = np.zeros( |
| (1, int(silence_duration * self.sample_rate)), dtype=np.float32 |
| ) |
| wav_cat = np.concatenate([wav_cat, silence, wav], axis=1) |
| dur_cat += dur_onnx + silence_duration |
| return wav_cat, dur_cat |
|
|
| def stream( |
| self, |
| text: str, |
| style: Style, |
| total_step: int, |
| speed: float = 1.05, |
| silence_duration: float = 0.3, |
| max_len: int = 300, |
| ): |
| assert ( |
| style.ttl.shape[0] == 1 |
| ), "Single speaker text to speech only supports single style" |
| text_list = chunk_text(text, max_len=max_len) |
|
|
| for i, text in enumerate(text_list): |
| wav, _ = self._infer([text], style, total_step, speed) |
| yield wav.flatten() |
|
|
| if i < len(text_list) - 1: |
| silence = np.zeros( |
| (int(silence_duration * self.sample_rate),), dtype=np.float32 |
| ) |
| yield silence |
|
|
| def batch( |
| self, text_list: list[str], style: Style, total_step: int, speed: float = 1.05 |
| ) -> tuple[np.ndarray, np.ndarray]: |
| return self._infer(text_list, style, total_step, speed) |
|
|
|
|
| def length_to_mask(lengths: np.ndarray, max_len: Optional[int] = None) -> np.ndarray: |
| """ |
| Convert lengths to binary mask. |
| |
| Args: |
| lengths: (B,) |
| max_len: int |
| |
| Returns: |
| mask: (B, 1, max_len) |
| """ |
| max_len = max_len or lengths.max() |
| ids = np.arange(0, max_len) |
| mask = (ids < np.expand_dims(lengths, axis=1)).astype(np.float32) |
| return mask.reshape(-1, 1, max_len) |
|
|
|
|
| def get_latent_mask( |
| wav_lengths: np.ndarray, base_chunk_size: int, chunk_compress_factor: int |
| ) -> np.ndarray: |
| latent_size = base_chunk_size * chunk_compress_factor |
| latent_lengths = (wav_lengths + latent_size - 1) // latent_size |
| latent_mask = length_to_mask(latent_lengths) |
| return latent_mask |
|
|
|
|
| def load_onnx( |
| onnx_path: str, opts: ort.SessionOptions, providers: list[str] |
| ) -> ort.InferenceSession: |
| return ort.InferenceSession(onnx_path, sess_options=opts, providers=providers) |
|
|
|
|
| def load_onnx_all( |
| onnx_dir: str, opts: ort.SessionOptions, providers: list[str] |
| ) -> tuple[ |
| ort.InferenceSession, |
| ort.InferenceSession, |
| ort.InferenceSession, |
| ort.InferenceSession, |
| ]: |
| dp_onnx_path = os.path.join(onnx_dir, "duration_predictor.onnx") |
| text_enc_onnx_path = os.path.join(onnx_dir, "text_encoder.onnx") |
| vector_est_onnx_path = os.path.join(onnx_dir, "vector_estimator.onnx") |
| vocoder_onnx_path = os.path.join(onnx_dir, "vocoder.onnx") |
|
|
| dp_ort = load_onnx(dp_onnx_path, opts, providers) |
| text_enc_ort = load_onnx(text_enc_onnx_path, opts, providers) |
| vector_est_ort = load_onnx(vector_est_onnx_path, opts, providers) |
| vocoder_ort = load_onnx(vocoder_onnx_path, opts, providers) |
| return dp_ort, text_enc_ort, vector_est_ort, vocoder_ort |
|
|
|
|
| def load_cfgs(onnx_dir: str) -> dict: |
| cfg_path = os.path.join(onnx_dir, "tts.json") |
| with open(cfg_path, "r") as f: |
| cfgs = json.load(f) |
| return cfgs |
|
|
|
|
| def load_text_processor(onnx_dir: str) -> UnicodeProcessor: |
| unicode_indexer_path = os.path.join(onnx_dir, "unicode_indexer.json") |
| text_processor = UnicodeProcessor(unicode_indexer_path) |
| return text_processor |
|
|
|
|
| def load_text_to_speech(onnx_dir: str, use_gpu: bool = False) -> TextToSpeech: |
| opts = ort.SessionOptions() |
| if use_gpu: |
| raise NotImplementedError("GPU mode is not fully tested") |
| else: |
| providers = ["CPUExecutionProvider"] |
| print("Using CPU for inference") |
| cfgs = load_cfgs(onnx_dir) |
| dp_ort, text_enc_ort, vector_est_ort, vocoder_ort = load_onnx_all( |
| onnx_dir, opts, providers |
| ) |
| text_processor = load_text_processor(onnx_dir) |
| return TextToSpeech( |
| cfgs, text_processor, dp_ort, text_enc_ort, vector_est_ort, vocoder_ort |
| ) |
|
|
|
|
| def load_voice_style(voice_style_paths: list[str], verbose: bool = False) -> Style: |
| bsz = len(voice_style_paths) |
|
|
| |
| with open(voice_style_paths[0], "r") as f: |
| first_style = json.load(f) |
| ttl_dims = first_style["style_ttl"]["dims"] |
| dp_dims = first_style["style_dp"]["dims"] |
|
|
| |
| ttl_style = np.zeros([bsz, ttl_dims[1], ttl_dims[2]], dtype=np.float32) |
| dp_style = np.zeros([bsz, dp_dims[1], dp_dims[2]], dtype=np.float32) |
|
|
| |
| for i, voice_style_path in enumerate(voice_style_paths): |
| with open(voice_style_path, "r") as f: |
| voice_style = json.load(f) |
|
|
| ttl_data = np.array( |
| voice_style["style_ttl"]["data"], dtype=np.float32 |
| ).flatten() |
| ttl_style[i] = ttl_data.reshape(ttl_dims[1], ttl_dims[2]) |
|
|
| dp_data = np.array( |
| voice_style["style_dp"]["data"], dtype=np.float32 |
| ).flatten() |
| dp_style[i] = dp_data.reshape(dp_dims[1], dp_dims[2]) |
|
|
| if verbose: |
| print(f"Loaded {bsz} voice styles") |
| return Style(ttl_style, dp_style) |
|
|
|
|
| @contextmanager |
| def timer(name: str): |
| start = time.time() |
| print(f"{name}...") |
| yield |
| print(f" -> {name} completed in {time.time() - start:.2f} sec") |
|
|
|
|
| def sanitize_filename(text: str, max_len: int) -> str: |
| """Sanitize filename by replacing non-alphanumeric characters with underscores""" |
| prefix = text[:max_len] |
| return re.sub(r"[^a-zA-Z0-9]", "_", prefix) |
|
|
|
|
| def chunk_text(text: str, max_len: int = 300) -> list[str]: |
| """ |
| Split text into chunks by paragraphs and sentences. |
| |
| Args: |
| text: Input text to chunk |
| max_len: Maximum length of each chunk (default: 300) |
| |
| Returns: |
| List of text chunks |
| """ |
| |
| paragraphs = [p.strip() for p in re.split(r"\n\s*\n+", text.strip()) if p.strip()] |
|
|
| chunks = [] |
|
|
| for paragraph in paragraphs: |
| paragraph = paragraph.strip() |
| if not paragraph: |
| continue |
|
|
| |
| |
| pattern = r"(?<!Mr\.)(?<!Mrs\.)(?<!Ms\.)(?<!Dr\.)(?<!Prof\.)(?<!Sr\.)(?<!Jr\.)(?<!Ph\.D\.)(?<!etc\.)(?<!e\.g\.)(?<!i\.e\.)(?<!vs\.)(?<!Inc\.)(?<!Ltd\.)(?<!Co\.)(?<!Corp\.)(?<!St\.)(?<!Ave\.)(?<!Blvd\.)(?<!\b[A-Z]\.)(?<=[.!?])\s+" |
| sentences = re.split(pattern, paragraph) |
|
|
| current_chunk = "" |
|
|
| for sentence in sentences: |
| if len(current_chunk) + len(sentence) + 1 <= max_len: |
| current_chunk += (" " if current_chunk else "") + sentence |
| else: |
| if current_chunk: |
| chunks.append(current_chunk.strip()) |
| current_chunk = sentence |
|
|
| if current_chunk: |
| chunks.append(current_chunk.strip()) |
|
|
| return chunks |
|
|
|
|
| |
|
|
| |
| _KOKORO_STATE = { |
| "initialized": False, |
| "device": "cpu", |
| "model": None, |
| "pipelines": {}, |
| } |
|
|
| |
| _SUPERTONIC_STATE = { |
| "initialized": False, |
| "tts": None, |
| "assets_dir": None, |
| } |
|
|
| def _audio_np_to_int16(audio_np: np.ndarray) -> np.ndarray: |
| audio_clipped = np.clip(audio_np, -1.0, 1.0) |
| return (audio_clipped * 32767.0).astype(np.int16) |
|
|
| |
|
|
| def get_kokoro_voices() -> list[str]: |
| try: |
| if list_repo_files: |
| files = list_repo_files("hexgrad/Kokoro-82M") |
| voice_files = [file for file in files if file.endswith(".pt") and file.startswith("voices/")] |
| voices = [file.replace("voices/", "").replace(".pt", "") for file in voice_files] |
| return sorted(voices) if voices else _get_fallback_voices() |
| return _get_fallback_voices() |
| except Exception: |
| return _get_fallback_voices() |
|
|
|
|
| def _get_fallback_voices() -> list[str]: |
| return [ |
| "af_alloy", "af_aoede", "af_bella", "af_heart", "af_jessica", "af_kore", "af_nicole", "af_nova", "af_river", "af_sarah", "af_sky", |
| "am_adam", "am_echo", "am_eric", "am_fenrir", "am_liam", "am_michael", "am_onyx", "am_puck", "am_santa", |
| "bf_alice", "bf_emma", "bf_isabella", "bf_lily", |
| "bm_daniel", "bm_fable", "bm_george", "bm_lewis", |
| "ef_dora", "em_alex", "em_santa", |
| "ff_siwis", |
| "hf_alpha", "hf_beta", "hm_omega", "hm_psi", |
| "if_sara", "im_nicola", |
| "jf_alpha", "jf_gongitsune", "jf_nezumi", "jf_tebukuro", "jm_kumo", |
| "pf_dora", "pm_alex", "pm_santa", |
| "zf_xiaobei", "zf_xiaoni", "zf_xiaoxiao", "zf_xiaoyi", |
| "zm_yunjian", "zm_yunxi", "zm_yunxia", "zm_yunyang", |
| ] |
|
|
|
|
| def _init_kokoro() -> None: |
| if _KOKORO_STATE["initialized"]: |
| return |
| if KModel is None or KPipeline is None: |
| raise RuntimeError("Kokoro is not installed. Please install the 'kokoro' package (>=0.9.4).") |
| device = "cpu" |
| if torch is not None: |
| try: |
| if torch.cuda.is_available(): |
| device = "cuda" |
| except Exception: |
| device = "cpu" |
| model = KModel(repo_id="hexgrad/Kokoro-82M").to(device).eval() |
| pipelines = {"a": KPipeline(lang_code="a", model=False, repo_id="hexgrad/Kokoro-82M")} |
| try: |
| pipelines["a"].g2p.lexicon.golds["kokoro"] = "kˈOkəɹO" |
| except Exception: |
| pass |
| _KOKORO_STATE.update({"initialized": True, "device": device, "model": model, "pipelines": pipelines}) |
|
|
| |
|
|
| def _init_supertonic() -> None: |
| if _SUPERTONIC_STATE["initialized"]: |
| return |
| |
| if snapshot_download is None: |
| raise RuntimeError("huggingface_hub is not installed.") |
|
|
| |
| |
| base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
| assets_dir = os.path.join(base_dir, "assets", "supertonic") |
| |
| if not os.path.exists(assets_dir): |
| print(f"Downloading Supertonic models to {assets_dir}...") |
| snapshot_download(repo_id="Supertone/supertonic", local_dir=assets_dir) |
| |
| onnx_dir = os.path.join(assets_dir, "onnx") |
| tts = load_text_to_speech(onnx_dir, use_gpu=False) |
| |
| _SUPERTONIC_STATE.update({"initialized": True, "tts": tts, "assets_dir": assets_dir}) |
|
|
|
|
| def get_supertonic_voices() -> list[str]: |
| |
| if not _SUPERTONIC_STATE["initialized"]: |
| |
| base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
| assets_dir = os.path.join(base_dir, "assets", "supertonic") |
| if not os.path.exists(assets_dir): |
| |
| return ["F1", "F2", "M1", "M2"] |
| else: |
| assets_dir = _SUPERTONIC_STATE["assets_dir"] |
|
|
| voice_styles_dir = os.path.join(assets_dir, "voice_styles") |
| if not os.path.exists(voice_styles_dir): |
| return ["F1", "F2", "M1", "M2"] |
| |
| files = os.listdir(voice_styles_dir) |
| voices = [f.replace('.json', '') for f in files if f.endswith('.json')] |
| return sorted(voices) |
|
|
|
|
| def List_Kokoro_Voices() -> list[str]: |
| return get_kokoro_voices() |
|
|
| def List_Supertonic_Voices() -> list[str]: |
| return get_supertonic_voices() |
|
|
|
|
| |
| TOOL_SUMMARY = ( |
| "Synthesize speech from text using Supertonic-66M (default) or Kokoro-82M. " |
| "Supertonic: faster, supports steps/silence/chunking. " |
| "Kokoro: slower, supports many languages/accents. " |
| "Return the generated media to the user in this format ``." |
| ) |
|
|
|
|
| @autodoc( |
| summary=TOOL_SUMMARY, |
| ) |
| def Generate_Speech( |
| text: Annotated[str, "The text to synthesize (English)."], |
| model: Annotated[str, "The TTS model to use: 'Supertonic' or 'Kokoro'."] = "Supertonic", |
| speed: Annotated[float, "Speech speed multiplier in 0.5–2.0; 1.0 = normal speed."] = 1.3, |
| steps: Annotated[int, "Supertonic only. Diffusion steps (1-50). Higher = better quality but slower."] = 5, |
| voice: Annotated[str, "Voice identifier. Default 'F1' for Supertonic, 'af_heart' for Kokoro."] = "F1", |
| silence_duration: Annotated[float, "Supertonic only. Silence duration between chunks (0.0-2.0s)."] = 0.3, |
| max_chunk_size: Annotated[int, "Supertonic only. Max text chunk length (50-1000)."] = 300, |
| ) -> str: |
| _log_call_start("Generate_Speech", text=_truncate_for_log(text, 200), model=model, speed=speed, voice=voice) |
| |
| if not text or not text.strip(): |
| try: |
| _log_call_end("Generate_Speech", "error=empty text") |
| finally: |
| pass |
| raise gr.Error("Please provide non-empty text to synthesize.") |
|
|
| model_lower = model.lower() |
| |
| |
| if model_lower == "kokoro" and voice == "F1": |
| voice = "af_heart" |
| elif model_lower == "supertonic" and voice == "af_heart": |
| voice = "F1" |
|
|
| try: |
| if model_lower == "kokoro": |
| return _generate_kokoro(text, speed, voice) |
| else: |
| |
| return _generate_supertonic(text, speed, voice, steps, silence_duration, max_chunk_size) |
| |
| except gr.Error as exc: |
| _log_call_end("Generate_Speech", f"gr_error={str(exc)}") |
| raise |
| except Exception as exc: |
| _log_call_end("Generate_Speech", f"error={str(exc)[:120]}") |
| raise gr.Error(f"Error during speech generation: {exc}") |
|
|
|
|
| def _generate_kokoro(text: str, speed: float, voice: str) -> str: |
| _init_kokoro() |
| model = _KOKORO_STATE["model"] |
| pipelines = _KOKORO_STATE["pipelines"] |
| pipeline = pipelines.get("a") |
| if pipeline is None: |
| raise gr.Error("Kokoro English pipeline not initialized.") |
| |
| audio_segments = [] |
| pack = pipeline.load_voice(voice) |
| |
| segments = list(pipeline(text, voice, speed)) |
| total_segments = len(segments) |
| for segment_idx, (text_chunk, ps, _) in enumerate(segments): |
| ref_s = pack[len(ps) - 1] |
| try: |
| audio = model(ps, ref_s, float(speed)) |
| audio_segments.append(audio.detach().cpu().numpy()) |
| if total_segments > 10 and (segment_idx + 1) % 5 == 0: |
| print(f"Progress: Generated {segment_idx + 1}/{total_segments} segments...") |
| except Exception as exc: |
| raise gr.Error(f"Error generating audio for segment {segment_idx + 1}: {exc}") |
| |
| if not audio_segments: |
| raise gr.Error("No audio was generated (empty synthesis result).") |
| |
| if len(audio_segments) == 1: |
| final_audio = audio_segments[0] |
| else: |
| final_audio = np.concatenate(audio_segments, axis=0) |
| if total_segments > 1: |
| duration = len(final_audio) / 24_000 |
| print(f"Completed: {total_segments} segments concatenated into {duration:.1f} seconds of audio") |
| |
| |
| filename = f"speech_kokoro_{uuid.uuid4().hex[:8]}.wav" |
| output_path = os.path.join(ROOT_DIR, filename) |
| |
| |
| audio_int16 = (final_audio * 32767).astype(np.int16) |
| scipy.io.wavfile.write(output_path, 24000, audio_int16) |
| |
| _log_call_end("Generate_Speech", f"saved_to={os.path.basename(output_path)} duration_sec={len(final_audio)/24_000:.2f}") |
| return output_path |
|
|
|
|
| def _generate_supertonic(text: str, speed: float, voice: str, steps: int, silence_duration: float, max_chunk_size: int) -> str: |
| _init_supertonic() |
| tts = _SUPERTONIC_STATE["tts"] |
| assets_dir = _SUPERTONIC_STATE["assets_dir"] |
| |
| voice_path = os.path.join(assets_dir, "voice_styles", f"{voice}.json") |
| if not os.path.exists(voice_path): |
| |
| |
| if not os.path.exists(voice_path): |
| raise gr.Error(f"Voice style {voice} not found for Supertonic.") |
|
|
| style = load_voice_style([voice_path]) |
| |
| sr = tts.sample_rate |
| |
| |
| |
| |
| |
| wav_cat, _ = tts(text, style, steps, speed, silence_duration, max_chunk_size) |
| |
| if wav_cat is None or wav_cat.size == 0: |
| raise gr.Error("No audio generated.") |
|
|
| |
| final_audio = wav_cat.flatten() |
| |
| |
| filename = f"speech_supertonic_{uuid.uuid4().hex[:8]}.wav" |
| output_path = os.path.join(ROOT_DIR, filename) |
| |
| audio_int16 = _audio_np_to_int16(final_audio) |
| scipy.io.wavfile.write(output_path, sr, audio_int16) |
| |
| _log_call_end("Generate_Speech", f"saved_to={os.path.basename(output_path)} duration_sec={len(final_audio)/sr:.2f}") |
| return output_path |
|
|
|
|
| def build_interface() -> gr.Interface: |
| kokoro_voices = get_kokoro_voices() |
| supertonic_voices = get_supertonic_voices() |
| all_voices = sorted(list(set(kokoro_voices + supertonic_voices))) |
|
|
| return gr.Interface( |
| fn=Generate_Speech, |
| inputs=[ |
| gr.Textbox(label="Text", placeholder="Type text to synthesize…", lines=4, info="The text to synthesize (English)"), |
| gr.Dropdown(label="Model", choices=["Supertonic", "Kokoro"], value="Supertonic", info="The TTS model to use"), |
| gr.Slider(minimum=0.5, maximum=2.0, value=1.3, step=0.1, label="Speed", info="Speech speed multiplier (1.0 = normal)"), |
| gr.Slider(minimum=1, maximum=50, value=5, step=1, label="Steps", info="Supertonic only: Diffusion steps (1-50)"), |
| gr.Dropdown( |
| label="Voice", |
| choices=all_voices, |
| value="F1", |
| info="Select voice (F1/F2/M1/M2 for Supertonic, others for Kokoro)", |
| ), |
| gr.Slider(minimum=0.0, maximum=2.0, value=0.3, step=0.1, label="Silence Duration", info="Supertonic only: Silence duration between chunks"), |
| gr.Slider(minimum=50, maximum=1000, value=300, step=10, label="Max Chunk Size", info="Supertonic only: Max text chunk length"), |
| ], |
| outputs=gr.Audio(label="Audio", type="filepath", format="wav"), |
| title="Generate Speech", |
| description=( |
| "<div style=\"text-align:center\">Generate speech with Supertonic-66M or Kokoro-82M. Runs on CPU.</div>" |
| ), |
| api_description=TOOL_SUMMARY, |
| flagging_mode="never", |
| ) |
|
|
|
|
| __all__ = ["Generate_Speech", "List_Kokoro_Voices", "List_Supertonic_Voices", "build_interface"] |
|
|