Spaces:
Running
Running
| # import os | |
| # import json | |
| # import time | |
| # import re | |
| # import numpy as np | |
| # import onnxruntime as ort | |
| # import gradio as gr | |
| # from huggingface_hub import hf_hub_download | |
| # from misaki import en | |
| # from functools import lru_cache | |
| # from fastapi import FastAPI, WebSocket, WebSocketDisconnect | |
| # import asyncio | |
| # import uvloop | |
| # import uvicorn | |
| # from concurrent.futures import ThreadPoolExecutor | |
| # # --- CONFIGURATION --- | |
| # MODEL_REPO = "onnx-community/Kokoro-82M-v1.0-ONNX" | |
| # MODEL_FILE = "onnx/model.onnx" | |
| # TOKENIZER_FILE = "tokenizer.json" | |
| # # --- VOICE UI --- | |
| # VOICE_CHOICES = { | |
| # 'πΊπΈ πΊ Heart': 'af_heart', 'πΊπΈ πΊ Bella': 'af_bella', 'πΊπΈ πΊ Nicole': 'af_nicole', | |
| # 'πΊπΈ πΊ Aoede': 'af_aoede', 'πΊπΈ πΊ Kore': 'af_kore', 'πΊπΈ πΊ Sarah': 'af_sarah', | |
| # 'πΊπΈ πΊ Nova': 'af_nova', 'πΊπΈ πΊ Sky': 'af_sky', 'πΊπΈ πΊ Alloy': 'af_alloy', | |
| # 'πΊπΈ πΊ Jessica': 'af_jessica', 'πΊπΈ πΊ River': 'af_river', 'πΊπΈ πΉ Michael': 'am_michael', | |
| # 'πΊπΈ πΉ Fenrir': 'am_fenrir', 'πΊπΈ πΉ Puck': 'am_puck', 'πΊπΈ πΉ Echo': 'am_echo', | |
| # 'πΊπΈ πΉ Eric': 'am_eric', 'πΊπΈ πΉ Liam': 'am_liam', 'πΊπΈ πΉ Onyx': 'am_onyx', | |
| # 'πΊπΈ πΉ Santa': 'am_santa', 'πΊπΈ πΉ Adam': 'am_adam', 'π¬π§ πΊ Emma': 'bf_emma', | |
| # 'π¬π§ πΊ Isabella': 'bf_isabella', 'π¬π§ πΊ Alice': 'bf_alice', 'π¬π§ πΊ Lily': 'bf_lily', | |
| # 'π¬π§ πΉ George': 'bm_george', 'π¬π§ πΉ Fable': 'bm_fable', 'π¬π§ πΉ Lewis': 'bm_lewis', | |
| # 'π¬π§ πΉ Daniel': 'bm_daniel', | |
| # } | |
| # # --- ENGINE --- | |
| # print("π BOOTING HIGH-RAM ENGINE...") | |
| # # Enable fast networking immediately | |
| # asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) | |
| # # 1. Phonemizer | |
| # G2P = en.G2P(trf=False, british=False, fallback=None) | |
| # # 2. Tokenizer | |
| # vocab_path = hf_hub_download(repo_id=MODEL_REPO, filename=TOKENIZER_FILE) | |
| # with open(vocab_path, "r", encoding="utf-8") as f: | |
| # data = json.load(f) | |
| # TOKENIZER = data["model"]["vocab"] if "model" in data else data.get("vocab", {}) | |
| # # 3. Voices (Lazy Load) | |
| # VOICE_CACHE = {} | |
| # def get_voice(name): | |
| # code = VOICE_CHOICES.get(name, name) | |
| # if code not in VOICE_CACHE: | |
| # try: | |
| # print(f"β¬οΈ Loading Voice: {code}") | |
| # path = hf_hub_download(repo_id=MODEL_REPO, filename=f"voices/{code}.bin") | |
| # VOICE_CACHE[code] = np.fromfile(path, dtype=np.float32).reshape(-1, 1, 256) | |
| # except: | |
| # if 'af_bella' not in VOICE_CACHE: | |
| # p = hf_hub_download(repo_id=MODEL_REPO, filename="voices/af_bella.bin") | |
| # VOICE_CACHE['af_bella'] = np.fromfile(p, dtype=np.float32).reshape(-1, 1, 256) | |
| # return VOICE_CACHE['af_bella'] | |
| # return VOICE_CACHE[code] | |
| # # 4. ONNX Engine | |
| # model_path = hf_hub_download(repo_id=MODEL_REPO, filename=MODEL_FILE) | |
| # sess_options = ort.SessionOptions() | |
| # sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL | |
| # sess_options.add_session_config_entry("session.intra_op.allow_spinning", "0") | |
| # sess_options.intra_op_num_threads = 0 | |
| # sess_options.inter_op_num_threads = 0 | |
| # SESSION = ort.InferenceSession(model_path, sess_options, providers=["CPUExecutionProvider"]) | |
| # print("β ENGINE READY") | |
| # # --- CORE LOGIC (Shared by UI and API) --- | |
| # @lru_cache(maxsize=5000) | |
| # def get_tokens(text): | |
| # if "Kokoro" in text: text = text.replace("Kokoro", "kΛOkΙΙΉO") | |
| # phonemes, _ = G2P(text) | |
| # return [TOKENIZER.get(p, 0) for p in phonemes] | |
| # def trim_silence(audio, threshold=0.01): | |
| # if audio.size == 0: return audio | |
| # mask = np.abs(audio) > threshold | |
| # if not np.any(mask): return audio | |
| # start, end = np.argmax(mask), len(mask) - np.argmax(mask[::-1]) | |
| # return audio[max(0, start-50) : min(len(audio), end+50)] | |
| # def infer(text, voice_name, speed): | |
| # if not text.strip(): return None | |
| # ids = get_tokens(text)[:510] | |
| # if not ids: return None | |
| # voice = get_voice(voice_name) | |
| # style = voice[min(len(ids), voice.shape[0]-1)] | |
| # try: | |
| # audio = SESSION.run(None, { | |
| # "input_ids": np.array([[0] + ids + [0]], dtype=np.int64), | |
| # "style": style, | |
| # "speed": np.array([speed], dtype=np.float32) | |
| # })[0] | |
| # return 24000, (np.clip(trim_silence(audio[0]), -1.0, 1.0) * 32767).astype(np.int16) | |
| # except: return None | |
| # def tuned_splitter(text): | |
| # chunks = re.split(r'([.,!?;:\n]+)', text) | |
| # buffer = "" | |
| # chunk_count = 0 | |
| # for part in chunks: | |
| # buffer += part | |
| # if chunk_count == 0: threshold = 50 | |
| # elif chunk_count == 1: threshold = 100 | |
| # elif chunk_count == 2: threshold = 150 | |
| # else: threshold = 250 | |
| # if re.search(r'[.,!?;:\n]$', buffer) and len(buffer) >= threshold: | |
| # if buffer.strip(): | |
| # yield buffer | |
| # chunk_count += 1 | |
| # buffer = "" | |
| # if buffer.strip(): | |
| # yield buffer.strip() | |
| # def stream_generator(text, voice_name, speed): | |
| # print("--- START STREAM ---") | |
| # get_voice(voice_name) | |
| # for i, chunk in enumerate(tuned_splitter(text)): | |
| # t0 = time.time() | |
| # audio = infer(chunk, voice_name, speed) | |
| # if audio: | |
| # dur = time.time() - t0 | |
| # print(f"β‘ Chunk {i}: {len(chunk)} chars in {dur:.2f}s") | |
| # yield audio | |
| # print("--- END STREAM ---") | |
| # # --- UI DEFINITION --- | |
| # with gr.Blocks(title="Kokoro TTS") as app: | |
| # gr.Markdown("## β‘ Kokoro-82M (High-RAM Tuned)") | |
| # with gr.Row(): | |
| # with gr.Column(): | |
| # text_in = gr.Textbox(label="Input Text", lines=3, value="The system is live. Use the Gradio UI for testing, or connect to /ws/audio for the API.") | |
| # voice_in = gr.Dropdown(list(VOICE_CHOICES.keys()), value='πΊπΈ πΊ Bella', label="Voice") | |
| # speed_in = gr.Slider(0.5, 2.0, value=1.0, label="Speed") | |
| # btn = gr.Button("Generate", variant="primary") | |
| # with gr.Column(): | |
| # audio_out = gr.Audio(streaming=True, autoplay=True, label="Audio Stream") | |
| # btn.click(stream_generator, inputs=[text_in, voice_in, speed_in], outputs=[audio_out]) | |
| # # --- API INTEGRATION --- | |
| # # --- API INTEGRATION --- | |
| # from concurrent.futures import ThreadPoolExecutor | |
| # # 1. Define FastAPI | |
| # api = FastAPI() | |
| # # 2. Define Worker Pools | |
| # # We use max_workers=1 because ONNX is already multithreaded internally. | |
| # # Adding more workers on a 2 vCPU machine will actually SLOW it down due to context switching. | |
| # INFERENCE_EXECUTOR = ThreadPoolExecutor(max_workers=1) | |
| # G2P_EXECUTOR = ThreadPoolExecutor(max_workers=1) | |
| # INFERENCE_QUEUE = asyncio.Queue() | |
| # # 3. Background Tasks | |
| # def g2p_task(text): | |
| # # Reuses the exact same G2P/Tokenizer logic as the UI | |
| # if "Kokoro" in text: text = text.replace("Kokoro", "kΛOkΙΙΉO") | |
| # phonemes, _ = G2P(text) | |
| # return [TOKENIZER.get(p, 0) for p in phonemes] | |
| # # This is the "Engine Room". It pulls tickets and cooks them one by one. | |
| # async def audio_engine_loop(): | |
| # print("β‘ API AUDIO PIPELINE STARTED") | |
| # loop = asyncio.get_running_loop() | |
| # while True: | |
| # # Wait for a ticket (text tokens + websocket connection) | |
| # job = await INFERENCE_QUEUE.get() | |
| # tokens, style, speed, ws = job | |
| # try: | |
| # # Check if client is still connected before doing heavy math | |
| # # (FastAPI WS state: 1 = Connected, 2/3 = Closing/Closed) | |
| # if ws.client_state.value > 1: | |
| # continue | |
| # # Reuses the exact same SESSION as the UI | |
| # input_ids = np.array([[0, *tokens[:510], 0]], dtype=np.int64) | |
| # style_vec = style[min(len(tokens), style.shape[0]-1)] | |
| # # --- CRITICAL FIX: Run blocking math in a separate thread --- | |
| # # This allows the main server to keep talking to the other 59 users | |
| # # while this calculation happens in the background. | |
| # audio = await loop.run_in_executor( | |
| # INFERENCE_EXECUTOR, | |
| # lambda: SESSION.run(None, { | |
| # "input_ids": input_ids, | |
| # "style": style_vec, | |
| # "speed": np.array([speed], dtype=np.float32) | |
| # })[0] | |
| # ) | |
| # # Post-Process (Fast enough to run on main thread) | |
| # pcm_bytes = (np.clip(trim_silence(audio[0]), -1.0, 1.0) * 32767).astype(np.int16).tobytes() | |
| # # Send audio back to the specific user who asked for it | |
| # try: | |
| # await ws.send_bytes(pcm_bytes) | |
| # except Exception: | |
| # # If sending fails, just move on. Don't crash the engine. | |
| # pass | |
| # except Exception as e: | |
| # print(f"API Engine Error: {e}") | |
| # @api.on_event("startup") | |
| # async def startup(): | |
| # asyncio.create_task(audio_engine_loop()) | |
| # # ------------------------------------------------------- | |
| # # ROBUST WEBSOCKET ENDPOINT | |
| # # ------------------------------------------------------- | |
| # @api.websocket("/ws/audio") | |
| # async def websocket_endpoint(ws: WebSocket): | |
| # await ws.accept() | |
| # # Defaults | |
| # voice_key = "af_bella" | |
| # speed = 1.0 | |
| # loop = asyncio.get_running_loop() | |
| # print(f"β Client connected: {ws.client}") | |
| # # --- HEARTBEAT KEEPER --- | |
| # # This prevents HF Nginx from killing the connection during silence. | |
| # async def keep_alive(): | |
| # while True: | |
| # try: | |
| # await asyncio.sleep(15) # Send a ping every 15s | |
| # # We send a text frame as a ping. The browser ignores it or handles it. | |
| # await ws.send_json({"type": "ping"}) | |
| # except: | |
| # break | |
| # heartbeat_task = asyncio.create_task(keep_alive()) | |
| # try: | |
| # while True: | |
| # try: | |
| # # Wait for JSON command | |
| # data = await ws.receive_json() | |
| # except WebSocketDisconnect: | |
| # print("β Client disconnected cleanly") | |
| # break # BREAK THE LOOP | |
| # except Exception as e: | |
| # print(f"β οΈ Connection lost: {e}") | |
| # break # BREAK THE LOOP | |
| # # 1. Config Change | |
| # if "config" in data: | |
| # voice_name = data.get("voice", "πΊπΈ πΊ Bella") | |
| # voice_code = VOICE_CHOICES.get(voice_name, voice_name) | |
| # get_voice(voice_name) | |
| # voice_key = voice_code | |
| # speed = float(data.get("speed", speed)) | |
| # # print(f"βοΈ Config updated: {voice_key}") # Commented out to reduce log noise | |
| # # 2. Text Stream | |
| # if "text" in data: | |
| # text = data["text"] | |
| # # The splitter breaks "500 words" into small sentences. | |
| # # These small sentences are added to the queue instantly. | |
| # for chunk in tuned_splitter(text): | |
| # if chunk.strip(): | |
| # # Run G2P in thread to avoid blocking input | |
| # tokens = await loop.run_in_executor(G2P_EXECUTOR, g2p_task, chunk) | |
| # if tokens: | |
| # style = VOICE_CACHE.get(voice_key) | |
| # if style is None: | |
| # get_voice(voice_key) | |
| # style = VOICE_CACHE.get(voice_key) | |
| # # Put the ticket in the global queue | |
| # await INFERENCE_QUEUE.put((tokens, style, speed, ws)) | |
| # if "flush" in data: | |
| # pass | |
| # except Exception as e: | |
| # print(f"π₯ Critical WS Error: {e}") | |
| # finally: | |
| # heartbeat_task.cancel() # Clean up the heartbeat task | |
| # # --- FINAL MOUNT --- | |
| # final_app = gr.mount_gradio_app(api, app, path="/") | |
| # if __name__ == "__main__": | |
| # uvicorn.run(final_app, host="0.0.0.0", port=7860) | |
| #OLD KOKORO CHATGPT CODE | |
| import os | |
| import re | |
| import time | |
| import asyncio | |
| from concurrent.futures import ThreadPoolExecutor | |
| import numpy as np | |
| import gradio as gr | |
| from fastapi import FastAPI, WebSocket, WebSocketDisconnect | |
| import uvicorn | |
| import torch | |
| from kokoro import KPipeline | |
# ----------------------------
# HARD LIMIT CPU THREADS (2 vCPU box)
# ----------------------------
# setdefault() keeps any value the deployment already exported.
# NOTE(review): OpenMP/MKL usually read these variables when the native
# library is first loaded, and torch/numpy are imported above this point, so
# these defaults may only reach libraries loaded later - confirm, or move the
# three lines above the imports.
os.environ.setdefault("OMP_NUM_THREADS", "2")
os.environ.setdefault("MKL_NUM_THREADS", "2")
os.environ.setdefault("NUMEXPR_NUM_THREADS", "2")

