""" ComfyUI Client for Qwen-Image-Edit-2511 ======================================== Client to interact with ComfyUI API for running Qwen-Image-Edit-2511. Model setup (download from HuggingFace): Lightning (default, 4-step): diffusion_models/ qwen_image_edit_2511_fp8_e4m3fn_scaled_lightning_comfyui_4steps_v1.0.safetensors (lightx2v/Qwen-Image-Edit-2511-Lightning) Standard (20-step, optional): diffusion_models/ qwen_image_edit_2511_fp8mixed.safetensors (Comfy-Org/Qwen-Image-Edit_ComfyUI) Shared: text_encoders/ qwen_2.5_vl_7b_fp8_scaled.safetensors (Comfy-Org/Qwen-Image_ComfyUI) vae/ qwen_image_vae.safetensors (Comfy-Org/Qwen-Image_ComfyUI) Required custom nodes: - Comfyui-QwenEditUtils (lrzjason) for TextEncodeQwenImageEditPlus """ import logging import time import uuid import json import io import base64 from typing import Optional, List, Tuple from PIL import Image import websocket import urllib.request import urllib.parse from .models import GenerationRequest, GenerationResult logger = logging.getLogger(__name__) class ComfyUIClient: """ Client for ComfyUI API to run Qwen-Image-Edit-2511. Requires ComfyUI running with: - Qwen-Image-Edit-2511 model in models/diffusion_models/ - Qwen 2.5 VL 7B text encoder in models/text_encoders/ - Qwen Image VAE in models/vae/ - Comfyui-QwenEditUtils custom node installed """ # Default ComfyUI settings DEFAULT_HOST = "127.0.0.1" DEFAULT_PORT = 8188 # Model file names (expected in ComfyUI models/ subfolders) # Lightning: baked model (LoRA pre-merged, ComfyUI-specific format) UNET_MODEL_LIGHTNING = "qwen_image_edit_2511_fp8_e4m3fn_scaled_lightning_comfyui_4steps_v1.0.safetensors" # Standard: base fp8mixed model (20-step, higher quality) UNET_MODEL_STANDARD = "qwen_image_edit_2511_fp8mixed.safetensors" TEXT_ENCODER = "qwen_2.5_vl_7b_fp8_scaled.safetensors" VAE_MODEL = "qwen_image_vae.safetensors" # Target output dimensions per aspect ratio. # Generation happens at 1024x1024, then crop+resize to these. ASPECT_RATIOS = { "1:1": (1024, 1024), "16:9": (1344, 768), "9:16": (768, 1344), "21:9": (1680, 720), "3:2": (1248, 832), "2:3": (832, 1248), "3:4": (896, 1152), "4:3": (1152, 896), "4:5": (1024, 1280), "5:4": (1280, 1024), } # Generate at 1024x1024 (proven safe for Qwen's VAE), then crop+resize NATIVE_RESOLUTION = (1024, 1024) # With Lightning LoRA: 4 steps, CFG 1.0 (fast, ~seconds per view) # Without LoRA: 20 steps, CFG 4.0 DEFAULT_STEPS_LIGHTNING = 4 DEFAULT_STEPS_STANDARD = 20 DEFAULT_CFG_LIGHTNING = 1.0 DEFAULT_CFG_STANDARD = 4.0 def __init__( self, host: str = DEFAULT_HOST, port: int = DEFAULT_PORT, use_lightning: bool = True, ): """ Initialize ComfyUI client. Args: host: ComfyUI server host port: ComfyUI server port use_lightning: Use Lightning LoRA for 4-step generation (much faster) """ self.host = host self.port = port self.use_lightning = use_lightning self.client_id = str(uuid.uuid4()) self.server_address = f"{host}:{port}" if use_lightning: self.num_inference_steps = self.DEFAULT_STEPS_LIGHTNING self.cfg_scale = self.DEFAULT_CFG_LIGHTNING else: self.num_inference_steps = self.DEFAULT_STEPS_STANDARD self.cfg_scale = self.DEFAULT_CFG_STANDARD logger.info( f"ComfyUIClient initialized for {self.server_address} " f"(lightning={use_lightning}, steps={self.num_inference_steps})" ) def is_healthy(self) -> bool: """Check if ComfyUI server is running and accessible.""" try: url = f"http://{self.server_address}/system_stats" with urllib.request.urlopen(url, timeout=5) as response: return response.status == 200 except Exception: return False def _upload_image(self, image: Image.Image, name: str = "input.png") -> Optional[str]: """ Upload an image to ComfyUI, pre-resized to fit within 1024x1024. Args: image: PIL Image to upload name: Filename for the uploaded image Returns: Filename on server, or None if failed """ try: # Pre-resize to keep total pixels around 1024x1024 (matching reference workflow) max_pixels = 1024 * 1024 w, h = image.size if w * h > max_pixels: scale = (max_pixels / (w * h)) ** 0.5 new_w = int(w * scale) new_h = int(h * scale) image = image.resize((new_w, new_h), Image.LANCZOS) logger.debug(f"Pre-resized input from {w}x{h} to {new_w}x{new_h}") # Convert image to bytes img_bytes = io.BytesIO() image.save(img_bytes, format='PNG') img_bytes.seek(0) # Create multipart form data boundary = uuid.uuid4().hex body = b'' body += f'--{boundary}\r\n'.encode() body += f'Content-Disposition: form-data; name="image"; filename="{name}"\r\n'.encode() body += b'Content-Type: image/png\r\n\r\n' body += img_bytes.read() body += f'\r\n--{boundary}--\r\n'.encode() url = f"http://{self.server_address}/upload/image" req = urllib.request.Request( url, data=body, headers={ 'Content-Type': f'multipart/form-data; boundary={boundary}' } ) with urllib.request.urlopen(req) as response: result = json.loads(response.read()) return result.get('name') except Exception as e: logger.error(f"Failed to upload image: {e}") return None def _queue_prompt(self, prompt: dict) -> str: """ Queue a prompt for execution. Args: prompt: Workflow prompt dict Returns: Prompt ID """ prompt_id = str(uuid.uuid4()) p = {"prompt": prompt, "client_id": self.client_id, "prompt_id": prompt_id} data = json.dumps(p).encode('utf-8') url = f"http://{self.server_address}/prompt" req = urllib.request.Request(url, data=data) urllib.request.urlopen(req) return prompt_id def _get_history(self, prompt_id: str) -> dict: """Get execution history for a prompt.""" url = f"http://{self.server_address}/history/{prompt_id}" with urllib.request.urlopen(url) as response: return json.loads(response.read()) def _get_image(self, filename: str, subfolder: str, folder_type: str) -> bytes: """Get an image from ComfyUI.""" data = {"filename": filename, "subfolder": subfolder, "type": folder_type} url_values = urllib.parse.urlencode(data) url = f"http://{self.server_address}/view?{url_values}" with urllib.request.urlopen(url) as response: return response.read() def _wait_for_completion(self, prompt_id: str, timeout: float = 900.0) -> bool: """ Wait for prompt execution to complete using websocket. Args: prompt_id: The prompt ID to wait for timeout: Maximum time to wait in seconds (default 15 min for image editing) Returns: True if completed successfully, False if timeout/error """ ws = None try: ws_url = f"ws://{self.server_address}/ws?clientId={self.client_id}" ws = websocket.WebSocket() ws.settimeout(timeout) ws.connect(ws_url) start_time = time.time() while time.time() - start_time < timeout: try: out = ws.recv() if isinstance(out, str): message = json.loads(out) if message['type'] == 'executing': data = message['data'] if data['node'] is None and data['prompt_id'] == prompt_id: return True # Execution complete elif message['type'] == 'execution_error': logger.error(f"Execution error: {message}") return False except websocket.WebSocketTimeoutException: continue logger.error("Timeout waiting for completion") return False except Exception as e: logger.error(f"WebSocket error: {e}") return False finally: if ws: try: ws.close() except: pass def _get_dimensions(self, aspect_ratio: str) -> Tuple[int, int]: """Get pixel dimensions for aspect ratio.""" ratio = aspect_ratio.split()[0] if " " in aspect_ratio else aspect_ratio return self.ASPECT_RATIOS.get(ratio, (1024, 1024)) @staticmethod def _crop_and_resize(image: Image.Image, target_w: int, target_h: int) -> Image.Image: """Crop to target aspect ratio, then resize. Centers the crop.""" src_w, src_h = image.size target_ratio = target_w / target_h src_ratio = src_w / src_h if abs(target_ratio - src_ratio) < 0.01: return image.resize((target_w, target_h), Image.LANCZOS) if target_ratio < src_ratio: crop_w = int(src_h * target_ratio) offset = (src_w - crop_w) // 2 image = image.crop((offset, 0, offset + crop_w, src_h)) else: crop_h = int(src_w / target_ratio) offset = (src_h - crop_h) // 2 image = image.crop((0, offset, src_w, offset + crop_h)) return image.resize((target_w, target_h), Image.LANCZOS) def _build_workflow( self, prompt: str, width: int, height: int, input_images: List[str] = None, negative_prompt: str = "" ) -> dict: """ Build the ComfyUI workflow for Qwen-Image-Edit-2511. Workflow graph: UNETLoader → KSampler CLIPLoader → TextEncodeQwenImageEditPlus (pos/neg) VAELoader → TextEncode + VAEDecode LoadImage(s) → TextEncodeQwenImageEditPlus EmptyQwenImageLayeredLatentImage → KSampler KSampler → VAEDecode → PreviewImage Lightning mode uses a baked model (LoRA pre-merged), no separate LoRA or ModelSamplingAuraFlow nodes needed. """ workflow = {} node_id = 1 # --- Model loading --- # Select model based on lightning mode unet_name = (self.UNET_MODEL_LIGHTNING if self.use_lightning else self.UNET_MODEL_STANDARD) # UNETLoader - weight_dtype "default" lets ComfyUI auto-detect fp8 unet_id = str(node_id) workflow[unet_id] = { "class_type": "UNETLoader", "inputs": { "unet_name": unet_name, "weight_dtype": "default" } } node_id += 1 # CLIPLoader clip_id = str(node_id) workflow[clip_id] = { "class_type": "CLIPLoader", "inputs": { "clip_name": self.TEXT_ENCODER, "type": "qwen_image" } } node_id += 1 # VAELoader vae_id = str(node_id) workflow[vae_id] = { "class_type": "VAELoader", "inputs": { "vae_name": self.VAE_MODEL } } node_id += 1 model_out_id = unet_id # --- Input images --- image_loader_ids = [] if input_images: for img_name in input_images[:3]: # Max 3 reference images img_loader_id = str(node_id) workflow[img_loader_id] = { "class_type": "LoadImage", "inputs": { "image": img_name } } image_loader_ids.append(img_loader_id) node_id += 1 # --- Text encoding --- # Positive: prompt + vision references + VAE pos_encode_id = str(node_id) pos_inputs = { "clip": [clip_id, 0], "prompt": prompt, "vae": [vae_id, 0] } for i, loader_id in enumerate(image_loader_ids): pos_inputs[f"image{i+1}"] = [loader_id, 0] workflow[pos_encode_id] = { "class_type": "TextEncodeQwenImageEditPlus", "inputs": pos_inputs } node_id += 1 # Negative: text only, no images neg_encode_id = str(node_id) workflow[neg_encode_id] = { "class_type": "TextEncodeQwenImageEditPlus", "inputs": { "clip": [clip_id, 0], "prompt": negative_prompt or " ", "vae": [vae_id, 0] } } node_id += 1 # --- Latent + sampling --- latent_id = str(node_id) workflow[latent_id] = { "class_type": "EmptySD3LatentImage", "inputs": { "width": width, "height": height, "batch_size": 1 } } node_id += 1 sampler_id = str(node_id) workflow[sampler_id] = { "class_type": "KSampler", "inputs": { "model": [model_out_id, 0], "positive": [pos_encode_id, 0], "negative": [neg_encode_id, 0], "latent_image": [latent_id, 0], "seed": int(time.time()) % 2**32, "steps": self.num_inference_steps, "cfg": self.cfg_scale, "sampler_name": "euler", "scheduler": "simple", "denoise": 1.0 } } node_id += 1 # --- Decode + output --- decode_id = str(node_id) workflow[decode_id] = { "class_type": "VAEDecode", "inputs": { "samples": [sampler_id, 0], "vae": [vae_id, 0] } } node_id += 1 preview_id = str(node_id) workflow[preview_id] = { "class_type": "PreviewImage", "inputs": { "images": [decode_id, 0] } } return workflow def generate( self, request: GenerationRequest, num_inference_steps: Optional[int] = None, cfg_scale: Optional[float] = None ) -> GenerationResult: """ Generate/edit image using Qwen-Image-Edit-2511 via ComfyUI. Generates at native 1024x1024, then crop+resize to requested aspect ratio for clean VAE output. """ if not self.is_healthy(): return GenerationResult.error_result( "ComfyUI server is not accessible. Make sure ComfyUI is running on " f"{self.server_address}" ) try: start_time = time.time() # Target dimensions for post-processing target_w, target_h = self._get_dimensions(request.aspect_ratio) # Generate at native resolution (VAE-safe) native_w, native_h = self.NATIVE_RESOLUTION # Upload input images (max 3) uploaded_images = [] if request.has_input_images: for i, img in enumerate(request.input_images): if img is not None: name = f"input_{i}_{uuid.uuid4().hex[:8]}.png" uploaded_name = self._upload_image(img, name) if uploaded_name: uploaded_images.append(uploaded_name) else: logger.warning(f"Failed to upload image {i}") steps = num_inference_steps or self.num_inference_steps cfg = cfg_scale or self.cfg_scale # Temporarily set for workflow build old_steps, old_cfg = self.num_inference_steps, self.cfg_scale self.num_inference_steps, self.cfg_scale = steps, cfg workflow = self._build_workflow( prompt=request.prompt, width=native_w, height=native_h, input_images=uploaded_images or None, negative_prompt=request.negative_prompt or "" ) self.num_inference_steps, self.cfg_scale = old_steps, old_cfg logger.info(f"Generating with ComfyUI/Qwen: {request.prompt[:80]}...") logger.info( f"Native: {native_w}x{native_h}, target: {target_w}x{target_h}, " f"steps: {steps}, cfg: {cfg}, images: {len(uploaded_images)}, " f"lightning: {self.use_lightning}" ) # Queue and wait prompt_id = self._queue_prompt(workflow) logger.info(f"Queued prompt: {prompt_id}") if not self._wait_for_completion(prompt_id): return GenerationResult.error_result("Generation failed or timed out") # Retrieve output history = self._get_history(prompt_id) if prompt_id not in history: return GenerationResult.error_result("No history found for prompt") outputs = history[prompt_id].get('outputs', {}) for nid, node_output in outputs.items(): if 'images' in node_output: for img_info in node_output['images']: img_data = self._get_image( img_info['filename'], img_info.get('subfolder', ''), img_info.get('type', 'temp') ) image = Image.open(io.BytesIO(img_data)) generation_time = time.time() - start_time logger.info(f"Generated in {generation_time:.2f}s: {image.size}") # Crop+resize to target aspect ratio if (target_w, target_h) != (native_w, native_h): image = self._crop_and_resize(image, target_w, target_h) logger.info(f"Post-processed to: {image.size}") return GenerationResult.success_result( image=image, message=f"Generated with ComfyUI/Qwen in {generation_time:.2f}s", generation_time=generation_time ) return GenerationResult.error_result("No output images found") except Exception as e: logger.error(f"ComfyUI generation failed: {e}", exc_info=True) return GenerationResult.error_result(f"ComfyUI error: {str(e)}") def unload_model(self): """ Request ComfyUI to free memory. Note: ComfyUI manages models automatically, but we can request cleanup. """ try: url = f"http://{self.server_address}/free" data = json.dumps({"unload_models": True}).encode('utf-8') req = urllib.request.Request(url, data=data, method='POST') urllib.request.urlopen(req) logger.info("Requested ComfyUI to free memory") except Exception as e: logger.warning(f"Failed to request memory cleanup: {e}") @classmethod def get_dimensions(cls, aspect_ratio: str) -> Tuple[int, int]: """Get pixel dimensions for aspect ratio.""" ratio = aspect_ratio.split()[0] if " " in aspect_ratio else aspect_ratio return cls.ASPECT_RATIOS.get(ratio, (1024, 1024))