| | import copy |
| | import glob |
| | import inspect |
| | import json |
| | import os |
| | import random |
| | import sys |
| | import re |
| | from typing import Dict, List, Any, Callable, Tuple, TextIO |
| |
|
| | import black |
| |
|
| |
|
| | from comfyui_to_python_utils import import_custom_nodes, find_path, add_comfyui_directory_to_sys_path, add_extra_model_paths,\ |
| | get_value_at_index, parse_arg, save_image_wrapper |
| |
|
# Helper functions whose source text is copied (via inspect.getsource) into the
# top of every generated script by CodeGenerator.assemble_python_code, in this
# order. The generated main() calls several of them by name.
PACKAGED_FUNCTIONS = [
    get_value_at_index,
    find_path,
    add_comfyui_directory_to_sys_path,
    add_extra_model_paths,
    import_custom_nodes,
    save_image_wrapper,
    parse_arg
]
| |
|
# NOTE(review): this call has to stay ahead of the two imports below. `nodes`
# is presumably ComfyUI's own module and only becomes importable once the
# helper has put the ComfyUI directory on sys.path - confirm against
# comfyui_to_python_utils.
add_comfyui_directory_to_sys_path()
from nodes import NODE_CLASS_MAPPINGS
import nodes
| |
|
| | class FileHandler: |
| | """Handles reading and writing files. |
| | |
| | This class provides methods to read JSON data from an input file and write code to an output file (either file-like objects or string paths). |
| | """ |
| |
|
| | @staticmethod |
| | def read_json_file(file_path: str | TextIO, encoding: str = "utf-8") -> dict: |
| | """ |
| | Reads a JSON file and returns its contents as a dictionary. |
| | |
| | Args: |
| | file_path (str): The path to the JSON file. |
| | |
| | Returns: |
| | dict: The contents of the JSON file as a dictionary. |
| | |
| | Raises: |
| | FileNotFoundError: If the file is not found, it lists all JSON files in the directory of the file path. |
| | ValueError: If the file is not a valid JSON. |
| | """ |
| |
|
| | if hasattr(file_path, "read"): return json.load(file_path) |
| | with open(file_path, 'r', encoding="utf-8") as file: |
| | data = json.load(file) |
| | return data |
| |
|
| | @staticmethod |
| | def write_code_to_file(file_path: str | TextIO, code: str) -> None: |
| | """Write the specified code to a Python file. |
| | |
| | Args: |
| | file_path (str): The path to the Python file. |
| | code (str): The code to write to the file. |
| | |
| | Returns: |
| | None |
| | """ |
| | if isinstance(file_path, str): |
| | |
| | directory = os.path.dirname(file_path) |
| |
|
| | |
| | if directory and not os.path.exists(directory): |
| | os.makedirs(directory) |
| |
|
| | |
| | with open(file_path, 'w', encoding="utf-8") as file: |
| | file.write(code) |
| | else: |
| | file_path.write(code) |
| |
|
| |
|
class LoadOrderDeterminer:
    """Determine the load order of each key in the provided dictionary.

    This class places the nodes without node dependencies first, then ensures that any node whose
    result is used in another node will be added to the list in the order it should be executed.

    Attributes:
        data (Dict): The dictionary for which to determine the load order.
        node_class_mappings (Dict): Mappings of node classes.
        visited (Dict): Keys already reached by the depth-first search.
        load_order (List): Accumulated ``(key, node_data, is_special_function)`` tuples.
        is_special_function (bool): Flag stamped onto every node appended while it is set.
    """

    def __init__(self, data: Dict, node_class_mappings: Dict):
        """Initialize the LoadOrderDeterminer with the given data and node class mappings.

        Args:
            data (Dict): The dictionary for which to determine the load order.
            node_class_mappings (Dict): Mappings of node classes.
        """
        self.data = data
        self.node_class_mappings = node_class_mappings
        self.visited = {}
        self.load_order = []
        self.is_special_function = False

    def determine_load_order(self) -> List[Tuple[str, Dict, bool]]:
        """Determine the load order for the given data.

        Returns:
            List[Tuple[str, Dict, bool]]: A list of tuples representing the load order.
        """
        self._load_special_functions_first()
        # Everything still unvisited is ordinary (non-special) work.
        self.is_special_function = False
        for key in self.data:
            if key not in self.visited:
                self._dfs(key)
        return self.load_order

    def _is_link(self, value: Any) -> bool:
        """Tell whether an input value references another node's output.

        A link is a two-element list ``[node_id, output_index]``. String ids
        paired with a numeric index are always treated as links, so a dangling
        reference still surfaces as a KeyError during the search. Integer ids
        count only when they actually name a node, which keeps numeric literal
        pairs such as ``[512, 512]`` from being mistaken for links.

        Args:
            value (Any): A single entry from a node's ``inputs`` mapping.

        Returns:
            bool: True if ``value`` should be followed as a dependency.
        """
        if not isinstance(value, list) or len(value) != 2:
            return False
        target, slot = value
        if isinstance(target, str):
            return isinstance(slot, (int, float))
        return isinstance(target, int) and target in self.data

    def _dfs(self, key: str) -> None:
        """Depth-First Search function to determine the load order.

        Dependencies are appended before the node that consumes them.

        Args:
            key (str): The key from which to start the DFS.

        Returns:
            None

        Raises:
            KeyError: If ``key`` (or a node it links to) is missing from ``data``.
        """
        # Mark first so that cyclic references cannot recurse forever.
        self.visited[key] = True

        for value in self.data[key]['inputs'].values():
            # Only genuine links are followed; literal lists are plain values.
            if self._is_link(value) and value[0] not in self.visited:
                self._dfs(value[0])

        self.load_order.append((key, self.data[key], self.is_special_function))

    def _load_special_functions_first(self) -> None:
        """Load functions without dependencies, loaders, and encoders first.

        Returns:
            None
        """
        for key, node in self.data.items():
            class_def = self.node_class_mappings[node['class_type']]()
            has_upstream = any(self._is_link(value) for value in node['inputs'].values())

            if (class_def.CATEGORY == 'loaders' or
                    class_def.FUNCTION in ['encode'] or
                    not has_upstream):
                # The flag deliberately stays set for the rest of this pass, so
                # the dependencies pulled in by the DFS are marked special too.
                self.is_special_function = True

                if key not in self.visited:
                    self._dfs(key)
