Spaces:
Running
Running
| from __future__ import annotations | |
| import numpy as np | |
| import gradio as gr | |
| import os | |
| import uuid | |
| import wave | |
| from .File_System import ROOT_DIR | |
| from typing import Annotated | |
| from app import _log_call_end, _log_call_start, _truncate_for_log | |
| from ._docstrings import autodoc | |
| try: | |
| import torch # type: ignore | |
| except Exception: # pragma: no cover | |
| torch = None # type: ignore | |
| try: | |
| from kokoro import KModel, KPipeline # type: ignore | |
| except Exception: # pragma: no cover | |
| KModel = None # type: ignore | |
| KPipeline = None # type: ignore | |
# Process-wide cache for the lazily created Kokoro objects. `_init_kokoro`
# fills it in on first use and flips "initialized" so later calls return
# immediately.
_KOKORO_STATE = dict(
    initialized=False,  # True once the model and pipelines below are set
    device="cpu",       # device the model was moved to ("cpu" or "cuda")
    model=None,         # KModel instance after initialization
    pipelines={},       # language code -> KPipeline
)
def get_kokoro_voices() -> list[str]:
    """Return the sorted voice identifiers published for Kokoro-82M.

    The file listing of the ``hexgrad/Kokoro-82M`` Hugging Face repository is
    fetched and every ``voices/<name>.pt`` entry is reduced to ``<name>``.

    The lookup is best-effort: if ``huggingface_hub`` cannot be imported, the
    listing call raises, or no voice files are found, the result of
    ``_get_fallback_voices()`` is returned instead.
    """
    prefix, suffix = "voices/", ".pt"
    try:
        from huggingface_hub import list_repo_files

        # Slice off exactly the checked prefix/suffix. ``str.replace`` would
        # also remove any further occurrence inside the voice name itself.
        voices = sorted(
            path[len(prefix):-len(suffix)]
            for path in list_repo_files("hexgrad/Kokoro-82M")
            if path.startswith(prefix) and path.endswith(suffix)
        )
    except Exception:
        # Deliberate catch-all: a missing dependency or being offline must
        # not break voice discovery; fall through to the static list.
        voices = []
    return voices or _get_fallback_voices()
def _get_fallback_voices() -> list[str]:
    """Return the built-in list of Kokoro voice identifiers.

    Used when the live voice listing is unavailable. A new list is built on
    every call, so callers may mutate the result freely. Identifiers are
    grouped by their two-letter prefix and returned in that grouped order.
    """
    american = (
        "af_alloy", "af_aoede", "af_bella", "af_heart", "af_jessica", "af_kore",
        "af_nicole", "af_nova", "af_river", "af_sarah", "af_sky",
        "am_adam", "am_echo", "am_eric", "am_fenrir", "am_liam", "am_michael",
        "am_onyx", "am_puck", "am_santa",
    )
    british = (
        "bf_alice", "bf_emma", "bf_isabella", "bf_lily",
        "bm_daniel", "bm_fable", "bm_george", "bm_lewis",
    )
    e_voices = ("ef_dora", "em_alex", "em_santa")
    french = ("ff_siwis",)
    hindi = ("hf_alpha", "hf_beta", "hm_omega", "hm_psi")
    italian = ("if_sara", "im_nicola")
    japanese = ("jf_alpha", "jf_gongitsune", "jf_nezumi", "jf_tebukuro", "jm_kumo")
    portuguese = ("pf_dora", "pm_alex", "pm_santa")
    chinese = (
        "zf_xiaobei", "zf_xiaoni", "zf_xiaoxiao", "zf_xiaoyi",
        "zm_yunjian", "zm_yunxi", "zm_yunxia", "zm_yunyang",
    )
    return [
        *american,
        *british,
        *e_voices,
        *french,
        *hindi,
        *italian,
        *japanese,
        *portuguese,
        *chinese,
    ]
