CharacterForgePro / src /comfyui_client.py
ghmk's picture
Deploy full Character Sheet Pro with HF auth
da23dfe
"""
ComfyUI Client for Qwen-Image-Edit-2511
========================================
Client to interact with ComfyUI API for running Qwen-Image-Edit-2511.
Model setup (download from HuggingFace):
Lightning (default, 4-step):
diffusion_models/ qwen_image_edit_2511_fp8_e4m3fn_scaled_lightning_comfyui_4steps_v1.0.safetensors
(lightx2v/Qwen-Image-Edit-2511-Lightning)
Standard (20-step, optional):
diffusion_models/ qwen_image_edit_2511_fp8mixed.safetensors
(Comfy-Org/Qwen-Image-Edit_ComfyUI)
Shared:
text_encoders/ qwen_2.5_vl_7b_fp8_scaled.safetensors (Comfy-Org/Qwen-Image_ComfyUI)
vae/ qwen_image_vae.safetensors (Comfy-Org/Qwen-Image_ComfyUI)
Required custom nodes:
- Comfyui-QwenEditUtils (lrzjason) for TextEncodeQwenImageEditPlus
"""
import logging
import time
import uuid
import json
import io
import base64
from typing import Optional, List, Tuple
from PIL import Image
import websocket
import urllib.request
import urllib.parse
from .models import GenerationRequest, GenerationResult
logger = logging.getLogger(__name__)
class ComfyUIClient:
"""
Client for ComfyUI API to run Qwen-Image-Edit-2511.
Requires ComfyUI running with:
- Qwen-Image-Edit-2511 model in models/diffusion_models/
- Qwen 2.5 VL 7B text encoder in models/text_encoders/
- Qwen Image VAE in models/vae/
- Comfyui-QwenEditUtils custom node installed
"""
# Default ComfyUI settings
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 8188
# Model file names (expected in ComfyUI models/ subfolders)
# Lightning: baked model (LoRA pre-merged, ComfyUI-specific format)
UNET_MODEL_LIGHTNING = "qwen_image_edit_2511_fp8_e4m3fn_scaled_lightning_comfyui_4steps_v1.0.safetensors"
# Standard: base fp8mixed model (20-step, higher quality)
UNET_MODEL_STANDARD = "qwen_image_edit_2511_fp8mixed.safetensors"
TEXT_ENCODER = "qwen_2.5_vl_7b_fp8_scaled.safetensors"
VAE_MODEL = "qwen_image_vae.safetensors"
# Target output dimensions per aspect ratio.
# Generation happens at 1024x1024, then crop+resize to these.
ASPECT_RATIOS = {
"1:1": (1024, 1024),
"16:9": (1344, 768),
"9:16": (768, 1344),
"21:9": (1680, 720),
"3:2": (1248, 832),
"2:3": (832, 1248),
"3:4": (896, 1152),
"4:3": (1152, 896),
"4:5": (1024, 1280),
"5:4": (1280, 1024),
}
# Generate at 1024x1024 (proven safe for Qwen's VAE), then crop+resize
NATIVE_RESOLUTION = (1024, 1024)
# With Lightning LoRA: 4 steps, CFG 1.0 (fast, ~seconds per view)
# Without LoRA: 20 steps, CFG 4.0
DEFAULT_STEPS_LIGHTNING = 4
DEFAULT_STEPS_STANDARD = 20
DEFAULT_CFG_LIGHTNING = 1.0
DEFAULT_CFG_STANDARD = 4.0
def __init__(
self,
host: str = DEFAULT_HOST,
port: int = DEFAULT_PORT,
use_lightning: bool = True,
):
"""
Initialize ComfyUI client.
Args:
host: ComfyUI server host
port: ComfyUI server port
use_lightning: Use Lightning LoRA for 4-step generation (much faster)
"""
self.host = host
self.port = port
self.use_lightning = use_lightning
self.client_id = str(uuid.uuid4())
self.server_address = f"{host}:{port}"
if use_lightning:
self.num_inference_steps = self.DEFAULT_STEPS_LIGHTNING
self.cfg_scale = self.DEFAULT_CFG_LIGHTNING
else:
self.num_inference_steps = self.DEFAULT_STEPS_STANDARD
self.cfg_scale = self.DEFAULT_CFG_STANDARD
logger.info(
f"ComfyUIClient initialized for {self.server_address} "
f"(lightning={use_lightning}, steps={self.num_inference_steps})"
)
def is_healthy(self) -> bool:
"""Check if ComfyUI server is running and accessible."""
try:
url = f"http://{self.server_address}/system_stats"
with urllib.request.urlopen(url, timeout=5) as response:
return response.status == 200
except Exception:
return False
def _upload_image(self, image: Image.Image, name: str = "input.png") -> Optional[str]:
"""
Upload an image to ComfyUI, pre-resized to fit within 1024x1024.
Args:
image: PIL Image to upload
name: Filename for the uploaded image
Returns:
Filename on server, or None if failed
"""
try:
# Pre-resize to keep total pixels around 1024x1024 (matching reference workflow)
max_pixels = 1024 * 1024
w, h = image.size
if w * h > max_pixels:
scale = (max_pixels / (w * h)) ** 0.5
new_w = int(w * scale)
new_h = int(h * scale)
image = image.resize((new_w, new_h), Image.LANCZOS)
logger.debug(f"Pre-resized input from {w}x{h} to {new_w}x{new_h}")
# Convert image to bytes
img_bytes = io.BytesIO()
image.save(img_bytes, format='PNG')
img_bytes.seek(0)
# Create multipart form data
boundary = uuid.uuid4().hex
body = b''
body += f'--{boundary}\r\n'.encode()
body += f'Content-Disposition: form-data; name="image"; filename="{name}"\r\n'.encode()
body += b'Content-Type: image/png\r\n\r\n'
body += img_bytes.read()
body += f'\r\n--{boundary}--\r\n'.encode()
url = f"http://{self.server_address}/upload/image"
req = urllib.request.Request(
url,
data=body,
headers={
'Content-Type': f'multipart/form-data; boundary={boundary}'
}
)
with urllib.request.urlopen(req) as response:
result = json.loads(response.read())
return result.get('name')
except Exception as e:
logger.error(f"Failed to upload image: {e}")
return None
def _queue_prompt(self, prompt: dict) -> str:
"""
Queue a prompt for execution.
Args:
prompt: Workflow prompt dict
Returns:
Prompt ID
"""
prompt_id = str(uuid.uuid4())
p = {"prompt": prompt, "client_id": self.client_id, "prompt_id": prompt_id}
data = json.dumps(p).encode('utf-8')
url = f"http://{self.server_address}/prompt"
req = urllib.request.Request(url, data=data)
urllib.request.urlopen(req)
return prompt_id
def _get_history(self, prompt_id: str) -> dict:
"""Get execution history for a prompt."""
url = f"http://{self.server_address}/history/{prompt_id}"
with urllib.request.urlopen(url) as response:
return json.loads(response.read())
def _get_image(self, filename: str, subfolder: str, folder_type: str) -> bytes:
"""Get an image from ComfyUI."""
data = {"filename": filename, "subfolder": subfolder, "type": folder_type}
url_values = urllib.parse.urlencode(data)
url = f"http://{self.server_address}/view?{url_values}"
with urllib.request.urlopen(url) as response:
return response.read()
def _wait_for_completion(self, prompt_id: str, timeout: float = 900.0) -> bool:
"""
Wait for prompt execution to complete using websocket.
Args:
prompt_id: The prompt ID to wait for
timeout: Maximum time to wait in seconds (default 15 min for image editing)
Returns:
True if completed successfully, False if timeout/error
"""
ws = None
try:
ws_url = f"ws://{self.server_address}/ws?clientId={self.client_id}"
ws = websocket.WebSocket()
ws.settimeout(timeout)
ws.connect(ws_url)
start_time = time.time()
while time.time() - start_time < timeout:
try:
out = ws.recv()
if isinstance(out, str):
message = json.loads(out)
if message['type'] == 'executing':
data = message['data']
if data['node'] is None and data['prompt_id'] == prompt_id:
return True # Execution complete
elif message['type'] == 'execution_error':
logger.error(f"Execution error: {message}")
return False
except websocket.WebSocketTimeoutException:
continue
logger.error("Timeout waiting for completion")
return False
except Exception as e:
logger.error(f"WebSocket error: {e}")
return False
finally:
if ws:
try:
ws.close()
except:
pass
def _get_dimensions(self, aspect_ratio: str) -> Tuple[int, int]:
"""Get pixel dimensions for aspect ratio."""
ratio = aspect_ratio.split()[0] if " " in aspect_ratio else aspect_ratio
return self.ASPECT_RATIOS.get(ratio, (1024, 1024))
@staticmethod
def _crop_and_resize(image: Image.Image, target_w: int, target_h: int) -> Image.Image:
"""Crop to target aspect ratio, then resize. Centers the crop."""
src_w, src_h = image.size
target_ratio = target_w / target_h
src_ratio = src_w / src_h
if abs(target_ratio - src_ratio) < 0.01:
return image.resize((target_w, target_h), Image.LANCZOS)
if target_ratio < src_ratio:
crop_w = int(src_h * target_ratio)
offset = (src_w - crop_w) // 2
image = image.crop((offset, 0, offset + crop_w, src_h))
else:
crop_h = int(src_w / target_ratio)
offset = (src_h - crop_h) // 2
image = image.crop((0, offset, src_w, offset + crop_h))
return image.resize((target_w, target_h), Image.LANCZOS)
def _build_workflow(
self,
prompt: str,
width: int,
height: int,
input_images: List[str] = None,
negative_prompt: str = ""
) -> dict:
"""
Build the ComfyUI workflow for Qwen-Image-Edit-2511.
Workflow graph:
UNETLoader → KSampler
CLIPLoader → TextEncodeQwenImageEditPlus (pos/neg)
VAELoader → TextEncode + VAEDecode
LoadImage(s) → TextEncodeQwenImageEditPlus
EmptyQwenImageLayeredLatentImage → KSampler
KSampler → VAEDecode → PreviewImage
Lightning mode uses a baked model (LoRA pre-merged), no separate
LoRA or ModelSamplingAuraFlow nodes needed.
"""
workflow = {}
node_id = 1
# --- Model loading ---
# Select model based on lightning mode
unet_name = (self.UNET_MODEL_LIGHTNING if self.use_lightning
else self.UNET_MODEL_STANDARD)
# UNETLoader - weight_dtype "default" lets ComfyUI auto-detect fp8
unet_id = str(node_id)
workflow[unet_id] = {
"class_type": "UNETLoader",
"inputs": {
"unet_name": unet_name,
"weight_dtype": "default"
}
}
node_id += 1
# CLIPLoader
clip_id = str(node_id)
workflow[clip_id] = {
"class_type": "CLIPLoader",
"inputs": {
"clip_name": self.TEXT_ENCODER,
"type": "qwen_image"
}
}
node_id += 1
# VAELoader
vae_id = str(node_id)
workflow[vae_id] = {
"class_type": "VAELoader",
"inputs": {
"vae_name": self.VAE_MODEL
}
}
node_id += 1
model_out_id = unet_id
# --- Input images ---
image_loader_ids = []
if input_images:
for img_name in input_images[:3]: # Max 3 reference images
img_loader_id = str(node_id)
workflow[img_loader_id] = {
"class_type": "LoadImage",
"inputs": {
"image": img_name
}
}
image_loader_ids.append(img_loader_id)
node_id += 1
# --- Text encoding ---
# Positive: prompt + vision references + VAE
pos_encode_id = str(node_id)
pos_inputs = {
"clip": [clip_id, 0],
"prompt": prompt,
"vae": [vae_id, 0]
}
for i, loader_id in enumerate(image_loader_ids):
pos_inputs[f"image{i+1}"] = [loader_id, 0]
workflow[pos_encode_id] = {
"class_type": "TextEncodeQwenImageEditPlus",
"inputs": pos_inputs
}
node_id += 1
# Negative: text only, no images
neg_encode_id = str(node_id)
workflow[neg_encode_id] = {
"class_type": "TextEncodeQwenImageEditPlus",
"inputs": {
"clip": [clip_id, 0],
"prompt": negative_prompt or " ",
"vae": [vae_id, 0]
}
}
node_id += 1
# --- Latent + sampling ---
latent_id = str(node_id)
workflow[latent_id] = {
"class_type": "EmptySD3LatentImage",
"inputs": {
"width": width,
"height": height,
"batch_size": 1
}
}
node_id += 1
sampler_id = str(node_id)
workflow[sampler_id] = {
"class_type": "KSampler",
"inputs": {
"model": [model_out_id, 0],
"positive": [pos_encode_id, 0],
"negative": [neg_encode_id, 0],
"latent_image": [latent_id, 0],
"seed": int(time.time()) % 2**32,
"steps": self.num_inference_steps,
"cfg": self.cfg_scale,
"sampler_name": "euler",
"scheduler": "simple",
"denoise": 1.0
}
}
node_id += 1
# --- Decode + output ---
decode_id = str(node_id)
workflow[decode_id] = {
"class_type": "VAEDecode",
"inputs": {
"samples": [sampler_id, 0],
"vae": [vae_id, 0]
}
}
node_id += 1
preview_id = str(node_id)
workflow[preview_id] = {
"class_type": "PreviewImage",
"inputs": {
"images": [decode_id, 0]
}
}
return workflow
def generate(
self,
request: GenerationRequest,
num_inference_steps: Optional[int] = None,
cfg_scale: Optional[float] = None
) -> GenerationResult:
"""
Generate/edit image using Qwen-Image-Edit-2511 via ComfyUI.
Generates at native 1024x1024, then crop+resize to requested
aspect ratio for clean VAE output.
"""
if not self.is_healthy():
return GenerationResult.error_result(
"ComfyUI server is not accessible. Make sure ComfyUI is running on "
f"{self.server_address}"
)
try:
start_time = time.time()
# Target dimensions for post-processing
target_w, target_h = self._get_dimensions(request.aspect_ratio)
# Generate at native resolution (VAE-safe)
native_w, native_h = self.NATIVE_RESOLUTION
# Upload input images (max 3)
uploaded_images = []
if request.has_input_images:
for i, img in enumerate(request.input_images):
if img is not None:
name = f"input_{i}_{uuid.uuid4().hex[:8]}.png"
uploaded_name = self._upload_image(img, name)
if uploaded_name:
uploaded_images.append(uploaded_name)
else:
logger.warning(f"Failed to upload image {i}")
steps = num_inference_steps or self.num_inference_steps
cfg = cfg_scale or self.cfg_scale
# Temporarily set for workflow build
old_steps, old_cfg = self.num_inference_steps, self.cfg_scale
self.num_inference_steps, self.cfg_scale = steps, cfg
workflow = self._build_workflow(
prompt=request.prompt,
width=native_w,
height=native_h,
input_images=uploaded_images or None,
negative_prompt=request.negative_prompt or ""
)
self.num_inference_steps, self.cfg_scale = old_steps, old_cfg
logger.info(f"Generating with ComfyUI/Qwen: {request.prompt[:80]}...")
logger.info(
f"Native: {native_w}x{native_h}, target: {target_w}x{target_h}, "
f"steps: {steps}, cfg: {cfg}, images: {len(uploaded_images)}, "
f"lightning: {self.use_lightning}"
)
# Queue and wait
prompt_id = self._queue_prompt(workflow)
logger.info(f"Queued prompt: {prompt_id}")
if not self._wait_for_completion(prompt_id):
return GenerationResult.error_result("Generation failed or timed out")
# Retrieve output
history = self._get_history(prompt_id)
if prompt_id not in history:
return GenerationResult.error_result("No history found for prompt")
outputs = history[prompt_id].get('outputs', {})
for nid, node_output in outputs.items():
if 'images' in node_output:
for img_info in node_output['images']:
img_data = self._get_image(
img_info['filename'],
img_info.get('subfolder', ''),
img_info.get('type', 'temp')
)
image = Image.open(io.BytesIO(img_data))
generation_time = time.time() - start_time
logger.info(f"Generated in {generation_time:.2f}s: {image.size}")
# Crop+resize to target aspect ratio
if (target_w, target_h) != (native_w, native_h):
image = self._crop_and_resize(image, target_w, target_h)
logger.info(f"Post-processed to: {image.size}")
return GenerationResult.success_result(
image=image,
message=f"Generated with ComfyUI/Qwen in {generation_time:.2f}s",
generation_time=generation_time
)
return GenerationResult.error_result("No output images found")
except Exception as e:
logger.error(f"ComfyUI generation failed: {e}", exc_info=True)
return GenerationResult.error_result(f"ComfyUI error: {str(e)}")
def unload_model(self):
"""
Request ComfyUI to free memory.
Note: ComfyUI manages models automatically, but we can request cleanup.
"""
try:
url = f"http://{self.server_address}/free"
data = json.dumps({"unload_models": True}).encode('utf-8')
req = urllib.request.Request(url, data=data, method='POST')
urllib.request.urlopen(req)
logger.info("Requested ComfyUI to free memory")
except Exception as e:
logger.warning(f"Failed to request memory cleanup: {e}")
@classmethod
def get_dimensions(cls, aspect_ratio: str) -> Tuple[int, int]:
"""Get pixel dimensions for aspect ratio."""
ratio = aspect_ratio.split()[0] if " " in aspect_ratio else aspect_ratio
return cls.ASPECT_RATIOS.get(ratio, (1024, 1024))