# Apply the torch limits independently so one failure (for example a
# non-numeric environment value) does not silently skip the other, and report
# the failure instead of hiding it.
try:
    torch.set_num_threads(int(os.environ.get("TORCH_NUM_THREADS", "2")))
except Exception as exc:
    print(f"torch intra-op thread limit not applied: {exc}")
try:
    torch.set_num_interop_threads(int(os.environ.get("TORCH_NUM_INTEROP_THREADS", "1")))
except Exception as exc:
    print(f"torch inter-op thread limit not applied: {exc}")

# Optional: uvloop for faster event loop on HF Linux
try:
    import uvloop  # type: ignore
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    # uvloop is optional; the default asyncio loop is used instead.
    pass
except Exception as exc:
    print(f"uvloop not enabled: {exc}")

print("π BOOTING KOKORO (OFFICIAL PIPELINE, LOW LATENCY)")
# ----------------------------
# VOICES
# ----------------------------
# Maps the label shown in the UI to the Kokoro voice code.
# NOTE(review): the label prefixes look like mis-decoded UTF-8 emoji. They are
# kept byte-for-byte because other code in this file refers to the exact
# "Bella" label as its default value.
VOICE_CHOICES = {
    # American voices (af_* / am_* codes)
    "πΊπΈ πΊ Heart": "af_heart",
    "πΊπΈ πΊ Bella": "af_bella",
    "πΊπΈ πΊ Nicole": "af_nicole",
    "πΊπΈ πΊ Aoede": "af_aoede",
    "πΊπΈ πΊ Kore": "af_kore",
    "πΊπΈ πΊ Sarah": "af_sarah",
    "πΊπΈ πΊ Nova": "af_nova",
    "πΊπΈ πΊ Sky": "af_sky",
    "πΊπΈ πΊ Alloy": "af_alloy",
    "πΊπΈ πΊ Jessica": "af_jessica",
    "πΊπΈ πΊ River": "af_river",
    "πΊπΈ πΉ Michael": "am_michael",
    "πΊπΈ πΉ Fenrir": "am_fenrir",
    "πΊπΈ πΉ Puck": "am_puck",
    "πΊπΈ πΉ Echo": "am_echo",
    "πΊπΈ πΉ Eric": "am_eric",
    "πΊπΈ πΉ Liam": "am_liam",
    "πΊπΈ πΉ Onyx": "am_onyx",
    "πΊπΈ πΉ Santa": "am_santa",
    "πΊπΈ πΉ Adam": "am_adam",
    # British voices (bf_* / bm_* codes)
    "π¬π§ πΊ Emma": "bf_emma",
    "π¬π§ πΊ Isabella": "bf_isabella",
    "π¬π§ πΊ Alice": "bf_alice",
    "π¬π§ πΊ Lily": "bf_lily",
    "π¬π§ πΉ George": "bm_george",
    "π¬π§ πΉ Fable": "bm_fable",
    "π¬π§ πΉ Lewis": "bm_lewis",
    "π¬π§ πΉ Daniel": "bm_daniel",
}
def voice_to_lang_code(voice_code: str) -> str:
    """Return the pipeline language key for a voice code.

    Codes starting with ``bf_`` or ``bm_`` map to ``"b"`` (British); every
    other code maps to ``"a"`` (American).
    """
    return "b" if voice_code.startswith(("bf_", "bm_")) else "a"
