BexttsStream / appStreamPlayer.py
archivartaunik's picture
Rename app.py to appStreamPlayer.py
953060f verified
# Калі запускаеце ў чыстым асяроддзі (раскаментуйце):
# !pip install -q gradio spaces huggingface_hub torch scipy gitpython
import os
import sys
import re
import time
import base64
import hashlib
import tempfile
import subprocess
import inspect
from typing import Iterator, Iterable, Optional, Tuple, Any, List
import spaces
import gradio as gr
import torch
import numpy as np
from huggingface_hub import hf_hub_download
from scipy.io.wavfile import write
# ---------------------------------------------------------
# 1) Клануем і падключаем coqui-ai-TTS (fork з падтрымкай BE)
# ---------------------------------------------------------
REPO_URL = "https://github.com/tuteishygpt/coqui-ai-TTS.git"
REPO_DIR = "coqui-ai-TTS"
if not os.path.exists(REPO_DIR):
subprocess.run(["git", "clone", REPO_URL, REPO_DIR], check=True)
repo_root = os.path.abspath(REPO_DIR)
if repo_root not in sys.path:
sys.path.insert(0, repo_root)
from TTS.tts.configs.xtts_config import XttsConfig
from TTS.tts.models.xtts import Xtts
from TTS.tts.layers.xtts.tokenizer import VoiceBpeTokenizer
# ---------------------------------------------------------
# 2) Файлы мадэлі
# ---------------------------------------------------------
repo_id = "archivartaunik/BE_XTTS_V2_10ep250k"
model_dir = "./model"
os.makedirs(model_dir, exist_ok=True)
checkpoint_file = os.path.join(model_dir, "model.pth")
config_file = os.path.join(model_dir, "config.json")
vocab_file = os.path.join(model_dir, "vocab.json")
default_voice_file = os.path.join(model_dir, "voice.wav")
if not os.path.exists(checkpoint_file):
hf_hub_download(repo_id, filename="model.pth", local_dir=model_dir)
if not os.path.exists(config_file):
hf_hub_download(repo_id, filename="config.json", local_dir=model_dir)
if not os.path.exists(vocab_file):
hf_hub_download(repo_id, filename="vocab.json", local_dir=model_dir)
if not os.path.exists(default_voice_file):
hf_hub_download(repo_id, filename="voice.wav", local_dir=model_dir)
# ---------------------------------------------------------
# 3) Загрузка мадэлі і токенайзера
# ---------------------------------------------------------
config = XttsConfig()
config.load_json(config_file)
XTTS_MODEL = Xtts.init_from_config(config)
XTTS_MODEL.load_checkpoint(
config,
checkpoint_path=checkpoint_file,
vocab_path=vocab_file,
use_deepspeed=False,
)
device = "cuda:0" if torch.cuda.is_available() else "cpu"
XTTS_MODEL.to(device).eval()
sampling_rate = int(XTTS_MODEL.config.audio["sample_rate"])
tokenizer = VoiceBpeTokenizer(vocab_file=vocab_file)
XTTS_MODEL.tokenizer = tokenizer
# =========================================================
# 4) Streaming-канфіг (мінімальная затрымка)
# =========================================================
MIN_BUFFER_S = 0.03 # ~30 мс — хутчэйшы старт
FADE_S = 0.004 # карацейшы cross-fade
TOKENS_PER_STEP = 1 # крок прэфікса ў fallback (бліжэй да «па токенах»)
def _seconds_to_samples(sec: float, sr: int) -> int:
return max(1, int(sec * sr))
def _to_np_audio(x) -> np.ndarray:
"""Гарантавана вяртае 1D np.float32 і пераносіць з CUDA на CPU пры патрэбе."""
if isinstance(x, dict) and "wav" in x:
x = x["wav"]
if isinstance(x, torch.Tensor):
if x.dtype != torch.float32:
x = x.float()
x = x.detach().cpu().contiguous().view(-1)
return x.numpy()
x = np.asarray(x)
if x.ndim > 1:
x = x.reshape(-1)
if x.dtype != np.float32:
x = x.astype(np.float32, copy=False)
return x
def _crossfade_concat(a: np.ndarray, b: np.ndarray, sr: int, fade_s: float) -> np.ndarray:
"""Плыўнае зліццё без клікаў."""
if a.size == 0:
return b.astype(np.float32, copy=False)
if b.size == 0:
return a.astype(np.float32, copy=False)
a = a.astype(np.float32, copy=False)
b = b.astype(np.float32, copy=False)
fade_n = min(_seconds_to_samples(fade_s, sr), a.size, b.size)
if fade_n <= 1:
return np.concatenate([a, b], axis=0)
fade_out = np.linspace(1.0, 0.0, fade_n, endpoint=True, dtype=np.float32)
fade_in = 1.0 - fade_out
head = a[:-fade_n]
tail = (a[-fade_n:] * fade_out) + (b[:fade_n] * fade_in)
rest = b[fade_n:]
return np.concatenate([head, tail, rest], axis=0)
def _bpe_prefixes(text: str, lang: str, step_tokens: int):
"""Генерацыя прэфіксаў па BPE; калі encode недаступны — fallback на словы/прабелы."""
try:
ids = tokenizer.encode(text, lang=lang)
n = len(ids)
for k in range(step_tokens, n + 1, step_tokens):
yield tokenizer.decode(ids[:k], lang=lang)
if n % step_tokens != 0:
yield tokenizer.decode(ids, lang=lang)
return
except Exception:
pass
pseudo_tokens = re.findall(r"\S+|\s+", text)
acc = ""
for i in range(0, len(pseudo_tokens), step_tokens):
acc = "".join(pseudo_tokens[: i + step_tokens])
yield acc
if acc.strip() != text.strip():
yield text
def _native_stream(
model: Xtts,
text: str,
language: str,
gpt_cond_latent: Any,
speaker_embedding: Any,
**gen_kwargs,
) -> Iterator[np.ndarray]:
"""Натыўны паток з model.inference_stream(...), калі ён ёсць у форку."""
sig = inspect.signature(model.inference_stream)
call_kwargs = dict(
text=text,
language=language,
gpt_cond_latent=gpt_cond_latent,
speaker_embedding=speaker_embedding,
)
# перадаём параметры генерацыі, калі яны падтрымліваюцца
for k in ("temperature", "length_penalty", "repetition_penalty", "top_k", "top_p", "stream_chunk_size_s"):
if k in gen_kwargs and k in sig.parameters:
call_kwargs[k] = gen_kwargs[k]
generator = model.inference_stream(**call_kwargs)
for out in generator:
yield _to_np_audio(out)
def _fallback_incremental(
model: Xtts,
text: str,
language: str,
gpt_cond_latent: Any,
speaker_embedding: Any,
tokens_per_step: int,
**gen_kwargs,
) -> Iterator[np.ndarray]:
"""Fallback: павялічваем прэфікс па токенах і выдаём толькі «новую» аўдыя-частку."""
emitted = 0
for prefix in _bpe_prefixes(text, language, tokens_per_step):
with torch.no_grad():
out = model.inference(
text=prefix,
language=language,
gpt_cond_latent=gpt_cond_latent,
speaker_embedding=speaker_embedding,
temperature=gen_kwargs.get("temperature", 0.1),
length_penalty=gen_kwargs.get("length_penalty", 1.0),
repetition_penalty=gen_kwargs.get("repetition_penalty", 10.0),
top_k=gen_kwargs.get("top_k", 10),
top_p=gen_kwargs.get("top_p", 0.3),
)
wav = _to_np_audio(out)
new_part = wav[emitted:]
emitted = wav.size
if new_part.size:
yield new_part
class NewTTSGenerationMixin:
"""Дадаем Xtts.generate()/sample_stream() у духу transformers-stream-generator."""
@torch.inference_mode()
def generate(
self: Xtts,
text: Optional[str] = None,
*,
do_stream: bool = False,
language: str = "be",
gpt_cond_latent: Any = None,
speaker_embedding: Any = None,
min_buffer_s: float = MIN_BUFFER_S,
tokens_per_step: int = TOKENS_PER_STEP,
**gen_kwargs,
):
assert isinstance(text, str) and text.strip(), "text is required"
if not do_stream:
out = self.inference(
text=text,
language=language,
gpt_cond_latent=gpt_cond_latent,
speaker_embedding=speaker_embedding,
temperature=gen_kwargs.get("temperature", 0.1),
length_penalty=gen_kwargs.get("length_penalty", 1.0),
repetition_penalty=gen_kwargs.get("repetition_penalty", 10.0),
top_k=gen_kwargs.get("top_k", 10),
top_p=gen_kwargs.get("top_p", 0.3),
)
return _to_np_audio(out)
# ВАЖНА: НЕ дадаём min_buffer_s у gen_kwargs, каб не было дублікату.
# stream_chunk_size_s будзе пракінута далей у sample_stream().
return self.sample_stream(
text=text,
language=language,
gpt_cond_latent=gpt_cond_latent,
speaker_embedding=speaker_embedding,
min_buffer_s=min_buffer_s,
tokens_per_step=tokens_per_step,
**gen_kwargs,
)
@torch.inference_mode()
def sample_stream(
self: Xtts,
*,
text: str,
language: str,
gpt_cond_latent: Any,
speaker_embedding: Any,
min_buffer_s: float = MIN_BUFFER_S,
tokens_per_step: int = TOKENS_PER_STEP,
**gen_kwargs,
) -> Iterator[np.ndarray]:
# Каб не дубляваць stream_chunk_size_s, ставім яго толькі калі не перададзены звонку
local_kwargs = dict(gen_kwargs)
local_kwargs.setdefault("stream_chunk_size_s", float(min_buffer_s))
if hasattr(self, "inference_stream"):
for chunk in _native_stream(
self,
text,
language,
gpt_cond_latent,
speaker_embedding,
**local_kwargs,
):
yield chunk
return
for chunk in _fallback_incremental(
self,
text,
language,
gpt_cond_latent,
speaker_embedding,
tokens_per_step,
**gen_kwargs,
):
yield chunk
def init_stream_support():
Xtts.generate = NewTTSGenerationMixin.generate
Xtts.sample_stream = NewTTSGenerationMixin.sample_stream
init_stream_support()
# ---------------------------------------------------------
# 5) Кэш латэнтаў голасу (скарачае старт-латэнтнасць)
# ---------------------------------------------------------
LATENT_CACHE: dict[str, Tuple[Any, Any]] = {}
def _latents_for(path: str) -> Tuple[Any, Any]:
if path and os.path.exists(path):
key = f"{path}:{os.path.getmtime(path)}:{os.path.getsize(path)}"
else:
key = "default_voice"
h = hashlib.md5(key.encode("utf-8")).hexdigest()
if h not in LATENT_CACHE:
g, s = XTTS_MODEL.get_conditioning_latents(
audio_path=path,
gpt_cond_len=XTTS_MODEL.config.gpt_cond_len,
max_ref_length=XTTS_MODEL.config.max_ref_len,
sound_norm_refs=XTTS_MODEL.config.sound_norm_refs,
)
LATENT_CACHE[h] = (g, s)
return LATENT_CACHE[h]
# ---------------------------------------------------------
# 6) Хэлперы: буферы + base64
# ---------------------------------------------------------
def _merge_for_file(chunks: List[np.ndarray]) -> np.ndarray:
if not chunks:
return np.zeros((0,), dtype=np.float32)
out = chunks[0]
for i in range(1, len(chunks)):
out = _crossfade_concat(out, chunks[i], sampling_rate, FADE_S)
return out
def _chunker(chunks: Iterable[np.ndarray], sr: int, target_s: float) -> Iterable[np.ndarray]:
"""Мінімальная групоўка да ~target_s (30 мс) — баланс затрымкі/гладкасці."""
target_samples = _seconds_to_samples(target_s, sr)
buf = np.zeros((0,), dtype=np.float32)
for c in chunks:
c = _to_np_audio(c)
if c.size == 0:
continue
buf = c if buf.size == 0 else _crossfade_concat(buf, c, sr, FADE_S)
if buf.size >= target_samples:
yield buf
buf = np.zeros((0,), dtype=np.float32)
if buf.size:
yield buf
def _pcm_f32_to_b64(x: np.ndarray) -> str:
if x.dtype != np.float32:
x = x.astype(np.float32, copy=False)
return base64.b64encode(x.tobytes()).decode("ascii")
# ---------------------------------------------------------
# 7) Асноўная функцыя TTS — стрим (base64 у схаваны канал) + фінальны файл (gr.File)
# ---------------------------------------------------------
@spaces.GPU(duration=60)
def text_to_speech(belarusian_story, speaker_audio_file=None):
"""
Выхады:
1) stream_pipe (hidden Textbox) — base64(PCM float32) па кроках, у фінале "__STOP__" (EOS)
2) final_file (File) — толькі ў фінале шлях да WAV (не спыняе стрим)
"""
if not belarusian_story or str(belarusian_story).strip() == "":
raise gr.Error("Увядзі хоць нейкі тэкст 🙂")
# Голас па змаўчанні
if not speaker_audio_file or (
not isinstance(speaker_audio_file, str)
and getattr(speaker_audio_file, "name", "") == ""
):
speaker_audio_file = default_voice_file
# Conditioning latents (з кэшем)
try:
gpt_cond_latent, speaker_embedding = _latents_for(speaker_audio_file)
except Exception as e:
raise gr.Error(f"Памылка пры атрыманні латэнтаў голасу: {e}")
# Генератар гуку (НЕ перадаём тут stream_chunk_size_s — ён будзе пастаўлены ў sample_stream праз setdefault)
gen = XTTS_MODEL.generate(
text=str(belarusian_story).strip(),
do_stream=True,
language="be",
gpt_cond_latent=gpt_cond_latent,
speaker_embedding=speaker_embedding,
min_buffer_s=MIN_BUFFER_S,
tokens_per_step=TOKENS_PER_STEP,
temperature=0.1,
length_penalty=1.0,
repetition_penalty=10.0,
top_k=10,
top_p=0.3,
)
full_audio_chunks: List[np.ndarray] = []
# Струменім невялікімі буферамі ў схаваны канал
for buf in _chunker(gen, sampling_rate, MIN_BUFFER_S):
full_audio_chunks.append(buf)
yield (_pcm_f32_to_b64(buf), None)
# Фінал: файл + EOS (НЕ спыняем плэер адразу — ён даіграе чаргу)
if not full_audio_chunks:
yield ("__STOP__", None)
return
full_audio = _merge_for_file(full_audio_chunks)
try:
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
write(tmp.name, sampling_rate, full_audio.astype(np.float32))
yield ("__STOP__", tmp.name)
except Exception as e:
raise gr.Error(f"Памылка пры запісе фінальнага WAV: {e}")
# ---------------------------------------------------------
# 8) UI: аўта-Play пры "Згенераваць" + лагі старт-латэнтнасці
# ---------------------------------------------------------
examples = [
["Прывітанне! Гэта праверка жывога струменя беларускага TTS.", "Nestarka.wav"],
]
with gr.Blocks() as demo:
gr.Markdown("## Belarusian TTS — Streaming па токенах (WebAudio) + фінальны файл")
with gr.Row():
inp_text = gr.Textbox(lines=5, label="Тэкст на беларускай мове")
inp_voice = gr.Audio(type="filepath", label="Прыклад голасу (7+ сек)", interactive=True)
with gr.Row():
play_btn = gr.Button("▶️ Play")
stop_btn = gr.Button("⏹ Stop")
gr.Markdown(f"**Sample rate:** {sampling_rate} Hz")
# Панэль лагавання (паказваем затрымкі і статус)
log_panel = gr.HTML(
value='<div id="wa-log" style="font-family:system-ui;font-size:12px;white-space:pre-line">[лог пусты]</div>',
label="Лагі плэера",
)
# Схаваны канал для стриму base64-чанкаў і фінальны файл (File, каб не перарываць прайграванне)
stream_pipe = gr.Textbox(value="", visible=False, label="stream_pipe")
final_file = gr.File(label="Згенераваны WAV (спампаваць)")
run_btn = gr.Button("Згенераваць")
# --- JS: ініт + reset + auto-play + лагі ---
INIT_RESET_AND_PLAY_JS = f"""
() => {{
const sampleRate = {sampling_rate};
const AC = window.AudioContext || window.webkitAudioContext;
if (!AC) return;
// helper для лагу
function logUpdate() {{
const el = document.getElementById('wa-log');
if (!el || !window.__wa || !window.__wa.meta) return;
const m = window.__wa.meta;
const lines = [];
if (m.t_click_ms) lines.push("Клік (Згенераваць): " + m.t_click_ms.toFixed(1) + " ms");
if (m.t_first_push_ms) {{
lines.push("Першы чанк прыйшоў: " + m.t_first_push_ms.toFixed(1) + " ms");
if (m.t_click_ms) lines.push(" Затрымка (клік→чанк): " + (m.t_first_push_ms - m.t_click_ms).toFixed(1) + " ms");
}}
if (m.t_first_audio_ms) {{
lines.push("Пачатак прайгравання: " + m.t_first_audio_ms.toFixed(1) + " ms");
if (m.t_click_ms) lines.push(" Затрымка (клік→аўдыя): " + (m.t_first_audio_ms - m.t_click_ms).toFixed(1) + " ms");
if (m.t_first_push_ms) lines.push(" Затрымка (чанк→аўдыя): " + (m.t_first_audio_ms - m.t_first_push_ms).toFixed(1) + " ms");
}}
if (window.__wa.eos) lines.push("EOS: сервер перастаўляў чанкі; даігрываем чаргу…");
lines.push("Статус: " + (window.__wa.playing ? "playing" : "stopped"));
el.textContent = lines.join("\\n");
try {{ console.log(lines.join("\\n")); }} catch (e) {{}}
}}
if (!window.__wa) {{
const ctx = new AC({{ sampleRate }});
const bufferSize = 1024; // меншы буфер -> менш затрымкі (512 яшчэ хутчэй, але цяжэй CPU)
const node = ctx.createScriptProcessor(bufferSize, 0, 1);
let queue = [];
let playing = false;
let eos = false;
const meta = {{
t_click_ms: performance.now(),
t_first_push_ms: null,
t_first_audio_ms: null,
}};
node.onaudioprocess = (e) => {{
const out = e.outputBuffer.getChannelData(0);
let i = 0;
while (i < out.length) {{
if (queue.length === 0 || !playing) {{ out[i++] = 0.0; continue; }}
let cur = queue[0];
const take = Math.min(cur.length, out.length - i);
if (meta.t_first_audio_ms === null) {{
meta.t_first_audio_ms = performance.now();
logUpdate();
}}
out.set(cur.subarray(0, take), i);
i += take;
if (take === cur.length) queue.shift();
else queue[0] = cur.subarray(take);
}}
if (eos && queue.length === 0 && playing) {{
playing = false; // спыняемся толькі калі даігралі ўсё
logUpdate();
}}
}};
node.connect(ctx.destination);
window.__wa = {{
ctx, node,
get playing() {{ return playing; }},
get eos() {{ return eos; }},
set eos(v) {{ eos = v; }},
meta,
push: (f32) => {{ queue.push(f32); }},
start: async () => {{ try {{ await ctx.resume(); }} catch(e){{}} playing = true; logUpdate(); }},
stop: () => {{ playing = false; logUpdate(); }},
reset: () => {{ playing = false; eos = false; queue = []; meta.t_first_push_ms = null; meta.t_first_audio_ms = null; logUpdate(); }},
updateLog: logUpdate,
}};
}} else {{
window.__wa.reset();
window.__wa.meta.t_click_ms = performance.now();
}}
window.__wa.start(); // аўтаматычна запускаем Play
}}
"""
STOP_JS = "() => { if (window.__wa) window.__wa.stop(); }"
PLAY_JS = "() => { if (window.__wa) window.__wa.start(); }"
# Base64 -> Float32 + лагі; "__STOP__" — EOS (не стоп адразу)
PUSH_JS = """
(b64) => {
if (!window.__wa || !b64) return;
const meta = window.__wa.meta || {};
if (b64 === "__STOP__") {
window.__wa.eos = true; // адзначаем канец струменя
window.__wa.updateLog && window.__wa.updateLog();
return;
}
if (!meta.t_first_push_ms) {
meta.t_first_push_ms = performance.now();
window.__wa.updateLog && window.__wa.updateLog();
}
// b64 PCM Float32 -> Float32Array
const bin = atob(b64);
const len = bin.length;
const buf = new ArrayBuffer(len);
const view = new Uint8Array(buf);
for (let i=0;i<len;i++) view[i] = bin.charCodeAt(i);
const f32 = new Float32Array(buf);
window.__wa.push(f32);
}
"""
# Ручныя кнопкі
play_btn.click(fn=None, inputs=[], outputs=[], js=PLAY_JS)
stop_btn.click(fn=None, inputs=[], outputs=[], js=STOP_JS)
# Аўта-ініт+reset+play перад стартам сервера
run_btn.click(fn=None, inputs=[], outputs=[], js=INIT_RESET_AND_PLAY_JS)
# Стрымінг (сервер: base64 у схаваны канал, файл у gr.File)
run_btn.click(
fn=text_to_speech,
inputs=[inp_text, inp_voice],
outputs=[stream_pipe, final_file],
)
# Кожнае абнаўленне схаванага канала — пуш у WebAudio
stream_pipe.change(fn=None, inputs=[stream_pipe], outputs=[], js=PUSH_JS)
# Прыклады: толькі падстаўляем увод; генерацыя запускаецца кнопкай "Згенераваць"
gr.Examples(
examples=examples,
inputs=[inp_text, inp_voice],
fn=None,
cache_examples=False,
)
if __name__ == "__main__":
demo.launch()