from __future__ import annotations
import json
import math
import os
import shutil
import subprocess
import sys
import time
import uuid
import wave
from pathlib import Path
from typing import Any
try:
import matplotlib
matplotlib.use("Agg")
except ImportError:
pass
import gradio as gr
import requests
import spaces
import torch
import websocket
from scripts.workflow_client import load_workflow, patch_voicegate_workflow
# Filesystem layout, resolved relative to the directory containing this file.
ROOT = Path(__file__).resolve().parent
COMFY_DIR = ROOT / "ComfyUI"
# Directory into which input audio is written/copied before a workflow runs.
COMFY_INPUT_DIR = COMFY_DIR / "input"
# Receives stdout/stderr of the ComfyUI subprocess started by ensure_comfy().
COMFY_LOG = Path("/tmp/voicegate_comfy_gradio.log")
# Address of the local ComfyUI server. COMFY_URL is used for HTTP calls while
# COMFY_HOST / COMFY_PORT are used for the launch command and websocket URL;
# the three values must be kept in sync by hand.
COMFY_URL = "http://127.0.0.1:8188"
COMFY_HOST = "127.0.0.1"
COMFY_PORT = "8188"
# Handles of the child processes started by this module (None until started).
COMFY_PROCESS: subprocess.Popen | None = None
PREPARE_PROCESS: subprocess.Popen | None = None
# Set to True once a ComfyUI checkout is known to be in place.
BOOTSTRAPPED = False
# Receives stdout/stderr of the background preparation process.
BOOTSTRAP_LOG = Path("/tmp/voicegate_bootstrap.log")
# Directory where generated subtitle (.srt) files are written.
USER_OUTPUT_DIR = ROOT / "user_outputs"
# Paths that must exist before the full pipeline runs; see
# missing_required_models() and ensure_runtime_assets().
REQUIRED_MODEL_PATHS = [
COMFY_DIR / "models" / "diffusion_models" / "MelBandRoFormer_comfy" / "MelBandRoformer_fp32.safetensors",
COMFY_DIR / "models" / "voxcpm" / "VoxCPM2" / "model.safetensors",
COMFY_DIR / "models" / "voxcpm" / "VoxCPM2" / "audiovae.pth",
COMFY_DIR / "models" / "Qwen3-ASR" / "Qwen3-ASR-1.7B",
COMFY_DIR / "models" / "Qwen3-ASR" / "Qwen3-ForcedAligner-0.6B",
]
# Language names offered as translation targets, kept in alphabetical order.
# NOTE(review): presumably feeds the "Target language" dropdown defined later
# in the file - confirm against the UI code.
TARGET_LANGUAGES = [
"Arabic",
"Burmese",
"Chinese",
"Danish",
"Dutch",
"English",
"Finnish",
"French",
"German",
"Greek",
"Hebrew",
"Hindi",
"Indonesian",
"Italian",
"Japanese",
"Khmer",
"Korean",
"Lao",
"Malay",
"Norwegian",
"Polish",
"Portuguese",
"Russian",
"Spanish",
"Swahili",
"Swedish",
"Tagalog",
"Thai",
"Turkish",
"Vietnamese",
]
# Brand colours reused for the audio waveform. VG_PRIMARY has the same value as
# the --vg-primary CSS variable in APP_CSS and c500 in voicegate_theme().
VG_PRIMARY = "#6366c7"
VG_WAVEFORM = "#98a2b3"
# Shared waveform styling: neutral base colour, brand colour for progress.
VOICEGATE_WAVEFORM_OPTIONS = gr.WaveformOptions(
waveform_color=VG_WAVEFORM,
waveform_progress_color=VG_PRIMARY,
)
# Custom stylesheet for the VoiceGate UI. It defines the --vg-* design tokens and
# styles the voicegate-* element classes (cards, intro, link row, accordions,
# run button, downloads, status box), and hides the page footer.
# NOTE(review): presumably passed to Gradio as custom CSS where the Blocks app is
# built/launched - confirm in the UI code further down the file.
APP_CSS = """
:root {
--vg-primary: #6366c7;
--vg-primary-dark: #5255b5;
--vg-ink: #171827;
--vg-muted: #667085;
--vg-line: #eceef5;
--vg-soft: #f6f7fb;
--vg-radius: 8px;
--vg-radius-sm: 6px;
}
:root:root:root:root main {
max-width: 1160px;
margin-left: auto !important;
margin-right: auto !important;
}
:root:root:root:root .gradio-container {
overflow: unset;
}
.voicegate-shell {
gap: 16px;
}
.voicegate-card {
background: #ffffff;
border: 1px solid var(--vg-line);
border-radius: var(--vg-radius) !important;
padding: 12px;
box-shadow: none;
overflow: hidden;
}
/* Gradio may attach elem_classes to an outer wrapper while the visible block is a
child element. Apply the same rounded corner to both so the final rendered card
never appears square. */
.voicegate-card.block,
.voicegate-card > .block,
.voicegate-card > div,
.voicegate-card > div > .block {
border-radius: var(--vg-radius) !important;
overflow: hidden;
}
.voicegate-intro {
margin: 10px 0 12px;
padding: 18px;
border-color: rgba(99, 102, 199, 0.24);
background: linear-gradient(180deg, #ffffff 0%, #f8f8ff 100%);
}
.voicegate-kicker {
color: var(--vg-primary);
font-size: 12px;
font-weight: 700;
letter-spacing: 0;
text-transform: uppercase;
}
.voicegate-intro h1 {
margin: 6px 0 8px;
color: var(--vg-ink);
font-size: 30px;
line-height: 1.12;
letter-spacing: 0;
}
.voicegate-intro p {
max-width: none;
width: 100%;
margin: 0;
color: var(--vg-muted);
font-size: 14px;
line-height: 1.6;
}
.voicegate-link-row {
display: flex;
flex-wrap: wrap;
gap: 8px;
margin-top: 14px;
}
.voicegate-link-row a {
display: inline-flex;
min-height: 34px;
align-items: center;
justify-content: center;
border: 1px solid rgba(99, 102, 199, 0.34);
border-radius: var(--vg-radius-sm);
padding: 6px 12px;
color: var(--vg-primary) !important;
background: #ffffff;
font-size: 13px;
font-weight: 650;
text-decoration: none;
}
.voicegate-link-row a:hover {
border-color: var(--vg-primary);
background: #f4f4ff;
}
.voicegate-link-row a.voicegate-github {
border-color: var(--vg-primary);
background: var(--vg-primary);
color: #ffffff !important;
}
.voicegate-link-row a.voicegate-github:hover {
border-color: var(--vg-primary-dark);
background: var(--vg-primary-dark);
}
.voicegate-card-label {
display: inline-flex;
align-items: center;
margin: 0 0 10px;
border-radius: var(--vg-radius-sm);
padding: 5px 8px;
background: #ececf1;
color: var(--vg-ink);
font-size: 12px;
font-weight: 700;
letter-spacing: 0;
text-transform: uppercase;
}
.voicegate-card-label .voicegate-tag {
margin-left: 8px;
border-radius: 999px;
padding: 2px 7px;
color: var(--vg-primary);
background: #ffffff;
font-size: 12px;
font-weight: 700;
text-transform: none;
}
/* Keep only the outer VoiceGate card. Gradio generates many nested blocks/forms;
these rules prevent each nested wrapper from drawing another visible box. */
.voicegate-card .block,
.voicegate-card .form,
.voicegate-card .panel,
.voicegate-card .accordion,
.voicegate-card .tabs,
.voicegate-card .tabitem {
border: 0 !important;
box-shadow: none !important;
background: transparent !important;
}
.voicegate-card .block {
padding-left: 0 !important;
padding-right: 0 !important;
}
.voicegate-card textarea,
.voicegate-card input,
.voicegate-card select {
border: 0 !important;
box-shadow: none !important;
}
.voicegate-card textarea {
font-size: 13px;
}
/* Match FaceFusion-like softly rounded inner controls without adding extra boxes. */
.voicegate-card input,
.voicegate-card textarea,
.voicegate-card select,
.voicegate-card button,
.voicegate-card .wrap,
.voicegate-card .container,
.voicegate-card .input-container,
.voicegate-card .dropdown-arrow,
.voicegate-card details,
.voicegate-card details > summary {
border-radius: var(--vg-radius-sm) !important;
}
/* Rounded corners for visible component cards such as Upload audio and Target language.
Gradio applies elem_classes to a wrapper, so radius must also be pushed into
the rendered block and its inner containers. */
.voicegate-control-card,
.voicegate-control-card.block,
.voicegate-control-card > .block,
.voicegate-control-card > div,
.voicegate-control-card > div > .block,
.voicegate-control-card .wrap,
.voicegate-control-card .container,
.voicegate-control-card .input-container {
border-radius: var(--vg-radius) !important;
overflow: hidden !important;
}
.voicegate-control-card .block,
.voicegate-control-card .form {
border-radius: var(--vg-radius) !important;
}
.voicegate-control-card input,
.voicegate-control-card textarea,
.voicegate-control-card select,
.voicegate-control-card button {
border-radius: var(--vg-radius-sm) !important;
}
/* Rounded accordion cards: Advanced audio cleanup, Subtitle preview, and Log.
Keep them visually light, but give the expanded sections the same soft radius as
Upload audio and Target language. */
.voicegate-accordion-card,
.voicegate-accordion-card.block,
.voicegate-accordion-card > .block,
.voicegate-accordion-card > div,
.voicegate-accordion-card > div > .block,
.voicegate-accordion-card details {
border-radius: var(--vg-radius) !important;
overflow: hidden !important;
}
.voicegate-accordion-card details {
border: 1px solid var(--vg-line) !important;
background: #ffffff !important;
box-shadow: none !important;
}
.voicegate-accordion-card details > summary {
border-radius: var(--vg-radius) var(--vg-radius) 0 0 !important;
padding: 10px 12px !important;
background: var(--vg-soft) !important;
box-shadow: none !important;
}
.voicegate-accordion-card details:not([open]) > summary {
border-radius: var(--vg-radius) !important;
}
.voicegate-accordion-card details[open] > summary {
border-bottom: 1px solid var(--vg-line) !important;
}
/* The content rendered inside an open accordion can have its own Gradio wrappers.
Round those wrappers too so textboxes/sliders do not look square inside. */
.voicegate-accordion-card .block,
.voicegate-accordion-card .form,
.voicegate-accordion-card .wrap,
.voicegate-accordion-card .container,
.voicegate-accordion-card .input-container,
.voicegate-accordion-card textarea,
.voicegate-accordion-card input,
.voicegate-accordion-card select {
border-radius: var(--vg-radius-sm) !important;
}
/* Full-width primary action without an extra gr.Group wrapper. */
.voicegate-run-button,
.voicegate-run-button button,
button.voicegate-run-button {
width: 100%;
}
.voicegate-run-button button.primary,
.voicegate-run-button .primary,
button.voicegate-run-button.primary {
background: var(--vg-primary) !important;
border-color: var(--vg-primary) !important;
color: #ffffff !important;
}
.voicegate-run-button button.primary:hover,
.voicegate-run-button .primary:hover,
button.voicegate-run-button.primary:hover {
background: var(--vg-primary-dark) !important;
border-color: var(--vg-primary-dark) !important;
}
.voicegate-downloads {
gap: 10px;
}
.voicegate-downloads button,
.voicegate-downloads a {
width: 100%;
}
.voicegate-status textarea {
font-family: ui-monospace, SFMono-Regular, Menlo, Consolas, monospace;
font-size: 12px;
}
:root:root:root:root input[type="range"] {
accent-color: var(--vg-primary);
}
:root:root:root:root input[type="range"]::-moz-range-thumb,
:root:root:root:root input[type="range"]::-webkit-slider-thumb {
background: var(--vg-primary);
box-shadow: none;
}
:root:root:root:root .tab-container button.selected,
:root:root:root:root button[role="tab"][aria-selected="true"] {
color: var(--vg-primary);
border-color: var(--vg-primary);
}
:root:root:root:root footer {
display: none;
}
@media (max-width: 760px) {
.voicegate-intro h1 {
font-size: 26px;
}
.voicegate-link-row a {
flex: 1 1 46%;
}
}
"""
def gpu_status_lines() -> list[str]:
    """Return a fresh list of ``key=value`` lines describing the torch/CUDA setup.

    The list always starts with a title line followed by the torch version,
    CUDA availability and device count. When CUDA is available, the name and
    total memory (GiB) of device 0 are appended as well.
    """
    cuda_ready = torch.cuda.is_available()
    report = [
        "VoiceGate GPU status",
        f"torch={torch.__version__}",
        f"cuda_available={cuda_ready}",
        f"cuda_device_count={torch.cuda.device_count()}",
    ]
    if not cuda_ready:
        return report

    # Only device 0 is inspected.
    device_props = torch.cuda.get_device_properties(0)
    report.append(f"device_name={torch.cuda.get_device_name(0)}")
    report.append(f"total_memory_gb={device_props.total_memory / 1024**3:.2f}")
    return report
