from __future__ import annotations import json import math import os import shutil import subprocess import sys import time import uuid import wave from pathlib import Path from typing import Any try: import matplotlib matplotlib.use("Agg") except ImportError: pass import gradio as gr import requests import spaces import torch import websocket from scripts.workflow_client import load_workflow, patch_voicegate_workflow ROOT = Path(__file__).resolve().parent COMFY_DIR = ROOT / "ComfyUI" COMFY_INPUT_DIR = COMFY_DIR / "input" COMFY_LOG = Path("/tmp/voicegate_comfy_gradio.log") COMFY_URL = "http://127.0.0.1:8188" COMFY_HOST = "127.0.0.1" COMFY_PORT = "8188" COMFY_PROCESS: subprocess.Popen | None = None PREPARE_PROCESS: subprocess.Popen | None = None BOOTSTRAPPED = False BOOTSTRAP_LOG = Path("/tmp/voicegate_bootstrap.log") USER_OUTPUT_DIR = ROOT / "user_outputs" REQUIRED_MODEL_PATHS = [ COMFY_DIR / "models" / "diffusion_models" / "MelBandRoFormer_comfy" / "MelBandRoformer_fp32.safetensors", COMFY_DIR / "models" / "voxcpm" / "VoxCPM2" / "model.safetensors", COMFY_DIR / "models" / "voxcpm" / "VoxCPM2" / "audiovae.pth", COMFY_DIR / "models" / "Qwen3-ASR" / "Qwen3-ASR-1.7B", COMFY_DIR / "models" / "Qwen3-ASR" / "Qwen3-ForcedAligner-0.6B", ] TARGET_LANGUAGES = [ "Arabic", "Burmese", "Chinese", "Danish", "Dutch", "English", "Finnish", "French", "German", "Greek", "Hebrew", "Hindi", "Indonesian", "Italian", "Japanese", "Khmer", "Korean", "Lao", "Malay", "Norwegian", "Polish", "Portuguese", "Russian", "Spanish", "Swahili", "Swedish", "Tagalog", "Thai", "Turkish", "Vietnamese", ] VG_PRIMARY = "#6366c7" VG_WAVEFORM = "#98a2b3" VOICEGATE_WAVEFORM_OPTIONS = gr.WaveformOptions( waveform_color=VG_WAVEFORM, waveform_progress_color=VG_PRIMARY, ) APP_CSS = """ :root { --vg-primary: #6366c7; --vg-primary-dark: #5255b5; --vg-ink: #171827; --vg-muted: #667085; --vg-line: #eceef5; --vg-soft: #f6f7fb; --vg-radius: 8px; --vg-radius-sm: 6px; } :root:root:root:root main { max-width: 1160px; margin-left: auto !important; margin-right: auto !important; } :root:root:root:root .gradio-container { overflow: unset; } .voicegate-shell { gap: 16px; } .voicegate-card { background: #ffffff; border: 1px solid var(--vg-line); border-radius: var(--vg-radius) !important; padding: 12px; box-shadow: none; overflow: hidden; } /* Gradio may attach elem_classes to an outer wrapper while the visible block is a child element. Apply the same rounded corner to both so the final rendered card never appears square. */ .voicegate-card.block, .voicegate-card > .block, .voicegate-card > div, .voicegate-card > div > .block { border-radius: var(--vg-radius) !important; overflow: hidden; } .voicegate-intro { margin: 10px 0 12px; padding: 18px; border-color: rgba(99, 102, 199, 0.24); background: linear-gradient(180deg, #ffffff 0%, #f8f8ff 100%); } .voicegate-kicker { color: var(--vg-primary); font-size: 12px; font-weight: 700; letter-spacing: 0; text-transform: uppercase; } .voicegate-intro h1 { margin: 6px 0 8px; color: var(--vg-ink); font-size: 30px; line-height: 1.12; letter-spacing: 0; } .voicegate-intro p { max-width: none; width: 100%; margin: 0; color: var(--vg-muted); font-size: 14px; line-height: 1.6; } .voicegate-link-row { display: flex; flex-wrap: wrap; gap: 8px; margin-top: 14px; } .voicegate-link-row a { display: inline-flex; min-height: 34px; align-items: center; justify-content: center; border: 1px solid rgba(99, 102, 199, 0.34); border-radius: var(--vg-radius-sm); padding: 6px 12px; color: var(--vg-primary) !important; background: #ffffff; font-size: 13px; font-weight: 650; text-decoration: none; } .voicegate-link-row a:hover { border-color: var(--vg-primary); background: #f4f4ff; } .voicegate-link-row a.voicegate-github { border-color: var(--vg-primary); background: var(--vg-primary); color: #ffffff !important; } .voicegate-link-row a.voicegate-github:hover { border-color: var(--vg-primary-dark); background: var(--vg-primary-dark); } .voicegate-card-label { display: inline-flex; align-items: center; margin: 0 0 10px; border-radius: var(--vg-radius-sm); padding: 5px 8px; background: #ececf1; color: var(--vg-ink); font-size: 12px; font-weight: 700; letter-spacing: 0; text-transform: uppercase; } .voicegate-card-label .voicegate-tag { margin-left: 8px; border-radius: 999px; padding: 2px 7px; color: var(--vg-primary); background: #ffffff; font-size: 12px; font-weight: 700; text-transform: none; } /* Keep only the outer VoiceGate card. Gradio generates many nested blocks/forms; these rules prevent each nested wrapper from drawing another visible box. */ .voicegate-card .block, .voicegate-card .form, .voicegate-card .panel, .voicegate-card .accordion, .voicegate-card .tabs, .voicegate-card .tabitem { border: 0 !important; box-shadow: none !important; background: transparent !important; } .voicegate-card .block { padding-left: 0 !important; padding-right: 0 !important; } .voicegate-card textarea, .voicegate-card input, .voicegate-card select { border: 0 !important; box-shadow: none !important; } .voicegate-card textarea { font-size: 13px; } /* Match FaceFusion-like softly rounded inner controls without adding extra boxes. */ .voicegate-card input, .voicegate-card textarea, .voicegate-card select, .voicegate-card button, .voicegate-card .wrap, .voicegate-card .container, .voicegate-card .input-container, .voicegate-card .dropdown-arrow, .voicegate-card details, .voicegate-card details > summary { border-radius: var(--vg-radius-sm) !important; } /* Rounded corners for visible component cards such as Upload audio and Target language. Gradio applies elem_classes to a wrapper, so radius must also be pushed into the rendered block and its inner containers. */ .voicegate-control-card, .voicegate-control-card.block, .voicegate-control-card > .block, .voicegate-control-card > div, .voicegate-control-card > div > .block, .voicegate-control-card .wrap, .voicegate-control-card .container, .voicegate-control-card .input-container { border-radius: var(--vg-radius) !important; overflow: hidden !important; } .voicegate-control-card .block, .voicegate-control-card .form { border-radius: var(--vg-radius) !important; } .voicegate-control-card input, .voicegate-control-card textarea, .voicegate-control-card select, .voicegate-control-card button { border-radius: var(--vg-radius-sm) !important; } /* Rounded accordion cards: Advanced audio cleanup, Subtitle preview, and Log. Keep them visually light, but give the expanded sections the same soft radius as Upload audio and Target language. */ .voicegate-accordion-card, .voicegate-accordion-card.block, .voicegate-accordion-card > .block, .voicegate-accordion-card > div, .voicegate-accordion-card > div > .block, .voicegate-accordion-card details { border-radius: var(--vg-radius) !important; overflow: hidden !important; } .voicegate-accordion-card details { border: 1px solid var(--vg-line) !important; background: #ffffff !important; box-shadow: none !important; } .voicegate-accordion-card details > summary { border-radius: var(--vg-radius) var(--vg-radius) 0 0 !important; padding: 10px 12px !important; background: var(--vg-soft) !important; box-shadow: none !important; } .voicegate-accordion-card details:not([open]) > summary { border-radius: var(--vg-radius) !important; } .voicegate-accordion-card details[open] > summary { border-bottom: 1px solid var(--vg-line) !important; } /* The content rendered inside an open accordion can have its own Gradio wrappers. Round those wrappers too so textboxes/sliders do not look square inside. */ .voicegate-accordion-card .block, .voicegate-accordion-card .form, .voicegate-accordion-card .wrap, .voicegate-accordion-card .container, .voicegate-accordion-card .input-container, .voicegate-accordion-card textarea, .voicegate-accordion-card input, .voicegate-accordion-card select { border-radius: var(--vg-radius-sm) !important; } /* Full-width primary action without an extra gr.Group wrapper. */ .voicegate-run-button, .voicegate-run-button button, button.voicegate-run-button { width: 100%; } .voicegate-run-button button.primary, .voicegate-run-button .primary, button.voicegate-run-button.primary { background: var(--vg-primary) !important; border-color: var(--vg-primary) !important; color: #ffffff !important; } .voicegate-run-button button.primary:hover, .voicegate-run-button .primary:hover, button.voicegate-run-button.primary:hover { background: var(--vg-primary-dark) !important; border-color: var(--vg-primary-dark) !important; } .voicegate-downloads { gap: 10px; } .voicegate-downloads button, .voicegate-downloads a { width: 100%; } .voicegate-status textarea { font-family: ui-monospace, SFMono-Regular, Menlo, Consolas, monospace; font-size: 12px; } :root:root:root:root input[type="range"] { accent-color: var(--vg-primary); } :root:root:root:root input[type="range"]::-moz-range-thumb, :root:root:root:root input[type="range"]::-webkit-slider-thumb { background: var(--vg-primary); box-shadow: none; } :root:root:root:root .tab-container button.selected, :root:root:root:root button[role="tab"][aria-selected="true"] { color: var(--vg-primary); border-color: var(--vg-primary); } :root:root:root:root footer { display: none; } @media (max-width: 760px) { .voicegate-intro h1 { font-size: 26px; } .voicegate-link-row a { flex: 1 1 46%; } } """ def gpu_status_lines() -> list[str]: lines = ["VoiceGate GPU status"] lines.append(f"torch={torch.__version__}") lines.append(f"cuda_available={torch.cuda.is_available()}") lines.append(f"cuda_device_count={torch.cuda.device_count()}") if torch.cuda.is_available(): props = torch.cuda.get_device_properties(0) lines.append(f"device_name={torch.cuda.get_device_name(0)}") lines.append(f"total_memory_gb={props.total_memory / 1024**3:.2f}") return lines def voicegate_theme() -> gr.Theme: primary = gr.themes.Color( name="voicegate", c50="#f5f5ff", c100="#ececff", c200="#dadaff", c300="#b8b9fb", c400="#9193ee", c500="#6366c7", c600="#5255b5", c700="#444695", c800="#393b78", c900="#313262", c950="#1f2040", ) return gr.themes.Base( primary_hue=primary, secondary_hue=gr.themes.colors.neutral, radius_size=gr.themes.sizes.radius_md, font=[gr.themes.GoogleFont("Open Sans"), "ui-sans-serif", "system-ui", "sans-serif"], ).set( background_fill_primary="*neutral_100", background_fill_secondary="*neutral_50", block_background_fill="white", block_border_width="0", block_label_background_fill="*neutral_100", block_label_border_width="none", block_label_margin="0.5rem", block_label_radius="*radius_sm", block_label_text_color="*neutral_700", block_label_text_size="*text_sm", block_label_text_weight="600", block_padding="0.5rem", border_color_primary="transparent", button_primary_background_fill="*primary_500", button_primary_background_fill_hover="*primary_600", button_primary_text_color="white", input_background_fill="*neutral_50", shadow_drop="none", slider_color="*primary_500", ) def wait_for_comfy(timeout: float = 180) -> dict[str, Any]: deadline = time.time() + timeout last_error = "" while time.time() < deadline: try: response = requests.get(f"{COMFY_URL}/system_stats", timeout=5) if response.ok: return response.json() last_error = f"HTTP {response.status_code}: {response.text[:300]}" except requests.RequestException as exc: last_error = repr(exc) time.sleep(2) raise RuntimeError(f"ComfyUI did not become ready: {last_error}") def run_bootstrap(lines: list[str], *, allow_heavy: bool = True) -> None: global BOOTSTRAPPED if BOOTSTRAPPED and (COMFY_DIR / "main.py").exists(): lines.append("bootstrap=already_done") return if (COMFY_DIR / "main.py").exists() and (COMFY_DIR / "custom_nodes").exists(): if not allow_heavy: lines.append("bootstrap=existing_comfyui") BOOTSTRAPPED = True return started = time.time() lines.append("bootstrap=starting") command = [sys.executable, str(ROOT / "scripts" / "bootstrap_comfy.py")] result = subprocess.run( command, cwd=ROOT, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=900, ) lines.append(f"bootstrap_returncode={result.returncode}") lines.append(f"bootstrap_elapsed_sec={time.time() - started:.1f}") if result.returncode != 0: lines.append("bootstrap_tail:") lines.extend(result.stdout.splitlines()[-80:]) raise RuntimeError("bootstrap_comfy.py failed") BOOTSTRAPPED = True def missing_required_models() -> list[Path]: return [path for path in REQUIRED_MODEL_PATHS if not path.exists()] def ensure_runtime_assets(lines: list[str]) -> None: missing = missing_required_models() if not missing: lines.append("models=ready") return lines.append("models=missing") lines.extend(f"missing_model={path}" for path in missing) started = time.time() command = [sys.executable, str(ROOT / "scripts" / "bootstrap_comfy.py"), "--with-models"] result = subprocess.run( command, cwd=ROOT, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=1800, ) lines.append(f"model_prepare_returncode={result.returncode}") lines.append(f"model_prepare_elapsed_sec={time.time() - started:.1f}") if result.returncode != 0: lines.append("model_prepare_tail:") lines.extend(result.stdout.splitlines()[-100:]) raise RuntimeError("Could not prepare required VoiceGate models.") remaining = missing_required_models() if remaining: lines.append("models_still_missing:") lines.extend(str(path) for path in remaining) raise RuntimeError("Required VoiceGate models are still missing after preparation.") lines.append("models=ready_after_prepare") def ensure_comfy(lines: list[str], *, timeout: float = 240) -> dict[str, Any]: global COMFY_PROCESS if PREPARE_PROCESS is not None: returncode = PREPARE_PROCESS.poll() if returncode is None: raise RuntimeError("Runtime preparation is still running. Check Prepare Status first.") if returncode != 0: raise RuntimeError(f"Runtime preparation failed with return code {returncode}.") run_bootstrap(lines, allow_heavy=False) try: stats = wait_for_comfy(timeout=5) lines.append("comfy=already_running") return stats except RuntimeError: pass log = COMFY_LOG.open("ab") command = [ sys.executable, "main.py", "--listen", COMFY_HOST, "--port", COMFY_PORT, ] COMFY_PROCESS = subprocess.Popen( command, cwd=COMFY_DIR, stdout=log, stderr=subprocess.STDOUT, ) lines.append(f"comfy_started_pid={COMFY_PROCESS.pid}") try: return wait_for_comfy(timeout=timeout) except Exception: lines.append("comfy_log_tail:") if COMFY_LOG.exists(): lines.extend(COMFY_LOG.read_text(encoding="utf-8", errors="replace").splitlines()[-120:]) raise def write_sine_wav(filename: str, *, seconds: float = 1.0, frequency: float = 440.0) -> str: COMFY_INPUT_DIR.mkdir(parents=True, exist_ok=True) path = COMFY_INPUT_DIR / filename sample_rate = 16000 total = int(sample_rate * seconds) amplitude = 0.2 with wave.open(str(path), "wb") as file: file.setnchannels(1) file.setsampwidth(2) file.setframerate(sample_rate) for index in range(total): value = int(32767 * amplitude * math.sin(2 * math.pi * frequency * index / sample_rate)) file.writeframesraw(value.to_bytes(2, byteorder="little", signed=True)) return filename def submit_prompt(workflow: dict[str, Any], *, client_id: str | None = None) -> str: response = requests.post( f"{COMFY_URL}/prompt", json={"prompt": workflow, "client_id": client_id or str(uuid.uuid4())}, timeout=120, ) if not response.ok: raise RuntimeError(f"/prompt failed HTTP {response.status_code}: {response.text[:2000]}") return response.json()["prompt_id"] def execute_prompt_with_timing(workflow: dict[str, Any], *, timeout: float) -> tuple[str, dict[str, Any], list[str]]: client_id = str(uuid.uuid4()) websocket_url = f"ws://{COMFY_HOST}:{COMFY_PORT}/ws?clientId={client_id}" ws = websocket.create_connection(websocket_url, timeout=30) prompt_id = submit_prompt(workflow, client_id=client_id) started = time.time() deadline = started + timeout current_node: str | None = None current_started = 0.0 node_durations: dict[str, float] = {} node_order: list[str] = [] event_lines = [f"prompt_id={prompt_id}", "node_timing=started"] def close_current_node(now: float) -> None: nonlocal current_node, current_started if current_node is not None: node_durations[current_node] = node_durations.get(current_node, 0.0) + max(0.0, now - current_started) current_node = None current_started = 0.0 try: while time.time() < deadline: ws.settimeout(max(1.0, min(10.0, deadline - time.time()))) try: message = ws.recv() except websocket.WebSocketTimeoutException: continue if isinstance(message, bytes): message = message.decode("utf-8", errors="replace") try: payload = json.loads(message) except json.JSONDecodeError: continue event_type = payload.get("type") data = payload.get("data") or {} if data.get("prompt_id") not in (None, prompt_id): continue now = time.time() if event_type == "executing": close_current_node(now) node = data.get("node") if node is None: continue current_node = str(node) current_started = now if current_node not in node_order: node_order.append(current_node) elif event_type == "execution_success": close_current_node(now) event_lines.append(f"websocket_elapsed_sec={now - started:.1f}") break elif event_type == "execution_error": close_current_node(now) event_lines.append("websocket_execution_error:") event_lines.append(json.dumps(data, ensure_ascii=False, indent=2)[:4000]) break else: close_current_node(time.time()) raise TimeoutError(f"Timed out waiting for prompt {prompt_id}") finally: ws.close() history = wait_for_history(prompt_id, timeout=30) timed_nodes = sorted( ((node_id, node_durations.get(node_id, 0.0)) for node_id in node_order), key=lambda item: item[1], reverse=True, ) if timed_nodes: event_lines.append("node_timing_top:") for node_id, seconds in timed_nodes[:20]: class_type = workflow.get(node_id, {}).get("class_type", "unknown") event_lines.append(f"{node_id} {class_type}: {seconds:.1f}s") return prompt_id, history, event_lines def wait_for_history(prompt_id: str, timeout: float = 1200) -> dict[str, Any]: deadline = time.time() + timeout while time.time() < deadline: response = requests.get(f"{COMFY_URL}/history/{prompt_id}", timeout=30) response.raise_for_status() payload = response.json() if prompt_id in payload: return payload[prompt_id] time.sleep(2) raise TimeoutError(f"Timed out waiting for prompt {prompt_id}") def history_summary(history: dict[str, Any]) -> list[str]: lines = [] status = history.get("status", {}) lines.append(f"status_str={status.get('status_str')}") lines.append(f"completed={status.get('completed')}") messages = status.get("messages") or [] errors = [message for message in messages if isinstance(message, list) and message[0] == "execution_error"] if errors: lines.append("errors:") lines.append(json.dumps(errors, ensure_ascii=False, indent=2)[:4000]) outputs = history.get("outputs", {}) output_files = [] for node_output in outputs.values(): for key in ("audio", "images", "gifs"): for item in node_output.get(key, []) or []: filename = item.get("filename") subfolder = item.get("subfolder") if subfolder: output_files.append(f"{subfolder}/{filename}") elif filename: output_files.append(filename) if output_files: lines.append("outputs:") lines.extend(output_files) text_outputs = [] for node_output in outputs.values(): for key in ("text", "string"): values = node_output.get(key, []) or [] if isinstance(values, str): values = [values] text_outputs.extend(str(value) for value in values) if text_outputs: lines.append("text_outputs:") for value in text_outputs: lines.append(value[:2000]) return lines def first_output_audio_path(history: dict[str, Any]) -> str | None: outputs = history.get("outputs", {}) for node_output in outputs.values(): for item in node_output.get("audio", []) or []: filename = item.get("filename") if not filename: continue subfolder = item.get("subfolder") or "" path = COMFY_DIR / "output" / subfolder / filename if path.exists(): return str(path) return None def text_outputs_for_node(history: dict[str, Any], node_id: str) -> list[str]: node_output = (history.get("outputs", {}) or {}).get(node_id, {}) values: list[str] = [] for key in ("text", "string"): raw_values = node_output.get(key, []) or [] if isinstance(raw_values, str): raw_values = [raw_values] values.extend(str(value) for value in raw_values if str(value).strip()) return values def write_srt_file(prefix: str, name: str, text: str) -> str | None: if not text.strip(): return None USER_OUTPUT_DIR.mkdir(parents=True, exist_ok=True) path = USER_OUTPUT_DIR / f"{prefix}_{name}.srt" path.write_text(text, encoding="utf-8") return str(path) def melband_workflow(audio_filename: str, prefix: str) -> dict[str, Any]: return { "1": { "class_type": "LoadAudio", "inputs": {"audio": audio_filename, "audioUI": ""}, }, "2": { "class_type": "MelBandRoFormerModelLoader", "inputs": {"model_name": "MelBandRoFormer_comfy/MelBandRoformer_fp32.safetensors"}, }, "3": { "class_type": "MelBandRoFormerSampler", "inputs": {"model": ["2", 0], "audio": ["1", 0]}, }, "4": { "class_type": "SaveAudioMP3", "inputs": { "filename_prefix": f"audio/{prefix}_vocals", "quality": "V0", "audioUI": "", "audio": ["3", 0], }, }, "5": { "class_type": "SaveAudioMP3", "inputs": { "filename_prefix": f"audio/{prefix}_instruments", "quality": "V0", "audioUI": "", "audio": ["3", 1], }, }, } def voxcpm_tts_workflow(prefix: str) -> dict[str, Any]: return { "1": { "class_type": "RunningHub_VoxCPM_LoadModel", "inputs": {"model_name": "VoxCPM2", "optimize": False, "lora_name": "None"}, }, "2": { "class_type": "RunningHub_VoxCPM_Generate", "inputs": { "model": ["1", 0], "control_instruction": "清晰自然的中文女声", "text": "你好,VoiceGate GPU 语音合成测试。", "cfg_value": 2.0, "inference_steps": 4, "seed": 20260605, "ultimate_clone": False, "reference_audio_text": "", "normalize_text": False, "denoise_reference": False, "max_len": 512, "retry_badcase": True, }, }, "3": { "class_type": "SaveAudioMP3", "inputs": { "filename_prefix": f"audio/{prefix}", "quality": "V0", "audioUI": "", "audio": ["2", 0], }, }, } def copy_audio_to_comfy_input(audio_path: str | Path, prefix: str) -> str: source = Path(audio_path) if not source.exists(): raise FileNotFoundError(f"Uploaded audio does not exist: {source}") suffix = source.suffix or ".wav" filename = f"{prefix}_{uuid.uuid4().hex[:8]}{suffix}" COMFY_INPUT_DIR.mkdir(parents=True, exist_ok=True) shutil.copyfile(source, COMFY_INPUT_DIR / filename) return filename def asr_workflow(audio_filename: str, prefix: str) -> dict[str, Any]: return { "1": { "class_type": "LoadAudio", "inputs": {"audio": audio_filename, "audioUI": ""}, }, "2": { "class_type": "VoiceBridgeASRLoader", "inputs": { "repo_id": "Qwen/Qwen3-ASR-1.7B", "source": "HuggingFace", "precision": "bf16", "attention": "sdpa", "max_new_tokens": 256, "forced_aligner": "Qwen/Qwen3-ForcedAligner-0.6B", "local_model_path_asr": "", "local_model_path_fa": "", }, }, "3": { "class_type": "VoiceBridgeASRTranscribe", "inputs": { "model_key": ["2", 0], "audio": ["1", 0], "language": "auto", "context": "", "return_timestamps": True, }, }, "4": { "class_type": "GenerateSRT", "inputs": { "forced_aligns": ["3", 0], "text": ["3", 1], "language": ["3", 2], "save_srt": True, "filename_prefix": f"VoiceBridge/{prefix}", }, }, "5": { "class_type": "easy showAnything", "inputs": { "text": "", "anything": ["4", 0], }, }, } def full_voicegate_workflow( audio_filename: str, prefix: str, target_language: str, *, tts_trim_start: float, ) -> dict[str, Any]: workflow = load_workflow() return patch_voicegate_workflow( workflow, audio_filename=audio_filename, target_language=target_language, api_key=os.environ.get("DEEPSEEK_API_KEY"), api_baseurl=os.environ.get("DEEPSEEK_BASE_URL", "https://api.deepseek.com"), llm_model=os.environ.get("DEEPSEEK_MODEL", "deepseek-v4-flash"), job_id=prefix, tts_trim_start=tts_trim_start, ) def run_full_voicegate( audio_path: str | None, target_language: str, *, tts_trim_start: float = 0.0, timeout: float = 880, ) -> dict[str, Any]: lines = gpu_status_lines() started = time.time() trim_start = min(1.0, max(0.0, float(tts_trim_start))) if not audio_path: raise ValueError("Please upload an audio file before running VoiceGate.") if not os.environ.get("DEEPSEEK_API_KEY"): raise RuntimeError("DEEPSEEK_API_KEY is not configured in the Space.") ensure_runtime_assets(lines) ensure_comfy(lines) prefix = f"full_{uuid.uuid4().hex[:8]}" audio_filename = copy_audio_to_comfy_input(audio_path, prefix) lines.append(f"input_audio={audio_filename}") lines.append(f"target_language={target_language}") lines.append(f"tts_trim_start={trim_start}") prompt = full_voicegate_workflow( audio_filename, prefix, target_language or "English", tts_trim_start=trim_start, ) _prompt_id, history, timing_lines = execute_prompt_with_timing(prompt, timeout=timeout) lines.extend(timing_lines) lines.extend(history_summary(history)) output_audio = first_output_audio_path(history) source_subtitle = "\n\n".join(text_outputs_for_node(history, "61")) translated_subtitle = "\n\n".join(text_outputs_for_node(history, "179") or text_outputs_for_node(history, "107")) source_subtitle_file = write_srt_file(prefix, "source", source_subtitle) translated_subtitle_file = write_srt_file(prefix, "translated", translated_subtitle) if output_audio: lines.append(f"output_audio_path={output_audio}") if source_subtitle_file: lines.append(f"source_subtitle_file={source_subtitle_file}") if translated_subtitle_file: lines.append(f"translated_subtitle_file={translated_subtitle_file}") lines.append(f"elapsed_sec={time.time() - started:.1f}") return { "lines": lines, "audio": output_audio, "source_subtitle": source_subtitle, "translated_subtitle": translated_subtitle, "source_subtitle_file": source_subtitle_file, "translated_subtitle_file": translated_subtitle_file, } def prepare_runtime() -> str: global PREPARE_PROCESS lines = ["VoiceGate runtime preparation"] if PREPARE_PROCESS is not None and PREPARE_PROCESS.poll() is None: lines.append(f"prepare=already_running pid={PREPARE_PROCESS.pid}") return "\n".join(lines) BOOTSTRAP_LOG.parent.mkdir(parents=True, exist_ok=True) log = BOOTSTRAP_LOG.open("ab") command = [sys.executable, str(ROOT / "scripts" / "bootstrap_comfy.py"), "--with-models"] PREPARE_PROCESS = subprocess.Popen( command, cwd=ROOT, stdout=log, stderr=subprocess.STDOUT, ) lines.append(f"prepare=started pid={PREPARE_PROCESS.pid}") lines.append(f"log={BOOTSTRAP_LOG}") return "\n".join(lines) def prepare_status() -> str: global BOOTSTRAPPED lines = ["VoiceGate runtime preparation status"] if PREPARE_PROCESS is None: lines.append("prepare=not_started") else: returncode = PREPARE_PROCESS.poll() if returncode is None: lines.append(f"prepare=running pid={PREPARE_PROCESS.pid}") else: lines.append(f"prepare=finished returncode={returncode}") if returncode == 0 and (COMFY_DIR / "main.py").exists(): BOOTSTRAPPED = True lines.append(f"comfy_dir_exists={(COMFY_DIR / 'main.py').exists()}") if BOOTSTRAP_LOG.exists(): lines.append("bootstrap_log_tail:") lines.extend(BOOTSTRAP_LOG.read_text(encoding="utf-8", errors="replace").splitlines()[-80:]) return "\n".join(lines) @spaces.GPU(duration=60) def gpu_smoke_test() -> str: lines = gpu_status_lines() if torch.cuda.is_available(): tensor = torch.arange(16, device="cuda:0", dtype=torch.float32) result = (tensor * 2).sum().item() torch.cuda.synchronize() lines.append(f"tensor_result={result}") lines.append(f"memory_reserved_mb={torch.cuda.memory_reserved(0) / 1024**2:.2f}") return "\n".join(lines) @spaces.GPU(duration=900) def comfy_runtime_test() -> str: lines = gpu_status_lines() started = time.time() try: stats = ensure_comfy(lines) lines.append(f"comfy_ready=true") lines.append(f"comfy_elapsed_sec={time.time() - started:.1f}") lines.append("system_stats:") lines.append(json.dumps(stats, ensure_ascii=False, indent=2)[:4000]) except Exception as exc: lines.append(f"error={type(exc).__name__}: {exc}") return "\n".join(lines) @spaces.GPU(duration=1200) def melband_gpu_test() -> str: lines = gpu_status_lines() started = time.time() try: ensure_comfy(lines) audio_filename = write_sine_wav(f"voicegate_melband_{uuid.uuid4().hex[:8]}.wav") prefix = f"melband_gpu_{uuid.uuid4().hex[:8]}" prompt_id = submit_prompt(melband_workflow(audio_filename, prefix)) lines.append(f"prompt_id={prompt_id}") history = wait_for_history(prompt_id) lines.extend(history_summary(history)) lines.append(f"elapsed_sec={time.time() - started:.1f}") except Exception as exc: lines.append(f"error={type(exc).__name__}: {exc}") if COMFY_LOG.exists(): lines.append("comfy_log_tail:") lines.extend(COMFY_LOG.read_text(encoding="utf-8", errors="replace").splitlines()[-120:]) return "\n".join(lines) @spaces.GPU(duration=1200) def voxcpm_tts_gpu_test() -> str: lines = gpu_status_lines() started = time.time() try: ensure_comfy(lines) prefix = f"voxcpm_tts_gpu_{uuid.uuid4().hex[:8]}" prompt_id = submit_prompt(voxcpm_tts_workflow(prefix)) lines.append(f"prompt_id={prompt_id}") history = wait_for_history(prompt_id, timeout=1200) lines.extend(history_summary(history)) lines.append(f"elapsed_sec={time.time() - started:.1f}") except Exception as exc: lines.append(f"error={type(exc).__name__}: {exc}") if COMFY_LOG.exists(): lines.append("comfy_log_tail:") lines.extend(COMFY_LOG.read_text(encoding="utf-8", errors="replace").splitlines()[-160:]) return "\n".join(lines) @spaces.GPU(duration=900) def asr_gpu_test(audio_path: str | None) -> str: lines = gpu_status_lines() started = time.time() try: if not audio_path: raise ValueError("Please upload an audio file before running ASR.") ensure_comfy(lines) prefix = f"asr_gpu_{uuid.uuid4().hex[:8]}" audio_filename = copy_audio_to_comfy_input(audio_path, prefix) lines.append(f"input_audio={audio_filename}") prompt_id = submit_prompt(asr_workflow(audio_filename, prefix)) lines.append(f"prompt_id={prompt_id}") history = wait_for_history(prompt_id, timeout=900) lines.extend(history_summary(history)) lines.append(f"elapsed_sec={time.time() - started:.1f}") except Exception as exc: lines.append(f"error={type(exc).__name__}: {exc}") if COMFY_LOG.exists(): lines.append("comfy_log_tail:") lines.extend(COMFY_LOG.read_text(encoding="utf-8", errors="replace").splitlines()[-180:]) return "\n".join(lines) @spaces.GPU(duration=900) def full_voicegate_gpu_test(audio_path: str | None, target_language: str, tts_trim_start: float) -> str: try: result = run_full_voicegate(audio_path, target_language, tts_trim_start=tts_trim_start, timeout=880) lines = result["lines"] except Exception as exc: lines = gpu_status_lines() lines.append(f"error={type(exc).__name__}: {exc}") if COMFY_LOG.exists(): lines.append("comfy_log_tail:") lines.extend(COMFY_LOG.read_text(encoding="utf-8", errors="replace").splitlines()[-220:]) return "\n".join(lines) @spaces.GPU(duration=900) def voicegate_user_run(audio_path: str | None, target_language: str, tts_trim_start: float) -> tuple[ str | None, str, str | None, str | None, str, str, ]: try: result = run_full_voicegate( audio_path, target_language, tts_trim_start=tts_trim_start, timeout=880, ) lines = result["lines"] output_audio = result["audio"] if not output_audio: lines.append("warning=No output audio file was found in ComfyUI history.") return ( output_audio, "\n".join(lines), result["source_subtitle_file"], result["translated_subtitle_file"], result["source_subtitle"], result["translated_subtitle"], ) except Exception as exc: lines = gpu_status_lines() lines.append(f"error={type(exc).__name__}: {exc}") if COMFY_LOG.exists(): lines.append("comfy_log_tail:") lines.extend(COMFY_LOG.read_text(encoding="utf-8", errors="replace").splitlines()[-160:]) return None, "\n".join(lines), None, None, "", "" with gr.Blocks(title="VoiceGate", fill_width=True) as demo: with gr.Tab("Translate"): gr.HTML( """
ComfyUI workflow · multilingual dubbing

VoiceGate

VoiceGate transforms speech clips into precisely time-aligned multilingual dubbing. Each sentence is automatically matched to the original speech timestamp, so the generated voice follows the source rhythm and stays synchronized with the subtitles and video timeline. The pipeline combines ASR, LLM translation, multilingual TTS, SRT-based audio alignment, and ambience preservation to produce natural translated dubbing while keeping the original pacing and background atmosphere. Runtime is usually close to the uploaded audio duration.

""" ) with gr.Row(elem_classes=["voicegate-shell"]): with gr.Column(scale=4, min_width=300): with gr.Blocks(elem_classes=["voicegate-card"]): gr.HTML('
Input required
') user_audio = gr.Audio( label="Upload audio", type="filepath", elem_classes=["voicegate-control-card"], waveform_options=VOICEGATE_WAVEFORM_OPTIONS, ) user_target_language = gr.Dropdown( label="Target language", choices=TARGET_LANGUAGES, value="English", elem_classes=["voicegate-control-card"], ) with gr.Accordion("Advanced audio cleanup", open=False, elem_classes=["voicegate-accordion-card"]): user_tts_trim_start = gr.Slider( label="TTS segment trim start", minimum=0.0, maximum=1.0, value=0.0, step=0.05, info=( "Skips the first n seconds of each generated TTS segment. " "Use this to remove short noises that may appear at the beginning of generated speech segments." ), ) user_run = gr.Button( "Generate translated dubbing", variant="primary", elem_classes=["voicegate-run-button"], ) with gr.Column(scale=8, min_width=420): with gr.Blocks(elem_classes=["voicegate-card"]): gr.HTML('
Output audio + subtitles
') user_output_audio = gr.Audio( label="Translated dubbing audio", type="filepath", elem_classes=["voicegate-control-card"], waveform_options=VOICEGATE_WAVEFORM_OPTIONS, ) with gr.Row(elem_classes=["voicegate-downloads"]): user_source_file = gr.DownloadButton("Download original subtitles", size="sm") user_translated_file = gr.DownloadButton("Download translated subtitles", size="sm") with gr.Accordion("Subtitle preview", open=True, elem_classes=["voicegate-accordion-card"]): with gr.Row(): user_source_text = gr.Textbox(label="Original subtitles", lines=8) user_translated_text = gr.Textbox(label="Translated subtitles", lines=8) with gr.Blocks(elem_classes=["voicegate-card"]): with gr.Accordion("Log", open=True, elem_classes=["voicegate-accordion-card"]): user_status = gr.Textbox(label="Status", lines=12, elem_classes=["voicegate-status"]) user_run.click( fn=voicegate_user_run, inputs=[user_audio, user_target_language, user_tts_trim_start], outputs=[ user_output_audio, user_status, user_source_file, user_translated_file, user_source_text, user_translated_text, ], ) if __name__ == "__main__": demo.launch(theme=voicegate_theme(), css=APP_CSS)