# ----------------------------
# PIPELINES (keep hot in RAM)
# ----------------------------
# One pipeline per language key returned by voice_to_lang_code().
# The British pipeline is handed the model instance already loaded by the
# American one (KPipeline's `model=` argument accepts a loaded model), so the
# network weights are held in memory once instead of twice.
_AMERICAN_PIPELINE = KPipeline(lang_code="a")
PIPELINES = {
    "a": _AMERICAN_PIPELINE,
    "b": KPipeline(lang_code="b", model=_AMERICAN_PIPELINE.model),
}
# ----------------------------
# TEXT NORMALIZATION
# ----------------------------
# Markdown-style pronunciation override for the word "Kokoro".
_KOKORO_PRONUNCIATION = "[Kokoro](/kˈOkəɹO/)"
# Matches "Kokoro" unless it is immediately followed by "](/", i.e. unless it
# is already the label of a pronunciation override.
_KOKORO_WORD = re.compile(r"Kokoro(?!\]\(/)")


def normalize_text(text: str) -> str:
    """Annotate every bare "Kokoro" with its pronunciation override.

    Callers in this file apply this function more than once to the same text,
    so it must be idempotent: an occurrence that already carries the override
    is left alone instead of being wrapped a second time.

    Args:
        text: Raw input text; ``None`` or an empty string is accepted.

    Returns:
        The annotated text, or ``""`` when ``text`` is falsy.
    """
    if not text:
        return ""
    # A callable replacement avoids any escape processing of the template.
    return _KOKORO_WORD.sub(lambda _match: _KOKORO_PRONUNCIATION, text)
