import torch
import torchvision
import torch.nn.functional as F
import cv2
import numpy as np
from PIL import Image, ImageFilter
from scipy.ndimage import zoom
from torchvision.transforms.functional import to_pil_image

import comfy
import folder_paths
import nodes

from . import config


class TensorBatchBuilder:
    def __init__(self):
        self.tensor = None

    def concat(self, new_tensor):
        if self.tensor is None:
            self.tensor = new_tensor
        else:
            self.tensor = torch.concat((self.tensor, new_tensor), dim=0)


def tensor_convert_rgba(image, prefer_copy=True):
    """Assumes NHWC format tensor with 1, 3 or 4 channels."""
    _tensor_check_image(image)
    n_channel = image.shape[-1]
    if n_channel == 4:
        return image

    if n_channel == 3:
        alpha = torch.ones((*image.shape[:-1], 1), dtype=image.dtype, device=image.device)
        return torch.cat((image, alpha), dim=-1)

    if n_channel == 1:
        # tile the single channel to RGB (repeat copies, expand returns a view),
        # then append a fully opaque alpha channel
        if prefer_copy:
            rgb = image.repeat(1, 1, 1, 3)
        else:
            rgb = image.expand(-1, -1, -1, 3)
        alpha = torch.ones((*image.shape[:-1], 1), dtype=image.dtype, device=image.device)
        return torch.cat((rgb, alpha), dim=-1)

    raise ValueError(f"illegal conversion (channels: {n_channel} -> 4)")


def tensor_convert_rgb(image, prefer_copy=True):
    """Assumes NHWC format tensor with 1, 3 or 4 channels."""
    _tensor_check_image(image)
    n_channel = image.shape[-1]
    if n_channel == 3:
        return image

    if n_channel == 4:
        image = image[..., :3]
        if prefer_copy:
            image = image.clone()
        return image

    if n_channel == 1:
        # tile the single channel to RGB (repeat copies, expand returns a view)
        if prefer_copy:
            image = image.repeat(1, 1, 1, 3)
        else:
            image = image.expand(-1, -1, -1, 3)
        return image

    raise ValueError(f"illegal conversion (channels: {n_channel} -> 3)")
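

# Usage sketch (illustrative, not part of the API): assuming float NHWC tensors
# in [0, 1] as used throughout this module:
#
#   gray = torch.rand(1, 64, 64, 1)
#   rgb = tensor_convert_rgb(gray)    # -> (1, 64, 64, 3), channel tiled
#   rgba = tensor_convert_rgba(rgb)   # -> (1, 64, 64, 4), alpha filled with 1.0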


def general_tensor_resize(image, w: int, h: int):
    _tensor_check_image(image)
    image = image.permute(0, 3, 1, 2)
    image = torch.nn.functional.interpolate(image, size=(h, w), mode="bilinear")
    image = image.permute(0, 2, 3, 1)
    return image


LANCZOS = (Image.Resampling.LANCZOS if hasattr(Image, 'Resampling') else Image.LANCZOS)


def tensor_resize(image, w: int, h: int):
    _tensor_check_image(image)
    if image.shape[3] >= 3:
        # images with 3+ channels go through PIL for high-quality Lanczos resampling
        scaled_images = TensorBatchBuilder()
        for single_image in image:
            single_image = single_image.unsqueeze(0)
            single_pil = tensor2pil(single_image)
            scaled_pil = single_pil.resize((w, h), resample=LANCZOS)

            single_image = pil2tensor(scaled_pil)
            scaled_images.concat(single_image)

        return scaled_images.tensor
    else:
        return general_tensor_resize(image, w, h)


def tensor_get_size(image):
    """Mimics `PIL.Image.size`: returns (w, h)."""
    _tensor_check_image(image)
    _, h, w, _ = image.shape
    return (w, h)


def tensor2pil(image):
    _tensor_check_image(image)
    return Image.fromarray(np.clip(255. * image.cpu().numpy().squeeze(0), 0, 255).astype(np.uint8))


def pil2tensor(image):
    return torch.from_numpy(np.array(image).astype(np.float32) / 255.0).unsqueeze(0)


def numpy2pil(image):
    return Image.fromarray(np.clip(255. * image.squeeze(0), 0, 255).astype(np.uint8))


def to_pil(image):
    if isinstance(image, Image.Image):
        return image
    if isinstance(image, torch.Tensor):
        return tensor2pil(image)
    if isinstance(image, np.ndarray):
        return numpy2pil(image)
    raise ValueError(f"Cannot convert {type(image)} to PIL.Image")


def to_tensor(image):
    if isinstance(image, Image.Image):
        return torch.from_numpy(np.array(image)) / 255.0
    if isinstance(image, torch.Tensor):
        return image
    if isinstance(image, np.ndarray):
        return torch.from_numpy(image)
    raise ValueError(f"Cannot convert {type(image)} to torch.Tensor")


def to_numpy(image):
    if isinstance(image, Image.Image):
        return np.array(image)
    if isinstance(image, torch.Tensor):
        return image.numpy()
    if isinstance(image, np.ndarray):
        return image
    raise ValueError(f"Cannot convert {type(image)} to numpy.ndarray")
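

# Usage sketch (illustrative): the to_* helpers accept PIL images, tensors, or
# ndarrays interchangeably:
#
#   pil_img = to_pil(torch.rand(1, 32, 32, 3))   # tensor -> PIL.Image
#   arr = to_numpy(pil_img)                      # PIL.Image -> uint8 HWC ndarray
#   t = to_tensor(pil_img)                       # PIL.Image -> float tensor in [0, 1]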


def tensor_putalpha(image, mask):
    _tensor_check_image(image)
    _tensor_check_mask(mask)
    image[..., -1] = mask[..., 0]


def _tensor_check_image(image):
    if image.ndim != 4:
        raise ValueError(f"Expected NHWC tensor, but found {image.ndim} dimensions")
    if image.shape[-1] not in (1, 3, 4):
        raise ValueError(f"Expected 1, 3 or 4 channels for image, but found {image.shape[-1]} channels")


def _tensor_check_mask(mask):
    if mask.ndim != 4:
        raise ValueError(f"Expected NHWC tensor, but found {mask.ndim} dimensions")
    if mask.shape[-1] != 1:
        raise ValueError(f"Expected 1 channel for mask, but found {mask.shape[-1]} channels")


def tensor_crop(image, crop_region):
    _tensor_check_image(image)
    return crop_ndarray4(image, crop_region)


def tensor2numpy(image):
    _tensor_check_image(image)
    return image.numpy()


def tensor_paste(image1, image2, left_top, mask):
    """Pastes `image2` onto `image1` in place; `mask` and `image2` have to be the same size."""
    _tensor_check_image(image1)
    _tensor_check_image(image2)
    _tensor_check_mask(mask)
    if image2.shape[1:3] != mask.shape[1:3]:
        mask = resize_mask(mask.squeeze(dim=3), image2.shape[1:3]).unsqueeze(dim=3)

    x, y = left_top
    _, h1, w1, _ = image1.shape
    _, h2, w2, _ = image2.shape

    # clip the paste area to the bounds of image1
    w = min(w1, x + w2) - x
    h = min(h1, y + h2) - y

    if w <= 0 or h <= 0:
        return

    mask = mask[:, :h, :w, :]
    image1[:, y:y+h, x:x+w, :] = (
        (1 - mask) * image1[:, y:y+h, x:x+w, :] +
        mask * image2[:, :h, :w, :]
    )
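

# Usage sketch (illustrative): blend a 64x64 patch onto a 512x512 canvas at
# (x=100, y=50), weighted by a mask the same size as the patch:
#
#   canvas = torch.zeros(1, 512, 512, 3)
#   patch = torch.ones(1, 64, 64, 3)
#   mask = torch.ones(1, 64, 64, 1)
#   tensor_paste(canvas, patch, (100, 50), mask)   # modifies canvas in place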


def center_of_bbox(bbox):
    w, h = bbox[2] - bbox[0], bbox[3] - bbox[1]
    return bbox[0] + w/2, bbox[1] + h/2


def combine_masks(masks):
    if len(masks) == 0:
        return None
    else:
        initial_cv2_mask = np.array(masks[0][1])
        combined_cv2_mask = initial_cv2_mask

        for i in range(1, len(masks)):
            cv2_mask = np.array(masks[i][1])

            if combined_cv2_mask.shape == cv2_mask.shape:
                combined_cv2_mask = cv2.bitwise_or(combined_cv2_mask, cv2_mask)
            else:
                # skip masks whose shape doesn't match the first one
                pass

        mask = torch.from_numpy(combined_cv2_mask)
        return mask


def combine_masks2(masks):
    if len(masks) == 0:
        return None
    else:
        initial_cv2_mask = np.array(masks[0]).astype(np.uint8)
        combined_cv2_mask = initial_cv2_mask

        for i in range(1, len(masks)):
            cv2_mask = np.array(masks[i]).astype(np.uint8)

            if combined_cv2_mask.shape == cv2_mask.shape:
                combined_cv2_mask = cv2.bitwise_or(combined_cv2_mask, cv2_mask)
            else:
                # skip masks whose shape doesn't match the first one
                pass

        mask = torch.from_numpy(combined_cv2_mask)
        return mask


def bitwise_and_masks(mask1, mask2):
    mask1 = mask1.cpu()
    mask2 = mask2.cpu()
    cv2_mask1 = np.array(mask1)
    cv2_mask2 = np.array(mask2)

    if cv2_mask1.shape == cv2_mask2.shape:
        cv2_mask = cv2.bitwise_and(cv2_mask1, cv2_mask2)
        return torch.from_numpy(cv2_mask)
    else:
        # fall back to the first mask when shapes are incompatible
        return mask1


def to_binary_mask(mask, threshold=0):
    mask = make_3d_mask(mask)

    mask = mask.clone().cpu()
    mask[mask > threshold] = 1.
    mask[mask <= threshold] = 0.
    return mask


def use_gpu_opencv():
    return not config.get_config()['disable_gpu_opencv']


def dilate_mask(mask, dilation_factor, iter=1):
    if dilation_factor == 0:
        return make_2d_mask(mask)

    mask = make_2d_mask(mask)

    kernel = np.ones((abs(dilation_factor), abs(dilation_factor)), np.uint8)

    if use_gpu_opencv():
        mask = cv2.UMat(mask)
        kernel = cv2.UMat(kernel)

    # pass iterations by keyword: the third positional argument of
    # cv2.dilate/cv2.erode is `dst`, not `iterations`
    if dilation_factor > 0:
        result = cv2.dilate(mask, kernel, iterations=iter)
    else:
        result = cv2.erode(mask, kernel, iterations=iter)

    if use_gpu_opencv():
        return result.get()
    else:
        return result
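

# Note: a positive dilation_factor grows the mask and a negative one shrinks it
# (erosion); the factor's magnitude also sets the kernel size, e.g.
# dilate_mask(m, 10) dilates with a 10x10 kernel while dilate_mask(m, -10) erodes.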


def dilate_masks(segmasks, dilation_factor, iter=1):
    if dilation_factor == 0:
        return segmasks

    dilated_masks = []
    kernel = np.ones((abs(dilation_factor), abs(dilation_factor)), np.uint8)

    if use_gpu_opencv():
        kernel = cv2.UMat(kernel)

    for i in range(len(segmasks)):
        cv2_mask = segmasks[i][1]

        if use_gpu_opencv():
            cv2_mask = cv2.UMat(cv2_mask)

        if dilation_factor > 0:
            dilated_mask = cv2.dilate(cv2_mask, kernel, iterations=iter)
        else:
            dilated_mask = cv2.erode(cv2_mask, kernel, iterations=iter)

        if use_gpu_opencv():
            dilated_mask = dilated_mask.get()

        item = (segmasks[i][0], dilated_mask, segmasks[i][2])
        dilated_masks.append(item)

    return dilated_masks


def feather_mask(mask, thickness):
    """Softens the edges of an NHWC mask with a Gaussian blur of the given thickness."""
    if int(thickness) <= 0:
        return mask

    mask = mask.permute(0, 3, 1, 2)

    kernel_size = 2 * int(thickness) + 1
    sigma = thickness / 3
    blur_kernel = _gaussian_kernel(kernel_size, sigma).to(mask.device, mask.dtype)

    # conv2d expects a 4D weight of shape (out_channels, in_channels, kH, kW)
    blurred_mask = F.conv2d(mask, blur_kernel.unsqueeze(0).unsqueeze(0), padding=kernel_size // 2)

    blurred_mask = blurred_mask.permute(0, 2, 3, 1)

    return blurred_mask


def _gaussian_kernel(kernel_size, sigma):
    # separable 2D Gaussian: outer product of a normalized 1D Gaussian with itself
    coords = torch.arange(kernel_size) - kernel_size // 2
    kernel_1d = torch.exp(-coords**2 / (2 * sigma**2))
    kernel_1d = kernel_1d / kernel_1d.sum()
    return torch.outer(kernel_1d, kernel_1d)
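

# Usage sketch (illustrative): feather a hard-edged mask so seams blend
# smoothly when pasting a crop back; thickness is in pixels:
#
#   soft = feather_mask(torch.ones(1, 128, 128, 1), thickness=8)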


def tensor_gaussian_blur_mask(mask, kernel_size, sigma=10.0):
    """Returns an NHWC torch.Tensor from an ndim == 2 or 4 `np.ndarray` or `torch.Tensor`."""
    if isinstance(mask, np.ndarray):
        mask = torch.from_numpy(mask)

    if mask.ndim == 2:
        mask = mask[None, ..., None]
    elif mask.ndim == 3:
        mask = mask[..., None]

    _tensor_check_mask(mask)

    if kernel_size <= 0:
        return mask

    kernel_size = kernel_size*2+1

    shortest = min(mask.shape[1], mask.shape[2])
    if shortest <= kernel_size:
        kernel_size = int(shortest/2)
        if kernel_size % 2 == 0:
            kernel_size += 1
        if kernel_size < 3:
            return mask  # skip blurring - the mask is too small

    prev_device = mask.device
    device = comfy.model_management.get_torch_device()
    mask = mask.to(device)  # .to() is not in-place; rebind the result

    # NHWC -> NCHW for GaussianBlur, then back
    mask = mask[:, None, ..., 0]
    blurred_mask = torchvision.transforms.GaussianBlur(kernel_size=kernel_size, sigma=sigma)(mask)
    blurred_mask = blurred_mask[:, 0, ..., None]

    blurred_mask = blurred_mask.to(prev_device)

    return blurred_mask
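

# Note: the effective kernel size is `kernel_size * 2 + 1`, so e.g.
# tensor_gaussian_blur_mask(m, 10) applies a 21x21 Gaussian, clamped down for
# masks whose shortest side is smaller than that.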


def subtract_masks(mask1, mask2):
    mask1 = mask1.cpu()
    mask2 = mask2.cpu()
    cv2_mask1 = np.array(mask1) * 255
    cv2_mask2 = np.array(mask2) * 255

    if cv2_mask1.shape == cv2_mask2.shape:
        cv2_mask = cv2.subtract(cv2_mask1, cv2_mask2)
        return torch.clamp(torch.from_numpy(cv2_mask) / 255.0, min=0, max=1)
    else:
        # fall back to the first mask when shapes are incompatible
        return mask1


def add_masks(mask1, mask2):
    mask1 = mask1.cpu()
    mask2 = mask2.cpu()
    cv2_mask1 = np.array(mask1) * 255
    cv2_mask2 = np.array(mask2) * 255

    if cv2_mask1.shape == cv2_mask2.shape:
        cv2_mask = cv2.add(cv2_mask1, cv2_mask2)
        return torch.clamp(torch.from_numpy(cv2_mask) / 255.0, min=0, max=1)
    else:
        # fall back to the first mask when shapes are incompatible
        return mask1


def normalize_region(limit, startp, size):
    if startp < 0:
        new_endp = min(limit, size)
        new_startp = 0
    elif startp + size > limit:
        new_startp = max(0, limit - size)
        new_endp = limit
    else:
        new_startp = startp
        new_endp = min(limit, startp+size)

    return int(new_startp), int(new_endp)
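

# Example (illustrative): normalize_region(512, -20, 100) -> (0, 100) and
# normalize_region(512, 480, 100) -> (412, 512); the region is shifted so it
# always stays within [0, limit].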


def make_crop_region(w, h, bbox, crop_factor, crop_min_size=None):
    x1 = bbox[0]
    y1 = bbox[1]
    x2 = bbox[2]
    y2 = bbox[3]

    bbox_w = x2 - x1
    bbox_h = y2 - y1

    crop_w = bbox_w * crop_factor
    crop_h = bbox_h * crop_factor

    if crop_min_size is not None:
        crop_w = max(crop_min_size, crop_w)
        crop_h = max(crop_min_size, crop_h)

    kernel_x = x1 + bbox_w / 2
    kernel_y = y1 + bbox_h / 2

    new_x1 = int(kernel_x - crop_w / 2)
    new_y1 = int(kernel_y - crop_h / 2)

    # clamp the crop region to the image bounds
    new_x1, new_x2 = normalize_region(w, new_x1, crop_w)
    new_y1, new_y2 = normalize_region(h, new_y1, crop_h)

    return [new_x1, new_y1, new_x2, new_y2]
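

# Example (illustrative): for the 100x100 bbox [200, 200, 300, 300] in a
# 512x512 image, make_crop_region(512, 512, [200, 200, 300, 300], 1.5) returns
# [175, 175, 325, 325]: a 150x150 region centered on the bbox.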


def crop_ndarray4(npimg, crop_region):
    x1 = crop_region[0]
    y1 = crop_region[1]
    x2 = crop_region[2]
    y2 = crop_region[3]

    cropped = npimg[:, y1:y2, x1:x2, :]

    return cropped


crop_tensor4 = crop_ndarray4


def crop_ndarray3(npimg, crop_region):
    x1 = crop_region[0]
    y1 = crop_region[1]
    x2 = crop_region[2]
    y2 = crop_region[3]

    cropped = npimg[:, y1:y2, x1:x2]

    return cropped


def crop_ndarray2(npimg, crop_region):
    x1 = crop_region[0]
    y1 = crop_region[1]
    x2 = crop_region[2]
    y2 = crop_region[3]

    cropped = npimg[y1:y2, x1:x2]

    return cropped


def crop_image(image, crop_region):
    return crop_tensor4(image, crop_region)


def to_latent_image(pixels, vae):
    # VAE encoding expects dimensions that are multiples of 8; crop any excess
    x = (pixels.shape[1] // 8) * 8
    y = (pixels.shape[2] // 8) * 8
    if pixels.shape[1] != x or pixels.shape[2] != y:
        pixels = pixels[:, :x, :y, :]

    vae_encode = nodes.VAEEncode()

    return vae_encode.encode(vae, pixels)[0]


def empty_pil_tensor(w=64, h=64):
    return torch.zeros((1, h, w, 3), dtype=torch.float32)


def make_2d_mask(mask):
    if len(mask.shape) == 4:
        return mask.squeeze(0).squeeze(0)

    elif len(mask.shape) == 3:
        return mask.squeeze(0)

    return mask


def make_3d_mask(mask):
    if len(mask.shape) == 4:
        return mask.squeeze(0)

    elif len(mask.shape) == 2:
        return mask.unsqueeze(0)

    return mask


def make_4d_mask(mask):
    if len(mask.shape) == 3:
        return mask.unsqueeze(0)

    elif len(mask.shape) == 2:
        return mask.unsqueeze(0).unsqueeze(0)

    return mask


def is_same_device(a, b):
    a_device = torch.device(a) if isinstance(a, str) else a
    b_device = torch.device(b) if isinstance(b, str) else b
    return a_device.type == b_device.type and a_device.index == b_device.index
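

# Example (illustrative): is_same_device('cuda:0', torch.device('cuda', 0)) is
# True, but is_same_device('cuda', 'cuda:0') is False because the bare 'cuda'
# device has index None.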


def collect_non_reroute_nodes(node_map, links, res, node_id):
    if node_map[node_id]['type'] not in ('Reroute', 'Reroute (rgthree)'):
        res.append(node_id)
    else:
        for link in node_map[node_id]['outputs'][0]['links']:
            next_node_id = str(links[link][2])
            collect_non_reroute_nodes(node_map, links, res, next_node_id)


def resize_mask(mask, size):
    mask = make_4d_mask(mask)
    resized_mask = torch.nn.functional.interpolate(mask, size=size, mode='bilinear', align_corners=False)
    return resized_mask.squeeze(0)


def apply_mask_alpha_to_pil(decoded_pil, mask):
    decoded_rgba = decoded_pil.convert('RGBA')
    mask_pil = to_pil_image(mask)
    decoded_rgba.putalpha(mask_pil)

    return decoded_rgba


def try_install_custom_node(custom_node_url, msg):
    try:
        import cm_global
        cm_global.try_call(api='cm.try-install-custom-node',
                           sender="Impact Pack", custom_node_url=custom_node_url, msg=msg)
    except Exception:
        print(msg)
        print("[Impact Pack] ComfyUI-Manager is outdated. The custom node installation feature is not available.")


class TautologyStr(str):
    """A str that never compares as unequal, so type checks accept any match."""
    def __ne__(self, other):
        return False


class ByPassTypeTuple(tuple):
    """A tuple that returns its first element (wrapped as TautologyStr) for every index."""
    def __getitem__(self, index):
        if index > 0:
            index = 0
        item = super().__getitem__(index)
        if isinstance(item, str):
            return TautologyStr(item)
        return item


class NonListIterable:
    def __init__(self, data):
        self.data = data

    def __getitem__(self, index):
        return self.data[index]


def add_folder_path_and_extensions(folder_name, full_folder_paths, extensions):
    # register each model folder path under the given folder name
    for full_folder_path in full_folder_paths:
        folder_paths.add_model_folder_path(folder_name, full_folder_path)

    if folder_name in folder_paths.folder_names_and_paths:
        # merge the new extensions into the existing registration
        current_paths, current_extensions = folder_paths.folder_names_and_paths[folder_name]
        updated_extensions = current_extensions | extensions
        folder_paths.folder_names_and_paths[folder_name] = (current_paths, updated_extensions)
    else:
        # no existing registration: create one with the given paths and extensions
        folder_paths.folder_names_and_paths[folder_name] = (full_folder_paths, extensions)


class AnyType(str):
    """A str subclass whose `!=` is always False; acts as a wildcard type."""
    def __ne__(self, __value: object) -> bool:
        return False


any_typ = AnyType("*")
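

# Usage sketch (illustrative): `any_typ` can serve as a wildcard socket type in
# a node definition, since type compatibility is checked with `!=` and AnyType
# never reports inequality:
#
#   RETURN_TYPES = (any_typ,)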