Nymbo committed on
Commit 379c8b8 · verified · 1 Parent(s): 3a5757f

adding Supertonic-66M TTS model to `Generate_Speech` tool

Files changed (1)
  1. Modules/Generate_Speech.py +621 -88
Modules/Generate_Speech.py CHANGED
@@ -1,14 +1,22 @@
 from __future__ import annotations
 
-import numpy as np
-import gradio as gr
+import json
 import os
+import time
+from contextlib import contextmanager
+from typing import Optional, Annotated
+from unicodedata import normalize
+import re
 import uuid
-import scipy.io.wavfile
-from .File_System import ROOT_DIR
+import io
+import wave
 
-from typing import Annotated
+import numpy as np
+import onnxruntime as ort
+import scipy.io.wavfile
+import gradio as gr
 
+from .File_System import ROOT_DIR
 from app import _log_call_end, _log_call_start, _truncate_for_log
 from ._docstrings import autodoc
 
@@ -23,6 +31,359 @@ except Exception: # pragma: no cover
     KModel = None  # type: ignore
     KPipeline = None  # type: ignore
 
+try:
+    from huggingface_hub import snapshot_download, list_repo_files
+except ImportError:
+    snapshot_download = None
+    list_repo_files = None
+
+
+# --- Supertonic Helper Classes & Functions ---
+
+class UnicodeProcessor:
+    def __init__(self, unicode_indexer_path: str):
+        with open(unicode_indexer_path, "r") as f:
+            self.indexer = json.load(f)
+
+    def _preprocess_text(self, text: str) -> str:
+        # TODO: add more preprocessing
+        text = normalize("NFKD", text)
+        return text
+
+    def _get_text_mask(self, text_ids_lengths: np.ndarray) -> np.ndarray:
+        text_mask = length_to_mask(text_ids_lengths)
+        return text_mask
+
+    def _text_to_unicode_values(self, text: str) -> np.ndarray:
+        unicode_values = np.array(
+            [ord(char) for char in text], dtype=np.uint16
+        )  # 2 bytes
+        return unicode_values
+
+    def __call__(self, text_list: list[str]) -> tuple[np.ndarray, np.ndarray]:
+        text_list = [self._preprocess_text(t) for t in text_list]
+        text_ids_lengths = np.array([len(text) for text in text_list], dtype=np.int64)
+        text_ids = np.zeros((len(text_list), text_ids_lengths.max()), dtype=np.int64)
+        for i, text in enumerate(text_list):
+            unicode_vals = self._text_to_unicode_values(text)
+            text_ids[i, : len(unicode_vals)] = np.array(
+                [self.indexer[val] for val in unicode_vals], dtype=np.int64
+            )
+        text_mask = self._get_text_mask(text_ids_lengths)
+        return text_ids, text_mask
+
+
+class Style:
+    def __init__(self, style_ttl_onnx: np.ndarray, style_dp_onnx: np.ndarray):
+        self.ttl = style_ttl_onnx
+        self.dp = style_dp_onnx
+
+
+class TextToSpeech:
+    def __init__(
+        self,
+        cfgs: dict,
+        text_processor: UnicodeProcessor,
+        dp_ort: ort.InferenceSession,
+        text_enc_ort: ort.InferenceSession,
+        vector_est_ort: ort.InferenceSession,
+        vocoder_ort: ort.InferenceSession,
+    ):
+        self.cfgs = cfgs
+        self.text_processor = text_processor
+        self.dp_ort = dp_ort
+        self.text_enc_ort = text_enc_ort
+        self.vector_est_ort = vector_est_ort
+        self.vocoder_ort = vocoder_ort
+        self.sample_rate = cfgs["ae"]["sample_rate"]
+        self.base_chunk_size = cfgs["ae"]["base_chunk_size"]
+        self.chunk_compress_factor = cfgs["ttl"]["chunk_compress_factor"]
+        self.ldim = cfgs["ttl"]["latent_dim"]
+
+    def sample_noisy_latent(
+        self, duration: np.ndarray
+    ) -> tuple[np.ndarray, np.ndarray]:
+        bsz = len(duration)
+        wav_len_max = duration.max() * self.sample_rate
+        wav_lengths = (duration * self.sample_rate).astype(np.int64)
+        chunk_size = self.base_chunk_size * self.chunk_compress_factor
+        latent_len = ((wav_len_max + chunk_size - 1) / chunk_size).astype(np.int32)
+        latent_dim = self.ldim * self.chunk_compress_factor
+        noisy_latent = np.random.randn(bsz, latent_dim, latent_len).astype(np.float32)
+        latent_mask = get_latent_mask(
+            wav_lengths, self.base_chunk_size, self.chunk_compress_factor
+        )
+
+        noisy_latent = noisy_latent * latent_mask
+        return noisy_latent, latent_mask
+
+    def _infer(
+        self, text_list: list[str], style: Style, total_step: int, speed: float = 1.05
+    ) -> tuple[np.ndarray, np.ndarray]:
+        assert (
+            len(text_list) == style.ttl.shape[0]
+        ), "Number of texts must match number of style vectors"
+        bsz = len(text_list)
+        text_ids, text_mask = self.text_processor(text_list)
+        dur_onnx, *_ = self.dp_ort.run(
+            None, {"text_ids": text_ids, "style_dp": style.dp, "text_mask": text_mask}
+        )
+        dur_onnx = dur_onnx / speed
+        text_emb_onnx, *_ = self.text_enc_ort.run(
+            None,
+            {"text_ids": text_ids, "style_ttl": style.ttl, "text_mask": text_mask},
+        )  # dur_onnx: [bsz]
+        xt, latent_mask = self.sample_noisy_latent(dur_onnx)
+        total_step_np = np.array([total_step] * bsz, dtype=np.float32)
+        for step in range(total_step):
+            current_step = np.array([step] * bsz, dtype=np.float32)
+            xt, *_ = self.vector_est_ort.run(
+                None,
+                {
+                    "noisy_latent": xt,
+                    "text_emb": text_emb_onnx,
+                    "style_ttl": style.ttl,
+                    "text_mask": text_mask,
+                    "latent_mask": latent_mask,
+                    "current_step": current_step,
+                    "total_step": total_step_np,
+                },
+            )
+        wav, *_ = self.vocoder_ort.run(None, {"latent": xt})
+        return wav, dur_onnx
+
+    def __call__(
+        self,
+        text: str,
+        style: Style,
+        total_step: int,
+        speed: float = 1.05,
+        silence_duration: float = 0.3,
+        max_len: int = 300,
+    ) -> tuple[np.ndarray, np.ndarray]:
+        assert (
+            style.ttl.shape[0] == 1
+        ), "Single speaker text to speech only supports single style"
+        text_list = chunk_text(text, max_len=max_len)
+        wav_cat = None
+        dur_cat = None
+        for text in text_list:
+            wav, dur_onnx = self._infer([text], style, total_step, speed)
+            if wav_cat is None:
+                wav_cat = wav
+                dur_cat = dur_onnx
+            else:
+                silence = np.zeros(
+                    (1, int(silence_duration * self.sample_rate)), dtype=np.float32
+                )
+                wav_cat = np.concatenate([wav_cat, silence, wav], axis=1)
+                dur_cat += dur_onnx + silence_duration
+        return wav_cat, dur_cat
+
+    def stream(
+        self,
+        text: str,
+        style: Style,
+        total_step: int,
+        speed: float = 1.05,
+        silence_duration: float = 0.3,
+        max_len: int = 300,
+    ):
+        assert (
+            style.ttl.shape[0] == 1
+        ), "Single speaker text to speech only supports single style"
+        text_list = chunk_text(text, max_len=max_len)
+
+        for i, text in enumerate(text_list):
+            wav, _ = self._infer([text], style, total_step, speed)
+            yield wav.flatten()
+
+            if i < len(text_list) - 1:
+                silence = np.zeros(
+                    (int(silence_duration * self.sample_rate),), dtype=np.float32
+                )
+                yield silence
+
+    def batch(
+        self, text_list: list[str], style: Style, total_step: int, speed: float = 1.05
+    ) -> tuple[np.ndarray, np.ndarray]:
+        return self._infer(text_list, style, total_step, speed)
+
+
+def length_to_mask(lengths: np.ndarray, max_len: Optional[int] = None) -> np.ndarray:
+    """
+    Convert lengths to binary mask.
+
+    Args:
+        lengths: (B,)
+        max_len: int
+
+    Returns:
+        mask: (B, 1, max_len)
+    """
+    max_len = max_len or lengths.max()
+    ids = np.arange(0, max_len)
+    mask = (ids < np.expand_dims(lengths, axis=1)).astype(np.float32)
+    return mask.reshape(-1, 1, max_len)
+
+
+def get_latent_mask(
+    wav_lengths: np.ndarray, base_chunk_size: int, chunk_compress_factor: int
+) -> np.ndarray:
+    latent_size = base_chunk_size * chunk_compress_factor
+    latent_lengths = (wav_lengths + latent_size - 1) // latent_size
+    latent_mask = length_to_mask(latent_lengths)
+    return latent_mask
+
+
+def load_onnx(
+    onnx_path: str, opts: ort.SessionOptions, providers: list[str]
+) -> ort.InferenceSession:
+    return ort.InferenceSession(onnx_path, sess_options=opts, providers=providers)
+
+
+def load_onnx_all(
+    onnx_dir: str, opts: ort.SessionOptions, providers: list[str]
+) -> tuple[
+    ort.InferenceSession,
+    ort.InferenceSession,
+    ort.InferenceSession,
+    ort.InferenceSession,
+]:
+    dp_onnx_path = os.path.join(onnx_dir, "duration_predictor.onnx")
+    text_enc_onnx_path = os.path.join(onnx_dir, "text_encoder.onnx")
+    vector_est_onnx_path = os.path.join(onnx_dir, "vector_estimator.onnx")
+    vocoder_onnx_path = os.path.join(onnx_dir, "vocoder.onnx")
+
+    dp_ort = load_onnx(dp_onnx_path, opts, providers)
+    text_enc_ort = load_onnx(text_enc_onnx_path, opts, providers)
+    vector_est_ort = load_onnx(vector_est_onnx_path, opts, providers)
+    vocoder_ort = load_onnx(vocoder_onnx_path, opts, providers)
+    return dp_ort, text_enc_ort, vector_est_ort, vocoder_ort
+
+
+def load_cfgs(onnx_dir: str) -> dict:
+    cfg_path = os.path.join(onnx_dir, "tts.json")
+    with open(cfg_path, "r") as f:
+        cfgs = json.load(f)
+    return cfgs
+
+
+def load_text_processor(onnx_dir: str) -> UnicodeProcessor:
+    unicode_indexer_path = os.path.join(onnx_dir, "unicode_indexer.json")
+    text_processor = UnicodeProcessor(unicode_indexer_path)
+    return text_processor
+
+
+def load_text_to_speech(onnx_dir: str, use_gpu: bool = False) -> TextToSpeech:
+    opts = ort.SessionOptions()
+    if use_gpu:
+        raise NotImplementedError("GPU mode is not fully tested")
+    else:
+        providers = ["CPUExecutionProvider"]
+        print("Using CPU for inference")
+    cfgs = load_cfgs(onnx_dir)
+    dp_ort, text_enc_ort, vector_est_ort, vocoder_ort = load_onnx_all(
+        onnx_dir, opts, providers
+    )
+    text_processor = load_text_processor(onnx_dir)
+    return TextToSpeech(
+        cfgs, text_processor, dp_ort, text_enc_ort, vector_est_ort, vocoder_ort
+    )
+
+
+def load_voice_style(voice_style_paths: list[str], verbose: bool = False) -> Style:
+    bsz = len(voice_style_paths)
+
+    # Read first file to get dimensions
+    with open(voice_style_paths[0], "r") as f:
+        first_style = json.load(f)
+    ttl_dims = first_style["style_ttl"]["dims"]
+    dp_dims = first_style["style_dp"]["dims"]
+
+    # Pre-allocate arrays with full batch size
+    ttl_style = np.zeros([bsz, ttl_dims[1], ttl_dims[2]], dtype=np.float32)
+    dp_style = np.zeros([bsz, dp_dims[1], dp_dims[2]], dtype=np.float32)
+
+    # Fill in the data
+    for i, voice_style_path in enumerate(voice_style_paths):
+        with open(voice_style_path, "r") as f:
+            voice_style = json.load(f)
+
+        ttl_data = np.array(
+            voice_style["style_ttl"]["data"], dtype=np.float32
+        ).flatten()
+        ttl_style[i] = ttl_data.reshape(ttl_dims[1], ttl_dims[2])
+
+        dp_data = np.array(
+            voice_style["style_dp"]["data"], dtype=np.float32
+        ).flatten()
+        dp_style[i] = dp_data.reshape(dp_dims[1], dp_dims[2])
+
+    if verbose:
+        print(f"Loaded {bsz} voice styles")
+    return Style(ttl_style, dp_style)
+
+
+@contextmanager
+def timer(name: str):
+    start = time.time()
+    print(f"{name}...")
+    yield
+    print(f" -> {name} completed in {time.time() - start:.2f} sec")
+
+
+def sanitize_filename(text: str, max_len: int) -> str:
+    """Sanitize filename by replacing non-alphanumeric characters with underscores"""
+    prefix = text[:max_len]
+    return re.sub(r"[^a-zA-Z0-9]", "_", prefix)
+
+
+def chunk_text(text: str, max_len: int = 300) -> list[str]:
+    """
+    Split text into chunks by paragraphs and sentences.
+
+    Args:
+        text: Input text to chunk
+        max_len: Maximum length of each chunk (default: 300)
+
+    Returns:
+        List of text chunks
+    """
+    # Split by paragraph (two or more newlines)
+    paragraphs = [p.strip() for p in re.split(r"\n\s*\n+", text.strip()) if p.strip()]
+
+    chunks = []
+
+    for paragraph in paragraphs:
+        paragraph = paragraph.strip()
+        if not paragraph:
+            continue
+
+        # Split by sentence boundaries (period, question mark, exclamation mark followed by space)
+        # But exclude common abbreviations like Mr., Mrs., Dr., etc. and single capital letters like F.
+        pattern = r"(?<!Mr\.)(?<!Mrs\.)(?<!Ms\.)(?<!Dr\.)(?<!Prof\.)(?<!Sr\.)(?<!Jr\.)(?<!Ph\.D\.)(?<!etc\.)(?<!e\.g\.)(?<!i\.e\.)(?<!vs\.)(?<!Inc\.)(?<!Ltd\.)(?<!Co\.)(?<!Corp\.)(?<!St\.)(?<!Ave\.)(?<!Blvd\.)(?<!\b[A-Z]\.)(?<=[.!?])\s+"
+        sentences = re.split(pattern, paragraph)
+
+        current_chunk = ""
+
+        for sentence in sentences:
+            if len(current_chunk) + len(sentence) + 1 <= max_len:
+                current_chunk += (" " if current_chunk else "") + sentence
+            else:
+                if current_chunk:
+                    chunks.append(current_chunk.strip())
+                current_chunk = sentence
+
+        if current_chunk:
+            chunks.append(current_chunk.strip())
+
+    return chunks
+
+
+# --- Main Tool Logic ---
+
+# --- Kokoro State ---
 _KOKORO_STATE = {
     "initialized": False,
     "device": "cpu",
@@ -30,15 +391,27 @@ _KOKORO_STATE = {
     "pipelines": {},
 }
 
+# --- Supertonic State ---
+_SUPERTONIC_STATE = {
+    "initialized": False,
+    "tts": None,
+    "assets_dir": None,
+}
+
+def _audio_np_to_int16(audio_np: np.ndarray) -> np.ndarray:
+    audio_clipped = np.clip(audio_np, -1.0, 1.0)
+    return (audio_clipped * 32767.0).astype(np.int16)
+
+# --- Kokoro Functions ---
 
 def get_kokoro_voices() -> list[str]:
     try:
-        from huggingface_hub import list_repo_files
-
-        files = list_repo_files("hexgrad/Kokoro-82M")
-        voice_files = [file for file in files if file.endswith(".pt") and file.startswith("voices/")]
-        voices = [file.replace("voices/", "").replace(".pt", "") for file in voice_files]
-        return sorted(voices) if voices else _get_fallback_voices()
+        if list_repo_files:
+            files = list_repo_files("hexgrad/Kokoro-82M")
+            voice_files = [file for file in files if file.endswith(".pt") and file.startswith("voices/")]
+            voices = [file.replace("voices/", "").replace(".pt", "") for file in voice_files]
+            return sorted(voices) if voices else _get_fallback_voices()
+        return _get_fallback_voices()
     except Exception:
         return _get_fallback_voices()
 
@@ -80,14 +453,63 @@ def _init_kokoro() -> None:
         pass
     _KOKORO_STATE.update({"initialized": True, "device": device, "model": model, "pipelines": pipelines})
 
+# --- Supertonic Functions ---
+
+def _init_supertonic() -> None:
+    if _SUPERTONIC_STATE["initialized"]:
+        return
+
+    if snapshot_download is None:
+        raise RuntimeError("huggingface_hub is not installed.")
+
+    # Use a local assets directory within Nymbo-Tools
+    # Assuming this file is in Nymbo-Tools/Modules
+    base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+    assets_dir = os.path.join(base_dir, "assets", "supertonic")
+
+    if not os.path.exists(assets_dir):
+        print(f"Downloading Supertonic models to {assets_dir}...")
+        snapshot_download(repo_id="Supertone/supertonic", local_dir=assets_dir)
+
+    onnx_dir = os.path.join(assets_dir, "onnx")
+    tts = load_text_to_speech(onnx_dir, use_gpu=False)
+
+    _SUPERTONIC_STATE.update({"initialized": True, "tts": tts, "assets_dir": assets_dir})
+
+
+def get_supertonic_voices() -> list[str]:
+    # We need assets to list voices. If not initialized, try to find them or init.
+    if not _SUPERTONIC_STATE["initialized"]:
+        # Check if assets exist without full init
+        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+        assets_dir = os.path.join(base_dir, "assets", "supertonic")
+        if not os.path.exists(assets_dir):
+            # If we can't list, return a default list or empty
+            return ["F1", "F2", "M1", "M2"]  # Known defaults
+    else:
+        assets_dir = _SUPERTONIC_STATE["assets_dir"]
+
+    voice_styles_dir = os.path.join(assets_dir, "voice_styles")
+    if not os.path.exists(voice_styles_dir):
+        return ["F1", "F2", "M1", "M2"]
+
+    files = os.listdir(voice_styles_dir)
+    voices = [f.replace('.json', '') for f in files if f.endswith('.json')]
+    return sorted(voices)
+
 
 def List_Kokoro_Voices() -> list[str]:
     return get_kokoro_voices()
 
+def List_Supertonic_Voices() -> list[str]:
+    return get_supertonic_voices()
+
 
 # Single source of truth for the LLM-facing tool description
 TOOL_SUMMARY = (
-    "Synthesize speech from text using Kokoro-82M; choose voice and speed; returns (sample_rate, waveform). "
+    "Synthesize speech from text using Supertonic (default) or Kokoro-82M. "
+    "Supertonic: high quality, slower, supports steps/silence/chunking. Default voice 'F1'. "
+    "Kokoro: faster, supports many languages/accents. Default voice 'af_heart'. "
     "Return the generated media to the user in this format `![Alt text](URL)`."
 )
 
@@ -97,100 +519,211 @@ TOOL_SUMMARY = (
 )
 def Generate_Speech(
     text: Annotated[str, "The text to synthesize (English)."],
-    speed: Annotated[float, "Speech speed multiplier in 0.5–2.0; 1.0 = normal speed."] = 1.25,
-    voice: Annotated[
-        str,
-        (
-            "Voice identifier from 54 available options. "
-            "Voice Legend: af=American female, am=American male, bf=British female, bm=British male, ef=European female, "
-            "em=European male, hf=Hindi female, hm=Hindi male, if=Italian female, im=Italian male, jf=Japanese female, "
-            "jm=Japanese male, pf=Portuguese female, pm=Portuguese male, zf=Chinese female, zm=Chinese male, ff=French female. "
-            "All Voices: af_alloy, af_aoede, af_bella, af_heart, af_jessica, af_kore, af_nicole, af_nova, af_river, af_sarah, af_sky, "
-            "am_adam, am_echo, am_eric, am_fenrir, am_liam, am_michael, am_onyx, am_puck, am_santa, bf_alice, bf_emma, bf_isabella, "
-            "bf_lily, bm_daniel, bm_fable, bm_george, bm_lewis, ef_dora, em_alex, em_santa, ff_siwis, hf_alpha, hf_beta, hm_omega, hm_psi, "
-            "if_sara, im_nicola, jf_alpha, jf_gongitsune, jf_nezumi, jf_tebukuro, jm_kumo, pf_dora, pm_alex, pm_santa, zf_xiaobei, "
-            "zf_xiaoni, zf_xiaoxiao, zf_xiaoyi, zm_yunjian, zm_yunxi, zm_yunxia, zm_yunyang."
-        ),
-    ] = "af_heart",
+    model: Annotated[str, "The TTS model to use: 'Supertonic' or 'Kokoro'."] = "Supertonic",
+    speed: Annotated[float, "Speech speed multiplier in 0.5–2.0; 1.0 = normal speed."] = 1.3,
+    voice: Annotated[str, "Voice identifier. Default 'F1' for Supertonic, 'af_heart' for Kokoro."] = "F1",
+    steps: Annotated[int, "Diffusion steps for Supertonic (1-50). Higher = better quality but slower. Ignored for Kokoro."] = 5,
+    silence_duration: Annotated[float, "Silence duration between chunks for Supertonic (0.0-2.0s). Ignored for Kokoro."] = 0.3,
+    max_chunk_size: Annotated[int, "Max text chunk length for Supertonic (50-1000). Ignored for Kokoro."] = 300,
 ) -> str:
-    _log_call_start("Generate_Speech", text=_truncate_for_log(text, 200), speed=speed, voice=voice)
+    _log_call_start("Generate_Speech", text=_truncate_for_log(text, 200), model=model, speed=speed, voice=voice)
+
     if not text or not text.strip():
         try:
             _log_call_end("Generate_Speech", "error=empty text")
         finally:
             pass
        raise gr.Error("Please provide non-empty text to synthesize.")
+
+    model_lower = model.lower()
+
+    # Handle default voice switching if user didn't specify appropriate voice for model
+    if model_lower == "kokoro" and voice == "F1":
+        voice = "af_heart"
+    elif model_lower == "supertonic" and voice == "af_heart":
+        voice = "F1"
+
+    try:
+        if model_lower == "kokoro":
+            return _generate_kokoro(text, speed, voice)
+        else:
+            # Default to Supertonic
+            return _generate_supertonic(text, speed, voice, steps, silence_duration, max_chunk_size)
+
+    except gr.Error as exc:
+        _log_call_end("Generate_Speech", f"gr_error={str(exc)}")
+        raise
+    except Exception as exc:  # pylint: disable=broad-except
+        _log_call_end("Generate_Speech", f"error={str(exc)[:120]}")
+        raise gr.Error(f"Error during speech generation: {exc}")
+
+
+def _generate_kokoro(text: str, speed: float, voice: str) -> str:
     _init_kokoro()
     model = _KOKORO_STATE["model"]
     pipelines = _KOKORO_STATE["pipelines"]
     pipeline = pipelines.get("a")
     if pipeline is None:
         raise gr.Error("Kokoro English pipeline not initialized.")
+
     audio_segments = []
     pack = pipeline.load_voice(voice)
-    try:
-        segments = list(pipeline(text, voice, speed))
-        total_segments = len(segments)
-        for segment_idx, (text_chunk, ps, _) in enumerate(segments):
-            ref_s = pack[len(ps) - 1]
-            try:
-                audio = model(ps, ref_s, float(speed))
-                audio_segments.append(audio.detach().cpu().numpy())
-                if total_segments > 10 and (segment_idx + 1) % 5 == 0:
-                    print(f"Progress: Generated {segment_idx + 1}/{total_segments} segments...")
-            except Exception as exc:
-                raise gr.Error(f"Error generating audio for segment {segment_idx + 1}: {exc}")
-        if not audio_segments:
-            raise gr.Error("No audio was generated (empty synthesis result).")
-        if len(audio_segments) == 1:
-            final_audio = audio_segments[0]
-        else:
-            final_audio = np.concatenate(audio_segments, axis=0)
-            if total_segments > 1:
-                duration = len(final_audio) / 24_000
-                print(f"Completed: {total_segments} segments concatenated into {duration:.1f} seconds of audio")
-
-        # Save to file
-        filename = f"speech_{uuid.uuid4().hex[:8]}.wav"
-        output_path = os.path.join(ROOT_DIR, filename)
-
-        # Normalize to 16-bit PCM
-        # final_audio is float32, likely in [-1, 1]. Scale to int16 range.
-        audio_int16 = (final_audio * 32767).astype(np.int16)
-        scipy.io.wavfile.write(output_path, 24000, audio_int16)
-
-        _log_call_end("Generate_Speech", f"saved_to={os.path.basename(output_path)} duration_sec={len(final_audio)/24_000:.2f}")
-        return output_path
-    except gr.Error as exc:
-        _log_call_end("Generate_Speech", f"gr_error={str(exc)}")
-        raise
-    except Exception as exc:  # pylint: disable=broad-except
-        _log_call_end("Generate_Speech", f"error={str(exc)[:120]}")
-        raise gr.Error(f"Error during speech generation: {exc}")
+
+    segments = list(pipeline(text, voice, speed))
+    total_segments = len(segments)
+    for segment_idx, (text_chunk, ps, _) in enumerate(segments):
+        ref_s = pack[len(ps) - 1]
+        try:
+            audio = model(ps, ref_s, float(speed))
+            audio_segments.append(audio.detach().cpu().numpy())
+            if total_segments > 10 and (segment_idx + 1) % 5 == 0:
+                print(f"Progress: Generated {segment_idx + 1}/{total_segments} segments...")
+        except Exception as exc:
+            raise gr.Error(f"Error generating audio for segment {segment_idx + 1}: {exc}")
+
+    if not audio_segments:
+        raise gr.Error("No audio was generated (empty synthesis result).")
+
+    if len(audio_segments) == 1:
+        final_audio = audio_segments[0]
+    else:
+        final_audio = np.concatenate(audio_segments, axis=0)
+        if total_segments > 1:
+            duration = len(final_audio) / 24_000
+            print(f"Completed: {total_segments} segments concatenated into {duration:.1f} seconds of audio")
+
+    # Save to file
+    filename = f"speech_kokoro_{uuid.uuid4().hex[:8]}.wav"
+    output_path = os.path.join(ROOT_DIR, filename)
+
+    # Normalize to 16-bit PCM
+    audio_int16 = (final_audio * 32767).astype(np.int16)
+    scipy.io.wavfile.write(output_path, 24000, audio_int16)
+
+    _log_call_end("Generate_Speech", f"saved_to={os.path.basename(output_path)} duration_sec={len(final_audio)/24_000:.2f}")
+    return output_path
+
+
+def _generate_supertonic(text: str, speed: float, voice: str, steps: int, silence_duration: float, max_chunk_size: int) -> str:
+    _init_supertonic()
+    tts = _SUPERTONIC_STATE["tts"]
+    assets_dir = _SUPERTONIC_STATE["assets_dir"]
+
+    voice_path = os.path.join(assets_dir, "voice_styles", f"{voice}.json")
+    if not os.path.exists(voice_path):
+        # Fallback or error?
+        # Try to find if it's just a name mismatch or use default
+        if not os.path.exists(voice_path):
+            raise gr.Error(f"Voice style {voice} not found for Supertonic.")
+
+    style = load_voice_style([voice_path])
+
+    sr = tts.sample_rate
+
+    # Supertonic returns a generator of chunks, or we can use __call__ for full audio
+    # Using __call__ to get full audio for saving
+    # But __call__ returns (wav_cat, dur_cat)
+
+    wav_cat, _ = tts(text, style, steps, speed, silence_duration, max_chunk_size)
+
+    if wav_cat is None or wav_cat.size == 0:
+        raise gr.Error("No audio generated.")
+
+    # wav_cat is (1, samples) float32
+    final_audio = wav_cat.flatten()
+
+    # Save to file
+    filename = f"speech_supertonic_{uuid.uuid4().hex[:8]}.wav"
+    output_path = os.path.join(ROOT_DIR, filename)
+
+    audio_int16 = _audio_np_to_int16(final_audio)
+    scipy.io.wavfile.write(output_path, sr, audio_int16)
+
+    _log_call_end("Generate_Speech", f"saved_to={os.path.basename(output_path)} duration_sec={len(final_audio)/sr:.2f}")
+    return output_path
 
 
-def build_interface() -> gr.Interface:
-    available_voices = get_kokoro_voices()
-    return gr.Interface(
-        fn=Generate_Speech,
-        inputs=[
-            gr.Textbox(label="Text", placeholder="Type text to synthesize…", lines=4),
-            gr.Slider(minimum=0.5, maximum=2.0, value=1.25, step=0.1, label="Speed"),
-            gr.Dropdown(
-                label="Voice",
-                choices=available_voices,
-                value="af_heart",
-                info="Select from 54 available voices across multiple languages and accents",
-            ),
-        ],
-        outputs=gr.Audio(label="Audio", type="numpy", format="wav", buttons=["download"]),
-        title="Generate Speech",
-        description=(
-            "<div style=\"text-align:center\">Generate speech with Kokoro-82M. Supports multiple languages and accents. Runs on CPU or CUDA if available.</div>"
-        ),
-        api_description=TOOL_SUMMARY,
-        flagging_mode="never",
-    )
+def build_interface() -> gr.Blocks:
+    kokoro_voices = get_kokoro_voices()
+    supertonic_voices = get_supertonic_voices()
+
+    with gr.Blocks(title="Generate Speech") as demo:
+        gr.Markdown("<div style=\"text-align:center\">Generate speech with Supertonic (default) or Kokoro-82M.</div>")
 
+        with gr.Row():
+            with gr.Column():
+                text_input = gr.Textbox(label="Text", placeholder="Type text to synthesize…", lines=4)
+                model_dropdown = gr.Dropdown(label="Model", choices=["Supertonic", "Kokoro"], value="Supertonic")
+
+                # Voice dropdown needs to update based on model
+                voice_dropdown = gr.Dropdown(
+                    label="Voice",
+                    choices=supertonic_voices,
+                    value="F1",
+                    info="Select voice"
+                )
+
+                speed_slider = gr.Slider(minimum=0.5, maximum=2.0, value=1.3, step=0.1, label="Speed")
+
+                # Supertonic specific
+                with gr.Group() as supertonic_params:
+                    steps_slider = gr.Slider(minimum=1, maximum=50, value=5, step=1, label="Steps (Supertonic only)")
+                    silence_slider = gr.Slider(minimum=0.0, maximum=2.0, value=0.3, step=0.1, label="Silence Duration (Supertonic only)")
+                    chunk_slider = gr.Slider(minimum=50, maximum=1000, value=300, step=10, label="Max Chunk Size (Supertonic only)")
+
+                with gr.Row():
+                    clear_btn = gr.Button("Clear")
+                    gen_btn = gr.Button("Generate", variant="primary")
+
+            with gr.Column():
+                audio_output = gr.Audio(label="Audio", type="filepath", format="wav")
+
+        def update_voices(model_name):
+            if model_name == "Kokoro":
+                return {
+                    voice_dropdown: gr.Dropdown(choices=kokoro_voices, value="af_heart"),
+                    supertonic_params: gr.Group(visible=False)
+                }
+            else:
+                return {
+                    voice_dropdown: gr.Dropdown(choices=supertonic_voices, value="F1"),
+                    supertonic_params: gr.Group(visible=True)
+                }
+
+        def clear_inputs():
+            return [
+                "",  # text_input
+                "Supertonic",  # model_dropdown
+                "F1",  # voice_dropdown
+                1.3,  # speed_slider
+                5,  # steps_slider
+                0.3,  # silence_slider
+                300,  # chunk_slider
+                None  # audio_output
+            ]
+
+        clear_btn.click(
+            fn=clear_inputs,
+            inputs=[],
+            outputs=[text_input, model_dropdown, voice_dropdown, speed_slider, steps_slider, silence_slider, chunk_slider, audio_output]
+        )
+
+        model_dropdown.change(
+            fn=update_voices,
+            inputs=[model_dropdown],
+            outputs=[voice_dropdown, supertonic_params]
+        )
+
+        gen_btn.click(
+            fn=Generate_Speech,
+            inputs=[text_input, model_dropdown, speed_slider, voice_dropdown, steps_slider, silence_slider, chunk_slider],
+            outputs=[audio_output]
+        )
+
+        # Expose the function for API
+        demo.fn = Generate_Speech
+
+    return demo
 
 
-__all__ = ["Generate_Speech", "List_Kokoro_Voices", "build_interface"]
+__all__ = ["Generate_Speech", "List_Kokoro_Voices", "List_Supertonic_Voices", "build_interface"]
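
A minimal usage sketch of the tool as it stands after this commit (illustrative only, not part of the diff; it assumes the repo root is importable and that the first Supertonic call triggers the one-time snapshot_download of Supertone/supertonic shown in _init_supertonic):

# Minimal sketch, assuming `app` and the Modules package resolve as in this repo.
from Modules.Generate_Speech import Generate_Speech, List_Supertonic_Voices

# Before assets are downloaded this returns the hard-coded defaults.
print(List_Supertonic_Voices())  # e.g. ['F1', 'F2', 'M1', 'M2']

# Default path: Supertonic, voice 'F1', 5 diffusion steps, 1.3x speed.
wav_path = Generate_Speech("Hello from the new default model.")

# Kokoro remains available behind the same tool signature.
wav_path = Generate_Speech("Hello from Kokoro.", model="Kokoro", voice="af_heart", speed=1.0)

Both branches return a path to a 16-bit PCM WAV written under ROOT_DIR, so existing callers that treated the old return value as a file path keep working.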