# ----------------------------
# LOW LATENCY SEGMENTATION
# One pipeline call per request.
# We inject newlines to let split_pattern=r"\n+" split inside Kokoro.
# We also force a small first segment for fast first audio.
# ----------------------------
# Sentence-ending punctuation followed by whitespace.
_SENT_BOUNDARY = re.compile(r"([.!?;:])\s+")


def inject_newlines_for_fast_stream(text: str) -> str:
    """Insert newlines so the pipeline can split the text into short segments.

    Steps:
      1. Normalise and strip the text.
      2. Replace the whitespace after each ``. ! ? ; :`` with one newline.
      3. Collapse runs of three or more newlines to two.
      4. If the result is still a single line longer than 90 characters,
         break it once at a space so the first segment is short.

    Args:
        text: Raw request text (may be empty or ``None``).

    Returns:
        The text with newline segment markers, or ``""`` for empty input.
    """
    text = normalize_text(text).strip()
    if not text:
        return ""

    # Sentence boundaries -> newline so the pipeline's split_pattern can segment
    text = _SENT_BOUNDARY.sub(r"\1\n", text)

    # Collapse long runs of pre-existing blank lines
    text = re.sub(r"\n{3,}", "\n\n", text)

    # Guarantee a small first segment for low time-to-first-audio
    if "\n" not in text and len(text) > 90:
        # Prefer the last space between characters 35 and 70.
        cut = text.rfind(" ", 35, 70)
        if cut == -1:
            # Never cut inside a word: fall forward to the next space instead
            # of slicing at a fixed offset.
            cut = text.find(" ", 70)
        if cut != -1:
            text = text[:cut].strip() + "\n" + text[cut:].strip()
        # With no usable space the text is one long token; leave it whole.

    return text
# ----------------------------
# AUDIO CONVERSION (fast, safe)
# ----------------------------
def audio_to_int16_np(audio):
    """Convert float audio samples to a NumPy ``int16`` array.

    Samples are clipped to [-1.0, 1.0], scaled by 32767 and cast to int16.
    Torch tensors are detached and moved to the CPU first; anything else is
    passed through ``np.asarray``.
    """
    if not isinstance(audio, torch.Tensor):
        samples = np.clip(np.asarray(audio), -1.0, 1.0)
        return (samples * 32767.0).astype(np.int16)

    clamped = torch.clamp(audio.detach().cpu(), -1.0, 1.0)
    return (clamped * 32767.0).to(torch.int16).numpy()
def audio_to_pcm_bytes(audio) -> bytes:
    """Return the int16 conversion of ``audio`` as a raw byte string."""
    pcm = audio_to_int16_np(audio)
    return pcm.tobytes()
# ----------------------------
# OFFICIAL GENERATION PATH (single pipeline call)
# generator = pipeline(text, voice='af_heart', speed=1, split_pattern=r'\n+')
# ----------------------------
def kokoro_generator_full(text: str, voice_code: str, speed: float):
    """Yield one audio chunk per text segment produced by the pipeline.

    The pipeline is chosen from ``PIPELINES`` by the voice's language code,
    the text is segmented with ``inject_newlines_for_fast_stream`` and a
    single pipeline call is made with ``split_pattern=r"\\n+"``.

    Args:
        text: Text to synthesise; empty text yields nothing.
        voice_code: Kokoro voice code such as ``"af_bella"``.
        speed: Speaking-rate multiplier; coerced with ``float()``.

    Yields:
        The audio object of each pipeline result that carries audio.

    Raises:
        KeyError: if no pipeline exists for the derived language code.
    """
    pipeline = PIPELINES[voice_to_lang_code(voice_code)]
    text = inject_newlines_for_fast_stream(text)
    if not text:
        return

    segments = iter(
        pipeline(
            text,
            voice=voice_code,
            speed=float(speed),
            split_pattern=r"\n+",
        )
    )
    while True:
        # inference_mode is thread-local state. Holding it open across a
        # `yield` would leak it into whatever the consumer does between
        # chunks, and could enter and exit it on different threads when the
        # consumer resumes the generator from a thread pool. Scope it to the
        # step that actually runs the model.
        with torch.inference_mode():
            try:
                _, _, audio = next(segments)
            except StopIteration:
                return
        if audio is None:
            # Nothing to play for this segment.
            continue
        yield audio
# ----------------------------
# WARMUP (pay cold-start cost at boot)
# ----------------------------
def warmup():
    """Synthesise the first chunk of a short phrase and report how long it took.

    Any exception is caught and printed, so a failed warmup never propagates
    to the caller.
    """
    try:
        started = time.time()
        # Only the first chunk is needed; the rest of the generator is dropped.
        next(iter(kokoro_generator_full("Hello.", "af_bella", 1.0)), None)
        print(f"β WARMUP DONE in {time.time() - started:.2f}s")
    except Exception as e:
        print(f"β οΈ WARMUP FAILED: {e}")
# ----------------------------
# GRADIO UI STREAM
# ----------------------------
def gradio_stream(text, voice_name, speed):
    """Stream synthesised audio to the Gradio audio component.

    Args:
        text: Text from the input box.
        voice_name: Dropdown label; mapped through ``VOICE_CHOICES`` and used
            unchanged when it is not a known label.
        speed: Value of the speed slider.

    Yields:
        ``(24000, int16_array)`` tuples, one per generated chunk.
    """
    voice_code = VOICE_CHOICES.get(voice_name, voice_name)
    # The text is passed on untouched: kokoro_generator_full() already
    # normalises it during segmentation, and normalising here as well
    # annotated "Kokoro" twice.
    started = time.time()
    awaiting_first = True
    for audio in kokoro_generator_full(text, voice_code, speed):
        if awaiting_first:
            print(f"β‘ UI first audio in {time.time() - started:.2f}s")
            awaiting_first = False
        yield 24000, audio_to_int16_np(audio)