def voicegate_theme() -> gr.Theme:
    """Build the Gradio theme used by the VoiceGate UI.

    A custom "voicegate" colour scale is used as the primary hue on top of
    ``gr.themes.Base``; a set of block/label/button/input overrides is then
    applied via ``.set()``.
    """
    # Shades of the brand colour, lightest (c50) to darkest (c950).
    palette = {
        "c50": "#f5f5ff",
        "c100": "#ececff",
        "c200": "#dadaff",
        "c300": "#b8b9fb",
        "c400": "#9193ee",
        "c500": "#6366c7",
        "c600": "#5255b5",
        "c700": "#444695",
        "c800": "#393b78",
        "c900": "#313262",
        "c950": "#1f2040",
    }
    brand_hue = gr.themes.Color(name="voicegate", **palette)

    font_stack = [gr.themes.GoogleFont("Open Sans"), "ui-sans-serif", "system-ui", "sans-serif"]
    base_theme = gr.themes.Base(
        primary_hue=brand_hue,
        secondary_hue=gr.themes.colors.neutral,
        radius_size=gr.themes.sizes.radius_md,
        font=font_stack,
    )

    # Values starting with "*" refer to other theme variables.
    overrides = {
        "background_fill_primary": "*neutral_100",
        "background_fill_secondary": "*neutral_50",
        "block_background_fill": "white",
        "block_border_width": "0",
        "block_label_background_fill": "*neutral_100",
        "block_label_border_width": "none",
        "block_label_margin": "0.5rem",
        "block_label_radius": "*radius_sm",
        "block_label_text_color": "*neutral_700",
        "block_label_text_size": "*text_sm",
        "block_label_text_weight": "600",
        "block_padding": "0.5rem",
        "border_color_primary": "transparent",
        "button_primary_background_fill": "*primary_500",
        "button_primary_background_fill_hover": "*primary_600",
        "button_primary_text_color": "white",
        "input_background_fill": "*neutral_50",
        "shadow_drop": "none",
        "slider_color": "*primary_500",
    }
    return base_theme.set(**overrides)
