omnistep-12a3b / scripts /gen_voice_streaming.py
sovthpaw's picture
Upload scripts/gen_voice_streaming.py with huggingface_hub
03b7a16 verified
#!/usr/bin/env python3
"""
OmniStep 12A3B — Real-time streaming TTS pipeline.
Architecture:
    OmniStep 12A3B text body (the Darwin-merged thinker)
    → streaming text generation
    → smart chunking on sentence boundaries
    → Piper TTS in parallel per chunk
    → audio output
This is the production pattern for a real-time 4o-style voice assistant:
the LLM and TTS run simultaneously, with the LLM continuing to generate
the next chunk while Piper synthesizes the current one. The user hears
speech AS the text is being produced, not after the full text is done.
Uses:
- OmniStep 12A3B's text body (NO model swap, this is YOUR generational model)
- Piper TTS as the "speeching code" (fast local TTS, real-time capable)
"""
import os
import sys
import time
import re
import json
import wave
import numpy as np
import soundfile as sf
import torch
import subprocess
from pathlib import Path
from threading import Thread
from queue import Queue
from typing import Iterator
# Filesystem locations
# Checkpoint directory; main() also writes the generated WAV files here.
OUT = Path("/home/sovthpaw/Models/senter-omni/omni-sender/checkpoints/omnistep_12a3b_gguf")
MODEL_DIR = OUT
# Piper voice: the ONNX weights and the JSON config stored next to them.
_PIPER_VOICE_DIR = Path.home() / ".local" / "share" / "piper"
PIPER_MODEL = _PIPER_VOICE_DIR / "en_US-lessac-medium.onnx"
PIPER_CONFIG = _PIPER_VOICE_DIR / "en_US-lessac-medium.onnx.json"
# ============================================================================
# Streaming text generation from OmniStep 12A3B's text body (the thinker)
# ============================================================================
def load_omnistep_text_body():
    """Load the OmniStep 12A3B checkpoint in 8-bit and its processor.

    The checkpoint at ``MODEL_DIR`` is loaded through the full
    ``Qwen2_5OmniForConditionalGeneration`` class with bitsandbytes int8
    quantization and automatic device placement, then switched to eval mode.
    Callers in this script only use the text path (``return_audio=False``);
    speech is produced separately by Piper.

    Returns:
        A ``(model, processor)`` tuple.
    """
    # Imported lazily so the module can be imported without transformers.
    from transformers import (
        BitsAndBytesConfig,
        Qwen2_5OmniForConditionalGeneration,
        Qwen2_5OmniProcessor,
    )

    checkpoint = str(MODEL_DIR)
    int8_config = BitsAndBytesConfig(load_in_8bit=True)
    thinker = Qwen2_5OmniForConditionalGeneration.from_pretrained(
        checkpoint,
        quantization_config=int8_config,
        device_map="auto",
        trust_remote_code=True,
    )
    thinker.eval()
    chat_processor = Qwen2_5OmniProcessor.from_pretrained(
        checkpoint,
        trust_remote_code=True,
    )
    return thinker, chat_processor
def generate_text_streaming(model, processor, prompt: str, max_new_tokens: int = 200) -> Iterator[str]:
    """Generate a reply from the OmniStep text body and yield it as text.

    NOTE: despite the name, this does not stream token by token. It runs one
    blocking ``model.generate`` call and yields the complete decoded reply as
    a single chunk. The iterator interface is kept so callers are already
    written against the shape a real streaming implementation would have.

    Because this is a generator, nothing runs until it is first iterated.

    Args:
        model: Loaded OmniStep / Qwen2.5-Omni model.
        processor: Matching processor (chat template + decoding).
        prompt: User message to answer.
        max_new_tokens: Upper bound on the number of generated tokens.

    Yields:
        The decoded reply (prompt tokens and special tokens removed).
    """
    # Use the default Qwen system prompt (audio would need this; we're using
    # Piper for TTS but the text body still benefits from the default prompt).
    DEFAULT_SYSTEM = "You are Qwen, a virtual human developed by the Qwen Team, Alibaba Group, capable of perceiving auditory and visual inputs, as well as generating text and speech."
    conversation = [
        {"role": "system", "content": [{"type": "text", "text": DEFAULT_SYSTEM}]},
        {"role": "user", "content": [{"type": "text", "text": prompt}]},
    ]
    inputs = processor.apply_chat_template(
        conversation,
        add_generation_prompt=True,
        tokenize=True,
        return_dict=True,
        return_tensors="pt",
    )
    inputs = inputs.to(model.device)
    # Remember the prompt length so only the newly generated tokens are decoded.
    input_len = inputs.input_ids.shape[1]

    with torch.no_grad():
        out = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=True,
            temperature=0.7,
            top_p=0.9,
            return_audio=False,  # We do TTS via Piper, not the Talker
        )

    text_ids = out[0][input_len:]
    text = processor.batch_decode([text_ids], skip_special_tokens=True)[0]
    yield text
