Spaces:
Running on Zero
Running on Zero
| from __future__ import annotations | |
| import json | |
| import math | |
| import os | |
| import shutil | |
| import subprocess | |
| import sys | |
| import time | |
| import uuid | |
| import wave | |
| from pathlib import Path | |
| from typing import Any | |
| try: | |
| import matplotlib | |
| matplotlib.use("Agg") | |
| except ImportError: | |
| pass | |
| import gradio as gr | |
| import requests | |
| import spaces | |
| import torch | |
| import websocket | |
| from scripts.workflow_client import load_workflow, patch_voicegate_workflow | |
# --- Filesystem layout ------------------------------------------------------
# Every project path is resolved relative to the directory holding this file.
ROOT = Path(__file__).resolve().parent
COMFY_DIR = ROOT / "ComfyUI"
COMFY_INPUT_DIR = COMFY_DIR / "input"
USER_OUTPUT_DIR = ROOT / "user_outputs"

# --- Log files --------------------------------------------------------------
COMFY_LOG = Path("/tmp/voicegate_comfy_gradio.log")
BOOTSTRAP_LOG = Path("/tmp/voicegate_bootstrap.log")

# --- ComfyUI endpoint -------------------------------------------------------
# The port is kept as a string because it is passed straight onto a command line.
COMFY_HOST = "127.0.0.1"
COMFY_PORT = "8188"
COMFY_URL = f"http://{COMFY_HOST}:{COMFY_PORT}"

# --- Mutable process state (rebound by the functions below via ``global``) ---
COMFY_PROCESS: subprocess.Popen | None = None
PREPARE_PROCESS: subprocess.Popen | None = None
BOOTSTRAPPED = False

# --- Model files/directories that must exist before a full run --------------
_MODELS_DIR = COMFY_DIR / "models"
REQUIRED_MODEL_PATHS = [
    _MODELS_DIR / "diffusion_models" / "MelBandRoFormer_comfy" / "MelBandRoformer_fp32.safetensors",
    _MODELS_DIR / "voxcpm" / "VoxCPM2" / "model.safetensors",
    _MODELS_DIR / "voxcpm" / "VoxCPM2" / "audiovae.pth",
    _MODELS_DIR / "Qwen3-ASR" / "Qwen3-ASR-1.7B",
    _MODELS_DIR / "Qwen3-ASR" / "Qwen3-ForcedAligner-0.6B",
]
# Language names offered as translation targets, kept in alphabetical order.
TARGET_LANGUAGES = [
    "Arabic", "Burmese", "Chinese", "Danish", "Dutch",
    "English", "Finnish", "French", "German", "Greek",
    "Hebrew", "Hindi", "Indonesian", "Italian", "Japanese",
    "Khmer", "Korean", "Lao", "Malay", "Norwegian",
    "Polish", "Portuguese", "Russian", "Spanish", "Swahili",
    "Swedish", "Tagalog", "Thai", "Turkish", "Vietnamese",
]
# Colours for audio waveforms: VG_WAVEFORM paints the waveform itself and
# VG_PRIMARY paints the progress portion. VG_PRIMARY is the same hex value as
# the --vg-primary variable declared in APP_CSS.
VG_PRIMARY = "#6366c7"
VG_WAVEFORM = "#98a2b3"
VOICEGATE_WAVEFORM_OPTIONS = gr.WaveformOptions(
    waveform_color=VG_WAVEFORM,
    waveform_progress_color=VG_PRIMARY,
)
# Custom stylesheet for the VoiceGate UI. It declares the --vg-* design tokens
# and styles the .voicegate-* element classes (cards, accordions, run button,
# download row, status box). The repeated ":root:root:root:root" prefix only
# raises selector specificity. Presumably passed to Gradio where the app is
# built, which is outside this part of the file -- TODO confirm.
APP_CSS = """
:root {
--vg-primary: #6366c7;
--vg-primary-dark: #5255b5;
--vg-ink: #171827;
--vg-muted: #667085;
--vg-line: #eceef5;
--vg-soft: #f6f7fb;
--vg-radius: 8px;
--vg-radius-sm: 6px;
}
:root:root:root:root main {
max-width: 1160px;
margin-left: auto !important;
margin-right: auto !important;
}
:root:root:root:root .gradio-container {
overflow: unset;
}
.voicegate-shell {
gap: 16px;
}
.voicegate-card {
background: #ffffff;
border: 1px solid var(--vg-line);
border-radius: var(--vg-radius) !important;
padding: 12px;
box-shadow: none;
overflow: hidden;
}
/* Gradio may attach elem_classes to an outer wrapper while the visible block is a
child element. Apply the same rounded corner to both so the final rendered card
never appears square. */
.voicegate-card.block,
.voicegate-card > .block,
.voicegate-card > div,
.voicegate-card > div > .block {
border-radius: var(--vg-radius) !important;
overflow: hidden;
}
.voicegate-intro {
margin: 10px 0 12px;
padding: 18px;
border-color: rgba(99, 102, 199, 0.24);
background: linear-gradient(180deg, #ffffff 0%, #f8f8ff 100%);
}
.voicegate-kicker {
color: var(--vg-primary);
font-size: 12px;
font-weight: 700;
letter-spacing: 0;
text-transform: uppercase;
}
.voicegate-intro h1 {
margin: 6px 0 8px;
color: var(--vg-ink);
font-size: 30px;
line-height: 1.12;
letter-spacing: 0;
}
.voicegate-intro p {
max-width: none;
width: 100%;
margin: 0;
color: var(--vg-muted);
font-size: 14px;
line-height: 1.6;
}
.voicegate-link-row {
display: flex;
flex-wrap: wrap;
gap: 8px;
margin-top: 14px;
}
.voicegate-link-row a {
display: inline-flex;
min-height: 34px;
align-items: center;
justify-content: center;
border: 1px solid rgba(99, 102, 199, 0.34);
border-radius: var(--vg-radius-sm);
padding: 6px 12px;
color: var(--vg-primary) !important;
background: #ffffff;
font-size: 13px;
font-weight: 650;
text-decoration: none;
}
.voicegate-link-row a:hover {
border-color: var(--vg-primary);
background: #f4f4ff;
}
.voicegate-link-row a.voicegate-github {
border-color: var(--vg-primary);
background: var(--vg-primary);
color: #ffffff !important;
}
.voicegate-link-row a.voicegate-github:hover {
border-color: var(--vg-primary-dark);
background: var(--vg-primary-dark);
}
.voicegate-card-label {
display: inline-flex;
align-items: center;
margin: 0 0 10px;
border-radius: var(--vg-radius-sm);
padding: 5px 8px;
background: #ececf1;
color: var(--vg-ink);
font-size: 12px;
font-weight: 700;
letter-spacing: 0;
text-transform: uppercase;
}
.voicegate-card-label .voicegate-tag {
margin-left: 8px;
border-radius: 999px;
padding: 2px 7px;
color: var(--vg-primary);
background: #ffffff;
font-size: 12px;
font-weight: 700;
text-transform: none;
}
/* Keep only the outer VoiceGate card. Gradio generates many nested blocks/forms;
these rules prevent each nested wrapper from drawing another visible box. */
.voicegate-card .block,
.voicegate-card .form,
.voicegate-card .panel,
.voicegate-card .accordion,
.voicegate-card .tabs,
.voicegate-card .tabitem {
border: 0 !important;
box-shadow: none !important;
background: transparent !important;
}
.voicegate-card .block {
padding-left: 0 !important;
padding-right: 0 !important;
}
.voicegate-card textarea,
.voicegate-card input,
.voicegate-card select {
border: 0 !important;
box-shadow: none !important;
}
.voicegate-card textarea {
font-size: 13px;
}
/* Match FaceFusion-like softly rounded inner controls without adding extra boxes. */
.voicegate-card input,
.voicegate-card textarea,
.voicegate-card select,
.voicegate-card button,
.voicegate-card .wrap,
.voicegate-card .container,
.voicegate-card .input-container,
.voicegate-card .dropdown-arrow,
.voicegate-card details,
.voicegate-card details > summary {
border-radius: var(--vg-radius-sm) !important;
}
/* Rounded corners for visible component cards such as Upload audio and Target language.
Gradio applies elem_classes to a wrapper, so radius must also be pushed into
the rendered block and its inner containers. */
.voicegate-control-card,
.voicegate-control-card.block,
.voicegate-control-card > .block,
.voicegate-control-card > div,
.voicegate-control-card > div > .block,
.voicegate-control-card .wrap,
.voicegate-control-card .container,
.voicegate-control-card .input-container {
border-radius: var(--vg-radius) !important;
overflow: hidden !important;
}
.voicegate-control-card .block,
.voicegate-control-card .form {
border-radius: var(--vg-radius) !important;
}
.voicegate-control-card input,
.voicegate-control-card textarea,
.voicegate-control-card select,
.voicegate-control-card button {
border-radius: var(--vg-radius-sm) !important;
}
/* Rounded accordion cards: Advanced audio cleanup, Subtitle preview, and Log.
Keep them visually light, but give the expanded sections the same soft radius as
Upload audio and Target language. */
.voicegate-accordion-card,
.voicegate-accordion-card.block,
.voicegate-accordion-card > .block,
.voicegate-accordion-card > div,
.voicegate-accordion-card > div > .block,
.voicegate-accordion-card details {
border-radius: var(--vg-radius) !important;
overflow: hidden !important;
}
.voicegate-accordion-card details {
border: 1px solid var(--vg-line) !important;
background: #ffffff !important;
box-shadow: none !important;
}
.voicegate-accordion-card details > summary {
border-radius: var(--vg-radius) var(--vg-radius) 0 0 !important;
padding: 10px 12px !important;
background: var(--vg-soft) !important;
box-shadow: none !important;
}
.voicegate-accordion-card details:not([open]) > summary {
border-radius: var(--vg-radius) !important;
}
.voicegate-accordion-card details[open] > summary {
border-bottom: 1px solid var(--vg-line) !important;
}
/* The content rendered inside an open accordion can have its own Gradio wrappers.
Round those wrappers too so textboxes/sliders do not look square inside. */
.voicegate-accordion-card .block,
.voicegate-accordion-card .form,
.voicegate-accordion-card .wrap,
.voicegate-accordion-card .container,
.voicegate-accordion-card .input-container,
.voicegate-accordion-card textarea,
.voicegate-accordion-card input,
.voicegate-accordion-card select {
border-radius: var(--vg-radius-sm) !important;
}
/* Full-width primary action without an extra gr.Group wrapper. */
.voicegate-run-button,
.voicegate-run-button button,
button.voicegate-run-button {
width: 100%;
}
.voicegate-run-button button.primary,
.voicegate-run-button .primary,
button.voicegate-run-button.primary {
background: var(--vg-primary) !important;
border-color: var(--vg-primary) !important;
color: #ffffff !important;
}
.voicegate-run-button button.primary:hover,
.voicegate-run-button .primary:hover,
button.voicegate-run-button.primary:hover {
background: var(--vg-primary-dark) !important;
border-color: var(--vg-primary-dark) !important;
}
.voicegate-downloads {
gap: 10px;
}
.voicegate-downloads button,
.voicegate-downloads a {
width: 100%;
}
.voicegate-status textarea {
font-family: ui-monospace, SFMono-Regular, Menlo, Consolas, monospace;
font-size: 12px;
}
:root:root:root:root input[type="range"] {
accent-color: var(--vg-primary);
}
:root:root:root:root input[type="range"]::-moz-range-thumb,
:root:root:root:root input[type="range"]::-webkit-slider-thumb {
background: var(--vg-primary);
box-shadow: none;
}
:root:root:root:root .tab-container button.selected,
:root:root:root:root button[role="tab"][aria-selected="true"] {
color: var(--vg-primary);
border-color: var(--vg-primary);
}
:root:root:root:root footer {
display: none;
}
@media (max-width: 760px) {
.voicegate-intro h1 {
font-size: 26px;
}
.voicegate-link-row a {
flex: 1 1 46%;
}
}
"""
def gpu_status_lines() -> list[str]:
    """Build the header lines of a status report describing torch and CUDA.

    Returns:
        A new list starting with the report title, followed by the torch
        version, CUDA availability and device count. When CUDA is available
        the name and total memory (GiB) of device 0 are appended as well.
    """
    cuda_ready = torch.cuda.is_available()
    report = [
        "VoiceGate GPU status",
        f"torch={torch.__version__}",
        f"cuda_available={cuda_ready}",
        f"cuda_device_count={torch.cuda.device_count()}",
    ]
    if not cuda_ready:
        return report

    device_props = torch.cuda.get_device_properties(0)
    report.append(f"device_name={torch.cuda.get_device_name(0)}")
    report.append(f"total_memory_gb={device_props.total_memory / 1024**3:.2f}")
    return report