def wait_for_comfy(timeout: float = 180) -> dict[str, Any]:
    """Poll ComfyUI's ``/system_stats`` endpoint until it answers successfully.

    Args:
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The decoded JSON body of the first successful response.

    Raises:
        RuntimeError: If no successful response arrived before the deadline;
            the message carries the last HTTP status or request error seen.
    """
    stats_url = f"{COMFY_URL}/system_stats"
    stop_at = time.time() + timeout
    failure = ""
    while time.time() < stop_at:
        try:
            reply = requests.get(stats_url, timeout=5)
            if not reply.ok:
                failure = f"HTTP {reply.status_code}: {reply.text[:300]}"
            else:
                return reply.json()
        except requests.RequestException as exc:
            failure = repr(exc)
        # Pause between attempts, whether the attempt failed or was rejected.
        time.sleep(2)
    raise RuntimeError(f"ComfyUI did not become ready: {failure}")
def run_bootstrap(lines: list[str], *, allow_heavy: bool = True) -> None:
    """Make sure the ComfyUI checkout exists, running the bootstrap script if needed.

    Args:
        lines: Status log; progress lines are appended in place.
        allow_heavy: When False, an existing checkout (``main.py`` plus
            ``custom_nodes``) is accepted as-is and the script is not run.

    Raises:
        RuntimeError: If ``scripts/bootstrap_comfy.py`` exits with a non-zero
            code. The last 80 lines of its output are appended to ``lines``.
    """
    global BOOTSTRAPPED
    comfy_entry = COMFY_DIR / "main.py"

    if BOOTSTRAPPED and comfy_entry.exists():
        lines.append("bootstrap=already_done")
        return

    checkout_present = comfy_entry.exists() and (COMFY_DIR / "custom_nodes").exists()
    if checkout_present and not allow_heavy:
        lines.append("bootstrap=existing_comfyui")
        BOOTSTRAPPED = True
        return

    began = time.time()
    lines.append("bootstrap=starting")
    script = ROOT / "scripts" / "bootstrap_comfy.py"
    completed = subprocess.run(
        [sys.executable, str(script)],
        cwd=ROOT,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # interleave stderr into the captured output
        timeout=900,
    )
    lines.append(f"bootstrap_returncode={completed.returncode}")
    lines.append(f"bootstrap_elapsed_sec={time.time() - began:.1f}")

    if completed.returncode == 0:
        BOOTSTRAPPED = True
        return

    lines.append("bootstrap_tail:")
    lines.extend(completed.stdout.splitlines()[-80:])
    raise RuntimeError("bootstrap_comfy.py failed")
def missing_required_models() -> list[Path]:
    """Return the entries of ``REQUIRED_MODEL_PATHS`` that do not exist on disk."""
    absent: list[Path] = []
    for model_path in REQUIRED_MODEL_PATHS:
        if not model_path.exists():
            absent.append(model_path)
    return absent
def ensure_runtime_assets(lines: list[str]) -> None:
    """Verify the required model files exist, preparing them when absent.

    When anything is missing, ``scripts/bootstrap_comfy.py --with-models`` is
    run synchronously (30-minute timeout) and the check is repeated.

    Args:
        lines: Status log; progress lines are appended in place.

    Raises:
        RuntimeError: If the script fails, or if models are still missing
            after it finished successfully.
    """
    absent = missing_required_models()
    if not absent:
        lines.append("models=ready")
        return

    lines.append("models=missing")
    for model_path in absent:
        lines.append(f"missing_model={model_path}")

    began = time.time()
    prepare_script = ROOT / "scripts" / "bootstrap_comfy.py"
    outcome = subprocess.run(
        [sys.executable, str(prepare_script), "--with-models"],
        cwd=ROOT,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        timeout=1800,
    )
    lines.append(f"model_prepare_returncode={outcome.returncode}")
    lines.append(f"model_prepare_elapsed_sec={time.time() - began:.1f}")

    if outcome.returncode != 0:
        lines.append("model_prepare_tail:")
        lines.extend(outcome.stdout.splitlines()[-100:])
        raise RuntimeError("Could not prepare required VoiceGate models.")

    # A zero exit code is not trusted on its own: re-check the files.
    still_absent = missing_required_models()
    if still_absent:
        lines.append("models_still_missing:")
        lines.extend([str(model_path) for model_path in still_absent])
        raise RuntimeError("Required VoiceGate models are still missing after preparation.")

    lines.append("models=ready_after_prepare")
