Spaces:
Running
Running
| import gradio as gr | |
| import io | |
| import tempfile | |
| import wave | |
| import numpy as np | |
| # Optional imports for Kokoro TTS (model is loaded lazily; CUDA is used when available, otherwise CPU) | |
| try: | |
| import torch # type: ignore | |
| except Exception: # pragma: no cover | |
| torch = None # type: ignore | |
| try: | |
| from kokoro import KModel, KPipeline # type: ignore | |
| except Exception: # pragma: no cover | |
| KModel = None # type: ignore | |
| KPipeline = None # type: ignore | |
| _KOKORO_STATE = {"initialized": False, "device": "cpu", "model": None, "pipelines": {}} | |
def _init_kokoro() -> None:
    """Load the Kokoro model and English pipeline once and cache them.

    The first successful call stores the model, the selected device and the
    pipeline dictionary in ``_KOKORO_STATE``; later calls return immediately.
    The model is moved to CUDA when ``torch.cuda.is_available()`` is true,
    otherwise it stays on CPU.

    Raises:
        gr.Error: If ``torch`` or ``kokoro`` could not be imported.
    """
    if _KOKORO_STATE["initialized"]:
        return
    # torch is dereferenced just below, so a missing torch has to be reported
    # like a missing kokoro instead of surfacing as AttributeError on None.
    if torch is None or KModel is None or KPipeline is None:
        raise gr.Error("Kokoro is not installed. Please add 'kokoro>=0.9.4' and 'torch' to requirements and install.")

    repo_id = "hexgrad/Kokoro-82M"
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Using device: {device}")
    model = KModel(repo_id=repo_id).to(device).eval()
    # model=False: the pipeline is created without its own model; synthesis
    # goes through the shared `model` built above.
    pipelines = {"a": KPipeline(lang_code="a", model=False, repo_id=repo_id)}
    try:
        # Best-effort pronunciation override for the word "kokoro".
        pipelines["a"].g2p.lexicon.golds["kokoro"] = "kˈOkəɹO"
    except Exception as exc:
        # Deliberately non-fatal: synthesis still works without the override,
        # but make the failure visible instead of swallowing it silently.
        print(f"Could not register custom pronunciation for 'kokoro': {exc}")
    _KOKORO_STATE.update({"initialized": True, "device": device, "model": model, "pipelines": pipelines})
def get_kokoro_voices():
    """Get list of available Kokoro voice IDs.

    Lists the files of the ``hexgrad/Kokoro-82M`` repository and keeps every
    ``voices/<id>.pt`` entry, returning the sorted ``<id>`` values. If the
    repository contains no such files, ``["af_heart"]`` is returned. If
    ``huggingface_hub`` cannot be imported or the listing call fails, a
    built-in list of known voice IDs is returned instead.

    Returns:
        A non-empty list of voice ID strings.
    """
    prefix, suffix = 'voices/', '.pt'
    try:
        from huggingface_hub import list_repo_files
        files = list_repo_files('hexgrad/Kokoro-82M')
    except Exception:
        # Offline, missing dependency, or hub error: fall back to a static list.
        return [
            "af_alloy", "af_aoede", "af_bella", "af_heart", "af_jessica", "af_kore", "af_nicole", "af_nova", "af_river", "af_sarah", "af_sky",
            "am_adam", "am_echo", "am_eric", "am_fenrir", "am_liam", "am_michael", "am_onyx", "am_puck", "am_santa",
            "bf_alice", "bf_emma", "bf_isabella", "bf_lily",
            "bm_daniel", "bm_fable", "bm_george", "bm_lewis",
            "ef_dora", "em_alex", "em_santa",
            "ff_siwis",
            "hf_alpha", "hf_beta", "hm_omega", "hm_psi",
            "if_sara", "im_nicola",
            "jf_alpha", "jf_gongitsune", "jf_nezumi", "jf_tebukuro", "jm_kumo",
            "pf_dora", "pm_alex", "pm_santa",
            "zf_xiaobei", "zf_xiaoni", "zf_xiaoxiao", "zf_xiaoyi", "zm_yunjian", "zm_yunxi", "zm_yunxia", "zm_yunyang",
        ]
    # Slice off exactly the leading prefix and trailing suffix; str.replace
    # would also remove those substrings from the middle of a voice name.
    voices = sorted(
        f[len(prefix):-len(suffix)]
        for f in files
        if f.startswith(prefix) and f.endswith(suffix)
    )
    return voices or ["af_heart"]
