| | import ast |
| | import re |
| | import os |
| | import json |
| | from git import Repo |
| | import concurrent |
| | import datetime |
| | import concurrent.futures |
| | import requests |
| | import warnings |
| | import argparse |
| |
|
# Names collected by scan_in_file(..., is_builtin=True); scan_in_file() removes
# these names from the result of every non-builtin scan.
builtin_nodes = set()
| |
|
| | import sys |
| |
|
| | from urllib.parse import urlparse |
| | from github import Github, Auth |
| | from pathlib import Path |
| | from typing import Set, Dict, Optional |
| |
|
| | |
# Version stamp written into each per-repo cache file; _load_per_repo_cache()
# ignores a cache whose stored version differs from this value.
SCANNER_VERSION = "2.0.12"

# In-process memo for extract_nodes(), keyed by hash(code_text).
_extract_nodes_cache: Dict[int, Set[str]] = {}
# In-process memo for extract_nodes_enhanced(), keyed by
# (str(path), mtime, SCANNER_VERSION).
_extract_nodes_enhanced_cache: Dict[tuple, Set[str]] = {}
# Modification time recorded for each file the last time
# extract_nodes_enhanced() stored a result for it.
_file_mtime_cache: Dict[Path, float] = {}
| |
|
| |
|
| | def _get_repo_root(file_path: Path) -> Optional[Path]: |
| | """Find the repository root directory containing .git""" |
| | current = file_path if file_path.is_dir() else file_path.parent |
| | while current != current.parent: |
| | if (current / ".git").exists(): |
| | return current |
| | current = current.parent |
| | return None |
| |
|
| |
|
| | def _get_repo_hash(repo_path: Path) -> str: |
| | """Get git commit hash or fallback identifier""" |
| | git_dir = repo_path / ".git" |
| | if not git_dir.exists(): |
| | return "" |
| |
|
| | try: |
| | |
| | head_file = git_dir / "HEAD" |
| | if head_file.exists(): |
| | head_content = head_file.read_text().strip() |
| | if head_content.startswith("ref:"): |
| | |
| | ref_path = git_dir / head_content[5:].strip() |
| | if ref_path.exists(): |
| | commit_hash = ref_path.read_text().strip() |
| | return commit_hash[:16] |
| | else: |
| | |
| | return head_content[:16] |
| | except: |
| | pass |
| |
|
| | return "" |
| |
|
| |
|
| | def _load_per_repo_cache(repo_path: Path) -> Optional[tuple]: |
| | """Load nodes and metadata from per-repo cache |
| | |
| | Returns: |
| | tuple: (nodes_set, metadata_dict) or None if cache invalid |
| | """ |
| | cache_file = repo_path / ".git" / "nodecache.json" |
| |
|
| | if not cache_file.exists(): |
| | return None |
| |
|
| | try: |
| | with open(cache_file, 'r') as f: |
| | cache_data = json.load(f) |
| |
|
| | |
| | if cache_data.get('scanner_version') != SCANNER_VERSION: |
| | return None |
| |
|
| | |
| | current_hash = _get_repo_hash(repo_path) |
| | if cache_data.get('git_hash') != current_hash: |
| | return None |
| |
|
| | |
| | nodes = cache_data.get('nodes', []) |
| | metadata = cache_data.get('metadata', {}) |
| | return (set(nodes) if nodes else set(), metadata) |
| |
|
| | except: |
| | return None |
| |
|
| |
|
| | def _save_per_repo_cache(repo_path: Path, all_nodes: Set[str], metadata: dict = None): |
| | """Save nodes and metadata to per-repo cache""" |
| | cache_file = repo_path / ".git" / "nodecache.json" |
| |
|
| | if not cache_file.parent.exists(): |
| | return |
| |
|
| | git_hash = _get_repo_hash(repo_path) |
| | cache_data = { |
| | "scanner_version": SCANNER_VERSION, |
| | "git_hash": git_hash, |
| | "scanned_at": datetime.datetime.now().isoformat(), |
| | "nodes": sorted(list(all_nodes)), |
| | "metadata": metadata if metadata else {} |
| | } |
| |
|
| | try: |
| | with open(cache_file, 'w') as f: |
| | json.dump(cache_data, f, indent=2) |
| | except: |
| | pass |
| |
|
| |
|
def download_url(url, dest_folder, filename=None, timeout=30):
    """Download ``url`` into ``dest_folder``.

    Args:
        url: Address to fetch.
        dest_folder: Target directory; created when missing.
        filename: Name of the file to write.  Defaults to the last path
            component of ``url``.
        timeout: Seconds to wait for the server (passed to ``requests.get``)
            so that a stalled connection cannot block the scan forever.

    Raises:
        Exception: If the server answers with a status other than 200.
        requests.RequestException: On connection errors or timeouts.
    """
    # exist_ok avoids a race between the existence check and the creation.
    os.makedirs(dest_folder, exist_ok=True)

    if filename is None:
        filename = os.path.basename(url)

    dest_path = os.path.join(dest_folder, filename)

    # The context manager releases the streamed connection on every path.
    with requests.get(url, stream=True, timeout=timeout) as response:
        if response.status_code != 200:
            raise Exception(f"Failed to download file from {url}")

        with open(dest_path, 'wb') as file:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    file.write(chunk)
| |
|
| |
|
def parse_arguments():
    """Parse the scanner's command-line arguments from ``sys.argv``.

    Returns:
        argparse.Namespace with ``scan_only``, ``temp_dir``, ``skip_update``,
        ``skip_stat_update``, ``skip_all``, ``force_rescan`` and the legacy
        positional ``temp_dir_positional``.
    """
    usage_examples = '''
Examples:
  # Standard mode
  python3 scanner.py
  python3 scanner.py --skip-update
  python3 scanner.py --skip-all --force-rescan

  # Scan-only mode
  python3 scanner.py --scan-only temp-urls-clean.list
  python3 scanner.py --scan-only urls.list --temp-dir /custom/temp
  python3 scanner.py --scan-only urls.list --skip-update --force-rescan
        '''

    parser = argparse.ArgumentParser(
        description='ComfyUI Manager Node Scanner',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )

    # Options that take a value.
    valued_options = (
        ('--scan-only', 'URL_LIST_FILE',
         'Scan-only mode: provide URL list file (one URL per line)'),
        ('--temp-dir', 'DIR',
         'Temporary directory for cloned repositories'),
    )
    for flag, metavar, help_text in valued_options:
        parser.add_argument(flag, type=str, metavar=metavar, help=help_text)

    # Boolean switches; each one is False unless given.
    switches = (
        ('--skip-update', 'Skip git clone/pull operations'),
        ('--skip-stat-update', 'Skip GitHub stats collection'),
        ('--skip-all', 'Skip all update operations'),
        ('--force-rescan', 'Force rescan all nodes (ignore cache)'),
    )
    for flag, help_text in switches:
        parser.add_argument(flag, action='store_true', help=help_text)

    # Optional positional kept for the older calling convention.
    parser.add_argument('temp_dir_positional', nargs='?', metavar='TEMP_DIR',
                        help='(Legacy) Temporary directory path')

    return parser.parse_args()
| |
|
| |
|
| | |
# Run-wide settings, initialised here to their defaults.
# NOTE(review): presumably filled in from parse_arguments() by start-up code
# that is not part of this section of the file; confirm.
args = None
scan_only_mode = False
url_list_file = None
temp_dir = None
skip_update = False
skip_stat_update = True
# NOTE(review): looks like a handle for the GitHub client (github.Github is
# imported at the top of the file); confirm.
g = None

