Spaces:
Configuration error
Configuration error
| from typing import Iterator, List, Tuple, Dict, Any, Union, Optional | |
| from _decimal import Context, getcontext | |
| from decimal import Decimal | |
| from .libs.utils import AlwaysEqualProxy, ByPassTypeTuple, cleanGPUUsedForce, compare_revision | |
| from .libs.cache import remove_cache | |
| import numpy as np | |
| import re | |
| import json | |
| import torch | |
| import comfy.utils | |
# Number of switchable inputs the index-switch nodes declare in INPUT_TYPES.
DEFAULT_FLOW_NUM = 2
# Number of value slots carried through the while/for loop nodes.
MAX_FLOW_NUM = 10
# Extra input options that mark an input as lazy; empty when compare_revision(2543)
# is falsy (presumably an older ComfyUI revision without lazy inputs -- TODO confirm).
lazy_options = {"lazy": True} if compare_revision(2543) else {}
def validate_list_args(args: Dict[str, List[Any]]) -> Tuple[bool, Optional[str], Optional[str]]:
    """
    Check that every list argument has either one element or a common length.

    The ``'self'`` key is ignored so that ``locals()`` of a method can be
    passed directly.

    :param args: mapping of argument name to its list of values
    :return: Tuple (status, mismatched_key_1, mismatched_key_2); the two keys
        are ``None`` when the status is ``True``
    """
    # A single argument cannot disagree with anything.
    if len(args) == 1:
        return True, None, None

    reference_name = None
    reference_len = None
    for name, values in args.items():
        if name == 'self':
            continue
        size = len(values)
        if size == 1:
            # One-element lists are compatible with any other length.
            continue
        if reference_len is None:
            # First multi-element list sets the length the others must match.
            reference_name, reference_len = name, size
            continue
        if size != reference_len:
            return False, name, reference_name
    return True, None, None
def error_if_mismatched_list_args(args: Dict[str, List[Any]]) -> None:
    """
    Raise when the list arguments fail ``validate_list_args``.

    :param args: mapping of argument name to its list of values
    :raises ValueError: naming the two arguments whose lengths disagree
    """
    ok, first_key, second_key = validate_list_args(args)
    if ok:
        return
    # validate_list_args always supplies both keys on failure.
    assert first_key is not None
    assert second_key is not None
    first_len = len(args[first_key])
    second_len = len(args[second_key])
    raise ValueError(
        f"Mismatched list inputs received. {first_key}({first_len}) !== {second_key}({second_len})"
    )
def zip_with_fill(*lists: Union[List[Any], None]) -> Iterator[Tuple[Any, ...]]:
    """
    Zip lists together, repeating one-element lists for every row.

    If a list is None, None is used for that position in every row.
    (Not intended for use with lists of different lengths.)
    Calling it with no lists, or only ``None`` values, yields nothing.

    :param lists: the lists (or None) to combine
    :return: Iterator of tuples of length len(lists)
    """
    # default=0 keeps the no-argument call from raising inside max().
    longest = max((len(lst) for lst in lists if lst is not None), default=0)
    for i in range(longest):
        yield tuple(
            None if lst is None else (lst[0] if len(lst) == 1 else lst[i])
            for lst in lists
        )