| |
|
| |
|
| | class CodeGenerator: |
| | """Generates Python code for a workflow based on the load order. |
| | |
| | Attributes: |
| | node_class_mappings (Dict): Mappings of node classes. |
| | base_node_class_mappings (Dict): Base mappings of node classes. |
| | """ |
| |
|
| | def __init__(self, node_class_mappings: Dict, base_node_class_mappings: Dict, prompt: Dict): |
| | """Initialize the CodeGenerator with given node class mappings. |
| | |
| | Args: |
| | node_class_mappings (Dict): Mappings of node classes. |
| | base_node_class_mappings (Dict): Base mappings of node classes. |
| | """ |
| | self.node_class_mappings = node_class_mappings |
| | self.base_node_class_mappings = base_node_class_mappings |
| | self.prompt = prompt |
| | |
| | def can_be_imported(self, import_name: str): |
| | if import_name in self.base_node_class_mappings.keys(): |
| | if getattr(nodes, import_name, None) is not None: |
| | return True |
| | |
| | return False |
| |
|
| | def generate_workflow(self, load_order: List, queue_size: int = 1) -> str: |
| | """Generate the execution code based on the load order. |
| | |
| | Args: |
| | load_order (List): A list of tuples representing the load order. |
| | filename (str): The name of the Python file to which the code should be saved. |
| | Defaults to 'generated_code_workflow.py'. |
| | queue_size (int): The number of photos that will be created by the script. |
| | |
| | Returns: |
| | str: Generated execution code as a string. |
| | """ |
| | include_prompt_data = False |
| | |
| | import_statements, executed_variables, arg_inputs, special_functions_code, code = set(['NODE_CLASS_MAPPINGS']), {}, [], [], [] |
| | |
| | initialized_objects = {} |
| |
|
| | custom_nodes = False |
| | |
| | for idx, data, is_special_function in load_order: |
| | |
| | inputs, class_type = data['inputs'], data['class_type'] |
| |
|
| | input_types = self.node_class_mappings[class_type].INPUT_TYPES() |
| | missing = [] |
| | for i, input in enumerate(input_types.get("required", {}).keys()): |
| | if input not in inputs: |
| | input_var = f"{input}{len(arg_inputs)+1}" |
| | arg_inputs.append((input_var, f"Argument {i}, input `{input}` for node \\\"{data.get('_meta', {}).get('title', class_type)}\\\" id {idx}")) |
| | print("WARNING: Missing required input", input, "for", class_type) |
| | print("That will be CLI arg " + str(len(arg_inputs))) |
| | missing.append((input, input_var, len(arg_inputs))) |
| |
|
| | class_def = self.node_class_mappings[class_type]() |
| |
|
| | |
| | if class_type not in initialized_objects: |
| | |
| | if class_type == 'PreviewImage': |
| | continue |
| | |
| | class_type, import_statement, class_code = self.get_class_info(class_type) |
| | initialized_objects[class_type] = self.clean_variable_name(class_type) |
| | if self.can_be_imported(class_type): |
| | import_statements.add(import_statement) |
| | if class_type not in self.base_node_class_mappings.keys(): |
| | custom_nodes = True |
| | special_functions_code.append(class_code) |
| |
|
| | |
| | class_def_params = self.get_function_parameters(getattr(class_def, class_def.FUNCTION)) |
| | no_params = class_def_params is None |
| |
|
| | |
| | inputs = {key: value for key, value in inputs.items() if no_params or key in class_def_params} |
| | for input, input_var, arg in missing: |
| | inputs[input] = {"variable_name": f"parse_arg(args." + input_var + ")"} |
| | |
| | if class_def_params is not None: |
| | if 'unique_id' in class_def_params: |
| | inputs['unique_id'] = random.randint(1, 2**64) |
| | if 'prompt' in class_def_params: |
| | inputs["prompt"] = {"variable_name": "PROMPT_DATA"} |
| | include_prompt_data = True |
| |
|
| | |
| | executed_variables[idx] = f'{self.clean_variable_name(class_type)}_{idx}' |
| | inputs = self.update_inputs(inputs, executed_variables) |
| | |
| | if class_type == 'SaveImage': |
| | save_code = self.create_function_call_code(initialized_objects[class_type], class_def.FUNCTION, executed_variables[idx], is_special_function, **inputs).strip() |
| | return_code = ['if __name__ != "__main__":', '\treturn dict(' + ', '.join(self.format_arg(key, value) for key, value in inputs.items()) + ')', 'else:', '\t' + save_code] |
| |
|
| | if is_special_function: |
| | special_functions_code.extend(return_code) |
| | else: |
| | code.extend(return_code) |
| | else: |
| | if is_special_function: |
| | special_functions_code.append(self.create_function_call_code(initialized_objects[class_type], class_def.FUNCTION, executed_variables[idx], is_special_function, **inputs)) |
| | else: |
| | code.append(self.create_function_call_code(initialized_objects[class_type], class_def.FUNCTION, executed_variables[idx], is_special_function, **inputs)) |
| |
|
| | |
| | final_code = self.assemble_python_code(import_statements, special_functions_code, arg_inputs, code, queue_size, custom_nodes, include_prompt_data) |
| |
|
| | return final_code |
| |
|
| | def create_function_call_code(self, obj_name: str, func: str, variable_name: str, is_special_function: bool, **kwargs) -> str: |
| | """Generate Python code for a function call. |
| | |
| | Args: |
| | obj_name (str): The name of the initialized object. |
| | func (str): The function to be called. |
| | variable_name (str): The name of the variable that the function result should be assigned to. |
| | is_special_function (bool): Determines the code indentation. |
| | **kwargs: The keyword arguments for the function. |
| | |
| | Returns: |
| | str: The generated Python code. |
| | """ |
| | args = ', '.join(self.format_arg(key, value) for key, value in kwargs.items()) |
| |
|
| | |
| | code = f'{variable_name} = {obj_name}.{func}({args})\n' |
| |
|
| | return code |
| |
|
| | def format_arg(self, key: str, value: any) -> str: |
| | """Formats arguments based on key and value. |
| | |
| | Args: |
| | key (str): Argument key. |
| | value (any): Argument value. |
| | |
| | Returns: |
| | str: Formatted argument as a string. |
| | """ |
| | |
| | if isinstance(value, int) and (key == 'noise_seed' or key == 'seed'): |
| | return f'{key}=random.randint(1, 2**64)' |
| | elif isinstance(value, str): |
| | return f'{key}={repr(value)}' |
| | elif isinstance(value, dict) and 'variable_name' in value: |
| | return f'{key}={value["variable_name"]}' |
| | return f'{key}={value}' |
| |
|
| | def assemble_python_code(self, import_statements: set, special_functions_code: List[str], arg_inputs: List[Tuple[str, str]], code: List[str], queue_size: int, custom_nodes=False, include_prompt_data=True) -> str: |
| | """Generates the final code string. |
| | |
| | Args: |
| | import_statements (set): A set of unique import statements. |
| | speical_functions_code (List[str]): A list of special functions code strings. |
| | code (List[str]): A list of code strings. |
| | queue_size (int): Number of photos that will be generated by the script. |
| | custom_nodes (bool): Whether to include custom nodes in the code. |
| | |
| | Returns: |
| | str: Generated final code as a string. |
| | """ |
| | |
| | func_strings = [] |
| | for func in PACKAGED_FUNCTIONS: |
| | func_strings.append(f'\n{inspect.getsource(func)}') |
| | |
| | argparse_code = [f'parser = argparse.ArgumentParser(description="A converted ComfyUI workflow. Required inputs listed below. Values passed should be valid JSON (assumes string if not valid JSON).")'] |
| | for i, (input_name, arg_desc) in enumerate(arg_inputs): |
| | argparse_code.append(f'parser.add_argument("{input_name}", help="{arg_desc} (autogenerated)")\n') |
| | argparse_code.append(f'parser.add_argument("--queue-size", "-q", type=int, default={queue_size}, help="How many times the workflow will be executed (default: {queue_size})")\n') |
| | argparse_code.append('parser.add_argument("--comfyui-directory", "-c", default=None, help="Where to look for ComfyUI (default: current directory)")\n') |
| | argparse_code.append(f'parser.add_argument("--output", "-o", default=None, help="The location to save the output image. Either a file path, a directory, or - for stdout (default: the ComfyUI output directory)")\n') |
| | argparse_code.append(f'parser.add_argument("--disable-metadata", action="store_true", help="Disables writing workflow metadata to the outputs")\n') |
| | argparse_code.append(''' |
| | comfy_args = [sys.argv[0]] |
| | if __name__ == "__main__" and "--" in sys.argv: |
| | idx = sys.argv.index("--") |
| | comfy_args += sys.argv[idx+1:] |
| | sys.argv = sys.argv[:idx] |
| | |
| | args = None |
| | if __name__ == "__main__": |
| | args = parser.parse_args() |
| | sys.argv = comfy_args |
| | if args is not None and args.output is not None and args.output == "-": |
| | ctx = contextlib.redirect_stdout(sys.stderr) |
| | else: |
| | ctx = contextlib.nullcontext() |
| | ''') |
| | |
| | |
| | static_imports = ['import os', 'import random', 'import sys', 'import json', 'import argparse', 'import contextlib', 'from typing import Sequence, Mapping, Any, Union', |
| | 'import torch'] + func_strings + argparse_code |
| | if include_prompt_data: |
| | static_imports.append(f'PROMPT_DATA = json.loads({repr(json.dumps(self.prompt))})') |
| | |
| | if custom_nodes: |
| | static_imports.append(f'\n{inspect.getsource(import_custom_nodes)}\n') |
| | newline_doubletab = '\n\t\t' |
| | newline_tripletab = '\n\t\t\t' |
| | |
| | main_function_code = f""" |
| | _custom_nodes_imported = {str(not custom_nodes)} |
| | _custom_path_added = False |
| | |
| | def main(*func_args, **func_kwargs): |
| | global args, _custom_nodes_imported, _custom_path_added |
| | if __name__ == "__main__": |
| | if args is None: |
| | args = parser.parse_args() |
| | else: |
| | defaults = dict((arg, parser.get_default(arg)) for arg in ['queue_size', 'comfyui_directory', 'output', 'disable_metadata']) |
| | ordered_args = dict(zip({[input_name for input_name, _ in arg_inputs]}, func_args)) |
| | |
| | all_args = dict() |
| | all_args.update(defaults) |
| | all_args.update(ordered_args) |
| | all_args.update(func_kwargs) |
| | |
| | args = argparse.Namespace(**all_args) |
| | |
| | with ctx: |
| | if not _custom_path_added: |
| | add_comfyui_directory_to_sys_path() |
| | add_extra_model_paths() |
| | |
| | _custom_path_added = True |
| | |
| | if not _custom_nodes_imported: |
| | import_custom_nodes() |
| | |
| | _custom_nodes_imported = True |
| | |
| | from nodes import {', '.join([class_name for class_name in import_statements])} |
| | |
| | with torch.inference_mode(), ctx: |
| | {newline_doubletab.join(special_functions_code)} |
| | for q in range(args.queue_size): |
| | {newline_tripletab.join(code)}""".replace(" ", "\t") |
| | |
| | final_code = '\n'.join(static_imports + [main_function_code, '', 'if __name__ == "__main__":', '\tmain()']) |
| | |
| | final_code = black.format_str(final_code, mode=black.Mode()) |
| |
|
| | return final_code |
| | |
| | def get_class_info(self, class_type: str) -> Tuple[str, str, str]: |
| | """Generates and returns necessary information about class type. |
| | |
| | Args: |
| | class_type (str): Class type. |
| | |
| | Returns: |
| | Tuple[str, str, str]: Updated class type, import statement string, class initialization code. |
| | """ |
| | import_statement = class_type |
| | variable_name = self.clean_variable_name(class_type) |
| | before = "" |
| | after = "" |
| | if class_type.strip() == 'SaveImage': |
| | before = 'save_image_wrapper(' + 'ctx, ' |
| | after = ')' |
| | |
| | if self.can_be_imported(class_type): |
| | class_code = f'{variable_name} = {before}{class_type.strip()}{after}()' |
| | else: |
| | class_code = f'{variable_name} = {before}NODE_CLASS_MAPPINGS["{class_type}"]{after}()' |
| |
|
| | return class_type, import_statement, class_code |
| | |
| | @staticmethod |
| | def clean_variable_name(class_type: str) -> str: |
| | """ |
| | Remove any characters from variable name that could cause errors running the Python script. |
| | |
| | Args: |
| | class_type (str): Class type. |
| | |
| | Returns: |
| | str: Cleaned variable name with no special characters or spaces |
| | """ |
| | |
| | clean_name = class_type.lower().strip().replace("-", "_").replace(" ", "_") |
| | |
| | |
| | clean_name = re.sub(r'[^a-z0-9_]', '', clean_name) |
| | |
| | |
| | if clean_name[0].isdigit(): |
| | clean_name = "_" + clean_name |
| | |
| | return clean_name |
| |
|
| | def get_function_parameters(self, func: Callable) -> List: |
| | """Get the names of a function's parameters. |
| | |
| | Args: |
| | func (Callable): The function whose parameters we want to inspect. |
| | |
| | Returns: |
| | List: A list containing the names of the function's parameters. |
| | """ |
| | signature = inspect.signature(func) |
| | parameters = {name: param.default if param.default != param.empty else None |
| | for name, param in signature.parameters.items()} |
| | catch_all = any(param.kind == inspect.Parameter.VAR_KEYWORD for param in signature.parameters.values()) |
| | return list(parameters.keys()) if not catch_all else None |
| |
|
| | def update_inputs(self, inputs: Dict, executed_variables: Dict) -> Dict: |
| | """Update inputs based on the executed variables. |
| | |
| | Args: |
| | inputs (Dict): Inputs dictionary to update. |
| | executed_variables (Dict): Dictionary storing executed variable names. |
| | |
| | Returns: |
| | Dict: Updated inputs dictionary. |
| | """ |
| | for key in inputs.keys(): |
| | if isinstance(inputs[key], list) and inputs[key][0] in executed_variables.keys(): |
| | inputs[key] = {'variable_name': f"get_value_at_index({executed_variables[inputs[key][0]]}, {inputs[key][1]})"} |
| | return inputs |
| | |
| |
|
class ComfyUItoPython:
    """Drive the conversion of an API-format workflow into a Python script.

    Attributes:
        workflow (str): The workflow's JSON text (used when no input file is given).
        input_file (str): Path to the input JSON file.
        output_file (str | TextIO): Path to the output Python file, or a writable stream.
        queue_size (int): The number of photos that will be created by the script.
        node_class_mappings (Dict): Mappings of node classes.
        needs_init_custom_nodes (bool): Whether custom nodes are initialized before generating.
        base_node_class_mappings (Dict): Base mappings of node classes.
    """

    def __init__(self, workflow: str = "", input_file: str = "", output_file: (str | TextIO) = "", queue_size: int = 1, node_class_mappings: Dict = NODE_CLASS_MAPPINGS,
                 needs_init_custom_nodes: bool = False):
        """Validate the arguments, store them, and run the conversion immediately.

        Exactly one of workflow or input_file must be specified.

        Args:
            workflow (str): The workflow's JSON.
            input_file (str): Path to the input JSON file.
            output_file (str | TextIO): Path to the output file or a file-like object.
            queue_size (int): The number of times a workflow will be executed by the script. Defaults to 1.
            node_class_mappings (Dict): Mappings of node classes. Defaults to NODE_CLASS_MAPPINGS.
            needs_init_custom_nodes (bool): Whether to initialize custom nodes. Defaults to False.

        Raises:
            ValueError: If both or neither of workflow/input_file are given, or
                if output_file is missing.
        """
        have_file = bool(input_file)
        have_workflow = bool(workflow)

        # Guard clauses: reject invalid argument combinations up front.
        if have_file and have_workflow:
            raise ValueError("Can't provide both input_file and workflow")
        if not (have_file or have_workflow):
            raise ValueError("Needs input_file or workflow")
        if not output_file:
            raise ValueError("Needs output_file")

        self.workflow = workflow
        self.input_file = input_file
        self.output_file = output_file
        self.queue_size = queue_size
        self.node_class_mappings = node_class_mappings
        self.needs_init_custom_nodes = needs_init_custom_nodes

        # Snapshot of the mappings taken before execute() may import custom nodes.
        self.base_node_class_mappings = copy.deepcopy(self.node_class_mappings)
        self.execute()

    def execute(self):
        """Run the full pipeline: load the workflow, order it, generate, and write.

        Returns:
            None
        """
        if not self.needs_init_custom_nodes:
            # Without custom-node initialization no base mappings are kept.
            self.base_node_class_mappings = {}
        else:
            import_custom_nodes()

        # The workflow comes either from a file or from the JSON text itself.
        data = (
            FileHandler.read_json_file(self.input_file)
            if self.input_file
            else json.loads(self.workflow)
        )

        load_order = LoadOrderDeterminer(data, self.node_class_mappings).determine_load_order()

        generator = CodeGenerator(self.node_class_mappings, self.base_node_class_mappings, data)
        generated_code = generator.generate_workflow(load_order, queue_size=self.queue_size)

        FileHandler.write_code_to_file(self.output_file, generated_code)

        print(f"Code successfully generated and written to {self.output_file}")
| |
|
| |
|
if __name__ == '__main__':
    # Command-line entry point: convert one workflow file into a Python script.
    import argparse

    ap = argparse.ArgumentParser(description="Converts a ComfyUI-style workflow.json file to a Python file. Must have been exported with API calls")

    ap.add_argument("workflow", help="The workflow.json file to convert")
    ap.add_argument("--output", "-o", default=None, help="The output file (defaults to [input file].py)")
    ap.add_argument("--queue-size", "-q", default=1, type=int, help="The queue size per run")
    ap.add_argument("--yes", "--overwrite", "-y", action="store_true", help="Overwrite the output file if it exists")

    args = ap.parse_args()

    # Fall back to "<workflow>.py" when no explicit output path was given.
    output = args.output or (args.workflow + ".py")

    # Ask before clobbering an existing file unless --yes was passed.
    if os.path.isfile(output) and not args.yes:
        answer = input("Are you sure you want to overwrite " + output + "?\nY/n")
        if answer.strip().lower() not in ("y", "yes"):
            print("Exiting.")
            sys.exit(1)

    ComfyUItoPython(input_file=args.workflow, output_file=output, queue_size=args.queue_size, needs_init_custom_nodes=True)
| |
|