# Number of parses started so far.  extract_nodes() and extract_v3_nodes()
# increment it and print a progress dot whenever it is a multiple of 100.
parse_cnt = 0
| |
|
| |
|
def extract_nodes(code_text):
    """Return the string keys of the first top-level node-mapping dict literal.

    The first module-level assignment (plain or annotated) whose target is
    ``NODE_CONFIG`` or ``NODE_CLASS_MAPPINGS`` is inspected.  When its value is
    a dict literal, every key that is a string constant is returned, stripped
    of surrounding whitespace.  Keys that are not string literals (names,
    attribute accesses, ``**`` unpacking) are skipped individually instead of
    invalidating the whole mapping.

    Results are memoised in ``_extract_nodes_cache`` by ``hash(code_text)``;
    callers always receive their own copy of the set.

    Args:
        code_text: Python source code.

    Returns:
        Set of node names; empty when the source cannot be parsed or contains
        no such dict literal.
    """
    global parse_cnt

    cache_key = hash(code_text)
    cached = _extract_nodes_cache.get(cache_key)
    if cached is not None:
        return cached.copy()

    found = set()
    try:
        # Progress indicator: one dot per 100 parses.
        if parse_cnt % 100 == 0:
            print(".", end="", flush=True)
        parse_cnt += 1

        # Remove backslash escapes (except escaped quotes) before parsing.
        cleaned = re.sub(r'\\[^"\']', '', code_text)
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', category=SyntaxWarning)
            warnings.filterwarnings('ignore', category=DeprecationWarning)
            parsed_code = ast.parse(cleaned)

        mapping = None
        for statement in parsed_code.body:
            if isinstance(statement, ast.AnnAssign):
                target = statement.target
            elif isinstance(statement, ast.Assign):
                target = statement.targets[0]
            else:
                continue
            if isinstance(target, ast.Name) and target.id in ('NODE_CONFIG', 'NODE_CLASS_MAPPINGS'):
                # Only the first matching assignment is considered.
                mapping = statement.value
                break

        if isinstance(mapping, ast.Dict):
            for key in mapping.keys:
                # ``key`` is None for ``**other`` entries.
                if isinstance(key, ast.Constant) and isinstance(key.value, str):
                    found.add(key.value.strip())
    except Exception:
        # Unparseable source is expected in third-party repositories.
        found = set()

    _extract_nodes_cache[cache_key] = found
    # Hand out a copy so that callers cannot alter the cached entry.
    return found.copy()
| |
|
# Regexes for the regex-based part of the repository scan, compiled once
# instead of once per scanned file.
_REPO_SCAN_MAPPING_RE = re.compile(
    r"_CLASS_MAPPINGS\s*(?::\s*\w+\s*)?=\s*(?:\\\s*)?{([^}]*)}",
    re.MULTILINE | re.DOTALL,
)
_REPO_SCAN_UPDATE_RE = re.compile(
    r"_CLASS_MAPPINGS\.update\s*\(\s*{([^}]*)}\s*\)", re.DOTALL
)
_REPO_SCAN_COMMENT_RE = re.compile(r'^\s*#')
# The value group is unused but must stay: it consumes the rest of the entry
# so that quoted text inside a value is not mistaken for a key.
_REPO_SCAN_DQ_PAIR_RE = re.compile(r"\"([^\"]*)\"\s*:\s*([^,\n]*)")
_REPO_SCAN_SQ_PAIR_RE = re.compile(r"'([^']*)'\s*:\s*([^,\n]*)")
# NOTE(review): the two '^' patterns are applied without re.MULTILINE, so they
# can only match from the very start of the file; confirm that is intended.
_REPO_SCAN_EXTRA_PATTERNS = (
    r'^[^=]*_CLASS_MAPPINGS\["(.*?)"\]',
    r'^[^=]*_CLASS_MAPPINGS\[\'(.*?)\'\]',
    r'@register_node\("(.+)",\s*\".+"\)',
    r'"(\w+)"\s*:\s*{"class":\s*\w+\s*',
)


def _repo_scan_quoted_keys(dict_text: str) -> Set[str]:
    """Return the stripped quoted keys of ``"key": value`` / ``'key': value`` pairs."""
    keys = {key.strip() for key, _value in _REPO_SCAN_DQ_PAIR_RE.findall(dict_text)}
    keys.update(key.strip() for key, _value in _REPO_SCAN_SQ_PAIR_RE.findall(dict_text))
    return keys


def _repo_scan_regex_nodes(code: str) -> Set[str]:
    """Return node names found in one file's source by the regex heuristics."""
    found = set()

    # ``*_CLASS_MAPPINGS = {...}`` literals whose first line is not a comment.
    for literal in _REPO_SCAN_MAPPING_RE.finditer(code):
        line_start = code.rfind('\n', 0, literal.start()) + 1
        line_end = code.find('\n', literal.start())
        if line_end == -1:
            line_end = len(code)
        if _REPO_SCAN_COMMENT_RE.match(code[line_start:line_end]):
            continue

        # Ignore commented-out entries inside the literal.
        active_lines = [
            entry for entry in literal.group(1).split('\n')
            if not _REPO_SCAN_COMMENT_RE.match(entry)
        ]
        found |= _repo_scan_quoted_keys('\n'.join(active_lines))

    # The remaining heuristics run on the text without column-0 comment lines.
    code_cleaned = re.sub(r'^#.*?$', '', code, flags=re.MULTILINE)

    # Every ``*_CLASS_MAPPINGS.update({...})`` call, not just the first one.
    for update_call in _REPO_SCAN_UPDATE_RE.finditer(code_cleaned):
        found |= _repo_scan_quoted_keys(update_call.group(1))

    for pattern in _REPO_SCAN_EXTRA_PATTERNS:
        found.update(key.strip() for key in re.findall(pattern, code_cleaned))

    return found


def extract_nodes_from_repo(repo_path: Path, verbose: bool = False, force_rescan: bool = False) -> tuple:
    """
    Extract all nodes and metadata from a repository with per-repo caching.

    Results are cached in ``.git/nodecache.json`` through
    ``_load_per_repo_cache`` / ``_save_per_repo_cache``.  A cached result is
    returned unless ``force_rescan`` is True or the cache is rejected by
    ``_load_per_repo_cache``.

    Every ``*.py`` file below ``repo_path`` is scanned, except files whose
    relative path contains ``__pycache__`` or has a component starting with a
    dot.  For each file the AST-based detectors (``extract_nodes_enhanced``,
    ``extract_v3_nodes``) and the regex heuristics are combined, and
    ``extract_metadata_only`` supplies the metadata.  A file that raises during
    processing is skipped.

    Args:
        repo_path: Path to repository root
        verbose: If True, print UI-only extension detection messages
        force_rescan: If True, ignore cache and force fresh scan

    Returns:
        tuple: (nodes_set, metadata_dict)
    """
    repo_path = repo_path.resolve()

    if not force_rescan:
        cached_result = _load_per_repo_cache(repo_path)
        if cached_result is not None:
            return cached_result

    all_nodes = set()
    all_metadata = {}

    py_files = []
    for candidate in list(repo_path.rglob("*.py")):
        try:
            rel_path = candidate.relative_to(repo_path)
        except ValueError:
            continue
        if '__pycache__' in str(rel_path):
            continue
        if any(part.startswith('.') for part in rel_path.parts):
            continue
        py_files.append(candidate)

    for py_file in py_files:
        try:
            with open(py_file, 'r', encoding='utf-8', errors='ignore') as f:
                code = f.read()

            if code:
                all_nodes.update(
                    extract_nodes_enhanced(code, py_file, visited=set(), verbose=verbose)
                )
                all_nodes.update(extract_v3_nodes(code))
                all_nodes.update(_repo_scan_regex_nodes(code))

            all_metadata.update(extract_metadata_only(str(py_file)))
        except Exception:
            # Unreadable or pathological files must not abort the whole scan.
            continue

    _save_per_repo_cache(repo_path, all_nodes, all_metadata)

    return (all_nodes, all_metadata)