def ensure_comfy(lines: list[str], *, timeout: float = 240) -> dict[str, Any]:
    """Return ComfyUI system stats, starting the server first if it is not up.

    Steps: refuse to proceed while (or after a failed) background preparation,
    make sure a checkout exists, probe for an already-running server, and
    otherwise launch ``main.py`` and wait for it to answer.

    Args:
        lines: Status log; progress lines are appended in place.
        timeout: Seconds to wait for a newly started server to become ready.

    Returns:
        The JSON payload of ComfyUI's ``/system_stats`` endpoint.

    Raises:
        RuntimeError: If runtime preparation is still running or failed, if
            bootstrap fails, or if the server never becomes ready.
    """
    global COMFY_PROCESS

    if PREPARE_PROCESS is not None:
        returncode = PREPARE_PROCESS.poll()
        if returncode is None:
            raise RuntimeError("Runtime preparation is still running. Check Prepare Status first.")
        if returncode != 0:
            raise RuntimeError(f"Runtime preparation failed with return code {returncode}.")

    run_bootstrap(lines, allow_heavy=False)

    # Short probe: reuse a server that is already listening.
    try:
        stats = wait_for_comfy(timeout=5)
    except RuntimeError:
        pass  # nothing is answering yet; fall through and start one
    else:
        lines.append("comfy=already_running")
        return stats

    command = [
        sys.executable,
        "main.py",
        "--listen",
        COMFY_HOST,
        "--port",
        COMFY_PORT,
    ]
    # The child receives its own copy of the log descriptor, so the parent's
    # handle is closed right after spawning instead of being leaked.
    with COMFY_LOG.open("ab") as log:
        COMFY_PROCESS = subprocess.Popen(
            command,
            cwd=COMFY_DIR,
            stdout=log,
            stderr=subprocess.STDOUT,
        )
    lines.append(f"comfy_started_pid={COMFY_PROCESS.pid}")

    try:
        return wait_for_comfy(timeout=timeout)
    except Exception:
        # Surface the end of the server log to help diagnose startup failures.
        lines.append("comfy_log_tail:")
        if COMFY_LOG.exists():
            lines.extend(COMFY_LOG.read_text(encoding="utf-8", errors="replace").splitlines()[-120:])
        raise
def write_sine_wav(filename: str, *, seconds: float = 1.0, frequency: float = 440.0) -> str:
    """Write a mono 16-bit, 16 kHz sine-wave WAV file into the ComfyUI input directory.

    Args:
        filename: File name (relative to ``COMFY_INPUT_DIR``) to create.
        seconds: Duration of the tone.
        frequency: Tone frequency in Hz.

    Returns:
        ``filename`` unchanged, for passing straight to a LoadAudio node.
    """
    COMFY_INPUT_DIR.mkdir(parents=True, exist_ok=True)
    target = COMFY_INPUT_DIR / filename

    rate_hz = 16000
    frame_count = int(rate_hz * seconds)
    peak = 32767 * 0.2  # 20% of full scale for signed 16-bit samples

    # Encode every sample as little-endian signed 16-bit and write them in one go.
    pcm = b"".join(
        int(peak * math.sin(2 * math.pi * frequency * position / rate_hz)).to_bytes(
            2, byteorder="little", signed=True
        )
        for position in range(frame_count)
    )
    with wave.open(str(target), "wb") as wav_out:
        wav_out.setnchannels(1)
        wav_out.setsampwidth(2)
        wav_out.setframerate(rate_hz)
        wav_out.writeframesraw(pcm)
    return filename
def submit_prompt(workflow: dict[str, Any], *, client_id: str | None = None) -> str:
    """POST a workflow to ComfyUI's ``/prompt`` endpoint and return its prompt id.

    Args:
        workflow: Workflow graph sent as the ``prompt`` field.
        client_id: Client identifier to associate with the prompt; a random
            UUID is generated when omitted or empty.

    Raises:
        RuntimeError: If the server answers with a non-success status.
    """
    body = {"prompt": workflow, "client_id": client_id or str(uuid.uuid4())}
    reply = requests.post(f"{COMFY_URL}/prompt", json=body, timeout=120)
    if reply.ok:
        return reply.json()["prompt_id"]
    raise RuntimeError(f"/prompt failed HTTP {reply.status_code}: {reply.text[:2000]}")
def execute_prompt_with_timing(workflow: dict[str, Any], *, timeout: float) -> tuple[str, dict[str, Any], list[str]]:
    """Run a workflow and measure per-node wall time from websocket events.

    A websocket is opened before the prompt is submitted so that no
    ``executing`` events are missed. Time between consecutive ``executing``
    events is attributed to the node named by the earlier event.

    Args:
        workflow: Workflow graph to submit.
        timeout: Seconds to wait for a terminal websocket event.

    Returns:
        ``(prompt_id, history, event_lines)`` where ``history`` is the entry
        returned by ``wait_for_history`` and ``event_lines`` are log lines
        including up to 20 nodes sorted by duration, slowest first.

    Raises:
        TimeoutError: If neither ``execution_success`` nor ``execution_error``
            arrives before the deadline.
        RuntimeError: If submitting the prompt fails.
    """
    client_id = str(uuid.uuid4())
    websocket_url = f"ws://{COMFY_HOST}:{COMFY_PORT}/ws?clientId={client_id}"
    ws = websocket.create_connection(websocket_url, timeout=30)

    current_node: str | None = None
    current_started = 0.0
    node_durations: dict[str, float] = {}
    node_order: list[str] = []

    def close_current_node(now: float) -> None:
        """Add the elapsed time of the running node (if any) to its total."""
        nonlocal current_node, current_started
        if current_node is not None:
            node_durations[current_node] = node_durations.get(current_node, 0.0) + max(0.0, now - current_started)
        current_node = None
        current_started = 0.0

    # Everything after the connection is opened runs under try/finally so the
    # socket is closed even when submitting the prompt fails.
    try:
        prompt_id = submit_prompt(workflow, client_id=client_id)
        started = time.time()
        deadline = started + timeout
        event_lines = [f"prompt_id={prompt_id}", "node_timing=started"]

        while time.time() < deadline:
            # Wake up at least every 10 s so the deadline is re-checked.
            ws.settimeout(max(1.0, min(10.0, deadline - time.time())))
            try:
                message = ws.recv()
            except websocket.WebSocketTimeoutException:
                continue
            if isinstance(message, bytes):
                message = message.decode("utf-8", errors="replace")
            try:
                payload = json.loads(message)
            except json.JSONDecodeError:
                continue  # ignore frames that are not JSON
            event_type = payload.get("type")
            data = payload.get("data") or {}
            # Skip events that explicitly belong to a different prompt.
            if data.get("prompt_id") not in (None, prompt_id):
                continue
            now = time.time()
            if event_type == "executing":
                close_current_node(now)
                node = data.get("node")
                if node is None:
                    continue
                current_node = str(node)
                current_started = now
                if current_node not in node_order:
                    node_order.append(current_node)
            elif event_type == "execution_success":
                close_current_node(now)
                event_lines.append(f"websocket_elapsed_sec={now - started:.1f}")
                break
            elif event_type == "execution_error":
                close_current_node(now)
                event_lines.append("websocket_execution_error:")
                event_lines.append(json.dumps(data, ensure_ascii=False, indent=2)[:4000])
                break
        else:
            # Loop ended without a terminal event: the deadline passed.
            close_current_node(time.time())
            raise TimeoutError(f"Timed out waiting for prompt {prompt_id}")
    finally:
        ws.close()

    history = wait_for_history(prompt_id, timeout=30)

    timed_nodes = sorted(
        ((node_id, node_durations.get(node_id, 0.0)) for node_id in node_order),
        key=lambda item: item[1],
        reverse=True,
    )
    if timed_nodes:
        event_lines.append("node_timing_top:")
        for node_id, seconds in timed_nodes[:20]:
            class_type = workflow.get(node_id, {}).get("class_type", "unknown")
            event_lines.append(f"{node_id} {class_type}: {seconds:.1f}s")
    return prompt_id, history, event_lines
