| | import os |
| | import folder_paths |
| | import comfy.diffusers_load |
| | import comfy.samplers |
| | import comfy.sample |
| | import comfy.sd |
| | import comfy.utils |
| | import comfy.controlnet |
| | import comfy.clip_vision |
| | import comfy.model_management |
| | from comfy.cli_args import args |
| | import torch |
| | import torch.nn as nn |
| | import numpy as np |
| | import latent_preview |
| | from PIL import Image |
| | from einops import rearrange |
| | import scipy.ndimage |
| | import sys |
| | import cv2 |
| | from magic_utils import HWC3, apply_color, common_input_validate, resize_image_with_pad |
| | from pidi import pidinet |
| |
|
| |
|
| | supported_pt_extensions = set(['.ckpt', '.pt', '.bin', '.pth', '.safetensors', '.pkl']) |
| | folder_names_and_paths = {} |
| |
|
| | base_path = os.path.dirname(os.path.realpath(__file__)) |
| | models_dir = os.path.join(base_path, "../models") |
| |
|
| | folder_names_and_paths["checkpoints"] = ([os.path.join(models_dir, "checkpoints")], supported_pt_extensions) |
| | folder_names_and_paths["configs"] = ([os.path.join(models_dir, "configs")], [".yaml"]) |
| |
|
| | folder_names_and_paths["loras"] = ([os.path.join(models_dir, "loras")], supported_pt_extensions) |
| | folder_names_and_paths["vae"] = ([os.path.join(models_dir, "vae")], supported_pt_extensions) |
| | folder_names_and_paths["clip"] = ([os.path.join(models_dir, "clip")], supported_pt_extensions) |
| | folder_names_and_paths["unet"] = ([os.path.join(models_dir, "unet")], supported_pt_extensions) |
| | folder_names_and_paths["clip_vision"] = ([os.path.join(models_dir, "clip_vision")], supported_pt_extensions) |
| | folder_names_and_paths["style_models"] = ([os.path.join(models_dir, "style_models")], supported_pt_extensions) |
| | folder_names_and_paths["embeddings"] = ([os.path.join(models_dir, "embeddings")], supported_pt_extensions) |
| | folder_names_and_paths["diffusers"] = ([os.path.join(models_dir, "diffusers")], ["folder"]) |
| | folder_names_and_paths["vae_approx"] = ([os.path.join(models_dir, "vae_approx")], supported_pt_extensions) |
| |
|
| | folder_names_and_paths["controlnet"] = ([os.path.join(models_dir, "controlnet"), os.path.join(models_dir, "t2i_adapter")], supported_pt_extensions) |
| | folder_names_and_paths["gligen"] = ([os.path.join(models_dir, "gligen")], supported_pt_extensions) |
| | folder_names_and_paths["upscale_models"] = ([os.path.join(models_dir, "upscale_models")], supported_pt_extensions) |
| | folder_names_and_paths["hypernetworks"] = ([os.path.join(models_dir, "hypernetworks")], supported_pt_extensions) |
| | folder_names_and_paths["photomaker"] = ([os.path.join(models_dir, "photomaker")], supported_pt_extensions) |
| | folder_names_and_paths["classifiers"] = ([os.path.join(models_dir, "classifiers")], {""}) |
| |
|
| | def common_annotator_call(model, tensor_image, input_batch=False, show_pbar=True, **kwargs): |
| | if "detect_resolution" in kwargs: |
| | del kwargs["detect_resolution"] |
| |
|
| | if "resolution" in kwargs: |
| | detect_resolution = kwargs["resolution"] if type(kwargs["resolution"]) == int and kwargs["resolution"] >= 64 else 512 |
| | del kwargs["resolution"] |
| | else: |
| | detect_resolution = 512 |
| |
|
| | if input_batch: |
| | np_images = np.asarray(tensor_image * 255., dtype=np.uint8) |
| | np_results = model(np_images, output_type="np", detect_resolution=detect_resolution, **kwargs) |
| | return torch.from_numpy(np_results.astype(np.float32) / 255.0) |
| |
|
| | batch_size = tensor_image.shape[0] |
| | if show_pbar: |
| | pbar = comfy.utils.ProgressBar(batch_size) |
| | out_tensor = None |
| | for i, image in enumerate(tensor_image): |
| | np_image = np.asarray(image.cpu() * 255., dtype=np.uint8) |
| | np_result = model(np_image, output_type="np", detect_resolution=detect_resolution, **kwargs) |
| | out = torch.from_numpy(np_result.astype(np.float32) / 255.0) |
| | if out_tensor is None: |
| | out_tensor = torch.zeros(batch_size, *out.shape, dtype=torch.float32) |
| | out_tensor[i] = out |
| | if show_pbar: |
| | pbar.update(1) |
| | return out_tensor |
| |
|
| | class CheckpointLoaderSimple: |
| | def load_checkpoint(self, ckpt_name): |
| | ckpt_path = folder_paths.get_full_path("checkpoints", ckpt_name) |
| | print("Loading checkpoint from:", ckpt_path) |
| | out = comfy.sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings")) |
| | return out[:3] |
| |
|
| | class ControlNetLoader: |
| | def load_controlnet(self, control_net_name): |
| | controlnet_path = folder_paths.get_full_path("controlnet", control_net_name) |
| | controlnet = comfy.controlnet.load_controlnet(controlnet_path) |
| | return (controlnet, ) |
| |
|
| | class ControlNetApplyAdvanced: |
| | def apply_controlnet(self, positive, negative, control_net, image, strength, start_percent, end_percent): |
| | if strength == 0: |
| | return (positive, negative) |
| |
|
| | control_hint = image.movedim(-1,1) |
| | cnets = {} |
| |
|
| | out = [] |
| | for conditioning in [positive, negative]: |
| | c = [] |
| | for t in conditioning: |
| | d = t[1].copy() |
| |
|
| | prev_cnet = d.get('control', None) |
| | if prev_cnet in cnets: |
| | c_net = cnets[prev_cnet] |
| | else: |
| | c_net = control_net.copy().set_cond_hint(control_hint, strength, (start_percent, end_percent)) |
| | c_net.set_previous_controlnet(prev_cnet) |
| | cnets[prev_cnet] = c_net |
| |
|
| | d['control'] = c_net |
| | d['control_apply_to_uncond'] = False |
| | n = [t[0], d] |
| | c.append(n) |
| | out.append(c) |
| | return (out[0], out[1]) |
| | |
| | class CLIPTextEncode: |
| | def encode(self, clip, text): |
| | tokens = clip.tokenize(text) |
| | cond, pooled = clip.encode_from_tokens(tokens, return_pooled=True) |
| | return ([[cond, {"pooled_output": pooled}]], ) |
| |
|
| | class KSampler: |
| | def common_ksampler(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent, denoise, disable_noise=False, start_step=None, last_step=None, force_full_denoise=False): |
| | latent_image = latent["samples"] |
| | latent_image = comfy.sample.fix_empty_latent_channels(model, latent_image) |
| |
|
| | if disable_noise: |
| | noise = torch.zeros(latent_image.size(), dtype=latent_image.dtype, layout=latent_image.layout, device="cpu") |
| | else: |
| | batch_inds = latent["batch_index"] if "batch_index" in latent else None |
| | noise = comfy.sample.prepare_noise(latent_image, seed, batch_inds) |
| |
|
| | noise_mask = None |
| | if "noise_mask" in latent: |
| | noise_mask = latent["noise_mask"] |
| |
|
| | callback = latent_preview.prepare_callback(model, steps) |
| | disable_pbar = not comfy.utils.PROGRESS_BAR_ENABLED |
| | samples = comfy.sample.sample(model, noise, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, |
| | denoise=denoise, disable_noise=disable_noise, start_step=start_step, last_step=last_step, |
| | force_full_denoise=force_full_denoise, noise_mask=noise_mask, callback=callback, disable_pbar=disable_pbar, seed=seed) |
| | out = latent.copy() |
| | out["samples"] = samples |
| | return (out, ) |
| |
|
| | def sample(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=1.0): |
| | return self.common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise) |
| |
|
| | class VAEDecode: |
| | def decode(self, vae, samples): |
| | return (vae.decode(samples["samples"]), ) |
| |
|
| | class ColorDetector: |
| | def __call__(self, input_image=None, detect_resolution=2048, output_type=None, **kwargs): |
| | input_image, output_type = common_input_validate(input_image, output_type, **kwargs) |
| | input_image = HWC3(input_image) |
| | detected_map = HWC3(apply_color(input_image, detect_resolution)) |
| | |
| | if output_type == "pil": |
| | detected_map = Image.fromarray(detected_map) |
| | |
| | return detected_map |
| |
|
| | class Color_Preprocessor: |
| | def execute(self, image, resolution=512, **kwargs): |
| | return (common_annotator_call(ColorDetector(), image, resolution=resolution), ) |
| | |
| | norm_layer = nn.InstanceNorm2d |
| | class ResidualBlock(nn.Module): |
| | def __init__(self, in_features): |
| | super(ResidualBlock, self).__init__() |
| |
|
| | conv_block = [ nn.ReflectionPad2d(1), |
| | nn.Conv2d(in_features, in_features, 3), |
| | norm_layer(in_features), |
| | nn.ReLU(inplace=True), |
| | nn.ReflectionPad2d(1), |
| | nn.Conv2d(in_features, in_features, 3), |
| | norm_layer(in_features) |
| | ] |
| |
|
| | self.conv_block = nn.Sequential(*conv_block) |
| |
|
| | def forward(self, x): |
| | return x + self.conv_block(x) |
| |
|
| | class Generator(nn.Module): |
| | def __init__(self, input_nc, output_nc, n_residual_blocks=9, sigmoid=True): |
| | super(Generator, self).__init__() |
| |
|
| | |
| | model0 = [ nn.ReflectionPad2d(3), |
| | nn.Conv2d(input_nc, 64, 7), |
| | norm_layer(64), |
| | nn.ReLU(inplace=True) ] |
| | self.model0 = nn.Sequential(*model0) |
| |
|
| | |
| | model1 = [] |
| | in_features = 64 |
| | out_features = in_features*2 |
| | for _ in range(2): |
| | model1 += [ nn.Conv2d(in_features, out_features, 3, stride=2, padding=1), |
| | norm_layer(out_features), |
| | nn.ReLU(inplace=True) ] |
| | in_features = out_features |
| | out_features = in_features*2 |
| | self.model1 = nn.Sequential(*model1) |
| |
|
| | model2 = [] |
| | |
| | for _ in range(n_residual_blocks): |
| | model2 += [ResidualBlock(in_features)] |
| | self.model2 = nn.Sequential(*model2) |
| |
|
| | |
| | model3 = [] |
| | out_features = in_features//2 |
| | for _ in range(2): |
| | model3 += [ nn.ConvTranspose2d(in_features, out_features, 3, stride=2, padding=1, output_padding=1), |
| | norm_layer(out_features), |
| | nn.ReLU(inplace=True) ] |
| | in_features = out_features |
| | out_features = in_features//2 |
| | self.model3 = nn.Sequential(*model3) |
| |
|
| | |
| | model4 = [ nn.ReflectionPad2d(3), |
| | nn.Conv2d(64, output_nc, 7)] |
| | if sigmoid: |
| | model4 += [nn.Sigmoid()] |
| |
|
| | self.model4 = nn.Sequential(*model4) |
| |
|
| | def forward(self, x, cond=None): |
| | out = self.model0(x) |
| | out = self.model1(out) |
| | out = self.model2(out) |
| | out = self.model3(out) |
| | out = self.model4(out) |
| |
|
| | return out |
| |
|
| | class LineartDetector: |
| | def __init__(self, model, coarse_model): |
| | self.model = model |
| | self.model_coarse = coarse_model |
| | self.device = "cpu" |
| |
|
| | @classmethod |
| | def from_pretrained(cls): |
| | current_dir = os.path.dirname(os.path.abspath(__file__)) |
| | model_path = os.path.join(current_dir, "../models/preprocessor/sk_model.pth") |
| | coarse_model_path = os.path.join(current_dir, "../models/preprocessor/sk_model2.pth") |
| |
|
| | |
| | model = Generator(3, 1, 3) |
| | model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu'))) |
| | model.eval() |
| |
|
| | coarse_model = Generator(3, 1, 3) |
| | coarse_model.load_state_dict(torch.load(coarse_model_path, map_location=torch.device('cpu'))) |
| | coarse_model.eval() |
| |
|
| | return cls(model, coarse_model) |
| | |
| | def to(self, device): |
| | self.model.to(device) |
| | self.model_coarse.to(device) |
| | self.device = device |
| | return self |
| | |
| | def __call__(self, input_image, coarse=False, detect_resolution=512, output_type="pil", upscale_method="INTER_CUBIC", **kwargs): |
| | input_image, output_type = common_input_validate(input_image, output_type, **kwargs) |
| | detected_map, remove_pad = resize_image_with_pad(input_image, detect_resolution, upscale_method) |
| |
|
| | model = self.model_coarse if coarse else self.model |
| | assert detected_map.ndim == 3 |
| | with torch.no_grad(): |
| | image = torch.from_numpy(detected_map).float().to(self.device) |
| | image = image / 255.0 |
| | image = rearrange(image, 'h w c -> 1 c h w') |
| | line = model(image)[0][0] |
| |
|
| | line = line.cpu().numpy() |
| | line = (line * 255.0).clip(0, 255).astype(np.uint8) |
| |
|
| | detected_map = HWC3(line) |
| | detected_map = remove_pad(255 - detected_map) |
| | |
| | if output_type == "pil": |
| | detected_map = Image.fromarray(detected_map) |
| | |
| | return detected_map |
| |
|
| | class LineArt_Preprocessor: |
| | def execute(self, image, resolution=512, **kwargs): |
| | model = LineartDetector.from_pretrained().to(comfy.model_management.get_torch_device()) |
| | print("model.device:", model.device) |
| | out = common_annotator_call(model, image, resolution=resolution, apply_filter=False, coarse = kwargs["coarse"] == "enable") |
| | del model |
| | return (out, ) |
| | |
| | def nms(x, t, s): |
| | x = cv2.GaussianBlur(x.astype(np.float32), (0, 0), s) |
| |
|
| | f1 = np.array([[0, 0, 0], [1, 1, 1], [0, 0, 0]], dtype=np.uint8) |
| | f2 = np.array([[0, 1, 0], [0, 1, 0], [0, 1, 0]], dtype=np.uint8) |
| | f3 = np.array([[1, 0, 0], [0, 1, 0], [0, 0, 1]], dtype=np.uint8) |
| | f4 = np.array([[0, 0, 1], [0, 1, 0], [1, 0, 0]], dtype=np.uint8) |
| |
|
| | y = np.zeros_like(x) |
| |
|
| | for f in [f1, f2, f3, f4]: |
| | np.putmask(y, cv2.dilate(x, kernel=f) == x, x) |
| |
|
| | z = np.zeros_like(y, dtype=np.uint8) |
| | z[y > t] = 255 |
| | return z |
| |
|
| | class PidiNetDetector: |
| | def __init__(self, netNetwork): |
| | self.netNetwork = netNetwork |
| | self.device = "cpu" |
| |
|
| | @classmethod |
| | def from_pretrained(cls, filename="table5_pidinet.pth"): |
| | current_dir = os.path.dirname(os.path.abspath(__file__)) |
| | model_path = os.path.join(current_dir, f"../models/preprocessor/{filename}") |
| |
|
| | netNetwork = pidinet() |
| | netNetwork.load_state_dict({k.replace('module.', ''): v for k, v in torch.load(model_path)['state_dict'].items()}) |
| | netNetwork.eval() |
| |
|
| | return cls(netNetwork) |
| |
|
| | def to(self, device): |
| | self.netNetwork.to(device) |
| | self.device = device |
| | return self |
| | |
| | def __call__(self, input_image, detect_resolution=512, safe=False, output_type="pil", scribble=False, apply_filter=True, upscale_method="INTER_CUBIC", **kwargs): |
| | input_image, output_type = common_input_validate(input_image, output_type, **kwargs) |
| | detected_map, remove_pad = resize_image_with_pad(input_image, detect_resolution, upscale_method) |
| | |
| | detected_map = detected_map[:, :, ::-1].copy() |
| | with torch.no_grad(): |
| | image_pidi = torch.from_numpy(detected_map).float().to(self.device) |
| | image_pidi = image_pidi / 255.0 |
| | image_pidi = rearrange(image_pidi, 'h w c -> 1 c h w') |
| | edge = self.netNetwork(image_pidi)[-1] |
| | edge = edge.cpu().numpy() |
| | if apply_filter: |
| | edge = edge > 0.5 |
| | edge = (edge * 255.0).clip(0, 255).astype(np.uint8) |
| |
|
| | detected_map = edge[0, 0] |
| |
|
| | if scribble: |
| | detected_map = nms(detected_map, 127, 3.0) |
| | detected_map = cv2.GaussianBlur(detected_map, (0, 0), 3.0) |
| | detected_map[detected_map > 4] = 255 |
| | detected_map[detected_map < 255] = 0 |
| |
|
| | detected_map = HWC3(remove_pad(detected_map)) |
| |
|
| | if output_type == "pil": |
| | detected_map = Image.fromarray(detected_map) |
| |
|
| | return detected_map |
| |
|
| | class GrowMask: |
| | def expand_mask(self, mask, expand, tapered_corners): |
| | c = 0 if tapered_corners else 1 |
| | kernel = np.array([[c, 1, c], |
| | [1, 1, 1], |
| | [c, 1, c]]) |
| | mask = mask.reshape((-1, mask.shape[-2], mask.shape[-1])) |
| | out = [] |
| | for m in mask: |
| | output = m.numpy() |
| | for _ in range(abs(expand)): |
| | if expand < 0: |
| | output = scipy.ndimage.grey_erosion(output, footprint=kernel) |
| | else: |
| | output = scipy.ndimage.grey_dilation(output, footprint=kernel) |
| | output = torch.from_numpy(output) |
| | out.append(output) |
| | return (torch.stack(out, dim=0),) |
| | |
| | class PIDINET_Preprocessor: |
| | def execute(self, image, resolution=512, **kwargs): |
| | model = PidiNetDetector.from_pretrained().to(comfy.model_management.get_torch_device()) |
| | out = common_annotator_call(model, image, resolution=resolution, safe=True) |
| | del model |
| | return (out, ) |