| # ---------------------------------------------------------------类型 开始----------------------------------------------------------------------# | |
| # 字符串 | |
class String:
    """Node that outputs the string entered in its ``value`` widget."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare one required STRING input named ``value`` (default ``""``)."""
        return {
            "required": {"value": ("STRING", {"default": ""})},
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("string",)
    FUNCTION = "execute"
    CATEGORY = "EasyUse/Logic/Type"

    def execute(self, value):
        """Return ``value`` unchanged as a one-element tuple."""
        return (value,)
| # 整数 | |
class Int:
    """Node that outputs the integer entered in its ``value`` widget."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare one required INT input named ``value`` in [-999999, 999999]."""
        return {
            "required": {"value": ("INT", {"default": 0, "min": -999999, "max": 999999,})},
        }

    RETURN_TYPES = ("INT",)
    RETURN_NAMES = ("int",)
    FUNCTION = "execute"
    CATEGORY = "EasyUse/Logic/Type"

    def execute(self, value):
        """Return ``value`` unchanged as a one-element tuple."""
        return (value,)
| # 整数范围 | |
class RangeInt:
    """Node that builds one or more integer ranges from list inputs.

    Every input arrives as a list (``INPUT_IS_LIST``); one range is produced
    per row, and all ranges are concatenated into the first output.
    """

    def __init__(self) -> None:
        pass

    @classmethod
    def INPUT_TYPES(s) -> Dict[str, Dict[str, Any]]:
        """Declare the range mode, bounds, step / step count and end mode inputs."""
        return {
            "required": {
                "range_mode": (["step", "num_steps"], {"default": "step"}),
                "start": ("INT", {"default": 0, "min": -4096, "max": 4096, "step": 1}),
                "stop": ("INT", {"default": 0, "min": -4096, "max": 4096, "step": 1}),
                "step": ("INT", {"default": 0, "min": -4096, "max": 4096, "step": 1}),
                "num_steps": ("INT", {"default": 0, "min": -4096, "max": 4096, "step": 1}),
                "end_mode": (["Inclusive", "Exclusive"], {"default": "Inclusive"}),
            },
        }

    RETURN_TYPES = ("INT", "INT")
    RETURN_NAMES = ("range", "range_sizes")
    INPUT_IS_LIST = True
    OUTPUT_IS_LIST = (True, True)
    FUNCTION = "build_range"
    CATEGORY = "EasyUse/Logic/Type"

    def build_range(
        self, range_mode, start, stop, step, num_steps, end_mode
    ) -> Tuple[List[int], List[int]]:
        """Build the concatenated ranges and the size of each individual range.

        :param range_mode: list of ``"step"`` or ``"num_steps"``
        :param start: list of first values
        :param stop: list of end values
        :param step: list of increments (``"step"`` mode only)
        :param num_steps: list of sample counts (``"num_steps"`` mode only)
        :param end_mode: list of ``"Inclusive"`` or ``"Exclusive"``
        :return: (all values flattened, number of values per row)
        :raises ValueError: on mismatched list lengths or a zero step in ``"step"`` mode
        """
        # Must run before any other local is created: it inspects locals().
        error_if_mismatched_list_args(locals())
        ranges = []
        range_sizes = []
        for e_mode, e_start, e_stop, e_num_steps, e_step, e_end_mode in zip_with_fill(
            range_mode, start, stop, num_steps, step, end_mode
        ):
            if e_mode == 'step':
                if e_step == 0:
                    raise ValueError("step must not be 0 when range_mode is 'step'")
                if e_end_mode == "Inclusive":
                    # Push the bound one unit in the direction of travel so a
                    # descending range also includes its stop value.
                    e_stop += 1 if e_step > 0 else -1
                vals = list(range(e_start, e_stop, e_step))
            elif e_mode == 'num_steps':
                direction = 1 if e_stop > e_start else -1
                if e_end_mode == "Exclusive":
                    e_stop -= direction
                vals = np.rint(np.linspace(e_start, e_stop, e_num_steps)).astype(int).tolist()
            else:
                # Unknown mode: contributes nothing.
                continue
            ranges.extend(vals)
            range_sizes.append(len(vals))
        return ranges, range_sizes
| # 浮点数 | |
class Float:
    """Node that outputs the float entered in its ``value`` widget."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare one required FLOAT input named ``value`` in [-999999, 999999]."""
        return {
            "required": {"value": ("FLOAT", {"default": 0, "step": 0.01, "min": -999999, "max": 999999,})},
        }

    RETURN_TYPES = ("FLOAT",)
    RETURN_NAMES = ("float",)
    FUNCTION = "execute"
    CATEGORY = "EasyUse/Logic/Type"

    def execute(self, value):
        """Return ``value`` unchanged as a one-element tuple."""
        return (value,)
| # 浮点数范围 | |
class RangeFloat:
    """Node that builds one or more float ranges from list inputs.

    Arithmetic is done with ``Decimal`` to avoid accumulating binary
    floating-point error; results are converted to ``float`` on output.
    """

    def __init__(self) -> None:
        pass

    @classmethod
    def INPUT_TYPES(s) -> Dict[str, Dict[str, Any]]:
        """Declare the range mode, bounds, step / step count and end mode inputs."""
        return {
            "required": {
                "range_mode": (["step", "num_steps"], {"default": "step"}),
                "start": ("FLOAT", {"default": 0, "min": -4096, "max": 4096, "step": 0.1}),
                "stop": ("FLOAT", {"default": 0, "min": -4096, "max": 4096, "step": 0.1}),
                "step": ("FLOAT", {"default": 0, "min": -4096, "max": 4096, "step": 0.1}),
                "num_steps": ("INT", {"default": 0, "min": -4096, "max": 4096, "step": 1}),
                "end_mode": (["Inclusive", "Exclusive"], {"default": "Inclusive"}),
            },
        }

    RETURN_TYPES = ("FLOAT", "INT")
    RETURN_NAMES = ("range", "range_sizes")
    INPUT_IS_LIST = True
    OUTPUT_IS_LIST = (True, True)
    FUNCTION = "build_range"
    CATEGORY = "EasyUse/Logic/Type"

    @staticmethod
    def _decimal_range(
        range_mode: str, start: Decimal, stop: Decimal, step: Decimal, num_steps: int, inclusive: bool
    ) -> Iterator[float]:
        """Yield the values of a single range as floats.

        :param range_mode: ``"step"`` walks from ``start`` by ``step``;
            ``"num_steps"`` emits ``num_steps`` evenly spaced values
        :param start: first value
        :param stop: end value
        :param step: increment, used only in ``"step"`` mode
        :param num_steps: sample count, used only in ``"num_steps"`` mode
        :param inclusive: in ``"step"`` mode, whether a value equal to ``stop``
            is emitted; not consulted in ``"num_steps"`` mode, which always
            emits both ends
        :raises ValueError: if ``step`` is zero in ``"step"`` mode
        """
        if range_mode == 'step':
            if step == 0:
                # A zero step would never reach stop.
                raise ValueError("step must not be 0 when range_mode is 'step'")
            direction = 1 if step > 0 else -1
            current = start
            while True:
                # Signed distance past stop, measured along the direction of travel.
                overshoot = (current - stop) * direction
                if overshoot > 0 or (overshoot == 0 and not inclusive):
                    break
                yield float(current)
                current += step
        elif range_mode == 'num_steps':
            if num_steps <= 0:
                return
            if num_steps == 1:
                # A single sample has no spacing; avoid dividing by zero.
                yield float(start)
                return
            last = num_steps - 1
            spacing = (stop - start) / last
            for i in range(num_steps):
                # Emit stop exactly for the final sample so rounding of
                # ``spacing`` can neither drop nor shift it.
                yield float(stop if i == last else start + spacing * i)

    def build_range(
        self,
        range_mode,
        start,
        stop,
        step,
        num_steps,
        end_mode,
    ) -> Tuple[List[float], List[int]]:
        """Build the concatenated ranges and the size of each individual range.

        :return: (all values flattened, number of values per row)
        :raises ValueError: on mismatched list lengths or a zero step in ``"step"`` mode
        """
        # Must run before any other local is created: it inspects locals().
        error_if_mismatched_list_args(locals())
        # NOTE: this changes the precision of the current decimal context,
        # not just for this call.
        getcontext().prec = 12
        start = [Decimal(s) for s in start]
        stop = [Decimal(s) for s in stop]
        step = [Decimal(s) for s in step]
        ranges = []
        range_sizes = []
        for e_mode, e_start, e_stop, e_step, e_num_steps, e_end_mode in zip_with_fill(
            range_mode, start, stop, step, num_steps, end_mode
        ):
            vals = list(
                self._decimal_range(e_mode, e_start, e_stop, e_step, e_num_steps, e_end_mode == 'Inclusive')
            )
            ranges.extend(vals)
            range_sizes.append(len(vals))
        return ranges, range_sizes
| # 布尔 | |
class Boolean:
    """Node that outputs the boolean chosen in its ``value`` widget."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare one required BOOLEAN input named ``value`` (default False)."""
        return {
            "required": {"value": ("BOOLEAN", {"default": False})},
        }

    RETURN_TYPES = ("BOOLEAN",)
    RETURN_NAMES = ("boolean",)
    FUNCTION = "execute"
    CATEGORY = "EasyUse/Logic/Type"

    def execute(self, value):
        """Return ``value`` unchanged as a one-element tuple."""
        return (value,)
| # ---------------------------------------------------------------开关 开始----------------------------------------------------------------------# | |
class imageSwitch:
    """Node that forwards one of two images depending on a boolean."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the two IMAGE inputs and the selecting BOOLEAN."""
        return {
            "required": {
                "image_a": ("IMAGE",),
                "image_b": ("IMAGE",),
                "boolean": ("BOOLEAN", {"default": False}),
            }
        }

    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "image_switch"
    CATEGORY = "EasyUse/Logic/Switch"

    def image_switch(self, image_a, image_b, boolean):
        """Return ``image_a`` when ``boolean`` is truthy, otherwise ``image_b``."""
        return (image_a if boolean else image_b,)
class textSwitch:
    """Node that forwards one of two optional text inputs, selected by number."""

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the selector (1 or 2) and the two optional STRING inputs."""
        return {
            "required": {
                "input": ("INT", {"default": 1, "min": 1, "max": 2}),
            },
            "optional": {
                "text1": ("STRING", {"forceInput": True}),
                "text2": ("STRING", {"forceInput": True}),
            }
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("STRING",)
    CATEGORY = "EasyUse/Logic/Switch"
    FUNCTION = "switch"

    def switch(self, input, text1=None, text2=None,):
        """Return ``text1`` when ``input`` equals 1, otherwise ``text2``.

        An input that was not supplied is returned as ``None``.
        """
        return (text1 if input == 1 else text2,)
| # ---------------------------------------------------------------Index Switch----------------------------------------------------------------------# | |
class anythingIndexSwitch:
    """Node that forwards the ``value<index>`` input of any type."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare ``index`` plus DEFAULT_FLOW_NUM optional ``value<i>`` inputs."""
        inputs = {
            "required": {
                "index": ("INT", {"default": 0, "min": 0, "max": 9, "step": 1}),
            },
            "optional": {
            }
        }
        for i in range(DEFAULT_FLOW_NUM):
            inputs["optional"]["value%d" % i] = (AlwaysEqualProxy("*"), lazy_options)
        return inputs

    RETURN_TYPES = (AlwaysEqualProxy("*"),)
    RETURN_NAMES = ("value",)
    FUNCTION = "index_switch"
    CATEGORY = "EasyUse/Logic/Index Switch"

    def check_lazy_status(self, index, **kwargs):
        """Return ``[name]`` of the selected input while it is still ``None``.

        Returns ``None`` once the selected input has a value (presumably so
        the executor evaluates only that lazy input -- confirm against the
        ComfyUI lazy-evaluation docs).
        """
        key = "value%d" % index
        if kwargs.get(key, None) is None:
            return [key]

    def index_switch(self, index, **kwargs):
        """Return the selected input; raises KeyError if it was not supplied."""
        key = "value%d" % index
        return (kwargs[key],)
class imageIndexSwitch:
    """Node that forwards the ``image<index>`` input."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare ``index`` plus DEFAULT_FLOW_NUM optional ``image<i>`` inputs."""
        inputs = {
            "required": {
                "index": ("INT", {"default": 0, "min": 0, "max": 9, "step": 1}),
            },
            "optional": {
            }
        }
        for i in range(DEFAULT_FLOW_NUM):
            inputs["optional"]["image%d" % i] = ("IMAGE", lazy_options)
        return inputs

    RETURN_TYPES = ("IMAGE",)
    RETURN_NAMES = ("image",)
    FUNCTION = "index_switch"
    CATEGORY = "EasyUse/Logic/Index Switch"

    def check_lazy_status(self, index, **kwargs):
        """Return ``[name]`` of the selected input while it is still ``None``."""
        key = "image%d" % index
        if kwargs.get(key, None) is None:
            return [key]

    def index_switch(self, index, **kwargs):
        """Return the selected input; raises KeyError if it was not supplied."""
        key = "image%d" % index
        return (kwargs[key],)
class textIndexSwitch:
    """Node that forwards the ``text<index>`` input."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare ``index`` plus DEFAULT_FLOW_NUM optional ``text<i>`` inputs."""
        inputs = {
            "required": {
                "index": ("INT", {"default": 0, "min": 0, "max": 9, "step": 1}),
            },
            "optional": {
            }
        }
        for i in range(DEFAULT_FLOW_NUM):
            inputs["optional"]["text%d" % i] = ("STRING", {**lazy_options, "forceInput": True})
        return inputs

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("text",)
    FUNCTION = "index_switch"
    CATEGORY = "EasyUse/Logic/Index Switch"

    def check_lazy_status(self, index, **kwargs):
        """Return ``[name]`` of the selected input while it is still ``None``."""
        key = "text%d" % index
        if kwargs.get(key, None) is None:
            return [key]

    def index_switch(self, index, **kwargs):
        """Return the selected input; raises KeyError if it was not supplied."""
        key = "text%d" % index
        return (kwargs[key],)
class conditioningIndexSwitch:
    """Node that forwards the ``cond<index>`` input."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare ``index`` plus DEFAULT_FLOW_NUM optional ``cond<i>`` inputs."""
        inputs = {
            "required": {
                "index": ("INT", {"default": 0, "min": 0, "max": 9, "step": 1}),
            },
            "optional": {
            }
        }
        for i in range(DEFAULT_FLOW_NUM):
            inputs["optional"]["cond%d" % i] = ("CONDITIONING", lazy_options)
        return inputs

    RETURN_TYPES = ("CONDITIONING",)
    RETURN_NAMES = ("conditioning",)
    FUNCTION = "index_switch"
    CATEGORY = "EasyUse/Logic/Index Switch"

    def check_lazy_status(self, index, **kwargs):
        """Return ``[name]`` of the selected input while it is still ``None``."""
        key = "cond%d" % index
        if kwargs.get(key, None) is None:
            return [key]

    def index_switch(self, index, **kwargs):
        """Return the selected input; raises KeyError if it was not supplied."""
        key = "cond%d" % index
        return (kwargs[key],)
| # ---------------------------------------------------------------Math----------------------------------------------------------------------# | |
class mathIntOperation:
    """Node that applies a binary arithmetic operation to two integers."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare operands ``a`` and ``b`` and the ``operation`` choice."""
        return {
            "required": {
                "a": ("INT", {"default": 0, "min": -0xffffffffffffffff, "max": 0xffffffffffffffff, "step": 1}),
                "b": ("INT", {"default": 0, "min": -0xffffffffffffffff, "max": 0xffffffffffffffff, "step": 1}),
                "operation": (["add", "subtract", "multiply", "divide", "modulo", "power"],),
            },
        }

    RETURN_TYPES = ("INT",)
    FUNCTION = "int_math_operation"
    CATEGORY = "EasyUse/Logic/Math"

    def int_math_operation(self, a, b, operation):
        """Return ``(a <operation> b,)``.

        ``divide`` is floor division. Returns ``None`` for an operation name
        that is not one of the declared choices.

        :raises ZeroDivisionError: for ``divide`` / ``modulo`` with ``b == 0``
        """
        operations = {
            "add": lambda x, y: x + y,
            "subtract": lambda x, y: x - y,
            "multiply": lambda x, y: x * y,
            "divide": lambda x, y: x // y,
            "modulo": lambda x, y: x % y,
            "power": lambda x, y: x ** y,
        }
        handler = operations.get(operation)
        if handler is None:
            return None
        return (handler(a, b),)
class mathFloatOperation:
    """Node that compares two floats and outputs the boolean result."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare operands ``a`` and ``b`` and the comparison ``operation``."""
        return {
            "required": {
                "a": ("FLOAT", {"default": 0, "min": -999999999999.0, "max": 999999999999.0, "step": 1}),
                "b": ("FLOAT", {"default": 0, "min": -999999999999.0, "max": 999999999999.0, "step": 1}),
                "operation": (["==", "!=", "<", ">", "<=", ">="],),
            },
        }

    RETURN_TYPES = ("BOOLEAN",)
    FUNCTION = "float_math_operation"
    CATEGORY = "EasyUse/Logic/Math"

    def float_math_operation(self, a, b, operation):
        """Return ``(a <operation> b,)``.

        Returns ``None`` for an operation that is not one of the declared choices.
        """
        comparisons = {
            "==": lambda x, y: x == y,
            "!=": lambda x, y: x != y,
            "<": lambda x, y: x < y,
            ">": lambda x, y: x > y,
            "<=": lambda x, y: x <= y,
            ">=": lambda x, y: x >= y,
        }
        handler = comparisons.get(operation)
        if handler is None:
            return None
        return (handler(a, b),)
class mathStringOperation:
    """Node that compares two strings and outputs the boolean result."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare strings ``a`` and ``b``, the operation and case sensitivity."""
        return {
            "required": {
                "a": ("STRING", {"multiline": False}),
                "b": ("STRING", {"multiline": False}),
                "operation": (["a == b", "a != b", "a IN b", "a MATCH REGEX(b)", "a BEGINSWITH b", "a ENDSWITH b"],),
                "case_sensitive": ("BOOLEAN", {"default": True}),
            },
        }

    RETURN_TYPES = ("BOOLEAN",)
    FUNCTION = "string_math_operation"
    CATEGORY = "EasyUse/Logic/Math"

    def string_math_operation(self, a, b, operation, case_sensitive):
        """Return ``(result,)`` of applying ``operation`` to ``a`` and ``b``.

        The regex operation matches pattern ``b`` at the start of ``a`` and
        yields ``False`` when the pattern is invalid. Returns ``None`` for an
        operation that is not one of the declared choices.
        """
        if operation == "a MATCH REGEX(b)":
            # Use a flag rather than lowercasing the pattern: lowercasing
            # would turn escapes such as \S, \D, \W into \s, \d, \w.
            flags = 0 if case_sensitive else re.IGNORECASE
            try:
                return (re.match(b, a, flags) is not None,)
            except (re.error, TypeError):
                return (False,)

        if not case_sensitive:
            a = a.lower()
            b = b.lower()

        if operation == "a == b":
            return (a == b,)
        elif operation == "a != b":
            return (a != b,)
        elif operation == "a IN b":
            return (a in b,)
        elif operation == "a BEGINSWITH b":
            return (a.startswith(b),)
        elif operation == "a ENDSWITH b":
            return (a.endswith(b),)
| # ---------------------------------------------------------------Flow----------------------------------------------------------------------# | |
# The graph-expansion helpers are optional: when the module cannot be
# imported, both names are bound to None so the module still loads.
try:
    from comfy_execution.graph_utils import GraphBuilder, is_link
except ImportError:
    GraphBuilder = None
    is_link = None
class whileLoopStart:
    """Opening node of a while loop: passes its initial values through."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare ``condition`` plus MAX_FLOW_NUM optional ``initial_value<i>`` inputs."""
        inputs = {
            "required": {
                "condition": ("BOOLEAN", {"default": True}),
            },
            "optional": {
            },
        }
        for i in range(MAX_FLOW_NUM):
            inputs["optional"]["initial_value%d" % i] = ("*",)
        return inputs

    RETURN_TYPES = ByPassTypeTuple(tuple(["FLOW_CONTROL"] + ["*"] * MAX_FLOW_NUM))
    RETURN_NAMES = ByPassTypeTuple(tuple(["flow"] + ["value%d" % i for i in range(MAX_FLOW_NUM)]))
    FUNCTION = "while_loop_open"
    CATEGORY = "EasyUse/Logic/While Loop"

    def while_loop_open(self, condition, **kwargs):
        """Return ``("stub", value0, ..., value<MAX_FLOW_NUM-1>)``.

        Missing ``initial_value<i>`` inputs are returned as ``None``;
        ``condition`` is not read here.
        """
        values = [kwargs.get("initial_value%d" % i, None) for i in range(MAX_FLOW_NUM)]
        return tuple(["stub"] + values)
class whileLoopEnd:
    """Closing node of a while loop.

    When ``condition`` is false the current values are returned. Otherwise
    the nodes between the opening and closing node are cloned into a new
    graph that is returned for expansion, with the current values fed into
    the cloned opening node.
    """

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare ``flow``, ``condition`` and MAX_FLOW_NUM optional ``initial_value<i>`` inputs."""
        inputs = {
            "required": {
                "flow": ("FLOW_CONTROL", {"rawLink": True}),
                "condition": ("BOOLEAN", {}),
            },
            "optional": {
            },
            "hidden": {
                "dynprompt": "DYNPROMPT",
                "unique_id": "UNIQUE_ID",
            }
        }
        for i in range(MAX_FLOW_NUM):
            inputs["optional"]["initial_value%d" % i] = (AlwaysEqualProxy('*'),)
        return inputs

    RETURN_TYPES = ByPassTypeTuple(tuple([AlwaysEqualProxy('*')] * MAX_FLOW_NUM))
    RETURN_NAMES = ByPassTypeTuple(tuple(["value%d" % i for i in range(MAX_FLOW_NUM)]))
    FUNCTION = "while_loop_close"
    CATEGORY = "EasyUse/Logic/While Loop"

    def explore_dependencies(self, node_id, dynprompt, upstream):
        """Record, for every ancestor of ``node_id``, which nodes consume it.

        ``upstream`` maps a parent id to the list of node ids that link to it;
        it is filled in place by walking link inputs recursively.
        """
        node_info = dynprompt.get_node(node_id)
        if "inputs" not in node_info:
            return
        for k, v in node_info["inputs"].items():
            if is_link(v):
                parent_id = v[0]
                if parent_id not in upstream:
                    # First visit of this parent: recurse into its own inputs.
                    upstream[parent_id] = []
                    self.explore_dependencies(parent_id, dynprompt, upstream)
                upstream[parent_id].append(node_id)

    def collect_contained(self, node_id, upstream, contained):
        """Mark in ``contained`` every node reachable downstream of ``node_id``."""
        if node_id not in upstream:
            return
        for child_id in upstream[node_id]:
            if child_id not in contained:
                contained[child_id] = True
                self.collect_contained(child_id, upstream, contained)

    def while_loop_close(self, flow, condition, dynprompt=None, unique_id=None, **kwargs):
        """Finish the loop or return a graph expansion for the next iteration.

        :param flow: raw link to the opening node; ``flow[0]`` is its id
        :param condition: falsy ends the loop
        :return: tuple of values when done, otherwise a dict with ``result``
            (outputs of the cloned closing node) and ``expand`` (the new graph)
        """
        if not condition:
            # We're done with the loop
            return tuple(kwargs.get("initial_value%d" % i, None) for i in range(MAX_FLOW_NUM))

        # We want to loop
        upstream = {}
        # Get the list of all nodes between the open and close nodes
        self.explore_dependencies(unique_id, dynprompt, upstream)

        contained = {}
        open_node = flow[0]
        self.collect_contained(open_node, upstream, contained)
        contained[unique_id] = True
        contained[open_node] = True

        graph = GraphBuilder()
        # First pass: create a clone of every contained node; this node's own
        # clone is registered under the id "Recurse".
        for node_id in contained:
            original_node = dynprompt.get_node(node_id)
            node = graph.node(original_node["class_type"], "Recurse" if node_id == unique_id else node_id)
            node.set_override_display_id(node_id)
        # Second pass: wire inputs, redirecting links between contained nodes
        # to the clones and copying every other input as is.
        for node_id in contained:
            original_node = dynprompt.get_node(node_id)
            node = graph.lookup_node("Recurse" if node_id == unique_id else node_id)
            for k, v in original_node["inputs"].items():
                if is_link(v) and v[0] in contained:
                    parent = graph.lookup_node(v[0])
                    node.set_input(k, parent.out(v[1]))
                else:
                    node.set_input(k, v)

        # Feed this iteration's values into the cloned opening node.
        new_open = graph.lookup_node(open_node)
        for i in range(MAX_FLOW_NUM):
            key = "initial_value%d" % i
            new_open.set_input(key, kwargs.get(key, None))
        my_clone = graph.lookup_node("Recurse")
        result = tuple(my_clone.out(i) for i in range(MAX_FLOW_NUM))
        return {
            "result": result,
            "expand": graph.finalize(),
        }
class forLoopStart:
    """Opening node of a for loop, implemented on top of ``easy whileLoopStart``."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare ``total``, optional ``initial_value1..`` inputs and hidden inputs."""
        return {
            "required": {
                "total": ("INT", {"default": 1, "min": 1, "max": 100000, "step": 1}),
            },
            "optional": {
                "initial_value%d" % i: (AlwaysEqualProxy("*"),) for i in range(1, MAX_FLOW_NUM)
            },
            "hidden": {
                "initial_value0": (AlwaysEqualProxy("*"),),
                "prompt": "PROMPT",
                "extra_pnginfo": "EXTRA_PNGINFO",
                "unique_id": "UNIQUE_ID"
            }
        }

    RETURN_TYPES = ByPassTypeTuple(tuple(["FLOW_CONTROL", "INT"] + [AlwaysEqualProxy("*")] * (MAX_FLOW_NUM - 1)))
    RETURN_NAMES = ByPassTypeTuple(tuple(["flow", "index"] + ["value%d" % i for i in range(1, MAX_FLOW_NUM)]))
    FUNCTION = "for_loop_start"
    CATEGORY = "EasyUse/Logic/For Loop"

    def for_loop_start(self, total, prompt=None, extra_pnginfo=None, unique_id=None, **kwargs):
        """Start (or continue) the loop and expose the current index and values.

        ``total`` is stored in the workflow node's ``properties`` so the
        matching end node can read it. When ``extra_pnginfo`` or ``unique_id``
        is missing that step is skipped.

        :return: dict with ``result`` = ("stub", index, value1, ...) and
            ``expand`` = a graph containing one ``easy whileLoopStart`` node
        """
        graph = GraphBuilder()
        i = 0
        if extra_pnginfo and unique_id is not None:
            # Keep only the last dotted component of the id before matching
            # it against the workflow's node ids.
            node_id = str(unique_id).split('.')[-1]
            node = next((x for x in extra_pnginfo['workflow']['nodes'] if x['id'] == int(node_id)), None)
            if node:
                node.setdefault('properties', {})['total'] = total

        if "initial_value0" in kwargs:
            # Slot 0 carries the loop index between iterations.
            i = kwargs["initial_value0"]

        initial_values = {("initial_value%d" % num): kwargs.get("initial_value%d" % num, None) for num in range(1, MAX_FLOW_NUM)}
        graph.node("easy whileLoopStart", condition=total, initial_value0=i, **initial_values)
        outputs = [initial_values["initial_value%d" % num] for num in range(1, MAX_FLOW_NUM)]
        return {
            "result": tuple(["stub", i] + outputs),
            "expand": graph.finalize(),
        }
class forLoopEnd:
    """Closing node of a for loop, implemented on top of ``easy whileLoopEnd``."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare ``flow``, optional ``initial_value1..`` raw links and hidden inputs."""
        return {
            "required": {
                "flow": ("FLOW_CONTROL", {"rawLink": True}),
            },
            "optional": {
                "initial_value%d" % i: (AlwaysEqualProxy("*"), {"rawLink": True}) for i in range(1, MAX_FLOW_NUM)
            },
            "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO", "my_unique_id": "UNIQUE_ID"},
        }

    RETURN_TYPES = ByPassTypeTuple(tuple([AlwaysEqualProxy("*")] * (MAX_FLOW_NUM - 1)))
    RETURN_NAMES = ByPassTypeTuple(tuple(["value%d" % i for i in range(1, MAX_FLOW_NUM)]))
    FUNCTION = "for_loop_end"
    CATEGORY = "EasyUse/Logic/For Loop"

    def for_loop_end(self, flow, prompt=None, extra_pnginfo=None, my_unique_id=None, **kwargs):
        """Expand into index-increment, comparison and ``easy whileLoopEnd`` nodes.

        The iteration count is read from the opening node's workflow entry:
        ``properties['total']`` if present, else its first widget value.

        :param flow: raw link to the opening node; ``flow[0]`` is its id
        :return: dict with ``result`` (outputs 1.. of the while-end node) and
            ``expand`` (the new graph)
        :raises Exception: when the iteration count cannot be determined
        """
        graph = GraphBuilder()
        while_open = flow[0]
        total = None
        if extra_pnginfo:
            node = next((x for x in extra_pnginfo['workflow']['nodes'] if x['id'] == int(while_open)), None)
            if node:
                if 'properties' in node and 'total' in node['properties']:
                    total = node['properties']['total']
                else:
                    total = node['widgets_values'][0] if "widgets_values" in node else None
        if total is None:
            raise Exception("Unable to get parameters for the start of the loop")

        # index + 1, taken from output 1 of the opening node.
        sub = graph.node("easy mathInt", operation="add", a=[while_open, 1], b=1)
        # Keep looping while the incremented index is below the total.
        cond = graph.node("easy compare", a=sub.out(0), b=total, comparison='a < b')
        input_values = {("initial_value%d" % i): kwargs.get("initial_value%d" % i, None) for i in
                        range(1, MAX_FLOW_NUM)}
        while_close = graph.node("easy whileLoopEnd",
                                 flow=flow,
                                 condition=cond.out(0),
                                 initial_value0=sub.out(0),
                                 **input_values)
        return {
            "result": tuple([while_close.out(i) for i in range(1, MAX_FLOW_NUM)]),
            "expand": graph.finalize(),
        }