| |
|
| |
|
| | def _verify_class_exists(node_name: str, code_text: str, file_path: Optional[Path] = None) -> tuple[bool, Optional[str], Optional[int]]: |
| | """ |
| | Verify that a node class exists and has ComfyUI node structure. |
| | |
| | Returns: (exists: bool, file_path: str, line_number: int) |
| | |
| | A valid ComfyUI node must have: |
| | - Class definition (not commented) |
| | - At least one of: INPUT_TYPES, RETURN_TYPES, FUNCTION method/attribute |
| | """ |
| | try: |
| | with warnings.catch_warnings(): |
| | warnings.filterwarnings('ignore', category=SyntaxWarning) |
| | tree = ast.parse(code_text) |
| | except: |
| | return (False, None, None) |
| |
|
| | for node in ast.walk(tree): |
| | if isinstance(node, ast.ClassDef): |
| | if node.name == node_name or node.name.replace('_', '') == node_name.replace('_', ''): |
| | |
| | has_input_types = False |
| | has_return_types = False |
| | has_function = False |
| |
|
| | for item in node.body: |
| | |
| | if isinstance(item, ast.FunctionDef) and item.name == 'INPUT_TYPES': |
| | has_input_types = True |
| | |
| | elif isinstance(item, ast.Assign): |
| | for target in item.targets: |
| | if isinstance(target, ast.Name): |
| | if target.id == 'RETURN_TYPES': |
| | has_return_types = True |
| | elif target.id == 'FUNCTION': |
| | has_function = True |
| | |
| | elif isinstance(item, ast.FunctionDef): |
| | has_function = True |
| |
|
| | |
| | if has_input_types or has_return_types or has_function: |
| | file_str = str(file_path) if file_path else None |
| | return (True, file_str, node.lineno) |
| |
|
| | return (False, None, None) |
| |
|
| |
|
| | def _extract_display_name_mappings(code_text: str) -> Set[str]: |
| | """ |
| | Extract node names from NODE_DISPLAY_NAME_MAPPINGS. |
| | |
| | Pattern: |
| | NODE_DISPLAY_NAME_MAPPINGS = { |
| | "node_key": "Display Name", |
| | ... |
| | } |
| | |
| | Returns: |
| | Set of node keys from NODE_DISPLAY_NAME_MAPPINGS |
| | """ |
| | try: |
| | with warnings.catch_warnings(): |
| | warnings.filterwarnings('ignore', category=SyntaxWarning) |
| | tree = ast.parse(code_text) |
| | except: |
| | return set() |
| |
|
| | nodes = set() |
| |
|
| | for node in tree.body: |
| | if isinstance(node, ast.Assign): |
| | for target in node.targets: |
| | if isinstance(target, ast.Name) and target.id == 'NODE_DISPLAY_NAME_MAPPINGS': |
| | if isinstance(node.value, ast.Dict): |
| | for key in node.value.keys: |
| | if isinstance(key, ast.Constant) and isinstance(key.value, str): |
| | nodes.add(key.value.strip()) |
| |
|
| | return nodes |
| |
|
| |
|
def extract_nodes_enhanced(
    code_text: str,
    file_path: Optional[Path] = None,
    visited: Optional[Set[Path]] = None,
    verbose: bool = False
) -> Set[str]:
    """
    Enhanced node extraction: the union of several independent detectors.

    Detectors (all run on ``code_text``; none short-circuits the others):
    - ``extract_nodes``: NODE_CLASS_MAPPINGS / NODE_CONFIG dict literal
    - ``_fallback_classname_resolver``: ``Class.NAME`` keys in the literal
    - ``_fallback_item_assignment``: ``NODE_CLASS_MAPPINGS["key"] = value``
    - ``_extract_display_name_mappings``: NODE_DISPLAY_NAME_MAPPINGS keys
    - ``_fallback_dict_comprehension``: ``{cls.__name__: cls for cls in ...}``
    - ``_fallback_import_class_names``: imported classes in export lists

    When nothing is found, ``_fallback_empty_dict_detector`` is called; it only
    logs (UI-only extensions) and contributes no names.

    The separate class-existence check on display-name keys was dropped: every
    display-name key is already part of the union, so that check could never
    change the result and only cost one extra parse of the file per key.

    When ``file_path`` names an existing file, the result is memoised in
    ``_extract_nodes_enhanced_cache`` under (path, mtime, SCANNER_VERSION) and
    reused while the file's modification time is unchanged.  Callers always
    receive a set they may modify freely.

    Args:
        code_text: Python source code
        file_path: Path to file (for logging and caching)
        visited: Accepted for backward compatibility; not used
        verbose: If True, print UI-only extension detection messages

    Returns:
        Set of node names (union of all detected patterns)
    """
    cache_path = None
    cache_key = None
    current_mtime = None

    if file_path is not None:
        try:
            candidate = file_path if isinstance(file_path, Path) else Path(file_path)
            if candidate.exists():
                current_mtime = candidate.stat().st_mtime
                cache_path = candidate
                cache_key = (str(candidate), current_mtime, SCANNER_VERSION)

                if (_file_mtime_cache.get(candidate) == current_mtime
                        and cache_key in _extract_nodes_enhanced_cache):
                    return _extract_nodes_enhanced_cache[cache_key].copy()
        except (OSError, TypeError, ValueError):
            # Caching is optional; fall through to a normal scan.
            cache_path = None
            cache_key = None

    with warnings.catch_warnings():
        warnings.filterwarnings('ignore', category=SyntaxWarning)
        warnings.filterwarnings('ignore', category=DeprecationWarning)

        literal_nodes = extract_nodes(code_text)
        classname_nodes = _fallback_classname_resolver(code_text, file_path)
        item_nodes = _fallback_item_assignment(code_text)
        display_nodes = _extract_display_name_mappings(code_text)
        dictcomp_nodes = _fallback_dict_comprehension(code_text, file_path)
        import_nodes = _fallback_import_class_names(code_text, file_path)

        all_nodes = (
            literal_nodes | classname_nodes | item_nodes
            | display_nodes | dictcomp_nodes | import_nodes
        )

        if not all_nodes:
            _fallback_empty_dict_detector(code_text, file_path, verbose)

    if cache_key is not None:
        # Store a private copy so that the returned set is not aliased by the
        # cache.  The mtime read before the scan is used, so a file modified
        # while it was being scanned is re-scanned next time.
        _extract_nodes_enhanced_cache[cache_key] = all_nodes.copy()
        _file_mtime_cache[cache_path] = current_mtime

    return all_nodes