# ----------------------------
# FASTAPI WS ENGINE
# Single worker thread for actual generation.
# Stream frames to client as soon as they exist.
# No buffering a full list before sending.
# ----------------------------
api = FastAPI()
INFERENCE_EXECUTOR = ThreadPoolExecutor(max_workers=1)
# Jobs are (websocket, voice_code, speed, text) tuples.
INFERENCE_QUEUE: asyncio.Queue = asyncio.Queue()


def _produce_frames(loop, frame_q, stop, text, voice_code, speed):
    """Worker-thread half of a job: synthesise audio and hand PCM frames to the loop.

    Always finishes by queueing ``None`` so the consumer knows the job is over.
    """
    from concurrent.futures import TimeoutError as FutureTimeoutError

    def _put(item):
        # asyncio.Queue is not thread-safe and put_nowait() scheduled through
        # call_soon_threadsafe() raises inside the loop, not here. Run the
        # awaiting put() on the loop and block this thread until the item has
        # been accepted: that is the actual backpressure.
        pending = asyncio.run_coroutine_threadsafe(frame_q.put(item), loop)
        while True:
            try:
                pending.result(timeout=0.5)
                return
            except FutureTimeoutError:
                if not loop.is_running():
                    # Server is shutting down; nobody will ever drain the queue.
                    pending.cancel()
                    raise RuntimeError("event loop stopped while a frame was pending")

    try:
        for audio in kokoro_generator_full(text, voice_code, speed):
            if stop.is_set():
                break
            _put(audio_to_pcm_bytes(audio))
    except Exception as e:
        print(f"API Worker Error: {e}")
    finally:
        try:
            _put(None)
        except Exception:
            # The loop is gone, so there is no consumer left to notify.
            pass


async def _stream_job(loop, ws, voice_code, speed, text):
    """Loop half of a job: forward frames to the client until the worker is done."""
    import threading

    frame_q: asyncio.Queue = asyncio.Queue(maxsize=6)
    stop = threading.Event()
    INFERENCE_EXECUTOR.submit(_produce_frames, loop, frame_q, stop, text, voice_code, speed)

    first_sent = False
    started = time.time()
    try:
        while True:
            frame = await frame_q.get()
            if frame is None:
                return
            if stop.is_set():
                # Client is gone: keep draining so the worker can never block
                # on a full queue, and wait for its end-of-job marker.
                continue
            # presumably Starlette's WebSocketState, where values above 1 mean
            # the socket is no longer connected - confirm
            if ws.client_state.value > 1:
                stop.set()
                continue
            try:
                await ws.send_bytes(frame)
            except Exception:
                stop.set()
                continue
            if not first_sent:
                print(f"β‘ API first audio in {time.time() - started:.2f}s")
                first_sent = True
    finally:
        # Also reached when this task is cancelled mid-job.
        stop.set()


async def audio_engine_loop():
    """Serve queued synthesis jobs one at a time, forever.

    Each job is skipped when its client has already disconnected. A failure
    in one job is printed and does not stop the loop.
    """
    print("β‘ API AUDIO PIPELINE STARTED")
    loop = asyncio.get_running_loop()
    while True:
        ws, voice_code, speed, text = await INFERENCE_QUEUE.get()

        # Skip dead clients early
        if ws.client_state.value > 1:
            continue

        try:
            await _stream_job(loop, ws, voice_code, speed, text)
        except Exception as e:
            print(f"API Engine Error: {e}")
@api.on_event("startup")
async def startup():
    """Warm the model up on the inference thread, then start the engine loop."""
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(INFERENCE_EXECUTOR, warmup)
    # The event loop keeps only a weak reference to tasks; store a strong one
    # on the application so the engine cannot be garbage-collected mid-run.
    api.state.audio_engine_task = asyncio.create_task(audio_engine_loop())
@api.websocket("/ws/audio")
async def websocket_endpoint(ws: WebSocket):
    """Handle one client on ``/ws/audio``.

    The client sends JSON objects. Recognised keys:
      * ``config``: update the session voice (``voice``: a UI label or a
        voice code, default ``af_bella``) and ``speed``.
      * ``text``: queue the string for synthesis with the current settings.
      * ``flush``: accepted and ignored.

    Unknown voices, unusable speeds, non-string text and JSON values that are
    not objects are ignored instead of ending the session. A ping message is
    sent every 15 seconds while the connection is open.
    """
    await ws.accept()

    voice_code = "af_bella"
    speed = 1.0
    print(f"β Client connected: {ws.client}")

    async def keep_alive():
        while True:
            try:
                await asyncio.sleep(15)
                await ws.send_json({"type": "ping"})
            except Exception:
                break

    heartbeat_task = asyncio.create_task(keep_alive())

    try:
        while True:
            try:
                data = await ws.receive_json()
            except WebSocketDisconnect:
                print("β Client disconnected cleanly")
                break
            except Exception as e:
                print(f"β οΈ Connection lost: {e}")
                break

            if not isinstance(data, dict):
                # Valid JSON, but not a command object.
                continue

            if "config" in data:
                requested = data.get("voice", "af_bella")
                candidate = None
                if isinstance(requested, str):
                    candidate = VOICE_CHOICES.get(requested, requested)
                # Client input is untrusted: only voices listed in
                # VOICE_CHOICES (by label or by code) are forwarded.
                if candidate in VOICE_CHOICES.values():
                    voice_code = candidate
                else:
                    print(f"Ignoring unknown voice: {requested!r}")

                try:
                    new_speed = float(data.get("speed", speed))
                except (TypeError, ValueError):
                    new_speed = None
                # The chained comparison also rejects NaN and infinity.
                if new_speed is not None and 0.0 < new_speed < float("inf"):
                    speed = new_speed

            if "text" in data:
                text = data.get("text")
                # Queued raw: the generation path normalises the text itself,
                # and normalising here as well annotated "Kokoro" twice.
                if isinstance(text, str) and text.strip():
                    await INFERENCE_QUEUE.put((ws, voice_code, speed, text))

            if "flush" in data:
                # Accepted for protocol compatibility; nothing is buffered here.
                pass

    finally:
        heartbeat_task.cancel()
