Nymbo committed on
Commit
e1583dc
·
verified ·
1 Parent(s): fee3c6d

removing Supertonic from Generate_Speech

Browse files
Files changed (1) hide show
  1. Modules/Generate_Speech.py +77 -554
Modules/Generate_Speech.py CHANGED
@@ -1,22 +1,14 @@
1
  from __future__ import annotations
2
 
3
- import json
 
4
  import os
5
- import time
6
- from contextlib import contextmanager
7
- from typing import Optional, Annotated
8
- from unicodedata import normalize
9
- import re
10
  import uuid
11
- import io
12
  import wave
 
13
 
14
- import numpy as np
15
- import onnxruntime as ort
16
- import scipy.io.wavfile
17
- import gradio as gr
18
 
19
- from .File_System import ROOT_DIR
20
  from app import _log_call_end, _log_call_start, _truncate_for_log
21
  from ._docstrings import autodoc
22
 
@@ -31,359 +23,6 @@ except Exception: # pragma: no cover
31
  KModel = None # type: ignore
32
  KPipeline = None # type: ignore
33
 
34
- try:
35
- from huggingface_hub import snapshot_download, list_repo_files
36
- except ImportError:
37
- snapshot_download = None
38
- list_repo_files = None
39
-
40
-
41
- # --- Supertonic Helper Classes & Functions ---
42
-
43
class UnicodeProcessor:
    """Convert raw text into padded integer id arrays via a JSON indexer table."""

    def __init__(self, unicode_indexer_path: str):
        """Load the indexer (code point -> id lookup) from a JSON file."""
        # JSON is UTF-8 by specification; do not rely on the platform default encoding.
        with open(unicode_indexer_path, "r", encoding="utf-8") as f:
            self.indexer = json.load(f)

    def _preprocess_text(self, text: str) -> str:
        """Return ``text`` normalized to Unicode NFKD form."""
        # TODO: add more preprocessing
        return normalize("NFKD", text)

    def _get_text_mask(self, text_ids_lengths: np.ndarray) -> np.ndarray:
        """Build the padding mask for a batch from its per-text lengths."""
        return length_to_mask(text_ids_lengths)

    def _text_to_unicode_values(self, text: str) -> np.ndarray:
        """Return the code point of every character in ``text`` as ``uint16``."""
        # NOTE(review): uint16 only holds code points up to U+FFFF; characters
        # beyond that (e.g. most emoji) do not fit -- confirm intended handling.
        return np.array([ord(char) for char in text], dtype=np.uint16)

    def __call__(self, text_list: list[str]) -> tuple[np.ndarray, np.ndarray]:
        """Encode a batch of texts.

        Returns:
            ``(text_ids, text_mask)``: ``text_ids`` is an int64 array of shape
            ``(len(text_list), longest_text)`` zero-padded on the right, and
            ``text_mask`` is the mask produced by ``length_to_mask``.
        """
        text_list = [self._preprocess_text(t) for t in text_list]
        text_ids_lengths = np.array([len(text) for text in text_list], dtype=np.int64)
        text_ids = np.zeros((len(text_list), text_ids_lengths.max()), dtype=np.int64)
        for i, text in enumerate(text_list):
            unicode_vals = self._text_to_unicode_values(text)
            # Look every code point up in the indexer; the row tail stays zero.
            text_ids[i, : len(unicode_vals)] = np.array(
                [self.indexer[val] for val in unicode_vals], dtype=np.int64
            )
        text_mask = self._get_text_mask(text_ids_lengths)
        return text_ids, text_mask
74
-
75
-
76
class Style:
    """Pair of voice-style arrays passed to the Supertonic ONNX sessions."""

    def __init__(self, style_ttl_onnx: np.ndarray, style_dp_onnx: np.ndarray):
        # ``ttl`` is fed as "style_ttl" and ``dp`` as "style_dp" by TextToSpeech._infer.
        self.ttl = style_ttl_onnx
        self.dp = style_dp_onnx
