import time
import threading
from typing import Iterator

import gradio as gr
import numpy as np
import soundfile as sf
import librosa
import torch
from transformers import set_seed

from vibevoice.modular.modeling_vibevoice_inference import (
    VibeVoiceForConditionalGenerationInference,
)
from vibevoice.processor.vibevoice_processor import VibeVoiceProcessor
from vibevoice.modular.streamer import AudioStreamer

MODEL_ID = "microsoft/VibeVoice-1.5B"


def convert_to_16bit(data: np.ndarray) -> np.ndarray:
    """Convert float audio in [-1, 1] to int16 PCM, rescaling only if it clips."""
    if torch.is_tensor(data):
        data = data.detach().cpu().numpy()
    data = np.array(data, dtype=np.float32, copy=False)
    amax = np.max(np.abs(data)) if data.size else 1.0
    if amax > 1.0:
        data = data / amax
    return (data * 32767.0).astype(np.int16)


def read_audio(path: str, target_sr: int = 24000) -> np.ndarray:
    """Load an audio file as mono float32 at the model's 24 kHz sample rate."""
    wav, sr = sf.read(path)
    if wav.ndim > 1:
        wav = wav.mean(axis=1)
    if sr != target_sr:
        wav = librosa.resample(wav, orig_sr=sr, target_sr=target_sr)
    return wav.astype(np.float32)


class VibeMiniDemo:
    def __init__(self, model_path: str, device: str = "cuda", inference_steps: int = 10):
        self.model_path = model_path
        self.device = device
        self.inference_steps = inference_steps
        self._stop = False
        self._streamer = None
        self._load()

    def _load(self):
        print(f"🔄 Loading VibeVoice from {self.model_path} ...")
        # Processor pulls tokenizer/config from HF automatically if model_path is a repo id
        self.processor = VibeVoiceProcessor.from_pretrained(self.model_path)
        # Honor the requested device, falling back to CPU when CUDA is absent
        device_map = self.device if torch.cuda.is_available() else "cpu"
        # Try flash-attn2 first; fall back to SDPA if the env doesn't have it
        try:
            self.model = VibeVoiceForConditionalGenerationInference.from_pretrained(
                self.model_path,
                torch_dtype=torch.bfloat16,
                device_map=device_map,
                attn_implementation="flash_attention_2",
            )
        except Exception as e:
            print(f"⚠️ flash_attention_2 unavailable ({type(e).__name__}: {e}); falling back to SDPA")
            self.model = VibeVoiceForConditionalGenerationInference.from_pretrained(
                self.model_path,
                torch_dtype=torch.bfloat16,
                device_map=device_map,
                attn_implementation="sdpa",
            )
        self.model.eval()
        # Configure diffusion steps (matches upstream demo defaults)
        self.model.model.noise_scheduler = self.model.model.noise_scheduler.from_config(
            self.model.model.noise_scheduler.config,
            algorithm_type="sde-dpmsolver++",
            beta_schedule="squaredcos_cap_v2",
        )
        self.model.set_ddpm_inference_steps(num_steps=self.inference_steps)
        print("✅ Model ready")

    def stop(self):
        self._stop = True
        if self._streamer is not None:
            try:
                self._streamer.end()
            except Exception as e:
                print(f"stop error: {e}")

    def generate_stream(
        self,
        script: str,
        voice_files: list[str],
        cfg_scale: float = 1.3,
    ) -> Iterator[tuple]:
        if not script.strip():
            yield None, None, "❌ Please provide a script.", gr.update(visible=False)
            return

        # Load voice samples (1..4)
        voice_samples = [read_audio(p) for p in voice_files if p]
        if not voice_samples:
            yield None, None, "❌ Provide at least one voice sample (WAV/MP3/etc).", gr.update(visible=False)
            return

        # Normalize speaker labels if the user didn't prefix lines
        lines = []
        for i, raw in enumerate([ln for ln in script.splitlines() if ln.strip()]):
            if raw.lower().startswith("speaker") and ":" in raw:
                lines.append(raw)
            else:
                lines.append(f"Speaker {i % len(voice_samples)}: {raw}")
        formatted = "\n".join(lines)

        # Pack inputs
        inputs = self.processor(
            text=[formatted],
            voice_samples=[voice_samples],
            padding=True,
            return_tensors="pt",
            return_attention_mask=True,
        )
        # Move input tensors onto the model's device (the processor returns CPU tensors)
        inputs = {
            k: v.to(self.model.device) if torch.is_tensor(v) else v
            for k, v in inputs.items()
        }

        self._stop = False
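        # The streamer below is the handoff point between the generation worker
        # thread and this generator: generate() pushes decoded audio into it,
        # and the loop over get_stream(0) consumes chunks as they arrive, so
        # playback can begin before the full take is finished.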
        streamer = AudioStreamer(batch_size=1, stop_signal=None, timeout=None)
        self._streamer = streamer

        # Kick off generation on a worker thread
        def _worker():
            try:
                self.model.generate(
                    **inputs,
                    max_new_tokens=None,
                    cfg_scale=cfg_scale,
                    tokenizer=self.processor.tokenizer,
                    generation_config={"do_sample": False},
                    audio_streamer=streamer,
                    stop_check_fn=lambda: self._stop,
                    verbose=False,
                    refresh_negative=True,
                )
            except Exception as e:
                print(f"gen error: {e}")
                streamer.end()

        t = threading.Thread(target=_worker, daemon=True)
        t.start()

        # Stream chunks out
        sr = 24000
        all_chunks, pending = [], []
        last_yield = time.time()
        min_chunk = sr * 30   # ~30s per push feels smooth for Spaces audio
        min_interval = 15.0   # or push every 15s if chunks are small
        stream0 = streamer.get_stream(0)
        got_any = False
        yielded_any = False
        chunk_idx = 0
        log_prefix = f"🎙️ VibeVoice streaming (CFG={cfg_scale})\n"

        for chunk in stream0:
            if self._stop:
                streamer.end()
                break
            got_any = True
            chunk_idx += 1

            if torch.is_tensor(chunk):
                if chunk.dtype == torch.bfloat16:
                    chunk = chunk.float()
                audio_np = chunk.cpu().numpy().astype(np.float32)
            else:
                audio_np = np.asarray(chunk, dtype=np.float32)
            if audio_np.ndim > 1:
                audio_np = audio_np.squeeze(-1)

            pcm16 = convert_to_16bit(audio_np)
            all_chunks.append(pcm16)
            pending.append(pcm16)

            # Buffer until the first push has enough audio, then push on
            # either a size threshold or a time threshold
            need_push = False
            if not yielded_any and sum(len(c) for c in pending) >= min_chunk:
                need_push = True
                yielded_any = True
            elif yielded_any and (
                sum(len(c) for c in pending) >= min_chunk
                or (time.time() - last_yield) >= min_interval
            ):
                need_push = True

            if need_push and pending:
                new_audio = np.concatenate(pending)
                total_sec = sum(len(c) for c in all_chunks) / sr
                msg = log_prefix + f"🎵 {total_sec:.1f}s generated (chunk {chunk_idx})"
                yield (sr, new_audio), None, msg, gr.update(visible=True)
                pending, last_yield = [], time.time()

        # Flush any remainder
        if pending:
            final = np.concatenate(pending)
            total_sec = sum(len(c) for c in all_chunks) / sr
            yield (sr, final), None, log_prefix + f"🎵 final chunk: {total_sec:.1f}s", gr.update(visible=True)

        # Join worker quickly; then deliver the full take
        t.join(timeout=5.0)
        self._streamer = None

        if not got_any:
            yield None, None, "❌ No audio chunks received from the model.", gr.update(visible=False)
            return

        if all_chunks:
            complete = np.concatenate(all_chunks)
            final_sec = len(complete) / sr
            msg = f"✅ Done. Total: {final_sec:.1f}s"
            yield None, (sr, complete), msg, gr.update(visible=False)


def build_ui(demo: VibeMiniDemo):
    with gr.Blocks(title="VibeVoice – Minimal") as app:
        gr.Markdown("## 🎙️ VibeVoice – Minimal Space\nProvide a script and 1–4 short voice samples.")
        with gr.Row():
            with gr.Column():
                script = gr.Textbox(
                    label="Script",
                    value="Speaker 0: Welcome to VibeVoice!\nSpeaker 0: This is a minimal Space demo.",
                    lines=8,
                )
                cfg = gr.Slider(1.0, 2.0, step=0.05, value=1.3, label="CFG Scale")
                voices = gr.Files(
                    label="Voice samples (WAV/MP3/FLAC/OGG/M4A/AAC) – 1 to 4 files",
                    file_count="multiple",
                    type="filepath",
                )
                with gr.Row():
                    go = gr.Button("🚀 Generate")
                    stop = gr.Button("🛑 Stop", variant="stop")
            with gr.Column():
                live = gr.Audio(label="Live Stream", streaming=True, autoplay=True)
                full = gr.Audio(label="Complete Take (downloadable)")
                log = gr.Textbox(label="Log", interactive=False)
                badge = gr.HTML(
                    visible=False,
                    value='<div style="text-align:center;"><strong>LIVE STREAMING</strong></div>',
                )
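        # A generator handler streams to Gradio: each `yield` below pushes one
        # (live_chunk, full_take, log, badge) update to the four wired outputs.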
""") def on_go(script, cfg, voices): paths = [f.name if hasattr(f, "name") else f for f in (voices or [])][:4] # Clear outputs first yield None, gr.update(value=None), "⏳ Starting…", gr.update(visible=True) # Stream generation for s_chunk, full_take, msg, badge_vis in demo.generate_stream( script=script, voice_files=paths, cfg_scale=cfg, ): if full_take is not None: # final: hide live, show full yield None, full_take, msg, gr.update(visible=False) else: # live streaming yield s_chunk, gr.update(), msg, badge_vis go.click( on_go, inputs=[script, cfg, voices], outputs=[live, full, log, badge], ) def on_stop(): demo.stop() return "πŸ›‘ Stopped.", gr.update(visible=False) stop.click(on_stop, outputs=[log, badge]) return app def main(): set_seed(42) demo = VibeMiniDemo(model_path=MODEL_ID, device="cuda" if torch.cuda.is_available() else "cpu") app = build_ui(demo) app.queue(max_size=20, default_concurrency_limit=1).launch(server_name="0.0.0.0", show_api=False) if __name__ == "__main__": main()