def voicegate_theme() -> gr.Theme:
    """Create the Gradio theme used by the VoiceGate UI.

    The theme starts from ``gr.themes.Base`` with a custom "voicegate" primary
    palette, a neutral secondary hue, medium radii and an Open Sans font
    stack, then applies a set of variable overrides.
    """
    brand_palette = gr.themes.Color(
        name="voicegate",
        c50="#f5f5ff",
        c100="#ececff",
        c200="#dadaff",
        c300="#b8b9fb",
        c400="#9193ee",
        c500="#6366c7",
        c600="#5255b5",
        c700="#444695",
        c800="#393b78",
        c900="#313262",
        c950="#1f2040",
    )
    font_stack = [gr.themes.GoogleFont("Open Sans"), "ui-sans-serif", "system-ui", "sans-serif"]
    base_theme = gr.themes.Base(
        primary_hue=brand_palette,
        secondary_hue=gr.themes.colors.neutral,
        radius_size=gr.themes.sizes.radius_md,
        font=font_stack,
    )
    # Theme variable overrides, applied in one call on top of the base theme.
    overrides = {
        "background_fill_primary": "*neutral_100",
        "background_fill_secondary": "*neutral_50",
        "block_background_fill": "white",
        "block_border_width": "0",
        "block_label_background_fill": "*neutral_100",
        "block_label_border_width": "none",
        "block_label_margin": "0.5rem",
        "block_label_radius": "*radius_sm",
        "block_label_text_color": "*neutral_700",
        "block_label_text_size": "*text_sm",
        "block_label_text_weight": "600",
        "block_padding": "0.5rem",
        "border_color_primary": "transparent",
        "button_primary_background_fill": "*primary_500",
        "button_primary_background_fill_hover": "*primary_600",
        "button_primary_text_color": "white",
        "input_background_fill": "*neutral_50",
        "shadow_drop": "none",
        "slider_color": "*primary_500",
    }
    return base_theme.set(**overrides)