# Maps each comparison label offered by the Compare node to its implementation.
COMPARE_FUNCTIONS = {
    "a == b": lambda a, b: a == b,
    "a != b": lambda a, b: a != b,
    "a < b": lambda a, b: a < b,
    "a > b": lambda a, b: a > b,
    "a <= b": lambda a, b: a <= b,
    "a >= b": lambda a, b: a >= b,
}


# Compare
class Compare:
    """Node that compares two values of any type with a chosen operator."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare inputs ``a``, ``b`` and the ``comparison`` choice."""
        compare_functions = list(COMPARE_FUNCTIONS.keys())
        return {
            "required": {
                "a": (AlwaysEqualProxy("*"), {"default": 0}),
                "b": (AlwaysEqualProxy("*"), {"default": 0}),
                "comparison": (compare_functions, {"default": "a == b"}),
            },
        }

    RETURN_TYPES = ("BOOLEAN",)
    RETURN_NAMES = ("boolean",)
    FUNCTION = "compare"
    CATEGORY = "EasyUse/Logic"

    def compare(self, a, b, comparison):
        """Return ``(result,)``; raises KeyError for an unknown ``comparison``."""
        return (COMPARE_FUNCTIONS[comparison](a, b),)
| # 判断 | |
class IfElse:
    """Node that forwards ``on_true`` or ``on_false`` depending on a boolean."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare ``boolean`` and the two branch inputs (with ``lazy_options``)."""
        return {
            "required": {
                "boolean": ("BOOLEAN",),
                "on_true": (AlwaysEqualProxy("*"), lazy_options),
                "on_false": (AlwaysEqualProxy("*"), lazy_options),
            },
        }

    RETURN_TYPES = (AlwaysEqualProxy("*"),)
    RETURN_NAMES = ("*",)
    FUNCTION = "execute"
    CATEGORY = "EasyUse/Logic"

    def check_lazy_status(self, boolean, on_true=None, on_false=None):
        """Return the name of the selected branch while its value is ``None``.

        Returns ``None`` when the selected branch already has a value.
        """
        if boolean and on_true is None:
            return ["on_true"]
        if not boolean and on_false is None:
            return ["on_false"]

    def execute(self, *args, **kwargs):
        """Return ``(on_true,)`` when ``boolean`` is truthy, else ``(on_false,)``."""
        return (kwargs['on_true'] if kwargs['boolean'] else kwargs['on_false'],)