def wait_for_history(prompt_id: str, timeout: float = 1200) -> dict[str, Any]:
    """Poll ``/history/<prompt_id>`` until an entry for the prompt appears.

    Args:
        prompt_id: Identifier returned by ``submit_prompt``.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The history entry stored under ``prompt_id``.

    Raises:
        TimeoutError: If no entry appeared before the deadline.
        requests.HTTPError: If a poll returns an error status.
    """
    history_url = f"{COMFY_URL}/history/{prompt_id}"
    give_up_at = time.time() + timeout
    while time.time() < give_up_at:
        reply = requests.get(history_url, timeout=30)
        reply.raise_for_status()
        body = reply.json()
        if prompt_id not in body:
            time.sleep(2)
            continue
        return body[prompt_id]
    raise TimeoutError(f"Timed out waiting for prompt {prompt_id}")
def history_summary(history: dict[str, Any]) -> list[str]:
    """Render a ComfyUI history entry as human-readable log lines.

    The result contains the status string and completion flag, any
    ``execution_error`` messages (JSON, truncated to 4000 characters), the
    relative names of audio/image/gif output files, and text outputs
    (each truncated to 2000 characters).

    Args:
        history: A single history entry, as returned by ``wait_for_history``.

    Returns:
        The summary lines; sections with no content are omitted.
    """
    status = history.get("status", {})
    lines = [
        f"status_str={status.get('status_str')}",
        f"completed={status.get('completed')}",
    ]

    messages = status.get("messages") or []
    # `message and ...` guards against empty lists, which would otherwise
    # raise IndexError on message[0].
    errors = [
        message
        for message in messages
        if isinstance(message, list) and message and message[0] == "execution_error"
    ]
    if errors:
        lines.append("errors:")
        lines.append(json.dumps(errors, ensure_ascii=False, indent=2)[:4000])

    outputs = history.get("outputs", {})

    output_files = []
    for node_output in outputs.values():
        for key in ("audio", "images", "gifs"):
            for item in node_output.get(key, []) or []:
                filename = item.get("filename")
                if not filename:
                    # Without a filename there is nothing to report; previously
                    # a subfolder-only item produced a bogus "<subfolder>/None".
                    continue
                subfolder = item.get("subfolder")
                output_files.append(f"{subfolder}/{filename}" if subfolder else filename)
    if output_files:
        lines.append("outputs:")
        lines.extend(output_files)

    text_outputs = []
    for node_output in outputs.values():
        for key in ("text", "string"):
            values = node_output.get(key, []) or []
            if isinstance(values, str):
                values = [values]  # a bare string is a single value, not characters
            text_outputs.extend(str(value) for value in values)
    if text_outputs:
        lines.append("text_outputs:")
        lines.extend(value[:2000] for value in text_outputs)
    return lines
def first_output_audio_path(history: dict[str, Any]) -> str | None:
    """Return the path of the first audio output in ``history`` that exists on disk.

    Paths are resolved under ``COMFY_DIR / "output"`` using each item's
    optional ``subfolder`` and its ``filename``. Returns None when no listed
    audio file exists.
    """
    output_root = COMFY_DIR / "output"
    for node_output in history.get("outputs", {}).values():
        audio_items = node_output.get("audio", []) or []
        for entry in audio_items:
            name = entry.get("filename")
            if name:
                candidate = output_root / (entry.get("subfolder") or "") / name
                if candidate.exists():
                    return str(candidate)
    return None
def text_outputs_for_node(history: dict[str, Any], node_id: str) -> list[str]:
    """Collect the non-blank text outputs of one node from a history entry.

    Values under the node's ``text`` key come first, then those under
    ``string``. Each value is converted with ``str``; values that are empty
    or whitespace-only after conversion are dropped.
    """
    all_outputs = history.get("outputs", {}) or {}
    node_output = all_outputs.get(node_id, {})

    collected: list[str] = []
    for field in ("text", "string"):
        raw = node_output.get(field, []) or []
        # A bare string is one value, not a sequence of characters.
        candidates = [raw] if isinstance(raw, str) else raw
        for candidate in candidates:
            rendered = str(candidate)
            if rendered.strip():
                collected.append(rendered)
    return collected
def write_srt_file(prefix: str, name: str, text: str) -> str | None:
    """Write ``text`` to ``USER_OUTPUT_DIR/<prefix>_<name>.srt`` as UTF-8.

    Returns:
        The path of the written file as a string, or None (and nothing is
        written) when ``text`` is empty or whitespace-only.
    """
    if text.strip():
        USER_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        destination = USER_OUTPUT_DIR / f"{prefix}_{name}.srt"
        destination.write_text(text, encoding="utf-8")
        return str(destination)
    return None
