| import os |
| import sys |
| import json |
| import inspect |
| import importlib.util |
| import time |
| from pathlib import Path |
|
|
| class NodeScanner: |
| def __init__(self, comfy_path): |
| self.comfy_path = os.path.abspath(comfy_path) |
| if self.comfy_path not in sys.path: |
| sys.path.append(self.comfy_path) |
| |
| |
| self.base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
| |
| |
| self.cache_dir = os.path.join(self.base_dir, 'web', 'nodes') |
| self.cache_file = os.path.join(self.cache_dir, 'node_definitions.txt') |
| |
| print(f"\nScanner initialized with:") |
| print(f"Base directory: {self.base_dir}") |
| print(f"Cache directory: {self.cache_dir}") |
| print(f"Cache file: {self.cache_file}") |
| |
| self.ensure_cache_dir() |
|
|
| def ensure_cache_dir(self): |
| try: |
| |
| web_dir = os.path.join(self.base_dir, 'web') |
| if not os.path.exists(web_dir): |
| os.makedirs(web_dir) |
| print(f"Created web directory: {web_dir}") |
| |
| |
| if not os.path.exists(self.cache_dir): |
| os.makedirs(self.cache_dir) |
| print(f"Created cache directory: {self.cache_dir}") |
| |
| |
| test_file = os.path.join(self.cache_dir, 'test_write.tmp') |
| with open(test_file, 'w') as f: |
| f.write('test') |
| os.remove(test_file) |
| print(f"Cache directory is writable: {self.cache_dir}") |
| |
| return True |
| except Exception as e: |
| print(f"ERROR: Failed to create directory structure:") |
| print(f" - {str(e)}") |
| print(f" - Base dir exists: {os.path.exists(self.base_dir)}") |
| print(f" - Web dir exists: {os.path.exists(web_dir if 'web_dir' in locals() else 'N/A')}") |
| print(f" - Cache dir exists: {os.path.exists(self.cache_dir)}") |
| print(f" - Have write permission: {os.access(self.base_dir, os.W_OK)}") |
| print(f" - Current working directory: {os.getcwd()}") |
| return False |
|
|
| def scan_nodes(self): |
| if not os.path.exists(self.comfy_path): |
| print(f"ERROR: ComfyUI path does not exist: {self.comfy_path}") |
| return {} |
|
|
| try: |
| nodes = {} |
| total_found = 0 |
| custom_nodes_path = os.path.join(self.comfy_path, 'custom_nodes') |
| |
| if not os.path.exists(custom_nodes_path): |
| print(f"ERROR: Custom nodes directory not found: {custom_nodes_path}") |
| return {} |
| |
| print(f"\nScanning custom nodes in: {custom_nodes_path}") |
| |
| for root, dirs, files in os.walk(custom_nodes_path): |
| |
| if 'scanner.py' in files: |
| files.remove('scanner.py') |
| dirs[:] = [d for d in dirs if d not in {'__pycache__', '.git', 'web'}] |
| |
| for file in files: |
| if file.endswith('.py'): |
| try: |
| found_nodes = self.extract_node_info(os.path.join(root, file)) |
| if found_nodes: |
| nodes.update(found_nodes) |
| total_found += len(found_nodes) |
| print(f"Found {len(found_nodes)} nodes in {file}") |
| except Exception as e: |
| if not str(e).startswith("No module named"): |
| print(f"Error processing {file}: {str(e)}") |
| |
| print(f"\nScan complete. Found {total_found} nodes.") |
| if nodes and self.ensure_cache_dir(): |
| self.cache_definitions(nodes) |
| return nodes |
| |
| except Exception as e: |
| print(f"ERROR during scanning: {str(e)}") |
| return {} |
|
|
| def extract_node_info(self, file_path): |
| """Extract node information by reading the file as text""" |
| try: |
| with open(file_path, 'r', encoding='utf-8') as f: |
| content = f.read() |
| |
| |
| nodes = {} |
| |
| |
| if 'NODE_CLASS_MAPPINGS' in content: |
| |
| class_blocks = content.split('class ')[1:] |
| |
| for block in class_blocks: |
| try: |
| |
| class_name = block.split('(')[0].strip() |
| |
| |
| category = 'Unknown' |
| if 'CATEGORY' in block: |
| category_line = [l for l in block.split('\n') if 'CATEGORY' in l] |
| if category_line: |
| category = category_line[0].split('=')[1].strip().strip('"\'') |
| |
| |
| description = None |
| if '"""' in block or "'''" in block: |
| doc_start = block.find('"""') if '"""' in block else block.find("'''") |
| if doc_start > -1: |
| doc_end = block.find('"""', doc_start + 3) if '"""' in block else block.find("'''", doc_start + 3) |
| if doc_end > -1: |
| description = block[doc_start+3:doc_end].strip() |
| |
| nodes[class_name] = { |
| 'name': class_name, |
| 'category': category, |
| 'description': description, |
| 'inputs': self.extract_input_types_from_text(block), |
| 'outputs': self.extract_output_types_from_text(block) |
| } |
| except Exception as e: |
| print(f"Error parsing class in {os.path.basename(file_path)}: {str(e)}") |
| continue |
| |
| return nodes |
| |
| except Exception as e: |
| print(f"Error reading {os.path.basename(file_path)}: {str(e)}") |
| return None |
|
|
| def get_input_types(self, node_class): |
| """Extract input types from node class""" |
| try: |
| if hasattr(node_class, 'INPUT_TYPES'): |
| if inspect.ismethod(node_class.INPUT_TYPES): |
| input_types = node_class.INPUT_TYPES() |
| else: |
| input_types = node_class.INPUT_TYPES |
| return input_types |
| except Exception as e: |
| print(f"ERROR getting input types: {str(e)}") |
| return {} |
|
|
| def extract_input_types_from_text(self, class_text): |
| """Extract input types from class text with improved parsing""" |
| inputs = {'required': {}, 'optional': {}} |
| |
| if 'INPUT_TYPES' in class_text: |
| try: |
| |
| input_block = "" |
| start = class_text.find('INPUT_TYPES') |
| block_start = class_text.find('{', start) |
| |
| if block_start > -1: |
| |
| brace_count = 1 |
| pos = block_start + 1 |
| |
| while brace_count > 0 and pos < len(class_text): |
| if class_text[pos] == '{': |
| brace_count += 1 |
| elif class_text[pos] == '}': |
| brace_count -= 1 |
| pos += 1 |
| |
| input_block = class_text[block_start:pos] |
| |
| |
| if '"required"' in input_block or "'required'" in input_block: |
| req_section = self._extract_section(input_block, 'required') |
| inputs['required'] = self._parse_input_section(req_section) |
| |
| |
| if '"optional"' in input_block or "'optional'" in input_block: |
| opt_section = self._extract_section(input_block, 'optional') |
| inputs['optional'] = self._parse_input_section(opt_section) |
| |
| except Exception as e: |
| print(f"Error parsing INPUT_TYPES: {str(e)}") |
| |
| return inputs |
|
|
| def _extract_section(self, text, section_name): |
| """Helper to extract a section (required/optional) from INPUT_TYPES text""" |
| try: |
| start = text.find(f'"{section_name}"') |
| if start == -1: |
| start = text.find(f"'{section_name}'") |
| |
| if start > -1: |
| start = text.find('{', start) |
| if start > -1: |
| brace_count = 1 |
| pos = start + 1 |
| |
| while brace_count > 0 and pos < len(text): |
| if text[pos] == '{': |
| brace_count += 1 |
| elif text[pos] == '}': |
| brace_count -= 1 |
| pos += 1 |
| |
| return text[start:pos] |
| except Exception as e: |
| print(f"Error extracting section {section_name}: {str(e)}") |
| return "" |
|
|
| def _parse_input_section(self, section_text): |
| """Helper to parse individual input definitions""" |
| inputs = {} |
| |
| try: |
| |
| lines = section_text.split('\n') |
| for line in lines: |
| line = line.strip() |
| if ':' in line and ('(' in line or '[' in line or '"' in line or "'" in line): |
| |
| name = line.split(':')[0].strip().strip('"\'') |
| |
| |
| type_info = [] |
| |
| |
| if '(' in line: |
| start = line.find('(') |
| end = line.find(')', start) |
| if end > start: |
| types = line[start+1:end].split(',') |
| type_info = [t.strip().strip('"\'') for t in types if t.strip()] |
| |
| elif '[' in line: |
| start = line.find('[') |
| end = line.find(']', start) |
| if end > start: |
| types = line[start+1:end].split(',') |
| type_info = [t.strip().strip('"\'') for t in types if t.strip()] |
| |
| elif '"' in line or "'" in line: |
| parts = line.split(':')[1].split(',')[0].strip() |
| if parts: |
| type_info = [parts.strip().strip('"\'')] |
| |
| if name and type_info: |
| inputs[name] = type_info |
| |
| except Exception as e: |
| print(f"Error parsing input section: {str(e)}") |
| |
| return inputs |
|
|
| def extract_output_types_from_text(self, class_text): |
| """Extract output types from class text""" |
| outputs = { |
| 'return_types': (), |
| 'return_names': None |
| } |
| |
| try: |
| |
| if 'RETURN_TYPES' in class_text: |
| start = class_text.find('RETURN_TYPES') |
| end = class_text.find('\n', start) |
| if end > start: |
| types_text = class_text[start:end] |
| if '=' in types_text: |
| types_str = types_text.split('=')[1].strip().strip('()') |
| outputs['return_types'] = tuple(t.strip().strip('"\'') for t in types_str.split(',') if t.strip()) |
| |
| |
| if 'RETURN_NAMES' in class_text: |
| start = class_text.find('RETURN_NAMES') |
| end = class_text.find('\n', start) |
| if end > start: |
| names_text = class_text[start:end] |
| if '=' in names_text: |
| names_str = names_text.split('=')[1].strip().strip('()') |
| outputs['return_names'] = tuple(n.strip().strip('"\'') for n in names_str.split(',') if n.strip()) |
| |
| except Exception as e: |
| print(f"Error parsing output types: {str(e)}") |
| |
| return outputs |
|
|
| def cache_definitions(self, definitions): |
| if not definitions: |
| print("No definitions to cache - skipping cache write") |
| return False |
| |
| print(f"\nWriting cache file to: {self.cache_file}") |
| try: |
| |
| os.makedirs(os.path.dirname(self.cache_file), exist_ok=True) |
| |
| |
| content = [] |
| for node_name, info in definitions.items(): |
| |
| content.append(f"Node: {node_name}") |
| content.append(f"Category: {info['category']}") |
| if info.get('description'): |
| content.append(f"Description: {info['description']}") |
| |
| |
| content.append("Inputs:") |
| if 'required' in info['inputs']: |
| content.append(" Required:") |
| for name, types in info['inputs']['required'].items(): |
| content.append(f" - {name}: {', '.join(types)}") |
| if 'optional' in info['inputs']: |
| content.append(" Optional:") |
| for name, types in info['inputs']['optional'].items(): |
| content.append(f" - {name}: {', '.join(types)}") |
| |
| |
| content.append("Outputs:") |
| return_types = info['outputs']['return_types'] |
| return_names = info['outputs']['return_names'] |
| if return_names: |
| for t, n in zip(return_types, return_names): |
| content.append(f" - {n} ({t})") |
| else: |
| for t in return_types: |
| content.append(f" - {t}") |
| |
| content.append("-" * 50) |
| |
| |
| temp_file = self.cache_file + '.tmp' |
| with open(temp_file, 'w', encoding='utf-8') as f: |
| f.write('\n'.join(content)) |
| f.flush() |
| os.fsync(f.fileno()) |
| |
| print(f"Temp file written: {temp_file}") |
| print(f"Temp file exists: {os.path.exists(temp_file)}") |
| print(f"Temp file size: {os.path.getsize(temp_file) if os.path.exists(temp_file) else 0}") |
| |
| |
| if os.path.exists(self.cache_file): |
| backup_file = self.cache_file + '.bak' |
| if os.path.exists(backup_file): |
| os.remove(backup_file) |
| os.rename(self.cache_file, backup_file) |
| print(f"Created backup file: {backup_file}") |
| |
| |
| os.rename(temp_file, self.cache_file) |
| |
| |
| if not os.path.exists(self.cache_file): |
| raise Exception(f"File was not created at {self.cache_file}") |
| |
| |
| try: |
| with open(self.cache_file, 'r', encoding='utf-8') as f: |
| test_content = f.read(100) |
| print(f"File is readable. First 100 chars: {test_content[:100]}") |
| except Exception as e: |
| print(f"WARNING: File created but couldn't be read: {str(e)}") |
| |
| |
| comfy_dir = os.path.dirname(os.path.dirname(os.path.dirname(self.base_dir))) |
| print(f"ComfyUI root directory: {comfy_dir}") |
| |
| |
| relative_path = os.path.relpath(self.cache_file, comfy_dir) |
| url_path = '/' + relative_path.replace('\\', '/') |
| print(f"Relative web path for browser access: {url_path}") |
| |
| size = os.path.getsize(self.cache_file) |
| print(f"Cache written successfully ({size:,} bytes)") |
| print(f"Cache file absolute path: {os.path.abspath(self.cache_file)}") |
| return True |
| |
| except Exception as e: |
| print(f"ERROR: Failed to write cache: {str(e)}") |
| |
| print(f" - Temp file exists: {os.path.exists(temp_file if 'temp_file' in locals() else 'N/A')}") |
| print(f" - Cache dir exists: {os.path.exists(os.path.dirname(self.cache_file))}") |
| print(f" - Current working directory: {os.getcwd()}") |
| print(f" - Permission to write: {os.access(os.path.dirname(self.cache_file), os.W_OK)}") |
| |
| if 'temp_file' in locals() and os.path.exists(temp_file): |
| try: |
| os.remove(temp_file) |
| except: |
| pass |
| return False |
|
|
| def node_to_compact_format(self, node_name, info): |
| """Convert node info to compact format""" |
| lines = [] |
| |
| |
| lines.append(f"#N {node_name} #C {info['category']}") |
| |
| |
| input_parts = [] |
| if 'required' in info['inputs']: |
| for name, types in info['inputs']['required'].items(): |
| input_parts.append(f"{name}:{','.join(types)}") |
| if 'optional' in info['inputs']: |
| for name, types in info['inputs']['optional'].items(): |
| input_parts.append(f"{name}:{','.join(types)}?") |
| if input_parts: |
| lines.append(f"#I {' '.join(input_parts)}") |
| |
| |
| if info['outputs']['return_types']: |
| out_types = ','.join(info['outputs']['return_types']) |
| if info['outputs']['return_names']: |
| out_names = ','.join(info['outputs']['return_names']) |
| lines.append(f"#O {out_types} #N {out_names}") |
| else: |
| lines.append(f"#O {out_types}") |
| |
| |
| if info.get('description'): |
| lines.append(f"#D {info['description']}") |
| |
| lines.append("---") |
| return '\n'.join(lines) |
|
|
| def parse_compact_format(self, content): |
| """Parse compact format back into node definitions""" |
| definitions = {} |
| current_node = None |
| current_info = {} |
| |
| for line in content.split('\n'): |
| line = line.strip() |
| if not line or line == '---': |
| if current_node: |
| definitions[current_node] = current_info |
| current_node = None |
| current_info = {} |
| continue |
| |
| parts = line.split() |
| if line.startswith('#N') and '#C' in line: |
| |
| name_idx = line.index('#C') |
| current_node = ' '.join(parts[1:name_idx]).strip() |
| current_info['category'] = ' '.join(parts[name_idx+1:]).strip() |
| current_info['inputs'] = {'required': {}, 'optional': {}} |
| current_info['outputs'] = {'return_types': (), 'return_names': None} |
| |
| elif line.startswith('#I'): |
| |
| for input_def in ' '.join(parts[1:]).split(): |
| name, type_info = input_def.split(':') |
| if type_info.endswith('?'): |
| |
| type_info = type_info[:-1] |
| current_info['inputs']['optional'][name] = type_info.split(',') |
| else: |
| |
| current_info['inputs']['required'][name] = type_info.split(',') |
| |
| elif line.startswith('#O'): |
| |
| if '#N' in line: |
| |
| type_idx = line.index('#N') |
| types = ' '.join(parts[1:type_idx]).strip().split(',') |
| names = ' '.join(parts[type_idx+1:]).strip().split(',') |
| current_info['outputs']['return_types'] = tuple(types) |
| current_info['outputs']['return_names'] = tuple(names) |
| else: |
| |
| current_info['outputs']['return_types'] = tuple(parts[1].split(',')) |
| |
| elif line.startswith('#D'): |
| |
| current_info['description'] = ' '.join(parts[1:]) |
| |
| return definitions |
|
|
| def load_cache(self): |
| """Load cached node definitions from compact format""" |
| try: |
| if os.path.exists(self.cache_file): |
| with open(self.cache_file, 'r') as f: |
| return json.load(f) |
| except Exception as e: |
| print(f"Error loading cache: {str(e)}") |
| return None |
|
|
| def is_cache_valid(self, max_age_hours=24): |
| """Check if cache is valid based on age""" |
| cache_data = self.load_cache() |
| if not cache_data: |
| return False |
| |
| cache_age = time.time() - cache_data['timestamp'] |
| return cache_age < (max_age_hours * 3600) |
|
|
| def get_nodes_for_llm(self): |
| """Get node definitions in LLM-friendly format""" |
| cache_data = self.load_cache() |
| if not cache_data: |
| return "No node definitions available." |
|
|
| llm_text = "Available Custom Nodes:\n\n" |
| for name, info in cache_data['definitions'].items(): |
| llm_text += f"Node: {name}\n" |
| if info['description']: |
| llm_text += f"Description: {info['description']}\n" |
| llm_text += f"Category: {info['category']}\n" |
| |
| |
| llm_text += "Inputs:\n" |
| if 'required' in info['inputs']: |
| for input_name, input_info in info['inputs']['required'].items(): |
| llm_text += f" - {input_name} ({input_info[0]})\n" |
| if 'optional' in info['inputs']: |
| for input_name, input_info in info['inputs']['optional'].items(): |
| llm_text += f" - {input_name} ({input_info[0]}) [Optional]\n" |
| |
| |
| llm_text += "Outputs:\n" |
| return_types = info['outputs']['return_types'] |
| return_names = info['outputs']['return_names'] |
| if return_names: |
| for i, (t, n) in enumerate(zip(return_types, return_names)): |
| llm_text += f" - {n} ({t})\n" |
| else: |
| for t in return_types: |
| llm_text += f" - {t}\n" |
| |
| llm_text += "\n" |
|
|
| return llm_text |
|
|
|
|
| if __name__ == "__main__": |
| try: |
| |
| default_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) |
| comfy_path = sys.argv[1] if len(sys.argv) > 1 else default_path |
| |
| print(f"\n=== NodeScanner CLI ===") |
| print(f"βββ ComfyUI path: {comfy_path}") |
| |
| scanner = NodeScanner(comfy_path) |
| print("βββ Starting node scan...") |
| |
| results = scanner.scan_nodes() |
| |
| if results: |
| print(f"βββ Found {len(results)} nodes") |
| print("\nNode Summary:") |
| for node_name, info in results.items(): |
| print(f"βββ Node: {node_name}") |
| print(f"β βββ Category: {info['category']}") |
| if info.get('description'): |
| print(f"β βββ Description: {info['description']}") |
| else: |
| print("βββ No nodes were found or errors occurred during scanning") |
| |
| except Exception as e: |
| print(f"\nERROR: Script failed with error: {str(e)}") |
| sys.exit(1) |