Spaces:
Build error
Build error
| import os | |
| import time | |
| import threading | |
| from pathlib import Path | |
| from typing import Iterator | |
| import gradio as gr | |
| import numpy as np | |
| import soundfile as sf | |
| import librosa | |
| import torch | |
| from transformers import set_seed | |
| from vibevoice.modular.modeling_vibevoice_inference import ( | |
| VibeVoiceForConditionalGenerationInference, | |
| ) | |
| from vibevoice.processor.vibevoice_processor import VibeVoiceProcessor | |
| from vibevoice.modular.streamer import AudioStreamer | |
# Model identifier that main() passes to VibeMiniDemo as model_path, which in
# turn hands it to the processor/model from_pretrained() calls.
MODEL_ID = "microsoft/VibeVoice-1.5B"
def convert_to_16bit(data: np.ndarray) -> np.ndarray:
    """Convert floating-point audio samples to 16-bit signed PCM.

    Args:
        data: Audio samples as a NumPy array, a torch tensor, or any
            array-like (e.g. a list of floats).

    Returns:
        An ``int16`` array with the same shape as the input. Samples are
        scaled by 32767; if the peak absolute value exceeds 1.0 the signal
        is first divided by that peak so it cannot overflow.
    """
    if torch.is_tensor(data):
        # NumPy has no bfloat16 dtype, so widen to float32 before leaving torch.
        data = data.detach().to(torch.float32).cpu().numpy()
    # np.asarray copies only when it has to. np.array(..., copy=False) raises
    # ValueError on NumPy >= 2.0 whenever a copy is unavoidable (list input,
    # float64 -> float32 conversion), which broke ordinary inputs.
    samples = np.asarray(data, dtype=np.float32)
    # An empty input has no peak; treat it as already in range.
    peak = float(np.max(np.abs(samples))) if samples.size else 1.0
    if peak > 1.0:
        samples = samples / peak
    return (samples * 32767.0).astype(np.int16)
def read_audio(path: str, target_sr: int = 24000) -> np.ndarray:
    """Load an audio file as mono float32 samples at ``target_sr``.

    Args:
        path: Location of the audio file, read with ``soundfile``.
        target_sr: Sample rate the returned signal should have.

    Returns:
        A float32 array. Multi-channel input is averaged across its second
        axis, and the signal is resampled with ``librosa`` when the file's
        rate differs from ``target_sr``.
    """
    samples, source_sr = sf.read(path)
    # Collapse channels by averaging them.
    mono = samples.mean(axis=1) if samples.ndim > 1 else samples
    if source_sr != target_sr:
        mono = librosa.resample(mono, orig_sr=source_sr, target_sr=target_sr)
    return mono.astype(np.float32)