def wait_for_comfy(timeout: float = 180) -> dict[str, Any]:
    """Poll ComfyUI's ``/system_stats`` endpoint until it answers successfully.

    Args:
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The decoded JSON body of the first successful response.

    Raises:
        RuntimeError: If no successful response arrives before the deadline;
            the message carries the most recent failure description.
    """
    stats_url = f"{COMFY_URL}/system_stats"
    give_up_at = time.time() + timeout
    failure = ""
    while time.time() < give_up_at:
        try:
            reply = requests.get(stats_url, timeout=5)
            if reply.ok:
                return reply.json()
            failure = f"HTTP {reply.status_code}: {reply.text[:300]}"
        except requests.RequestException as exc:
            failure = repr(exc)
        # Fixed two-second pause between attempts.
        time.sleep(2)
    raise RuntimeError(f"ComfyUI did not become ready: {failure}")
def run_bootstrap(lines: list[str], *, allow_heavy: bool = True) -> None:
    """Ensure the ComfyUI checkout exists, running ``bootstrap_comfy.py`` if needed.

    Progress is reported by appending ``key=value`` strings to ``lines``.

    Args:
        lines: Log buffer, mutated in place.
        allow_heavy: When False, an existing checkout (``main.py`` plus the
            ``custom_nodes`` directory) is accepted as-is and the bootstrap
            script is not executed.

    Raises:
        RuntimeError: If the bootstrap script exits with a non-zero code. The
            last 80 lines of its combined output are appended to ``lines``
            before raising.
        subprocess.TimeoutExpired: If the script runs longer than 900 seconds.
    """
    global BOOTSTRAPPED
    # Fast path: this process already bootstrapped and the checkout is still there.
    if BOOTSTRAPPED and (COMFY_DIR / "main.py").exists():
        lines.append("bootstrap=already_done")
        return
    if (COMFY_DIR / "main.py").exists() and (COMFY_DIR / "custom_nodes").exists():
        if not allow_heavy:
            lines.append("bootstrap=existing_comfyui")
            BOOTSTRAPPED = True
            return
    started = time.time()
    lines.append("bootstrap=starting")
    command = [sys.executable, str(ROOT / "scripts" / "bootstrap_comfy.py")]
    # stderr is folded into stdout so one tail covers both streams.
    result = subprocess.run(
        command,
        cwd=ROOT,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        timeout=900,
    )
    lines.append(f"bootstrap_returncode={result.returncode}")
    lines.append(f"bootstrap_elapsed_sec={time.time() - started:.1f}")
    if result.returncode != 0:
        lines.append("bootstrap_tail:")
        lines.extend(result.stdout.splitlines()[-80:])
        raise RuntimeError("bootstrap_comfy.py failed")
    BOOTSTRAPPED = True
def missing_required_models() -> list[Path]:
    """Return the entries of ``REQUIRED_MODEL_PATHS`` that do not exist, in list order."""
    absent: list[Path] = []
    for candidate in REQUIRED_MODEL_PATHS:
        if candidate.exists():
            continue
        absent.append(candidate)
    return absent
def ensure_runtime_assets(lines: list[str]) -> None:
    """Make sure every required model is on disk, fetching them if necessary.

    When something is missing, ``bootstrap_comfy.py --with-models`` is run
    synchronously (30-minute limit) and the model paths are checked again.
    Progress is appended to ``lines``.

    Raises:
        RuntimeError: If the script fails, or if models are still missing
            after it finished successfully.
        subprocess.TimeoutExpired: If the script exceeds 1800 seconds.
    """
    absent = missing_required_models()
    if not absent:
        lines.append("models=ready")
        return

    lines.append("models=missing")
    for model_path in absent:
        lines.append(f"missing_model={model_path}")

    began = time.time()
    bootstrap_script = ROOT / "scripts" / "bootstrap_comfy.py"
    completed = subprocess.run(
        [sys.executable, str(bootstrap_script), "--with-models"],
        cwd=ROOT,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        timeout=1800,
    )
    lines.append(f"model_prepare_returncode={completed.returncode}")
    lines.append(f"model_prepare_elapsed_sec={time.time() - began:.1f}")

    if completed.returncode != 0:
        lines.append("model_prepare_tail:")
        lines.extend(completed.stdout.splitlines()[-100:])
        raise RuntimeError("Could not prepare required VoiceGate models.")

    # Re-check: a zero exit code alone does not prove the files landed.
    still_absent = missing_required_models()
    if still_absent:
        lines.append("models_still_missing:")
        for model_path in still_absent:
            lines.append(str(model_path))
        raise RuntimeError("Required VoiceGate models are still missing after preparation.")

    lines.append("models=ready_after_prepare")
def ensure_comfy(lines: list[str], *, timeout: float = 240) -> dict[str, Any]:
    """Return ComfyUI's system stats, starting the server first if necessary.

    Steps, in order:
      1. Refuse to continue while a background preparation process is still
         running or after it failed.
      2. Run the light bootstrap check (``allow_heavy=False``).
      3. Probe for an already-running server for 5 seconds.
      4. Otherwise spawn ``main.py`` inside ``COMFY_DIR`` and wait for it.

    Args:
        lines: Log buffer, mutated in place.
        timeout: Seconds to wait for a freshly started server.

    Returns:
        The ``/system_stats`` payload reported by ComfyUI.

    Raises:
        RuntimeError: If preparation is unfinished or failed, or the server
            never becomes ready (the last 120 log lines are appended to
            ``lines`` before the error propagates).
    """
    global COMFY_PROCESS
    if PREPARE_PROCESS is not None:
        returncode = PREPARE_PROCESS.poll()
        if returncode is None:
            raise RuntimeError("Runtime preparation is still running. Check Prepare Status first.")
        if returncode != 0:
            raise RuntimeError(f"Runtime preparation failed with return code {returncode}.")
    run_bootstrap(lines, allow_heavy=False)
    try:
        stats = wait_for_comfy(timeout=5)
        lines.append("comfy=already_running")
        return stats
    except RuntimeError:
        # Nothing answered on the port: fall through and launch our own server.
        pass
    command = [
        sys.executable,
        "main.py",
        "--listen",
        COMFY_HOST,
        "--port",
        COMFY_PORT,
    ]
    # The child receives its own copy of the descriptor, so the parent's
    # handle can be closed as soon as Popen returns. Previously it was never
    # closed and leaked one file handle per launch.
    with COMFY_LOG.open("ab") as log:
        COMFY_PROCESS = subprocess.Popen(
            command,
            cwd=COMFY_DIR,
            stdout=log,
            stderr=subprocess.STDOUT,
        )
    lines.append(f"comfy_started_pid={COMFY_PROCESS.pid}")
    try:
        return wait_for_comfy(timeout=timeout)
    except Exception:
        # Surface the server's own output to explain the failure, then re-raise.
        lines.append("comfy_log_tail:")
        if COMFY_LOG.exists():
            lines.extend(COMFY_LOG.read_text(encoding="utf-8", errors="replace").splitlines()[-120:])
        raise
