"""
Gradio Space for BlueTTS — multilingual ONNX TTS (slim 4-model pipeline).
Upstream: https://github.com/maxmelichov/BlueTTS
"""
import os
import re
import json
import time
import base64
import glob
import html
import subprocess
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple, Dict, Union
from unicodedata import normalize as uni_normalize
import numpy as np
from num2words import num2words
import gradio as gr
import onnxruntime as ort
from download_models import BLUE_REPO, download_blue_models, download_default_voices, download_renikud
# ------------------------------------------------------------------
# Paths
# ------------------------------------------------------------------
ONNX_DIR = "onnx_slim"
VOICES_DIR = "voices"
RENIKUD_PATH = "renikud.onnx"
CONFIG_PATH = "tts.json" if os.path.exists("tts.json") else os.path.join(ONNX_DIR, "tts.json")
VOCAB_PATH = next(
(p for p in (os.path.join(ONNX_DIR, "vocab.json"), "vocab.json",
os.path.join(os.path.dirname(os.path.abspath(__file__)), "vocab.json"))
if os.path.exists(p)),
os.path.join(ONNX_DIR, "vocab.json"),
)
# ------------------------------------------------------------------
# Fetch models + default voices on first run
# ------------------------------------------------------------------
def _needs_download() -> bool:
required = ["text_encoder.onnx", "vector_estimator.onnx", "vocoder.onnx",
"duration_predictor.onnx"]
repo_marker = os.path.join(ONNX_DIR, ".repo_id")
if not os.path.exists(repo_marker):
return True
with open(repo_marker) as f:
if f.read().strip() != BLUE_REPO:
return True
for fn in required:
p = os.path.join(ONNX_DIR, fn)
if not os.path.exists(p) or os.path.getsize(p) < 1000:
return True
return False
if _needs_download():
print("[INFO] Slim ONNX bundle incomplete, downloading…")
download_blue_models(ONNX_DIR)
download_default_voices(VOICES_DIR)
download_renikud(RENIKUD_PATH)
# ============================================================
# Vocab — phoneme → id map shared across the old and new checkpoints.
# A vocab.json next to the slim ONNX files wins; otherwise we fall back to
# this built-in IPA map (the upstream Piper-style vocab plus extras).
# ============================================================
_PIPER_MAP: dict[str, int] = {
"_": 0, "^": 1, "$": 2, " ": 3, "!": 4, "'": 5, "(": 6, ")": 7, ",": 8, "-": 9, ".": 10,
":": 11, ";": 12, "?": 13, "a": 14, "b": 15, "c": 16, "d": 17, "e": 18, "f": 19,
"h": 20, "i": 21, "j": 22, "k": 23, "l": 24, "m": 25, "n": 26, "o": 27, "p": 28, "q": 29, "r": 30, "s": 31, "t": 32, "u": 33,
"v": 34, "w": 35, "x": 36, "y": 37, "z": 38, "æ": 39, "ç": 40, "ð": 41, "ø": 42, "ħ": 43, "ŋ": 44, "œ": 45,
"ǀ": 46, "ǁ": 47, "ǂ": 48, "ǃ": 49, "ɐ": 50, "ɑ": 51, "ɒ": 52, "ɓ": 53, "ɔ": 54, "ɕ": 55,
"ɖ": 56, "ɗ": 57, "ɘ": 58, "ə": 59, "ɚ": 60, "ɛ": 61, "ɜ": 62, "ɞ": 63, "ɟ": 64, "ɠ": 65, "ɡ": 66, "ɢ": 67,
"ɣ": 68, "ɤ": 69, "ɥ": 70, "ɦ": 71, "ɧ": 72, "ɨ": 73, "ɪ": 74, "ɫ": 75, "ɬ": 76, "ɭ": 77, "ɮ": 78, "ɯ": 79,
"ɰ": 80, "ɱ": 81, "ɲ": 82, "ɳ": 83, "ɴ": 84, "ɵ": 85, "ɶ": 86, "ɸ": 87, "ɹ": 88, "ɺ": 89, "ɻ": 90, "ɽ": 91,
"ɾ": 92, "ʀ": 93, "ʁ": 94, "ʂ": 95, "ʃ": 96, "ʄ": 97, "ʈ": 98, "ʉ": 99, "ʊ": 100, "ʋ": 101, "ʌ": 102, "ʍ": 103,
"ʎ": 104, "ʏ": 105, "ʐ": 106, "ʑ": 107, "ʒ": 108, "ʔ": 109, "ʕ": 110, "ʘ": 111, "ʙ": 112, "ʛ": 113, "ʜ": 114, "ʝ": 115,
"ʟ": 116, "ʡ": 117, "ʢ": 118, "ʲ": 119, "ˈ": 120, "ˌ": 121, "ː": 122, "ˑ": 123, "˞": 124,
"β": 125, "θ": 126, "χ": 127, "ᵻ": 128, "ⱱ": 129, "0": 130, "1": 131, "2": 132, "3": 133, "4": 134,
"5": 135, "6": 136, "7": 137, "8": 138, "9": 139, "\u0327": 140, "\u0303": 141, "\u032A": 142, "\u032F": 143, "\u0329": 144,
"ʰ": 145, "ˤ": 146, "ε": 147, "↓": 148, "#": 149, '"': 150, "↑": 151, "\u033A": 152, "\u033B": 153, "g": 154, "ʦ": 155, "X": 156,
}
_EXTENDED_MAP: dict[str, int] = {
"A": 157, "B": 158, "C": 159, "D": 160, "E": 161, "F": 162, "G": 163, "H": 164, "I": 165, "J": 166, "K": 167, "L": 168, "M": 169, "N": 170,
"O": 171, "P": 172, "Q": 173, "R": 174, "S": 175, "T": 176, "U": 177, "V": 178, "W": 179, "Y": 180, "Z": 181,
"ʤ": 182, "ɝ": 183, "ʧ": 184, "ʼ": 185, "ʴ": 186, "ʱ": 187, "ʷ": 188, "ˠ": 189, "→": 190, "↗": 191, "↘": 192,
"¡": 193, "¿": 194, "…": 195, "«": 196, "»": 197, "*": 198, "~": 199, "/": 200, "\\": 201, "&": 202,
"\u0361": 203, "\u035C": 204, "\u0325": 205, "\u032C": 206, "\u0339": 207, "\u031C": 208, "\u031D": 209, "\u031E": 210, "\u031F": 211, "\u0320": 212, "\u0330": 213, "\u0334": 214, "\u031A": 215, "\u0318": 216, "\u0319": 217, "\u0348": 218, "\u0306": 219, "\u0308": 220, "\u031B": 221, "\u0324": 222, "\u033C": 223,
"\u02C0": 224, "\u02C1": 225, "\u02BE": 226, "\u02BF": 227, "\u02BB": 228, "\u02C9": 229, "\u02CA": 230, "\u02CB": 231, "\u02C6": 232,
"\u02E5": 233, "\u02E6": 234, "\u02E7": 235, "\u02E8": 236, "\u02E9": 237, "\u0300": 238, "\u0301": 239, "\u0302": 240, "\u0304": 241, "\u030C": 242, "\u0307": 243,
}
DEFAULT_CHAR_TO_ID: dict[str, int] = {**_PIPER_MAP, **_EXTENDED_MAP}
AVAILABLE_LANGS = ["en", "es", "de", "it", "he"]
BLUE_SYNTH_MAX_CHUNK_LEN = 200
# When pace blending is enabled, durations are nudged toward this many seconds
# per text token so speed feels more consistent on long or mixed-language text.
DURATION_PACE_DPT_REF = 0.0625
DEFAULT_MIXED_PACE_BLEND = 0.25
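# At the reference pace this is 1 / 0.0625 = 16 text tokens per second; the
# default blend keeps 75% of the predicted pace for mixed-language text.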
LANG_CODE_ALIASES: dict[str, str] = {"ge": "de", "en-us": "en"}
_ESPEAK_MAP = {
"en": "en-us", "en-us": "en-us", "de": "de", "ge": "de",
"it": "it", "es": "es",
}
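# Inline language spans like <en>…</en>; a repeated opening tag (<en>…<en>)
# is also accepted as a lenient close.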
_INLINE_LANG_PAIR = re.compile(r"<(en|en-us|he|es|de|ge|it)>(.*?)(?:</\1>|<\1>)", re.DOTALL | re.IGNORECASE)
_LANG_LIST_BLOCK_RE = re.compile(r"<lang_list\b[^>]*>.*?</lang_list>", re.DOTALL | re.IGNORECASE)
_LANG_TAG_RE = re.compile(r"</?[^>]+>")
_HEBREW_NIKUD_RE = re.compile(r"[\u0591-\u05BD\u05BF\u05C1-\u05C2\u05C4-\u05C5\u05C7]")
_HEBREW_CHAR_RE = re.compile(r"[\u0590-\u05ff]")
_EMAIL_RE = re.compile(r"[A-Za-z0-9._%+\-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
_LATIN_ALNUM_RE = re.compile(r"\d+[A-Za-z]+|[A-Za-z]+(?:[.'’\-][A-Za-z0-9]+)*")
_MIXED_EN_SEGMENT_RE = re.compile(
r"[A-Za-z0-9._%+\-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"
r"|\d+[A-Za-z]+"
r"|[A-Za-z]+(?:[.'’\-][A-Za-z0-9]+)*"
)
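# Union of _EMAIL_RE and _LATIN_ALNUM_RE, so one pass can lift emails,
# alphanumerics like "3D", and hyphenated words out of Hebrew sentences.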
_DATE_RE = re.compile(r"(?<!\d)([0-3]?\d)[/.]([01]?\d)[/.](\d{2}|\d{4})(?!\d)")
_HEBREW_MONTH_ORDINALS = {
1: "לראשון",
2: "לשני",
3: "לשלישי",
4: "לרביעי",
5: "לחמישי",
6: "לשישי",
7: "לשביעי",
8: "לשמיני",
9: "לתשיעי",
10: "לעשירי",
11: "לאחד עשר",
12: "לשנים עשר",
}
_PERCENT_WORDS = {
"he": "אחוז",
"en": "percent",
"es": "por ciento",
"de": "Prozent",
"it": "per cento",
}
_RATIO_WORDS = {
"he": "ל",
"en": "to",
"es": "a",
"de": "zu",
"it": "a",
}
def _strip_helper_markup(text: str) -> str:
"""Remove non-spoken helper markup that can leak into synthesis prompts."""
text = _LANG_LIST_BLOCK_RE.sub(" ", text)
text = re.sub(r"</?lang_list\b[^>]*>", " ", text, flags=re.IGNORECASE)
return text
def _strip_synthesis_tags(text: str) -> str:
"""Remove XML-like tags before tokenization so tag names are never spoken."""
text = _strip_helper_markup(text)
return _LANG_TAG_RE.sub(" ", text)
def strip_language_tags_for_display(text: str) -> str:
"""Remove internal language tags from phoneme text shown to users."""
return re.sub(r"\s+", " ", _LANG_TAG_RE.sub("", text)).strip()
def strip_hebrew_nikud(text: str) -> str:
"""Remove Hebrew niqqud/cantillation marks while preserving Hebrew letters."""
return _HEBREW_NIKUD_RE.sub("", text)
def _canonical_lang(lang: str) -> str:
return LANG_CODE_ALIASES.get(lang.lower(), lang.lower())
def _has_mixed_hebrew_latin(text: str, lang: str) -> bool:
lang = _canonical_lang(lang)
return lang == "he" and bool(_HEBREW_CHAR_RE.search(text) and _LATIN_ALNUM_RE.search(text))
def strip_hebrew_abbreviation_quotes(text: str, lang: str) -> str:
"""Remove Hebrew abbreviation marks inside words, e.g. מנכ"ל -> מנכל."""
if _canonical_lang(lang) != "he":
return text
return re.sub(r"(?<=[\u0590-\u05ff])[\"'״׳](?=[\u0590-\u05ff])", "", text)
def expand_hebrew_lamed_before_latin(text: str, lang: str) -> str:
"""Avoid one-letter Hebrew chunks in mixed text: CPU ל-GPU -> CPU אל GPU."""
if _canonical_lang(lang) != "he":
return text
return re.sub(r"(?<![\u0590-\u05ff])ל\s*[-–—‑]?\s*(?=[A-Za-z0-9])", "אל ", text)
def strip_silent_separator_tokens(text: str) -> str:
"""Drop punctuation tokens that should not be sent as spoken content."""
text = re.sub(r"(?<=[\u0590-\u05ff])[-–—‑]+(?=[A-Za-z0-9])", " ", text)
text = re.sub(r"(?<=[A-Za-z0-9])[-–—‑]+(?=[\u0590-\u05ff])", " ", text)
text = re.sub(r"(?<![A-Za-z])\s*[-–—‑]+\s*(?![A-Za-z])", " ", text)
text = re.sub(r"(?<!\d)\s*:+\s*(?!\d)", " ", text)
return re.sub(r"\s+", " ", text).strip()
def email_to_spoken_english(email: str) -> str:
"""Make email addresses pronounceable before English phonemization."""
local, _, domain = email.partition("@")
def spell_short_label(label: str) -> str:
return " ".join(label) if 0 < len(label) <= 2 and label.isalpha() else label
local = re.sub(r"[._]+", " dot ", local)
local = re.sub(r"[-]+", " dash ", local)
local = re.sub(r"[+]+", " plus ", local)
domain_parts = [spell_short_label(part) for part in domain.split(".") if part]
spoken = f"{local} at {' dot '.join(domain_parts)}"
return re.sub(r"\s+", " ", spoken).strip()
def blend_duration_pace(
dur: np.ndarray,
text_mask: np.ndarray,
pace_blend: float,
pace_dpt_ref: float = DURATION_PACE_DPT_REF,
) -> np.ndarray:
"""Blend predicted seconds-per-token toward a stable reference pace."""
blend = min(max(float(pace_blend), 0.0), 1.0)
if blend <= 0.0:
return np.asarray(dur, dtype=np.float32).reshape(-1)
d = np.asarray(dur, dtype=np.float64).reshape(-1)
token_count = np.maximum(
np.asarray(text_mask, dtype=np.float64).sum(axis=(1, 2)),
1.0,
).reshape(-1)
dpt = d / token_count
blended_dpt = (1.0 - blend) * dpt + blend * float(pace_dpt_ref)
return (blended_dpt * token_count).astype(np.float32)
# ============================================================
# Phonemization (Renikud for Hebrew, espeak-ng for Latin langs)
# ============================================================
class TextProcessor:
def __init__(self, renikud_path: Optional[str] = None):
self.renikud = None
if renikud_path is None and os.path.exists("model.onnx"):
renikud_path = "model.onnx"
if renikud_path and os.path.exists(renikud_path):
try:
from renikud_onnx import G2P
self.renikud = G2P(renikud_path)
print(f"[INFO] Loaded Renikud G2P from {renikud_path}")
except ImportError as e:
raise RuntimeError(
"Hebrew G2P needs `renikud-onnx`. Install: `uv sync`."
) from e
self._espeak_backends: Dict[str, Any] = {}
self._espeak_separator = None
self._espeak_ready = False
self._init_espeak()
def _init_espeak(self):
try:
import espeakng_loader
from phonemizer.backend.espeak.wrapper import EspeakWrapper
from phonemizer.separator import Separator
EspeakWrapper.set_library(espeakng_loader.get_library_path())
if hasattr(EspeakWrapper, "set_data_path"):
EspeakWrapper.set_data_path(espeakng_loader.get_data_path())
self._espeak_separator = Separator(phone="", word=" ", syllable="")
self._espeak_ready = True
except Exception as e:
print(f"[WARN] espeak-ng setup failed: {e}")
def _get_backend(self, espeak_lang: str):
if espeak_lang not in self._espeak_backends:
from phonemizer.backend import EspeakBackend
self._espeak_backends[espeak_lang] = EspeakBackend(
espeak_lang, preserve_punctuation=True,
with_stress=True, language_switch="remove-flags",
)
return self._espeak_backends[espeak_lang]
def _espeak(self, text: str, lang: str) -> str:
espeak_lang = _ESPEAK_MAP.get(lang)
if espeak_lang is None:
return text
if self._espeak_ready:
try:
raw = self._get_backend(espeak_lang).phonemize(
[text], separator=self._espeak_separator
)[0]
return re.sub(r"\s+", " ", raw).strip()
except Exception as e:
print(f"[WARN] phonemizer failed for {lang}: {e}")
try:
r = subprocess.run(
["espeak-ng", "-q", "--ipa=1", "-v", espeak_lang, text],
check=True, capture_output=True, text=True,
)
return re.sub(r"\s+", " ", r.stdout.replace("\n", " ")).strip()
except Exception as e:
print(f"[WARN] espeak-ng subprocess failed for {lang}: {e}")
return text
def _phonemize_segment(self, content: str, lang: str) -> str:
content = strip_hebrew_nikud(_strip_synthesis_tags(content)).strip()
if not content:
return ""
lang = LANG_CODE_ALIASES.get(lang, lang)
has_hebrew = any("\u0590" <= c <= "\u05ff" for c in content)
if has_hebrew or lang == "he":
if not has_hebrew:
return content
if self.renikud is None:
raise ValueError("Hebrew text requires Renikud weights (renikud.onnx).")
return strip_silent_separator_tokens(self.renikud.phonemize(content))
return strip_silent_separator_tokens(self._espeak(content, lang))
def _phonemize_tagged_segments(self, content: str, lang: str) -> list[tuple[str, str]]:
content = strip_hebrew_nikud(_strip_synthesis_tags(content)).strip()
if not content:
return []
lang = _canonical_lang(lang)
if not _has_mixed_hebrew_latin(content, lang):
seg = self._phonemize_segment(content, lang)
return [(lang, seg)] if seg else []
pieces: list[tuple[str, str]] = []
def add(piece: str, piece_lang: str) -> None:
if piece_lang == "en" and _EMAIL_RE.fullmatch(piece):
piece = email_to_spoken_english(piece)
seg = self._phonemize_segment(piece, piece_lang)
if seg:
pieces.append((_canonical_lang(piece_lang), seg))
last_end = 0
for m in _MIXED_EN_SEGMENT_RE.finditer(content):
if m.start() > last_end:
add(content[last_end:m.start()], lang)
add(m.group(0), "en")
last_end = m.end()
if last_end < len(content):
add(content[last_end:], lang)
return pieces
@staticmethod
def _wrap_segments(segments: list[tuple[str, str]]) -> str:
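        # e.g. [("he", "ʃalˈom"), ("en", "wˈɜːld")] ->
        # "<he>ʃalˈom</he> <en>wˈɜːld</en>"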
return " ".join(f"<{tag}>{seg}</{tag}>" for tag, seg in segments if seg)
def phonemize(self, text: str, lang: str = "he") -> str:
"""Phonemize, preserving inline ``<xx>…</xx>`` spans and re-wrapping
every segment so the text encoder sees ``<lang>…</lang>`` boundaries."""
text = _strip_helper_markup(text)
lang = _canonical_lang(lang)
if not _INLINE_LANG_PAIR.search(text):
return self._wrap_segments(self._phonemize_tagged_segments(text, lang))
pieces: list[tuple[str, str]] = []
last_end = 0
for m in _INLINE_LANG_PAIR.finditer(text):
if m.start() > last_end:
pieces.extend(self._phonemize_tagged_segments(text[last_end:m.start()], lang))
tag = _canonical_lang(m.group(1))
pieces.extend(self._phonemize_tagged_segments(m.group(2), tag))
last_end = m.end()
if last_end < len(text):
pieces.extend(self._phonemize_tagged_segments(text[last_end:], lang))
return re.sub(r"\s+", " ", self._wrap_segments(pieces)).strip()
# ============================================================
# Char-level tokenizer (vocab.json or built-in fallback)
# ============================================================
class UnicodeProcessor:
def __init__(self, indexer_path: Optional[str] = None):
self._char_to_id: Optional[Dict[str, int]]
self._codepoint_indexer: Optional[Dict[int, int]]
self.pad_id: int = 0
if indexer_path and os.path.exists(indexer_path):
with open(indexer_path, "r") as f:
raw = json.load(f)
if isinstance(raw, dict) and "char_to_id" in raw:
self.pad_id = int(raw.get("pad_id", 0))
self._char_to_id = {k: int(v) for k, v in raw["char_to_id"].items()}
self._codepoint_indexer = None
else:
self.pad_id = 0
self._char_to_id = None
self._codepoint_indexer = {int(k): int(v) for k, v in raw.items()}
vocab_len = len(self._char_to_id) if self._char_to_id is not None else len(self._codepoint_indexer or {})
print(f"[INFO] Loaded vocab from {indexer_path} ({vocab_len} entries)")
else:
self._char_to_id = dict(DEFAULT_CHAR_TO_ID)
self._codepoint_indexer = None
print("[INFO] Using built-in default vocab.")
def _preprocess(self, text: str, lang: str) -> str:
text = _strip_synthesis_tags(text)
text = uni_normalize("NFKD", text)
text = strip_hebrew_nikud(text)
emoji_pattern = re.compile(
"[\U0001f600-\U0001f64f\U0001f300-\U0001f5ff\U0001f680-\U0001f6ff"
"\U0001f700-\U0001f77f\U0001f780-\U0001f7ff\U0001f800-\U0001f8ff"
"\U0001f900-\U0001f9ff\U0001fa00-\U0001fa6f\U0001fa70-\U0001faff"
"\u2600-\u26ff\u2700-\u27bf\U0001f1e6-\U0001f1ff]+", flags=re.UNICODE,
)
text = emoji_pattern.sub("", text)
for k, v in {
"–": "-", "‑": "-", "—": "-", "_": " ",
"\u201c": '"', "\u201d": '"', "\u2018": "'", "\u2019": "'",
"´": "'", "`": "'", "[": " ", "]": " ", "|": " ",
"/": " ", "#": " ", "→": " ", "←": " ",
}.items():
text = text.replace(k, v)
text = re.sub(r"[♥☆♡©\\]", "", text)
for k, v in {"@": " at ", "e.g.,": "for example, ", "i.e.,": "that is, "}.items():
text = text.replace(k, v)
for pat in (r" ,", r" \.", r" !", r" \?", r" ;", r" :", r" '"):
text = re.sub(pat, pat.replace(" ", "").replace("\\", ""), text)
while '""' in text:
text = text.replace('""', '"')
while "''" in text:
text = text.replace("''", "'")
text = strip_silent_separator_tokens(text)
text = re.sub(r"\s+", " ", text).strip()
if not re.search(r"[.!?;:,'\"')\]}…。」』】〉》›»]$", text):
text += "."
lang = LANG_CODE_ALIASES.get(lang, lang)
if lang not in AVAILABLE_LANGS:
raise ValueError(f"Invalid language: {lang}")
if not _INLINE_LANG_PAIR.search(text):
text = f"<{lang}>{text}</{lang}>"
return text
def _encode(self, text: str) -> np.ndarray:
text = _strip_synthesis_tags(text)
pad = self.pad_id
if self._char_to_id is not None:
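            # e.g. with the built-in map, "ʃalˈom" -> [96, 14, 24, 120, 27, 25]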
ids = [self._char_to_id.get(ch, pad) for ch in text]
else:
assert self._codepoint_indexer is not None
ids = [self._codepoint_indexer.get(ord(ch), pad) for ch in text]
return np.array(ids, dtype=np.int64)
def __call__(self, text_list: List[str], lang_list: List[str]):
text_list = [self._preprocess(t, lang) for t, lang in zip(text_list, lang_list)]
encoded = [self._encode(t) for t in text_list]
lengths = np.array([len(e) for e in encoded], dtype=np.int64)
text_ids = np.full((len(encoded), int(lengths.max())), self.pad_id, dtype=np.int64)
for i, ids in enumerate(encoded):
text_ids[i, :len(ids)] = ids
mask = _length_to_mask(lengths)
return text_ids, mask
def _length_to_mask(lengths: np.ndarray, max_len: Optional[int] = None) -> np.ndarray:
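    # e.g. lengths=[3, 5] -> float mask of shape (2, 1, 5) with ones at valid positions.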
max_len = max_len or int(lengths.max())
ids = np.arange(0, max_len)
m = (ids < np.expand_dims(lengths, 1)).astype(np.float32)
return m.reshape(-1, 1, max_len)
def _latent_mask(wav_lengths: np.ndarray, base_chunk: int, factor: int) -> np.ndarray:
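    # One latent frame spans base_chunk * factor waveform samples, so the mask
    # length is ceil(wav_len / (base_chunk * factor)).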
size = base_chunk * factor
lat_len = (wav_lengths + size - 1) // size
return _length_to_mask(lat_len)
# ============================================================
# Voice style container
# ============================================================
@dataclass
class Style:
ttl: np.ndarray
dp: np.ndarray
def load_voice_style(paths: List[str]) -> Style:
with open(paths[0]) as f:
return style_from_dict(json.load(f))
def style_from_dict(payload: dict[str, Any]) -> Style:
ttl_dims = payload["style_ttl"]["dims"]
dp_dims = payload["style_dp"]["dims"]
ttl_data = np.array(payload["style_ttl"]["data"], dtype=np.float32).flatten()
dp_data = np.array(payload["style_dp"]["data"], dtype=np.float32).flatten()
return Style(
ttl=ttl_data.reshape(ttl_dims),
dp=dp_data.reshape(dp_dims),
)
def load_voice_style_batch(paths: List[str]) -> Style:
with open(paths[0]) as f:
first = json.load(f)
ttl_dims = first["style_ttl"]["dims"]
dp_dims = first["style_dp"]["dims"]
B = len(paths)
ttl = np.zeros([B, ttl_dims[1], ttl_dims[2]], dtype=np.float32)
dp = np.zeros([B, dp_dims[1], dp_dims[2]], dtype=np.float32)
for i, p in enumerate(paths):
with open(p) as f:
d = json.load(f)
ttl[i] = np.array(d["style_ttl"]["data"], dtype=np.float32).reshape(ttl_dims[1], ttl_dims[2])
dp[i] = np.array(d["style_dp"]["data"], dtype=np.float32).reshape(dp_dims[1], dp_dims[2])
return Style(ttl=ttl, dp=dp)
# ============================================================
# TextToSpeech core (slim pipeline)
# ============================================================
def _hard_split(s: str, max_len: int) -> List[str]:
"""Split ``s`` into pieces of at most ``max_len`` chars, preferring spaces."""
s = s.strip()
if len(s) <= max_len:
return [s] if s else []
out: List[str] = []
i, n = 0, len(s)
while i < n:
j = min(i + max_len, n)
if j < n:
cut = s.rfind(" ", i, j)
if cut > i + max_len // 4:
j = cut
piece = s[i:j].strip()
if piece:
out.append(piece)
i = j
while i < n and s[i] == " ":
i += 1
return out
def chunk_text(text: str, max_len: int = 300) -> List[str]:
pattern = (
r"(?<!Mr\.)(?<!Mrs\.)(?<!Ms\.)(?<!Dr\.)(?<!Prof\.)(?<!Sr\.)(?<!Jr\.)"
r"(?<!Ph\.D\.)(?<!etc\.)(?<!e\.g\.)(?<!i\.e\.)(?<!vs\.)(?<!Inc\.)"
r"(?<!Ltd\.)(?<!Co\.)(?<!Corp\.)(?<!St\.)(?<!Ave\.)(?<!Blvd\.)"
r"(?<!\b[A-Z]\.)(?<=[.!?])\s+"
)
chunks: List[str] = []
for paragraph in re.split(r"\n\s*\n+", text.strip()):
paragraph = paragraph.strip()
if not paragraph:
continue
current = ""
for sentence in re.split(pattern, paragraph):
if len(current) + len(sentence) + 1 <= max_len:
current += (" " if current else "") + sentence
else:
if current:
chunks.append(current.strip())
current = ""
if len(sentence) > max_len:
chunks.extend(_hard_split(sentence, max_len))
else:
current = sentence
if current:
chunks.append(current.strip())
base = chunks if chunks else [text.strip()]
    # Defensive: guarantee nothing exceeds max_len (phonemization can expand
    # text well past its original length).
out: List[str] = []
for c in base:
out.extend(_hard_split(c, max_len))
return out
class BlueTTS:
def __init__(
self,
onnx_dir: str = ONNX_DIR,
config_path: str = CONFIG_PATH,
vocab_path: str = VOCAB_PATH,
renikud_path: Optional[str] = RENIKUD_PATH,
use_gpu: bool = False,
):
self.cfgs = self._load_cfg(config_path)
self.sample_rate = int(self.cfgs["ae"]["sample_rate"])
self.base_chunk_size = int(self.cfgs["ae"]["base_chunk_size"])
self.chunk_compress_factor = int(self.cfgs["ttl"]["chunk_compress_factor"])
self.ldim = int(self.cfgs["ttl"]["latent_dim"])
opts = ort.SessionOptions()
opts.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
opts.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL
n_threads = int(os.environ.get("ORT_NUM_THREADS", min(8, os.cpu_count() or 1)))
opts.intra_op_num_threads = n_threads
opts.inter_op_num_threads = 1
providers = ["CPUExecutionProvider"]
if use_gpu and "CUDAExecutionProvider" in ort.get_available_providers():
providers = ["CUDAExecutionProvider", "CPUExecutionProvider"]
def _load(name: str) -> ort.InferenceSession:
return ort.InferenceSession(os.path.join(onnx_dir, name),
sess_options=opts, providers=providers)
self.dp_ort = _load("duration_predictor.onnx")
self.text_enc_ort = _load("text_encoder.onnx")
self.vector_est_ort = _load("vector_estimator.onnx")
self.vocoder_ort = _load("vocoder.onnx")
self._vf_inputs = {i.name for i in self.vector_est_ort.get_inputs()}
self._vocoder_input_name = self.vocoder_ort.get_inputs()[0].name
# Optional uncond embeddings for CFG (if shipped with the slim bundle).
self._u_text = self._u_ref = None
uncond_path = os.path.join(onnx_dir, "uncond.npz")
if os.path.exists(uncond_path):
u = np.load(uncond_path)
self._u_text = u["u_text"] if "u_text" in u.files else None
self._u_ref = u["u_ref"] if "u_ref" in u.files else None
self.text_processor = UnicodeProcessor(vocab_path)
self.g2p = TextProcessor(renikud_path)
@staticmethod
def _load_cfg(path: str) -> dict:
if not os.path.exists(path):
raise FileNotFoundError(f"Missing config {path}")
with open(path) as f:
return json.load(f)
def _sample_noisy_latent(self, duration: np.ndarray, seed: int = 42):
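        # Draw seeded Gaussian noise shaped like the latent sequence and zero
        # out positions past each item's length.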
bsz = len(duration)
chunk_size = self.base_chunk_size * self.chunk_compress_factor
wav_len_max = duration.max() * self.sample_rate
wav_lengths = (duration * self.sample_rate).astype(np.int64)
latent_len = int(np.ceil(wav_len_max / chunk_size))
latent_dim = self.ldim * self.chunk_compress_factor
rng = np.random.default_rng(seed)
xt = rng.standard_normal((bsz, latent_dim, latent_len)).astype(np.float32)
latent_mask = _latent_mask(wav_lengths, self.base_chunk_size, self.chunk_compress_factor)
return xt * latent_mask, latent_mask
def _infer(
self,
text_list: List[str],
lang_list: List[str],
style: Style,
total_step: int,
speed: float,
cfg_scale: float,
seed: int,
pace_blend: float = 0.0,
pace_dpt_ref: float = DURATION_PACE_DPT_REF,
):
bsz = len(text_list)
assert style.ttl.shape[0] == bsz, "style batch mismatch"
text_ids, text_mask = self.text_processor(text_list, lang_list)
dur, *_ = self.dp_ort.run(None, {
"text_ids": text_ids, "style_dp": style.dp, "text_mask": text_mask,
})
dur = np.asarray(dur, dtype=np.float32).reshape(-1)
dur = blend_duration_pace(dur, text_mask, pace_blend, pace_dpt_ref)
dur = dur / max(speed, 1e-6)
text_emb, *_ = self.text_enc_ort.run(None, {
"text_ids": text_ids, "style_ttl": style.ttl, "text_mask": text_mask,
})
xt, latent_mask = self._sample_noisy_latent(dur, seed=seed)
total_t = np.array([total_step] * bsz, dtype=np.float32)
use_cfg = (cfg_scale != 1.0 and self._u_text is not None and self._u_ref is not None)
u_text_mask = np.ones((bsz, 1, 1), dtype=np.float32) if use_cfg else None
for step in range(total_step):
cur_t = np.array([step] * bsz, dtype=np.float32)
cond = {
"noisy_latent": xt, "text_emb": text_emb,
"style_ttl": style.ttl, "text_mask": text_mask,
"latent_mask": latent_mask,
"current_step": cur_t, "total_step": total_t,
}
if "cfg_scale" in self._vf_inputs:
cond["cfg_scale"] = np.array([float(cfg_scale)], dtype=np.float32)
xt, *_ = self.vector_est_ort.run(None, cond)
elif use_cfg:
v_cond, *_ = self.vector_est_ort.run(None, cond)
u_text_b = np.broadcast_to(self._u_text, (bsz, *self._u_text.shape[1:])).astype(np.float32)
u_ref_b = np.broadcast_to(self._u_ref, (bsz, *self._u_ref.shape[1:])).astype(np.float32)
v_uncond, *_ = self.vector_est_ort.run(None, {
"noisy_latent": xt, "text_emb": u_text_b,
"style_ttl": u_ref_b, "text_mask": u_text_mask,
"latent_mask": latent_mask,
"current_step": cur_t, "total_step": total_t,
})
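                # Classifier-free guidance: push the conditional prediction
                # away from the unconditional one by cfg_scale.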
xt = v_uncond + cfg_scale * (v_cond - v_uncond)
else:
xt, *_ = self.vector_est_ort.run(None, cond)
wav, *_ = self.vocoder_ort.run(None, {self._vocoder_input_name: xt})
frame_len = self.base_chunk_size * self.chunk_compress_factor
if wav.shape[-1] > 2 * frame_len:
wav = wav[..., frame_len:-frame_len]
if wav.ndim == 3 and wav.shape[1] == 1:
wav = wav[:, 0, :]
return wav, dur
def synthesize(
self,
text: Union[str, List[str]],
lang: Union[str, List[str]],
style: Style,
total_step: int = 8,
speed: float = 0.95,
cfg_scale: float = 4.0,
silence_duration: float = 0.15,
seed: int = 42,
phonemize: bool = True,
pace_blend: Optional[float] = None,
pace_dpt_ref: float = DURATION_PACE_DPT_REF,
) -> Tuple[np.ndarray, int]:
if isinstance(text, list):
has_inline_lang = any(_INLINE_LANG_PAIR.search(t) is not None for t in text)
has_auto_mixed = any(_has_mixed_hebrew_latin(t, l) for t, l in zip(text, lang)) if isinstance(lang, list) else False
else:
has_inline_lang = _INLINE_LANG_PAIR.search(text) is not None
has_auto_mixed = _has_mixed_hebrew_latin(text, lang) if isinstance(lang, str) else False
pace_blend_eff = (
float(pace_blend)
if pace_blend is not None
else (DEFAULT_MIXED_PACE_BLEND if has_inline_lang or has_auto_mixed else 0.0)
)
if isinstance(text, list):
assert isinstance(lang, list) and len(text) == len(lang)
if phonemize:
text = [self.g2p.phonemize(t, lang=l) for t, l in zip(text, lang)]
wav, _ = self._infer(
text, lang, style, total_step, speed, cfg_scale, seed,
pace_blend=pace_blend_eff, pace_dpt_ref=pace_dpt_ref,
)
return wav, self.sample_rate
assert isinstance(lang, str)
assert style.ttl.shape[0] == 1, "single-text mode needs a single style"
max_len = BLUE_SYNTH_MAX_CHUNK_LEN
chunks = chunk_text(text, max_len=max_len)
wav_cat: Optional[np.ndarray] = None
for raw_chunk in chunks:
chunk = self.g2p.phonemize(raw_chunk, lang=lang) if phonemize else raw_chunk
if not chunk:
continue
w, _ = self._infer(
[chunk], [lang], style, total_step, speed, cfg_scale, seed,
pace_blend=pace_blend_eff, pace_dpt_ref=pace_dpt_ref,
)
if wav_cat is None:
wav_cat = w
else:
silence = np.zeros((1, int(silence_duration * self.sample_rate)), dtype=np.float32)
wav_cat = np.concatenate([wav_cat, silence, w], axis=1)
if wav_cat is None:
wav_cat = np.zeros((1, 0), dtype=np.float32)
return wav_cat.squeeze(0) if wav_cat.ndim == 2 else wav_cat.squeeze(), self.sample_rate
# ============================================================
# App setup
# ============================================================
TTS = BlueTTS(ONNX_DIR, CONFIG_PATH, VOCAB_PATH, RENIKUD_PATH)
def discover_voices() -> Dict[str, str]:
out: Dict[str, str] = {}
for p in sorted(glob.glob(os.path.join(VOICES_DIR, "*.json"))):
try:
with open(p) as f:
payload = json.load(f)
ttl = payload.get("style_ttl")
if ttl:
arr = np.array(ttl["data"], dtype=np.float32)
if float(arr.std()) > 0.3:
print(f"[INFO] Skipping incompatible voice JSON {p} (style_ttl std={arr.std():.3f})")
continue
except Exception as e:
print(f"[WARN] Skipping unreadable voice JSON {p}: {e}")
continue
label = os.path.splitext(os.path.basename(p))[0]
pretty = label.replace("_", " ").replace("spk ", "Speaker ").title()
out[pretty] = p
return out
VOICES: Dict[str, str] = discover_voices()
VOICE_STYLES: Dict[str, Style] = {name: load_voice_style([path]) for name, path in VOICES.items()}
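# Minimal programmatic use, assuming at least one voice JSON was discovered:
#   style = next(iter(VOICE_STYLES.values()))
#   wav, sr = TTS.synthesize("Hello world.", lang="en", style=style)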
def expand_numbers(text: str, lang: str = "en") -> str:
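    # e.g. "I have 3 cats" -> "I have three cats" for lang="en"; on any
    # num2words failure the raw digits are kept.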
lang = _canonical_lang(lang)
def repl(m: re.Match[str]) -> str:
raw = m.group(0)
try:
value: Union[int, float]
if "." in raw or "," in raw:
value = float(raw.replace(",", "."))
else:
value = int(raw)
return num2words(value, lang=lang)
except Exception:
return raw
return re.sub(r"(?<![\w])\d+(?:[.,]\d+)?(?![\w])", repl, text)
def expand_percent_symbols(text: str, lang: str = "en") -> str:
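    # e.g. "50%" -> "50 percent" for lang="en".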
word = _PERCENT_WORDS.get(_canonical_lang(lang), _PERCENT_WORDS["en"])
text = re.sub(r"(\d+(?:[.,]\d+)?)\s*%", rf"\1 {word}", text)
return re.sub(r"%", f" {word} ", text)
def expand_ratios(text: str, lang: str = "en") -> str:
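    # e.g. "3:4" -> "3 to 4" for lang="en", leaving the digits for expand_numbers.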
word = _RATIO_WORDS.get(_canonical_lang(lang), _RATIO_WORDS["en"])
return re.sub(r"(?<!\d)(\d+)\s*:\s*(\d+)(?!\d)", rf"\1 {word} \2", text)
def expand_dates(text: str, lang: str = "en") -> str:
"""Normalize numeric day/month/year dates before generic number expansion."""
lang = _canonical_lang(lang)
def repl(m: re.Match[str]) -> str:
day = int(m.group(1))
month = int(m.group(2))
raw_year = m.group(3)
if not (1 <= day <= 31 and 1 <= month <= 12):
return m.group(0)
year = int(raw_year)
if len(raw_year) == 2:
year += 2000 if year < 70 else 1900
if lang == "he":
return f"{day} {_HEBREW_MONTH_ORDINALS[month]} {year}"
return f"{day} {month} {year}"
return _DATE_RE.sub(repl, text)
def normalize_common_text(text: str) -> str:
text = strip_hebrew_nikud(text)
text = re.sub(
r"\banymore\b",
lambda m: "Any more" if m.group(0)[0].isupper() else "any more",
text,
flags=re.IGNORECASE,
)
return text
def prepare_text_for_synthesis(text: str, lang: str) -> str:
text = normalize_common_text(text)
text = strip_hebrew_abbreviation_quotes(text, lang)
text = expand_hebrew_lamed_before_latin(text, lang)
text = expand_dates(text, lang=lang)
text = expand_percent_symbols(text, lang=lang)
text = expand_ratios(text, lang=lang)
text = expand_numbers(text, lang=lang)
return strip_silent_separator_tokens(text)
def normalize_generated_audio(wav: np.ndarray, target_rms: float = 0.08, peak_limit: float = 0.95) -> np.ndarray:
"""Gently lift quiet generations while leaving normal/loud audio unclipped."""
wav = np.asarray(wav, dtype=np.float32)
if wav.size == 0 or not np.isfinite(wav).all():
return wav
peak = float(np.max(np.abs(wav)))
if peak < 1e-6:
return wav
active = np.abs(wav) > max(peak * 0.02, 1e-4)
samples = wav[active] if np.any(active) else wav
rms = float(np.sqrt(np.mean(np.square(samples))))
if rms < 1e-6:
return wav
# Cap boost so a very quiet/bad generation does not become harsh or noisy.
gain = min(target_rms / rms, peak_limit / peak, 4.0)
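    # e.g. peak=0.2, rms=0.02 -> gain = min(4.0, 4.75, 4.0) = 4.0.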
if gain <= 1.0:
return wav
return (wav * gain).astype(np.float32)
# Cache of styles derived from uploaded reference WAVs, keyed by file hash.
_REF_WAV_CACHE: Dict[str, Style] = {}
def _hash_file(path: str) -> str:
import hashlib
h = hashlib.sha1()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(1 << 16), b""):
h.update(chunk)
return h.hexdigest()
def _env_truthy(name: str) -> bool:
return os.environ.get(name, "").strip().lower() in {"1", "true", "yes", "on"}
def _pt_marker_ok(marker_path: str, repo_id: str, stamp: str) -> bool:
if not os.path.exists(marker_path):
return False
    try:
        with open(marker_path, encoding="utf-8") as f:
            lines = f.read().splitlines()
    except OSError:
        return False
if len(lines) < 2:
return False
return lines[0].strip() == repo_id and lines[1].strip() == stamp
def _ensure_pt_weights() -> dict[str, str]:
"""Make sure v2 PyTorch/safetensors checkpoints are on disk."""
repo_id = os.environ.get("BLUE_PT_REPO", "notmax123/blue-v2")
stamp = os.environ.get("BLUE_PT_BUNDLE_STAMP", "1")
marker = os.path.join("pt_weights", ".repo_id")
force = _env_truthy("BLUE_PT_FORCE_DOWNLOAD") or not _pt_marker_ok(marker, repo_id, stamp)
needed: dict[str, Optional[str]] = {k: _find_pt_weight(v) for k, v in PT_WEIGHT_ALIASES.items()}
if force or any(v is None for v in needed.values()):
from huggingface_hub import hf_hub_download
import shutil
os.makedirs("pt_weights", exist_ok=True)
for fn in ("blue_codec.safetensors", "duration_predictor_final.safetensors",
"vf_estimetor.safetensors", "stats_multilingual.safetensors"):
dest = os.path.join("pt_weights", fn)
print(f"[INFO] Fetching {repo_id}/{fn} …")
cached = hf_hub_download(
repo_id=repo_id, filename=fn, repo_type="model",
token=os.environ.get("HF_TOKEN") or None,
force_download=force,
)
shutil.copy2(cached, dest)
with open(marker, "w", encoding="utf-8") as f:
f.write(repo_id + "\n" + stamp + "\n")
needed = {k: _find_pt_weight(v) for k, v in PT_WEIGHT_ALIASES.items()}
assert all(v is not None for v in needed.values()), f"still missing: {needed}"
return {k: v for k, v in needed.items() if v is not None} # type: ignore[misc]
def style_from_wav(ref_wav: str) -> Style:
"""Derive a voice Style from a reference WAV using export_new_voice.py."""
ckpts = _ensure_pt_weights()
from export_new_voice import export_voice_style
payload = export_voice_style(
ref_wav,
config=CONFIG_PATH,
ae_ckpt=ckpts["ae_ckpt"],
ttl_ckpt=ckpts["ttl_ckpt"],
dp_ckpt=ckpts["dp_ckpt"],
stats=ckpts["stats"],
device="cpu",
)
return style_from_dict(payload)
def _reference_audio_status(ref_wav: Optional[str]):
if not ref_wav:
return (
'<div class="ref-status muted">No reference uploaded — '
'using the saved voice above. Upload or record a clip to clone a custom voice.</div>'
)
try:
import soundfile as sf
info = sf.info(ref_wav)
dur = float(info.frames) / float(info.samplerate or 1)
channels = int(info.channels or 1)
if dur < 2.0:
level = "warn"
msg = "Too short for cloning; use at least 3 seconds."
elif dur > 20.0:
level = "warn"
msg = "Long clips work, but only the early frames are used. Trim to the cleanest 3-12 seconds."
elif channels > 2:
level = "warn"
msg = "Many channels detected; mono or stereo speech works best."
else:
level = "ok"
try:
cached = _hash_file(ref_wav) in _REF_WAV_CACHE
except Exception:
cached = False
if cached:
msg = "Cloned voice cached — next generation will be fast."
else:
msg = "Ready. First generation exports the voice (~20-40s); subsequent ones are instant."
return (
f'<div class="ref-status {level}">'
f'Reference: {dur:.1f}s, {info.samplerate} Hz, {channels} channel(s). {html.escape(msg)}'
'</div>'
)
except Exception as e:
return f'<div class="ref-status warn">Could not inspect uploaded audio: {html.escape(str(e))}</div>'
def synthesize_text(text: str, voice: str, lang: str, steps: int, speed: float,
ref_wav: Optional[str] = None,
progress: "gr.Progress | None" = gr.Progress()):
t0 = time.time()
using_ref = bool(ref_wav)
export_time = 0.0
if using_ref:
try:
cache_key = _hash_file(ref_wav)
if cache_key in _REF_WAV_CACHE:
if progress is not None:
progress(0.9, desc="Using cached cloned voice")
style = _REF_WAV_CACHE[cache_key]
else:
if progress is not None:
progress(
0.05,
desc="Exporting cloned voice (first time ~20-40s, cached after)",
)
t_exp = time.time()
style = style_from_wav(ref_wav)
export_time = time.time() - t_exp
_REF_WAV_CACHE[cache_key] = style
if progress is not None:
progress(0.6, desc="Synthesizing speech")
except Exception as e:
err = f'<div class="stats-bar"><span class="stat-pill">❌ voice clone failed: {e}</span></div>'
return None, err
else:
if not VOICE_STYLES:
err = (
'<div class="stats-bar"><span class="stat-pill">'
'No saved voices installed. Upload a reference clip to clone a voice.</span></div>'
)
return None, err
style = VOICE_STYLES[voice]
wav, sr = TTS.synthesize(
prepare_text_for_synthesis(text, lang=lang), lang=lang, style=style,
total_step=int(steps), speed=float(speed), cfg_scale=4.0,
pace_blend=None,
)
wav = normalize_generated_audio(np.asarray(wav).squeeze())
proc_time = time.time() - t0
audio_dur = len(wav) / sr if len(wav) > 0 else 0.0
rtf = proc_time / audio_dur if audio_dur > 0 else 0
export_pill = (
f'<span class="stat-pill">🧬 clone export {export_time:.1f}s</span>'
if using_ref and export_time > 0 else ''
)
stats = (
f'<div class="stats-bar">'
f'<span class="stat-pill">Voice: {"cloned from upload" if using_ref else html.escape(voice)}</span>'
f'{export_pill}'
f'<span class="stat-pill">⏱ {proc_time:.2f}s</span>'
f'<span class="stat-pill">🔊 {audio_dur:.1f}s audio</span>'
f'<span class="stat-pill">⚡ {rtf:.2f}x RTF</span>'
f'</div>'
)
return (sr, wav), stats
def phonemes_for_display(text: str, lang: str) -> str:
"""Return user-facing phonemes without internal <lang> routing tags."""
prepared = prepare_text_for_synthesis(text, lang=lang)
tagged = TTS.g2p.phonemize(prepared, lang=lang)
return strip_language_tags_for_display(tagged)
# ============================================================
# Voice-clone tab
# ============================================================
# Accept checkpoints from a handful of common locations (with the filename
# variants we've seen in the wild) so the clone tab works out of the box.
PT_WEIGHTS_SEARCH = [
"pt_weights",
"pt_models",
os.path.join("fonts", "pt_models"),
]
PT_WEIGHT_ALIASES: dict[str, list[str]] = {
"ae_ckpt": ["blue_codec.safetensors"],
"ttl_ckpt": ["vf_estimetor.safetensors"],
"dp_ckpt": ["duration_predictor_final.safetensors"],
"stats": ["stats_multilingual.safetensors"],
}
def _find_pt_weight(aliases: list[str]) -> Optional[str]:
for d in PT_WEIGHTS_SEARCH:
for name in aliases:
p = os.path.join(d, name)
if os.path.exists(p):
return p
return None
def _refresh_voices() -> None:
global VOICES, VOICE_STYLES
VOICES = discover_voices()
VOICE_STYLES = {name: load_voice_style([path]) for name, path in VOICES.items()}
def clone_voice(ref_wav: Optional[str], voice_name: str):
"""Export a new voice JSON from a reference WAV."""
if not ref_wav:
return "Please upload a reference WAV first.", gr.update()
if not voice_name.strip():
voice_name = f"custom_{int(time.time())}"
safe = re.sub(r"[^\w\-]+", "_", voice_name.strip())
out_path = os.path.join(VOICES_DIR, f"{safe}.json")
needed = _ensure_pt_weights()
from export_new_voice import export_voice_style
payload = export_voice_style(
ref_wav,
config=CONFIG_PATH,
ae_ckpt=needed["ae_ckpt"],
ttl_ckpt=needed["ttl_ckpt"],
dp_ckpt=needed["dp_ckpt"],
stats=needed["stats"],
device="cpu",
)
os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)
with open(out_path, "w") as f:
json.dump(payload, f)
_refresh_voices()
pretty = safe.replace("_", " ").title()
return (
f"Saved {out_path}. New voice '{pretty}' is now selectable in the Synthesize tab.",
gr.update(choices=list(VOICES.keys())),
)
# ============================================================
# Gradio UI (styling retained from previous version)
# ============================================================
EXAMPLES = [
["The power to change begins the moment you believe it's possible!", "en"],
["הכוח לשנות מתחיל ברגע שבו אתה מאמין שזה אפשרי!", "he"],
["¡El poder de cambiar comienza en el momento en que crees que es posible!", "es"],
["Il potere di cambiare inizia nel momento in cui credi che sia possibile!", "it"],
["Die Kraft zur Veränderung beginnt in dem Moment, in dem du glaubst, dass es möglich ist!", "de"],
]
def _load_font_face() -> str:
    p = "fonts/EuclidCircularB.woff2"
    if os.path.exists(p):
        with open(p, "rb") as f:
            b64 = base64.b64encode(f.read()).decode()
        return (
            f"@font-face {{ font-family: 'EuclidCircularB'; "
            f"src: url(data:font/woff2;base64,{b64}) format('woff2'); "
            f"font-weight: 100 900; font-style: normal; }}"
        )
    return ""
css = _load_font_face() + """
@import url('https://fonts.googleapis.com/css2?family=JetBrains+Mono:wght@400;500&display=swap');
* { box-sizing: border-box; }
body, .gradio-container { background:#06101f !important; font-family:'EuclidCircularB',sans-serif !important; color:#e6efff !important; }
.gradio-container { max-width:900px !important; margin:0 auto !important; padding:2rem 1.5rem !important; }
.app-header { text-align:center; margin-bottom:2rem; padding:2rem 0 1rem; }
.app-header h1 { font-size:2.8rem; font-weight:600; letter-spacing:-0.03em; background:linear-gradient(135deg,#38bdf8 0%,#3b82f6 50%,#1d4ed8 100%); -webkit-background-clip:text; -webkit-text-fill-color:transparent; background-clip:text; margin:0 0 0.5rem; }
.app-header p { color:#7ea3d4; font-size:1rem; margin:0 0 1rem; }
.app-header .github-link { display:inline-flex; align-items:center; gap:0.4rem; margin-top:0.75rem; padding:0.45rem 1rem; font-size:0.9rem; font-weight:500; text-decoration:none !important; color:#93c5fd !important; border:1px solid #1e40af; border-radius:999px; background:rgba(59,130,246,0.12); }
.card { background:#0b1a30; border:1px solid #163056; border-radius:16px; padding:1.5rem; margin-bottom:1rem; }
.big-input textarea { background:#081327 !important; border:1px solid #1e3a66 !important; border-radius:10px !important; color:#e6efff !important; font-size:1.1rem !important; line-height:1.6 !important; padding:1rem !important; unicode-bidi:plaintext !important; }
.big-input textarea:focus { border-color:#3b82f6 !important; outline:none !important; box-shadow:0 0 0 3px rgba(59,130,246,0.18) !important; }
.controls-row { margin-top:1rem; display:flex !important; flex-direction:column !important; gap:0.75rem !important; }
.ctrl-row1, .ctrl-row2, .ctrl-row3 { display:flex !important; flex-direction:row !important; gap:0.75rem !important; width:100% !important; }
.ctrl-lang { flex:2 !important; min-width:0 !important; } .ctrl-voice { flex:3 !important; min-width:0 !important; }
.ctrl-steps, .ctrl-speed { flex:1 !important; min-width:0 !important; }
.gen-btn { background:linear-gradient(135deg,#2563eb,#1d4ed8) !important; border:none !important; border-radius:10px !important; color:#fff !important; font-size:1rem !important; font-weight:600 !important; padding:0.75rem 2rem !important; width:100% !important; margin-top:1rem !important; box-shadow:0 6px 18px rgba(37,99,235,0.35) !important; }
.gen-btn:hover { opacity:0.9 !important; filter:brightness(1.05); }
.gradio-audio { background:#0b1a30 !important; border:1px solid #163056 !important; border-radius:12px !important; }
.stats-bar { display:flex; gap:0.75rem; flex-wrap:wrap; margin-top:0.75rem; padding:0.75rem 0; }
.stat-pill { background:#0e2545; border:1px solid #1e40af; border-radius:20px; padding:0.3rem 0.9rem; font-family:'JetBrains Mono',monospace; font-size:0.8rem; color:#93c5fd; }
.gradio-dropdown select, .gradio-dropdown input { background:#081327 !important; border:1px solid #1e3a66 !important; color:#e6efff !important; border-radius:8px !important; }
.ref-panel { margin-top:1rem; padding:1rem; border:1px dashed #1e40af; border-radius:12px; background:#091a34; }
.ref-panel label { color:#bfdbfe !important; }
.ref-panel h3 { color:#dbeafe; margin:0 0 0.25rem; font-size:1rem; font-weight:600; }
.ref-status { margin-top:0.6rem; padding:0.75rem 0.9rem; border-radius:10px; font-size:0.9rem; line-height:1.4; }
.ref-status.ok { color:#bae6fd; background:rgba(14,165,233,0.12); border:1px solid rgba(14,165,233,0.35); }
.ref-status.warn { color:#fde68a; background:rgba(245,158,11,0.10); border:1px solid rgba(245,158,11,0.25); }
.ref-status.muted { color:#93a6c4; background:rgba(59,130,246,0.08); border:1px solid rgba(59,130,246,0.20); }
.ref-help { color:#7ea3d4; font-size:0.86rem; line-height:1.45; margin-top:0.5rem; }
"""
with gr.Blocks(title="BlueTTS V2 — Multilingual TTS", theme=gr.themes.Base(), css=css) as demo:
gr.HTML(
'<div class="app-header"><h1>BlueTTS V2</h1>'
'<p>Slim multilingual text-to-speech · English · Hebrew · Spanish · German · Italian</p>'
'<a class="github-link" href="https://github.com/maxmelichov/BlueTTS" target="_blank">GitHub · maxmelichov/BlueTTS</a></div>'
)
with gr.Column(elem_classes="card"):
text_input = gr.Textbox(
label="Text", placeholder="Type or paste text here…",
lines=4, elem_classes="big-input",
value="Great ideas become real when a small team keeps building every single day.",
)
with gr.Column(elem_classes="controls-row"):
with gr.Row(elem_classes="ctrl-row1"):
lang_input = gr.Dropdown(
choices=[("English 🇺🇸", "en"), ("Hebrew 🇮🇱", "he"),
("Spanish 🇪🇸", "es"), ("German 🇩🇪", "de"),
("Italian 🇮🇹", "it")],
value="en", label="Language", elem_classes="ctrl-lang",
)
voice_input = gr.Dropdown(
choices=list(VOICES.keys()),
value=next(iter(VOICES.keys()), None),
label="Voice", elem_classes="ctrl-voice",
)
with gr.Row(elem_classes="ctrl-row2"):
steps_input = gr.Slider(5, 16, 8, step=1, label="Quality (steps)", elem_classes="ctrl-steps")
speed_input = gr.Slider(0.8, 1.2, 0.95, step=0.05, label="Speed", elem_classes="ctrl-speed")
with gr.Column(elem_classes="ref-panel"):
gr.HTML(
'<h3 style="color:#dbeafe;margin:0 0 0.25rem;font-size:1rem;font-weight:600;">Clone a voice (optional)</h3>'
'<div class="ref-help">Upload or record 3-12 seconds of clean speech to clone it. '
'Leave empty to use the saved voice selected above. Generation starts automatically when you upload. '
'<b>Heads up:</b> the first sentence with a new clone takes ~20-40s to export the voice — after that, regeneration is instant.</div>'
)
ref_wav_input = gr.Audio(
label="Reference audio",
sources=["upload", "microphone"], type="filepath",
)
ref_status = gr.HTML(_reference_audio_status(None))
btn = gr.Button("⚡ Generate Speech", elem_classes="gen-btn")
audio_out = gr.Audio(label="Output", type="numpy", autoplay=True)
stats_out = gr.HTML()
gr.Examples(examples=EXAMPLES, inputs=[text_input, lang_input], label="Examples")
synth_inputs = [text_input, voice_input, lang_input, steps_input, speed_input, ref_wav_input]
synth_outputs = [audio_out, stats_out]
def _auto_synth(text, voice, lang, steps, speed, ref_wav):
if not ref_wav:
return gr.update(), gr.update()
return synthesize_text(text, voice, lang, steps, speed, ref_wav)
ref_wav_input.change(
_reference_audio_status,
inputs=[ref_wav_input],
outputs=[ref_status],
).then(
_auto_synth,
inputs=synth_inputs,
outputs=synth_outputs,
)
btn.click(
synthesize_text,
inputs=synth_inputs,
outputs=synth_outputs,
)
gr.HTML("""
<script>
(function applyDirAuto() {
const ta = document.querySelector('.big-input textarea');
if (ta) { ta.setAttribute('dir', 'auto'); return; }
const obs = new MutationObserver(() => {
const ta = document.querySelector('.big-input textarea');
if (ta) { ta.setAttribute('dir', 'auto'); obs.disconnect(); }
});
obs.observe(document.body, { childList: true, subtree: true });
})();
</script>
""")
if __name__ == "__main__":
    demo.launch()