80
-
81
-
82
class TextToSpeech:
    """Drive the four Supertonic ONNX sessions to turn text into a waveform."""

    def __init__(
        self,
        cfgs: dict,
        text_processor: UnicodeProcessor,
        dp_ort: ort.InferenceSession,
        text_enc_ort: ort.InferenceSession,
        vector_est_ort: ort.InferenceSession,
        vocoder_ort: ort.InferenceSession,
    ):
        """Store the sessions and cache the settings read from ``cfgs``."""
        self.cfgs = cfgs
        self.text_processor = text_processor
        self.dp_ort = dp_ort
        self.text_enc_ort = text_enc_ort
        self.vector_est_ort = vector_est_ort
        self.vocoder_ort = vocoder_ort
        self.sample_rate = cfgs["ae"]["sample_rate"]
        self.base_chunk_size = cfgs["ae"]["base_chunk_size"]
        self.chunk_compress_factor = cfgs["ttl"]["chunk_compress_factor"]
        self.ldim = cfgs["ttl"]["latent_dim"]

    def sample_noisy_latent(
        self, duration: np.ndarray
    ) -> tuple[np.ndarray, np.ndarray]:
        """Draw a masked random starting latent sized for ``duration``.

        Returns:
            ``(noisy_latent, latent_mask)``; the noise is multiplied by the mask.
        """
        bsz = len(duration)
        wav_len_max = duration.max() * self.sample_rate
        wav_lengths = (duration * self.sample_rate).astype(np.int64)
        chunk_size = self.base_chunk_size * self.chunk_compress_factor
        # Number of latent frames covering the longest waveform in the batch.
        latent_len = ((wav_len_max + chunk_size - 1) / chunk_size).astype(np.int32)
        latent_dim = self.ldim * self.chunk_compress_factor
        noisy_latent = np.random.randn(bsz, latent_dim, latent_len).astype(np.float32)
        latent_mask = get_latent_mask(
            wav_lengths, self.base_chunk_size, self.chunk_compress_factor
        )

        noisy_latent = noisy_latent * latent_mask
        return noisy_latent, latent_mask

    def _infer(
        self, text_list: list[str], style: Style, total_step: int, speed: float = 1.05
    ) -> tuple[np.ndarray, np.ndarray]:
        """Synthesize one batch; returns ``(wav, durations)``."""
        assert (
            len(text_list) == style.ttl.shape[0]
        ), "Number of texts must match number of style vectors"
        bsz = len(text_list)
        text_ids, text_mask = self.text_processor(text_list)
        dur_onnx, *_ = self.dp_ort.run(
            None, {"text_ids": text_ids, "style_dp": style.dp, "text_mask": text_mask}
        )
        # A larger ``speed`` shortens the predicted duration.
        dur_onnx = dur_onnx / speed
        text_emb_onnx, *_ = self.text_enc_ort.run(
            None,
            {"text_ids": text_ids, "style_ttl": style.ttl, "text_mask": text_mask},
        )  # dur_onnx: [bsz]
        xt, latent_mask = self.sample_noisy_latent(dur_onnx)
        total_step_np = np.array([total_step] * bsz, dtype=np.float32)
        # Feed the latent back through the vector estimator ``total_step`` times.
        for step in range(total_step):
            current_step = np.array([step] * bsz, dtype=np.float32)
            xt, *_ = self.vector_est_ort.run(
                None,
                {
                    "noisy_latent": xt,
                    "text_emb": text_emb_onnx,
                    "style_ttl": style.ttl,
                    "text_mask": text_mask,
                    "latent_mask": latent_mask,
                    "current_step": current_step,
                    "total_step": total_step_np,
                },
            )
        wav, *_ = self.vocoder_ort.run(None, {"latent": xt})
        return wav, dur_onnx

    def __call__(
        self,
        text: str,
        style: Style,
        total_step: int,
        speed: float = 1.05,
        silence_duration: float = 0.3,
        max_len: int = 300,
    ) -> tuple[np.ndarray, np.ndarray]:
        """Synthesize ``text`` chunk by chunk and join the audio with silence.

        Returns:
            ``(wav_cat, dur_cat)``; both are ``None`` when ``chunk_text``
            yields no chunks.
        """
        assert (
            style.ttl.shape[0] == 1
        ), "Single speaker text to speech only supports single style"
        wav_cat = None
        dur_cat = None
        for chunk in chunk_text(text, max_len=max_len):
            wav, dur_onnx = self._infer([chunk], style, total_step, speed)
            if wav_cat is None:
                wav_cat = wav
                dur_cat = dur_onnx
            else:
                silence = np.zeros(
                    (1, int(silence_duration * self.sample_rate)), dtype=np.float32
                )
                wav_cat = np.concatenate([wav_cat, silence, wav], axis=1)
                # Rebind instead of ``+=`` so the first chunk's array is not mutated.
                dur_cat = dur_cat + (dur_onnx + silence_duration)
        return wav_cat, dur_cat

    def stream(
        self,
        text: str,
        style: Style,
        total_step: int,
        speed: float = 1.05,
        silence_duration: float = 0.3,
        max_len: int = 300,
    ):
        """Yield flattened audio per chunk, with silence between chunks."""
        assert (
            style.ttl.shape[0] == 1
        ), "Single speaker text to speech only supports single style"
        text_list = chunk_text(text, max_len=max_len)
        last_index = len(text_list) - 1

        for i, chunk in enumerate(text_list):
            wav, _ = self._infer([chunk], style, total_step, speed)
            yield wav.flatten()

            # No trailing silence after the final chunk.
            if i < last_index:
                yield np.zeros(
                    (int(silence_duration * self.sample_rate),), dtype=np.float32
                )

    def batch(
        self, text_list: list[str], style: Style, total_step: int, speed: float = 1.05
    ) -> tuple[np.ndarray, np.ndarray]:
        """Synthesize several texts in one batch (one style row per text)."""
        return self._infer(text_list, style, total_step, speed)
