"""Start and probe the local ComfyUI server.""" from __future__ import annotations import argparse import subprocess import sys import time from pathlib import Path import requests ROOT = Path(__file__).resolve().parents[1] COMFY_DIR = ROOT / "ComfyUI" def wait_for_server(host: str, port: int, timeout: float) -> None: url = f"http://{host}:{port}/system_stats" deadline = time.time() + timeout last_error: Exception | None = None while time.time() < deadline: try: response = requests.get(url, timeout=2) if response.ok: print(f"ComfyUI is ready: {url}") return except requests.RequestException as exc: last_error = exc time.sleep(1) raise RuntimeError(f"ComfyUI did not become ready at {url}: {last_error}") def start_comfy(host: str, port: int, cpu: bool = False) -> subprocess.Popen: if not COMFY_DIR.exists(): raise RuntimeError("ComfyUI is missing. Run scripts/bootstrap_comfy.py first.") cmd = [ sys.executable, "main.py", "--listen", host, "--port", str(port), ] if cpu: cmd.append("--cpu") print("+ " + " ".join(cmd)) return subprocess.Popen(cmd, cwd=COMFY_DIR) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser() parser.add_argument("--host", default="127.0.0.1") parser.add_argument("--port", type=int, default=8188) parser.add_argument("--timeout", type=float, default=120) parser.add_argument("--wait-only", action="store_true") parser.add_argument("--cpu", action="store_true") return parser.parse_args() def main() -> None: args = parse_args() process = None if not args.wait_only: process = start_comfy(args.host, args.port, cpu=args.cpu) wait_for_server(args.host, args.port, args.timeout) if process is not None: print(f"ComfyUI process started with pid {process.pid}") process.wait() if __name__ == "__main__": main()