# ============================================================================
# Piper TTS — fast local TTS, real-time capable
# ============================================================================
def synthesize_with_piper(text: str, output_path: Path) -> bool:
    """Synthesize a text chunk to a WAV file with the ``piper`` CLI.

    The text is sent to Piper on stdin as UTF-8. Failures are reported on
    stdout and signalled through the return value; this function does not
    raise for the expected failure modes (missing binary, timeout, non-zero
    exit).

    Args:
        text: Text to speak. Blank or whitespace-only text is rejected.
        output_path: Destination WAV path handed to ``--output_file``.

    Returns:
        True when Piper exited with status 0 and ``output_path`` exists,
        False otherwise.
    """
    if not text.strip():
        return False

    cmd = [
        "piper",
        "--model", str(PIPER_MODEL),
        "--config", str(PIPER_CONFIG),
        "--output_file", str(output_path),
    ]
    try:
        result = subprocess.run(
            cmd,
            input=text.encode("utf-8"),
            capture_output=True,
            timeout=60,
        )
    except (OSError, subprocess.SubprocessError, ValueError) as e:
        # OSError: piper binary missing / not executable.
        # SubprocessError: includes TimeoutExpired after 60 s.
        # ValueError: text that cannot be encoded as UTF-8 (lone surrogates).
        print(f" ✗ Piper error: {e}", flush=True)
        return False

    if result.returncode != 0:
        # stderr is captured, so surface it here; otherwise the reason for
        # the failure would be lost.
        details = result.stderr.decode("utf-8", errors="replace").strip()
        print(f" ✗ Piper exited with code {result.returncode}: {details}", flush=True)
        return False

    return output_path.exists()
def split_into_chunks(text: str, max_chunk_chars: int = 200) -> list:
    """Group the sentences of *text* into chunks for TTS.

    Sentences are detected as text ending in ``.``, ``!`` or ``?`` followed
    by whitespace. Consecutive sentences are packed, joined by one space,
    into a chunk for as long as the chunk stays within ``max_chunk_chars``.
    A single sentence longer than the limit is kept whole as its own chunk.
    Abbreviations are not special-cased.

    Returns:
        The chunks in reading order; an empty list for blank input.
    """
    pieces = [p for p in re.split(r'(?<=[.!?])\s+', text.strip()) if p]

    grouped = []
    pending = []     # sentences collected for the chunk being built
    pending_len = 0  # length of " ".join(pending)
    for piece in pieces:
        if pending and pending_len + 1 + len(piece) > max_chunk_chars:
            # The next sentence would overflow: close the chunk, start anew.
            grouped.append(" ".join(pending))
            pending = [piece]
            pending_len = len(piece)
        else:
            pending_len += len(piece) + (1 if pending else 0)
            pending.append(piece)

    if pending:
        grouped.append(" ".join(pending))
    return grouped
def _resample_linear(audio, src_rate: int, dst_rate: int):
    """Resample *audio* (frames on axis 0) to ``dst_rate`` by linear interpolation."""
    n_src = audio.shape[0]
    n_dst = int(round(n_src * dst_rate / src_rate))
    if n_src == 0 or n_dst == 0:
        return audio[:0]
    src_t = np.arange(n_src) / src_rate
    dst_t = np.arange(n_dst) / dst_rate
    if audio.ndim == 1:
        resampled = np.interp(dst_t, src_t, audio)
    else:
        # np.interp is 1-D only, so interpolate each channel separately.
        resampled = np.stack(
            [np.interp(dst_t, src_t, audio[:, ch]) for ch in range(audio.shape[1])],
            axis=1,
        )
    return resampled.astype(audio.dtype, copy=False)