class VibeMiniDemo:
    """Minimal VibeVoice wrapper: loads the model once and streams speech.

    Attributes are plain instance fields:
      * ``model_path`` / ``device`` / ``inference_steps`` - constructor args.
      * ``processor`` / ``model`` - created by ``_load()``.
      * ``_stop`` - flag polled by the generation worker and the stream loop.
      * ``_streamer`` - the active ``AudioStreamer`` or ``None`` when idle.
    """

    def __init__(self, model_path: str, device: str = "cuda", inference_steps: int = 10):
        """Store the configuration and load the processor and model."""
        self.model_path = model_path
        self.device = device
        self.inference_steps = inference_steps
        self._stop = False
        self._streamer = None
        self._load()

    def _load(self):
        """Load processor and model, then configure the diffusion scheduler.

        The model is placed on ``self.device``; a CUDA request silently falls
        back to CPU when CUDA is not available. flash-attention-2 is tried
        first and SDPA is used if that load fails. A failure of the SDPA load
        propagates to the caller.
        """
        print(f"🔄 Loading VibeVoice from {self.model_path} ...")
        # Processor pulls tokenizer/config from HF automatically if model_path is a repo id
        self.processor = VibeVoiceProcessor.from_pretrained(self.model_path)

        # Honour the requested device instead of re-deriving it, but never ask
        # for CUDA on a machine that does not have it.
        target_device = self.device
        if str(target_device).startswith("cuda") and not torch.cuda.is_available():
            target_device = "cpu"

        def _from_pretrained(attn_implementation: str):
            return VibeVoiceForConditionalGenerationInference.from_pretrained(
                self.model_path,
                torch_dtype=torch.bfloat16,
                device_map=target_device,
                attn_implementation=attn_implementation,
            )

        # Try flash-attn2 first; fall back to SDPA if the env doesn't have it.
        # The broad except is deliberate: any load failure triggers the retry.
        try:
            self.model = _from_pretrained("flash_attention_2")
        except Exception as e:
            print(f"⚠️ flash_attention_2 unavailable ({type(e).__name__}: {e}); falling back to SDPA")
            self.model = _from_pretrained("sdpa")
        self.model.eval()

        # Configure diffusion steps (matches upstream demo defaults)
        self.model.model.noise_scheduler = self.model.model.noise_scheduler.from_config(
            self.model.model.noise_scheduler.config,
            algorithm_type="sde-dpmsolver++",
            beta_schedule="squaredcos_cap_v2",
        )
        self.model.set_ddpm_inference_steps(num_steps=self.inference_steps)
        print("✅ Model ready")

    def stop(self):
        """Request that the current generation stops and end its streamer."""
        self._stop = True
        if self._streamer is not None:
            try:
                self._streamer.end()
            except Exception as e:
                # Best effort: stopping must never raise into the UI handler.
                print(f"stop error: {e}")

    @staticmethod
    def _format_script(script: str, num_speakers: int) -> str:
        """Return the script with a ``Speaker N:`` label on every non-blank line.

        Lines are stripped first so indentation cannot hide an existing label.
        Unlabelled lines are assigned speakers in rotation, based on their
        position among the non-blank lines.
        """
        num_speakers = max(1, num_speakers)
        texts = [ln.strip() for ln in script.splitlines() if ln.strip()]
        labelled = []
        for idx, text in enumerate(texts):
            if text.lower().startswith("speaker") and ":" in text:
                labelled.append(text)
            else:
                labelled.append(f"Speaker {idx % num_speakers}: {text}")
        return "\n".join(labelled)

    @staticmethod
    def _to_mono_float32(chunk) -> np.ndarray:
        """Convert one streamed chunk (tensor or array-like) to 1-D float32."""
        if torch.is_tensor(chunk):
            # NumPy has no bfloat16, so widen to float32 before converting.
            audio = chunk.detach().float().cpu().numpy()
        else:
            audio = np.asarray(chunk, dtype=np.float32)
        # Flatten rather than squeeze(-1): squeeze(-1) raises when the
        # singleton axis is not last (e.g. a (1, N) chunk).
        # NOTE(review): assumes chunks are single-channel - confirm against
        # AudioStreamer's output shape.
        return audio.astype(np.float32, copy=False).reshape(-1)

    def generate_stream(
        self,
        script: str,
        voice_files: list[str],
        cfg_scale: float = 1.3,
    ) -> Iterator[tuple]:
        """Generate speech for ``script`` and yield it incrementally.

        Args:
            script: Dialogue text; lines without a ``Speaker N:`` prefix are
                labelled automatically.
            voice_files: Paths of reference voice recordings; falsy entries
                are skipped.
            cfg_scale: Guidance scale forwarded to ``model.generate``.

        Yields:
            4-tuples ``(live_chunk, full_take, log_message, badge_update)``.
            ``live_chunk`` and ``full_take`` are ``(sample_rate, int16 array)``
            or ``None``; ``badge_update`` is a ``gr.update`` toggling the
            streaming badge. Validation and generation failures are reported
            through ``log_message`` rather than raised.
        """
        if not script.strip():
            yield None, None, "❌ Please provide a script.", gr.update(visible=False)
            return

        # Load voice samples (1..4)
        voice_samples = [read_audio(p) for p in (voice_files or []) if p]
        if not voice_samples:
            yield None, None, "❌ Provide at least one voice sample (WAV/MP3/etc).", gr.update(visible=False)
            return

        # Normalize speaker labels if user didn't prefix lines
        formatted = self._format_script(script, len(voice_samples))

        # Pack inputs
        inputs = self.processor(
            text=[formatted],
            voice_samples=[voice_samples],
            padding=True,
            return_tensors="pt",
            return_attention_mask=True,
        )

        self._stop = False
        streamer = AudioStreamer(batch_size=1, stop_signal=None, timeout=None)
        self._streamer = streamer
        worker_errors = []  # filled by the worker so failures reach the UI log

        # Kick off generation on a worker thread
        def _worker():
            try:
                self.model.generate(
                    **inputs,
                    max_new_tokens=None,
                    cfg_scale=cfg_scale,
                    tokenizer=self.processor.tokenizer,
                    generation_config={"do_sample": False},
                    audio_streamer=streamer,
                    stop_check_fn=lambda: self._stop,
                    verbose=False,
                    refresh_negative=True,
                )
            except Exception as e:
                print(f"gen error: {e}")
                worker_errors.append(e)
                # Unblock the consumer loop below.
                streamer.end()

        t = threading.Thread(target=_worker, daemon=True)
        t.start()

        # Stream chunks out
        sr = 24000
        min_chunk = sr * 30  # ~30s per push feels smooth for Spaces audio
        min_interval = 15.0  # or every 15s if chunks are small
        all_chunks, pending = [], []
        pending_samples = 0  # running totals avoid re-summing every chunk
        total_samples = 0
        last_yield = time.time()
        got_any = False
        yielded_any = False
        chunk_idx = 0
        log_prefix = f"🎙️ VibeVoice streaming (CFG={cfg_scale})\n"

        for chunk in streamer.get_stream(0):
            if self._stop:
                streamer.end()
                break
            got_any = True
            chunk_idx += 1

            pcm16 = convert_to_16bit(self._to_mono_float32(chunk))
            all_chunks.append(pcm16)
            pending.append(pcm16)
            pending_samples += len(pcm16)
            total_samples += len(pcm16)

            # The first push waits for a full buffer; later pushes also fire
            # when the time since the previous push exceeds min_interval.
            enough_audio = pending_samples >= min_chunk
            overdue = yielded_any and (time.time() - last_yield) >= min_interval
            if enough_audio or overdue:
                yielded_any = True
                new_audio = np.concatenate(pending)
                total_sec = total_samples / sr
                msg = log_prefix + f"🎵 {total_sec:.1f}s generated (chunk {chunk_idx})"
                yield (sr, new_audio), None, msg, gr.update(visible=True)
                pending, pending_samples, last_yield = [], 0, time.time()

        # Flush any remainder
        if pending:
            final = np.concatenate(pending)
            total_sec = total_samples / sr
            yield (sr, final), None, log_prefix + f"🎵 final chunk: {total_sec:.1f}s", gr.update(visible=True)
            yielded_any = True

        # Join worker quickly; then deliver full take
        t.join(timeout=5.0)
        self._streamer = None

        if not got_any:
            if worker_errors:
                failure = worker_errors[0]
                msg = f"❌ Generation failed ({type(failure).__name__}: {failure})."
            else:
                msg = "❌ No audio chunks received from the model."
            yield None, None, msg, gr.update(visible=False)
            return

        if all_chunks:
            complete = np.concatenate(all_chunks)
            final_sec = len(complete) / sr
            if worker_errors:
                failure = worker_errors[0]
                msg = (
                    f"⚠️ Generation ended early ({type(failure).__name__}: {failure}). "
                    f"Total: {final_sec:.1f}s"
                )
            else:
                msg = f"✅ Done. Total: {final_sec:.1f}s"
            yield None, (sr, complete), msg, gr.update(visible=False)