| #是否为SDXL | |
| from comfy.sdxl_clip import SDXLClipModel, SDXLRefinerClipModel, SDXLClipG | |
class isNone:
    """Node that reports whether its input is ``None``."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare one required input named ``any`` of any type."""
        return {
            "required": {
                "any": (AlwaysEqualProxy("*"),)
            },
            "optional": {
            }
        }

    RETURN_TYPES = ("BOOLEAN",)
    RETURN_NAMES = ("boolean",)
    FUNCTION = "execute"
    CATEGORY = "EasyUse/Logic"

    def execute(self, any):
        """Return ``(True,)`` when ``any`` is ``None``, otherwise ``(False,)``."""
        # The parameter name is fixed by the input declared in INPUT_TYPES.
        return (any is None,)
class isSDXL:
    """Node that reports whether a CLIP object wraps one of the SDXL CLIP models."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the optional ``optional_pipe`` and ``optional_clip`` inputs."""
        return {
            "required": {},
            "optional": {
                "optional_pipe": ("PIPE_LINE",),
                "optional_clip": ("CLIP",),
            }
        }

    RETURN_TYPES = ("BOOLEAN",)
    RETURN_NAMES = ("boolean",)
    FUNCTION = "execute"
    CATEGORY = "EasyUse/Logic"

    def execute(self, optional_pipe=None, optional_clip=None):
        """Return ``(True,)`` when the clip's ``cond_stage_model`` is an SDXL CLIP class.

        ``optional_clip`` takes precedence; otherwise ``optional_pipe['clip']`` is used.

        :raises Exception: when neither input is supplied
        """
        if optional_pipe is None and optional_clip is None:
            raise Exception("[ERROR] optional_pipe or optional_clip is missing")
        clip = optional_clip if optional_clip is not None else optional_pipe['clip']
        sdxl_classes = (SDXLClipModel, SDXLRefinerClipModel, SDXLClipG)
        return (isinstance(clip.cond_stage_model, sdxl_classes),)