def melband_workflow(audio_filename: str, prefix: str) -> dict[str, Any]:
    """Build a workflow graph that runs MelBandRoFormer on one audio file.

    Node "1" loads the audio, "2" loads the model, "3" runs the sampler, and
    nodes "4"/"5" save sampler outputs 0 and 1 as MP3 under
    ``audio/<prefix>_vocals`` and ``audio/<prefix>_instruments``.
    """

    def save_mp3(stem: str, sampler_output: int) -> dict[str, Any]:
        """Return a SaveAudioMP3 node fed from the given sampler output slot."""
        return {
            "class_type": "SaveAudioMP3",
            "inputs": {
                "filename_prefix": f"audio/{prefix}_{stem}",
                "quality": "V0",
                "audioUI": "",
                "audio": ["3", sampler_output],
            },
        }

    graph: dict[str, Any] = {}
    graph["1"] = {
        "class_type": "LoadAudio",
        "inputs": {"audio": audio_filename, "audioUI": ""},
    }
    graph["2"] = {
        "class_type": "MelBandRoFormerModelLoader",
        "inputs": {"model_name": "MelBandRoFormer_comfy/MelBandRoformer_fp32.safetensors"},
    }
    graph["3"] = {
        "class_type": "MelBandRoFormerSampler",
        "inputs": {"model": ["2", 0], "audio": ["1", 0]},
    }
    graph["4"] = save_mp3("vocals", 0)
    graph["5"] = save_mp3("instruments", 1)
    return graph
def voxcpm_tts_workflow(prefix: str) -> dict[str, Any]:
    """Build a fixed VoxCPM2 text-to-speech test workflow.

    Node "1" loads the model, node "2" generates speech for a hard-coded
    Chinese sentence with a fixed seed, and node "3" saves the result as MP3
    under ``audio/<prefix>``.
    """
    loader = {
        "class_type": "RunningHub_VoxCPM_LoadModel",
        "inputs": {"model_name": "VoxCPM2", "optimize": False, "lora_name": "None"},
    }

    generation_inputs = {
        "model": ["1", 0],
        "control_instruction": "清晰自然的中文女声",
        "text": "你好,VoiceGate GPU 语音合成测试。",
        "cfg_value": 2.0,
        "inference_steps": 4,
        "seed": 20260605,
        "ultimate_clone": False,
        "reference_audio_text": "",
        "normalize_text": False,
        "denoise_reference": False,
        "max_len": 512,
        "retry_badcase": True,
    }
    generator = {"class_type": "RunningHub_VoxCPM_Generate", "inputs": generation_inputs}

    saver = {
        "class_type": "SaveAudioMP3",
        "inputs": {
            "filename_prefix": f"audio/{prefix}",
            "quality": "V0",
            "audioUI": "",
            "audio": ["2", 0],
        },
    }
    return {"1": loader, "2": generator, "3": saver}
def copy_audio_to_comfy_input(audio_path: str | Path, prefix: str) -> str:
    """Copy an audio file into ``COMFY_INPUT_DIR`` under a unique name.

    The copy is named ``<prefix>_<8 hex chars><suffix>``, keeping the source
    file's extension (``.wav`` when it has none).

    Returns:
        The new file name (not the full path).

    Raises:
        FileNotFoundError: If ``audio_path`` does not exist.
    """
    source = Path(audio_path)
    if not source.exists():
        raise FileNotFoundError(f"Uploaded audio does not exist: {source}")

    extension = source.suffix if source.suffix else ".wav"
    unique_tag = uuid.uuid4().hex[:8]
    filename = f"{prefix}_{unique_tag}{extension}"

    COMFY_INPUT_DIR.mkdir(parents=True, exist_ok=True)
    shutil.copyfile(source, COMFY_INPUT_DIR / filename)
    return filename
def asr_workflow(audio_filename: str, prefix: str) -> dict[str, Any]:
    """Build a speech-recognition workflow that ends in an SRT subtitle.

    Node "1" loads the audio, "2" loads the Qwen3 ASR model with its forced
    aligner, "3" transcribes with timestamps, "4" generates (and saves) the
    SRT under ``VoiceBridge/<prefix>``, and "5" exposes node 4's output as text.
    """
    load_audio = {
        "class_type": "LoadAudio",
        "inputs": {"audio": audio_filename, "audioUI": ""},
    }

    loader_inputs = {
        "repo_id": "Qwen/Qwen3-ASR-1.7B",
        "source": "HuggingFace",
        "precision": "bf16",
        "attention": "sdpa",
        "max_new_tokens": 256,
        "forced_aligner": "Qwen/Qwen3-ForcedAligner-0.6B",
        "local_model_path_asr": "",
        "local_model_path_fa": "",
    }
    load_model = {"class_type": "VoiceBridgeASRLoader", "inputs": loader_inputs}

    transcribe = {
        "class_type": "VoiceBridgeASRTranscribe",
        "inputs": {
            "model_key": ["2", 0],
            "audio": ["1", 0],
            "language": "auto",
            "context": "",
            "return_timestamps": True,
        },
    }

    # The three transcribe outputs feed the SRT generator in order.
    make_srt = {
        "class_type": "GenerateSRT",
        "inputs": {
            "forced_aligns": ["3", 0],
            "text": ["3", 1],
            "language": ["3", 2],
            "save_srt": True,
            "filename_prefix": f"VoiceBridge/{prefix}",
        },
    }

    show_text = {
        "class_type": "easy showAnything",
        "inputs": {
            "text": "",
            "anything": ["4", 0],
        },
    }

    return {
        "1": load_audio,
        "2": load_model,
        "3": transcribe,
        "4": make_srt,
        "5": show_text,
    }
def full_voicegate_workflow(
    audio_filename: str,
    prefix: str,
    target_language: str,
    *,
    tts_trim_start: float,
) -> dict[str, Any]:
    """Load the VoiceGate workflow and patch it for one job.

    LLM settings are read from the environment: ``DEEPSEEK_API_KEY`` (no
    default), ``DEEPSEEK_BASE_URL`` and ``DEEPSEEK_MODEL`` (with defaults).
    ``prefix`` is passed on as the job id.
    """
    env = os.environ
    base_workflow = load_workflow()
    llm_settings = {
        "api_key": env.get("DEEPSEEK_API_KEY"),
        "api_baseurl": env.get("DEEPSEEK_BASE_URL", "https://api.deepseek.com"),
        "llm_model": env.get("DEEPSEEK_MODEL", "deepseek-v4-flash"),
    }
    return patch_voicegate_workflow(
        base_workflow,
        audio_filename=audio_filename,
        target_language=target_language,
        **llm_settings,
        job_id=prefix,
        tts_trim_start=tts_trim_start,
    )