def write_sine_wav(filename: str, *, seconds: float = 1.0, frequency: float = 440.0) -> str:
    """Write a mono 16-bit, 16 kHz sine tone into the ComfyUI input directory.

    Args:
        filename: File name (not a full path) to create under ``COMFY_INPUT_DIR``.
        seconds: Tone duration in seconds.
        frequency: Tone frequency in Hz.

    Returns:
        ``filename`` unchanged, so it can be fed directly into a workflow.
    """
    sample_rate = 16000
    amplitude = 0.2
    frame_count = int(sample_rate * seconds)
    peak = 32767 * amplitude
    step = 2 * math.pi * frequency

    # Assemble every little-endian signed 16-bit sample up front.
    pcm = b"".join(
        int(peak * math.sin(step * index / sample_rate)).to_bytes(2, byteorder="little", signed=True)
        for index in range(frame_count)
    )

    COMFY_INPUT_DIR.mkdir(parents=True, exist_ok=True)
    target = COMFY_INPUT_DIR / filename
    with wave.open(str(target), "wb") as wav_file:
        wav_file.setnchannels(1)
        wav_file.setsampwidth(2)
        wav_file.setframerate(sample_rate)
        wav_file.writeframesraw(pcm)
    return filename
def submit_prompt(workflow: dict[str, Any], *, client_id: str | None = None) -> str:
    """Queue ``workflow`` on ComfyUI via ``POST /prompt`` and return its prompt id.

    Args:
        workflow: Workflow graph to send as the ``prompt`` field.
        client_id: Client identifier to attach; a random UUID is used when
            omitted or empty.

    Raises:
        RuntimeError: If the server answers with a non-success status.
    """
    body = {"prompt": workflow, "client_id": client_id or str(uuid.uuid4())}
    reply = requests.post(f"{COMFY_URL}/prompt", json=body, timeout=120)
    if reply.ok:
        return reply.json()["prompt_id"]
    raise RuntimeError(f"/prompt failed HTTP {reply.status_code}: {reply.text[:2000]}")
def execute_prompt_with_timing(workflow: dict[str, Any], *, timeout: float) -> tuple[str, dict[str, Any], list[str]]:
    """Run ``workflow`` on ComfyUI while timing each node over the websocket.

    A websocket is opened first so no event is missed, then the prompt is
    submitted under the same client id. ``executing`` events mark node
    boundaries; the elapsed time between them is accumulated per node id.

    Args:
        workflow: Workflow graph keyed by node id.
        timeout: Seconds to wait for a terminal websocket event.

    Returns:
        ``(prompt_id, history, event_lines)`` where ``history`` is the
        ``/history`` entry for the prompt and ``event_lines`` holds timing
        and error notes (up to 20 slowest nodes).

    Raises:
        TimeoutError: If neither ``execution_success`` nor ``execution_error``
            arrives in time, or the history entry does not appear afterwards.
        RuntimeError: If ``/prompt`` rejects the workflow.
    """
    client_id = str(uuid.uuid4())
    websocket_url = f"ws://{COMFY_HOST}:{COMFY_PORT}/ws?clientId={client_id}"
    current_node: str | None = None
    current_started = 0.0
    node_durations: dict[str, float] = {}
    node_order: list[str] = []

    def close_current_node(now: float) -> None:
        """Credit the time since the running node started, then clear it."""
        nonlocal current_node, current_started
        if current_node is not None:
            node_durations[current_node] = node_durations.get(current_node, 0.0) + max(0.0, now - current_started)
        current_node = None
        current_started = 0.0

    ws = websocket.create_connection(websocket_url, timeout=30)
    try:
        # Submit inside the try block: if /prompt rejects the workflow the
        # socket must still be closed (it used to leak on that path).
        prompt_id = submit_prompt(workflow, client_id=client_id)
        started = time.time()
        deadline = started + timeout
        event_lines = [f"prompt_id={prompt_id}", "node_timing=started"]
        while time.time() < deadline:
            # Wake up at least every 10 s so the deadline is re-evaluated.
            ws.settimeout(max(1.0, min(10.0, deadline - time.time())))
            try:
                message = ws.recv()
            except websocket.WebSocketTimeoutException:
                continue
            if isinstance(message, bytes):
                message = message.decode("utf-8", errors="replace")
            try:
                payload = json.loads(message)
            except json.JSONDecodeError:
                # Non-JSON frames are ignored.
                continue
            event_type = payload.get("type")
            data = payload.get("data") or {}
            # Ignore events that explicitly belong to another prompt.
            if data.get("prompt_id") not in (None, prompt_id):
                continue
            now = time.time()
            if event_type == "executing":
                close_current_node(now)
                node = data.get("node")
                if node is None:
                    continue
                current_node = str(node)
                current_started = now
                if current_node not in node_order:
                    node_order.append(current_node)
            elif event_type == "execution_success":
                close_current_node(now)
                event_lines.append(f"websocket_elapsed_sec={now - started:.1f}")
                break
            elif event_type == "execution_error":
                close_current_node(now)
                event_lines.append("websocket_execution_error:")
                event_lines.append(json.dumps(data, ensure_ascii=False, indent=2)[:4000])
                break
        else:
            # Loop ended without a terminal event: the deadline passed.
            close_current_node(time.time())
            raise TimeoutError(f"Timed out waiting for prompt {prompt_id}")
    finally:
        ws.close()

    history = wait_for_history(prompt_id, timeout=30)
    # Slowest nodes first.
    timed_nodes = sorted(
        ((node_id, node_durations.get(node_id, 0.0)) for node_id in node_order),
        key=lambda item: item[1],
        reverse=True,
    )
    if timed_nodes:
        event_lines.append("node_timing_top:")
        for node_id, seconds in timed_nodes[:20]:
            class_type = workflow.get(node_id, {}).get("class_type", "unknown")
            event_lines.append(f"{node_id} {class_type}: {seconds:.1f}s")
    return prompt_id, history, event_lines
def wait_for_history(prompt_id: str, timeout: float = 1200) -> dict[str, Any]:
    """Poll ``/history/{prompt_id}`` until ComfyUI lists the prompt.

    Args:
        prompt_id: Identifier returned when the prompt was queued.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The history entry stored under ``prompt_id``.

    Raises:
        TimeoutError: If the entry does not show up before the deadline.
        requests.HTTPError: If the endpoint answers with an error status.
    """
    history_url = f"{COMFY_URL}/history/{prompt_id}"
    give_up_at = time.time() + timeout
    while time.time() < give_up_at:
        reply = requests.get(history_url, timeout=30)
        reply.raise_for_status()
        entries = reply.json()
        if prompt_id in entries:
            return entries[prompt_id]
        time.sleep(2)
    raise TimeoutError(f"Timed out waiting for prompt {prompt_id}")
def history_summary(history: dict[str, Any]) -> list[str]:
    """Condense a ComfyUI history entry into printable report lines.

    The report contains, in order: the status string and completed flag, any
    ``execution_error`` messages (JSON, capped at 4000 characters), the
    audio/image/gif output files, and text outputs (each capped at 2000
    characters).

    Args:
        history: One history entry. Missing or ``None`` ``status`` and
            ``outputs`` sections are treated as empty.

    Returns:
        A new list of report lines.
    """
    status = history.get("status") or {}
    lines = [
        f"status_str={status.get('status_str')}",
        f"completed={status.get('completed')}",
    ]

    errors = [
        message
        for message in status.get("messages") or []
        # Require a non-empty list so message[0] cannot raise IndexError.
        if isinstance(message, list) and message and message[0] == "execution_error"
    ]
    if errors:
        lines.append("errors:")
        lines.append(json.dumps(errors, ensure_ascii=False, indent=2)[:4000])

    outputs = history.get("outputs") or {}

    output_files = []
    for node_output in outputs.values():
        for key in ("audio", "images", "gifs"):
            for item in node_output.get(key, []) or []:
                filename = item.get("filename")
                if not filename:
                    # An item without a file name used to be reported as
                    # "<subfolder>/None"; skip it, as first_output_audio_path does.
                    continue
                subfolder = item.get("subfolder")
                output_files.append(f"{subfolder}/{filename}" if subfolder else filename)
    if output_files:
        lines.append("outputs:")
        lines.extend(output_files)

    text_outputs = []
    for node_output in outputs.values():
        for key in ("text", "string"):
            values = node_output.get(key, []) or []
            # A bare string is one value, not a sequence of characters.
            if isinstance(values, str):
                values = [values]
            text_outputs.extend(str(value) for value in values)
    if text_outputs:
        lines.append("text_outputs:")
        lines.extend(value[:2000] for value in text_outputs)
    return lines