| #xy矩阵 | |
class xyAny:
    """Node that expands two lists into the paired lists of their cross product."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare list inputs ``X`` and ``Y`` and the iteration ``direction``."""
        return {
            "required": {
                "X": (AlwaysEqualProxy("*"), {}),
                "Y": (AlwaysEqualProxy("*"), {}),
                "direction": (["horizontal", "vertical"], {"default": "horizontal"})
            }
        }

    RETURN_TYPES = (AlwaysEqualProxy("*"), AlwaysEqualProxy("*"))
    RETURN_NAMES = ("X", "Y")
    INPUT_IS_LIST = True
    OUTPUT_IS_LIST = (True, True)
    CATEGORY = "EasyUse/Logic"
    FUNCTION = "to_xy"

    def to_xy(self, X, Y, direction):
        """Return two equal-length lists covering every (x, y) combination.

        With ``direction[0] == "horizontal"`` X varies fastest (Y is the outer
        loop); otherwise Y varies fastest.
        """
        if direction[0] == "horizontal":
            pairs = [(x, y) for y in Y for x in X]
        else:
            pairs = [(x, y) for x in X for y in Y]
        new_x = [x for x, _ in pairs]
        new_y = [y for _, y in pairs]
        return (new_x, new_y)
class batchAnything:
    """Node that combines two values of any type into one batch."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the two inputs ``any_1`` and ``any_2`` of any type."""
        return {
            "required": {
                "any_1": (AlwaysEqualProxy("*"), {}),
                "any_2": (AlwaysEqualProxy("*"), {})
            }
        }

    RETURN_TYPES = (AlwaysEqualProxy("*"),)
    RETURN_NAMES = ("batch",)
    FUNCTION = "batch"
    CATEGORY = "EasyUse/Logic"

    def batch(self, any_1, any_2):
        """Combine the two inputs according to their types.

        * If either is a tensor: a ``None`` partner returns the other input;
          otherwise ``any_2`` is rescaled with ``comfy.utils.common_upscale``
          when the trailing shapes differ, and the two are concatenated on
          dimension 0.
        * If one is a ``str``/``float``/``int``: it is appended to the other
          when that is a tuple, otherwise the two are paired in a tuple.
        * Otherwise: a ``None`` partner returns the other input, else
          ``any_1 + any_2``.
        """
        if isinstance(any_1, torch.Tensor) or isinstance(any_2, torch.Tensor):
            if any_1 is None:
                return (any_2,)
            elif any_2 is None:
                return (any_1,)
            if any_1.shape[1:] != any_2.shape[1:]:
                # NOTE(review): assumes channel-last image tensors (B, H, W, C) -- confirm.
                any_2 = comfy.utils.common_upscale(any_2.movedim(-1, 1), any_1.shape[2], any_1.shape[1], "bilinear", "center").movedim(1, -1)
            return (torch.cat((any_1, any_2), 0),)
        elif isinstance(any_1, (str, float, int)):
            if any_2 is None:
                return (any_1,)
            elif isinstance(any_2, tuple):
                return (any_2 + (any_1,),)
            return ((any_1, any_2),)
        elif isinstance(any_2, (str, float, int)):
            if any_1 is None:
                return (any_2,)
            elif isinstance(any_1, tuple):
                return (any_1 + (any_2,),)
            return ((any_2, any_1),)
        else:
            if any_1 is None:
                return (any_2,)
            elif any_2 is None:
                return (any_1,)
            return (any_1 + any_2,)