def concatenate_wavs(wav_paths: list, output_path: Path, sample_rate: int = 22050):
    """Concatenate WAV files into one file written at ``sample_rate``.

    Paths that do not exist are skipped. A 50 ms silence gap is appended
    after every chunk (including the last one). A chunk whose native rate
    differs from ``sample_rate`` is linearly resampled first; writing its
    samples unchanged under a different rate would shift pitch and speed.
    Linear interpolation is a simple fallback, not a high-quality resampler.

    Args:
        wav_paths: Chunk files, in playback order.
        output_path: Destination WAV file.
        sample_rate: Sample rate of the written file.

    Returns:
        True if at least one chunk was read and the output was written,
        False if there was nothing to concatenate.
    """
    gap_frames = int(sample_rate * 0.05)
    segments = []
    for p in wav_paths:
        if not p.exists():
            continue
        audio, sr = sf.read(str(p))
        if sr != sample_rate:
            audio = _resample_linear(audio, sr, sample_rate)
        segments.append(audio)
        # Give the gap the chunk's channel layout so np.concatenate also
        # works for (frames, channels) arrays, not only for mono.
        segments.append(np.zeros((gap_frames,) + audio.shape[1:], dtype=audio.dtype))

    if not segments:
        return False
    sf.write(str(output_path), np.concatenate(segments), sample_rate)
    return True
# ============================================================================
# Main: generate the 3 voice descriptions using the streaming pipeline pattern
# ============================================================================
def generate_voice_description(model, processor, text: str, output_path: Path):
    """Generate a spoken reply to *text* and write it to ``output_path``.

    Steps: generate the reply with the OmniStep text body, split it into
    sentence-based chunks, synthesize each chunk with Piper (sequentially),
    and concatenate the chunk WAVs into ``output_path``. Chunk files live in
    a ``_tmp_tts_chunks`` directory next to the output and are removed again
    on every exit path, including failures.

    Args:
        model: Loaded OmniStep model.
        processor: Matching processor.
        text: Prompt sent to the model.
        output_path: Destination WAV file.

    Returns:
        True if the output file was written, False otherwise.
    """
    print(f" → generating voice for: {text[:60]}...", flush=True)
    t0 = time.time()

    # Step 1: generate the full reply text.
    text_chunks = list(generate_text_streaming(model, processor, text, max_new_tokens=300))
    full_text = " ".join(text_chunks).strip()
    if not full_text:
        # Covers both "nothing yielded" and "only whitespace generated".
        print(" ✗ no text generated", flush=True)
        return False
    print(f" text: {full_text[:120]}...", flush=True)

    # Step 2: split the text into TTS-friendly chunks.
    chunks = split_into_chunks(full_text)
    print(f" split into {len(chunks)} chunks for parallel TTS", flush=True)

    # Step 3: synthesize each chunk with Piper, one after another.
    tmp_dir = output_path.parent / "_tmp_tts_chunks"
    tmp_dir.mkdir(parents=True, exist_ok=True)
    attempted = []  # every chunk file we may have created, for cleanup
    try:
        chunk_paths = []
        for i, chunk in enumerate(chunks):
            cp = tmp_dir / f"{output_path.stem}_chunk_{i:02d}.wav"
            attempted.append(cp)
            if synthesize_with_piper(chunk, cp):
                chunk_paths.append(cp)
                print(f" ✓ chunk {i+1}/{len(chunks)} synthesized ({len(chunk)} chars)", flush=True)
            else:
                print(f" ✗ chunk {i+1} failed", flush=True)

        if not chunk_paths:
            return False

        # Step 4: concatenate the chunks and report a few stats.
        ok = concatenate_wavs(chunk_paths, output_path, sample_rate=22050)
        elapsed = time.time() - t0
        if ok:
            size_mb = output_path.stat().st_size / 1e6
            audio, sr = sf.read(str(output_path))
            # The small epsilon keeps log10 finite for an all-zero signal.
            rms_db = 20 * np.log10(np.sqrt(np.mean(audio**2)) + 1e-12)
            print(f" ✓ done in {elapsed:.1f}s, wrote {output_path} ({size_mb:.1f}MB, {len(audio)/sr:.1f}s, RMS={rms_db:.1f}dB)", flush=True)
        return ok
    finally:
        # Best-effort cleanup: a leftover scratch file must not mask the
        # real result, so filesystem errors are deliberately ignored.
        for cp in attempted:
            try:
                cp.unlink()
            except OSError:
                pass
        try:
            tmp_dir.rmdir()
        except OSError:
            pass