| |
|
| |
|
| | def _fallback_classname_resolver(code_text: str, file_path: Optional[Path]) -> Set[str]: |
| | """ |
| | Detect Class.NAME pattern in NODE_CLASS_MAPPINGS. |
| | |
| | Pattern: |
| | NODE_CLASS_MAPPINGS = { |
| | FreeChat.NAME: FreeChat, |
| | PaidChat.NAME: PaidChat |
| | } |
| | """ |
| | try: |
| | with warnings.catch_warnings(): |
| | warnings.filterwarnings('ignore', category=SyntaxWarning) |
| | parsed = ast.parse(code_text) |
| | except: |
| | return set() |
| | |
| | nodes = set() |
| | |
| | for node in parsed.body: |
| | if isinstance(node, ast.Assign): |
| | for target in node.targets: |
| | if isinstance(target, ast.Name) and target.id == 'NODE_CLASS_MAPPINGS': |
| | if isinstance(node.value, ast.Dict): |
| | for key in node.value.keys: |
| | |
| | if isinstance(key, ast.Attribute): |
| | if isinstance(key.value, ast.Name): |
| | |
| | nodes.add(key.value.id) |
| | |
| | elif isinstance(key, ast.Constant) and isinstance(key.value, str): |
| | nodes.add(key.value.strip()) |
| | |
| | return nodes |
| |
|
| |
|
| | def _fallback_item_assignment(code_text: str) -> Set[str]: |
| | """ |
| | Detect item assignment pattern. |
| | |
| | Pattern: |
| | NODE_CLASS_MAPPINGS = {} |
| | NODE_CLASS_MAPPINGS["MyNode"] = MyNode |
| | """ |
| | try: |
| | with warnings.catch_warnings(): |
| | warnings.filterwarnings('ignore', category=SyntaxWarning) |
| | parsed = ast.parse(code_text) |
| | except: |
| | return set() |
| |
|
| | nodes = set() |
| |
|
| | for node in ast.walk(parsed): |
| | if isinstance(node, ast.Assign): |
| | for target in node.targets: |
| | if isinstance(target, ast.Subscript): |
| | if (isinstance(target.value, ast.Name) and |
| | target.value.id in ['NODE_CLASS_MAPPINGS', 'NODE_CONFIG']): |
| | |
| | if isinstance(target.slice, ast.Constant): |
| | if isinstance(target.slice.value, str): |
| | nodes.add(target.slice.value) |
| |
|
| | return nodes |
| |
|
| |
|
| | def _fallback_dict_comprehension(code_text: str, file_path: Optional[Path] = None) -> Set[str]: |
| | """ |
| | Detect dict comprehension pattern with __name__ attribute access. |
| | |
| | Pattern: |
| | NODE_CLASS_MAPPINGS = {cls.__name__: cls for cls in to_export} |
| | NODE_CLASS_MAPPINGS = {c.__name__: c for c in [ClassA, ClassB]} |
| | |
| | This function detects dict comprehension assignments to NODE_CLASS_MAPPINGS |
| | and extracts class names from the iterable (list literal or variable reference). |
| | |
| | Returns: |
| | Set of class names extracted from the dict comprehension |
| | """ |
| | try: |
| | with warnings.catch_warnings(): |
| | warnings.filterwarnings('ignore', category=SyntaxWarning) |
| | parsed = ast.parse(code_text) |
| | except: |
| | return set() |
| |
|
| | nodes = set() |
| | export_lists = {} |
| |
|
| | |
| | for node in ast.walk(parsed): |
| | if isinstance(node, ast.Assign): |
| | for target in node.targets: |
| | if isinstance(target, ast.Name): |
| | var_name = target.id |
| | |
| | if isinstance(node.value, ast.List): |
| | class_names = set() |
| | for elt in node.value.elts: |
| | if isinstance(elt, ast.Name): |
| | class_names.add(elt.id) |
| | export_lists[var_name] = class_names |
| |
|
| | |
| | elif isinstance(node, ast.AugAssign): |
| | if isinstance(node.target, ast.Name) and isinstance(node.op, ast.Add): |
| | var_name = node.target.id |
| | if isinstance(node.value, ast.List): |
| | class_names = set() |
| | for elt in node.value.elts: |
| | if isinstance(elt, ast.Name): |
| | class_names.add(elt.id) |
| | if var_name in export_lists: |
| | export_lists[var_name].update(class_names) |
| | else: |
| | export_lists[var_name] = class_names |
| |
|
| | |
| | for node in ast.walk(parsed): |
| | if isinstance(node, ast.Assign): |
| | for target in node.targets: |
| | if isinstance(target, ast.Name) and target.id in ['NODE_CLASS_MAPPINGS', 'NODE_CONFIG']: |
| | |
| | if isinstance(node.value, ast.DictComp): |
| | dictcomp = node.value |
| |
|
| | |
| | key = dictcomp.key |
| | if isinstance(key, ast.Attribute) and key.attr == '__name__': |
| | |
| | for generator in dictcomp.generators: |
| | iter_node = generator.iter |
| |
|
| | |
| | if isinstance(iter_node, ast.List): |
| | for elt in iter_node.elts: |
| | if isinstance(elt, ast.Name): |
| | nodes.add(elt.id) |
| |
|
| | |
| | elif isinstance(iter_node, ast.Name): |
| | var_name = iter_node.id |
| | if var_name in export_lists: |
| | nodes.update(export_lists[var_name]) |
| |
|
| | return nodes |
| |
|
| |
|
| | def _fallback_import_class_names(code_text: str, file_path: Optional[Path] = None) -> Set[str]: |
| | """ |
| | Extract class names from imports that are added to export lists. |
| | |
| | Pattern: |
| | from .module import ClassA, ClassB |
| | to_export = [ClassA, ClassB] |
| | NODE_CLASS_MAPPINGS = {cls.__name__: cls for cls in to_export} |
| | |
| | This is a complementary fallback that works with _fallback_dict_comprehension |
| | to resolve import-based node registrations. |
| | |
| | Returns: |
| | Set of imported class names that appear in export-like contexts |
| | """ |
| | try: |
| | with warnings.catch_warnings(): |
| | warnings.filterwarnings('ignore', category=SyntaxWarning) |
| | parsed = ast.parse(code_text) |
| | except: |
| | return set() |
| |
|
| | |
| | imported_names = set() |
| | for node in ast.walk(parsed): |
| | if isinstance(node, ast.ImportFrom): |
| | for alias in node.names: |
| | name = alias.asname if alias.asname else alias.name |
| | imported_names.add(name) |
| |
|
| | |
| | export_candidates = set() |
| | has_dict_comp_mapping = False |
| |
|
| | for node in ast.walk(parsed): |
| | |
| | if isinstance(node, ast.Assign): |
| | for target in node.targets: |
| | if isinstance(target, ast.Name) and target.id == 'NODE_CLASS_MAPPINGS': |
| | if isinstance(node.value, ast.DictComp): |
| | has_dict_comp_mapping = True |
| |
|
| | |
| | if isinstance(node, ast.Assign): |
| | if isinstance(node.value, ast.List): |
| | for elt in node.value.elts: |
| | if isinstance(elt, ast.Name) and elt.id in imported_names: |
| | export_candidates.add(elt.id) |
| |
|
| | |
| | elif isinstance(node, ast.AugAssign): |
| | if isinstance(node.value, ast.List): |
| | for elt in node.value.elts: |
| | if isinstance(elt, ast.Name) and elt.id in imported_names: |
| | export_candidates.add(elt.id) |
| |
|
| | |
| | if has_dict_comp_mapping: |
| | return export_candidates |
| |
|
| | return set() |
| |
|
| |
|
| | def _extract_repo_name(file_path: Path) -> str: |
| | """ |
| | Extract repository name from file path. |
| | |
| | Path structure: /home/rho/.tmp/analysis/temp/{author}_{reponame}/{path/to/file.py} |
| | Returns: {author}_{reponame} or filename if extraction fails |
| | """ |
| | try: |
| | parts = file_path.parts |
| | |
| | if 'temp' in parts: |
| | temp_idx = parts.index('temp') |
| | if temp_idx + 1 < len(parts): |
| | |
| | return parts[temp_idx + 1] |
| | except (ValueError, IndexError): |
| | pass |
| |
|
| | |
| | return file_path.name if hasattr(file_path, 'name') else str(file_path) |
| |
|
| |
|
| | def _fallback_empty_dict_detector(code_text: str, file_path: Optional[Path], verbose: bool = False) -> None: |
| | """ |
| | Detect empty NODE_CLASS_MAPPINGS (UI-only extensions). |
| | Logs for documentation purposes only (when verbose=True). |
| | |
| | Args: |
| | code_text: Python source code to analyze |
| | file_path: Path to the file being analyzed |
| | verbose: If True, print detection messages |
| | """ |
| | empty_patterns = [ |
| | 'NODE_CLASS_MAPPINGS = {}', |
| | 'NODE_CLASS_MAPPINGS={}', |
| | ] |
| |
|
| | code_normalized = code_text.replace(' ', '').replace('\n', '') |
| |
|
| | for pattern in empty_patterns: |
| | pattern_normalized = pattern.replace(' ', '') |
| | if pattern_normalized in code_normalized: |
| | if file_path and verbose: |
| | repo_name = _extract_repo_name(file_path) |
| | print(f"Info: UI-only extension (empty NODE_CLASS_MAPPINGS): {repo_name}") |
| | return |
| |
|
def has_comfy_node_base(class_node):
    """Tell whether a class lists ``ComfyNode`` or ``<anything>.ComfyNode`` as a base.

    Args:
        class_node: ``ast.ClassDef`` to inspect.

    Returns:
        True when one of the direct bases is the bare name ``ComfyNode`` or an
        attribute access ending in ``.ComfyNode``; False otherwise.
    """
    return any(
        (isinstance(base, ast.Name) and base.id == 'ComfyNode')
        or (isinstance(base, ast.Attribute) and base.attr == 'ComfyNode')
        for base in class_node.bases
    )