| # 转换所有类型 | |
class convertAnything:
    """Node that converts its input to a string, int, float or boolean."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the ``*`` input of any type and the target ``output_type``."""
        return {"required": {
            "*": (AlwaysEqualProxy("*"),),
            "output_type": (["string", "int", "float", "boolean"], {"default": "string"}),
        }}

    RETURN_TYPES = ByPassTypeTuple((AlwaysEqualProxy("*"),))
    OUTPUT_NODE = True
    FUNCTION = "convert"
    CATEGORY = "EasyUse/Logic"

    def convert(self, *args, **kwargs):
        """Return ``(converted,)`` using the builtin constructor for ``output_type``.

        Returns ``(None,)`` for an unrecognised ``output_type``. Errors raised
        by ``int()`` / ``float()`` on unconvertible input propagate.
        """
        anything = kwargs['*']
        output_type = kwargs['output_type']
        converters = {
            'string': str,
            'int': int,
            'float': float,
            'boolean': bool,
        }
        converter = converters.get(output_type)
        params = converter(anything) if converter is not None else None
        return (params,)
| # 将所有类型的内容都转成字符串输出 | |
class showAnything:
    """Output node that renders its inputs as text for display in the UI."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the optional ``anything`` input and the hidden inputs."""
        return {"required": {}, "optional": {"anything": (AlwaysEqualProxy("*"), {}), },
                "hidden": {"unique_id": "UNIQUE_ID", "extra_pnginfo": "EXTRA_PNGINFO",
                           }}

    RETURN_TYPES = ()
    INPUT_IS_LIST = True
    OUTPUT_NODE = True
    FUNCTION = "log_input"
    CATEGORY = "EasyUse/Logic"

    def log_input(self, unique_id=None, extra_pnginfo=None, **kwargs):
        """Convert each item of ``anything`` to text and return it for the UI.

        Strings are kept as is; other values are JSON-encoded, falling back
        to ``str()`` when that fails. When the workflow is available in
        ``extra_pnginfo[0]`` and ``unique_id`` is given, the text is also
        stored in this node's ``widgets_values``.

        :return: ``{"ui": {"text": [...]}}``
        """
        values = []
        for val in kwargs.get('anything', ()):
            if isinstance(val, str):
                values.append(val)
                continue
            try:
                values.append(str(json.dumps(val)))
            except Exception:
                # Best effort: anything JSON cannot encode is shown via str().
                values.append(str(val))

        if not extra_pnginfo:
            print("Error: extra_pnginfo is empty")
        elif (not isinstance(extra_pnginfo[0], dict) or "workflow" not in extra_pnginfo[0]):
            print("Error: extra_pnginfo[0] is not a dict or missing 'workflow' key")
        elif unique_id:
            workflow = extra_pnginfo[0]["workflow"]
            node = next((x for x in workflow["nodes"] if str(x["id"]) == unique_id[0]), None)
            if node:
                node["widgets_values"] = [values]
        return {"ui": {"text": values}}