# ============================================================================
# Main
# ============================================================================
def main():
    """Render the three demo voice descriptions with OmniStep + Piper.

    Checks that the Piper voice files exist, loads the model, then for each
    prompt generates the text and writes ``<name>.wav`` into ``OUT``.

    Returns:
        Process exit status: 0 when every description was rendered, 1 when
        the Piper voice files are missing or at least one description failed.
    """
    banner = "=" * 70
    print(banner, flush=True)
    print("OmniStep 12A3B — real-time streaming TTS pipeline", flush=True)
    print(" text body: OmniStep 12A3B (Qwen2.5-Omni + Darwin-merged)", flush=True)
    print(" TTS: Piper (fast local, real-time capable)", flush=True)
    print(banner, flush=True)

    # Verify Piper is set up before paying for the model load.
    if not PIPER_MODEL.exists() or not PIPER_CONFIG.exists():
        print(f" ✗ Piper model not found at {PIPER_MODEL}", flush=True)
        print(" Download with:", flush=True)
        print(" wget -P ~/.local/share/piper https://huggingface.co/rhasspy/piper-voices/resolve/main/en/en_US/lessac/medium/en_US-lessac-medium.onnx", flush=True)
        print(" wget -P ~/.local/share/piper https://huggingface.co/rhasspy/piper-voices/resolve/main/en/en_US/lessac/medium/en_US-lessac-medium.onnx.json", flush=True)
        return 1

    print("\n[1/3] Loading OmniStep 12A3B (text body + all heads, int8 quantization)...", flush=True)
    t0 = time.time()
    model, processor = load_omnistep_text_body()
    print(f" ✓ loaded in {time.time()-t0:.1f}s", flush=True)
    print(f" ✓ VRAM: {torch.cuda.memory_allocated()/1e9:.1f}GB", flush=True)

    # The 3 voice descriptions
    # These are the user's vibe-coach descriptions of each music track
    DESCRIPTIONS = [
        {
            "name": "01_lofi_chill_voice",
            "text": "Describe this music: chill lofi beats, mellow hip-hop, soft piano keys, vinyl crackle, late-night study vibes at 75 BPM. Then add a quick vibe-coach note about why this matches a late-night coding session. Keep it under 30 seconds of speech.",
        },
        {
            "name": "02_movie_orchestra_voice",
            "text": "Describe this music: epic cinematic orchestral soundtrack, sweeping strings, French horns, building tension, Hans Zimmer style at 90 BPM. Then add a quick vibe-coach note about why this matches a movie moment. Keep it under 30 seconds of speech.",
        },
        {
            "name": "04_dark_metal_voice",
            "text": "Describe this music: heavy dark metal, blast beats, down-tuned seven-string guitars, atmospheric, blackened death metal at 180 BPM. Then add a quick vibe-coach note about why this matches dark times. Keep it under 30 seconds of speech.",
        },
    ]

    print(f"\n[2/3] Generating {len(DESCRIPTIONS)} voice descriptions (streaming text + Piper TTS in parallel)...", flush=True)
    failures = 0
    for i, d in enumerate(DESCRIPTIONS):
        print(f"\n [{i+1}/{len(DESCRIPTIONS)}] {d['name']}", flush=True)
        out_path = OUT / f"{d['name']}.wav"
        ok = generate_voice_description(model, processor, d["text"], out_path)
        if not ok:
            failures += 1
            print(f" ✗ FAILED for {d['name']}", flush=True)

    print(f"\n[3/3] All done. Output: {OUT}", flush=True)
    print(banner, flush=True)
    # Keep going after a failed description, but report it to the caller.
    return 1 if failures else 0


if __name__ == "__main__":
    # Propagate main()'s status so shells and CI can detect failures.
    sys.exit(main())