# ----------------------------
# GRADIO APP
# ----------------------------
# Two-column layout: inputs on the left, streamed audio on the right.
with gr.Blocks(title="Kokoro TTS") as app:
    gr.Markdown("## β‘ Kokoro-82M (Official Pipeline, Low Latency)")

    with gr.Row():
        with gr.Column():
            text_in = gr.Textbox(
                value="The system is live. Use the Gradio UI, or connect to /ws/audio.",
                lines=3,
                label="Input Text",
            )
            voice_in = gr.Dropdown(
                choices=list(VOICE_CHOICES),
                value="πΊπΈ πΊ Bella",
                label="Voice",
            )
            speed_in = gr.Slider(minimum=0.5, maximum=2.0, value=1.0, label="Speed")
            btn = gr.Button(value="Generate", variant="primary")

        with gr.Column():
            audio_out = gr.Audio(label="Audio Stream", streaming=True, autoplay=True)

    # gradio_stream is a generator, so each yielded chunk is pushed to the
    # streaming audio component as it is produced.
    btn.click(
        fn=gradio_stream,
        inputs=[text_in, voice_in, speed_in],
        outputs=[audio_out],
    )

# Serve the UI from the root path of the FastAPI application.
final_app = gr.mount_gradio_app(api, app, path="/")

if __name__ == "__main__":
    uvicorn.run(final_app, host="0.0.0.0", port=7860)