class showTensorShape:
    """Output node that lists the shapes of all tensors found in its input."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the required ``tensor`` input and the hidden inputs."""
        return {"required": {"tensor": (AlwaysEqualProxy("*"),)}, "optional": {},
                "hidden": {"unique_id": "UNIQUE_ID", "extra_pnginfo": "EXTRA_PNGINFO"
                           }}

    RETURN_TYPES = ()
    RETURN_NAMES = ()
    OUTPUT_NODE = True
    FUNCTION = "log_input"
    CATEGORY = "EasyUse/Logic"

    def log_input(self, tensor, unique_id=None, extra_pnginfo=None):
        """Collect ``list(obj.shape)`` for every object with a ``shape`` attribute.

        Dicts, lists and tuples are searched recursively, in order; objects
        without a ``shape`` attribute are ignored.

        :return: ``{"ui": {"text": [shape, ...]}}``
        """
        shapes = []

        def collect(obj):
            if isinstance(obj, dict):
                for item in obj.values():
                    collect(item)
            elif isinstance(obj, (list, tuple)):
                for item in obj:
                    collect(item)
            elif hasattr(obj, 'shape'):
                shapes.append(list(obj.shape))

        collect(tensor)
        return {"ui": {"text": shapes}}
class outputToList:
    """Node that re-emits its input through an output flagged as a list."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare one required input named ``tuple`` of any type."""
        return {
            "required": {
                "tuple": (AlwaysEqualProxy("*"), {}),
            }, "optional": {},
        }

    RETURN_TYPES = (AlwaysEqualProxy("*"),)
    RETURN_NAMES = ("list",)
    OUTPUT_IS_LIST = (True,)
    FUNCTION = "output_to_List"
    CATEGORY = "EasyUse/Logic"

    def output_to_List(self, tuple):
        """Return the input unchanged, wrapped in a one-element tuple."""
        # The parameter name is fixed by the input declared in INPUT_TYPES.
        return (tuple,)
| # cleanGpuUsed | |
class cleanGPUUsed:
    """Output node that calls ``cleanGPUUsedForce()`` and ``remove_cache('*')``."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the required ``anything`` input and the hidden inputs."""
        return {"required": {"anything": (AlwaysEqualProxy("*"), {})}, "optional": {},
                "hidden": {"unique_id": "UNIQUE_ID", "extra_pnginfo": "EXTRA_PNGINFO",
                           }}

    RETURN_TYPES = ()
    RETURN_NAMES = ()
    OUTPUT_NODE = True
    FUNCTION = "empty_cache"
    CATEGORY = "EasyUse/Logic"

    def empty_cache(self, anything, unique_id=None, extra_pnginfo=None):
        """Run both cleanup helpers; ``anything`` is not read. Returns ``()``."""
        cleanGPUUsedForce()
        remove_cache('*')
        return ()
class clearCacheKey:
    """Output node that calls ``remove_cache`` with a user-supplied key."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare ``anything``, the ``cache_key`` string and the hidden inputs."""
        return {"required": {
            "anything": (AlwaysEqualProxy("*"), {}),
            "cache_key": ("STRING", {"default": "*"}),
        }, "optional": {},
            "hidden": {"unique_id": "UNIQUE_ID", "extra_pnginfo": "EXTRA_PNGINFO",}
        }

    RETURN_TYPES = ()
    RETURN_NAMES = ()
    OUTPUT_NODE = True
    FUNCTION = "empty_cache"
    CATEGORY = "EasyUse/Logic"

    def empty_cache(self, anything, cache_key="*", unique_id=None, extra_pnginfo=None, cache_name=None):
        """Call ``remove_cache`` with the given key. Returns ``()``.

        :param anything: not read
        :param cache_key: key to remove; the name matches the input declared
            in INPUT_TYPES so a keyword call with ``cache_key=...`` works
        :param cache_name: earlier name of ``cache_key``; used when given
        """
        remove_cache(cache_name if cache_name is not None else cache_key)
        return ()
class clearCacheAll:
    """Output node that removes every cache entry."""

    # Declared as a classmethod so it can be called on the class itself
    # without creating an instance.
    @classmethod
    def INPUT_TYPES(s):
        """Declare one required wildcard input plus the hidden workflow fields."""
        return {
            "required": {
                "anything": (AlwaysEqualProxy("*"), {}),
            },
            "optional": {},
            "hidden": {"unique_id": "UNIQUE_ID", "extra_pnginfo": "EXTRA_PNGINFO"},
        }

    RETURN_TYPES = ()
    RETURN_NAMES = ()
    OUTPUT_NODE = True
    FUNCTION = "empty_cache"
    CATEGORY = "EasyUse/Logic"

    def empty_cache(self, anything, unique_id=None, extra_pnginfo=None):
        """Call ``remove_cache('*')``.

        :param anything: ignored by this method
        :param unique_id: hidden input, unused here
        :param extra_pnginfo: hidden input, unused here
        :return: empty tuple (the node has no outputs)
        """
        remove_cache('*')
        return ()