def run_full_voicegate(
    audio_path: str | None,
    target_language: str,
    *,
    tts_trim_start: float = 0.0,
    timeout: float = 880,
) -> dict[str, Any]:
    """Run the complete VoiceGate pipeline on one uploaded audio file.

    Args:
        audio_path: Path of the uploaded audio; must be non-empty.
        target_language: Target language; "English" is used when empty.
        tts_trim_start: Trim value, clamped to the range [0.0, 1.0].
        timeout: Seconds allowed for the workflow execution itself.

    Returns:
        A dict with keys ``lines`` (status log), ``audio`` (output path or
        None), ``source_subtitle``, ``translated_subtitle`` (SRT text) and
        ``source_subtitle_file``, ``translated_subtitle_file`` (paths or None).

    Raises:
        ValueError: If no audio was provided.
        RuntimeError: If ``DEEPSEEK_API_KEY`` is not set, or a setup step fails.
    """
    lines = gpu_status_lines()
    started = time.time()

    lower_bounded = max(0.0, float(tts_trim_start))
    trim_start = min(1.0, lower_bounded)

    if not audio_path:
        raise ValueError("Please upload an audio file before running VoiceGate.")
    if not os.environ.get("DEEPSEEK_API_KEY"):
        raise RuntimeError("DEEPSEEK_API_KEY is not configured in the Space.")

    ensure_runtime_assets(lines)
    ensure_comfy(lines)

    prefix = f"full_{uuid.uuid4().hex[:8]}"
    audio_filename = copy_audio_to_comfy_input(audio_path, prefix)
    lines.extend(
        [
            f"input_audio={audio_filename}",
            f"target_language={target_language}",
            f"tts_trim_start={trim_start}",
        ]
    )

    prompt = full_voicegate_workflow(
        audio_filename,
        prefix,
        target_language or "English",
        tts_trim_start=trim_start,
    )
    _prompt_id, history, timing_lines = execute_prompt_with_timing(prompt, timeout=timeout)
    lines.extend(timing_lines)
    lines.extend(history_summary(history))

    output_audio = first_output_audio_path(history)

    # Subtitles come from fixed workflow node ids; node "107" is the fallback
    # when node "179" produced no text.
    source_parts = text_outputs_for_node(history, "61")
    translated_parts = text_outputs_for_node(history, "179") or text_outputs_for_node(history, "107")
    source_subtitle = "\n\n".join(source_parts)
    translated_subtitle = "\n\n".join(translated_parts)
    source_subtitle_file = write_srt_file(prefix, "source", source_subtitle)
    translated_subtitle_file = write_srt_file(prefix, "translated", translated_subtitle)

    artifacts = (
        ("output_audio_path", output_audio),
        ("source_subtitle_file", source_subtitle_file),
        ("translated_subtitle_file", translated_subtitle_file),
    )
    for label, location in artifacts:
        if location:
            lines.append(f"{label}={location}")
    lines.append(f"elapsed_sec={time.time() - started:.1f}")

    return {
        "lines": lines,
        "audio": output_audio,
        "source_subtitle": source_subtitle,
        "translated_subtitle": translated_subtitle,
        "source_subtitle_file": source_subtitle_file,
        "translated_subtitle_file": translated_subtitle_file,
    }
def prepare_runtime() -> str:
    """Start ``bootstrap_comfy.py --with-models`` in the background.

    If a preparation process started earlier is still running, no new one is
    launched. Output of the process is appended to ``BOOTSTRAP_LOG``.

    Returns:
        A newline-joined status report including the process id.
    """
    global PREPARE_PROCESS
    lines = ["VoiceGate runtime preparation"]

    if PREPARE_PROCESS is not None and PREPARE_PROCESS.poll() is None:
        lines.append(f"prepare=already_running pid={PREPARE_PROCESS.pid}")
        return "\n".join(lines)

    BOOTSTRAP_LOG.parent.mkdir(parents=True, exist_ok=True)
    command = [sys.executable, str(ROOT / "scripts" / "bootstrap_comfy.py"), "--with-models"]
    # The child gets its own copy of the descriptor, so the parent's handle is
    # closed as soon as the process is spawned instead of being leaked.
    with BOOTSTRAP_LOG.open("ab") as log:
        PREPARE_PROCESS = subprocess.Popen(
            command,
            cwd=ROOT,
            stdout=log,
            stderr=subprocess.STDOUT,
        )
    lines.append(f"prepare=started pid={PREPARE_PROCESS.pid}")
    lines.append(f"log={BOOTSTRAP_LOG}")
    return "\n".join(lines)
def prepare_status() -> str:
    """Report the state of the background preparation process.

    Marks the runtime as bootstrapped when the process finished with exit
    code 0 and ``ComfyUI/main.py`` exists. The report ends with the last 80
    lines of the bootstrap log, when that log exists.

    Returns:
        A newline-joined status report.
    """
    global BOOTSTRAPPED
    report = ["VoiceGate runtime preparation status"]
    comfy_entry = COMFY_DIR / "main.py"

    process = PREPARE_PROCESS
    if process is None:
        report.append("prepare=not_started")
    else:
        exit_code = process.poll()
        if exit_code is None:
            report.append(f"prepare=running pid={process.pid}")
        else:
            report.append(f"prepare=finished returncode={exit_code}")
            if exit_code == 0 and comfy_entry.exists():
                BOOTSTRAPPED = True

    report.append(f"comfy_dir_exists={comfy_entry.exists()}")
    if BOOTSTRAP_LOG.exists():
        log_text = BOOTSTRAP_LOG.read_text(encoding="utf-8", errors="replace")
        report.append("bootstrap_log_tail:")
        report.extend(log_text.splitlines()[-80:])
    return "\n".join(report)
@spaces.GPU(duration=60)
def gpu_smoke_test() -> str:
    """Run a tiny CUDA computation and report the result.

    When CUDA is available, doubles and sums ``arange(16)`` on ``cuda:0`` and
    reports the sum plus reserved memory in MiB; otherwise only the GPU status
    lines are returned.
    """
    report = gpu_status_lines()
    if not torch.cuda.is_available():
        return "\n".join(report)

    values = torch.arange(16, device="cuda:0", dtype=torch.float32)
    doubled_sum = (values * 2).sum().item()
    torch.cuda.synchronize()
    report.append(f"tensor_result={doubled_sum}")
    report.append(f"memory_reserved_mb={torch.cuda.memory_reserved(0) / 1024**2:.2f}")
    return "\n".join(report)
@spaces.GPU(duration=900)
def comfy_runtime_test() -> str:
    """Start (or reuse) ComfyUI and report its system stats.

    Any exception is caught and reported as an ``error=`` line rather than
    raised.
    """
    report = gpu_status_lines()
    began = time.time()
    try:
        stats = ensure_comfy(report)
        report.append("comfy_ready=true")
        report.append(f"comfy_elapsed_sec={time.time() - began:.1f}")
        report.append("system_stats:")
        stats_json = json.dumps(stats, ensure_ascii=False, indent=2)
        report.append(stats_json[:4000])
    except Exception as exc:
        report.append(f"error={type(exc).__name__}: {exc}")
    return "\n".join(report)