211
-
212
-
213
def length_to_mask(lengths: np.ndarray, max_len: Optional[int] = None) -> np.ndarray:
    """
    Convert lengths to binary mask.

    Args:
        lengths: (B,)
        max_len: width of the mask; defaults to ``lengths.max()`` when ``None``

    Returns:
        mask: (B, 1, max_len) float32, 1.0 where position < length, else 0.0
    """
    # Compare against None so that an explicit ``max_len=0`` is honoured.
    if max_len is None:
        max_len = lengths.max()
    max_len = int(max_len)
    ids = np.arange(0, max_len)
    mask = (ids < np.expand_dims(lengths, axis=1)).astype(np.float32)
    # Insert the singleton axis by indexing; reshape(-1, ...) fails on zero-width masks.
    return mask[:, np.newaxis, :]
228
-
229
-
230
def get_latent_mask(
    wav_lengths: np.ndarray, base_chunk_size: int, chunk_compress_factor: int
) -> np.ndarray:
    """Return the mask over latent frames for waveforms of ``wav_lengths`` samples.

    One latent frame covers ``base_chunk_size * chunk_compress_factor`` samples;
    lengths are rounded up to whole frames before being turned into a mask.
    """
    latent_size = base_chunk_size * chunk_compress_factor
    # Ceiling division: a partially filled frame still counts.
    latent_lengths = (wav_lengths + latent_size - 1) // latent_size
    return length_to_mask(latent_lengths)
237
-
238
-
239
def load_onnx(
    onnx_path: str, opts: ort.SessionOptions, providers: list[str]
) -> ort.InferenceSession:
    """Create an ONNX Runtime inference session for the model at ``onnx_path``."""
    return ort.InferenceSession(onnx_path, sess_options=opts, providers=providers)
243
-
244
-
245
def load_onnx_all(
    onnx_dir: str, opts: ort.SessionOptions, providers: list[str]
) -> tuple[
    ort.InferenceSession,
    ort.InferenceSession,
    ort.InferenceSession,
    ort.InferenceSession,
]:
    """Load the four model files found in ``onnx_dir``.

    Returns:
        Sessions in the order (duration predictor, text encoder,
        vector estimator, vocoder).
    """
    dp_ort = load_onnx(os.path.join(onnx_dir, "duration_predictor.onnx"), opts, providers)
    text_enc_ort = load_onnx(os.path.join(onnx_dir, "text_encoder.onnx"), opts, providers)
    vector_est_ort = load_onnx(os.path.join(onnx_dir, "vector_estimator.onnx"), opts, providers)
    vocoder_ort = load_onnx(os.path.join(onnx_dir, "vocoder.onnx"), opts, providers)
    return dp_ort, text_enc_ort, vector_est_ort, vocoder_ort
263
-
264
-
265
def load_cfgs(onnx_dir: str) -> dict:
    """Read and return the parsed ``tts.json`` configuration from ``onnx_dir``."""
    cfg_path = os.path.join(onnx_dir, "tts.json")
    # JSON is UTF-8 by specification; do not rely on the platform default encoding.
    with open(cfg_path, "r", encoding="utf-8") as f:
        return json.load(f)
270
-
271
-
272
def load_text_processor(onnx_dir: str) -> UnicodeProcessor:
    """Build a ``UnicodeProcessor`` from ``unicode_indexer.json`` in ``onnx_dir``."""
    return UnicodeProcessor(os.path.join(onnx_dir, "unicode_indexer.json"))
276
-
277
-
278
def load_text_to_speech(onnx_dir: str, use_gpu: bool = False) -> TextToSpeech:
    """Assemble a CPU-backed ``TextToSpeech`` from the assets in ``onnx_dir``.

    Raises:
        NotImplementedError: if ``use_gpu`` is true.
    """
    # Reject the unsupported mode before allocating any runtime objects.
    if use_gpu:
        raise NotImplementedError("GPU mode is not fully tested")

    opts = ort.SessionOptions()
    providers = ["CPUExecutionProvider"]
    print("Using CPU for inference")
    cfgs = load_cfgs(onnx_dir)
    dp_ort, text_enc_ort, vector_est_ort, vocoder_ort = load_onnx_all(
        onnx_dir, opts, providers
    )
    text_processor = load_text_processor(onnx_dir)
    return TextToSpeech(
        cfgs, text_processor, dp_ort, text_enc_ort, vector_est_ort, vocoder_ort
    )
293
-
294
-
295
def load_voice_style(voice_style_paths: list[str], verbose: bool = False) -> Style:
    """Load one or more voice-style JSON files into a batched ``Style``.

    Each file is read for ``style_ttl`` and ``style_dp`` entries carrying
    ``dims`` and ``data``. The per-item shape is taken from the first file and
    every file's data is reshaped to it.
    """
    bsz = len(voice_style_paths)

    # Read first file to get dimensions
    with open(voice_style_paths[0], "r", encoding="utf-8") as f:
        first_style = json.load(f)
    ttl_dims = first_style["style_ttl"]["dims"]
    dp_dims = first_style["style_dp"]["dims"]

    # Pre-allocate arrays with full batch size
    ttl_style = np.zeros([bsz, ttl_dims[1], ttl_dims[2]], dtype=np.float32)
    dp_style = np.zeros([bsz, dp_dims[1], dp_dims[2]], dtype=np.float32)

    # Fill in the data
    for i, voice_style_path in enumerate(voice_style_paths):
        with open(voice_style_path, "r", encoding="utf-8") as f:
            voice_style = json.load(f)

        ttl_data = np.array(
            voice_style["style_ttl"]["data"], dtype=np.float32
        ).flatten()
        ttl_style[i] = ttl_data.reshape(ttl_dims[1], ttl_dims[2])

        dp_data = np.array(
            voice_style["style_dp"]["data"], dtype=np.float32
        ).flatten()
        dp_style[i] = dp_data.reshape(dp_dims[1], dp_dims[2])

    if verbose:
        print(f"Loaded {bsz} voice styles")
    return Style(ttl_style, dp_style)
