| | import os |
| | import platform |
| | import subprocess |
| |
|
| | import json |
| | import re |
| |
|
| | import folder_paths |
| | from .logger import logger |
| |
|
| | import torch |
| | import random |
| |
|
| | import numpy as np |
| |
|
| | from PIL import Image |
| | from typing import Dict, List, Optional, Union |
| |
|
| | import mimetypes |
| |
|
| |
|
| | class AnyType(str): |
| | """A special class that is always equal in not equal comparisons. Credit to pythongosssss and rgthree""" |
| |
|
| | def __ne__(self, __value: object) -> bool: |
| | return False |
| |
|
| |
|
| | any = AnyType("*") |
| |
|
| |
|
| | VIDEO_FORMATS_DIRECTORY = os.path.join( |
| | os.path.dirname(os.path.abspath(__file__)), "video_formats" |
| | ) |
| | VIDEO_FORMATS = [] |
| |
|
| | |
| | for filename in os.listdir(VIDEO_FORMATS_DIRECTORY): |
| | filepath = os.path.join(VIDEO_FORMATS_DIRECTORY, filename) |
| | with open(filepath, "r") as file: |
| | |
| | data = json.load(file) |
| | |
| | extension = data.get("extension") |
| | |
| | if extension not in VIDEO_FORMATS: |
| | VIDEO_FORMATS.append(extension) |
| |
|
| | JNODES_IMAGE_FORMAT_TYPES = [ |
| | "jpg", |
| | "jpeg", |
| | "jfif", |
| | "png", |
| | "gif", |
| | "webp", |
| | "apng", |
| | "mjpeg", |
| | ] + VIDEO_FORMATS |
| | JNODES_VAE_LIST = ["Baked VAE"] + folder_paths.get_filename_list("vae") |
| |
|
| | ACCEPTED_UPLOAD_VIDEO_EXTENSIONS = ["webm", "mp4", "mkv", "ogg"] + VIDEO_FORMATS |
| | ACCEPTED_BROWSER_VIDEO_EXTENSIONS = [ |
| | "webm", |
| | "mp4", |
| | "ogg", |
| | ] |
| |
|
| | ACCEPTED_ANIMATED_IMAGE_EXTENSIONS = ["gif", "webp", "apng", "mjpeg"] |
| | ACCEPTED_STILL_IMAGE_EXTENSIONS = ["gif", "webp", "png", "jpg", "jpeg", "jfif"] |
| | ALL_ACCEPTED_IMAGE_EXTENSIONS = ( |
| | ACCEPTED_STILL_IMAGE_EXTENSIONS + ACCEPTED_ANIMATED_IMAGE_EXTENSIONS |
| | ) |
| |
|
| | ALL_ACCEPTED_UPLOAD_VISUAL_EXTENSIONS = ( |
| | ACCEPTED_UPLOAD_VIDEO_EXTENSIONS + ALL_ACCEPTED_IMAGE_EXTENSIONS |
| | ) |
| |
|
| | ALL_ACCEPTED_BROWSER_VISUAL_EXTENSIONS = ( |
| | ACCEPTED_BROWSER_VIDEO_EXTENSIONS + ALL_ACCEPTED_IMAGE_EXTENSIONS |
| | ) |
| |
|
| |
|
| | @staticmethod |
| | def return_random_int(min=1, max=100000): |
| | return random.randint(min, max) |
| |
|
| |
|
| | @staticmethod |
| | def clamp(value, min_val, max_val): |
| | """ |
| | Clamp the 'value' between 'min_val' and 'max_val'. |
| | """ |
| | return max(min(value, max_val), min_val) |
| |
|
| |
|
| | @staticmethod |
| | def map_to_range(value, input_min, input_max, output_min, output_max): |
| |
|
| | |
| | input_range = input_max - input_min |
| |
|
| | |
| | if input_range == 0: |
| | return output_min |
| |
|
| | |
| | normalized_value = (value - input_min) / input_range |
| |
|
| | |
| | if output_min <= output_max: |
| | |
| | output_range = output_max - output_min |
| | mapped_value = output_min + (normalized_value * output_range) |
| | else: |
| | |
| | output_range = output_min - output_max |
| | mapped_value = output_min - (normalized_value * output_range) |
| |
|
| | |
| | if output_min <= output_max: |
| | return max(min(mapped_value, output_max), output_min) |
| | else: |
| | return min(max(mapped_value, output_max), output_min) |
| |
|
| |
|
| | def convert_relative_comfyui_path_to_full_path(relative_path="output"): |
| | try: |
| | path = folder_paths.get_directory_by_type(relative_path) |
| |
|
| | if path: |
| | return path |
| | else: |
| | paths = folder_paths.get_folder_paths(relative_path) |
| |
|
| | if len(paths) > 0: |
| | return paths[0] |
| | except: |
| | pass |
| | |
| | return os.path.join(folder_paths.base_path, relative_path) |
| |
|
| |
|
| | def resolve_file_path(in_file_path): |
| | if os.path.isabs(in_file_path): |
| | return in_file_path |
| | else: |
| | return convert_relative_comfyui_path_to_full_path(in_file_path) |
| |
|
| |
|
| | def highest_common_folder(path1, path2): |
| | |
| | path1_parts = path1.replace("\\", "/").split(os.path.sep) |
| | path2_parts = path2.replace("\\", "/").split(os.path.sep) |
| |
|
| | |
| | min_length = min(len(path1_parts), len(path2_parts)) |
| |
|
| | |
| | common_folder = "" |
| |
|
| | |
| | for i in range(min_length): |
| | if path1_parts[i] == path2_parts[i]: |
| | |
| | common_folder = os.path.join(common_folder, path1_parts[i]) |
| | else: |
| | |
| | break |
| |
|
| | return common_folder |
| |
|
| |
|
| | def make_exclusive_list(original_list, items_to_remove): |
| | return [item for item in original_list if item not in items_to_remove] |
| |
|
| |
|
| | def get_file_extension(filename): |
| | _, extension = os.path.splitext(filename) |
| | return extension.lower() |
| |
|
| |
|
| | def get_file_extension_without_dot(filename): |
| | _, extension = os.path.splitext(filename) |
| | return extension[1:].lower() |
| |
|
| |
|
| | def is_webp(filename): |
| | return get_file_extension_without_dot(filename).lower() == "webp" |
| |
|
| |
|
| | def is_gif(filename): |
| | return get_file_extension_without_dot(filename).lower() == "gif" |
| |
|
| |
|
| | def is_video(filename): |
| | mime_type, _ = mimetypes.guess_type(filename) |
| | return mime_type and mime_type.startswith("video") |
| |
|
| |
|
| | def is_image(filename): |
| | mime_type, _ = mimetypes.guess_type(filename) |
| | return mime_type and mime_type.startswith("image") |
| |
|
| |
|
| | def is_acceptable_image_or_video_for_upload(filename): |
| | return ( |
| | is_image(filename) or is_video(filename) |
| | ) and get_file_extension_without_dot( |
| | filename |
| | ) in ALL_ACCEPTED_UPLOAD_VISUAL_EXTENSIONS |
| |
|
| |
|
| | def is_acceptable_image_or_video_for_browser_display(filename): |
| | return ( |
| | is_image(filename) or is_video(filename) |
| | ) and get_file_extension_without_dot( |
| | filename |
| | ) in ALL_ACCEPTED_BROWSER_VISUAL_EXTENSIONS |
| |
|
| |
|
| | def pil2tensor(image: Union[Image.Image, List[Image.Image]]) -> torch.Tensor: |
| | if isinstance(image, list): |
| | return torch.cat([pil2tensor(img) for img in image], dim=0) |
| |
|
| | return torch.from_numpy(np.array(image).astype(np.float32) / 255.0).unsqueeze(0) |
| |
|
| | def open_file_manager(path): |
| | if not os.path.exists(path): |
| | raise FileNotFoundError(f"The path '{path}' does not exist.") |
| |
|
| | if platform.system() == 'Windows': |
| | |
| | if os.path.isfile(path): |
| | subprocess.run(['explorer', '/select,', path]) |
| | else: |
| | subprocess.run(['explorer', path]) |
| | elif platform.system() == 'Darwin': |
| | |
| | if os.path.isfile(path): |
| | subprocess.run(['open', '-R', path]) |
| | else: |
| | subprocess.run(['open', path]) |
| | elif platform.system() == 'Linux': |
| | |
| | if os.path.isfile(path): |
| | subprocess.run(['xdg-open', os.path.dirname(path)]) |
| | else: |
| | subprocess.run(['xdg-open', path]) |
| | else: |
| | raise OSError("Unsupported operating system") |
| |
|
| | def search_and_replace_from_dict( |
| | text, replacement_dict: Dict, consider_special_characters=True |
| | ): |
| |
|
| | def replace(match_text): |
| | return replacement_dict[match_text.group(0)] |
| |
|
| | if consider_special_characters: |
| | return re.sub( |
| | "|".join(map(re.escape, replacement_dict.keys())), replace, text |
| | ).strip() |
| | else: |
| | return re.sub( |
| | "|".join(r"\b%s\b" % re.escape(s) for s in replacement_dict.keys()), |
| | replace, |
| | text, |
| | ).strip() |
| |
|
| |
|
| | |
| | def is_pil_image(obj): |
| | return isinstance(obj, Image.Image) |
| |
|
| |
|
| | |
| | def is_torch_tensor(obj): |
| | return isinstance(obj, torch.Tensor) |
| |
|