def first_output_audio_path(history: dict[str, Any]) -> str | None:
    """Locate the first audio output of a history entry that exists on disk.

    Candidates are resolved under ``COMFY_DIR / "output"`` using each item's
    optional ``subfolder``. Items without a file name are ignored.

    Returns:
        The path as a string, or ``None`` when no listed file exists.
    """
    output_root = COMFY_DIR / "output"
    for node_output in history.get("outputs", {}).values():
        for entry in node_output.get("audio", []) or []:
            name = entry.get("filename")
            if name:
                candidate = output_root / (entry.get("subfolder") or "") / name
                if candidate.exists():
                    return str(candidate)
    return None
def text_outputs_for_node(history: dict[str, Any], node_id: str) -> list[str]:
    """Collect the non-blank text outputs that one node wrote to ``history``.

    The node's ``text`` values come first, then its ``string`` values. A bare
    string counts as one value; every value is converted with ``str`` and
    dropped if it is empty or whitespace-only.
    """
    per_node = history.get("outputs", {}) or {}
    node_output = per_node.get(node_id, {})
    collected: list[str] = []
    for key in ("text", "string"):
        raw = node_output.get(key, []) or []
        candidates = [raw] if isinstance(raw, str) else raw
        for item in candidates:
            rendered = str(item)
            if rendered.strip():
                collected.append(rendered)
    return collected
def write_srt_file(prefix: str, name: str, text: str) -> str | None:
    """Save ``text`` as ``<prefix>_<name>.srt`` inside ``USER_OUTPUT_DIR``.

    Returns:
        The written file's path as a string, or ``None`` (nothing written)
        when ``text`` is empty or whitespace-only.
    """
    if text.strip():
        USER_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        target = USER_OUTPUT_DIR / f"{prefix}_{name}.srt"
        target.write_text(text, encoding="utf-8")
        return str(target)
    return None
def melband_workflow(audio_filename: str, prefix: str) -> dict[str, Any]:
    """Build a five-node ComfyUI graph that runs MelBandRoFormer on one file.

    Node 1 loads the audio, node 2 loads the model, node 3 runs the sampler,
    and nodes 4/5 save the sampler's outputs 0 and 1 as MP3 files named
    ``audio/<prefix>_vocals`` and ``audio/<prefix>_instruments``.
    """

    def save_mp3(stem: str, sampler_output: int) -> dict[str, Any]:
        """Describe a SaveAudioMP3 node fed by one output of the sampler."""
        return {
            "class_type": "SaveAudioMP3",
            "inputs": {
                "filename_prefix": f"audio/{prefix}_{stem}",
                "quality": "V0",
                "audioUI": "",
                "audio": ["3", sampler_output],
            },
        }

    load_audio = {
        "class_type": "LoadAudio",
        "inputs": {"audio": audio_filename, "audioUI": ""},
    }
    load_model = {
        "class_type": "MelBandRoFormerModelLoader",
        "inputs": {"model_name": "MelBandRoFormer_comfy/MelBandRoformer_fp32.safetensors"},
    }
    sampler = {
        "class_type": "MelBandRoFormerSampler",
        "inputs": {"model": ["2", 0], "audio": ["1", 0]},
    }
    return {
        "1": load_audio,
        "2": load_model,
        "3": sampler,
        "4": save_mp3("vocals", 0),
        "5": save_mp3("instruments", 1),
    }
def voxcpm_tts_workflow(prefix: str) -> dict[str, Any]:
    """Build a three-node ComfyUI graph that synthesises a fixed test sentence.

    Node 1 loads the VoxCPM2 model, node 2 generates speech from hard-coded
    text with fixed settings and seed, and node 3 saves the result as
    ``audio/<prefix>`` in MP3 format.
    """
    load_model = {
        "class_type": "RunningHub_VoxCPM_LoadModel",
        "inputs": {"model_name": "VoxCPM2", "optimize": False, "lora_name": "None"},
    }
    generation_inputs = {
        "model": ["1", 0],
        "control_instruction": "清晰自然的中文女声",
        "text": "你好,VoiceGate GPU 语音合成测试。",
        "cfg_value": 2.0,
        "inference_steps": 4,
        "seed": 20260605,
        "ultimate_clone": False,
        "reference_audio_text": "",
        "normalize_text": False,
        "denoise_reference": False,
        "max_len": 512,
        "retry_badcase": True,
    }
    generate = {"class_type": "RunningHub_VoxCPM_Generate", "inputs": generation_inputs}
    save = {
        "class_type": "SaveAudioMP3",
        "inputs": {
            "filename_prefix": f"audio/{prefix}",
            "quality": "V0",
            "audioUI": "",
            "audio": ["2", 0],
        },
    }
    return {"1": load_model, "2": generate, "3": save}
def copy_audio_to_comfy_input(audio_path: str | Path, prefix: str) -> str:
    """Copy an uploaded audio file into ``COMFY_INPUT_DIR`` under a unique name.

    The copy is named ``<prefix>_<8 hex chars><suffix>``; ``.wav`` is used
    when the source has no extension.

    Returns:
        The new file name (relative to the ComfyUI input directory).

    Raises:
        FileNotFoundError: If ``audio_path`` does not exist.
    """
    origin = Path(audio_path)
    if not origin.exists():
        raise FileNotFoundError(f"Uploaded audio does not exist: {origin}")

    extension = origin.suffix if origin.suffix else ".wav"
    unique_tag = uuid.uuid4().hex[:8]
    filename = f"{prefix}_{unique_tag}{extension}"

    COMFY_INPUT_DIR.mkdir(parents=True, exist_ok=True)
    shutil.copyfile(origin, COMFY_INPUT_DIR / filename)
    return filename
def asr_workflow(audio_filename: str, prefix: str) -> dict[str, Any]:
    """Build a five-node ComfyUI graph that transcribes audio into an SRT.

    Node 1 loads the audio, node 2 loads the Qwen3 ASR model with its forced
    aligner, node 3 transcribes with timestamps, node 4 generates and saves
    the SRT under ``VoiceBridge/<prefix>``, and node 5 displays node 4's
    first output.
    """
    load_audio = {
        "class_type": "LoadAudio",
        "inputs": {"audio": audio_filename, "audioUI": ""},
    }
    loader_inputs = {
        "repo_id": "Qwen/Qwen3-ASR-1.7B",
        "source": "HuggingFace",
        "precision": "bf16",
        "attention": "sdpa",
        "max_new_tokens": 256,
        "forced_aligner": "Qwen/Qwen3-ForcedAligner-0.6B",
        "local_model_path_asr": "",
        "local_model_path_fa": "",
    }
    transcribe_inputs = {
        "model_key": ["2", 0],
        "audio": ["1", 0],
        "language": "auto",
        "context": "",
        "return_timestamps": True,
    }
    # The SRT node consumes the three outputs of the transcribe node.
    srt_inputs = {
        "forced_aligns": ["3", 0],
        "text": ["3", 1],
        "language": ["3", 2],
        "save_srt": True,
        "filename_prefix": f"VoiceBridge/{prefix}",
    }
    show_inputs = {"text": "", "anything": ["4", 0]}
    return {
        "1": load_audio,
        "2": {"class_type": "VoiceBridgeASRLoader", "inputs": loader_inputs},
        "3": {"class_type": "VoiceBridgeASRTranscribe", "inputs": transcribe_inputs},
        "4": {"class_type": "GenerateSRT", "inputs": srt_inputs},
        "5": {"class_type": "easy showAnything", "inputs": show_inputs},
    }