326
-
327
-
328
@contextmanager
def timer(name: str):
    """Context manager that prints ``name`` on entry and the elapsed time on exit.

    The completion line is printed only when the managed body finishes without
    raising.
    """
    start = time.time()
    print(f"{name}...")
    yield
    print(f" -> {name} completed in {time.time() - start:.2f} sec")
334
-
335
-
336
def sanitize_filename(text: str, max_len: int) -> str:
    """Sanitize filename by replacing non-alphanumeric characters with underscores.

    Only the first ``max_len`` characters of ``text`` are kept; every character
    outside ``[a-zA-Z0-9]`` becomes ``_``.
    """
    prefix = text[:max_len]
    return re.sub(r"[^a-zA-Z0-9]", "_", prefix)
340
-
341
-
342
def chunk_text(text: str, max_len: int = 300) -> list[str]:
    """
    Split text into chunks by paragraphs and sentences.

    Sentences of a paragraph are packed greedily into chunks of at most
    ``max_len`` characters. Chunks never span paragraphs, and a single sentence
    longer than ``max_len`` is emitted unsplit as its own chunk.

    Args:
        text: Input text to chunk
        max_len: Maximum length of each chunk (default: 300)

    Returns:
        List of text chunks
    """
    # Split by sentence boundaries (period, question mark, exclamation mark followed by space)
    # But exclude common abbreviations like Mr., Mrs., Dr., etc. and single capital letters like F.
    # Compiled once here instead of being rebuilt for every paragraph.
    sentence_boundary = re.compile(
        r"(?<!Mr\.)(?<!Mrs\.)(?<!Ms\.)(?<!Dr\.)(?<!Prof\.)(?<!Sr\.)(?<!Jr\.)(?<!Ph\.D\.)(?<!etc\.)(?<!e\.g\.)(?<!i\.e\.)(?<!vs\.)(?<!Inc\.)(?<!Ltd\.)(?<!Co\.)(?<!Corp\.)(?<!St\.)(?<!Ave\.)(?<!Blvd\.)(?<!\b[A-Z]\.)(?<=[.!?])\s+"
    )

    # Split by paragraph (two or more newlines); blank paragraphs are dropped here.
    paragraphs = [p.strip() for p in re.split(r"\n\s*\n+", text.strip()) if p.strip()]

    chunks = []
    for paragraph in paragraphs:
        current_chunk = ""
        for sentence in sentence_boundary.split(paragraph):
            # +1 accounts for the joining space.
            if len(current_chunk) + len(sentence) + 1 <= max_len:
                current_chunk += (" " if current_chunk else "") + sentence
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = sentence

        if current_chunk:
            chunks.append(current_chunk.strip())

    return chunks