def _init_kokoro() -> None:
    """Create the Kokoro model and English pipeline once and cache them.

    Does nothing when ``_KOKORO_STATE["initialized"]`` is already true.
    Otherwise the model is loaded onto CUDA when torch reports it available
    (CPU in every other case), an ``"a"`` pipeline is created without its own
    model, and everything is stored in ``_KOKORO_STATE``.

    Raises:
        RuntimeError: if the ``kokoro`` package could not be imported.
    """
    if _KOKORO_STATE["initialized"]:
        return

    kokoro_missing = KModel is None or KPipeline is None
    if kokoro_missing:
        raise RuntimeError("Kokoro is not installed. Please install the 'kokoro' package (>=0.9.4).")

    # Prefer the GPU, but treat any failure while probing CUDA as "no GPU".
    target_device = "cpu"
    if torch is not None:
        try:
            target_device = "cuda" if torch.cuda.is_available() else "cpu"
        except Exception:
            target_device = "cpu"

    kokoro_model = KModel(repo_id="hexgrad/Kokoro-82M").to(target_device).eval()
    english_pipeline = KPipeline(lang_code="a", model=False, repo_id="hexgrad/Kokoro-82M")
    lang_pipelines = {"a": english_pipeline}

    # Best-effort: register a pronunciation for the word "kokoro" in the
    # pipeline's lexicon; initialization continues if that is not possible.
    try:
        lang_pipelines["a"].g2p.lexicon.golds["kokoro"] = "kˈOkəɹO"
    except Exception:
        pass

    _KOKORO_STATE.update(
        initialized=True,
        device=target_device,
        model=kokoro_model,
        pipelines=lang_pipelines,
    )
def List_Kokoro_Voices() -> list[str]:
    """Return the available Kokoro voice identifiers.

    Public entry point that forwards to ``get_kokoro_voices()`` unchanged.
    """
    voice_ids = get_kokoro_voices()
    return voice_ids
# Single source of truth for the LLM-facing tool description.
# The tool returns the path of the WAV file it writes, not a
# (sample_rate, waveform) tuple, so the summary says so.
# NOTE(review): the format placeholder between the backticks in the second
# sentence is empty as found here — confirm the intended template.
TOOL_SUMMARY = (
    "Synthesize speech from text using Kokoro-82M; choose voice and speed; returns the file path of the generated WAV audio. "
    "Return the generated media to the user in this format ``."
)
def Generate_Speech(
    text: Annotated[str, "The text to synthesize (English)."],
    speed: Annotated[float, "Speech speed multiplier in 0.5–2.0; 1.0 = normal speed."] = 1.25,
    voice: Annotated[
        str,
        (
            "Voice identifier from 54 available options. "
            "Voice Legend: af=American female, am=American male, bf=British female, bm=British male, ef=European female, "
            "em=European male, hf=Hindi female, hm=Hindi male, if=Italian female, im=Italian male, jf=Japanese female, "
            "jm=Japanese male, pf=Portuguese female, pm=Portuguese male, zf=Chinese female, zm=Chinese male, ff=French female. "
            "All Voices: af_alloy, af_aoede, af_bella, af_heart, af_jessica, af_kore, af_nicole, af_nova, af_river, af_sarah, af_sky, "
            "am_adam, am_echo, am_eric, am_fenrir, am_liam, am_michael, am_onyx, am_puck, am_santa, bf_alice, bf_emma, bf_isabella, "
            "bf_lily, bm_daniel, bm_fable, bm_george, bm_lewis, ef_dora, em_alex, em_santa, ff_siwis, hf_alpha, hf_beta, hm_omega, hm_psi, "
            "if_sara, im_nicola, jf_alpha, jf_gongitsune, jf_nezumi, jf_tebukuro, jm_kumo, pf_dora, pm_alex, pm_santa, zf_xiaobei, "
            "zf_xiaoni, zf_xiaoxiao, zf_xiaoyi, zm_yunjian, zm_yunxi, zm_yunxia, zm_yunyang."
        ),
    ] = "af_heart",
) -> str:
    """Synthesize ``text`` with Kokoro-82M and save it as a WAV file.

    The text is split into segments by the English pipeline, each segment is
    rendered by the cached model, the pieces are concatenated, and the result
    is written as 16-bit mono PCM into ``ROOT_DIR`` under a random
    ``speech_<hex>.wav`` name.

    Args:
        text: Text to speak; must contain non-whitespace characters.
        speed: Speed multiplier passed to the pipeline and the model.
        voice: Kokoro voice identifier to load.

    Returns:
        Path of the WAV file that was written.

    Raises:
        gr.Error: for empty text, and for any failure during initialization,
            voice loading, synthesis or file writing. Every failure is
            reported through ``_log_call_end`` before it is raised.
    """
    # Rate used for both the duration figures and the WAV header.
    sample_rate = 24_000

    _log_call_start("Generate_Speech", text=_truncate_for_log(text, 200), speed=speed, voice=voice)

    if not text or not text.strip():
        _log_call_end("Generate_Speech", "error=empty text")
        raise gr.Error("Please provide non-empty text to synthesize.")

    try:
        # Initialization and voice loading happen inside the ``try`` so that
        # their failures are logged and surfaced as ``gr.Error`` like every
        # other failure of this call.
        _init_kokoro()
        model = _KOKORO_STATE["model"]
        pipeline = _KOKORO_STATE["pipelines"].get("a")
        if pipeline is None:
            raise gr.Error("Kokoro English pipeline not initialized.")

        pack = pipeline.load_voice(voice)
        segments = list(pipeline(text, voice, speed))
        total_segments = len(segments)

        audio_segments = []
        for segment_no, (_, ps, _) in enumerate(segments, start=1):
            # The voice pack is indexed by phoneme-sequence length.
            ref_s = pack[len(ps) - 1]
            try:
                audio = model(ps, ref_s, float(speed))
                audio_segments.append(audio.detach().cpu().numpy())
                if total_segments > 10 and segment_no % 5 == 0:
                    print(f"Progress: Generated {segment_no}/{total_segments} segments...")
            except Exception as exc:
                raise gr.Error(f"Error generating audio for segment {segment_no}: {exc}") from exc

        if not audio_segments:
            raise gr.Error("No audio was generated (empty synthesis result).")

        if len(audio_segments) == 1:
            final_audio = audio_segments[0]
        else:
            final_audio = np.concatenate(audio_segments, axis=0)

        duration = len(final_audio) / sample_rate
        if total_segments > 1:
            print(f"Completed: {total_segments} segments concatenated into {duration:.1f} seconds of audio")

        # Save to file
        filename = f"speech_{uuid.uuid4().hex[:8]}.wav"
        output_path = os.path.join(ROOT_DIR, filename)

        # Convert to 16-bit PCM. Clip first: a sample outside [-1, 1] would
        # otherwise overflow int16 and wrap around into a loud click.
        audio_int16 = (np.clip(final_audio, -1.0, 1.0) * 32767).astype(np.int16)

        with wave.open(output_path, 'wb') as wf:
            wf.setnchannels(1)
            wf.setsampwidth(2)  # 16-bit = 2 bytes
            wf.setframerate(sample_rate)
            wf.writeframes(audio_int16.tobytes())

        _log_call_end("Generate_Speech", f"saved_to={os.path.basename(output_path)} duration_sec={duration:.2f}")
        return output_path
    except gr.Error as exc:
        _log_call_end("Generate_Speech", f"gr_error={str(exc)}")
        raise
    except Exception as exc:  # pylint: disable=broad-except
        _log_call_end("Generate_Speech", f"error={str(exc)[:120]}")
        raise gr.Error(f"Error during speech generation: {exc}") from exc