| # Deprecated | |
class If:
    """Deprecated node that picks one of two inputs based on a condition value."""

    # Declared as a classmethod so it can be called on the class itself
    # without creating an instance.
    @classmethod
    def INPUT_TYPES(s):
        """Declare the condition (``any``) and the two candidate inputs."""
        return {
            "required": {
                "any": (AlwaysEqualProxy("*"),),
                "if": (AlwaysEqualProxy("*"),),
                "else": (AlwaysEqualProxy("*"),),
            },
        }

    RETURN_TYPES = (AlwaysEqualProxy("*"),)
    RETURN_NAMES = ("?",)
    FUNCTION = "execute"
    CATEGORY = "EasyUse/🚫 Deprecated"
    DEPRECATED = True

    def execute(self, *args, **kwargs):
        """Return the ``if`` input when ``any`` is truthy, otherwise ``else``.

        The inputs are read from ``kwargs`` because ``if`` and ``else`` are
        Python keywords and cannot be used as parameter names.

        :return: one-element tuple holding the selected value
        :raises KeyError: if ``any`` or the selected branch is missing
        """
        condition = kwargs['any']
        chosen = 'if' if condition else 'else'
        return (kwargs[chosen],)
class poseEditor:
    """Deprecated node with one string input that produces no outputs."""

    # Declared as a classmethod so it can be called on the class itself
    # without creating an instance.
    @classmethod
    def INPUT_TYPES(s):
        """Declare a single required string input named ``image``."""
        return {
            "required": {
                "image": ("STRING", {"default": ""}),
            },
        }

    FUNCTION = "output_pose"
    CATEGORY = "EasyUse/🚫 Deprecated"
    DEPRECATED = True
    RETURN_TYPES = ()
    RETURN_NAMES = ()

    def output_pose(self, image):
        """Ignore ``image`` and return an empty tuple."""
        return ()
class imageToMask:
    """Deprecated node that turns one colour channel of an image into a mask."""

    # Declared as a classmethod so it can be called on the class itself
    # without creating an instance.
    @classmethod
    def INPUT_TYPES(s):
        """Declare the image input and the channel selector."""
        return {
            "required": {
                "image": ("IMAGE",),
                "channel": (['red', 'green', 'blue'],),
            },
        }

    RETURN_TYPES = ("MASK",)
    FUNCTION = "convert"
    CATEGORY = "EasyUse/🚫 Deprecated"
    DEPRECATED = True

    def convert_to_single_channel(self, image, channel='red'):
        """Build an RGB image whose three bands all equal one source channel.

        :param image: PIL image to read from
        :param channel: ``'red'``, ``'green'`` or ``'blue'``
        :return: RGB PIL image with the chosen channel copied into every band
        :raises ValueError: if ``channel`` is not one of the three names
        """
        from PIL import Image

        # Convert to RGB mode to access individual channels
        rgb = image.convert('RGB')

        # Band order returned by split() for an RGB image.
        band_names = ('red', 'green', 'blue')
        if channel not in band_names:
            raise ValueError(
                "Invalid channel option. Please choose 'red', 'green', or 'blue'.")

        # Extract the desired channel and convert to greyscale
        grey = rgb.split()[band_names.index(channel)].convert('L')

        # Convert the greyscale channel back to RGB mode
        return Image.merge('RGB', (grey, grey, grey))

    def convert(self, image, channel='red'):
        """Convert an image tensor into a mask taken from a single channel.

        :param image: image tensor accepted by ``tensor2pil``
        :param channel: ``'red'``, ``'green'`` or ``'blue'``
        :return: one-element tuple holding the mask tensor
        """
        from .libs.image import pil2tensor, tensor2pil

        single = self.convert_to_single_channel(tensor2pil(image), channel)
        as_tensor = pil2tensor(single)
        # NOTE(review): assumes pil2tensor yields (1, H, W, C), so squeeze()
        # leaves (H, W, C) and mean(2) averages the three identical bands —
        # confirm; a height or width of 1 would also be squeezed away.
        return (as_tensor.squeeze().mean(2),)
# Registry mapping each node's type name to the class that implements it.
# NOTE(review): presumably consumed by ComfyUI's custom-node loader, in which
# case the keys end up in saved workflows and should not be renamed — confirm
# before changing any key.
NODE_CLASS_MAPPINGS = {
    "easy string": String,
    "easy int": Int,
    "easy rangeInt": RangeInt,
    "easy float": Float,
    "easy rangeFloat": RangeFloat,
    "easy boolean": Boolean,
    "easy mathString": mathStringOperation,
    "easy mathInt": mathIntOperation,
    "easy mathFloat": mathFloatOperation,
    "easy compare": Compare,
    "easy imageSwitch": imageSwitch,
    "easy textSwitch": textSwitch,
    "easy anythingIndexSwitch": anythingIndexSwitch,
    "easy imageIndexSwitch": imageIndexSwitch,
    "easy textIndexSwitch": textIndexSwitch,
    "easy conditioningIndexSwitch": conditioningIndexSwitch,
    "easy whileLoopStart": whileLoopStart,
    "easy whileLoopEnd": whileLoopEnd,
    "easy forLoopStart": forLoopStart,
    "easy forLoopEnd": forLoopEnd,
    "easy ifElse": IfElse,
    "easy isNone": isNone,
    "easy isSDXL": isSDXL,
    "easy outputToList": outputToList,
    "easy xyAny": xyAny,
    "easy batchAnything": batchAnything,
    "easy convertAnything": convertAnything,
    "easy showAnything": showAnything,
    "easy showTensorShape": showTensorShape,
    "easy clearCacheKey": clearCacheKey,
    "easy clearCacheAll": clearCacheAll,
    "easy cleanGpuUsed": cleanGPUUsed,
    # The remaining classes set DEPRECATED = True.
    "easy if": If,
    "easy poseEditor": poseEditor,
    "easy imageToMask": imageToMask
}
# Human-readable title for each node type name; the keys are the same set of
# type names used in NODE_CLASS_MAPPINGS.
NODE_DISPLAY_NAME_MAPPINGS = {
    "easy string": "String",
    "easy int": "Int",
    "easy rangeInt": "Range(Int)",
    "easy float": "Float",
    "easy rangeFloat": "Range(Float)",
    "easy boolean": "Boolean",
    "easy compare": "Compare",
    "easy mathString": "Math String",
    "easy mathInt": "Math Int",
    "easy mathFloat": "Math Float",
    "easy imageSwitch": "Image Switch",
    "easy textSwitch": "Text Switch",
    "easy anythingIndexSwitch": "Any Index Switch",
    "easy imageIndexSwitch": "Image Index Switch",
    "easy textIndexSwitch": "Text Index Switch",
    "easy conditioningIndexSwitch": "Conditioning Index Switch",
    "easy whileLoopStart": "While Loop Start",
    "easy whileLoopEnd": "While Loop End",
    "easy forLoopStart": "For Loop Start",
    "easy forLoopEnd": "For Loop End",
    "easy ifElse": "If else",
    "easy isNone": "Is None",
    "easy isSDXL": "Is SDXL",
    "easy outputToList": "Output to List",
    "easy xyAny": "XYAny",
    "easy batchAnything": "Batch Any",
    "easy convertAnything": "Convert Any",
    "easy showAnything": "Show Any",
    "easy showTensorShape": "Show Tensor Shape",
    "easy clearCacheKey": "Clear Cache Key",
    "easy clearCacheAll": "Clear Cache All",
    "easy cleanGpuUsed": "Clean GPU Used",
    # Deprecated nodes carry the marker in their title.
    "easy if": "If (🚫Deprecated)",
    "easy poseEditor": "PoseEditor (🚫Deprecated)",
    "easy imageToMask": "ImageToMask (🚫Deprecated)"
}