382
-
383
-
384
- # --- Main Tool Logic ---
385
-
386
- # --- Kokoro State ---
387
  _KOKORO_STATE = {
388
  "initialized": False,
389
  "device": "cpu",
@@ -391,27 +30,15 @@ _KOKORO_STATE = {
391
  "pipelines": {},
392
  }
393
 
394
- # --- Supertonic State ---
395
# Lazily populated by _init_supertonic(); "tts" holds the loaded engine and
# "assets_dir" the directory the model files were loaded from.
_SUPERTONIC_STATE = {
    "initialized": False,
    "tts": None,
    "assets_dir": None,
}
400
-
401
def _audio_np_to_int16(audio_np: np.ndarray) -> np.ndarray:
    """Convert float audio to int16 PCM, clipping to [-1.0, 1.0] first.

    Clipping keeps out-of-range samples from wrapping around in the int16 cast.
    """
    audio_clipped = np.clip(audio_np, -1.0, 1.0)
    return (audio_clipped * 32767.0).astype(np.int16)
404
-
405
- # --- Kokoro Functions ---
406
 
407
def get_kokoro_voices() -> list[str]:
    """Return the sorted Kokoro voice names listed in the hexgrad/Kokoro-82M repo.

    Falls back to ``_get_fallback_voices()`` when ``list_repo_files`` is
    unavailable, when the listing raises, or when no voice files are found.
    """
    try:
        if not list_repo_files:
            return _get_fallback_voices()
        prefix, suffix = "voices/", ".pt"
        # Slice the prefix/suffix off instead of str.replace, which would also
        # remove the same text from the middle of a name.
        voices = [
            file[len(prefix):-len(suffix)]
            for file in list_repo_files("hexgrad/Kokoro-82M")
            if file.endswith(suffix) and file.startswith(prefix)
        ]
        return sorted(voices) if voices else _get_fallback_voices()
    except Exception:
        # Best effort: any failure while listing falls back to the static list.
        return _get_fallback_voices()
417
 
@@ -453,63 +80,14 @@ def _init_kokoro() -> None:
453
  pass
454
  _KOKORO_STATE.update({"initialized": True, "device": device, "model": model, "pipelines": pipelines})
455
 
456
- # --- Supertonic Functions ---
457
-
458
def _init_supertonic() -> None:
    """Download (if needed) and load the Supertonic engine into ``_SUPERTONIC_STATE``.

    Does nothing when the state is already marked initialized.

    Raises:
        RuntimeError: if ``snapshot_download`` is unavailable.
    """
    if _SUPERTONIC_STATE["initialized"]:
        return

    if snapshot_download is None:
        raise RuntimeError("huggingface_hub is not installed.")

    # Use a local assets directory within Nymbo-Tools
    # Assuming this file is in Nymbo-Tools/Modules
    base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    assets_dir = os.path.join(base_dir, "assets", "supertonic")

    # Only the directory's existence is checked before deciding to download.
    if not os.path.exists(assets_dir):
        print(f"Downloading Supertonic models to {assets_dir}...")
        snapshot_download(repo_id="Supertone/supertonic", local_dir=assets_dir)

    onnx_dir = os.path.join(assets_dir, "onnx")
    tts = load_text_to_speech(onnx_dir, use_gpu=False)

    _SUPERTONIC_STATE.update({"initialized": True, "tts": tts, "assets_dir": assets_dir})
478
-
479
-
480
def get_supertonic_voices() -> list[str]:
    """List Supertonic voice names from the ``voice_styles`` asset directory.

    Returns the known defaults ``["F1", "F2", "M1", "M2"]`` when the assets or
    the ``voice_styles`` directory are not present on disk.
    """
    default_voices = ["F1", "F2", "M1", "M2"]  # Known defaults

    # We need assets to list voices. If not initialized, look for them on disk
    # without triggering a full init.
    if _SUPERTONIC_STATE["initialized"]:
        assets_dir = _SUPERTONIC_STATE["assets_dir"]
    else:
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        assets_dir = os.path.join(base_dir, "assets", "supertonic")
        if not os.path.exists(assets_dir):
            return default_voices

    voice_styles_dir = os.path.join(assets_dir, "voice_styles")
    if not os.path.exists(voice_styles_dir):
        return default_voices

    suffix = ".json"
    # Strip only the trailing extension; str.replace would also remove ".json"
    # appearing inside a name.
    voices = [f[:-len(suffix)] for f in os.listdir(voice_styles_dir) if f.endswith(suffix)]
    return sorted(voices)
499
-
500
 
501
def List_Kokoro_Voices() -> list[str]:
    """Public wrapper returning the result of ``get_kokoro_voices()``."""
    return get_kokoro_voices()
503
 
504
def List_Supertonic_Voices() -> list[str]:
    """Public wrapper returning the result of ``get_supertonic_voices()``."""
    return get_supertonic_voices()
506
-
507
 
508
  # Single source of truth for the LLM-facing tool description
509
TOOL_SUMMARY = (
    "Synthesize speech from text using Supertonic-66M (default) or Kokoro-82M. "
    "Supertonic: faster, supports steps/silence/chunking. "
    "Kokoro: slower, supports many languages/accents. "
    "Return the generated media to the user in this format `![Alt text](URL)`."
)
515
 
@@ -519,159 +97,104 @@ TOOL_SUMMARY = (
519
  )
520
def Generate_Speech(
    text: Annotated[str, "The text to synthesize (English)."],
    model: Annotated[str, "The TTS model to use: 'Supertonic' or 'Kokoro'."] = "Supertonic",
    speed: Annotated[float, "Speech speed multiplier in 0.5–2.0; 1.0 = normal speed."] = 1.3,
    steps: Annotated[int, "Supertonic only. Diffusion steps (1-50). Higher = better quality but slower."] = 5,
    voice: Annotated[str, "Voice identifier. Default 'F1' for Supertonic, 'af_heart' for Kokoro."] = "F1",
    silence_duration: Annotated[float, "Supertonic only. Silence duration between chunks (0.0-2.0s)."] = 0.3,
    max_chunk_size: Annotated[int, "Supertonic only. Max text chunk length (50-1000)."] = 300,
) -> str:
    # Entry point for the speech tool: validates the text, picks the backend
    # from ``model`` and returns the path of the WAV file the backend wrote.
    # Every failure is reported to the caller as gr.Error.
    # (Kept as comments rather than a docstring because the visible decorator
    # may derive the docstring itself -- TODO confirm.)
    _log_call_start("Generate_Speech", text=_truncate_for_log(text, 200), model=model, speed=speed, voice=voice)

    if not text or not text.strip():
        _log_call_end("Generate_Speech", "error=empty text")
        raise gr.Error("Please provide non-empty text to synthesize.")

    model_lower = model.lower()

    # Handle default voice switching if user didn't specify appropriate voice for model
    if model_lower == "kokoro" and voice == "F1":
        voice = "af_heart"
    elif model_lower == "supertonic" and voice == "af_heart":
        voice = "F1"

    try:
        if model_lower == "kokoro":
            return _generate_kokoro(text, speed, voice)
        # Anything other than "kokoro" defaults to Supertonic.
        return _generate_supertonic(text, speed, voice, steps, silence_duration, max_chunk_size)
    except gr.Error as exc:
        _log_call_end("Generate_Speech", f"gr_error={str(exc)}")
        raise
    except Exception as exc:  # pylint: disable=broad-except
        _log_call_end("Generate_Speech", f"error={str(exc)[:120]}")
        raise gr.Error(f"Error during speech generation: {exc}") from exc
559
-
560
-
561
def _generate_kokoro(text: str, speed: float, voice: str) -> str:
    """Synthesize ``text`` with Kokoro, write a 24 kHz WAV and return its path.

    Raises:
        gr.Error: if the English pipeline is missing, a segment fails, or no
            audio is produced.
    """
    _init_kokoro()
    model = _KOKORO_STATE["model"]
    pipeline = _KOKORO_STATE["pipelines"].get("a")
    if pipeline is None:
        raise gr.Error("Kokoro English pipeline not initialized.")

    audio_segments = []
    pack = pipeline.load_voice(voice)

    segments = list(pipeline(text, voice, speed))
    total_segments = len(segments)
    for segment_idx, (_, ps, _) in enumerate(segments):
        # The voice pack is indexed by the segment's phoneme count.
        ref_s = pack[len(ps) - 1]
        try:
            audio = model(ps, ref_s, float(speed))
            audio_segments.append(audio.detach().cpu().numpy())
            if total_segments > 10 and (segment_idx + 1) % 5 == 0:
                print(f"Progress: Generated {segment_idx + 1}/{total_segments} segments...")
        except Exception as exc:
            raise gr.Error(f"Error generating audio for segment {segment_idx + 1}: {exc}") from exc

    if not audio_segments:
        raise gr.Error("No audio was generated (empty synthesis result).")

    if len(audio_segments) == 1:
        final_audio = audio_segments[0]
    else:
        final_audio = np.concatenate(audio_segments, axis=0)
        if total_segments > 1:
            duration = len(final_audio) / 24_000
            print(f"Completed: {total_segments} segments concatenated into {duration:.1f} seconds of audio")

    # Save to file
    filename = f"speech_kokoro_{uuid.uuid4().hex[:8]}.wav"
    output_path = os.path.join(ROOT_DIR, filename)

    # Normalize to 16-bit PCM; clip first so out-of-range samples saturate
    # instead of wrapping around in the int16 cast.
    audio_int16 = (np.clip(final_audio, -1.0, 1.0) * 32767).astype(np.int16)
    scipy.io.wavfile.write(output_path, 24000, audio_int16)

    _log_call_end("Generate_Speech", f"saved_to={os.path.basename(output_path)} duration_sec={len(final_audio)/24_000:.2f}")
    return output_path
605
-
606
-
607
def _generate_supertonic(text: str, speed: float, voice: str, steps: int, silence_duration: float, max_chunk_size: int) -> str:
    """Synthesize ``text`` with Supertonic, write a WAV file and return its path.

    Raises:
        gr.Error: if the voice style file does not exist or no audio is produced.
    """
    _init_supertonic()
    tts = _SUPERTONIC_STATE["tts"]
    assets_dir = _SUPERTONIC_STATE["assets_dir"]

    # ``voice`` comes from the caller: accept bare names only, so it cannot
    # point outside the voice_styles directory.
    if not voice or os.path.basename(voice) != voice:
        raise gr.Error(f"Voice style {voice} not found for Supertonic.")
    voice_path = os.path.join(assets_dir, "voice_styles", f"{voice}.json")
    if not os.path.exists(voice_path):
        raise gr.Error(f"Voice style {voice} not found for Supertonic.")

    style = load_voice_style([voice_path])
    sr = tts.sample_rate

    # Calling the engine directly returns the full concatenated audio
    # as (wav_cat, dur_cat); only the waveform is needed here.
    wav_cat, _ = tts(text, style, steps, speed, silence_duration, max_chunk_size)

    if wav_cat is None or wav_cat.size == 0:
        raise gr.Error("No audio generated.")

    # wav_cat is (1, samples) float32
    final_audio = wav_cat.flatten()

    # Save to file
    filename = f"speech_supertonic_{uuid.uuid4().hex[:8]}.wav"
    output_path = os.path.join(ROOT_DIR, filename)

    audio_int16 = _audio_np_to_int16(final_audio)
    scipy.io.wavfile.write(output_path, sr, audio_int16)

    _log_call_end("Generate_Speech", f"saved_to={os.path.basename(output_path)} duration_sec={len(final_audio)/sr:.2f}")
    return output_path
644
 
645
 
646
def build_interface() -> gr.Interface:
    """Build the Gradio interface exposing ``Generate_Speech`` for both models."""
    # One shared dropdown: the union of both models' voice names, de-duplicated.
    all_voices = sorted(set(get_kokoro_voices() + get_supertonic_voices()))

    return gr.Interface(
        fn=Generate_Speech,
        inputs=[
            gr.Textbox(label="Text", placeholder="Type text to synthesize…", lines=4, info="The text to synthesize (English)"),
            gr.Dropdown(label="Model", choices=["Supertonic", "Kokoro"], value="Supertonic", info="The TTS model to use"),
            gr.Slider(minimum=0.5, maximum=2.0, value=1.3, step=0.1, label="Speed", info="Speech speed multiplier (1.0 = normal)"),
            gr.Slider(minimum=1, maximum=50, value=5, step=1, label="Steps", info="Supertonic only: Diffusion steps (1-50)"),
            gr.Dropdown(
                label="Voice",
                choices=all_voices,
                value="F1",
                info="Select voice (F1/F2/M1/M2 for Supertonic, others for Kokoro)",
            ),
            gr.Slider(minimum=0.0, maximum=2.0, value=0.3, step=0.1, label="Silence Duration", info="Supertonic only: Silence duration between chunks"),
            gr.Slider(minimum=50, maximum=1000, value=300, step=10, label="Max Chunk Size", info="Supertonic only: Max text chunk length"),
        ],
        outputs=gr.Audio(label="Audio", type="filepath", format="wav"),
        title="Generate Speech",
        description=(
            "<div style=\"text-align:center\">Generate speech with Supertonic-66M or Kokoro-82M. Runs on CPU.</div>"
        ),
        api_description=TOOL_SUMMARY,
        flagging_mode="never",
    )
675
 
676
 
677
- __all__ = ["Generate_Speech", "List_Kokoro_Voices", "List_Supertonic_Voices", "build_interface"]
 
1
  from __future__ import annotations
2
 
3
+ import numpy as np
4
+ import gradio as gr
5
  import os
 
 
 
 
 
6
  import uuid
 
7
  import wave
8
+ from .File_System import ROOT_DIR
9
 
10
+ from typing import Annotated
 
 
 
11
 
 
12
  from app import _log_call_end, _log_call_start, _truncate_for_log
13
  from ._docstrings import autodoc
14
 
 
23
  KModel = None # type: ignore
24
  KPipeline = None # type: ignore
25
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
26
  _KOKORO_STATE = {
27
  "initialized": False,
28
  "device": "cpu",
 
30
  "pipelines": {},
31
  }
32
 
 
 
 
 
 
 
 
 
 
 
 
 
33
 
34
def get_kokoro_voices() -> list[str]:
    """Return the sorted Kokoro voice names listed in the hexgrad/Kokoro-82M repo.

    Falls back to ``_get_fallback_voices()`` when ``huggingface_hub`` cannot be
    imported, when the listing raises, or when no voice files are found.
    """
    try:
        # Imported lazily so a missing huggingface_hub only disables live listing.
        from huggingface_hub import list_repo_files

        prefix, suffix = "voices/", ".pt"
        # Slice the prefix/suffix off instead of str.replace, which would also
        # remove the same text from the middle of a name.
        voices = [
            file[len(prefix):-len(suffix)]
            for file in list_repo_files("hexgrad/Kokoro-82M")
            if file.endswith(suffix) and file.startswith(prefix)
        ]
        return sorted(voices) if voices else _get_fallback_voices()
    except Exception:
        # Best effort: any failure while listing falls back to the static list.
        return _get_fallback_voices()
44
 
 
80
  pass
81
  _KOKORO_STATE.update({"initialized": True, "device": device, "model": model, "pipelines": pipelines})
82
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
83
 
84
def List_Kokoro_Voices() -> list[str]:
    """Public wrapper returning the result of ``get_kokoro_voices()``."""
    return get_kokoro_voices()
86
 
 
 
 
87
 
88
  # Single source of truth for the LLM-facing tool description
89
# Generate_Speech writes a WAV file and returns its path, so the summary must
# not advertise a (sample_rate, waveform) return value.
TOOL_SUMMARY = (
    "Synthesize speech from text using Kokoro-82M; choose voice and speed; saves a WAV file and returns its path. "
    "Return the generated media to the user in this format `![Alt text](URL)`."
)
93
 
 
97
  )