| |
|
| |
|
def extract_keyword_value(call_node, keyword):
    """
    Extract string value of keyword argument
    Schema(node_id="MyNode") -> "MyNode"

    Args:
        call_node: ``ast.Call`` to inspect.
        keyword: Name of the keyword argument.

    Returns:
        The argument's value when it is a string literal; ``None`` when the
        keyword is absent or its value is anything else.
    """
    # String literals are always ast.Constant on the Python versions this file
    # supports, so no separate handling of the removed ast.Str is needed.
    for kw in call_node.keywords:
        if kw.arg != keyword:
            continue
        value = kw.value
        if isinstance(value, ast.Constant) and isinstance(value.value, str):
            return value.value
    return None
| |
|
| |
|
def is_schema_call(call_node):
    """Tell whether an ``ast.Call`` invokes ``Schema(...)`` or ``<anything>.Schema(...)``."""
    callee = call_node.func
    if isinstance(callee, ast.Name):
        return callee.id == 'Schema'
    if isinstance(callee, ast.Attribute):
        return callee.attr == 'Schema'
    return False
| |
|
| |
|
def extract_node_id_from_schema(class_node):
    """
    Extract node_id from define_schema() method

    Every ``define_schema`` method defined directly in the class body is
    searched for a call accepted by ``is_schema_call``; the first such call
    with a non-empty string ``node_id`` keyword decides the result.

    Returns:
        The node_id string, or None when none is found.
    """
    schema_methods = (
        item for item in class_node.body
        if isinstance(item, ast.FunctionDef) and item.name == 'define_schema'
    )
    for method in schema_methods:
        for candidate in ast.walk(method):
            if isinstance(candidate, ast.Call) and is_schema_call(candidate):
                node_id = extract_keyword_value(candidate, 'node_id')
                if node_id:
                    return node_id
    return None
| |
|
| |
|
def extract_v3_nodes(code_text):
    """
    Extract V3 node IDs using AST parsing

    Every class (at any nesting level) accepted by ``has_comfy_node_base`` is
    passed to ``extract_node_id_from_schema``; the ids found are collected.

    Args:
        code_text: Python source code.

    Returns: set of node_id strings; empty when the source cannot be parsed.
    """
    global parse_cnt

    try:
        # Progress indicator: one dot per 100 parses.
        if parse_cnt % 100 == 0:
            print(".", end="", flush=True)
        parse_cnt += 1

        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', category=SyntaxWarning)
            warnings.filterwarnings('ignore', category=DeprecationWarning)
            tree = ast.parse(code_text)
    except (SyntaxError, ValueError, RecursionError, MemoryError):
        # Besides plain syntax errors, ast.parse() rejects text containing NUL
        # bytes with ValueError on some Python versions and can exhaust the
        # recursion limit on deeply nested code.  Files are read with
        # errors='ignore', so such input does reach this function.
        return set()

    nodes = set()
    for class_def in ast.walk(tree):
        if isinstance(class_def, ast.ClassDef) and has_comfy_node_base(class_def):
            node_id = extract_node_id_from_schema(class_def)
            if node_id:
                nodes.add(node_id)

    return nodes
| |
|
| |
|
| | |
def extract_metadata_only(filename):
    """Extract only metadata (@author, @title, etc) without node scanning.

    A metadata line starts at column 0 with ``@author:``, ``@title:``,
    ``@nickname:`` or ``@description:``; the text after the first colon is the
    value.  A later line with the same key overrides an earlier one.

    Args:
        filename: Path of the file to read.

    Returns:
        dict mapping the key (without ``@``) to its stripped value; empty when
        the file cannot be opened or read.
    """
    try:
        with open(filename, encoding='utf-8', errors='ignore') as file:
            code = file.read()
    except (OSError, ValueError, TypeError):
        return {}

    metadata = {}
    for line in code.strip().split('\n'):
        if line.startswith(("@author:", "@title:", "@nickname:", "@description:")):
            key, value = line[1:].strip().split(':', 1)
            metadata[key.strip()] = value.strip()

    return metadata
