| from __future__ import annotations |
| import torch |
|
|
|
|
| import os |
| import sys |
| import json |
| import hashlib |
| import inspect |
| import traceback |
| import math |
| import time |
| import random |
| import logging |
|
|
| from PIL import Image, ImageOps, ImageSequence |
| from PIL.PngImagePlugin import PngInfo |
|
|
| import numpy as np |
| import safetensors.torch |
|
|
| sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), "comfy")) |
|
|
| import comfy.diffusers_load |
| import comfy.samplers |
| import comfy.sample |
| import comfy.sd |
| import comfy.utils |
| import comfy.controlnet |
| from comfy.comfy_types import IO, ComfyNodeABC, InputTypeDict, FileLocator |
| from comfy_api.internal import register_versions, ComfyAPIWithVersion |
| from comfy_api.version_list import supported_versions |
| from comfy_api.latest import io, ComfyExtension |
|
|
| import comfy.clip_vision |
|
|
| import comfy.model_management |
| from comfy.cli_args import args |
|
|
| import importlib |
|
|
| import folder_paths |
| import latent_preview |
| import node_helpers |
|
|
| def before_node_execution(): |
| comfy.model_management.throw_exception_if_processing_interrupted() |
|
|
| def interrupt_processing(value=True): |
| comfy.model_management.interrupt_current_processing(value) |
|
|
| MAX_RESOLUTION=16384 |
|
|
| class CLIPTextEncode(ComfyNodeABC): |
| @classmethod |
| def INPUT_TYPES(s) -> InputTypeDict: |
| return { |
| "required": { |
| "text": (IO.STRING, {"multiline": True, "dynamicPrompts": True, "tooltip": "The text to be encoded."}), |
| "clip": (IO.CLIP, {"tooltip": "The CLIP model used for encoding the text."}) |
| } |
| } |
| RETURN_TYPES = (IO.CONDITIONING,) |
| OUTPUT_TOOLTIPS = ("A conditioning containing the embedded text used to guide the diffusion model.",) |
| FUNCTION = "encode" |
|
|
| CATEGORY = "conditioning" |
| DESCRIPTION = "Encodes a text prompt using a CLIP model into an embedding that can be used to guide the diffusion model towards generating specific images." |
|
|
| def encode(self, clip, text): |
| if clip is None: |
| raise RuntimeError("ERROR: clip input is invalid: None\n\nIf the clip is from a checkpoint loader node your checkpoint does not contain a valid clip or text encoder model.") |
| tokens = clip.tokenize(text) |
| return (clip.encode_from_tokens_scheduled(tokens), ) |
|
|
|
|
| class ConditioningCombine: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": {"conditioning_1": ("CONDITIONING", ), "conditioning_2": ("CONDITIONING", )}} |
| RETURN_TYPES = ("CONDITIONING",) |
| FUNCTION = "combine" |
|
|
| CATEGORY = "conditioning" |
|
|
| def combine(self, conditioning_1, conditioning_2): |
| return (conditioning_1 + conditioning_2, ) |
|
|
| class ConditioningAverage : |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": {"conditioning_to": ("CONDITIONING", ), "conditioning_from": ("CONDITIONING", ), |
| "conditioning_to_strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}) |
| }} |
| RETURN_TYPES = ("CONDITIONING",) |
| FUNCTION = "addWeighted" |
|
|
| CATEGORY = "conditioning" |
|
|
| def addWeighted(self, conditioning_to, conditioning_from, conditioning_to_strength): |
| out = [] |
|
|
| if len(conditioning_from) > 1: |
| logging.warning("Warning: ConditioningAverage conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.") |
|
|
| cond_from = conditioning_from[0][0] |
| pooled_output_from = conditioning_from[0][1].get("pooled_output", None) |
|
|
| for i in range(len(conditioning_to)): |
| t1 = conditioning_to[i][0] |
| pooled_output_to = conditioning_to[i][1].get("pooled_output", pooled_output_from) |
| t0 = cond_from[:,:t1.shape[1]] |
| if t0.shape[1] < t1.shape[1]: |
| t0 = torch.cat([t0] + [torch.zeros((1, (t1.shape[1] - t0.shape[1]), t1.shape[2]))], dim=1) |
|
|
| tw = torch.mul(t1, conditioning_to_strength) + torch.mul(t0, (1.0 - conditioning_to_strength)) |
| t_to = conditioning_to[i][1].copy() |
| if pooled_output_from is not None and pooled_output_to is not None: |
| t_to["pooled_output"] = torch.mul(pooled_output_to, conditioning_to_strength) + torch.mul(pooled_output_from, (1.0 - conditioning_to_strength)) |
| elif pooled_output_from is not None: |
| t_to["pooled_output"] = pooled_output_from |
|
|
| n = [tw, t_to] |
| out.append(n) |
| return (out, ) |
|
|
| class ConditioningConcat: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { |
| "conditioning_to": ("CONDITIONING",), |
| "conditioning_from": ("CONDITIONING",), |
| }} |
| RETURN_TYPES = ("CONDITIONING",) |
| FUNCTION = "concat" |
|
|
| CATEGORY = "conditioning" |
|
|
| def concat(self, conditioning_to, conditioning_from): |
| out = [] |
|
|
| if len(conditioning_from) > 1: |
| logging.warning("Warning: ConditioningConcat conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.") |
|
|
| cond_from = conditioning_from[0][0] |
|
|
| for i in range(len(conditioning_to)): |
| t1 = conditioning_to[i][0] |
| tw = torch.cat((t1, cond_from),1) |
| n = [tw, conditioning_to[i][1].copy()] |
| out.append(n) |
|
|
| return (out, ) |
|
|
| class ConditioningSetArea: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": {"conditioning": ("CONDITIONING", ), |
| "width": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}), |
| "height": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}), |
| "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), |
| "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), |
| "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), |
| }} |
| RETURN_TYPES = ("CONDITIONING",) |
| FUNCTION = "append" |
|
|
| CATEGORY = "conditioning" |
|
|
| def append(self, conditioning, width, height, x, y, strength): |
| c = node_helpers.conditioning_set_values(conditioning, {"area": (height // 8, width // 8, y // 8, x // 8), |
| "strength": strength, |
| "set_area_to_bounds": False}) |
| return (c, ) |
|
|
| class ConditioningSetAreaPercentage: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": {"conditioning": ("CONDITIONING", ), |
| "width": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}), |
| "height": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}), |
| "x": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}), |
| "y": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}), |
| "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), |
| }} |
| RETURN_TYPES = ("CONDITIONING",) |
| FUNCTION = "append" |
|
|
| CATEGORY = "conditioning" |
|
|
| def append(self, conditioning, width, height, x, y, strength): |
| c = node_helpers.conditioning_set_values(conditioning, {"area": ("percentage", height, width, y, x), |
| "strength": strength, |
| "set_area_to_bounds": False}) |
| return (c, ) |
|
|
| class ConditioningSetAreaStrength: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": {"conditioning": ("CONDITIONING", ), |
| "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), |
| }} |
| RETURN_TYPES = ("CONDITIONING",) |
| FUNCTION = "append" |
|
|
| CATEGORY = "conditioning" |
|
|
| def append(self, conditioning, strength): |
| c = node_helpers.conditioning_set_values(conditioning, {"strength": strength}) |
| return (c, ) |
|
|
|
|
| class ConditioningSetMask: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": {"conditioning": ("CONDITIONING", ), |
| "mask": ("MASK", ), |
| "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), |
| "set_cond_area": (["default", "mask bounds"],), |
| }} |
| RETURN_TYPES = ("CONDITIONING",) |
| FUNCTION = "append" |
|
|
| CATEGORY = "conditioning" |
|
|
| def append(self, conditioning, mask, set_cond_area, strength): |
| set_area_to_bounds = False |
| if set_cond_area != "default": |
| set_area_to_bounds = True |
| if len(mask.shape) < 3: |
| mask = mask.unsqueeze(0) |
|
|
| c = node_helpers.conditioning_set_values(conditioning, {"mask": mask, |
| "set_area_to_bounds": set_area_to_bounds, |
| "mask_strength": strength}) |
| return (c, ) |
|
|
| class ConditioningZeroOut: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": {"conditioning": ("CONDITIONING", )}} |
| RETURN_TYPES = ("CONDITIONING",) |
| FUNCTION = "zero_out" |
|
|
| CATEGORY = "advanced/conditioning" |
|
|
| def zero_out(self, conditioning): |
| c = [] |
| for t in conditioning: |
| d = t[1].copy() |
| pooled_output = d.get("pooled_output", None) |
| if pooled_output is not None: |
| d["pooled_output"] = torch.zeros_like(pooled_output) |
| conditioning_lyrics = d.get("conditioning_lyrics", None) |
| if conditioning_lyrics is not None: |
| d["conditioning_lyrics"] = torch.zeros_like(conditioning_lyrics) |
| n = [torch.zeros_like(t[0]), d] |
| c.append(n) |
| return (c, ) |
|
|
| class ConditioningSetTimestepRange: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": {"conditioning": ("CONDITIONING", ), |
| "start": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}), |
| "end": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001}) |
| }} |
| RETURN_TYPES = ("CONDITIONING",) |
| FUNCTION = "set_range" |
|
|
| CATEGORY = "advanced/conditioning" |
|
|
| def set_range(self, conditioning, start, end): |
| c = node_helpers.conditioning_set_values(conditioning, {"start_percent": start, |
| "end_percent": end}) |
| return (c, ) |
|
|
| class VAEDecode: |
| @classmethod |
| def INPUT_TYPES(s): |
| return { |
| "required": { |
| "samples": ("LATENT", {"tooltip": "The latent to be decoded."}), |
| "vae": ("VAE", {"tooltip": "The VAE model used for decoding the latent."}) |
| } |
| } |
| RETURN_TYPES = ("IMAGE",) |
| OUTPUT_TOOLTIPS = ("The decoded image.",) |
| FUNCTION = "decode" |
|
|
| CATEGORY = "latent" |
| DESCRIPTION = "Decodes latent images back into pixel space images." |
|
|
| def decode(self, vae, samples): |
| images = vae.decode(samples["samples"]) |
| if len(images.shape) == 5: |
| images = images.reshape(-1, images.shape[-3], images.shape[-2], images.shape[-1]) |
| return (images, ) |
|
|
| class VAEDecodeTiled: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": {"samples": ("LATENT", ), "vae": ("VAE", ), |
| "tile_size": ("INT", {"default": 512, "min": 64, "max": 4096, "step": 32}), |
| "overlap": ("INT", {"default": 64, "min": 0, "max": 4096, "step": 32}), |
| "temporal_size": ("INT", {"default": 64, "min": 8, "max": 4096, "step": 4, "tooltip": "Only used for video VAEs: Amount of frames to decode at a time."}), |
| "temporal_overlap": ("INT", {"default": 8, "min": 4, "max": 4096, "step": 4, "tooltip": "Only used for video VAEs: Amount of frames to overlap."}), |
| }} |
| RETURN_TYPES = ("IMAGE",) |
| FUNCTION = "decode" |
|
|
| CATEGORY = "_for_testing" |
|
|
| def decode(self, vae, samples, tile_size, overlap=64, temporal_size=64, temporal_overlap=8): |
| if tile_size < overlap * 4: |
| overlap = tile_size // 4 |
| if temporal_size < temporal_overlap * 2: |
| temporal_overlap = temporal_overlap // 2 |
| temporal_compression = vae.temporal_compression_decode() |
| if temporal_compression is not None: |
| temporal_size = max(2, temporal_size // temporal_compression) |
| temporal_overlap = max(1, min(temporal_size // 2, temporal_overlap // temporal_compression)) |
| else: |
| temporal_size = None |
| temporal_overlap = None |
|
|
| compression = vae.spacial_compression_decode() |
| images = vae.decode_tiled(samples["samples"], tile_x=tile_size // compression, tile_y=tile_size // compression, overlap=overlap // compression, tile_t=temporal_size, overlap_t=temporal_overlap) |
| if len(images.shape) == 5: |
| images = images.reshape(-1, images.shape[-3], images.shape[-2], images.shape[-1]) |
| return (images, ) |
|
|
| class VAEEncode: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "pixels": ("IMAGE", ), "vae": ("VAE", )}} |
| RETURN_TYPES = ("LATENT",) |
| FUNCTION = "encode" |
|
|
| CATEGORY = "latent" |
|
|
| def encode(self, vae, pixels): |
| t = vae.encode(pixels[:,:,:,:3]) |
| return ({"samples":t}, ) |
|
|
| class VAEEncodeTiled: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": {"pixels": ("IMAGE", ), "vae": ("VAE", ), |
| "tile_size": ("INT", {"default": 512, "min": 64, "max": 4096, "step": 64}), |
| "overlap": ("INT", {"default": 64, "min": 0, "max": 4096, "step": 32}), |
| "temporal_size": ("INT", {"default": 64, "min": 8, "max": 4096, "step": 4, "tooltip": "Only used for video VAEs: Amount of frames to encode at a time."}), |
| "temporal_overlap": ("INT", {"default": 8, "min": 4, "max": 4096, "step": 4, "tooltip": "Only used for video VAEs: Amount of frames to overlap."}), |
| }} |
| RETURN_TYPES = ("LATENT",) |
| FUNCTION = "encode" |
|
|
| CATEGORY = "_for_testing" |
|
|
| def encode(self, vae, pixels, tile_size, overlap, temporal_size=64, temporal_overlap=8): |
| t = vae.encode_tiled(pixels[:,:,:,:3], tile_x=tile_size, tile_y=tile_size, overlap=overlap, tile_t=temporal_size, overlap_t=temporal_overlap) |
| return ({"samples": t}, ) |
|
|
| class VAEEncodeForInpaint: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "pixels": ("IMAGE", ), "vae": ("VAE", ), "mask": ("MASK", ), "grow_mask_by": ("INT", {"default": 6, "min": 0, "max": 64, "step": 1}),}} |
| RETURN_TYPES = ("LATENT",) |
| FUNCTION = "encode" |
|
|
| CATEGORY = "latent/inpaint" |
|
|
| def encode(self, vae, pixels, mask, grow_mask_by=6): |
| x = (pixels.shape[1] // vae.downscale_ratio) * vae.downscale_ratio |
| y = (pixels.shape[2] // vae.downscale_ratio) * vae.downscale_ratio |
| mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear") |
|
|
| pixels = pixels.clone() |
| if pixels.shape[1] != x or pixels.shape[2] != y: |
| x_offset = (pixels.shape[1] % vae.downscale_ratio) // 2 |
| y_offset = (pixels.shape[2] % vae.downscale_ratio) // 2 |
| pixels = pixels[:,x_offset:x + x_offset, y_offset:y + y_offset,:] |
| mask = mask[:,:,x_offset:x + x_offset, y_offset:y + y_offset] |
|
|
| |
| if grow_mask_by == 0: |
| mask_erosion = mask |
| else: |
| kernel_tensor = torch.ones((1, 1, grow_mask_by, grow_mask_by)) |
| padding = math.ceil((grow_mask_by - 1) / 2) |
|
|
| mask_erosion = torch.clamp(torch.nn.functional.conv2d(mask.round(), kernel_tensor, padding=padding), 0, 1) |
|
|
| m = (1.0 - mask.round()).squeeze(1) |
| for i in range(3): |
| pixels[:,:,:,i] -= 0.5 |
| pixels[:,:,:,i] *= m |
| pixels[:,:,:,i] += 0.5 |
| t = vae.encode(pixels) |
|
|
| return ({"samples":t, "noise_mask": (mask_erosion[:,:,:x,:y].round())}, ) |
|
|
|
|
| class InpaintModelConditioning: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": {"positive": ("CONDITIONING", ), |
| "negative": ("CONDITIONING", ), |
| "vae": ("VAE", ), |
| "pixels": ("IMAGE", ), |
| "mask": ("MASK", ), |
| "noise_mask": ("BOOLEAN", {"default": True, "tooltip": "Add a noise mask to the latent so sampling will only happen within the mask. Might improve results or completely break things depending on the model."}), |
| }} |
|
|
| RETURN_TYPES = ("CONDITIONING","CONDITIONING","LATENT") |
| RETURN_NAMES = ("positive", "negative", "latent") |
| FUNCTION = "encode" |
|
|
| CATEGORY = "conditioning/inpaint" |
|
|
| def encode(self, positive, negative, pixels, vae, mask, noise_mask=True): |
| x = (pixels.shape[1] // 8) * 8 |
| y = (pixels.shape[2] // 8) * 8 |
| mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear") |
|
|
| orig_pixels = pixels |
| pixels = orig_pixels.clone() |
| if pixels.shape[1] != x or pixels.shape[2] != y: |
| x_offset = (pixels.shape[1] % 8) // 2 |
| y_offset = (pixels.shape[2] % 8) // 2 |
| pixels = pixels[:,x_offset:x + x_offset, y_offset:y + y_offset,:] |
| mask = mask[:,:,x_offset:x + x_offset, y_offset:y + y_offset] |
|
|
| m = (1.0 - mask.round()).squeeze(1) |
| for i in range(3): |
| pixels[:,:,:,i] -= 0.5 |
| pixels[:,:,:,i] *= m |
| pixels[:,:,:,i] += 0.5 |
| concat_latent = vae.encode(pixels) |
| orig_latent = vae.encode(orig_pixels) |
|
|
| out_latent = {} |
|
|
| out_latent["samples"] = orig_latent |
| if noise_mask: |
| out_latent["noise_mask"] = mask |
|
|
| out = [] |
| for conditioning in [positive, negative]: |
| c = node_helpers.conditioning_set_values(conditioning, {"concat_latent_image": concat_latent, |
| "concat_mask": mask}) |
| out.append(c) |
| return (out[0], out[1], out_latent) |
|
|
|
|
| class SaveLatent: |
| def __init__(self): |
| self.output_dir = folder_paths.get_output_directory() |
|
|
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "samples": ("LATENT", ), |
| "filename_prefix": ("STRING", {"default": "latents/ComfyUI"})}, |
| "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"}, |
| } |
| RETURN_TYPES = () |
| FUNCTION = "save" |
|
|
| OUTPUT_NODE = True |
|
|
| CATEGORY = "_for_testing" |
|
|
| def save(self, samples, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None): |
| full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir) |
|
|
| |
| prompt_info = "" |
| if prompt is not None: |
| prompt_info = json.dumps(prompt) |
|
|
| metadata = None |
| if not args.disable_metadata: |
| metadata = {"prompt": prompt_info} |
| if extra_pnginfo is not None: |
| for x in extra_pnginfo: |
| metadata[x] = json.dumps(extra_pnginfo[x]) |
|
|
| file = f"{filename}_{counter:05}_.latent" |
|
|
| results: list[FileLocator] = [] |
| results.append({ |
| "filename": file, |
| "subfolder": subfolder, |
| "type": "output" |
| }) |
|
|
| file = os.path.join(full_output_folder, file) |
|
|
| output = {} |
| output["latent_tensor"] = samples["samples"].contiguous() |
| output["latent_format_version_0"] = torch.tensor([]) |
|
|
| comfy.utils.save_torch_file(output, file, metadata=metadata) |
| return { "ui": { "latents": results } } |
|
|
|
|
| class LoadLatent: |
| @classmethod |
| def INPUT_TYPES(s): |
| input_dir = folder_paths.get_input_directory() |
| files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f)) and f.endswith(".latent")] |
| return {"required": {"latent": [sorted(files), ]}, } |
|
|
| CATEGORY = "_for_testing" |
|
|
| RETURN_TYPES = ("LATENT", ) |
| FUNCTION = "load" |
|
|
| def load(self, latent): |
| latent_path = folder_paths.get_annotated_filepath(latent) |
| latent = safetensors.torch.load_file(latent_path, device="cpu") |
| multiplier = 1.0 |
| if "latent_format_version_0" not in latent: |
| multiplier = 1.0 / 0.18215 |
| samples = {"samples": latent["latent_tensor"].float() * multiplier} |
| return (samples, ) |
|
|
| @classmethod |
| def IS_CHANGED(s, latent): |
| image_path = folder_paths.get_annotated_filepath(latent) |
| m = hashlib.sha256() |
| with open(image_path, 'rb') as f: |
| m.update(f.read()) |
| return m.digest().hex() |
|
|
| @classmethod |
| def VALIDATE_INPUTS(s, latent): |
| if not folder_paths.exists_annotated_filepath(latent): |
| return "Invalid latent file: {}".format(latent) |
| return True |
|
|
|
|
| class CheckpointLoader: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "config_name": (folder_paths.get_filename_list("configs"), ), |
| "ckpt_name": (folder_paths.get_filename_list("checkpoints"), )}} |
| RETURN_TYPES = ("MODEL", "CLIP", "VAE") |
| FUNCTION = "load_checkpoint" |
|
|
| CATEGORY = "advanced/loaders" |
| DEPRECATED = True |
|
|
| def load_checkpoint(self, config_name, ckpt_name): |
| config_path = folder_paths.get_full_path("configs", config_name) |
| ckpt_path = folder_paths.get_full_path_or_raise("checkpoints", ckpt_name) |
| return comfy.sd.load_checkpoint(config_path, ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings")) |
|
|
| class CheckpointLoaderSimple: |
| @classmethod |
| def INPUT_TYPES(s): |
| return { |
| "required": { |
| "ckpt_name": (folder_paths.get_filename_list("checkpoints"), {"tooltip": "The name of the checkpoint (model) to load."}), |
| } |
| } |
| RETURN_TYPES = ("MODEL", "CLIP", "VAE") |
| OUTPUT_TOOLTIPS = ("The model used for denoising latents.", |
| "The CLIP model used for encoding text prompts.", |
| "The VAE model used for encoding and decoding images to and from latent space.") |
| FUNCTION = "load_checkpoint" |
|
|
| CATEGORY = "loaders" |
| DESCRIPTION = "Loads a diffusion model checkpoint, diffusion models are used to denoise latents." |
|
|
| def load_checkpoint(self, ckpt_name): |
| ckpt_path = folder_paths.get_full_path_or_raise("checkpoints", ckpt_name) |
| out = comfy.sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings")) |
| return out[:3] |
|
|
| class DiffusersLoader: |
| @classmethod |
| def INPUT_TYPES(cls): |
| paths = [] |
| for search_path in folder_paths.get_folder_paths("diffusers"): |
| if os.path.exists(search_path): |
| for root, subdir, files in os.walk(search_path, followlinks=True): |
| if "model_index.json" in files: |
| paths.append(os.path.relpath(root, start=search_path)) |
|
|
| return {"required": {"model_path": (paths,), }} |
| RETURN_TYPES = ("MODEL", "CLIP", "VAE") |
| FUNCTION = "load_checkpoint" |
|
|
| CATEGORY = "advanced/loaders/deprecated" |
|
|
| def load_checkpoint(self, model_path, output_vae=True, output_clip=True): |
| for search_path in folder_paths.get_folder_paths("diffusers"): |
| if os.path.exists(search_path): |
| path = os.path.join(search_path, model_path) |
| if os.path.exists(path): |
| model_path = path |
| break |
|
|
| return comfy.diffusers_load.load_diffusers(model_path, output_vae=output_vae, output_clip=output_clip, embedding_directory=folder_paths.get_folder_paths("embeddings")) |
|
|
|
|
| class unCLIPCheckpointLoader: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "ckpt_name": (folder_paths.get_filename_list("checkpoints"), ), |
| }} |
| RETURN_TYPES = ("MODEL", "CLIP", "VAE", "CLIP_VISION") |
| FUNCTION = "load_checkpoint" |
|
|
| CATEGORY = "loaders" |
|
|
| def load_checkpoint(self, ckpt_name, output_vae=True, output_clip=True): |
| ckpt_path = folder_paths.get_full_path_or_raise("checkpoints", ckpt_name) |
| out = comfy.sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, output_clipvision=True, embedding_directory=folder_paths.get_folder_paths("embeddings")) |
| return out |
|
|
| class CLIPSetLastLayer: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "clip": ("CLIP", ), |
| "stop_at_clip_layer": ("INT", {"default": -1, "min": -24, "max": -1, "step": 1}), |
| }} |
| RETURN_TYPES = ("CLIP",) |
| FUNCTION = "set_last_layer" |
|
|
| CATEGORY = "conditioning" |
|
|
| def set_last_layer(self, clip, stop_at_clip_layer): |
| clip = clip.clone() |
| clip.clip_layer(stop_at_clip_layer) |
| return (clip,) |
|
|
| class LoraLoader: |
| def __init__(self): |
| self.loaded_lora = None |
|
|
| @classmethod |
| def INPUT_TYPES(s): |
| return { |
| "required": { |
| "model": ("MODEL", {"tooltip": "The diffusion model the LoRA will be applied to."}), |
| "clip": ("CLIP", {"tooltip": "The CLIP model the LoRA will be applied to."}), |
| "lora_name": (folder_paths.get_filename_list("loras"), {"tooltip": "The name of the LoRA."}), |
| "strength_model": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01, "tooltip": "How strongly to modify the diffusion model. This value can be negative."}), |
| "strength_clip": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01, "tooltip": "How strongly to modify the CLIP model. This value can be negative."}), |
| } |
| } |
|
|
| RETURN_TYPES = ("MODEL", "CLIP") |
| OUTPUT_TOOLTIPS = ("The modified diffusion model.", "The modified CLIP model.") |
| FUNCTION = "load_lora" |
|
|
| CATEGORY = "loaders" |
| DESCRIPTION = "LoRAs are used to modify diffusion and CLIP models, altering the way in which latents are denoised such as applying styles. Multiple LoRA nodes can be linked together." |
|
|
| def load_lora(self, model, clip, lora_name, strength_model, strength_clip): |
| if strength_model == 0 and strength_clip == 0: |
| return (model, clip) |
|
|
| lora_path = folder_paths.get_full_path_or_raise("loras", lora_name) |
| lora = None |
| if self.loaded_lora is not None: |
| if self.loaded_lora[0] == lora_path: |
| lora = self.loaded_lora[1] |
| else: |
| self.loaded_lora = None |
|
|
| if lora is None: |
| lora = comfy.utils.load_torch_file(lora_path, safe_load=True) |
| self.loaded_lora = (lora_path, lora) |
|
|
| model_lora, clip_lora = comfy.sd.load_lora_for_models(model, clip, lora, strength_model, strength_clip) |
| return (model_lora, clip_lora) |
|
|
| class LoraLoaderModelOnly(LoraLoader): |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "model": ("MODEL",), |
| "lora_name": (folder_paths.get_filename_list("loras"), ), |
| "strength_model": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01}), |
| }} |
| RETURN_TYPES = ("MODEL",) |
| FUNCTION = "load_lora_model_only" |
|
|
| def load_lora_model_only(self, model, lora_name, strength_model): |
| return (self.load_lora(model, None, lora_name, strength_model, 0)[0],) |
|
|
| class VAELoader: |
| @staticmethod |
| def vae_list(): |
| vaes = folder_paths.get_filename_list("vae") |
| approx_vaes = folder_paths.get_filename_list("vae_approx") |
| sdxl_taesd_enc = False |
| sdxl_taesd_dec = False |
| sd1_taesd_enc = False |
| sd1_taesd_dec = False |
| sd3_taesd_enc = False |
| sd3_taesd_dec = False |
| f1_taesd_enc = False |
| f1_taesd_dec = False |
|
|
| for v in approx_vaes: |
| if v.startswith("taesd_decoder."): |
| sd1_taesd_dec = True |
| elif v.startswith("taesd_encoder."): |
| sd1_taesd_enc = True |
| elif v.startswith("taesdxl_decoder."): |
| sdxl_taesd_dec = True |
| elif v.startswith("taesdxl_encoder."): |
| sdxl_taesd_enc = True |
| elif v.startswith("taesd3_decoder."): |
| sd3_taesd_dec = True |
| elif v.startswith("taesd3_encoder."): |
| sd3_taesd_enc = True |
| elif v.startswith("taef1_encoder."): |
| f1_taesd_dec = True |
| elif v.startswith("taef1_decoder."): |
| f1_taesd_enc = True |
| if sd1_taesd_dec and sd1_taesd_enc: |
| vaes.append("taesd") |
| if sdxl_taesd_dec and sdxl_taesd_enc: |
| vaes.append("taesdxl") |
| if sd3_taesd_dec and sd3_taesd_enc: |
| vaes.append("taesd3") |
| if f1_taesd_dec and f1_taesd_enc: |
| vaes.append("taef1") |
| vaes.append("pixel_space") |
| return vaes |
|
|
| @staticmethod |
| def load_taesd(name): |
| sd = {} |
| approx_vaes = folder_paths.get_filename_list("vae_approx") |
|
|
| encoder = next(filter(lambda a: a.startswith("{}_encoder.".format(name)), approx_vaes)) |
| decoder = next(filter(lambda a: a.startswith("{}_decoder.".format(name)), approx_vaes)) |
|
|
| enc = comfy.utils.load_torch_file(folder_paths.get_full_path_or_raise("vae_approx", encoder)) |
| for k in enc: |
| sd["taesd_encoder.{}".format(k)] = enc[k] |
|
|
| dec = comfy.utils.load_torch_file(folder_paths.get_full_path_or_raise("vae_approx", decoder)) |
| for k in dec: |
| sd["taesd_decoder.{}".format(k)] = dec[k] |
|
|
| if name == "taesd": |
| sd["vae_scale"] = torch.tensor(0.18215) |
| sd["vae_shift"] = torch.tensor(0.0) |
| elif name == "taesdxl": |
| sd["vae_scale"] = torch.tensor(0.13025) |
| sd["vae_shift"] = torch.tensor(0.0) |
| elif name == "taesd3": |
| sd["vae_scale"] = torch.tensor(1.5305) |
| sd["vae_shift"] = torch.tensor(0.0609) |
| elif name == "taef1": |
| sd["vae_scale"] = torch.tensor(0.3611) |
| sd["vae_shift"] = torch.tensor(0.1159) |
| return sd |
|
|
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "vae_name": (s.vae_list(), )}} |
| RETURN_TYPES = ("VAE",) |
| FUNCTION = "load_vae" |
|
|
| CATEGORY = "loaders" |
|
|
| |
| def load_vae(self, vae_name): |
| if vae_name == "pixel_space": |
| sd = {} |
| sd["pixel_space_vae"] = torch.tensor(1.0) |
| elif vae_name in ["taesd", "taesdxl", "taesd3", "taef1"]: |
| sd = self.load_taesd(vae_name) |
| else: |
| vae_path = folder_paths.get_full_path_or_raise("vae", vae_name) |
| sd = comfy.utils.load_torch_file(vae_path) |
| vae = comfy.sd.VAE(sd=sd) |
| vae.throw_exception_if_invalid() |
| return (vae,) |
|
|
| class ControlNetLoader: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "control_net_name": (folder_paths.get_filename_list("controlnet"), )}} |
|
|
| RETURN_TYPES = ("CONTROL_NET",) |
| FUNCTION = "load_controlnet" |
|
|
| CATEGORY = "loaders" |
|
|
| def load_controlnet(self, control_net_name): |
| controlnet_path = folder_paths.get_full_path_or_raise("controlnet", control_net_name) |
| controlnet = comfy.controlnet.load_controlnet(controlnet_path) |
| if controlnet is None: |
| raise RuntimeError("ERROR: controlnet file is invalid and does not contain a valid controlnet model.") |
| return (controlnet,) |
|
|
| class DiffControlNetLoader: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "model": ("MODEL",), |
| "control_net_name": (folder_paths.get_filename_list("controlnet"), )}} |
|
|
| RETURN_TYPES = ("CONTROL_NET",) |
| FUNCTION = "load_controlnet" |
|
|
| CATEGORY = "loaders" |
|
|
| def load_controlnet(self, model, control_net_name): |
| controlnet_path = folder_paths.get_full_path_or_raise("controlnet", control_net_name) |
| controlnet = comfy.controlnet.load_controlnet(controlnet_path, model) |
| return (controlnet,) |
|
|
|
|
| class ControlNetApply: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": {"conditioning": ("CONDITIONING", ), |
| "control_net": ("CONTROL_NET", ), |
| "image": ("IMAGE", ), |
| "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}) |
| }} |
| RETURN_TYPES = ("CONDITIONING",) |
| FUNCTION = "apply_controlnet" |
|
|
| DEPRECATED = True |
| CATEGORY = "conditioning/controlnet" |
|
|
| def apply_controlnet(self, conditioning, control_net, image, strength): |
| if strength == 0: |
| return (conditioning, ) |
|
|
| c = [] |
| control_hint = image.movedim(-1,1) |
| for t in conditioning: |
| n = [t[0], t[1].copy()] |
| c_net = control_net.copy().set_cond_hint(control_hint, strength) |
| if 'control' in t[1]: |
| c_net.set_previous_controlnet(t[1]['control']) |
| n[1]['control'] = c_net |
| n[1]['control_apply_to_uncond'] = True |
| c.append(n) |
| return (c, ) |
|
|
|
|
| class ControlNetApplyAdvanced: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": {"positive": ("CONDITIONING", ), |
| "negative": ("CONDITIONING", ), |
| "control_net": ("CONTROL_NET", ), |
| "image": ("IMAGE", ), |
| "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), |
| "start_percent": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}), |
| "end_percent": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001}) |
| }, |
| "optional": {"vae": ("VAE", ), |
| } |
| } |
|
|
| RETURN_TYPES = ("CONDITIONING","CONDITIONING") |
| RETURN_NAMES = ("positive", "negative") |
| FUNCTION = "apply_controlnet" |
|
|
| CATEGORY = "conditioning/controlnet" |
|
|
| def apply_controlnet(self, positive, negative, control_net, image, strength, start_percent, end_percent, vae=None, extra_concat=[]): |
| if strength == 0: |
| return (positive, negative) |
|
|
| control_hint = image.movedim(-1,1) |
| cnets = {} |
|
|
| out = [] |
| for conditioning in [positive, negative]: |
| c = [] |
| for t in conditioning: |
| d = t[1].copy() |
|
|
| prev_cnet = d.get('control', None) |
| if prev_cnet in cnets: |
| c_net = cnets[prev_cnet] |
| else: |
| c_net = control_net.copy().set_cond_hint(control_hint, strength, (start_percent, end_percent), vae=vae, extra_concat=extra_concat) |
| c_net.set_previous_controlnet(prev_cnet) |
| cnets[prev_cnet] = c_net |
|
|
| d['control'] = c_net |
| d['control_apply_to_uncond'] = False |
| n = [t[0], d] |
| c.append(n) |
| out.append(c) |
| return (out[0], out[1]) |
|
|
|
|
| class UNETLoader: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "unet_name": (folder_paths.get_filename_list("diffusion_models"), ), |
| "weight_dtype": (["default", "fp8_e4m3fn", "fp8_e4m3fn_fast", "fp8_e5m2"],) |
| }} |
| RETURN_TYPES = ("MODEL",) |
| FUNCTION = "load_unet" |
|
|
| CATEGORY = "advanced/loaders" |
|
|
| def load_unet(self, unet_name, weight_dtype): |
| model_options = {} |
| if weight_dtype == "fp8_e4m3fn": |
| model_options["dtype"] = torch.float8_e4m3fn |
| elif weight_dtype == "fp8_e4m3fn_fast": |
| model_options["dtype"] = torch.float8_e4m3fn |
| model_options["fp8_optimizations"] = True |
| elif weight_dtype == "fp8_e5m2": |
| model_options["dtype"] = torch.float8_e5m2 |
|
|
| unet_path = folder_paths.get_full_path_or_raise("diffusion_models", unet_name) |
| model = comfy.sd.load_diffusion_model(unet_path, model_options=model_options) |
| return (model,) |
|
|
| class CLIPLoader: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "clip_name": (folder_paths.get_filename_list("text_encoders"), ), |
| "type": (["stable_diffusion", "stable_cascade", "sd3", "stable_audio", "mochi", "ltxv", "pixart", "cosmos", "lumina2", "wan", "hidream", "chroma", "ace", "omnigen2", "qwen_image", "hunyuan_image"], ), |
| }, |
| "optional": { |
| "device": (["default", "cpu"], {"advanced": True}), |
| }} |
| RETURN_TYPES = ("CLIP",) |
| FUNCTION = "load_clip" |
|
|
| CATEGORY = "advanced/loaders" |
|
|
| DESCRIPTION = "[Recipes]\n\nstable_diffusion: clip-l\nstable_cascade: clip-g\nsd3: t5 xxl/ clip-g / clip-l\nstable_audio: t5 base\nmochi: t5 xxl\ncosmos: old t5 xxl\nlumina2: gemma 2 2B\nwan: umt5 xxl\n hidream: llama-3.1 (Recommend) or t5\nomnigen2: qwen vl 2.5 3B" |
|
|
| def load_clip(self, clip_name, type="stable_diffusion", device="default"): |
| clip_type = getattr(comfy.sd.CLIPType, type.upper(), comfy.sd.CLIPType.STABLE_DIFFUSION) |
|
|
| model_options = {} |
| if device == "cpu": |
| model_options["load_device"] = model_options["offload_device"] = torch.device("cpu") |
|
|
| clip_path = folder_paths.get_full_path_or_raise("text_encoders", clip_name) |
| clip = comfy.sd.load_clip(ckpt_paths=[clip_path], embedding_directory=folder_paths.get_folder_paths("embeddings"), clip_type=clip_type, model_options=model_options) |
| return (clip,) |
|
|
| class DualCLIPLoader: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "clip_name1": (folder_paths.get_filename_list("text_encoders"), ), |
| "clip_name2": (folder_paths.get_filename_list("text_encoders"), ), |
| "type": (["sdxl", "sd3", "flux", "hunyuan_video", "hidream", "hunyuan_image"], ), |
| }, |
| "optional": { |
| "device": (["default", "cpu"], {"advanced": True}), |
| }} |
| RETURN_TYPES = ("CLIP",) |
| FUNCTION = "load_clip" |
|
|
| CATEGORY = "advanced/loaders" |
|
|
| DESCRIPTION = "[Recipes]\n\nsdxl: clip-l, clip-g\nsd3: clip-l, clip-g / clip-l, t5 / clip-g, t5\nflux: clip-l, t5\nhidream: at least one of t5 or llama, recommended t5 and llama\nhunyuan_image: qwen2.5vl 7b and byt5 small" |
|
|
| def load_clip(self, clip_name1, clip_name2, type, device="default"): |
| clip_type = getattr(comfy.sd.CLIPType, type.upper(), comfy.sd.CLIPType.STABLE_DIFFUSION) |
|
|
| clip_path1 = folder_paths.get_full_path_or_raise("text_encoders", clip_name1) |
| clip_path2 = folder_paths.get_full_path_or_raise("text_encoders", clip_name2) |
|
|
| model_options = {} |
| if device == "cpu": |
| model_options["load_device"] = model_options["offload_device"] = torch.device("cpu") |
|
|
| clip = comfy.sd.load_clip(ckpt_paths=[clip_path1, clip_path2], embedding_directory=folder_paths.get_folder_paths("embeddings"), clip_type=clip_type, model_options=model_options) |
| return (clip,) |
|
|
| class CLIPVisionLoader: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "clip_name": (folder_paths.get_filename_list("clip_vision"), ), |
| }} |
| RETURN_TYPES = ("CLIP_VISION",) |
| FUNCTION = "load_clip" |
|
|
| CATEGORY = "loaders" |
|
|
| def load_clip(self, clip_name): |
| clip_path = folder_paths.get_full_path_or_raise("clip_vision", clip_name) |
| clip_vision = comfy.clip_vision.load(clip_path) |
| if clip_vision is None: |
| raise RuntimeError("ERROR: clip vision file is invalid and does not contain a valid vision model.") |
| return (clip_vision,) |
|
|
| class CLIPVisionEncode: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "clip_vision": ("CLIP_VISION",), |
| "image": ("IMAGE",), |
| "crop": (["center", "none"],) |
| }} |
| RETURN_TYPES = ("CLIP_VISION_OUTPUT",) |
| FUNCTION = "encode" |
|
|
| CATEGORY = "conditioning" |
|
|
| def encode(self, clip_vision, image, crop): |
| crop_image = True |
| if crop != "center": |
| crop_image = False |
| output = clip_vision.encode_image(image, crop=crop_image) |
| return (output,) |
|
|
| class StyleModelLoader: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "style_model_name": (folder_paths.get_filename_list("style_models"), )}} |
|
|
| RETURN_TYPES = ("STYLE_MODEL",) |
| FUNCTION = "load_style_model" |
|
|
| CATEGORY = "loaders" |
|
|
| def load_style_model(self, style_model_name): |
| style_model_path = folder_paths.get_full_path_or_raise("style_models", style_model_name) |
| style_model = comfy.sd.load_style_model(style_model_path) |
| return (style_model,) |
|
|
|
|
| class StyleModelApply: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": {"conditioning": ("CONDITIONING", ), |
| "style_model": ("STYLE_MODEL", ), |
| "clip_vision_output": ("CLIP_VISION_OUTPUT", ), |
| "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.001}), |
| "strength_type": (["multiply", "attn_bias"], ), |
| }} |
| RETURN_TYPES = ("CONDITIONING",) |
| FUNCTION = "apply_stylemodel" |
|
|
| CATEGORY = "conditioning/style_model" |
|
|
| def apply_stylemodel(self, conditioning, style_model, clip_vision_output, strength, strength_type): |
| cond = style_model.get_cond(clip_vision_output).flatten(start_dim=0, end_dim=1).unsqueeze(dim=0) |
| if strength_type == "multiply": |
| cond *= strength |
|
|
| n = cond.shape[1] |
| c_out = [] |
| for t in conditioning: |
| (txt, keys) = t |
| keys = keys.copy() |
| |
| if "attention_mask" in keys or (strength_type == "attn_bias" and strength != 1.0): |
| |
| |
| attn_bias = torch.log(torch.Tensor([strength if strength_type == "attn_bias" else 1.0])) |
| |
| mask_ref_size = keys.get("attention_mask_img_shape", (1, 1)) |
| n_ref = mask_ref_size[0] * mask_ref_size[1] |
| n_txt = txt.shape[1] |
| |
| mask = keys.get("attention_mask", None) |
| |
| if mask is None: |
| mask = torch.zeros((txt.shape[0], n_txt + n_ref, n_txt + n_ref), dtype=torch.float16) |
| |
| |
| if mask.dtype == torch.bool: |
| |
| |
| mask = torch.log(mask.to(dtype=torch.float16)) |
| |
| new_mask = torch.zeros((txt.shape[0], n_txt + n + n_ref, n_txt + n + n_ref), dtype=torch.float16) |
| |
| new_mask[:, :n_txt, :n_txt] = mask[:, :n_txt, :n_txt] |
| new_mask[:, :n_txt, n_txt+n:] = mask[:, :n_txt, n_txt:] |
| new_mask[:, n_txt+n:, :n_txt] = mask[:, n_txt:, :n_txt] |
| new_mask[:, n_txt+n:, n_txt+n:] = mask[:, n_txt:, n_txt:] |
| |
| new_mask[:, :n_txt, n_txt:n_txt+n] = attn_bias |
| new_mask[:, n_txt+n:, n_txt:n_txt+n] = attn_bias |
| keys["attention_mask"] = new_mask.to(txt.device) |
| keys["attention_mask_img_shape"] = mask_ref_size |
|
|
| c_out.append([torch.cat((txt, cond), dim=1), keys]) |
|
|
| return (c_out,) |
|
|
| class unCLIPConditioning: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": {"conditioning": ("CONDITIONING", ), |
| "clip_vision_output": ("CLIP_VISION_OUTPUT", ), |
| "strength": ("FLOAT", {"default": 1.0, "min": -10.0, "max": 10.0, "step": 0.01}), |
| "noise_augmentation": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.01}), |
| }} |
| RETURN_TYPES = ("CONDITIONING",) |
| FUNCTION = "apply_adm" |
|
|
| CATEGORY = "conditioning" |
|
|
| def apply_adm(self, conditioning, clip_vision_output, strength, noise_augmentation): |
| if strength == 0: |
| return (conditioning, ) |
|
|
| c = node_helpers.conditioning_set_values(conditioning, {"unclip_conditioning": [{"clip_vision_output": clip_vision_output, "strength": strength, "noise_augmentation": noise_augmentation}]}, append=True) |
| return (c, ) |
|
|
| class GLIGENLoader: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "gligen_name": (folder_paths.get_filename_list("gligen"), )}} |
|
|
| RETURN_TYPES = ("GLIGEN",) |
| FUNCTION = "load_gligen" |
|
|
| CATEGORY = "loaders" |
|
|
| def load_gligen(self, gligen_name): |
| gligen_path = folder_paths.get_full_path_or_raise("gligen", gligen_name) |
| gligen = comfy.sd.load_gligen(gligen_path) |
| return (gligen,) |
|
|
| class GLIGENTextBoxApply: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": {"conditioning_to": ("CONDITIONING", ), |
| "clip": ("CLIP", ), |
| "gligen_textbox_model": ("GLIGEN", ), |
| "text": ("STRING", {"multiline": True, "dynamicPrompts": True}), |
| "width": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}), |
| "height": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}), |
| "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), |
| "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), |
| }} |
| RETURN_TYPES = ("CONDITIONING",) |
| FUNCTION = "append" |
|
|
| CATEGORY = "conditioning/gligen" |
|
|
| def append(self, conditioning_to, clip, gligen_textbox_model, text, width, height, x, y): |
| c = [] |
| cond, cond_pooled = clip.encode_from_tokens(clip.tokenize(text), return_pooled="unprojected") |
| for t in conditioning_to: |
| n = [t[0], t[1].copy()] |
| position_params = [(cond_pooled, height // 8, width // 8, y // 8, x // 8)] |
| prev = [] |
| if "gligen" in n[1]: |
| prev = n[1]['gligen'][2] |
|
|
| n[1]['gligen'] = ("position", gligen_textbox_model, prev + position_params) |
| c.append(n) |
| return (c, ) |
|
|
| class EmptyLatentImage: |
| def __init__(self): |
| self.device = comfy.model_management.intermediate_device() |
|
|
| @classmethod |
| def INPUT_TYPES(s): |
| return { |
| "required": { |
| "width": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8, "tooltip": "The width of the latent images in pixels."}), |
| "height": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8, "tooltip": "The height of the latent images in pixels."}), |
| "batch_size": ("INT", {"default": 1, "min": 1, "max": 4096, "tooltip": "The number of latent images in the batch."}) |
| } |
| } |
| RETURN_TYPES = ("LATENT",) |
| OUTPUT_TOOLTIPS = ("The empty latent image batch.",) |
| FUNCTION = "generate" |
|
|
| CATEGORY = "latent" |
| DESCRIPTION = "Create a new batch of empty latent images to be denoised via sampling." |
|
|
| def generate(self, width, height, batch_size=1): |
| latent = torch.zeros([batch_size, 4, height // 8, width // 8], device=self.device) |
| return ({"samples":latent}, ) |
|
|
|
|
| class LatentFromBatch: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "samples": ("LATENT",), |
| "batch_index": ("INT", {"default": 0, "min": 0, "max": 63}), |
| "length": ("INT", {"default": 1, "min": 1, "max": 64}), |
| }} |
| RETURN_TYPES = ("LATENT",) |
| FUNCTION = "frombatch" |
|
|
| CATEGORY = "latent/batch" |
|
|
| def frombatch(self, samples, batch_index, length): |
| s = samples.copy() |
| s_in = samples["samples"] |
| batch_index = min(s_in.shape[0] - 1, batch_index) |
| length = min(s_in.shape[0] - batch_index, length) |
| s["samples"] = s_in[batch_index:batch_index + length].clone() |
| if "noise_mask" in samples: |
| masks = samples["noise_mask"] |
| if masks.shape[0] == 1: |
| s["noise_mask"] = masks.clone() |
| else: |
| if masks.shape[0] < s_in.shape[0]: |
| masks = masks.repeat(math.ceil(s_in.shape[0] / masks.shape[0]), 1, 1, 1)[:s_in.shape[0]] |
| s["noise_mask"] = masks[batch_index:batch_index + length].clone() |
| if "batch_index" not in s: |
| s["batch_index"] = [x for x in range(batch_index, batch_index+length)] |
| else: |
| s["batch_index"] = samples["batch_index"][batch_index:batch_index + length] |
| return (s,) |
|
|
| class RepeatLatentBatch: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "samples": ("LATENT",), |
| "amount": ("INT", {"default": 1, "min": 1, "max": 64}), |
| }} |
| RETURN_TYPES = ("LATENT",) |
| FUNCTION = "repeat" |
|
|
| CATEGORY = "latent/batch" |
|
|
| def repeat(self, samples, amount): |
| s = samples.copy() |
| s_in = samples["samples"] |
|
|
| s["samples"] = s_in.repeat((amount,) + ((1,) * (s_in.ndim - 1))) |
| if "noise_mask" in samples and samples["noise_mask"].shape[0] > 1: |
| masks = samples["noise_mask"] |
| if masks.shape[0] < s_in.shape[0]: |
| masks = masks.repeat((math.ceil(s_in.shape[0] / masks.shape[0]),) + ((1,) * (masks.ndim - 1)))[:s_in.shape[0]] |
| s["noise_mask"] = samples["noise_mask"].repeat((amount,) + ((1,) * (samples["noise_mask"].ndim - 1))) |
| if "batch_index" in s: |
| offset = max(s["batch_index"]) - min(s["batch_index"]) + 1 |
| s["batch_index"] = s["batch_index"] + [x + (i * offset) for i in range(1, amount) for x in s["batch_index"]] |
| return (s,) |
|
|
| class LatentUpscale: |
| upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"] |
| crop_methods = ["disabled", "center"] |
|
|
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "samples": ("LATENT",), "upscale_method": (s.upscale_methods,), |
| "width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}), |
| "height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}), |
| "crop": (s.crop_methods,)}} |
| RETURN_TYPES = ("LATENT",) |
| FUNCTION = "upscale" |
|
|
| CATEGORY = "latent" |
|
|
| def upscale(self, samples, upscale_method, width, height, crop): |
| if width == 0 and height == 0: |
| s = samples |
| else: |
| s = samples.copy() |
|
|
| if width == 0: |
| height = max(64, height) |
| width = max(64, round(samples["samples"].shape[-1] * height / samples["samples"].shape[-2])) |
| elif height == 0: |
| width = max(64, width) |
| height = max(64, round(samples["samples"].shape[-2] * width / samples["samples"].shape[-1])) |
| else: |
| width = max(64, width) |
| height = max(64, height) |
|
|
| s["samples"] = comfy.utils.common_upscale(samples["samples"], width // 8, height // 8, upscale_method, crop) |
| return (s,) |
|
|
| class LatentUpscaleBy: |
| upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"] |
|
|
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "samples": ("LATENT",), "upscale_method": (s.upscale_methods,), |
| "scale_by": ("FLOAT", {"default": 1.5, "min": 0.01, "max": 8.0, "step": 0.01}),}} |
| RETURN_TYPES = ("LATENT",) |
| FUNCTION = "upscale" |
|
|
| CATEGORY = "latent" |
|
|
| def upscale(self, samples, upscale_method, scale_by): |
| s = samples.copy() |
| width = round(samples["samples"].shape[-1] * scale_by) |
| height = round(samples["samples"].shape[-2] * scale_by) |
| s["samples"] = comfy.utils.common_upscale(samples["samples"], width, height, upscale_method, "disabled") |
| return (s,) |
|
|
| class LatentRotate: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "samples": ("LATENT",), |
| "rotation": (["none", "90 degrees", "180 degrees", "270 degrees"],), |
| }} |
| RETURN_TYPES = ("LATENT",) |
| FUNCTION = "rotate" |
|
|
| CATEGORY = "latent/transform" |
|
|
| def rotate(self, samples, rotation): |
| s = samples.copy() |
| rotate_by = 0 |
| if rotation.startswith("90"): |
| rotate_by = 1 |
| elif rotation.startswith("180"): |
| rotate_by = 2 |
| elif rotation.startswith("270"): |
| rotate_by = 3 |
|
|
| s["samples"] = torch.rot90(samples["samples"], k=rotate_by, dims=[3, 2]) |
| return (s,) |
|
|
| class LatentFlip: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "samples": ("LATENT",), |
| "flip_method": (["x-axis: vertically", "y-axis: horizontally"],), |
| }} |
| RETURN_TYPES = ("LATENT",) |
| FUNCTION = "flip" |
|
|
| CATEGORY = "latent/transform" |
|
|
| def flip(self, samples, flip_method): |
| s = samples.copy() |
| if flip_method.startswith("x"): |
| s["samples"] = torch.flip(samples["samples"], dims=[2]) |
| elif flip_method.startswith("y"): |
| s["samples"] = torch.flip(samples["samples"], dims=[3]) |
|
|
| return (s,) |
|
|
| class LatentComposite: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "samples_to": ("LATENT",), |
| "samples_from": ("LATENT",), |
| "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), |
| "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), |
| "feather": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), |
| }} |
| RETURN_TYPES = ("LATENT",) |
| FUNCTION = "composite" |
|
|
| CATEGORY = "latent" |
|
|
| def composite(self, samples_to, samples_from, x, y, composite_method="normal", feather=0): |
| x = x // 8 |
| y = y // 8 |
| feather = feather // 8 |
| samples_out = samples_to.copy() |
| s = samples_to["samples"].clone() |
| samples_to = samples_to["samples"] |
| samples_from = samples_from["samples"] |
| if feather == 0: |
| s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x] |
| else: |
| samples_from = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x] |
| mask = torch.ones_like(samples_from) |
| for t in range(feather): |
| if y != 0: |
| mask[:,:,t:1+t,:] *= ((1.0/feather) * (t + 1)) |
|
|
| if y + samples_from.shape[2] < samples_to.shape[2]: |
| mask[:,:,mask.shape[2] -1 -t: mask.shape[2]-t,:] *= ((1.0/feather) * (t + 1)) |
| if x != 0: |
| mask[:,:,:,t:1+t] *= ((1.0/feather) * (t + 1)) |
| if x + samples_from.shape[3] < samples_to.shape[3]: |
| mask[:,:,:,mask.shape[3]- 1 - t: mask.shape[3]- t] *= ((1.0/feather) * (t + 1)) |
| rev_mask = torch.ones_like(mask) - mask |
| s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x] * mask + s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] * rev_mask |
| samples_out["samples"] = s |
| return (samples_out,) |
|
|
| class LatentBlend: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { |
| "samples1": ("LATENT",), |
| "samples2": ("LATENT",), |
| "blend_factor": ("FLOAT", { |
| "default": 0.5, |
| "min": 0, |
| "max": 1, |
| "step": 0.01 |
| }), |
| }} |
|
|
| RETURN_TYPES = ("LATENT",) |
| FUNCTION = "blend" |
|
|
| CATEGORY = "_for_testing" |
|
|
| def blend(self, samples1, samples2, blend_factor:float, blend_mode: str="normal"): |
|
|
| samples_out = samples1.copy() |
| samples1 = samples1["samples"] |
| samples2 = samples2["samples"] |
|
|
| if samples1.shape != samples2.shape: |
| samples2.permute(0, 3, 1, 2) |
| samples2 = comfy.utils.common_upscale(samples2, samples1.shape[3], samples1.shape[2], 'bicubic', crop='center') |
| samples2.permute(0, 2, 3, 1) |
|
|
| samples_blended = self.blend_mode(samples1, samples2, blend_mode) |
| samples_blended = samples1 * blend_factor + samples_blended * (1 - blend_factor) |
| samples_out["samples"] = samples_blended |
| return (samples_out,) |
|
|
| def blend_mode(self, img1, img2, mode): |
| if mode == "normal": |
| return img2 |
| else: |
| raise ValueError(f"Unsupported blend mode: {mode}") |
|
|
| class LatentCrop: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "samples": ("LATENT",), |
| "width": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}), |
| "height": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}), |
| "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), |
| "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), |
| }} |
| RETURN_TYPES = ("LATENT",) |
| FUNCTION = "crop" |
|
|
| CATEGORY = "latent/transform" |
|
|
| def crop(self, samples, width, height, x, y): |
| s = samples.copy() |
| samples = samples['samples'] |
| x = x // 8 |
| y = y // 8 |
|
|
| |
| if x > (samples.shape[3] - 8): |
| x = samples.shape[3] - 8 |
| if y > (samples.shape[2] - 8): |
| y = samples.shape[2] - 8 |
|
|
| new_height = height // 8 |
| new_width = width // 8 |
| to_x = new_width + x |
| to_y = new_height + y |
| s['samples'] = samples[:,:,y:to_y, x:to_x] |
| return (s,) |
|
|
| class SetLatentNoiseMask: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "samples": ("LATENT",), |
| "mask": ("MASK",), |
| }} |
| RETURN_TYPES = ("LATENT",) |
| FUNCTION = "set_mask" |
|
|
| CATEGORY = "latent/inpaint" |
|
|
| def set_mask(self, samples, mask): |
| s = samples.copy() |
| s["noise_mask"] = mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])) |
| return (s,) |
|
|
| def common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent, denoise=1.0, disable_noise=False, start_step=None, last_step=None, force_full_denoise=False): |
| latent_image = latent["samples"] |
| latent_image = comfy.sample.fix_empty_latent_channels(model, latent_image) |
|
|
| if disable_noise: |
| noise = torch.zeros(latent_image.size(), dtype=latent_image.dtype, layout=latent_image.layout, device="cpu") |
| else: |
| batch_inds = latent["batch_index"] if "batch_index" in latent else None |
| noise = comfy.sample.prepare_noise(latent_image, seed, batch_inds) |
|
|
| noise_mask = None |
| if "noise_mask" in latent: |
| noise_mask = latent["noise_mask"] |
|
|
| callback = latent_preview.prepare_callback(model, steps) |
| disable_pbar = not comfy.utils.PROGRESS_BAR_ENABLED |
| samples = comfy.sample.sample(model, noise, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, |
| denoise=denoise, disable_noise=disable_noise, start_step=start_step, last_step=last_step, |
| force_full_denoise=force_full_denoise, noise_mask=noise_mask, callback=callback, disable_pbar=disable_pbar, seed=seed) |
| out = latent.copy() |
| out["samples"] = samples |
| return (out, ) |
|
|
| class KSampler: |
| @classmethod |
| def INPUT_TYPES(s): |
| return { |
| "required": { |
| "model": ("MODEL", {"tooltip": "The model used for denoising the input latent."}), |
| "seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "control_after_generate": True, "tooltip": "The random seed used for creating the noise."}), |
| "steps": ("INT", {"default": 20, "min": 1, "max": 10000, "tooltip": "The number of steps used in the denoising process."}), |
| "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.1, "round": 0.01, "tooltip": "The Classifier-Free Guidance scale balances creativity and adherence to the prompt. Higher values result in images more closely matching the prompt however too high values will negatively impact quality."}), |
| "sampler_name": (comfy.samplers.KSampler.SAMPLERS, {"tooltip": "The algorithm used when sampling, this can affect the quality, speed, and style of the generated output."}), |
| "scheduler": (comfy.samplers.KSampler.SCHEDULERS, {"tooltip": "The scheduler controls how noise is gradually removed to form the image."}), |
| "positive": ("CONDITIONING", {"tooltip": "The conditioning describing the attributes you want to include in the image."}), |
| "negative": ("CONDITIONING", {"tooltip": "The conditioning describing the attributes you want to exclude from the image."}), |
| "latent_image": ("LATENT", {"tooltip": "The latent image to denoise."}), |
| "denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01, "tooltip": "The amount of denoising applied, lower values will maintain the structure of the initial image allowing for image to image sampling."}), |
| } |
| } |
|
|
| RETURN_TYPES = ("LATENT",) |
| OUTPUT_TOOLTIPS = ("The denoised latent.",) |
| FUNCTION = "sample" |
|
|
| CATEGORY = "sampling" |
| DESCRIPTION = "Uses the provided model, positive and negative conditioning to denoise the latent image." |
|
|
| def sample(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=1.0): |
| return common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise) |
|
|
| class KSamplerAdvanced: |
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": |
| {"model": ("MODEL",), |
| "add_noise": (["enable", "disable"], ), |
| "noise_seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "control_after_generate": True}), |
| "steps": ("INT", {"default": 20, "min": 1, "max": 10000}), |
| "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.1, "round": 0.01}), |
| "sampler_name": (comfy.samplers.KSampler.SAMPLERS, ), |
| "scheduler": (comfy.samplers.KSampler.SCHEDULERS, ), |
| "positive": ("CONDITIONING", ), |
| "negative": ("CONDITIONING", ), |
| "latent_image": ("LATENT", ), |
| "start_at_step": ("INT", {"default": 0, "min": 0, "max": 10000}), |
| "end_at_step": ("INT", {"default": 10000, "min": 0, "max": 10000}), |
| "return_with_leftover_noise": (["disable", "enable"], ), |
| } |
| } |
|
|
| RETURN_TYPES = ("LATENT",) |
| FUNCTION = "sample" |
|
|
| CATEGORY = "sampling" |
|
|
| def sample(self, model, add_noise, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, start_at_step, end_at_step, return_with_leftover_noise, denoise=1.0): |
| force_full_denoise = True |
| if return_with_leftover_noise == "enable": |
| force_full_denoise = False |
| disable_noise = False |
| if add_noise == "disable": |
| disable_noise = True |
| return common_ksampler(model, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise, disable_noise=disable_noise, start_step=start_at_step, last_step=end_at_step, force_full_denoise=force_full_denoise) |
|
|
| class SaveImage: |
| def __init__(self): |
| self.output_dir = folder_paths.get_output_directory() |
| self.type = "output" |
| self.prefix_append = "" |
| self.compress_level = 4 |
|
|
| @classmethod |
| def INPUT_TYPES(s): |
| return { |
| "required": { |
| "images": ("IMAGE", {"tooltip": "The images to save."}), |
| "filename_prefix": ("STRING", {"default": "ComfyUI", "tooltip": "The prefix for the file to save. This may include formatting information such as %date:yyyy-MM-dd% or %Empty Latent Image.width% to include values from nodes."}) |
| }, |
| "hidden": { |
| "prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO" |
| }, |
| } |
|
|
| RETURN_TYPES = () |
| FUNCTION = "save_images" |
|
|
| OUTPUT_NODE = True |
|
|
| CATEGORY = "image" |
| DESCRIPTION = "Saves the input images to your ComfyUI output directory." |
|
|
| def save_images(self, images, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None): |
| filename_prefix += self.prefix_append |
| full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir, images[0].shape[1], images[0].shape[0]) |
| results = list() |
| for (batch_number, image) in enumerate(images): |
| i = 255. * image.cpu().numpy() |
| img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8)) |
| metadata = None |
| if not args.disable_metadata: |
| metadata = PngInfo() |
| if prompt is not None: |
| metadata.add_text("prompt", json.dumps(prompt)) |
| if extra_pnginfo is not None: |
| for x in extra_pnginfo: |
| metadata.add_text(x, json.dumps(extra_pnginfo[x])) |
|
|
| filename_with_batch_num = filename.replace("%batch_num%", str(batch_number)) |
| file = f"{filename_with_batch_num}_{counter:05}_.png" |
| img.save(os.path.join(full_output_folder, file), pnginfo=metadata, compress_level=self.compress_level) |
| results.append({ |
| "filename": file, |
| "subfolder": subfolder, |
| "type": self.type |
| }) |
| counter += 1 |
|
|
| return { "ui": { "images": results } } |
|
|
| class PreviewImage(SaveImage): |
| def __init__(self): |
| self.output_dir = folder_paths.get_temp_directory() |
| self.type = "temp" |
| self.prefix_append = "_temp_" + ''.join(random.choice("abcdefghijklmnopqrstupvxyz") for x in range(5)) |
| self.compress_level = 1 |
|
|
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": |
| {"images": ("IMAGE", ), }, |
| "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"}, |
| } |
|
|
| class LoadImage: |
| @classmethod |
| def INPUT_TYPES(s): |
| input_dir = folder_paths.get_input_directory() |
| files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))] |
| files = folder_paths.filter_files_content_types(files, ["image"]) |
| return {"required": |
| {"image": (sorted(files), {"image_upload": True})}, |
| } |
|
|
| CATEGORY = "image" |
|
|
| RETURN_TYPES = ("IMAGE", "MASK") |
| FUNCTION = "load_image" |
| def load_image(self, image): |
| image_path = folder_paths.get_annotated_filepath(image) |
|
|
| img = node_helpers.pillow(Image.open, image_path) |
|
|
| output_images = [] |
| output_masks = [] |
| w, h = None, None |
|
|
| excluded_formats = ['MPO'] |
|
|
| for i in ImageSequence.Iterator(img): |
| i = node_helpers.pillow(ImageOps.exif_transpose, i) |
|
|
| if i.mode == 'I': |
| i = i.point(lambda i: i * (1 / 255)) |
| image = i.convert("RGB") |
|
|
| if len(output_images) == 0: |
| w = image.size[0] |
| h = image.size[1] |
|
|
| if image.size[0] != w or image.size[1] != h: |
| continue |
|
|
| image = np.array(image).astype(np.float32) / 255.0 |
| image = torch.from_numpy(image)[None,] |
| if 'A' in i.getbands(): |
| mask = np.array(i.getchannel('A')).astype(np.float32) / 255.0 |
| mask = 1. - torch.from_numpy(mask) |
| elif i.mode == 'P' and 'transparency' in i.info: |
| mask = np.array(i.convert('RGBA').getchannel('A')).astype(np.float32) / 255.0 |
| mask = 1. - torch.from_numpy(mask) |
| else: |
| mask = torch.zeros((64,64), dtype=torch.float32, device="cpu") |
| output_images.append(image) |
| output_masks.append(mask.unsqueeze(0)) |
|
|
| if len(output_images) > 1 and img.format not in excluded_formats: |
| output_image = torch.cat(output_images, dim=0) |
| output_mask = torch.cat(output_masks, dim=0) |
| else: |
| output_image = output_images[0] |
| output_mask = output_masks[0] |
|
|
| return (output_image, output_mask) |
|
|
| @classmethod |
| def IS_CHANGED(s, image): |
| image_path = folder_paths.get_annotated_filepath(image) |
| m = hashlib.sha256() |
| with open(image_path, 'rb') as f: |
| m.update(f.read()) |
| return m.digest().hex() |
|
|
| @classmethod |
| def VALIDATE_INPUTS(s, image): |
| if not folder_paths.exists_annotated_filepath(image): |
| return "Invalid image file: {}".format(image) |
|
|
| return True |
|
|
| class LoadImageMask: |
| _color_channels = ["alpha", "red", "green", "blue"] |
| @classmethod |
| def INPUT_TYPES(s): |
| input_dir = folder_paths.get_input_directory() |
| files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))] |
| return {"required": |
| {"image": (sorted(files), {"image_upload": True}), |
| "channel": (s._color_channels, ), } |
| } |
|
|
| CATEGORY = "mask" |
|
|
| RETURN_TYPES = ("MASK",) |
| FUNCTION = "load_image" |
| def load_image(self, image, channel): |
| image_path = folder_paths.get_annotated_filepath(image) |
| i = node_helpers.pillow(Image.open, image_path) |
| i = node_helpers.pillow(ImageOps.exif_transpose, i) |
| if i.getbands() != ("R", "G", "B", "A"): |
| if i.mode == 'I': |
| i = i.point(lambda i: i * (1 / 255)) |
| i = i.convert("RGBA") |
| mask = None |
| c = channel[0].upper() |
| if c in i.getbands(): |
| mask = np.array(i.getchannel(c)).astype(np.float32) / 255.0 |
| mask = torch.from_numpy(mask) |
| if c == 'A': |
| mask = 1. - mask |
| else: |
| mask = torch.zeros((64,64), dtype=torch.float32, device="cpu") |
| return (mask.unsqueeze(0),) |
|
|
| @classmethod |
| def IS_CHANGED(s, image, channel): |
| image_path = folder_paths.get_annotated_filepath(image) |
| m = hashlib.sha256() |
| with open(image_path, 'rb') as f: |
| m.update(f.read()) |
| return m.digest().hex() |
|
|
| @classmethod |
| def VALIDATE_INPUTS(s, image): |
| if not folder_paths.exists_annotated_filepath(image): |
| return "Invalid image file: {}".format(image) |
|
|
| return True |
|
|
|
|
| class LoadImageOutput(LoadImage): |
| @classmethod |
| def INPUT_TYPES(s): |
| return { |
| "required": { |
| "image": ("COMBO", { |
| "image_upload": True, |
| "image_folder": "output", |
| "remote": { |
| "route": "/internal/files/output", |
| "refresh_button": True, |
| "control_after_refresh": "first", |
| }, |
| }), |
| } |
| } |
|
|
| DESCRIPTION = "Load an image from the output folder. When the refresh button is clicked, the node will update the image list and automatically select the first image, allowing for easy iteration." |
| EXPERIMENTAL = True |
| FUNCTION = "load_image" |
|
|
|
|
| class ImageScale: |
| upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"] |
| crop_methods = ["disabled", "center"] |
|
|
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "image": ("IMAGE",), "upscale_method": (s.upscale_methods,), |
| "width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}), |
| "height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}), |
| "crop": (s.crop_methods,)}} |
| RETURN_TYPES = ("IMAGE",) |
| FUNCTION = "upscale" |
|
|
| CATEGORY = "image/upscaling" |
|
|
| def upscale(self, image, upscale_method, width, height, crop): |
| if width == 0 and height == 0: |
| s = image |
| else: |
| samples = image.movedim(-1,1) |
|
|
| if width == 0: |
| width = max(1, round(samples.shape[3] * height / samples.shape[2])) |
| elif height == 0: |
| height = max(1, round(samples.shape[2] * width / samples.shape[3])) |
|
|
| s = comfy.utils.common_upscale(samples, width, height, upscale_method, crop) |
| s = s.movedim(1,-1) |
| return (s,) |
|
|
| class ImageScaleBy: |
| upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"] |
|
|
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "image": ("IMAGE",), "upscale_method": (s.upscale_methods,), |
| "scale_by": ("FLOAT", {"default": 1.0, "min": 0.01, "max": 8.0, "step": 0.01}),}} |
| RETURN_TYPES = ("IMAGE",) |
| FUNCTION = "upscale" |
|
|
| CATEGORY = "image/upscaling" |
|
|
| def upscale(self, image, upscale_method, scale_by): |
| samples = image.movedim(-1,1) |
| width = round(samples.shape[3] * scale_by) |
| height = round(samples.shape[2] * scale_by) |
| s = comfy.utils.common_upscale(samples, width, height, upscale_method, "disabled") |
| s = s.movedim(1,-1) |
| return (s,) |
|
|
| class ImageInvert: |
|
|
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "image": ("IMAGE",)}} |
|
|
| RETURN_TYPES = ("IMAGE",) |
| FUNCTION = "invert" |
|
|
| CATEGORY = "image" |
|
|
| def invert(self, image): |
| s = 1.0 - image |
| return (s,) |
|
|
| class ImageBatch: |
|
|
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "image1": ("IMAGE",), "image2": ("IMAGE",)}} |
|
|
| RETURN_TYPES = ("IMAGE",) |
| FUNCTION = "batch" |
|
|
| CATEGORY = "image" |
|
|
| def batch(self, image1, image2): |
| if image1.shape[1:] != image2.shape[1:]: |
| image2 = comfy.utils.common_upscale(image2.movedim(-1,1), image1.shape[2], image1.shape[1], "bilinear", "center").movedim(1,-1) |
| s = torch.cat((image1, image2), dim=0) |
| return (s,) |
|
|
| class EmptyImage: |
| def __init__(self, device="cpu"): |
| self.device = device |
|
|
| @classmethod |
| def INPUT_TYPES(s): |
| return {"required": { "width": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}), |
| "height": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}), |
| "batch_size": ("INT", {"default": 1, "min": 1, "max": 4096}), |
| "color": ("INT", {"default": 0, "min": 0, "max": 0xFFFFFF, "step": 1, "display": "color"}), |
| }} |
| RETURN_TYPES = ("IMAGE",) |
| FUNCTION = "generate" |
|
|
| CATEGORY = "image" |
|
|
| def generate(self, width, height, batch_size=1, color=0): |
| r = torch.full([batch_size, height, width, 1], ((color >> 16) & 0xFF) / 0xFF) |
| g = torch.full([batch_size, height, width, 1], ((color >> 8) & 0xFF) / 0xFF) |
| b = torch.full([batch_size, height, width, 1], ((color) & 0xFF) / 0xFF) |
| return (torch.cat((r, g, b), dim=-1), ) |
|
|
| class ImagePadForOutpaint: |
|
|
| @classmethod |
| def INPUT_TYPES(s): |
| return { |
| "required": { |
| "image": ("IMAGE",), |
| "left": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), |
| "top": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), |
| "right": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), |
| "bottom": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), |
| "feathering": ("INT", {"default": 40, "min": 0, "max": MAX_RESOLUTION, "step": 1}), |
| } |
| } |
|
|
| RETURN_TYPES = ("IMAGE", "MASK") |
| FUNCTION = "expand_image" |
|
|
| CATEGORY = "image" |
|
|
| def expand_image(self, image, left, top, right, bottom, feathering): |
| d1, d2, d3, d4 = image.size() |
|
|
| new_image = torch.ones( |
| (d1, d2 + top + bottom, d3 + left + right, d4), |
| dtype=torch.float32, |
| ) * 0.5 |
|
|
| new_image[:, top:top + d2, left:left + d3, :] = image |
|
|
| mask = torch.ones( |
| (d2 + top + bottom, d3 + left + right), |
| dtype=torch.float32, |
| ) |
|
|
| t = torch.zeros( |
| (d2, d3), |
| dtype=torch.float32 |
| ) |
|
|
| if feathering > 0 and feathering * 2 < d2 and feathering * 2 < d3: |
|
|
| for i in range(d2): |
| for j in range(d3): |
| dt = i if top != 0 else d2 |
| db = d2 - i if bottom != 0 else d2 |
|
|
| dl = j if left != 0 else d3 |
| dr = d3 - j if right != 0 else d3 |
|
|
| d = min(dt, db, dl, dr) |
|
|
| if d >= feathering: |
| continue |
|
|
| v = (feathering - d) / feathering |
|
|
| t[i, j] = v * v |
|
|
| mask[top:top + d2, left:left + d3] = t |
|
|
| return (new_image, mask.unsqueeze(0)) |
|
|
|
|
| NODE_CLASS_MAPPINGS = { |
| "KSampler": KSampler, |
| "CheckpointLoaderSimple": CheckpointLoaderSimple, |
| "CLIPTextEncode": CLIPTextEncode, |
| "CLIPSetLastLayer": CLIPSetLastLayer, |
| "VAEDecode": VAEDecode, |
| "VAEEncode": VAEEncode, |
| "VAEEncodeForInpaint": VAEEncodeForInpaint, |
| "VAELoader": VAELoader, |
| "EmptyLatentImage": EmptyLatentImage, |
| "LatentUpscale": LatentUpscale, |
| "LatentUpscaleBy": LatentUpscaleBy, |
| "LatentFromBatch": LatentFromBatch, |
| "RepeatLatentBatch": RepeatLatentBatch, |
| "SaveImage": SaveImage, |
| "PreviewImage": PreviewImage, |
| "LoadImage": LoadImage, |
| "LoadImageMask": LoadImageMask, |
| "LoadImageOutput": LoadImageOutput, |
| "ImageScale": ImageScale, |
| "ImageScaleBy": ImageScaleBy, |
| "ImageInvert": ImageInvert, |
| "ImageBatch": ImageBatch, |
| "ImagePadForOutpaint": ImagePadForOutpaint, |
| "EmptyImage": EmptyImage, |
| "ConditioningAverage": ConditioningAverage , |
| "ConditioningCombine": ConditioningCombine, |
| "ConditioningConcat": ConditioningConcat, |
| "ConditioningSetArea": ConditioningSetArea, |
| "ConditioningSetAreaPercentage": ConditioningSetAreaPercentage, |
| "ConditioningSetAreaStrength": ConditioningSetAreaStrength, |
| "ConditioningSetMask": ConditioningSetMask, |
| "KSamplerAdvanced": KSamplerAdvanced, |
| "SetLatentNoiseMask": SetLatentNoiseMask, |
| "LatentComposite": LatentComposite, |
| "LatentBlend": LatentBlend, |
| "LatentRotate": LatentRotate, |
| "LatentFlip": LatentFlip, |
| "LatentCrop": LatentCrop, |
| "LoraLoader": LoraLoader, |
| "CLIPLoader": CLIPLoader, |
| "UNETLoader": UNETLoader, |
| "DualCLIPLoader": DualCLIPLoader, |
| "CLIPVisionEncode": CLIPVisionEncode, |
| "StyleModelApply": StyleModelApply, |
| "unCLIPConditioning": unCLIPConditioning, |
| "ControlNetApply": ControlNetApply, |
| "ControlNetApplyAdvanced": ControlNetApplyAdvanced, |
| "ControlNetLoader": ControlNetLoader, |
| "DiffControlNetLoader": DiffControlNetLoader, |
| "StyleModelLoader": StyleModelLoader, |
| "CLIPVisionLoader": CLIPVisionLoader, |
| "VAEDecodeTiled": VAEDecodeTiled, |
| "VAEEncodeTiled": VAEEncodeTiled, |
| "unCLIPCheckpointLoader": unCLIPCheckpointLoader, |
| "GLIGENLoader": GLIGENLoader, |
| "GLIGENTextBoxApply": GLIGENTextBoxApply, |
| "InpaintModelConditioning": InpaintModelConditioning, |
|
|
| "CheckpointLoader": CheckpointLoader, |
| "DiffusersLoader": DiffusersLoader, |
|
|
| "LoadLatent": LoadLatent, |
| "SaveLatent": SaveLatent, |
|
|
| "ConditioningZeroOut": ConditioningZeroOut, |
| "ConditioningSetTimestepRange": ConditioningSetTimestepRange, |
| "LoraLoaderModelOnly": LoraLoaderModelOnly, |
| } |
|
|
| NODE_DISPLAY_NAME_MAPPINGS = { |
| |
| "KSampler": "KSampler", |
| "KSamplerAdvanced": "KSampler (Advanced)", |
| |
| "CheckpointLoader": "Load Checkpoint With Config (DEPRECATED)", |
| "CheckpointLoaderSimple": "Load Checkpoint", |
| "VAELoader": "Load VAE", |
| "LoraLoader": "Load LoRA", |
| "CLIPLoader": "Load CLIP", |
| "ControlNetLoader": "Load ControlNet Model", |
| "DiffControlNetLoader": "Load ControlNet Model (diff)", |
| "StyleModelLoader": "Load Style Model", |
| "CLIPVisionLoader": "Load CLIP Vision", |
| "UpscaleModelLoader": "Load Upscale Model", |
| "UNETLoader": "Load Diffusion Model", |
| |
| "CLIPVisionEncode": "CLIP Vision Encode", |
| "StyleModelApply": "Apply Style Model", |
| "CLIPTextEncode": "CLIP Text Encode (Prompt)", |
| "CLIPSetLastLayer": "CLIP Set Last Layer", |
| "ConditioningCombine": "Conditioning (Combine)", |
| "ConditioningAverage ": "Conditioning (Average)", |
| "ConditioningConcat": "Conditioning (Concat)", |
| "ConditioningSetArea": "Conditioning (Set Area)", |
| "ConditioningSetAreaPercentage": "Conditioning (Set Area with Percentage)", |
| "ConditioningSetMask": "Conditioning (Set Mask)", |
| "ControlNetApply": "Apply ControlNet (OLD)", |
| "ControlNetApplyAdvanced": "Apply ControlNet", |
| |
| "VAEEncodeForInpaint": "VAE Encode (for Inpainting)", |
| "SetLatentNoiseMask": "Set Latent Noise Mask", |
| "VAEDecode": "VAE Decode", |
| "VAEEncode": "VAE Encode", |
| "LatentRotate": "Rotate Latent", |
| "LatentFlip": "Flip Latent", |
| "LatentCrop": "Crop Latent", |
| "EmptyLatentImage": "Empty Latent Image", |
| "LatentUpscale": "Upscale Latent", |
| "LatentUpscaleBy": "Upscale Latent By", |
| "LatentComposite": "Latent Composite", |
| "LatentBlend": "Latent Blend", |
| "LatentFromBatch" : "Latent From Batch", |
| "RepeatLatentBatch": "Repeat Latent Batch", |
| |
| "SaveImage": "Save Image", |
| "PreviewImage": "Preview Image", |
| "LoadImage": "Load Image", |
| "LoadImageMask": "Load Image (as Mask)", |
| "LoadImageOutput": "Load Image (from Outputs)", |
| "ImageScale": "Upscale Image", |
| "ImageScaleBy": "Upscale Image By", |
| "ImageUpscaleWithModel": "Upscale Image (using Model)", |
| "ImageInvert": "Invert Image", |
| "ImagePadForOutpaint": "Pad Image for Outpainting", |
| "ImageBatch": "Batch Images", |
| "ImageCrop": "Image Crop", |
| "ImageStitch": "Image Stitch", |
| "ImageBlend": "Image Blend", |
| "ImageBlur": "Image Blur", |
| "ImageQuantize": "Image Quantize", |
| "ImageSharpen": "Image Sharpen", |
| "ImageScaleToTotalPixels": "Scale Image to Total Pixels", |
| "GetImageSize": "Get Image Size", |
| |
| "VAEDecodeTiled": "VAE Decode (Tiled)", |
| "VAEEncodeTiled": "VAE Encode (Tiled)", |
| } |
|
|
| EXTENSION_WEB_DIRS = {} |
|
|
| |
| LOADED_MODULE_DIRS = {} |
|
|
|
|
| def get_module_name(module_path: str) -> str: |
| """ |
| Returns the module name based on the given module path. |
| Examples: |
| get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node.py") -> "my_custom_node" |
| get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node") -> "my_custom_node" |
| get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node/") -> "my_custom_node" |
| get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node/__init__.py") -> "my_custom_node" |
| get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node/__init__") -> "my_custom_node" |
| get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node/__init__/") -> "my_custom_node" |
| get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node.disabled") -> "custom_nodes |
| Args: |
| module_path (str): The path of the module. |
| Returns: |
| str: The module name. |
| """ |
| base_path = os.path.basename(module_path) |
| if os.path.isfile(module_path): |
| base_path = os.path.splitext(base_path)[0] |
| return base_path |
|
|
|
|
| async def load_custom_node(module_path: str, ignore=set(), module_parent="custom_nodes") -> bool: |
| module_name = get_module_name(module_path) |
| if os.path.isfile(module_path): |
| sp = os.path.splitext(module_path) |
| module_name = sp[0] |
| sys_module_name = module_name |
| elif os.path.isdir(module_path): |
| sys_module_name = module_path.replace(".", "_x_") |
|
|
| try: |
| logging.debug("Trying to load custom node {}".format(module_path)) |
| if os.path.isfile(module_path): |
| module_spec = importlib.util.spec_from_file_location(sys_module_name, module_path) |
| module_dir = os.path.split(module_path)[0] |
| else: |
| module_spec = importlib.util.spec_from_file_location(sys_module_name, os.path.join(module_path, "__init__.py")) |
| module_dir = module_path |
|
|
| module = importlib.util.module_from_spec(module_spec) |
| sys.modules[sys_module_name] = module |
| module_spec.loader.exec_module(module) |
|
|
| LOADED_MODULE_DIRS[module_name] = os.path.abspath(module_dir) |
|
|
| try: |
| from comfy_config import config_parser |
|
|
| project_config = config_parser.extract_node_configuration(module_path) |
|
|
| web_dir_name = project_config.tool_comfy.web |
|
|
| if web_dir_name: |
| web_dir_path = os.path.join(module_path, web_dir_name) |
|
|
| if os.path.isdir(web_dir_path): |
| project_name = project_config.project.name |
|
|
| EXTENSION_WEB_DIRS[project_name] = web_dir_path |
|
|
| logging.info("Automatically register web folder {} for {}".format(web_dir_name, project_name)) |
| except Exception as e: |
| logging.warning(f"Unable to parse pyproject.toml due to lack dependency pydantic-settings, please run 'pip install -r requirements.txt': {e}") |
|
|
| if hasattr(module, "WEB_DIRECTORY") and getattr(module, "WEB_DIRECTORY") is not None: |
| web_dir = os.path.abspath(os.path.join(module_dir, getattr(module, "WEB_DIRECTORY"))) |
| if os.path.isdir(web_dir): |
| EXTENSION_WEB_DIRS[module_name] = web_dir |
|
|
| |
| if hasattr(module, "NODE_CLASS_MAPPINGS") and getattr(module, "NODE_CLASS_MAPPINGS") is not None: |
| for name, node_cls in module.NODE_CLASS_MAPPINGS.items(): |
| if name not in ignore: |
| NODE_CLASS_MAPPINGS[name] = node_cls |
| node_cls.RELATIVE_PYTHON_MODULE = "{}.{}".format(module_parent, get_module_name(module_path)) |
| if hasattr(module, "NODE_DISPLAY_NAME_MAPPINGS") and getattr(module, "NODE_DISPLAY_NAME_MAPPINGS") is not None: |
| NODE_DISPLAY_NAME_MAPPINGS.update(module.NODE_DISPLAY_NAME_MAPPINGS) |
| return True |
| |
| elif hasattr(module, "comfy_entrypoint"): |
| entrypoint = getattr(module, "comfy_entrypoint") |
| if not callable(entrypoint): |
| logging.warning(f"comfy_entrypoint in {module_path} is not callable, skipping.") |
| return False |
| try: |
| if inspect.iscoroutinefunction(entrypoint): |
| extension = await entrypoint() |
| else: |
| extension = entrypoint() |
| if not isinstance(extension, ComfyExtension): |
| logging.warning(f"comfy_entrypoint in {module_path} did not return a ComfyExtension, skipping.") |
| return False |
| node_list = await extension.get_node_list() |
| if not isinstance(node_list, list): |
| logging.warning(f"comfy_entrypoint in {module_path} did not return a list of nodes, skipping.") |
| return False |
| for node_cls in node_list: |
| node_cls: io.ComfyNode |
| schema = node_cls.GET_SCHEMA() |
| if schema.node_id not in ignore: |
| NODE_CLASS_MAPPINGS[schema.node_id] = node_cls |
| node_cls.RELATIVE_PYTHON_MODULE = "{}.{}".format(module_parent, get_module_name(module_path)) |
| if schema.display_name is not None: |
| NODE_DISPLAY_NAME_MAPPINGS[schema.node_id] = schema.display_name |
| return True |
| except Exception as e: |
| logging.warning(f"Error while calling comfy_entrypoint in {module_path}: {e}") |
| return False |
| else: |
| logging.warning(f"Skip {module_path} module for custom nodes due to the lack of NODE_CLASS_MAPPINGS or NODES_LIST (need one).") |
| return False |
| except Exception as e: |
| logging.warning(traceback.format_exc()) |
| logging.warning(f"Cannot import {module_path} module for custom nodes: {e}") |
| return False |
|
|
| async def init_external_custom_nodes(): |
| """ |
| Initializes the external custom nodes. |
| |
| This function loads custom nodes from the specified folder paths and imports them into the application. |
| It measures the import times for each custom node and logs the results. |
| |
| Returns: |
| None |
| """ |
| base_node_names = set(NODE_CLASS_MAPPINGS.keys()) |
| node_paths = folder_paths.get_folder_paths("custom_nodes") |
| node_import_times = [] |
| for custom_node_path in node_paths: |
| possible_modules = os.listdir(os.path.realpath(custom_node_path)) |
| if "__pycache__" in possible_modules: |
| possible_modules.remove("__pycache__") |
|
|
| for possible_module in possible_modules: |
| module_path = os.path.join(custom_node_path, possible_module) |
| if os.path.isfile(module_path) and os.path.splitext(module_path)[1] != ".py": continue |
| if module_path.endswith(".disabled"): continue |
| if args.disable_all_custom_nodes and possible_module not in args.whitelist_custom_nodes: |
| logging.info(f"Skipping {possible_module} due to disable_all_custom_nodes and whitelist_custom_nodes") |
| continue |
| time_before = time.perf_counter() |
| success = await load_custom_node(module_path, base_node_names, module_parent="custom_nodes") |
| node_import_times.append((time.perf_counter() - time_before, module_path, success)) |
|
|
| if len(node_import_times) > 0: |
| logging.info("\nImport times for custom nodes:") |
| for n in sorted(node_import_times): |
| if n[2]: |
| import_message = "" |
| else: |
| import_message = " (IMPORT FAILED)" |
| logging.info("{:6.1f} seconds{}: {}".format(n[0], import_message, n[1])) |
| logging.info("") |
|
|
| async def init_builtin_extra_nodes(): |
| """ |
| Initializes the built-in extra nodes in ComfyUI. |
| |
| This function loads the extra node files located in the "comfy_extras" directory and imports them into ComfyUI. |
| If any of the extra node files fail to import, a warning message is logged. |
| |
| Returns: |
| None |
| """ |
| extras_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "comfy_extras") |
| extras_files = [ |
| "nodes_latent.py", |
| "nodes_hypernetwork.py", |
| "nodes_upscale_model.py", |
| "nodes_post_processing.py", |
| "nodes_mask.py", |
| "nodes_compositing.py", |
| "nodes_rebatch.py", |
| "nodes_model_merging.py", |
| "nodes_tomesd.py", |
| "nodes_clip_sdxl.py", |
| "nodes_canny.py", |
| "nodes_freelunch.py", |
| "nodes_custom_sampler.py", |
| "nodes_hypertile.py", |
| "nodes_model_advanced.py", |
| "nodes_model_downscale.py", |
| "nodes_images.py", |
| "nodes_video_model.py", |
| "nodes_train.py", |
| "nodes_sag.py", |
| "nodes_perpneg.py", |
| "nodes_stable3d.py", |
| "nodes_sdupscale.py", |
| "nodes_photomaker.py", |
| "nodes_pixart.py", |
| "nodes_cond.py", |
| "nodes_morphology.py", |
| "nodes_stable_cascade.py", |
| "nodes_differential_diffusion.py", |
| "nodes_ip2p.py", |
| "nodes_model_merging_model_specific.py", |
| "nodes_pag.py", |
| "nodes_align_your_steps.py", |
| "nodes_attention_multiply.py", |
| "nodes_advanced_samplers.py", |
| "nodes_webcam.py", |
| "nodes_audio.py", |
| "nodes_sd3.py", |
| "nodes_gits.py", |
| "nodes_controlnet.py", |
| "nodes_hunyuan.py", |
| "nodes_flux.py", |
| "nodes_lora_extract.py", |
| "nodes_torch_compile.py", |
| "nodes_mochi.py", |
| "nodes_slg.py", |
| "nodes_mahiro.py", |
| "nodes_lt.py", |
| "nodes_hooks.py", |
| "nodes_load_3d.py", |
| "nodes_cosmos.py", |
| "nodes_video.py", |
| "nodes_lumina2.py", |
| "nodes_wan.py", |
| "nodes_lotus.py", |
| "nodes_hunyuan3d.py", |
| "nodes_primitive.py", |
| "nodes_cfg.py", |
| "nodes_optimalsteps.py", |
| "nodes_hidream.py", |
| "nodes_fresca.py", |
| "nodes_apg.py", |
| "nodes_preview_any.py", |
| "nodes_ace.py", |
| "nodes_string.py", |
| "nodes_camera_trajectory.py", |
| "nodes_edit_model.py", |
| "nodes_tcfg.py", |
| "nodes_context_windows.py", |
| "nodes_qwen.py", |
| "nodes_chroma_radiance.py", |
| "nodes_model_patch.py", |
| "nodes_easycache.py", |
| "nodes_audio_encoder.py", |
| ] |
|
|
| import_failed = [] |
| for node_file in extras_files: |
| if not await load_custom_node(os.path.join(extras_dir, node_file), module_parent="comfy_extras"): |
| import_failed.append(node_file) |
|
|
| return import_failed |
|
|
|
|
| async def init_builtin_api_nodes(): |
| api_nodes_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "comfy_api_nodes") |
| api_nodes_files = [ |
| "nodes_ideogram.py", |
| "nodes_openai.py", |
| "nodes_minimax.py", |
| "nodes_veo2.py", |
| "nodes_kling.py", |
| "nodes_bfl.py", |
| "nodes_bytedance.py", |
| "nodes_luma.py", |
| "nodes_recraft.py", |
| "nodes_pixverse.py", |
| "nodes_stability.py", |
| "nodes_pika.py", |
| "nodes_runway.py", |
| "nodes_tripo.py", |
| "nodes_moonvalley.py", |
| "nodes_rodin.py", |
| "nodes_gemini.py", |
| "nodes_vidu.py", |
| ] |
|
|
| if not await load_custom_node(os.path.join(api_nodes_dir, "canary.py"), module_parent="comfy_api_nodes"): |
| return api_nodes_files |
|
|
| import_failed = [] |
| for node_file in api_nodes_files: |
| if not await load_custom_node(os.path.join(api_nodes_dir, node_file), module_parent="comfy_api_nodes"): |
| import_failed.append(node_file) |
|
|
| return import_failed |
|
|
| async def init_public_apis(): |
| register_versions([ |
| ComfyAPIWithVersion( |
| version=getattr(v, "VERSION"), |
| api_class=v |
| ) for v in supported_versions |
| ]) |
|
|
| async def init_extra_nodes(init_custom_nodes=True, init_api_nodes=True): |
| await init_public_apis() |
|
|
| import_failed = await init_builtin_extra_nodes() |
|
|
| import_failed_api = [] |
| if init_api_nodes: |
| import_failed_api = await init_builtin_api_nodes() |
|
|
| if init_custom_nodes: |
| await init_external_custom_nodes() |
| else: |
| logging.info("Skipping loading of custom nodes") |
|
|
| if len(import_failed_api) > 0: |
| logging.warning("WARNING: some comfy_api_nodes/ nodes did not import correctly. This may be because they are missing some dependencies.\n") |
| for node in import_failed_api: |
| logging.warning("IMPORT FAILED: {}".format(node)) |
| logging.warning("\nThis issue might be caused by new missing dependencies added the last time you updated ComfyUI.") |
| if args.windows_standalone_build: |
| logging.warning("Please run the update script: update/update_comfyui.bat") |
| else: |
| logging.warning("Please do a: pip install -r requirements.txt") |
| logging.warning("") |
|
|
| if len(import_failed) > 0: |
| logging.warning("WARNING: some comfy_extras/ nodes did not import correctly. This may be because they are missing some dependencies.\n") |
| for node in import_failed: |
| logging.warning("IMPORT FAILED: {}".format(node)) |
| logging.warning("\nThis issue might be caused by new missing dependencies added the last time you updated ComfyUI.") |
| if args.windows_standalone_build: |
| logging.warning("Please run the update script: update/update_comfyui.bat") |
| else: |
| logging.warning("Please do a: pip install -r requirements.txt") |
| logging.warning("") |
|
|
| return import_failed |
|
|