| def _audio_np_to_int16(audio_np: np.ndarray) -> np.ndarray: | |
| audio_clipped = np.clip(audio_np, -1.0, 1.0) | |
| return (audio_clipped * 32767.0).astype(np.int16) | |
| def _write_wav_file(audio_int16: np.ndarray, sample_rate: int = 24_000) -> str: | |
| with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp: | |
| path = tmp.name | |
| with wave.open(path, "wb") as wf: | |
| wf.setnchannels(1) | |
| wf.setsampwidth(2) | |
| wf.setframerate(sample_rate) | |
| wf.writeframes(audio_int16.tobytes()) | |
| return path | |
| def _wav_bytes_from_int16(audio_int16: np.ndarray, sample_rate: int = 24_000) -> bytes: | |
| buffer = io.BytesIO() | |
| with wave.open(buffer, "wb") as wf: | |
| wf.setnchannels(1) | |
| wf.setsampwidth(2) | |
| wf.setframerate(sample_rate) | |
| wf.writeframes(audio_int16.tobytes()) | |
| return buffer.getvalue() | |
def _kokoro_segment_generator(text: str, speed: float, voice: str):
    """Yield synthesized audio for ``text`` one pipeline segment at a time.

    Because this is a generator, nothing (including input validation and
    model initialisation) runs until the first item is requested.

    Args:
        text: Text to synthesize; must contain non-whitespace characters.
        speed: Speed value forwarded to the pipeline and to the model.
        voice: Kokoro voice ID, loaded through the English pipeline.

    Yields:
        The model output for each segment as a NumPy array on the CPU.

    Raises:
        gr.Error: For empty text, a missing pipeline, a voice that cannot be
            loaded, or any failure while synthesizing. The original exception
            is attached as ``__cause__``.
    """
    if not text or not text.strip():
        raise gr.Error("Please enter text to synthesize.")
    _init_kokoro()
    model = _KOKORO_STATE["model"]
    pipeline = _KOKORO_STATE["pipelines"].get("a")
    if pipeline is None:
        raise gr.Error("Kokoro English pipeline not initialized.")
    try:
        pack = pipeline.load_voice(voice)
    except Exception as e:
        # Report voice-loading problems the same way as every other failure.
        raise gr.Error(f"Error loading voice '{voice}': {str(e)[:200]}...") from e
    try:
        for idx, (_, ps, _) in enumerate(pipeline(text, voice, speed)):
            # The voice pack is indexed by phoneme-sequence length.
            ref_s = pack[len(ps) - 1]
            try:
                audio = model(ps, ref_s, float(speed))
                audio_np = audio.detach().cpu().numpy()
            except Exception as e:
                raise gr.Error(f"Error generating audio for segment {idx + 1}: {str(e)[:200]}...") from e
            # Yield outside the per-segment try so an exception raised on the
            # consumer side is not reported as a failure of this segment.
            yield audio_np
    except gr.Error:
        raise
    except Exception as e:
        raise gr.Error(f"Error during speech generation: {str(e)[:200]}...") from e
def kokoro_tts(text: str, speed: float, voice: str) -> str:
    """Synthesize ``text`` in one go and return the path of a WAV file.

    Every segment from ``_kokoro_segment_generator`` is collected, the
    segments are joined along axis 0, converted to 16-bit PCM and written to
    a temporary file at 24 kHz.

    Raises:
        gr.Error: If the generator produced no segments (or itself failed).
    """
    sample_rate = 24_000
    chunks = [*_kokoro_segment_generator(text, speed, voice)]
    if len(chunks) == 0:
        raise gr.Error("No audio was generated.")
    if len(chunks) == 1:
        waveform = chunks[0]
    else:
        waveform = np.concatenate(chunks, axis=0)
    pcm = _audio_np_to_int16(waveform)
    return _write_wav_file(pcm, sample_rate)
def kokoro_tts_stream(text: str, speed: float, voice: str):
    """Yield each synthesized segment as the bytes of a standalone WAV file.

    Segments come from ``_kokoro_segment_generator``; each one is converted
    to 16-bit PCM and wrapped in its own 24 kHz WAV container.

    Raises:
        gr.Error: If the generator finished without producing any segment.
    """
    sample_rate = 24_000
    chunk_count = 0
    for segment in _kokoro_segment_generator(text, speed, voice):
        chunk_count += 1
        pcm = _audio_np_to_int16(segment)
        yield _wav_bytes_from_int16(pcm, sample_rate)
    if chunk_count == 0:
        raise gr.Error("No audio was generated.")
# Entry point wired to the UI events: streams Kokoro output.
def generate_tts(text: str, speed: float, voice: str):
    """Delegate to ``kokoro_tts_stream`` and relay its chunks unchanged."""
    chunk_stream = kokoro_tts_stream(text, speed, voice)
    yield from chunk_stream
with gr.Blocks() as demo:
    # Page header.
    gr.HTML("<h1 style='text-align: center;'>Kokoro-TTS</h1><p style='text-align: center;'>Powered by Kokoro-82M on CPU</p>")

    # Preselect 'af_nicole' when it is offered (or when no voices are known);
    # otherwise fall back to the first listed voice.
    available_voices = get_kokoro_voices()
    if 'af_nicole' in available_voices or not available_voices:
        default_kokoro_voice = 'af_nicole'
    else:
        default_kokoro_voice = available_voices[0]

    # NOTE(review): the source's indentation was lost; this assumes only the
    # speed and voice controls belong to the row — confirm against the app.
    with gr.Row(variant='panel'):
        kokoro_speed = gr.Slider(minimum=0.5, maximum=2.0, value=1.2, step=0.1, label='Speed')
        kokoro_voice = gr.Dropdown(choices=available_voices, label='Voice', value=default_kokoro_voice)

    text_input = gr.Textbox(
        label="Input Text",
        placeholder="Enter the text you want to convert to speech here...",
        lines=5,
    )
    generate_btn = gr.Button("Generate Speech", variant="primary")
    audio_output = gr.Audio(label="Generated Speech", streaming=True, autoplay=True, buttons=["download"])

    # Both the button click and pressing Enter in the textbox run the same
    # streaming handler; each is exposed under its own API name.
    generate_inputs = [text_input, kokoro_speed, kokoro_voice]
    generate_btn.click(fn=generate_tts, inputs=generate_inputs, outputs=audio_output, api_name="generate_speech")
    text_input.submit(fn=generate_tts, inputs=generate_inputs, outputs=audio_output, api_name="generate_speech_enter")
if __name__ == "__main__":
    # Turn on the request queue, then start the app in debug mode with the
    # custom theme.
    queued_app = demo.queue()
    queued_app.launch(debug=True, theme='Nymbo/Nymbo_Theme')