| import sys |
| import time |
|
|
| import execution |
| import impact.impact_server |
| from server import PromptServer |
| from impact.utils import any_typ |
| import impact.core as core |
| import re |
| import nodes |
| import traceback |
|
|
|
|
| class ImpactCompare: |
| @classmethod |
| def INPUT_TYPES(cls): |
| return { |
| "required": { |
| "cmp": (['a = b', 'a <> b', 'a > b', 'a < b', 'a >= b', 'a <= b', 'tt', 'ff'],), |
| "a": (any_typ, ), |
| "b": (any_typ, ), |
| }, |
| } |
|
|
| FUNCTION = "doit" |
| CATEGORY = "ImpactPack/Logic" |
|
|
| RETURN_TYPES = ("BOOLEAN", ) |
|
|
| def doit(self, cmp, a, b): |
| if cmp == "a = b": |
| return (a == b, ) |
| elif cmp == "a <> b": |
| return (a != b, ) |
| elif cmp == "a > b": |
| return (a > b, ) |
| elif cmp == "a < b": |
| return (a < b, ) |
| elif cmp == "a >= b": |
| return (a >= b, ) |
| elif cmp == "a <= b": |
| return (a <= b, ) |
| elif cmp == 'tt': |
| return (True, ) |
| else: |
| return (False, ) |
|
|
|
|
| class ImpactNotEmptySEGS: |
| @classmethod |
| def INPUT_TYPES(cls): |
| return {"required": {"segs": ("SEGS",)}} |
|
|
| FUNCTION = "doit" |
| CATEGORY = "ImpactPack/Logic" |
|
|
| RETURN_TYPES = ("BOOLEAN", ) |
|
|
| def doit(self, segs): |
| return (segs[1] != [], ) |
|
|
|
|
| class ImpactConditionalBranch: |
| @classmethod |
| def INPUT_TYPES(cls): |
| return { |
| "required": { |
| "cond": ("BOOLEAN",), |
| "tt_value": (any_typ,), |
| "ff_value": (any_typ,), |
| }, |
| } |
|
|
| FUNCTION = "doit" |
| CATEGORY = "ImpactPack/Logic" |
|
|
| RETURN_TYPES = (any_typ, ) |
|
|
| def doit(self, cond, tt_value, ff_value): |
| if cond: |
| return (tt_value,) |
| else: |
| return (ff_value,) |
|
|
|
|
| class ImpactConditionalBranchSelMode: |
| @classmethod |
| def INPUT_TYPES(cls): |
| if not core.is_execution_model_version_supported(): |
| required_inputs = { |
| "cond": ("BOOLEAN",), |
| "sel_mode": ("BOOLEAN", {"default": True, "label_on": "select_on_prompt", "label_off": "select_on_execution"}), |
| } |
| else: |
| required_inputs = { |
| "cond": ("BOOLEAN",), |
| } |
|
|
| return { |
| "required": required_inputs, |
| "optional": { |
| "tt_value": (any_typ,), |
| "ff_value": (any_typ,), |
| }, |
| } |
|
|
| FUNCTION = "doit" |
| CATEGORY = "ImpactPack/Logic" |
|
|
| RETURN_TYPES = (any_typ, ) |
|
|
| def doit(self, cond, tt_value=None, ff_value=None, **kwargs): |
| print(f'tt={tt_value is None}\nff={ff_value is None}') |
| if cond: |
| return (tt_value,) |
| else: |
| return (ff_value,) |
|
|
|
|
| class ImpactConvertDataType: |
| def __init__(self): |
| pass |
|
|
| @classmethod |
| def INPUT_TYPES(cls): |
| return {"required": {"value": (any_typ,)}} |
|
|
| RETURN_TYPES = ("STRING", "FLOAT", "INT", "BOOLEAN") |
| FUNCTION = "doit" |
|
|
| CATEGORY = "ImpactPack/Logic" |
|
|
| @staticmethod |
| def is_number(string): |
| pattern = re.compile(r'^[-+]?[0-9]*\.?[0-9]+$') |
| return bool(pattern.match(string)) |
|
|
| def doit(self, value): |
| if self.is_number(str(value)): |
| num = value |
| else: |
| if str.lower(str(value)) != "false": |
| num = 1 |
| else: |
| num = 0 |
| return (str(value), float(num), int(float(num)), bool(float(num)), ) |
|
|
|
|
| class ImpactIfNone: |
| def __init__(self): |
| pass |
|
|
| @classmethod |
| def INPUT_TYPES(cls): |
| return { |
| "required": {}, |
| "optional": {"signal": (any_typ,), "any_input": (any_typ,), } |
| } |
|
|
| RETURN_TYPES = (any_typ, "BOOLEAN") |
| RETURN_NAMES = ("signal_opt", "bool") |
| FUNCTION = "doit" |
|
|
| CATEGORY = "ImpactPack/Logic" |
|
|
| def doit(self, signal=None, any_input=None): |
| if any_input is None: |
| return (signal, False, ) |
| else: |
| return (signal, True, ) |
|
|
|
|
| class ImpactLogicalOperators: |
| @classmethod |
| def INPUT_TYPES(cls): |
| return { |
| "required": { |
| "operator": (['and', 'or', 'xor'],), |
| "bool_a": ("BOOLEAN", {"forceInput": True}), |
| "bool_b": ("BOOLEAN", {"forceInput": True}), |
| }, |
| } |
|
|
| FUNCTION = "doit" |
| CATEGORY = "ImpactPack/Logic" |
|
|
| RETURN_TYPES = ("BOOLEAN", ) |
|
|
| def doit(self, operator, bool_a, bool_b): |
| if operator == "and": |
| return (bool_a and bool_b, ) |
| elif operator == "or": |
| return (bool_a or bool_b, ) |
| else: |
| return (bool_a != bool_b, ) |
|
|
|
|
| class ImpactConditionalStopIteration: |
| @classmethod |
| def INPUT_TYPES(cls): |
| return { |
| "required": { "cond": ("BOOLEAN", {"forceInput": True}), }, |
| } |
|
|
| FUNCTION = "doit" |
| CATEGORY = "ImpactPack/Logic" |
|
|
| RETURN_TYPES = () |
|
|
| OUTPUT_NODE = True |
|
|
| def doit(self, cond): |
| if cond: |
| PromptServer.instance.send_sync("stop-iteration", {}) |
| return {} |
|
|
|
|
| class ImpactNeg: |
| @classmethod |
| def INPUT_TYPES(cls): |
| return { |
| "required": { "value": ("BOOLEAN", {"forceInput": True}), }, |
| } |
|
|
| FUNCTION = "doit" |
| CATEGORY = "ImpactPack/Logic" |
|
|
| RETURN_TYPES = ("BOOLEAN", ) |
|
|
| def doit(self, value): |
| return (not value, ) |
|
|
|
|
| class ImpactInt: |
| @classmethod |
| def INPUT_TYPES(cls): |
| return { |
| "required": { |
| "value": ("INT", {"default": 0, "min": 0, "max": sys.maxsize, "step": 1}), |
| }, |
| } |
|
|
| FUNCTION = "doit" |
| CATEGORY = "ImpactPack/Logic" |
|
|
| RETURN_TYPES = ("INT", ) |
|
|
| def doit(self, value): |
| return (value, ) |
|
|
|
|
| class ImpactFloat: |
| @classmethod |
| def INPUT_TYPES(cls): |
| return { |
| "required": { |
| "value": ("FLOAT", {"default": 1.0, "min": -3.402823466e+38, "max": 3.402823466e+38}), |
| }, |
| } |
|
|
| FUNCTION = "doit" |
| CATEGORY = "ImpactPack/Logic" |
|
|
| RETURN_TYPES = ("FLOAT", ) |
|
|
| def doit(self, value): |
| return (value, ) |
|
|
|
|
| class ImpactValueSender: |
| @classmethod |
| def INPUT_TYPES(cls): |
| return {"required": { |
| "value": (any_typ, ), |
| "link_id": ("INT", {"default": 0, "min": 0, "max": sys.maxsize, "step": 1}), |
| }, |
| "optional": { |
| "signal_opt": (any_typ,), |
| } |
| } |
|
|
| OUTPUT_NODE = True |
|
|
| FUNCTION = "doit" |
|
|
| CATEGORY = "ImpactPack/Logic" |
|
|
| RETURN_TYPES = (any_typ, ) |
| RETURN_NAMES = ("signal", ) |
|
|
| def doit(self, value, link_id=0, signal_opt=None): |
| PromptServer.instance.send_sync("value-send", {"link_id": link_id, "value": value}) |
| return (signal_opt, ) |
|
|
|
|
| class ImpactIntConstSender: |
| @classmethod |
| def INPUT_TYPES(cls): |
| return {"required": { |
| "signal": (any_typ, ), |
| "value": ("INT", {"default": 0, "min": 0, "max": sys.maxsize, "step": 1}), |
| "link_id": ("INT", {"default": 0, "min": 0, "max": sys.maxsize, "step": 1}), |
| }, |
| } |
|
|
| OUTPUT_NODE = True |
|
|
| FUNCTION = "doit" |
|
|
| CATEGORY = "ImpactPack/Logic" |
|
|
| RETURN_TYPES = () |
|
|
| def doit(self, signal, value, link_id=0): |
| PromptServer.instance.send_sync("value-send", {"link_id": link_id, "value": value}) |
| return {} |
|
|
|
|
| class ImpactValueReceiver: |
| @classmethod |
| def INPUT_TYPES(cls): |
| return {"required": { |
| "typ": (["STRING", "INT", "FLOAT", "BOOLEAN"], ), |
| "value": ("STRING", {"default": ""}), |
| "link_id": ("INT", {"default": 0, "min": 0, "max": sys.maxsize, "step": 1}), |
| }, |
| } |
|
|
| FUNCTION = "doit" |
|
|
| CATEGORY = "ImpactPack/Logic" |
|
|
| RETURN_TYPES = (any_typ, ) |
|
|
| def doit(self, typ, value, link_id=0): |
| if typ == "INT": |
| return (int(value), ) |
| elif typ == "FLOAT": |
| return (float(value), ) |
| elif typ == "BOOLEAN": |
| return (value.lower() == "true", ) |
| else: |
| return (value, ) |
|
|
|
|
| class ImpactImageInfo: |
| @classmethod |
| def INPUT_TYPES(cls): |
| return {"required": { |
| "value": ("IMAGE", ), |
| }, |
| } |
|
|
| FUNCTION = "doit" |
|
|
| CATEGORY = "ImpactPack/Logic/_for_test" |
|
|
| RETURN_TYPES = ("INT", "INT", "INT", "INT") |
| RETURN_NAMES = ("batch", "height", "width", "channel") |
|
|
| def doit(self, value): |
| return (value.shape[0], value.shape[1], value.shape[2], value.shape[3]) |
|
|
|
|
| class ImpactLatentInfo: |
| @classmethod |
| def INPUT_TYPES(cls): |
| return {"required": { |
| "value": ("LATENT", ), |
| }, |
| } |
|
|
| FUNCTION = "doit" |
|
|
| CATEGORY = "ImpactPack/Logic/_for_test" |
|
|
| RETURN_TYPES = ("INT", "INT", "INT", "INT") |
| RETURN_NAMES = ("batch", "height", "width", "channel") |
|
|
| def doit(self, value): |
| shape = value['samples'].shape |
| return (shape[0], shape[2] * 8, shape[3] * 8, shape[1]) |
|
|
|
|
| class ImpactMinMax: |
| @classmethod |
| def INPUT_TYPES(cls): |
| return {"required": { |
| "mode": ("BOOLEAN", {"default": True, "label_on": "max", "label_off": "min"}), |
| "a": (any_typ,), |
| "b": (any_typ,), |
| }, |
| } |
|
|
| FUNCTION = "doit" |
|
|
| CATEGORY = "ImpactPack/Logic/_for_test" |
|
|
| RETURN_TYPES = ("INT", ) |
|
|
| def doit(self, mode, a, b): |
| if mode: |
| return (max(a, b), ) |
| else: |
| return (min(a, b),) |
|
|
|
|
| class ImpactQueueTrigger: |
| @classmethod |
| def INPUT_TYPES(cls): |
| return {"required": { |
| "signal": (any_typ,), |
| "mode": ("BOOLEAN", {"default": True, "label_on": "Trigger", "label_off": "Don't trigger"}), |
| } |
| } |
|
|
| FUNCTION = "doit" |
|
|
| CATEGORY = "ImpactPack/Logic/_for_test" |
| RETURN_TYPES = (any_typ,) |
| RETURN_NAMES = ("signal_opt",) |
| OUTPUT_NODE = True |
|
|
| def doit(self, signal, mode): |
| if(mode): |
| PromptServer.instance.send_sync("impact-add-queue", {}) |
|
|
| return (signal,) |
|
|
|
|
| class ImpactQueueTriggerCountdown: |
| @classmethod |
| def INPUT_TYPES(cls): |
| return {"required": { |
| "count": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}), |
| "total": ("INT", {"default": 10, "min": 1, "max": 0xffffffffffffffff}), |
| "mode": ("BOOLEAN", {"default": True, "label_on": "Trigger", "label_off": "Don't trigger"}), |
| }, |
| "optional": {"signal": (any_typ,),}, |
| "hidden": {"unique_id": "UNIQUE_ID"} |
| } |
|
|
| FUNCTION = "doit" |
|
|
| CATEGORY = "ImpactPack/Logic/_for_test" |
| RETURN_TYPES = (any_typ, "INT", "INT") |
| RETURN_NAMES = ("signal_opt", "count", "total") |
| OUTPUT_NODE = True |
|
|
| def doit(self, count, total, mode, unique_id, signal=None): |
| if (mode): |
| if count < total - 1: |
| PromptServer.instance.send_sync("impact-node-feedback", |
| {"node_id": unique_id, "widget_name": "count", "type": "int", "value": count+1}) |
| PromptServer.instance.send_sync("impact-add-queue", {}) |
| if count >= total - 1: |
| PromptServer.instance.send_sync("impact-node-feedback", |
| {"node_id": unique_id, "widget_name": "count", "type": "int", "value": 0}) |
|
|
| return (signal, count, total) |
|
|
|
|
|
|
| class ImpactSetWidgetValue: |
| @classmethod |
| def INPUT_TYPES(cls): |
| return {"required": { |
| "signal": (any_typ,), |
| "node_id": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}), |
| "widget_name": ("STRING", {"multiline": False}), |
| }, |
| "optional": { |
| "boolean_value": ("BOOLEAN", {"forceInput": True}), |
| "int_value": ("INT", {"forceInput": True}), |
| "float_value": ("FLOAT", {"forceInput": True}), |
| "string_value": ("STRING", {"forceInput": True}), |
| } |
| } |
|
|
| FUNCTION = "doit" |
|
|
| CATEGORY = "ImpactPack/Logic/_for_test" |
| RETURN_TYPES = (any_typ,) |
| RETURN_NAMES = ("signal_opt",) |
| OUTPUT_NODE = True |
|
|
| def doit(self, signal, node_id, widget_name, boolean_value=None, int_value=None, float_value=None, string_value=None, ): |
| kind = None |
| if boolean_value is not None: |
| value = boolean_value |
| kind = "BOOLEAN" |
| elif int_value is not None: |
| value = int_value |
| kind = "INT" |
| elif float_value is not None: |
| value = float_value |
| kind = "FLOAT" |
| elif string_value is not None: |
| value = string_value |
| kind = "STRING" |
| else: |
| value = None |
|
|
| if value is not None: |
| PromptServer.instance.send_sync("impact-node-feedback", |
| {"node_id": node_id, "widget_name": widget_name, "type": kind, "value": value}) |
|
|
| return (signal,) |
|
|
|
|
| class ImpactNodeSetMuteState: |
| @classmethod |
| def INPUT_TYPES(cls): |
| return {"required": { |
| "signal": (any_typ,), |
| "node_id": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}), |
| "set_state": ("BOOLEAN", {"default": True, "label_on": "active", "label_off": "mute"}), |
| } |
| } |
|
|
| FUNCTION = "doit" |
|
|
| CATEGORY = "ImpactPack/Logic/_for_test" |
| RETURN_TYPES = (any_typ,) |
| RETURN_NAMES = ("signal_opt",) |
| OUTPUT_NODE = True |
|
|
| def doit(self, signal, node_id, set_state): |
| PromptServer.instance.send_sync("impact-node-mute-state", {"node_id": node_id, "is_active": set_state}) |
| return (signal,) |
|
|
|
|
| class ImpactSleep: |
| @classmethod |
| def INPUT_TYPES(cls): |
| return {"required": { |
| "signal": (any_typ,), |
| "seconds": ("FLOAT", {"default": 0.5, "min": 0, "max": 3600}), |
| } |
| } |
|
|
| FUNCTION = "doit" |
|
|
| CATEGORY = "ImpactPack/Logic/_for_test" |
| RETURN_TYPES = (any_typ,) |
| RETURN_NAMES = ("signal_opt",) |
| OUTPUT_NODE = True |
|
|
| def doit(self, signal, seconds): |
| time.sleep(seconds) |
| return (signal,) |
|
|
|
|
| error_skip_flag = False |
| try: |
| import cm_global |
| def filter_message(str): |
| global error_skip_flag |
|
|
| if "IMPACT-PACK-SIGNAL: STOP CONTROL BRIDGE" in str: |
| return True |
| elif error_skip_flag and "ERROR:root:!!! Exception during processing !!!\n" == str: |
| error_skip_flag = False |
| return True |
| else: |
| return False |
|
|
| cm_global.try_call(api='cm.register_message_collapse', f=filter_message) |
|
|
| except Exception as e: |
| print(f"[WARN] ComfyUI-Impact-Pack: `ComfyUI` or `ComfyUI-Manager` is an outdated version.") |
| pass |
|
|
|
|
| def workflow_to_map(workflow): |
| nodes = {} |
| links = {} |
| for link in workflow['links']: |
| links[link[0]] = link[1:] |
| for node in workflow['nodes']: |
| nodes[str(node['id'])] = node |
|
|
| return nodes, links |
|
|
|
|
| class ImpactRemoteBoolean: |
| @classmethod |
| def INPUT_TYPES(cls): |
| return {"required": { |
| "node_id": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}), |
| "widget_name": ("STRING", {"multiline": False}), |
| "value": ("BOOLEAN", {"default": True, "label_on": "True", "label_off": "False"}), |
| }} |
|
|
| FUNCTION = "doit" |
|
|
| CATEGORY = "ImpactPack/Logic/_for_test" |
| RETURN_TYPES = () |
| OUTPUT_NODE = True |
|
|
| def doit(self, **kwargs): |
| return {} |
|
|
|
|
| class ImpactRemoteInt: |
| @classmethod |
| def INPUT_TYPES(cls): |
| return {"required": { |
| "node_id": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}), |
| "widget_name": ("STRING", {"multiline": False}), |
| "value": ("INT", {"default": 0, "min": -0xffffffffffffffff, "max": 0xffffffffffffffff}), |
| }} |
|
|
| FUNCTION = "doit" |
|
|
| CATEGORY = "ImpactPack/Logic/_for_test" |
| RETURN_TYPES = () |
| OUTPUT_NODE = True |
|
|
| def doit(self, **kwargs): |
| return {} |
|
|
| class ImpactControlBridge: |
| @classmethod |
| def INPUT_TYPES(cls): |
| return {"required": { |
| "value": (any_typ,), |
| "mode": ("BOOLEAN", {"default": True, "label_on": "Active", "label_off": "Mute/Bypass"}), |
| "behavior": ("BOOLEAN", {"default": True, "label_on": "Mute", "label_off": "Bypass"}), |
| }, |
| "hidden": {"unique_id": "UNIQUE_ID", "prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"} |
| } |
|
|
| FUNCTION = "doit" |
|
|
| CATEGORY = "ImpactPack/Logic/_for_test" |
| RETURN_TYPES = (any_typ,) |
| RETURN_NAMES = ("value",) |
| OUTPUT_NODE = True |
|
|
| @classmethod |
| def IS_CHANGED(self, value, mode, behavior=True, unique_id=None, prompt=None, extra_pnginfo=None): |
| |
| |
| try: |
| workflow = core.current_prompt['extra_data']['extra_pnginfo']['workflow'] |
| except: |
| print(f"[Impact Pack] core.current_prompt['extra_data']['extra_pnginfo']['workflow']") |
| return 0 |
|
|
| nodes, links = workflow_to_map(workflow) |
| next_nodes = [] |
|
|
| for link in nodes[unique_id]['outputs'][0]['links']: |
| node_id = str(links[link][2]) |
| impact.utils.collect_non_reroute_nodes(nodes, links, next_nodes, node_id) |
|
|
| return next_nodes |
|
|
| def doit(self, value, mode, behavior=True, unique_id=None, prompt=None, extra_pnginfo=None): |
| global error_skip_flag |
|
|
| workflow_nodes, links = workflow_to_map(extra_pnginfo['workflow']) |
|
|
| active_nodes = [] |
| mute_nodes = [] |
| bypass_nodes = [] |
|
|
| for link in workflow_nodes[unique_id]['outputs'][0]['links']: |
| node_id = str(links[link][2]) |
|
|
| next_nodes = [] |
| impact.utils.collect_non_reroute_nodes(workflow_nodes, links, next_nodes, node_id) |
|
|
| for next_node_id in next_nodes: |
| node_mode = workflow_nodes[next_node_id]['mode'] |
|
|
| if node_mode == 0: |
| active_nodes.append(next_node_id) |
| elif node_mode == 2: |
| mute_nodes.append(next_node_id) |
| elif node_mode == 4: |
| bypass_nodes.append(next_node_id) |
|
|
| if mode: |
| |
| should_be_active_nodes = mute_nodes + bypass_nodes |
| if len(should_be_active_nodes) > 0: |
| PromptServer.instance.send_sync("impact-bridge-continue", {"node_id": unique_id, 'actives': list(should_be_active_nodes)}) |
| nodes.interrupt_processing() |
|
|
| elif behavior: |
| |
| should_be_mute_nodes = active_nodes + bypass_nodes |
| if len(should_be_mute_nodes) > 0: |
| PromptServer.instance.send_sync("impact-bridge-continue", {"node_id": unique_id, 'mutes': list(should_be_mute_nodes)}) |
| nodes.interrupt_processing() |
|
|
| else: |
| |
| should_be_bypass_nodes = active_nodes + mute_nodes |
| if len(should_be_bypass_nodes) > 0: |
| PromptServer.instance.send_sync("impact-bridge-continue", {"node_id": unique_id, 'bypasses': list(should_be_bypass_nodes)}) |
| nodes.interrupt_processing() |
|
|
| return (value, ) |
|
|
|
|
| class ImpactExecutionOrderController: |
| @classmethod |
| def INPUT_TYPES(cls): |
| return {"required": { |
| "signal": (any_typ,), |
| "value": (any_typ,), |
| }} |
|
|
| FUNCTION = "doit" |
|
|
| CATEGORY = "ImpactPack/Util" |
| RETURN_TYPES = (any_typ, any_typ) |
| RETURN_NAMES = ("signal", "value") |
|
|
| def doit(self, signal, value): |
| return signal, value |
|
|
|
|
| original_handle_execution = execution.PromptExecutor.handle_execution_error |
|
|
|
|
| def handle_execution_error(**kwargs): |
| execution.PromptExecutor.handle_execution_error(**kwargs) |
|
|
|
|