| |
|
| |
|
def scan_in_file(filename, is_builtin=False):
    """Scan one Python file for node names and metadata.

    Node names come from ``extract_nodes_enhanced`` and ``extract_v3_nodes``
    plus regex heuristics applied to the source with column-0 comment lines
    removed: quoted keys of ``*_CLASS_MAPPINGS = {...}`` literals, quoted keys
    of every ``*_CLASS_MAPPINGS.update({...})`` call, and a few registration
    patterns.

    Metadata lines start at column 0 with ``@author:``, ``@title:``,
    ``@nickname:`` or ``@description:``.

    Args:
        filename: Path of the file to scan.
        is_builtin: When True the names found are added to the module-level
            ``builtin_nodes`` set.  When False, names already present in
            ``builtin_nodes`` are removed from the result.

    Returns:
        tuple: (nodes_set, metadata_dict)
    """
    with open(filename, encoding='utf-8', errors='ignore') as file:
        code = file.read()

    nodes = set()
    nodes |= extract_nodes_enhanced(code, file_path=Path(filename), visited=set())
    nodes |= extract_v3_nodes(code)

    # Everything below works on the text without column-0 comment lines.
    code = re.sub(r'^#.*?$', '', code, flags=re.MULTILINE)

    def quoted_keys(dict_text):
        """Stripped keys of ``"key": value`` and ``'key': value`` pairs."""
        # The value group is unused but must stay in the patterns: it consumes
        # the rest of the entry so quoted text in a value is not read as a key.
        keys = {key.strip() for key, _value in
                re.findall(r"\"([^\"]*)\"\s*:\s*([^,\n]*)", dict_text)}
        keys.update(key.strip() for key, _value in
                    re.findall(r"'([^']*)'\s*:\s*([^,\n]*)", dict_text))
        return keys

    # NOTE(review): the two '^' patterns are applied without re.MULTILINE, so
    # they can only match from the very start of the file; confirm intent.
    patterns = [
        r'^[^=]*_CLASS_MAPPINGS\["(.*?)"\]',
        r'^[^=]*_CLASS_MAPPINGS\[\'(.*?)\'\]',
        r'@register_node\("(.+)",\s*\".+"\)',
        r'"(\w+)"\s*:\s*{"class":\s*\w+\s*'
    ]
    # Four cheap regex searches gain nothing from a thread pool, so they run
    # inline.
    for pattern in patterns:
        nodes.update(key.strip() for key in re.findall(pattern, code))

    mapping_literal = re.compile(
        r"_CLASS_MAPPINGS\s*(?::\s*\w+\s*)?=\s*(?:\\\s*)?{([^}]*)}",
        re.MULTILINE | re.DOTALL,
    )
    for dict_text in mapping_literal.findall(code):
        nodes |= quoted_keys(dict_text)

    # Every ``.update({...})`` call is read, with either quote style, matching
    # what extract_nodes_from_repo() accepts.
    update_call = re.compile(r"_CLASS_MAPPINGS\.update\s*\(\s*{([^}]*)}\s*\)", re.DOTALL)
    for update_match in update_call.finditer(code):
        nodes |= quoted_keys(update_match.group(1))

    metadata = {}
    for line in code.strip().split('\n'):
        if line.startswith(("@author:", "@title:", "@nickname:", "@description:")):
            key, value = line[1:].strip().split(':', 1)
            metadata[key.strip()] = value.strip()

    if is_builtin:
        # Sets have no ``+=``; the previous ``builtin_nodes += set(nodes)``
        # raised TypeError on every builtin scan.
        builtin_nodes.update(nodes)
    else:
        nodes -= builtin_nodes

    return nodes, metadata
| |
|
| |
|
def get_py_file_paths(dirname):
    """Return the paths of all ``.py`` files below ``dirname``.

    A directory is skipped, together with everything below it, when its path
    contains the text ``.git`` or ``__pycache__`` anywhere.  This is a
    substring test, so e.g. ``.github`` is skipped as well.

    Args:
        dirname: Directory to walk.

    Returns:
        list of file paths, each built as ``os.path.join(root, file)``.
    """
    skip_markers = (".git", "__pycache__")

    def is_skipped(text):
        return any(marker in text for marker in skip_markers)

    file_paths = []

    for root, dirs, files in os.walk(dirname):
        if is_skipped(root):
            # Nothing below a skipped directory can be accepted either.
            dirs[:] = []
            continue

        # Prune skipped children in place so os.walk never descends into
        # them (a .git directory can hold thousands of subdirectories).  The
        # result is unchanged: every path below such a child contains the
        # marker too.
        dirs[:] = [d for d in dirs if not is_skipped(d)]

        file_paths.extend(
            os.path.join(root, file) for file in files if file.endswith(".py")
        )

    return file_paths
| |
|
| |
|
def get_nodes(target_dir):
    """List the ``.py`` files and sub-directories directly inside ``target_dir``.

    Entries whose name contains ``.git`` or ``__pycache__`` are ignored.

    Args:
        target_dir: Directory to list (not recursive).

    Returns:
        tuple: ``(py_files, directories)``, two lists of absolute paths.
    """
    py_files, directories = [], []

    kept_entries = (
        entry for entry in os.listdir(target_dir)
        if not (".git" in entry or "__pycache__" in entry)
    )
    for entry in kept_entries:
        full_path = os.path.abspath(os.path.join(target_dir, entry))

        if os.path.isfile(full_path) and entry.endswith(".py"):
            py_files.append(full_path)
        elif os.path.isdir(full_path):
            directories.append(full_path)

    return py_files, directories
| |
|
| |
|
def get_urls_from_list_file(list_file):
    """
    Load repository URLs from a plain-text list for scan-only mode

    Args:
        list_file (str): Path to the URL list file (one URL per line)

    Returns:
        list of tuples: [(url, "", None, None), ...]
            Layout: (url, title, preemptions, nodename_pattern)
            - title: always an empty string
            - preemptions: always None
            - nodename_pattern: always None

    File format:
        https://github.com/owner/repo1
        https://github.com/owner/repo2
        # Lines starting with # and blank lines are ignored

    Raises:
        FileNotFoundError: If list_file does not exist
    """
    if not os.path.exists(list_file):
        raise FileNotFoundError(f"URL list file not found: {list_file}")

    urls = []
    with open(list_file, 'r', encoding='utf-8') as handle:
        for line_num, raw_line in enumerate(handle, 1):
            line = raw_line.strip()

            # Blank lines and comment lines carry no URL.
            if line == "" or line.startswith('#'):
                continue

            if line.startswith(('http://', 'https://')):
                # Same 4-tuple layout as the JSON loaders; the list file
                # supplies only the URL, so the other slots stay empty.
                urls.append((line, "", None, None))
            else:
                print(f"WARNING: Line {line_num} is not a valid URL: {line}")

    print(f"Loaded {len(urls)} URLs from {list_file}")
    return urls
| |
|
| |
|
def get_git_urls_from_json(json_file):
    """Read the 'git-clone' entries of a custom node list JSON file.

    Args:
        json_file (str): Path to a JSON document with a 'custom_nodes' list.

    Returns:
        list of tuples: (url, title, preemptions, nodename_pattern) for every
        'git-clone' entry that has a non-empty 'files' value (its first item
        is the url), followed by one fixed entry for the ComfyUI repository.
    """
    with open(json_file, encoding='utf-8') as stream:
        catalog = json.load(stream)

    git_entries = [
        (
            node['files'][0],
            node.get('title'),
            node.get('preemptions'),
            node.get('nodename_pattern'),
        )
        for node in catalog.get('custom_nodes', [])
        if node.get('install_type') == 'git-clone' and node.get('files')
    ]

    # The ComfyUI repository itself is always appended to the result.
    git_entries.append(("https://github.com/comfyanonymous/ComfyUI", "ComfyUI", None, None))

    return git_entries
| |
|
| |
|
def get_py_urls_from_json(json_file):
    """Read the 'copy' (single-file) entries of a custom node list JSON file.

    Args:
        json_file (str): Path to a JSON document with a 'custom_nodes' list.

    Returns:
        list of tuples: (url, title, preemptions, nodename_pattern) for every
        'copy' entry that has a non-empty 'files' value; the url is the first
        item of 'files'.
    """
    with open(json_file, encoding='utf-8') as stream:
        catalog = json.load(stream)

    return [
        (
            node['files'][0],
            node.get('title'),
            node.get('preemptions'),
            node.get('nodename_pattern'),
        )
        for node in catalog.get('custom_nodes', [])
        if node.get('install_type') == 'copy' and node.get('files')
    ]
| |
|
| |
|
def clone_or_pull_git_repository(git_url):
    """Clone ``git_url`` into the working directory, or pull it if present.

    The checkout lives at ``temp_dir/<name>``, where ``<name>`` is the last
    path segment of the URL without a trailing ``.git``. Failures are printed
    and not re-raised.

    Args:
        git_url (str): Repository URL to clone or update.
    """
    repo_name = git_url.rsplit("/", 1)[-1]
    if repo_name.endswith(".git"):
        repo_name = repo_name[:-4]

    repo_dir = os.path.join(temp_dir, repo_name)

    if not os.path.exists(repo_dir):
        # First encounter: clone, including submodules.
        try:
            Repo.clone_from(git_url, repo_dir, recursive=True)
            print(f"Cloning {repo_name}...")
        except Exception as e:
            print(f"Failed to clone '{repo_name}': {e}")
        return

    # Existing checkout: pull from origin, then refresh submodules.
    try:
        checkout = Repo(repo_dir)
        checkout.remote(name="origin").pull()
        checkout.git.submodule('update', '--init', '--recursive')
        print(f"Pulling {repo_name}...")
    except Exception as e:
        print(f"Failed to pull '{repo_name}': {e}")