def full_voicegate_workflow(
    audio_filename: str,
    prefix: str,
    target_language: str,
    *,
    tts_trim_start: float,
) -> dict[str, Any]:
    """Load the stored VoiceGate workflow and patch it for one job.

    LLM settings are read from the ``DEEPSEEK_API_KEY``, ``DEEPSEEK_BASE_URL``
    and ``DEEPSEEK_MODEL`` environment variables; the last two fall back to
    built-in defaults. ``prefix`` is passed on as the job id.
    """
    base_workflow = load_workflow()
    env = os.environ
    patch_settings = {
        "audio_filename": audio_filename,
        "target_language": target_language,
        "api_key": env.get("DEEPSEEK_API_KEY"),
        "api_baseurl": env.get("DEEPSEEK_BASE_URL", "https://api.deepseek.com"),
        "llm_model": env.get("DEEPSEEK_MODEL", "deepseek-v4-flash"),
        "job_id": prefix,
        "tts_trim_start": tts_trim_start,
    }
    return patch_voicegate_workflow(base_workflow, **patch_settings)
def run_full_voicegate(
    audio_path: str | None,
    target_language: str,
    *,
    tts_trim_start: float = 0.0,
    timeout: float = 880,
) -> dict[str, Any]:
    """Run the complete VoiceGate pipeline for one uploaded audio file.

    Args:
        audio_path: Path of the uploaded audio; required.
        target_language: Language to translate into; ``"English"`` is used
            for the workflow when this is empty.
        tts_trim_start: Trim value, clamped to the range 0.0-1.0.
        timeout: Seconds allowed for the workflow execution.

    Returns:
        A dict with the log ``lines``, the output ``audio`` path (or None),
        the source/translated subtitle text and the paths of the written SRT
        files (or None).

    Raises:
        ValueError: If no audio file was supplied.
        RuntimeError: If ``DEEPSEEK_API_KEY`` is not set, or preparing the
            models / ComfyUI fails.
    """
    lines = gpu_status_lines()
    began = time.time()
    non_negative_trim = max(0.0, float(tts_trim_start))
    trim_start = min(1.0, non_negative_trim)

    # Validate cheap preconditions before any heavy preparation starts.
    if not audio_path:
        raise ValueError("Please upload an audio file before running VoiceGate.")
    if not os.environ.get("DEEPSEEK_API_KEY"):
        raise RuntimeError("DEEPSEEK_API_KEY is not configured in the Space.")

    ensure_runtime_assets(lines)
    ensure_comfy(lines)

    prefix = f"full_{uuid.uuid4().hex[:8]}"
    audio_filename = copy_audio_to_comfy_input(audio_path, prefix)
    lines += [
        f"input_audio={audio_filename}",
        f"target_language={target_language}",
        f"tts_trim_start={trim_start}",
    ]

    workflow = full_voicegate_workflow(
        audio_filename,
        prefix,
        target_language or "English",
        tts_trim_start=trim_start,
    )
    _prompt_id, history, timing_lines = execute_prompt_with_timing(workflow, timeout=timeout)
    lines.extend(timing_lines)
    lines.extend(history_summary(history))

    output_audio = first_output_audio_path(history)
    source_parts = text_outputs_for_node(history, "61")
    # Node 179 is preferred for the translation; node 107 is the fallback.
    translated_parts = text_outputs_for_node(history, "179") or text_outputs_for_node(history, "107")
    source_subtitle = "\n\n".join(source_parts)
    translated_subtitle = "\n\n".join(translated_parts)
    source_subtitle_file = write_srt_file(prefix, "source", source_subtitle)
    translated_subtitle_file = write_srt_file(prefix, "translated", translated_subtitle)

    produced = (
        ("output_audio_path", output_audio),
        ("source_subtitle_file", source_subtitle_file),
        ("translated_subtitle_file", translated_subtitle_file),
    )
    for label, location in produced:
        if location:
            lines.append(f"{label}={location}")
    lines.append(f"elapsed_sec={time.time() - began:.1f}")

    return {
        "lines": lines,
        "audio": output_audio,
        "source_subtitle": source_subtitle,
        "translated_subtitle": translated_subtitle,
        "source_subtitle_file": source_subtitle_file,
        "translated_subtitle_file": translated_subtitle_file,
    }
def prepare_runtime() -> str:
    """Start ``bootstrap_comfy.py --with-models`` in the background.

    If a previous preparation process is still running, nothing is started
    and its pid is reported instead. The child's output is appended to
    ``BOOTSTRAP_LOG``.

    Returns:
        A newline-joined status report.
    """
    global PREPARE_PROCESS
    lines = ["VoiceGate runtime preparation"]
    if PREPARE_PROCESS is not None and PREPARE_PROCESS.poll() is None:
        lines.append(f"prepare=already_running pid={PREPARE_PROCESS.pid}")
        return "\n".join(lines)
    BOOTSTRAP_LOG.parent.mkdir(parents=True, exist_ok=True)
    command = [sys.executable, str(ROOT / "scripts" / "bootstrap_comfy.py"), "--with-models"]
    # The child receives its own copy of the descriptor, so the parent's
    # handle can be closed right after Popen returns. Previously it was never
    # closed and leaked one file handle per call.
    with BOOTSTRAP_LOG.open("ab") as log:
        PREPARE_PROCESS = subprocess.Popen(
            command,
            cwd=ROOT,
            stdout=log,
            stderr=subprocess.STDOUT,
        )
    lines.append(f"prepare=started pid={PREPARE_PROCESS.pid}")
    lines.append(f"log={BOOTSTRAP_LOG}")
    return "\n".join(lines)
def prepare_status() -> str:
    """Report on the background preparation process and tail its log.

    Sets ``BOOTSTRAPPED`` once the process has exited with code 0 and
    ComfyUI's ``main.py`` exists.

    Returns:
        A newline-joined report: process state, whether ``main.py`` exists,
        and (when the log file exists) its last 80 lines.
    """
    global BOOTSTRAPPED
    report = ["VoiceGate runtime preparation status"]
    main_py = COMFY_DIR / "main.py"

    if PREPARE_PROCESS is None:
        report.append("prepare=not_started")
    elif (exit_code := PREPARE_PROCESS.poll()) is None:
        report.append(f"prepare=running pid={PREPARE_PROCESS.pid}")
    else:
        report.append(f"prepare=finished returncode={exit_code}")
        if exit_code == 0 and main_py.exists():
            BOOTSTRAPPED = True

    report.append(f"comfy_dir_exists={main_py.exists()}")
    if BOOTSTRAP_LOG.exists():
        log_text = BOOTSTRAP_LOG.read_text(encoding="utf-8", errors="replace")
        report.append("bootstrap_log_tail:")
        report.extend(log_text.splitlines()[-80:])
    return "\n".join(report)