def build_ui(demo: VibeMiniDemo):
    """Build the Gradio Blocks app around a loaded ``VibeMiniDemo``.

    Args:
        demo: Object providing ``generate_stream(script, voice_files,
            cfg_scale)`` and ``stop()``.

    Returns:
        The ``gr.Blocks`` app with the Generate and Stop handlers wired up.
    """
    with gr.Blocks(title="VibeVoice – Minimal") as app:
        gr.Markdown("## 🎙️ VibeVoice — Minimal Space\nProvide a script and 1–4 short voice samples.")
        with gr.Row():
            with gr.Column():
                script = gr.Textbox(
                    label="Script",
                    value="Speaker 0: Welcome to VibeVoice!\nSpeaker 0: This is a minimal Space demo.",
                    lines=8,
                )
                cfg = gr.Slider(1.0, 2.0, step=0.05, value=1.3, label="CFG Scale")
                voices = gr.Files(
                    label="Voice samples (WAV/MP3/FLAC/OGG/M4A/AAC) — 1 to 4 files",
                    file_count="multiple",
                    type="filepath",
                )
                with gr.Row():
                    go = gr.Button("🚀 Generate")
                    stop = gr.Button("🛑 Stop", variant="stop")
            with gr.Column():
                live = gr.Audio(label="Live Stream", streaming=True, autoplay=True)
                full = gr.Audio(label="Complete Take (downloadable)")
                log = gr.Textbox(label="Log", interactive=False)
                badge = gr.HTML(visible=False, value="""
<div style="background:#dcfce7;border:1px solid #86efac;padding:8px;border-radius:8px;text-align:center">
<strong>LIVE STREAMING</strong>
</div>
""")

        def on_go(script, cfg, voices):
            """Stream generation results into (live, full, log, badge)."""
            # Entries may be path strings or file objects exposing .name;
            # only the first four are used.
            paths = [f.name if hasattr(f, "name") else f for f in (voices or [])][:4]
            # Clear outputs first
            yield None, gr.update(value=None), "⏳ Starting…", gr.update(visible=True)
            # Stream generation
            for s_chunk, full_take, msg, badge_vis in demo.generate_stream(
                script=script,
                voice_files=paths,
                cfg_scale=cfg,
            ):
                if full_take is not None:
                    # final: hide live, show full
                    yield None, full_take, msg, gr.update(visible=False)
                else:
                    # live streaming: leave the full-take player untouched
                    yield s_chunk, gr.update(), msg, badge_vis

        go.click(
            on_go,
            inputs=[script, cfg, voices],
            outputs=[live, full, log, badge],
        )

        def on_stop():
            """Ask the demo to stop and hide the streaming badge."""
            demo.stop()
            return "🛑 Stopped.", gr.update(visible=False)

        # queue=False lets a stop request bypass the request queue so it is
        # handled immediately instead of waiting behind queued jobs.
        stop.click(on_stop, outputs=[log, badge], queue=False)
    return app
def main():
    """Seed the RNGs, load the model, build the UI and launch the server."""
    set_seed(42)
    device = "cuda" if torch.cuda.is_available() else "cpu"
    demo = VibeMiniDemo(model_path=MODEL_ID, device=device)
    ui = build_ui(demo)
    # Queue limits as configured here: at most 20 waiting requests and a
    # default concurrency limit of 1 per event.
    queued_ui = ui.queue(max_size=20, default_concurrency_limit=1)
    queued_ui.launch(server_name="0.0.0.0", show_api=False)


if __name__ == "__main__":
    main()