| |
|
| |
|
def update_custom_nodes(scan_only_mode=False, url_list_file=None):
    """
    Update custom nodes by cloning/pulling repositories

    Reads the module-level globals temp_dir, skip_update, skip_stat_update
    and, when GitHub stats are enabled, the GitHub client g.

    Args:
        scan_only_mode (bool): If True, use URL list file instead of custom-node-list.json
        url_list_file (str): Path to URL list file (required if scan_only_mode=True)

    Returns:
        dict: node_info mapping {name: (url, title, preemptions, node_pattern)},
            keyed by repository name for git URLs and by file name for
            single-file '.py' downloads

    Raises:
        ValueError: If scan_only_mode is True and url_list_file is not given
        FileNotFoundError: If the URL source file for the selected mode is missing
    """
    if not os.path.exists(temp_dir):
        os.makedirs(temp_dir)

    node_info = {}

    # Pick the URL source for the selected mode and report the configuration.
    if scan_only_mode:
        if not url_list_file:
            raise ValueError("url_list_file is required in scan-only mode")

        git_url_titles_preemptions = get_urls_from_list_file(url_list_file)
        print("\n[Scan-Only Mode]")
        print(f" - URL source: {url_list_file}")
        print(" - GitHub stats: DISABLED")
        print(f" - Git clone/pull: {'ENABLED' if not skip_update else 'DISABLED'}")
        print(" - Metadata: EMPTY")
    else:
        if not os.path.exists('custom-node-list.json'):
            raise FileNotFoundError("custom-node-list.json not found")

        git_url_titles_preemptions = get_git_urls_from_json('custom-node-list.json')
        print("\n[Standard Mode]")
        print(" - URL source: custom-node-list.json")
        print(f" - GitHub stats: {'ENABLED' if not skip_stat_update else 'DISABLED'}")
        print(f" - Git clone/pull: {'ENABLED' if not skip_update else 'DISABLED'}")
        print(" - Metadata: FULL")

    def process_git_url_title(url, title, preemptions, node_pattern):
        # Record the repository under its directory name, then sync it.
        name = os.path.basename(url)
        if name.endswith(".git"):
            name = name[:-4]

        node_info[name] = (url, title, preemptions, node_pattern)
        if not skip_update:
            clone_or_pull_git_repository(url)

    def process_git_stats(git_url_titles_preemptions):
        GITHUB_STATS_CACHE_FILENAME = 'github-stats-cache.json'
        GITHUB_STATS_FILENAME = 'github-stats.json'

        github_stats = {}
        try:
            with open(GITHUB_STATS_CACHE_FILENAME, 'r', encoding='utf-8') as file:
                github_stats = json.load(file)
        except FileNotFoundError:
            pass

        def is_rate_limit_exceeded():
            # Stop while a small reserve of API calls is still available.
            return g.rate_limiting[0] <= 20

        if is_rate_limit_exceeded():
            print(f"GitHub API Rate Limit Exceeded: remained - {(g.rate_limiting_resettime - datetime.datetime.now().timestamp())/60:.2f} min")
        else:
            def renew_stat(url):
                # Returns (url, stats_dict) on success, otherwise None.
                if is_rate_limit_exceeded():
                    return None

                if 'github.com' not in url:
                    return None

                print('.', end="")
                sys.stdout.flush()
                try:
                    # The last two path segments are taken as owner/repo.
                    parsed_url = urlparse(url)
                    domain = parsed_url.netloc
                    path = parsed_url.path
                    path_parts = path.strip("/").split("/")
                    if len(path_parts) >= 2 and domain == "github.com":
                        owner_repo = "/".join(path_parts[-2:])
                        repo = g.get_repo(owner_repo)
                        owner = repo.owner
                        now = datetime.datetime.now(datetime.timezone.utc)
                        author_time_diff = now - owner.created_at

                        last_update = repo.pushed_at.strftime("%Y-%m-%d %H:%M:%S") if repo.pushed_at else 'N/A'
                        item = {
                            "stars": repo.stargazers_count,
                            "last_update": last_update,
                            "cached_time": now.timestamp(),
                            "author_account_age_days": author_time_diff.days,
                        }
                        return url, item
                    else:
                        print(f"\nInvalid URL format for GitHub repository: {url}\n")
                except Exception as e:
                    print(f"\nERROR on {url}\n{e}")

                return None

            def refresh_stats(urls):
                # Each pass owns its futures list, so results of an earlier
                # pass are never iterated and applied a second time.
                with concurrent.futures.ThreadPoolExecutor(11) as executor:
                    futures = [executor.submit(renew_stat, url) for url in urls]

                    for future in concurrent.futures.as_completed(futures):
                        url_item = future.result()
                        if url_item is not None:
                            url, item = url_item
                            github_stats[url] = item

            # First pass: URLs that have no cached stats yet.
            refresh_stats([
                url
                for url, title, preemptions, node_pattern in git_url_titles_preemptions
                if url not in github_stats
            ])

            # Second pass: cached entries older than 12 hours. An entry with
            # no timestamp is treated as outdated instead of raising KeyError.
            current_time = datetime.datetime.now().timestamp()
            outdated_urls = [
                k
                for k, v in github_stats.items()
                if current_time - v.get('cached_time', 0) > 60*60*12
            ]
            refresh_stats(outdated_urls)

            with open(GITHUB_STATS_CACHE_FILENAME, 'w', encoding='utf-8') as file:
                json.dump(github_stats, file, ensure_ascii=False, indent=4)

        with open(GITHUB_STATS_FILENAME, 'w', encoding='utf-8') as file:
            # The published stats omit the cache-only timestamp.
            for v in github_stats.values():
                if "cached_time" in v:
                    del v["cached_time"]

            github_stats = dict(sorted(github_stats.items()))

            json.dump(github_stats, file, ensure_ascii=False, indent=4)

        print(f"Successfully written to {GITHUB_STATS_FILENAME}.")

    if not skip_stat_update:
        process_git_stats(git_url_titles_preemptions)

    # Register and clone/pull every git repository in parallel; leaving the
    # with-block waits for all submitted jobs.
    with concurrent.futures.ThreadPoolExecutor(11) as executor:
        for url, title, preemptions, node_pattern in git_url_titles_preemptions:
            executor.submit(process_git_url_title, url, title, preemptions, node_pattern)

    # Single-file ('copy') extensions exist only in custom-node-list.json.
    if not scan_only_mode:
        py_url_titles_and_pattern = get_py_urls_from_json('custom-node-list.json')

        def download_and_store_info(url_title_preemptions_and_pattern):
            url, title, preemptions, node_pattern = url_title_preemptions_and_pattern
            name = os.path.basename(url)
            if name.endswith(".py"):
                node_info[name] = (url, title, preemptions, node_pattern)

            try:
                download_url(url, temp_dir)
            except Exception:
                # Best effort: one failed download must not stop the others.
                print(f"[ERROR] Cannot download '{url}'")

        with concurrent.futures.ThreadPoolExecutor(10) as executor:
            executor.map(download_and_store_info, py_url_titles_and_pattern)

    return node_info