def gpu_smoke_test() -> str:
    """Run a tiny CUDA computation and report the result.

    Starts from the lines produced by ``gpu_status_lines()``. When CUDA is
    available, builds a 16-element float32 range on ``cuda:0``, doubles and
    sums it, synchronizes the device, and appends the sum together with the
    memory reserved on device 0 in MiB. When CUDA is unavailable, only the
    status lines are returned.

    Returns:
        The report as a newline-joined string.
    """
    report = gpu_status_lines()
    if not torch.cuda.is_available():
        return "\n".join(report)

    values = torch.arange(16, device="cuda:0", dtype=torch.float32)
    doubled_sum = (values * 2).sum().item()
    torch.cuda.synchronize()
    reserved_mb = torch.cuda.memory_reserved(0) / 1024**2

    report.append(f"tensor_result={doubled_sum}")
    report.append(f"memory_reserved_mb={reserved_mb:.2f}")
    return "\n".join(report)
def comfy_runtime_test() -> str:
    """Check that the ComfyUI runtime comes up and report what it returns.

    Calls ``ensure_comfy`` with the report list and, on success, appends a
    ready marker, the elapsed seconds, and the returned value rendered as
    indented JSON truncated to 4000 characters. Any exception is caught and
    recorded as an ``error=`` line instead of being raised.

    Returns:
        The report as a newline-joined string.
    """
    report = gpu_status_lines()
    t0 = time.time()
    try:
        system_stats = ensure_comfy(report)
        report.extend(
            (
                "comfy_ready=true",
                f"comfy_elapsed_sec={time.time() - t0:.1f}",
                "system_stats:",
            )
        )
        rendered = json.dumps(system_stats, ensure_ascii=False, indent=2)
        report.append(rendered[:4000])  # cap the dump so the status box stays readable
    except Exception as exc:
        report.append(f"error={type(exc).__name__}: {exc}")
    return "\n".join(report)
def melband_gpu_test() -> str:
    """Submit the workflow built by ``melband_workflow`` and report its history.

    The input is the file name returned by ``write_sine_wav`` (presumably a
    generated sine-wave test clip; confirm against that helper). The prompt is
    submitted with ``submit_prompt`` and awaited with ``wait_for_history``
    using its default timeout. On any exception an ``error=`` line is
    recorded, followed by the last 120 lines of ``COMFY_LOG`` when that file
    exists.

    Returns:
        The report as a newline-joined string.
    """
    report = gpu_status_lines()
    t0 = time.time()
    try:
        ensure_comfy(report)
        wav_name = write_sine_wav(f"voicegate_melband_{uuid.uuid4().hex[:8]}.wav")
        run_prefix = f"melband_gpu_{uuid.uuid4().hex[:8]}"
        workflow = melband_workflow(wav_name, run_prefix)
        prompt_id = submit_prompt(workflow)
        report.append(f"prompt_id={prompt_id}")
        run_history = wait_for_history(prompt_id)
        report.extend(history_summary(run_history))
        report.append(f"elapsed_sec={time.time() - t0:.1f}")
    except Exception as exc:
        report.append(f"error={type(exc).__name__}: {exc}")
        if COMFY_LOG.exists():
            report.append("comfy_log_tail:")
            log_text = COMFY_LOG.read_text(encoding="utf-8", errors="replace")
            report.extend(log_text.splitlines()[-120:])
    return "\n".join(report)
def voxcpm_tts_gpu_test() -> str:
    """Submit the workflow built by ``voxcpm_tts_workflow`` and report its history.

    The prompt is awaited with ``wait_for_history`` using a timeout of 1200.
    On any exception an ``error=`` line is recorded, followed by the last 160
    lines of ``COMFY_LOG`` when that file exists.

    Returns:
        The report as a newline-joined string.
    """
    report = gpu_status_lines()
    t0 = time.time()
    try:
        ensure_comfy(report)
        run_prefix = f"voxcpm_tts_gpu_{uuid.uuid4().hex[:8]}"
        workflow = voxcpm_tts_workflow(run_prefix)
        prompt_id = submit_prompt(workflow)
        report.append(f"prompt_id={prompt_id}")
        run_history = wait_for_history(prompt_id, timeout=1200)
        report.extend(history_summary(run_history))
        report.append(f"elapsed_sec={time.time() - t0:.1f}")
    except Exception as exc:
        report.append(f"error={type(exc).__name__}: {exc}")
        if COMFY_LOG.exists():
            report.append("comfy_log_tail:")
            log_text = COMFY_LOG.read_text(encoding="utf-8", errors="replace")
            report.extend(log_text.splitlines()[-160:])
    return "\n".join(report)
def asr_gpu_test(audio_path: str | None) -> str:
    """Submit the workflow built by ``asr_workflow`` for an uploaded audio file.

    Args:
        audio_path: Path of the uploaded audio. A falsy value is reported as a
            ``ValueError`` in the output rather than raised to the caller.

    The audio is handed to ``copy_audio_to_comfy_input`` and the returned file
    name is passed to the workflow. The prompt is awaited with a timeout of
    900. On any exception an ``error=`` line is recorded, followed by the last
    180 lines of ``COMFY_LOG`` when that file exists.

    Returns:
        The report as a newline-joined string.
    """
    report = gpu_status_lines()
    t0 = time.time()
    try:
        # Validate before touching ComfyUI so a missing upload fails fast.
        if not audio_path:
            raise ValueError("Please upload an audio file before running ASR.")
        ensure_comfy(report)
        run_prefix = f"asr_gpu_{uuid.uuid4().hex[:8]}"
        staged_name = copy_audio_to_comfy_input(audio_path, run_prefix)
        report.append(f"input_audio={staged_name}")
        workflow = asr_workflow(staged_name, run_prefix)
        prompt_id = submit_prompt(workflow)
        report.append(f"prompt_id={prompt_id}")
        run_history = wait_for_history(prompt_id, timeout=900)
        report.extend(history_summary(run_history))
        report.append(f"elapsed_sec={time.time() - t0:.1f}")
    except Exception as exc:
        report.append(f"error={type(exc).__name__}: {exc}")
        if COMFY_LOG.exists():
            report.append("comfy_log_tail:")
            log_text = COMFY_LOG.read_text(encoding="utf-8", errors="replace")
            report.extend(log_text.splitlines()[-180:])
    return "\n".join(report)
def full_voicegate_gpu_test(audio_path: str | None, target_language: str, tts_trim_start: float) -> str:
    """Run ``run_full_voicegate`` and return only its textual report.

    Args:
        audio_path: Path of the uploaded audio, forwarded unchanged.
        target_language: Target language, forwarded unchanged.
        tts_trim_start: Forwarded as the ``tts_trim_start`` keyword.

    The pipeline is invoked with ``timeout=880``. If it raises, the report is
    rebuilt from ``gpu_status_lines()`` plus an ``error=`` line and, when
    ``COMFY_LOG`` exists, its last 220 lines.

    Returns:
        The report as a newline-joined string.
    """
    try:
        outcome = run_full_voicegate(
            audio_path,
            target_language,
            tts_trim_start=tts_trim_start,
            timeout=880,
        )
        report = outcome["lines"]
    except Exception as exc:
        # Lines collected inside the failed run are not available here; start over.
        report = gpu_status_lines()
        report.append(f"error={type(exc).__name__}: {exc}")
        if COMFY_LOG.exists():
            report.append("comfy_log_tail:")
            log_text = COMFY_LOG.read_text(encoding="utf-8", errors="replace")
            report.extend(log_text.splitlines()[-220:])
    return "\n".join(report)
