| """ |
| E2E tests for Queue-specific Preview Method Override feature. |
| |
| Tests actual execution with different preview_method values. |
| Requires a running ComfyUI server with models. |
| |
| Usage: |
| COMFYUI_SERVER=http://localhost:8988 pytest test_preview_method_e2e.py -v -m preview_method |
| |
| Note: |
| These tests execute actual image generation and wait for completion. |
| Tests verify preview image transmission based on preview_method setting. |
| """ |
import json
import os
import random
import time
import urllib.request
import uuid
from pathlib import Path

import pytest
import websocket

# Server under test; override via the COMFYUI_SERVER environment variable.
SERVER_URL = os.environ.get("COMFYUI_SERVER", "http://localhost:8988")
SERVER_HOST = SERVER_URL.replace("http://", "").replace("https://", "")

# SDXL graph fixture used for every execution.
GRAPH_FILE = Path(__file__).parent.parent / "inference" / "graphs" / "default_graph_sdxl1_0.json"


def is_server_running() -> bool:
    """Check if the ComfyUI server is running."""
    try:
        request = urllib.request.Request(f"{SERVER_URL}/system_stats")
        with urllib.request.urlopen(request, timeout=2.0):
            return True
    except Exception:
        return False


def prepare_graph_for_test(graph: dict, steps: int = 5) -> dict:
    """Prepare a graph for testing: randomize seeds and reduce steps."""
    adapted = json.loads(json.dumps(graph))  # cheap deep copy
    for node in adapted.values():
        inputs = node.get("inputs", {})

        # Fresh seeds keep repeated runs from being answered by the cache.
        if "seed" in inputs:
            inputs["seed"] = random.randint(0, 2**32 - 1)
        if "noise_seed" in inputs:
            inputs["noise_seed"] = random.randint(0, 2**32 - 1)

        # Fewer sampling steps keep test runtime short.
        if "steps" in inputs:
            inputs["steps"] = steps
    return adapted


# Alias kept for readability: several tests only need the fresh seeds.
randomize_seed = prepare_graph_for_test


class PreviewMethodClient:
    """Client for testing preview_method with WebSocket execution tracking."""

    def __init__(self, server_address: str):
        self.server_address = server_address
        self.client_id = str(uuid.uuid4())
        self.ws = None

    def connect(self):
        """Connect to the server's WebSocket endpoint."""
        self.ws = websocket.WebSocket()
        self.ws.settimeout(120)
        self.ws.connect(f"ws://{self.server_address}/ws?clientId={self.client_id}")

    def close(self):
        """Close the WebSocket connection."""
        if self.ws:
            self.ws.close()

    def queue_prompt(self, prompt: dict, extra_data: dict = None) -> dict:
        """Queue a prompt and return the response containing prompt_id."""
        data = {
            "prompt": prompt,
            "client_id": self.client_id,
            "extra_data": extra_data or {},
        }
        req = urllib.request.Request(
            f"http://{self.server_address}/prompt",
            data=json.dumps(data).encode("utf-8"),
            headers={"Content-Type": "application/json"},
        )
        return json.loads(urllib.request.urlopen(req).read())

    def wait_for_execution(self, prompt_id: str, timeout: float = 120.0) -> dict:
        """
        Wait for execution to complete via WebSocket.

        Returns:
            dict with keys: completed, error, preview_count, execution_time
        """
        result = {
            "completed": False,
            "error": None,
            "preview_count": 0,
            "execution_time": 0.0,
        }

        start_time = time.time()
        self.ws.settimeout(timeout)

        try:
            while True:
                out = self.ws.recv()
                elapsed = time.time() - start_time

                if isinstance(out, str):
                    message = json.loads(out)
                    msg_type = message.get("type")
                    data = message.get("data", {})

                    # Ignore messages for other prompts (e.g. global status).
                    if data.get("prompt_id") != prompt_id:
                        continue

                    if msg_type == "executing":
                        if data.get("node") is None:
                            # node == None marks the end of execution.
                            result["completed"] = True
                            result["execution_time"] = elapsed
                            break
                    elif msg_type == "execution_error":
                        result["error"] = data
                        result["execution_time"] = elapsed
                        break
                    elif msg_type == "progress":
                        # Per-step progress updates; nothing to record here.
                        pass
                elif isinstance(out, bytes):
                    # Binary frames are counted as preview images; they carry
                    # no prompt_id, so they are not filtered per prompt.
                    result["preview_count"] += 1
        except websocket.WebSocketTimeoutException:
            result["error"] = "Timeout waiting for execution"
            result["execution_time"] = time.time() - start_time

        return result


def load_graph() -> dict:
    """Load the SDXL graph fixture with randomized seeds."""
    with open(GRAPH_FILE) as f:
        graph = json.load(f)
    return randomize_seed(graph)


# Skip the whole module when no server is available, and tag every test.
pytestmark = [
    pytest.mark.skipif(
        not is_server_running(),
        reason=f"ComfyUI server not running at {SERVER_URL}"
    ),
    pytest.mark.preview_method,
    pytest.mark.execution,
]
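# Assumption: the preview_method and execution marks are registered in this
# project's pytest configuration; unregistered marks trigger a
# PytestUnknownMarkWarning.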


@pytest.fixture
def client():
    """Create and connect a test client."""
    c = PreviewMethodClient(SERVER_HOST)
    c.connect()
    yield c
    c.close()


@pytest.fixture
def graph():
    """Load the test graph."""
    return load_graph()


class TestPreviewMethodExecution:
    """Test actual execution with different preview methods."""

    def test_execution_with_latent2rgb(self, client, graph):
        """
        Execute with preview_method=latent2rgb.
        Should complete and potentially receive preview images.
        """
        extra_data = {"preview_method": "latent2rgb"}

        response = client.queue_prompt(graph, extra_data)
        assert "prompt_id" in response

        result = client.wait_for_execution(response["prompt_id"])

        # The run must either finish or report an explicit error.
        assert result["completed"] or result["error"] is not None

        if result["completed"]:
            assert result["execution_time"] > 0.5, "Execution too fast - likely didn't run"

        print(f"latent2rgb: {result['preview_count']} previews in {result['execution_time']:.2f}s")

    def test_execution_with_taesd(self, client, graph):
        """
        Execute with preview_method=taesd.
        TAESD provides higher quality previews.
        """
        extra_data = {"preview_method": "taesd"}

        response = client.queue_prompt(graph, extra_data)
        assert "prompt_id" in response

        result = client.wait_for_execution(response["prompt_id"])

        assert result["completed"] or result["error"] is not None
        if result["completed"]:
            assert result["execution_time"] > 0.5

        print(f"taesd: {result['preview_count']} previews in {result['execution_time']:.2f}s")

    def test_execution_with_none_preview(self, client, graph):
        """
        Execute with preview_method=none.
        No preview images should be generated.
        """
        extra_data = {"preview_method": "none"}

        response = client.queue_prompt(graph, extra_data)
        assert "prompt_id" in response

        result = client.wait_for_execution(response["prompt_id"])

        assert result["completed"] or result["error"] is not None
        if result["completed"]:
            # Key check: 'none' must suppress preview transmission entirely.
            assert result["preview_count"] == 0, \
                f"Expected no previews with 'none', got {result['preview_count']}"
            print(f"none: {result['preview_count']} previews in {result['execution_time']:.2f}s")

    def test_execution_with_default(self, client, graph):
        """
        Execute with preview_method=default.
        Should use the server's CLI default setting.
        """
        extra_data = {"preview_method": "default"}

        response = client.queue_prompt(graph, extra_data)
        assert "prompt_id" in response

        result = client.wait_for_execution(response["prompt_id"])

        assert result["completed"] or result["error"] is not None
        if result["completed"]:
            print(f"default: {result['preview_count']} previews in {result['execution_time']:.2f}s")

    def test_execution_without_preview_method(self, client, graph):
        """
        Execute without preview_method in extra_data.
        Should use the server's default preview method.
        """
        extra_data = {}

        response = client.queue_prompt(graph, extra_data)
        assert "prompt_id" in response

        result = client.wait_for_execution(response["prompt_id"])

        assert result["completed"] or result["error"] is not None
        if result["completed"]:
            print(f"(no override): {result['preview_count']} previews in {result['execution_time']:.2f}s")


class TestPreviewMethodComparison:
    """Compare preview behavior between different methods."""

    def test_none_vs_latent2rgb_preview_count(self, client, graph):
        """
        Compare preview counts: 'none' should have 0, others should have >0.
        This is the key verification that preview_method actually works.
        """
        results = {}

        # Run once with previews disabled.
        graph_none = randomize_seed(graph)
        extra_data_none = {"preview_method": "none"}
        response = client.queue_prompt(graph_none, extra_data_none)
        results["none"] = client.wait_for_execution(response["prompt_id"])

        # Run again with latent2rgb previews enabled.
        graph_rgb = randomize_seed(graph)
        extra_data_rgb = {"preview_method": "latent2rgb"}
        response = client.queue_prompt(graph_rgb, extra_data_rgb)
        results["latent2rgb"] = client.wait_for_execution(response["prompt_id"])

        # Both executions must complete.
        assert results["none"]["completed"], f"'none' execution failed: {results['none']['error']}"
        assert results["latent2rgb"]["completed"], f"'latent2rgb' execution failed: {results['latent2rgb']['error']}"

        # 'none' must suppress previews entirely...
        assert results["none"]["preview_count"] == 0, \
            f"'none' should have 0 previews, got {results['none']['preview_count']}"

        # ...while latent2rgb must deliver at least one.
        assert results["latent2rgb"]["preview_count"] > 0, \
            f"'latent2rgb' should have >0 previews, got {results['latent2rgb']['preview_count']}"

        print("\nPreview count comparison:")
        print(f" none: {results['none']['preview_count']} previews")
        print(f" latent2rgb: {results['latent2rgb']['preview_count']} previews")


class TestPreviewMethodSequential:
    """Test sequential execution with different preview methods."""

    def test_sequential_different_methods(self, client, graph):
        """
        Execute multiple prompts sequentially with different preview methods.
        Each should complete independently with the correct preview behavior.
        """
        methods = ["latent2rgb", "none", "default"]
        results = []

        for method in methods:
            # Fresh seeds per run to avoid cached results.
            graph_run = randomize_seed(graph)
            extra_data = {"preview_method": method}
            response = client.queue_prompt(graph_run, extra_data)

            result = client.wait_for_execution(response["prompt_id"])
            results.append({
                "method": method,
                "completed": result["completed"],
                "preview_count": result["preview_count"],
                "execution_time": result["execution_time"],
                "error": result["error"],
            })

        # Every run must either complete or report an explicit error.
        for r in results:
            assert r["completed"] or r["error"] is not None, \
                f"Method {r['method']} neither completed nor errored"

        # The 'none' run must not have produced any previews.
        none_result = next(r for r in results if r["method"] == "none")
        if none_result["completed"]:
            assert none_result["preview_count"] == 0, \
                f"'none' should have 0 previews, got {none_result['preview_count']}"

        print("\nSequential execution results:")
        for r in results:
            status = "✓" if r["completed"] else f"✗ ({r['error']})"
            print(f" {r['method']}: {status}, {r['preview_count']} previews, {r['execution_time']:.2f}s")
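

if __name__ == "__main__":
    # Convenience entry point (a sketch; assumes running this file directly
    # is acceptable in this repo). Equivalent to the pytest invocation shown
    # in the module docstring.
    import sys
    sys.exit(pytest.main([__file__, "-v", "-m", "preview_method"]))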