| |
|
| |
|
def _attach_list_metadata(metadata, title, preemptions, node_pattern, scan_only_mode):
    """Copy node-list fields into ``metadata``; does nothing in scan-only mode."""
    if scan_only_mode:
        return

    metadata['title_aux'] = title

    if preemptions is not None:
        metadata['preemptions'] = preemptions

    if node_pattern is not None:
        metadata['nodename_pattern'] = node_pattern


def gen_json(node_info, scan_only_mode=False, force_rescan=False):
    """
    Generate extension-node-map.json from scanned node information

    Scans every directory and top-level .py file found in the module-level
    temp_dir, merges names listed in each repository's node_list.json, and
    writes the mapping {url: (sorted node names, metadata)}.

    Args:
        node_info (dict): Repository metadata mapping
        scan_only_mode (bool): If True, exclude metadata from output
        force_rescan (bool): If True, ignore cache and force rescan all nodes
    """
    node_files, node_dirs = get_nodes(temp_dir)

    comfyui_path = os.path.abspath(os.path.join(temp_dir, "ComfyUI"))

    # Move ComfyUI (when present) to the front so it is scanned first.
    if comfyui_path in node_dirs:
        node_dirs.remove(comfyui_path)
        node_dirs = [comfyui_path] + node_dirs

    data = {}

    # --- repositories (one directory each) ---
    for dirname in node_dirs:
        py_files = get_py_file_paths(dirname)
        metadata = {}

        try:
            nodes, metadata = extract_nodes_from_repo(Path(dirname), verbose=False, force_rescan=force_rescan)
        except Exception:
            # Fall back to scanning the repository file by file.
            # NOTE(review): dirname is an absolute path here, so the
            # comparison with "ComfyUI" is always False; left unchanged
            # because enabling it would alter scan_in_file's behavior.
            nodes = set()
            for py in py_files:
                nodes_in_file, metadata_in_file = scan_in_file(py, dirname == "ComfyUI")
                nodes.update(nodes_in_file)
                metadata.update(metadata_in_file)

        dirname = os.path.basename(dirname)

        # A repository with no detected nodes is still listed when it
        # declares a nodename pattern.
        if len(nodes) > 0 or (dirname in node_info and node_info[dirname][3] is not None):
            nodes = list(nodes)
            nodes.sort()

            if dirname in node_info:
                git_url, title, preemptions, node_pattern = node_info[dirname]
                _attach_list_metadata(metadata, title, preemptions, node_pattern, scan_only_mode)
                data[git_url] = (nodes, metadata)
            else:
                # Unlisted directory: derive a GitHub URL from an
                # "<owner>_<repo>" directory name when possible.
                if '_' in dirname:
                    parts = dirname.split('_', 1)
                    git_url = f"https://github.com/{parts[0]}/{parts[1]}"
                    data[git_url] = (nodes, metadata)
                else:
                    print(f"WARN: {dirname} is removed from custom-node-list.json")

    # --- single-file extensions ---
    for file in node_files:
        nodes, metadata = scan_in_file(file)

        file = os.path.basename(file)

        # Look the file up under its own name, not under whatever directory
        # name the previous loop happened to leave behind.
        if len(nodes) > 0 or (file in node_info and node_info[file][3] is not None):
            nodes = list(nodes)
            nodes.sort()

            if file in node_info:
                url, title, preemptions, node_pattern = node_info[file]
                _attach_list_metadata(metadata, title, preemptions, node_pattern, scan_only_mode)
                data[url] = (nodes, metadata)
            else:
                print(f"Missing info: {file}")

    # --- names declared in each repository's node_list.json ---
    def report_invalid_json(error):
        print(f"\nERROR: Invalid json format '{node_list_json_path}'")
        print("------------------------------------------------------")
        print(error)
        print("------------------------------------------------------")

    extensions = [name for name in os.listdir(temp_dir) if os.path.isdir(os.path.join(temp_dir, name))]

    for extension in extensions:
        node_list_json_path = os.path.join(temp_dir, extension, 'node_list.json')
        if not os.path.exists(node_list_json_path):
            continue

        # Directories without a node_info entry are not merged.
        if extension not in node_info:
            continue

        git_url, title, preemptions, node_pattern = node_info[extension]

        with open(node_list_json_path, 'r', encoding='utf-8') as f:
            try:
                node_list_json = json.load(f)
            except Exception as e:
                report_invalid_json(e)
                node_list_json = {}

        # Start from whatever the code scan already found for this URL.
        metadata_in_url = {}
        if git_url not in data:
            nodes = set()
        else:
            nodes_in_url, metadata_in_url = data[git_url]
            nodes = set(nodes_in_url)

        try:
            for x, _desc in node_list_json.items():
                nodes.add(x.strip())
        except Exception as e:
            # e.g. the document is not a JSON object with string keys
            report_invalid_json(e)
            node_list_json = {}

        _attach_list_metadata(metadata_in_url, title, preemptions, node_pattern, scan_only_mode)

        nodes = list(nodes)
        nodes.sort()
        data[git_url] = (nodes, metadata_in_url)

    json_path = "extension-node-map.json"
    with open(json_path, "w", encoding='utf-8') as file:
        json.dump(data, file, indent=4, sort_keys=True)
| |
|
| |
|
if __name__ == "__main__":
    # The names assigned in this block (temp_dir, skip_update,
    # skip_stat_update, g) are module globals read by the functions above.
    args = parse_arguments()

    # Scan-only mode is active when a URL list file was supplied.
    scan_only_mode = args.scan_only is not None
    url_list_file = args.scan_only if scan_only_mode else None

    # Working directory: args.temp_dir, then args.temp_dir_positional,
    # then ./.tmp under the current directory.
    if args.temp_dir:
        temp_dir = args.temp_dir
    elif args.temp_dir_positional:
        temp_dir = args.temp_dir_positional
    else:
        temp_dir = os.path.join(os.getcwd(), ".tmp")

    if not os.path.exists(temp_dir):
        os.makedirs(temp_dir)

    # Scan-only mode never queries GitHub stats.
    skip_update = args.skip_update or args.skip_all
    skip_stat_update = args.skip_stat_update or args.skip_all or scan_only_mode

    if not skip_stat_update:
        github_token = os.environ.get('GITHUB_TOKEN')
        if not github_token:
            # Fail with a readable message instead of passing None on to
            # Auth.Token.
            sys.exit("ERROR: GITHUB_TOKEN environment variable is not set; "
                     "set it or skip the GitHub stats update.")
        auth = Auth.Token(github_token)
        g = Github(auth=auth)
    else:
        g = None

    print("### ComfyUI Manager Node Scanner ###")

    if scan_only_mode:
        print(f"\n# [Scan-Only Mode] Processing URL list: {url_list_file}\n")
    else:
        print("\n# [Standard Mode] Updating extensions\n")

    # Clone/pull the repositories and collect their list metadata.
    updated_node_info = update_custom_nodes(scan_only_mode, url_list_file)

    print("\n# Generating 'extension-node-map.json'...\n")

    # The attribute may be absent from args; treat that as "no force rescan".
    force_rescan = getattr(args, 'force_rescan', False)
    if force_rescan:
        print("⚠️ Force rescan enabled - ignoring all cached results\n")
    gen_json(updated_node_info, scan_only_mode, force_rescan)

    print("\n✅ DONE.\n")

    if scan_only_mode:
        print("Output: extension-node-map.json (node mappings only)")
    else:
        print("Output: extension-node-map.json (full metadata)")
| |
|