@spaces.GPU(duration=1200)
def melband_gpu_test() -> str:
    """Run the MelBandRoFormer workflow on a generated sine wave.

    Failures are reported as an ``error=`` line followed by the last 120
    lines of the ComfyUI log (when it exists) rather than raised.
    """
    report = gpu_status_lines()
    began = time.time()
    try:
        ensure_comfy(report)
        sine_name = write_sine_wav(f"voicegate_melband_{uuid.uuid4().hex[:8]}.wav")
        job_prefix = f"melband_gpu_{uuid.uuid4().hex[:8]}"
        workflow = melband_workflow(sine_name, job_prefix)
        prompt_id = submit_prompt(workflow)
        report.append(f"prompt_id={prompt_id}")
        report.extend(history_summary(wait_for_history(prompt_id)))
        report.append(f"elapsed_sec={time.time() - began:.1f}")
    except Exception as exc:
        report.append(f"error={type(exc).__name__}: {exc}")
        if COMFY_LOG.exists():
            log_text = COMFY_LOG.read_text(encoding="utf-8", errors="replace")
            report.append("comfy_log_tail:")
            report.extend(log_text.splitlines()[-120:])
    return "\n".join(report)
@spaces.GPU(duration=1200)
def voxcpm_tts_gpu_test() -> str:
    """Run the fixed VoxCPM text-to-speech workflow and summarise the result.

    Failures are reported as an ``error=`` line followed by the last 160
    lines of the ComfyUI log (when it exists) rather than raised.
    """
    report = gpu_status_lines()
    began = time.time()
    try:
        ensure_comfy(report)
        job_prefix = f"voxcpm_tts_gpu_{uuid.uuid4().hex[:8]}"
        workflow = voxcpm_tts_workflow(job_prefix)
        prompt_id = submit_prompt(workflow)
        report.append(f"prompt_id={prompt_id}")
        report.extend(history_summary(wait_for_history(prompt_id, timeout=1200)))
        report.append(f"elapsed_sec={time.time() - began:.1f}")
    except Exception as exc:
        report.append(f"error={type(exc).__name__}: {exc}")
        if COMFY_LOG.exists():
            log_text = COMFY_LOG.read_text(encoding="utf-8", errors="replace")
            report.append("comfy_log_tail:")
            report.extend(log_text.splitlines()[-160:])
    return "\n".join(report)
@spaces.GPU(duration=900)
def asr_gpu_test(audio_path: str | None) -> str:
    """Run the ASR workflow on an uploaded audio file and summarise the result.

    A missing upload and any other failure are reported as an ``error=`` line
    followed by the last 180 lines of the ComfyUI log (when it exists).
    """
    report = gpu_status_lines()
    began = time.time()
    try:
        if not audio_path:
            raise ValueError("Please upload an audio file before running ASR.")
        ensure_comfy(report)
        job_prefix = f"asr_gpu_{uuid.uuid4().hex[:8]}"
        staged_name = copy_audio_to_comfy_input(audio_path, job_prefix)
        report.append(f"input_audio={staged_name}")
        workflow = asr_workflow(staged_name, job_prefix)
        prompt_id = submit_prompt(workflow)
        report.append(f"prompt_id={prompt_id}")
        report.extend(history_summary(wait_for_history(prompt_id, timeout=900)))
        report.append(f"elapsed_sec={time.time() - began:.1f}")
    except Exception as exc:
        report.append(f"error={type(exc).__name__}: {exc}")
        if COMFY_LOG.exists():
            log_text = COMFY_LOG.read_text(encoding="utf-8", errors="replace")
            report.append("comfy_log_tail:")
            report.extend(log_text.splitlines()[-180:])
    return "\n".join(report)
@spaces.GPU(duration=900)
def full_voicegate_gpu_test(audio_path: str | None, target_language: str, tts_trim_start: float) -> str:
    """Run the full pipeline and return only its status log as text.

    On failure the log is replaced by fresh GPU status lines, an ``error=``
    line and the last 220 lines of the ComfyUI log (when it exists).
    """
    try:
        outcome = run_full_voicegate(
            audio_path,
            target_language,
            tts_trim_start=tts_trim_start,
            timeout=880,
        )
        report = outcome["lines"]
    except Exception as exc:
        report = gpu_status_lines()
        report.append(f"error={type(exc).__name__}: {exc}")
        if COMFY_LOG.exists():
            log_text = COMFY_LOG.read_text(encoding="utf-8", errors="replace")
            report.append("comfy_log_tail:")
            report.extend(log_text.splitlines()[-220:])
    return "\n".join(report)
@spaces.GPU(duration=900)
def voicegate_user_run(audio_path: str | None, target_language: str, tts_trim_start: float) -> tuple[
    str | None,
    str,
    str | None,
    str | None,
    str,
    str,
]:
    """Run the full pipeline and return the values for the user-facing outputs.

    Returns:
        ``(output_audio, log_text, source_srt_file, translated_srt_file,
        source_subtitle, translated_subtitle)``. On failure the audio and file
        entries are None, the subtitle texts are empty, and the log holds GPU
        status, an ``error=`` line and the last 160 lines of the ComfyUI log
        (when it exists).
    """
    try:
        outcome = run_full_voicegate(
            audio_path,
            target_language,
            tts_trim_start=tts_trim_start,
            timeout=880,
        )
        report = outcome["lines"]
        dubbed_audio = outcome["audio"]
        if not dubbed_audio:
            report.append("warning=No output audio file was found in ComfyUI history.")
        log_text = "\n".join(report)
        return (
            dubbed_audio,
            log_text,
            outcome["source_subtitle_file"],
            outcome["translated_subtitle_file"],
            outcome["source_subtitle"],
            outcome["translated_subtitle"],
        )
    except Exception as exc:
        report = gpu_status_lines()
        report.append(f"error={type(exc).__name__}: {exc}")
        if COMFY_LOG.exists():
            comfy_log = COMFY_LOG.read_text(encoding="utf-8", errors="replace")
            report.append("comfy_log_tail:")
            report.extend(comfy_log.splitlines()[-160:])
        return None, "\n".join(report), None, None, "", ""
with gr.Blocks(title="VoiceGate", fill_width=True) as demo:
with gr.Tab("Translate"):
gr.HTML(
"""
VoiceGate transforms speech clips into precisely time-aligned multilingual dubbing. Each sentence is
automatically matched to the original speech timestamp, so the generated voice follows the source
rhythm and stays synchronized with the subtitles and video timeline. The pipeline combines ASR,
LLM translation, multilingual TTS, SRT-based audio alignment, and ambience preservation to produce
natural translated dubbing while keeping the original pacing and background atmosphere. Runtime is
usually close to the uploaded audio duration.
VoiceGate