def voicegate_user_run(audio_path: str | None, target_language: str, tts_trim_start: float) -> tuple[
    str | None,
    str,
    str | None,
    str | None,
    str,
    str,
]:
    """Run ``run_full_voicegate`` and shape its result for the Translate tab.

    Args:
        audio_path: Path of the uploaded audio, forwarded unchanged.
        target_language: Target language, forwarded unchanged.
        tts_trim_start: Forwarded as the ``tts_trim_start`` keyword.

    Returns:
        A 6-tuple ``(audio, status_text, source_subtitle_file,
        translated_subtitle_file, source_subtitle, translated_subtitle)``.
        A warning line is added to the status text when the result carries no
        audio. On any exception the tuple is
        ``(None, status_text, None, None, "", "")``, where the status text
        holds the GPU status lines, an ``error=`` line and, when ``COMFY_LOG``
        exists, its last 160 lines.
    """
    try:
        outcome = run_full_voicegate(
            audio_path,
            target_language,
            tts_trim_start=tts_trim_start,
            timeout=880,
        )
        report = outcome["lines"]
        dubbed_audio = outcome["audio"]
        if not dubbed_audio:
            report.append("warning=No output audio file was found in ComfyUI history.")
        status_text = "\n".join(report)
        # Remaining outputs are taken from the result in the order the UI expects.
        subtitle_outputs = tuple(
            outcome[key]
            for key in (
                "source_subtitle_file",
                "translated_subtitle_file",
                "source_subtitle",
                "translated_subtitle",
            )
        )
        return (dubbed_audio, status_text, *subtitle_outputs)
    except Exception as exc:
        report = gpu_status_lines()
        report.append(f"error={type(exc).__name__}: {exc}")
        if COMFY_LOG.exists():
            report.append("comfy_log_tail:")
            log_text = COMFY_LOG.read_text(encoding="utf-8", errors="replace")
            report.extend(log_text.splitlines()[-160:])
        return None, "\n".join(report), None, None, "", ""
# Gradio UI definition: a single "Translate" tab with an input column, an
# output column, and a status log, wired to voicegate_user_run.
# NOTE(review): the nesting below was reconstructed from a flattened listing
# in which indentation was lost; confirm container membership (in particular
# where the run button and the Log card sit) against the original file.
with gr.Blocks(title="VoiceGate", fill_width=True) as demo:
    with gr.Tab("Translate"):
        # Static intro card: product description plus external links.
        gr.HTML(
            """
            <section class="voicegate-card voicegate-intro">
            <div class="voicegate-kicker">ComfyUI workflow · multilingual dubbing</div>
            <h1>VoiceGate</h1>
            <p>
            VoiceGate transforms speech clips into precisely time-aligned multilingual dubbing. Each sentence is
            automatically matched to the original speech timestamp, so the generated voice follows the source
            rhythm and stays synchronized with the subtitles and video timeline. The pipeline combines ASR,
            LLM translation, multilingual TTS, SRT-based audio alignment, and ambience preservation to produce
            natural translated dubbing while keeping the original pacing and background atmosphere. Runtime is
            usually close to the uploaded audio duration.
            </p>
            <div class="voicegate-link-row">
            <a class="voicegate-github" href="https://github.com/YanTianlong-01/VoiceGate" target="_blank">GitHub source</a>
            <a href="https://www.runninghub.ai/ai-detail/2062442306350964737?inviteCode=rh-v1455" target="_blank">Online app - audio</a>
            <a href="https://www.runninghub.ai/ai-detail/2062446982618238978?inviteCode=rh-v1455" target="_blank">Online app - video</a>
            <a href="https://www.runninghub.ai/post/2062432233125928961?inviteCode=rh-v1455" target="_blank">ComfyUI workflow - audio</a>
            <a href="https://www.runninghub.ai/post/2062445363042283522?inviteCode=rh-v1455" target="_blank">ComfyUI workflow - video</a>
            </div>
            </section>
            """
        )
        with gr.Row(elem_classes=["voicegate-shell"]):
            # Left column: inputs (audio upload, target language, trim option, run button).
            with gr.Column(scale=4, min_width=300):
                with gr.Blocks(elem_classes=["voicegate-card"]):
                    gr.HTML('<div class="voicegate-card-label">Input <span class="voicegate-tag">required</span></div>')
                    # type="filepath" hands the handler a path string rather than raw samples.
                    user_audio = gr.Audio(
                        label="Upload audio",
                        type="filepath",
                        elem_classes=["voicegate-control-card"],
                        waveform_options=VOICEGATE_WAVEFORM_OPTIONS,
                    )
                    user_target_language = gr.Dropdown(
                        label="Target language",
                        choices=TARGET_LANGUAGES,
                        value="English",
                        elem_classes=["voicegate-control-card"],
                    )
                    with gr.Accordion("Advanced audio cleanup", open=False, elem_classes=["voicegate-accordion-card"]):
                        # Slider range is 0.0-1.0 in steps of 0.05; the value is passed to the handler as tts_trim_start.
                        user_tts_trim_start = gr.Slider(
                            label="TTS segment trim start",
                            minimum=0.0,
                            maximum=1.0,
                            value=0.0,
                            step=0.05,
                            info=(
                                "Skips the first n seconds of each generated TTS segment. "
                                "Use this to remove short noises that may appear at the beginning of generated speech segments."
                            ),
                        )
                    user_run = gr.Button(
                        "Generate translated dubbing",
                        variant="primary",
                        elem_classes=["voicegate-run-button"],
                    )
            # Right column: outputs (dubbed audio, subtitle downloads and previews, status log).
            with gr.Column(scale=8, min_width=420):
                with gr.Blocks(elem_classes=["voicegate-card"]):
                    gr.HTML('<div class="voicegate-card-label">Output <span class="voicegate-tag">audio + subtitles</span></div>')
                    user_output_audio = gr.Audio(
                        label="Translated dubbing audio",
                        type="filepath",
                        elem_classes=["voicegate-control-card"],
                        waveform_options=VOICEGATE_WAVEFORM_OPTIONS,
                    )
                    with gr.Row(elem_classes=["voicegate-downloads"]):
                        user_source_file = gr.DownloadButton("Download original subtitles", size="sm")
                        user_translated_file = gr.DownloadButton("Download translated subtitles", size="sm")
                    with gr.Accordion("Subtitle preview", open=True, elem_classes=["voicegate-accordion-card"]):
                        with gr.Row():
                            user_source_text = gr.Textbox(label="Original subtitles", lines=8)
                            user_translated_text = gr.Textbox(label="Translated subtitles", lines=8)
                with gr.Blocks(elem_classes=["voicegate-card"]):
                    with gr.Accordion("Log", open=True, elem_classes=["voicegate-accordion-card"]):
                        user_status = gr.Textbox(label="Status", lines=12, elem_classes=["voicegate-status"])
        # The outputs list order must match the 6-tuple returned by voicegate_user_run:
        # audio, status text, source subtitle file, translated subtitle file,
        # source subtitle text, translated subtitle text.
        user_run.click(
            fn=voicegate_user_run,
            inputs=[user_audio, user_target_language, user_tts_trim_start],
            outputs=[
                user_output_audio,
                user_status,
                user_source_file,
                user_translated_file,
                user_source_text,
                user_translated_text,
            ],
        )
# Launch the Gradio app only when this file is executed as a script; the theme
# and CSS are supplied here at launch time rather than on gr.Blocks.
if __name__ == "__main__":
    demo.launch(theme=voicegate_theme(), css=APP_CSS)