98
def Generate_Speech(
    text: Annotated[str, "The text to synthesize (English)."],
    speed: Annotated[float, "Speech speed multiplier in 0.5–2.0; 1.0 = normal speed."] = 1.25,
    voice: Annotated[
        str,
        (
            "Voice identifier from 54 available options. "
            "Voice Legend: af=American female, am=American male, bf=British female, bm=British male, ef=European female, "
            "em=European male, hf=Hindi female, hm=Hindi male, if=Italian female, im=Italian male, jf=Japanese female, "
            "jm=Japanese male, pf=Portuguese female, pm=Portuguese male, zf=Chinese female, zm=Chinese male, ff=French female. "
            "All Voices: af_alloy, af_aoede, af_bella, af_heart, af_jessica, af_kore, af_nicole, af_nova, af_river, af_sarah, af_sky, "
            "am_adam, am_echo, am_eric, am_fenrir, am_liam, am_michael, am_onyx, am_puck, am_santa, bf_alice, bf_emma, bf_isabella, "
            "bf_lily, bm_daniel, bm_fable, bm_george, bm_lewis, ef_dora, em_alex, em_santa, ff_siwis, hf_alpha, hf_beta, hm_omega, hm_psi, "
            "if_sara, im_nicola, jf_alpha, jf_gongitsune, jf_nezumi, jf_tebukuro, jm_kumo, pf_dora, pm_alex, pm_santa, zf_xiaobei, "
            "zf_xiaoni, zf_xiaoxiao, zf_xiaoyi, zm_yunjian, zm_yunxi, zm_yunxia, zm_yunyang."
        ),
    ] = "af_heart",
) -> str:
    # Synthesizes ``text`` with the Kokoro English pipeline, writes a mono
    # 16-bit 24 kHz WAV file under ROOT_DIR and returns that file's path.
    # Every failure is reported to the caller as gr.Error.
    # (Kept as comments rather than a docstring because the visible decorator
    # may derive the docstring itself -- TODO confirm.)
    _log_call_start("Generate_Speech", text=_truncate_for_log(text, 200), speed=speed, voice=voice)
    if not text or not text.strip():
        _log_call_end("Generate_Speech", "error=empty text")
        raise gr.Error("Please provide non-empty text to synthesize.")

    try:
        # Model initialisation and voice loading run inside the try so their
        # failures are logged and wrapped like every other synthesis error.
        _init_kokoro()
        model = _KOKORO_STATE["model"]
        pipeline = _KOKORO_STATE["pipelines"].get("a")
        if pipeline is None:
            raise gr.Error("Kokoro English pipeline not initialized.")
        pack = pipeline.load_voice(voice)

        segments = list(pipeline(text, voice, speed))
        total_segments = len(segments)
        audio_segments = []
        for segment_idx, (_, ps, _) in enumerate(segments):
            # The voice pack is indexed by the segment's phoneme count.
            ref_s = pack[len(ps) - 1]
            try:
                audio = model(ps, ref_s, float(speed))
                audio_segments.append(audio.detach().cpu().numpy())
                if total_segments > 10 and (segment_idx + 1) % 5 == 0:
                    print(f"Progress: Generated {segment_idx + 1}/{total_segments} segments...")
            except Exception as exc:
                raise gr.Error(f"Error generating audio for segment {segment_idx + 1}: {exc}") from exc
        if not audio_segments:
            raise gr.Error("No audio was generated (empty synthesis result).")
        if len(audio_segments) == 1:
            final_audio = audio_segments[0]
        else:
            final_audio = np.concatenate(audio_segments, axis=0)
            if total_segments > 1:
                duration = len(final_audio) / 24_000
                print(f"Completed: {total_segments} segments concatenated into {duration:.1f} seconds of audio")

        # Save to file
        filename = f"speech_{uuid.uuid4().hex[:8]}.wav"
        output_path = os.path.join(ROOT_DIR, filename)

        # Normalize to 16-bit PCM. Clip to [-1, 1] first so out-of-range
        # samples saturate instead of wrapping around in the int16 cast.
        audio_int16 = (np.clip(final_audio, -1.0, 1.0) * 32767).astype(np.int16)
        with wave.open(output_path, 'wb') as wf:
            wf.setnchannels(1)
            wf.setsampwidth(2)  # 16-bit = 2 bytes
            wf.setframerate(24000)
            wf.writeframes(audio_int16.tobytes())

        _log_call_end("Generate_Speech", f"saved_to={os.path.basename(output_path)} duration_sec={len(final_audio)/24_000:.2f}")
        return output_path
    except gr.Error as exc:
        _log_call_end("Generate_Speech", f"gr_error={str(exc)}")
        raise
    except Exception as exc:  # pylint: disable=broad-except
        _log_call_end("Generate_Speech", f"error={str(exc)[:120]}")
        raise gr.Error(f"Error during speech generation: {exc}") from exc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