def build_interface() -> gr.Interface:
    """Build the Gradio interface that exposes ``Generate_Speech``.

    The voice dropdown is populated from ``get_kokoro_voices()``. The voice
    count shown in the dropdown hint is derived from that list, and the
    preselected voice falls back to the first available one if ``af_heart``
    is not among the choices.

    Returns:
        A ``gr.Interface`` with text, speed and voice inputs and a WAV file
        output.
    """
    available_voices = get_kokoro_voices()

    # Keep the preselected value inside the offered choices.
    default_voice = "af_heart"
    if available_voices and default_voice not in available_voices:
        default_voice = available_voices[0]

    return gr.Interface(
        fn=Generate_Speech,
        inputs=[
            gr.Textbox(label="Text", placeholder="Type text to synthesize…", lines=4),
            gr.Slider(minimum=0.5, maximum=2.0, value=1.25, step=0.1, label="Speed"),
            gr.Dropdown(
                label="Voice",
                choices=available_voices,
                value=default_voice,
                info=f"Select from {len(available_voices)} available voices across multiple languages and accents",
            ),
        ],
        outputs=gr.Audio(label="Audio", type="filepath", format="wav"),
        title="Generate Speech",
        description=(
            "<div style=\"text-align:center\">Generate speech with Kokoro-82M. Supports multiple languages and accents. Runs on CPU or CUDA if available.</div>"
        ),
        api_description=TOOL_SUMMARY,
        flagging_mode="never",
    )
# Names exported by ``from <module> import *``.
__all__ = [
    "Generate_Speech",
    "List_Kokoro_Voices",
    "build_interface",
]