| #claude code | |
| # """ | |
| # Kokoro TTS WebSocket Server - OPTIMIZED for 2 vCPU / 16GB RAM | |
| # ============================================================ | |
| # Fixes: | |
| # - Backpressure loop timeout prevents worker thread hang | |
| # - Parallel inference workers (2, one per vCPU) | |
| # - Proper error handling with traceback logging | |
| # - Generation timeout to prevent infinite hangs | |
| # - Memory-optimized with periodic garbage collection | |
| # - Aggressive batching for throughput | |
| # """ | |
| # import os | |
| # import re | |
| # import gc | |
| # import time | |
| # import asyncio | |
| # import traceback | |
| # from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError | |
| # import numpy as np | |
| # import gradio as gr | |
| # from fastapi import FastAPI, WebSocket, WebSocketDisconnect | |
| # import uvicorn | |
| # import torch | |
| # from kokoro import KPipeline | |
| # # ---------------------------- | |
| # # MAXIMIZE 2 vCPU UTILIZATION | |
| # # ---------------------------- | |
| # CPU_COUNT = 2 | |
| # os.environ["OMP_NUM_THREADS"] = str(CPU_COUNT) | |
| # os.environ["MKL_NUM_THREADS"] = str(CPU_COUNT) | |
| # os.environ["NUMEXPR_NUM_THREADS"] = str(CPU_COUNT) | |
| # os.environ["OPENBLAS_NUM_THREADS"] = str(CPU_COUNT) | |
| # try: | |
| # torch.set_num_threads(CPU_COUNT) | |
| # torch.set_num_interop_threads(CPU_COUNT) | |
| # except Exception: | |
| # pass | |
| # # Use uvloop for faster async on Linux | |
| # try: | |
| # import uvloop | |
| # asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) | |
| # print("β Using uvloop for faster async") | |
| # except ImportError: | |
| # print("β οΈ uvloop not available, using default event loop") | |
| # print(f"π BOOTING KOKORO - Optimized for {CPU_COUNT} vCPU / 16GB RAM") | |
| # # ---------------------------- | |
| # # CONFIGURATION | |
| # # ---------------------------- | |
| # GENERATION_TIMEOUT_SECONDS = 60 # Max time for a single TTS generation | |
| # BACKPRESSURE_TIMEOUT_MS = 10000 # Max wait for queue space (10 seconds) | |
| # WORKER_COUNT = 2 # One per vCPU for parallel processing | |
| # QUEUE_MAXSIZE = 12 # Buffer more frames for smoother streaming | |
| # # ---------------------------- | |
| # # VOICES | |
| # # ---------------------------- | |
| # VOICE_CHOICES = { | |
| # "πΊπΈ πΊ Heart": "af_heart", "πΊπΈ πΊ Bella": "af_bella", "πΊπΈ πΊ Nicole": "af_nicole", | |
| # "πΊπΈ πΊ Aoede": "af_aoede", "πΊπΈ πΊ Kore": "af_kore", "πΊπΈ πΊ Sarah": "af_sarah", | |
| # "πΊπΈ πΊ Nova": "af_nova", "πΊπΈ πΊ Sky": "af_sky", "πΊπΈ πΊ Alloy": "af_alloy", | |
| # "πΊπΈ πΊ Jessica": "af_jessica", "πΊπΈ πΊ River": "af_river", "πΊπΈ πΉ Michael": "am_michael", | |
| # "πΊπΈ πΉ Fenrir": "am_fenrir", "πΊπΈ πΉ Puck": "am_puck", "πΊπΈ πΉ Echo": "am_echo", | |
| # "πΊπΈ πΉ Eric": "am_eric", "πΊπΈ πΉ Liam": "am_liam", "πΊπΈ πΉ Onyx": "am_onyx", | |
| # "πΊπΈ πΉ Santa": "am_santa", "πΊπΈ πΉ Adam": "am_adam", "π¬π§ πΊ Emma": "bf_emma", | |
| # "π¬π§ πΊ Isabella": "bf_isabella", "π¬π§ πΊ Alice": "bf_alice", "π¬π§ πΊ Lily": "bf_lily", | |
| # "π¬π§ πΉ George": "bm_george", "π¬π§ πΉ Fable": "bm_fable", "π¬π§ πΉ Lewis": "bm_lewis", | |
| # "π¬π§ πΉ Daniel": "bm_daniel", | |
| # } | |
| # def voice_to_lang_code(voice_code: str) -> str: | |
| # if voice_code.startswith("bf_") or voice_code.startswith("bm_"): | |
| # return "b" # British | |
| # return "a" # American | |
| # # ---------------------------- | |
| # # PIPELINES (hot in RAM - uses ~2GB per pipeline) | |
| # # With 16GB RAM we can comfortably hold both | |
| # # ---------------------------- | |
| # print("π¦ Loading Kokoro pipelines into RAM...") | |
| # PIPELINES = { | |
| # "a": KPipeline(lang_code="a"), | |
| # "b": KPipeline(lang_code="b"), | |
| # } | |
| # print(f"β Pipelines loaded. Memory usage: ~4GB for models") | |
| # # ---------------------------- | |
| # # TEXT NORMALIZATION | |
| # # ---------------------------- | |
| # def normalize_text(text: str) -> str: | |
| # if not text: | |
| # return "" | |
| # # Kokoro pronunciation helper | |
| # text = text.replace("Kokoro", "[Kokoro](/kΛOkΙΙΉO/)") | |
| # return text | |
| # # ---------------------------- | |
| # # FAST SEGMENTATION FOR STREAMING | |
| # # ---------------------------- | |
| # _SENT_BOUNDARY = re.compile(r"([.!?;:])\s+") | |
| # def inject_newlines_for_fast_stream(text: str) -> str: | |
| # text = normalize_text(text).strip() | |
| # if not text: | |
| # return "" | |
| # # Sentence boundaries -> newline for pipeline segmentation | |
| # text = _SENT_BOUNDARY.sub(r"\1\n", text) | |
| # # Normalize excessive newlines | |
| # text = re.sub(r"\n{3,}", "\n\n", text) | |
| # # Guarantee a small first segment for low time-to-first-audio | |
| # if "\n" not in text and len(text) > 90: | |
| # cut = text.rfind(" ", 0, 70) | |
| # if cut < 35: | |
| # cut = 70 | |
| # text = text[:cut].strip() + "\n" + text[cut:].strip() | |
| # return text | |
| # # ---------------------------- | |
| # # AUDIO CONVERSION (optimized) | |
| # # ---------------------------- | |
| # def audio_to_int16_np(audio): | |
| # if isinstance(audio, torch.Tensor): | |
| # audio = audio.detach().cpu() | |
| # audio = torch.clamp(audio, -1.0, 1.0) | |
| # return (audio * 32767.0).to(torch.int16).numpy() | |
| # audio = np.asarray(audio, dtype=np.float32) | |
| # audio = np.clip(audio, -1.0, 1.0) | |
| # return (audio * 32767.0).astype(np.int16) | |
| # def audio_to_pcm_bytes(audio) -> bytes: | |
| # return audio_to_int16_np(audio).tobytes() | |
| # # ---------------------------- | |
| # # GENERATION WITH TIMEOUT | |
| # # ---------------------------- | |
| # def kokoro_generator_full(text: str, voice_code: str, speed: float): | |
| # """ | |
| # Generate audio chunks from text using Kokoro pipeline. | |
| # Yields audio tensors for each segment. | |
| # """ | |
| # lang_code = voice_to_lang_code(voice_code) | |
| # pipeline = PIPELINES[lang_code] | |
| # text = inject_newlines_for_fast_stream(text) | |
| # if not text: | |
| # return | |
| # chunk_count = 0 | |
| # start_time = time.time() | |
| # try: | |
| # with torch.inference_mode(): | |
| # generator = pipeline( | |
| # text, | |
| # voice=voice_code, | |
| # speed=float(speed), | |
| # split_pattern=r"\n+", | |
| # ) | |
| # for _, _, audio in generator: | |
| # chunk_count += 1 | |
| # elapsed = time.time() - start_time | |
| # # Timeout protection | |
| # if elapsed > GENERATION_TIMEOUT_SECONDS: | |
| # print(f"β οΈ Generation timeout after {elapsed:.1f}s, {chunk_count} chunks") | |
| # break | |
| # yield audio | |
| # print(f"β Generated {chunk_count} chunks in {time.time() - start_time:.2f}s") | |
| # except Exception as e: | |
| # print(f"β Generation error: {e}") | |
| # traceback.print_exc() | |
| # finally: | |
| # # Periodic garbage collection to prevent memory buildup | |
| # if chunk_count > 10: | |
| # gc.collect() | |
| # # ---------------------------- | |
| # # WARMUP (preload models) | |
| # # ---------------------------- | |
| # def warmup(): | |
| # try: | |
| # t0 = time.time() | |
| # for _ in kokoro_generator_full("Hello, this is a warmup test.", "af_bella", 1.0): | |
| # break | |
| # print(f"β WARMUP DONE in {time.time() - t0:.2f}s") | |
| # except Exception as e: | |
| # print(f"β οΈ WARMUP FAILED: {e}") | |
| # traceback.print_exc() | |
| # # ---------------------------- | |
| # # GRADIO UI STREAM | |
| # # ---------------------------- | |
| # def gradio_stream(text, voice_name, speed): | |
| # voice_code = VOICE_CHOICES.get(voice_name, voice_name) | |
| # text = normalize_text(text) | |
| # i = 0 | |
| # t0 = time.time() | |
| # for audio in kokoro_generator_full(text, voice_code, speed): | |
| # if i == 0: | |
| # print(f"β‘ UI first audio in {time.time() - t0:.2f}s") | |
| # i += 1 | |
| # yield 24000, audio_to_int16_np(audio) | |
| # # ---------------------------- | |
| # # FASTAPI WEBSOCKET ENGINE | |
| # # ---------------------------- | |
| # api = FastAPI() | |
| # # Use multiple workers for parallel inference | |
| # INFERENCE_EXECUTOR = ThreadPoolExecutor(max_workers=WORKER_COUNT) | |
| # INFERENCE_QUEUE: asyncio.Queue = asyncio.Queue() | |
| # async def audio_engine_loop(): | |
| # """ | |
| # Main audio processing loop. | |
| # Pulls requests from queue and streams audio back to clients. | |
| # """ | |
| # print(f"β‘ API AUDIO PIPELINE STARTED ({WORKER_COUNT} workers)") | |
| # loop = asyncio.get_running_loop() | |
| # while True: | |
| # try: | |
| # ws, voice_code, speed, text = await INFERENCE_QUEUE.get() | |
| # except Exception as e: | |
| # print(f"β οΈ Queue get error: {e}") | |
| # continue | |
| # # Skip dead clients early | |
| # try: | |
| # if ws.client_state.value > 1: | |
| # print("βοΈ Skipping dead client") | |
| # continue | |
| # except Exception: | |
| # continue | |
| # frame_q: asyncio.Queue = asyncio.Queue(maxsize=QUEUE_MAXSIZE) | |
| # generation_id = id(ws) | |
| # def _worker(): | |
| # """Worker thread for audio generation.""" | |
| # chunk_count = 0 | |
| # start_time = time.time() | |
| # try: | |
| # print(f"π [{generation_id}] Starting TTS: {text[:50]}...") | |
| # for audio in kokoro_generator_full(text, voice_code, speed): | |
| # b = audio_to_pcm_bytes(audio) | |
| # chunk_count += 1 | |
| # if chunk_count == 1: | |
| # print(f"β‘ [{generation_id}] First chunk ready in {time.time() - start_time:.2f}s") | |
| # # Backpressure with TIMEOUT to prevent infinite hang | |
| # attempts = 0 | |
| # max_attempts = BACKPRESSURE_TIMEOUT_MS # 10 seconds at 1ms/attempt | |
| # while attempts < max_attempts: | |
| # try: | |
| # loop.call_soon_threadsafe(frame_q.put_nowait, b) | |
| # break | |
| # except asyncio.QueueFull: | |
| # time.sleep(0.001) | |
| # attempts += 1 | |
| # else: | |
| # # Timeout reached - client too slow or disconnected | |
| # print(f"β οΈ [{generation_id}] Backpressure timeout after {attempts}ms - aborting") | |
| # break | |
| # # Send completion signal | |
| # loop.call_soon_threadsafe(frame_q.put_nowait, None) | |
| # print(f"β [{generation_id}] Completed: {chunk_count} chunks in {time.time() - start_time:.2f}s") | |
| # except Exception as e: | |
| # print(f"β [{generation_id}] Worker error: {e}") | |
| # traceback.print_exc() | |
| # try: | |
| # loop.call_soon_threadsafe(frame_q.put_nowait, None) | |
| # except Exception: | |
| # pass | |
| # # Submit to executor | |
| # INFERENCE_EXECUTOR.submit(_worker) | |
| # # Stream frames to client | |
| # first_sent = False | |
| # started = time.time() | |
| # frames_sent = 0 | |
| # while True: | |
| # try: | |
| # # Timeout on frame retrieval to prevent infinite hang | |
| # frame = await asyncio.wait_for(frame_q.get(), timeout=30.0) | |
| # except asyncio.TimeoutError: | |
| # print(f"⚠️ [{generation_id}] Frame queue timeout - no data for 30s") | |
| # break | |
| # if frame is None: | |
| # break | |
| # # Check client still alive | |
| # try: | |
| # if ws.client_state.value > 1: | |
| # print(f"βοΈ [{generation_id}] Client disconnected mid-stream") | |
| # break | |
| # except Exception: | |
| # break | |
| # try: | |
| # await ws.send_bytes(frame) | |
| # frames_sent += 1 | |
| # if not first_sent: | |
| # print(f"⚡ [{generation_id}] First audio sent in {time.time() - started:.2f}s") | |
| # first_sent = True | |
| # except Exception as e: | |
| # print(f"⚠️ [{generation_id}] Send failed: {e}") | |
| # break | |
| # print(f"π€ [{generation_id}] Streaming complete: {frames_sent} frames sent") | |
| # @api.on_event("startup") | |
| # async def startup(): | |
| # loop = asyncio.get_running_loop() | |
| # # Warmup in executor to not block startup | |
| # await loop.run_in_executor(INFERENCE_EXECUTOR, warmup) | |
| # # Start the audio engine loop | |
| # asyncio.create_task(audio_engine_loop()) | |
| # print("π Server ready!") | |
| # @api.websocket("/ws/audio") | |
| # async def websocket_endpoint(ws: WebSocket): | |
| # await ws.accept() | |
| # voice_code = "af_bella" | |
| # speed = 1.0 | |
| # client_id = id(ws) | |
| # print(f"β [{client_id}] Client connected: {ws.client}") | |
| # async def keep_alive(): | |
| # """Send periodic pings to keep connection alive.""" | |
| # while True: | |
| # try: | |
| # await asyncio.sleep(15) | |
| # await ws.send_json({"type": "ping"}) | |
| # except Exception: | |
| # break | |
| # heartbeat_task = asyncio.create_task(keep_alive()) | |
| # try: | |
| # while True: | |
| # try: | |
| # data = await asyncio.wait_for(ws.receive_json(), timeout=120.0) | |
| # except asyncio.TimeoutError: | |
| # print(f"⏱️ [{client_id}] Connection timeout - no messages for 120s") | |
| # break | |
| # except WebSocketDisconnect: | |
| # print(f"β [{client_id}] Client disconnected cleanly") | |
| # break | |
| # except Exception as e: | |
| # print(f"⚠️ [{client_id}] Connection error: {e}") | |
| # break | |
| # # Handle config updates | |
| # if "config" in data: | |
| # voice_name = data.get("voice", "πΊπΈ πΊ Bella") | |
| # voice_code = VOICE_CHOICES.get(voice_name, voice_name) | |
| # speed = float(data.get("speed", speed)) | |
| # print(f"ποΈ [{client_id}] Config: voice={voice_code}, speed={speed}") | |
| # # Handle text-to-speech request | |
| # if "text" in data: | |
| # text = normalize_text(data.get("text", "")) | |
| # if text.strip(): | |
| # print(f"π₯ [{client_id}] TTS request: {text[:50]}...") | |
| # await INFERENCE_QUEUE.put((ws, voice_code, speed, text)) | |
| # # Handle flush (no-op for now, could clear queue) | |
| # if "flush" in data: | |
| # pass | |
| # finally: | |
| # heartbeat_task.cancel() | |
| # print(f"π [{client_id}] Connection closed") | |
| # # ---------------------------- | |
| # # HEALTH CHECK ENDPOINT | |
| # # ---------------------------- | |
| # @api.get("/health") | |
| # async def health_check(): | |
| # return { | |
| # "status": "healthy", | |
| # "workers": WORKER_COUNT, | |
| # "queue_size": INFERENCE_QUEUE.qsize(), | |
| # } | |
| # # ---------------------------- | |
| # # GRADIO APP | |
| # # ---------------------------- | |
| # with gr.Blocks(title="Kokoro TTS") as app: | |
| # gr.Markdown("## ⚡ Kokoro-82M (Optimized for 2 vCPU / 16GB RAM)") | |
| # gr.Markdown("API: Connect to `/ws/audio` for real-time streaming") | |
| # with gr.Row(): | |
| # with gr.Column(): | |
| # text_in = gr.Textbox( | |
| # label="Input Text", | |
| # lines=3, | |
| # value="Hello! This is the Kokoro text-to-speech system. The server is optimized for low latency streaming.", | |
| # ) | |
| # voice_in = gr.Dropdown( | |
| # list(VOICE_CHOICES.keys()), | |
| # value="πΊπΈ πΊ Bella", | |
| # label="Voice", | |
| # ) | |
| # speed_in = gr.Slider(0.5, 2.0, value=1.0, label="Speed") | |
| # btn = gr.Button("Generate", variant="primary") | |
| # with gr.Column(): | |
| # audio_out = gr.Audio(streaming=True, autoplay=True, label="Audio Stream") | |
| # btn.click(gradio_stream, inputs=[text_in, voice_in, speed_in], outputs=[audio_out]) | |
| # final_app = gr.mount_gradio_app(api, app, path="/") | |
| # if __name__ == "__main__": | |
| # uvicorn.run( | |
| # final_app, | |
| # host="0.0.0.0", | |
| # port=7860, | |
| # workers=1, # Single process, multiple threads | |
| # log_level="info", | |
| # ) | |