174
 
175
 
176
def build_interface() -> gr.Interface:
    """Build the Gradio interface exposing ``Generate_Speech`` (Kokoro only)."""
    available_voices = get_kokoro_voices()
    # Keep the dropdown default valid even if the fetched list lacks "af_heart".
    if "af_heart" in available_voices:
        default_voice = "af_heart"
    else:
        default_voice = available_voices[0] if available_voices else None

    return gr.Interface(
        fn=Generate_Speech,
        inputs=[
            gr.Textbox(label="Text", placeholder="Type text to synthesize…", lines=4),
            gr.Slider(minimum=0.5, maximum=2.0, value=1.25, step=0.1, label="Speed"),
            gr.Dropdown(
                label="Voice",
                choices=available_voices,
                value=default_voice,
                # Report the real number of choices instead of a fixed count.
                info=f"Select from {len(available_voices)} available voices across multiple languages and accents",
            ),
        ],
        outputs=gr.Audio(label="Audio", type="filepath", format="wav"),
        title="Generate Speech",
        description=(
            "<div style=\"text-align:center\">Generate speech with Kokoro-82M. Supports multiple languages and accents. Runs on CPU or CUDA if available.</div>"
        ),
        api_description=TOOL_SUMMARY,
        flagging_mode="never",
    )
198
 
199
 
200
+ __all__ = ["Generate_Speech", "List_Kokoro_Voices", "build_interface"]