| import os
|
| import sys
|
| import importlib
|
| import importlib.util
|
| import inspect
|
| import re
|
| import datetime
|
| from typing import Dict, Any, Optional, List, Union, Set
|
| from dataclasses import dataclass
|
| import gradio as gr
|
| import traceback
|
| import subprocess
|
| import git
|
| import shutil
|
| import stat
|
| import json
|
| import requests
|
# Display label of the host app's main generation tab.
video_gen_label = "Video Generator"

# Remote community catalog (fetched with requests in _fetch_remote_catalog_entries).
COMMUNITY_PLUGINS_URL = "https://github.com/deepbeepmeep/Wan2GP/raw/refs/heads/main/plugins.json"

# Bundled plugin catalog, resolved relative to the repo root in PluginManager.__init__.
PLUGIN_CATALOG_FILENAME = "plugins.json"

# Local catalog overrides; the newer entry wins when merged with the bundled catalog.
PLUGIN_LOCAL_CATALOG_FILENAME = "plugins_local.json"

# Per-plugin metadata file, read from the plugin directory or fetched from the repo HEAD.
PLUGIN_METADATA_FILENAME = "plugin_info.json"

# server_config key listing plugin ids queued for deletion (retried by cleanup_pending_deletions).
PENDING_DELETIONS_KEY = "pending_plugin_deletions"
|
|
|
| def _has_value(value: Any) -> bool:
|
| if value is None:
|
| return False
|
| if isinstance(value, str):
|
| return value.strip() != ""
|
| return True
|
|
|
| def _split_github_repo(url: str) -> Optional[tuple]:
|
| if not isinstance(url, str):
|
| return None
|
| cleaned = url.strip()
|
| if not cleaned:
|
| return None
|
| cleaned = cleaned.split("?", 1)[0].split("#", 1)[0]
|
| if cleaned.startswith("git@github.com:"):
|
| cleaned = "https://github.com/" + cleaned[len("git@github.com:"):]
|
| cleaned = cleaned.rstrip("/")
|
| marker = "github.com/"
|
| idx = cleaned.lower().find(marker)
|
| if idx < 0:
|
| return None
|
| tail = cleaned[idx + len(marker):]
|
| parts = [part for part in tail.split("/") if part]
|
| if len(parts) < 2:
|
| return None
|
| owner, repo = parts[0], parts[1]
|
| if repo.endswith(".git"):
|
| repo = repo[:-4]
|
| if not owner or not repo:
|
| return None
|
| return owner, repo
|
|
|
def normalize_plugin_url(url: str) -> str:
    """Canonicalize a plugin repository URL.

    GitHub URLs become ``https://github.com/<owner>/<repo>``; other URLs
    are cleaned (query/fragment/``.git``/trailing slashes removed).
    Returns "" for non-strings and blank input.
    """
    if not isinstance(url, str):
        return ""
    text = url.strip()
    if not text:
        return ""
    for sep in ("?", "#"):
        text = text.split(sep, 1)[0]
    ssh_prefix = "git@github.com:"
    if text.startswith(ssh_prefix):
        text = "https://github.com/" + text[len(ssh_prefix):]
    text = text.rstrip("/")
    parsed = _split_github_repo(text)
    if parsed:
        return "https://github.com/{}/{}".format(*parsed)
    if text.endswith(".git"):
        text = text[:-len(".git")]
    return text.rstrip("/")
|
|
|
| def _parse_version_parts(version: str) -> List[Any]:
|
| if not isinstance(version, str):
|
| return []
|
| version = version.strip()
|
| if not version:
|
| return []
|
| parts = re.split(r"[^0-9A-Za-z]+", version)
|
| tokens = []
|
| for part in parts:
|
| if not part:
|
| continue
|
| for token in re.findall(r"\d+|[A-Za-z]+", part):
|
| if token.isdigit():
|
| tokens.append((0, int(token)))
|
| else:
|
| tokens.append((1, token.lower()))
|
| return tokens
|
|
|
def compare_versions(left: str, right: str) -> int:
    """Three-way comparison of version strings (1, 0 or -1).

    A version containing at least one digit always ranks above one with
    none; otherwise tokens are compared pairwise, padding the shorter
    side with a zero token.
    """
    left_text = left if isinstance(left, str) else ""
    right_text = right if isinstance(right, str) else ""
    left_numeric = bool(re.search(r"\d", left_text))
    right_numeric = bool(re.search(r"\d", right_text))
    if left_numeric != right_numeric:
        return 1 if left_numeric else -1
    left_tokens = _parse_version_parts(left_text)
    right_tokens = _parse_version_parts(right_text)
    span = max(len(left_tokens), len(right_tokens))
    if span == 0:
        return 0
    pad = (0, 0)
    for pos in range(span):
        a = left_tokens[pos] if pos < len(left_tokens) else pad
        b = right_tokens[pos] if pos < len(right_tokens) else pad
        if a != b:
            return 1 if a > b else -1
    return 0
|
|
|
| def _parse_date(value: Any) -> Optional[datetime.datetime]:
|
| if not isinstance(value, str):
|
| return None
|
| text = value.strip()
|
| if not text:
|
| return None
|
| if text.endswith("Z"):
|
| text = text[:-1] + "+00:00"
|
| try:
|
| return datetime.datetime.fromisoformat(text)
|
| except ValueError:
|
| return None
|
|
|
def compare_release_metadata(left: Dict[str, Any], right: Dict[str, Any]) -> int:
    """Rank two catalog entries: by release date first, then version.

    A parseable date beats a missing one; equal (or both-missing) dates
    fall through to a version-string comparison.
    """
    left_date = _parse_date(left.get("date"))
    right_date = _parse_date(right.get("date"))
    if left_date and right_date:
        if left_date != right_date:
            return 1 if left_date > right_date else -1
    elif left_date:
        return 1
    elif right_date:
        return -1
    return compare_versions(left.get("version", ""), right.get("version", ""))
|
|
|
def is_wangp_compatible(required_version: str, current_version: str) -> bool:
    """True when the running WanGP satisfies a plugin's minimum version.

    An empty requirement is always compatible.
    """
    if not _has_value(required_version):
        return True
    current = current_version or ""
    return compare_versions(current, required_version) >= 0
|
|
|
def plugin_id_from_url(url: str) -> str:
    """Derive a plugin id from a repo URL (the repository name).

    GitHub URLs use the parsed repo name; anything else falls back to
    the last path segment of the normalized URL. "" when unusable.
    """
    if not isinstance(url, str):
        return ""
    parsed = _split_github_repo(url)
    if parsed:
        return parsed[1]
    normalized = normalize_plugin_url(url)
    return normalized.rsplit("/", 1)[-1] if normalized else ""
|
def auto_install_and_enable_default_plugins(manager: 'PluginManager', wgp_globals: dict):
    """Install missing default plugins and remove them from the enabled list.

    Reads server_config / server_config_filename out of *wgp_globals* and
    persists the config only when it was actually modified.

    NOTE(review): the default-plugin table below is empty, so the install
    loop is currently a no-op until entries are added here.
    """
    config = wgp_globals.get("server_config")
    config_path = wgp_globals.get("server_config_filename")

    if not config or not config_path:
        print("[Plugins] WARNING: Cannot auto-install/enable default plugins. Server config not found.")
        return

    default_plugins = {}

    enabled = config.get("enabled_plugins", [])
    freshly_installed = []
    for repo_name, repo_url in default_plugins.items():
        if os.path.isdir(os.path.join(manager.plugins_dir, repo_name)):
            continue
        print(f"[Plugins] Auto-installing default plugin: {repo_name}...")
        outcome = manager.install_plugin_from_url(repo_url)
        print(f"[Plugins] Install result for {repo_name}: {outcome}")
        if "[Success]" in outcome:
            freshly_installed.append(repo_name)

    dirty = False
    for repo_name in freshly_installed:
        if repo_name in enabled:
            enabled.remove(repo_name)
            dirty = True

    if dirty:
        print("[Plugins] Disabling newly installed default plugins...")
        config["enabled_plugins"] = enabled
        try:
            with open(config_path, 'w', encoding='utf-8') as f:
                json.dump(config, f, indent=4)
        except Exception as e:
            print(f"[Plugins] ERROR: Failed to update config file '{config_path}': {e}")
|
|
|
|
|
# Plugin directories shipped with WanGP itself: never uninstallable and
# sorted ahead of user plugins by get_plugins_info().
SYSTEM_PLUGINS = [
    "wan2gp-video-mask-creator",
    "wan2gp-motion-designer",
    "wan2gp-guides",
    "wan2gp-configuration",
    "wan2gp-plugin-manager",
    "wan2gp-about",
]

# Non-system plugins bundled with the distribution; also protected from
# uninstall (see _is_plugin_uninstallable / get_plugins_info).
BUNDLED_PLUGINS = {
    "wan2gp-sample",
}

# Presumably the tab index where user plugin tabs are inserted into the
# main UI — used outside this chunk; TODO confirm against the host UI code.
USER_PLUGIN_INSERT_POSITION = 3
|
|
|
@dataclass
class InsertAfterRequest:
    """Request to place a newly built component right after an existing one."""
    # id of the already-registered component to insert after
    target_component_id: str
    # callable that constructs the new component — invoked by the host UI (not shown here)
    new_component_constructor: callable
|
|
|
@dataclass
class PluginTab:
    """Descriptor for a UI tab contributed by a plugin (see WAN2GPPlugin.add_tab)."""
    # unique tab identifier
    id: str
    # human-readable tab label
    label: str
    # callable that builds the tab's components — invoked by the host UI (not shown here)
    component_constructor: callable
    # ordering hint; defaults to -1 (presumably "append") — TODO confirm in host UI
    position: int = -1
|
|
|
class WAN2GPPlugin:
    """Base class for WanGP plugins.

    Subclasses override the UI hooks (setup_ui, post_ui_setup,
    on_tab_select/on_tab_deselect) and use the request/register helpers
    to ask the host for components, globals, tabs and custom JavaScript.
    """

    def __init__(self):
        self.tabs: Dict[str, PluginTab] = {}
        self.name = self.__class__.__name__
        self.version = "1.0.0"
        self.description = "No description provided."
        self.uninstallable = True
        self._component_requests: List[str] = []
        self._global_requests: List[str] = []
        self._insert_after_requests: List[InsertAfterRequest] = []
        self._setup_complete = False
        self._data_hooks: Dict[str, List[callable]] = {}
        self.tab_ids: List[str] = []
        # Setter injected by the host; set_global is a no-op until then.
        self._set_wgp_global_func = None
        self._custom_js_snippets: List[str] = []

    def setup_ui(self) -> None:
        """Hook: build the plugin's UI. Default implementation does nothing."""
        pass

    def add_tab(self, tab_id: str, label: str, component_constructor: callable, position: int = -1):
        """Register (or replace) a tab under *tab_id*."""
        descriptor = PluginTab(id=tab_id, label=label, component_constructor=component_constructor, position=position)
        self.tabs[tab_id] = descriptor

    def post_ui_setup(self, components: Dict[str, gr.components.Component]) -> Dict[gr.components.Component, Union[gr.update, Any]]:
        """Hook: receive requested components; return component -> update mapping."""
        return {}

    def on_tab_select(self, state: Dict[str, Any]) -> None:
        """Hook: called when one of the plugin's tabs is selected."""
        pass

    def on_tab_deselect(self, state: Dict[str, Any]) -> None:
        """Hook: called when one of the plugin's tabs is deselected."""
        pass

    def request_component(self, component_id: str) -> None:
        """Ask the host for a UI component; duplicate requests are ignored."""
        if component_id in self._component_requests:
            return
        self._component_requests.append(component_id)

    def request_global(self, global_name: str) -> None:
        """Ask the host for a global variable; duplicate requests are ignored."""
        if global_name in self._global_requests:
            return
        self._global_requests.append(global_name)

    def set_global(self, variable_name: str, new_value: Any):
        """Write a host global through the injected setter (None before injection)."""
        setter = self._set_wgp_global_func
        if not setter:
            return None
        return setter(variable_name, new_value)

    @property
    def component_requests(self) -> List[str]:
        """Snapshot of the requested component ids."""
        return list(self._component_requests)

    @property
    def global_requests(self) -> List[str]:
        """Snapshot of the requested global names."""
        return list(self._global_requests)

    def register_data_hook(self, hook_name: str, callback: callable):
        """Append *callback* to the callbacks registered under *hook_name*."""
        self._data_hooks.setdefault(hook_name, []).append(callback)

    def add_custom_js(self, js_code: str) -> None:
        """Queue a JavaScript snippet; blank or non-string input is dropped."""
        if not isinstance(js_code, str):
            return
        if js_code.strip():
            self._custom_js_snippets.append(js_code)

    @property
    def custom_js_snippets(self) -> List[str]:
        """Snapshot of the queued JavaScript snippets."""
        return list(self._custom_js_snippets)

    def insert_after(self, target_component_id: str, new_component_constructor: callable) -> None:
        """Queue a request to insert a new component after an existing one."""
        # Defensive: a subclass __init__ override may not have created the list.
        if not hasattr(self, '_insert_after_requests'):
            self._insert_after_requests = []
        request = InsertAfterRequest(
            target_component_id=target_component_id,
            new_component_constructor=new_component_constructor,
        )
        self._insert_after_requests.append(request)
|
|
|
class PluginManager:
    """Discovers, loads, installs, updates and removes WanGP plugins.

    Plugins live as packages under *plugins_dir*; the plugin catalogs
    (plugins.json / plugins_local.json) are resolved at the repo root.
    """

    def __init__(self, plugins_dir="plugins"):
        # plugin_id -> loaded plugin instance
        self.plugins: Dict[str, WAN2GPPlugin] = {}
        self.plugins_dir = plugins_dir
        os.makedirs(self.plugins_dir, exist_ok=True)
        # Make plugin packages importable (e.g. "<plugin_id>.plugin").
        if self.plugins_dir not in sys.path:
            sys.path.insert(0, self.plugins_dir)
        self.data_hooks: Dict[str, List[callable]] = {}
        self.restricted_globals: Set[str] = set()
        self.custom_js_snippets: List[str] = []
        # NOTE(review): assumes the process CWD is the repo root — confirm.
        self.repo_root = os.path.abspath(os.getcwd())
        self.catalog_path = os.path.join(self.repo_root, PLUGIN_CATALOG_FILENAME)
        self.local_catalog_path = os.path.join(self.repo_root, PLUGIN_LOCAL_CATALOG_FILENAME)
        # Attached later via set_server_config().
        self.server_config: Optional[Dict[str, Any]] = None
        self.server_config_filename: str = ""
|
|
|
| def set_server_config(self, server_config: Optional[Dict[str, Any]], server_config_filename: str = "") -> None:
|
| self.server_config = server_config if isinstance(server_config, dict) else None
|
| self.server_config_filename = server_config_filename or ""
|
|
|
| def _save_server_config(self) -> None:
|
| if not self.server_config or not self.server_config_filename:
|
| return
|
| try:
|
| with open(self.server_config_filename, "w", encoding="utf-8") as writer:
|
| writer.write(json.dumps(self.server_config, indent=4))
|
| except Exception as e:
|
| print(f"[PluginManager] Failed to write config file '{self.server_config_filename}': {e}")
|
|
|
| def _get_pending_deletions(self) -> List[str]:
|
| if not self.server_config:
|
| return []
|
| pending = self.server_config.get(PENDING_DELETIONS_KEY, [])
|
| if not isinstance(pending, list):
|
| return []
|
| cleaned = []
|
| for item in pending:
|
| if isinstance(item, str) and item.strip():
|
| cleaned.append(item.strip())
|
| return cleaned
|
|
|
| def _set_pending_deletions(self, pending: List[str]) -> None:
|
| if not self.server_config:
|
| return
|
| unique = []
|
| seen = set()
|
| for item in pending:
|
| if not isinstance(item, str):
|
| continue
|
| key = item.strip()
|
| if not key or key in seen:
|
| continue
|
| seen.add(key)
|
| unique.append(key)
|
| self.server_config[PENDING_DELETIONS_KEY] = unique
|
| self._save_server_config()
|
|
|
| def _add_pending_deletion(self, plugin_id: str) -> None:
|
| if not plugin_id:
|
| return
|
| pending = self._get_pending_deletions()
|
| if plugin_id not in pending:
|
| pending.append(plugin_id)
|
| self._set_pending_deletions(pending)
|
|
|
| def _clear_pending_deletion(self, plugin_id: str) -> None:
|
| if not plugin_id:
|
| return
|
| pending = self._get_pending_deletions()
|
| if plugin_id in pending:
|
| pending = [item for item in pending if item != plugin_id]
|
| self._set_pending_deletions(pending)
|
|
|
| def _is_cleanup_candidate(self, path: str) -> bool:
|
| try:
|
| for entry in os.scandir(path):
|
| name = entry.name
|
| if name.startswith("."):
|
| continue
|
| if entry.is_file(follow_symlinks=False):
|
| if name.endswith(".pyc"):
|
| continue
|
| return False
|
| if entry.is_dir(follow_symlinks=False):
|
| if name == "__pycache__":
|
| continue
|
| if not self._is_cleanup_candidate(entry.path):
|
| return False
|
| return True
|
| except Exception:
|
| return False
|
|
|
| def cleanup_pending_deletions(self) -> None:
|
| pending = self._get_pending_deletions()
|
| if not pending:
|
| return
|
| remaining = []
|
| for plugin_id in pending:
|
| plugin_dir = os.path.join(self.plugins_dir, plugin_id)
|
| if not os.path.isdir(plugin_dir):
|
| continue
|
| if self._is_cleanup_candidate(plugin_dir):
|
| try:
|
| shutil.rmtree(plugin_dir, onerror=self._remove_readonly)
|
| continue
|
| except Exception:
|
| remaining.append(plugin_id)
|
| continue
|
| remaining.append(plugin_id)
|
| self._set_pending_deletions(remaining)
|
|
|
| def _coerce_bool(self, value: Any, default: bool = True) -> bool:
|
| if isinstance(value, bool):
|
| return value
|
| if isinstance(value, str):
|
| lowered = value.strip().lower()
|
| if lowered in ("true", "1", "yes"):
|
| return True
|
| if lowered in ("false", "0", "no"):
|
| return False
|
| return default
|
|
|
    def _load_json_file(self, path: str) -> Optional[Any]:
        """Read JSON from *path*, retrying with utf-8-sig for BOM files.

        Returns the parsed payload, or None when the file is missing or
        unreadable; the last failure is printed, never raised.
        """
        if not path or not os.path.isfile(path):
            return None
        last_error = None
        for encoding in ("utf-8", "utf-8-sig"):
            try:
                with open(path, "r", encoding=encoding) as reader:
                    return json.load(reader)
            except UnicodeDecodeError as e:
                # Undecodable bytes: try the next encoding before giving up.
                last_error = e
                continue
            except json.JSONDecodeError as e:
                last_error = e
                # CPython's json error mentions "UTF-8 BOM" when the file
                # starts with a BOM; only then is a utf-8-sig retry useful.
                if encoding == "utf-8" and "UTF-8 BOM" in str(e):
                    continue
                break
            except Exception as e:
                last_error = e
                break
        if last_error is not None:
            print(f"[PluginManager] Failed to read JSON from {path}: {last_error}")
        return None
|
|
|
| def _write_json_file(self, path: str, payload: Any) -> None:
|
| if not path:
|
| return
|
| try:
|
| os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
|
| with open(path, "w", encoding="utf-8") as writer:
|
| json.dump(payload, writer, indent=2)
|
| except Exception as e:
|
| print(f"[PluginManager] Failed to write JSON to {path}: {e}")
|
|
|
| def _load_plugin_metadata(self, plugin_path: str) -> Optional[Dict[str, Any]]:
|
| metadata_path = os.path.join(plugin_path, PLUGIN_METADATA_FILENAME)
|
| payload = self._load_json_file(metadata_path)
|
| if not isinstance(payload, dict):
|
| return None
|
| return self._normalize_plugin_metadata(payload)
|
|
|
| def _normalize_plugin_metadata(self, payload: Dict[str, Any]) -> Dict[str, Any]:
|
| metadata = dict(payload)
|
| for key in ("name", "version", "description", "author", "date", "wan2gp_version"):
|
| value = metadata.get(key)
|
| if isinstance(value, str):
|
| metadata[key] = value.strip()
|
| elif value is None:
|
| metadata[key] = ""
|
| else:
|
| metadata[key] = str(value)
|
| legacy_version = metadata.get("wangp_version")
|
| if not _has_value(metadata.get("wan2gp_version")) and _has_value(legacy_version):
|
| metadata["wan2gp_version"] = str(legacy_version).strip()
|
| metadata.pop("wangp_version", None)
|
| metadata["url"] = ""
|
| metadata["uninstallable"] = self._coerce_bool(metadata.get("uninstallable"), default=True)
|
| return metadata
|
|
|
| def _apply_metadata_to_plugin(self, plugin: WAN2GPPlugin, metadata: Optional[Dict[str, Any]], is_system: bool) -> None:
|
| if not metadata:
|
| if is_system:
|
| plugin.uninstallable = False
|
| return
|
| for key in ("name", "version", "description", "author", "date", "wan2gp_version"):
|
| if key in metadata:
|
| setattr(plugin, key, metadata.get(key))
|
| if not _has_value(getattr(plugin, "wan2gp_version", "")) and _has_value(metadata.get("wangp_version")):
|
| setattr(plugin, "wan2gp_version", str(metadata.get("wangp_version")).strip())
|
| if "uninstallable" in metadata:
|
| plugin.uninstallable = self._coerce_bool(metadata.get("uninstallable"), default=True)
|
| if is_system:
|
| plugin.uninstallable = False
|
|
|
| def _normalize_catalog_entry(self, payload: Dict[str, Any]) -> Dict[str, Any]:
|
| entry = dict(payload)
|
| for key in ("name", "author", "version", "description", "url", "date", "wan2gp_version", "last_check"):
|
| value = entry.get(key)
|
| if key == "url":
|
| if isinstance(value, str):
|
| entry[key] = normalize_plugin_url(value)
|
| elif value is None:
|
| entry[key] = ""
|
| else:
|
| entry[key] = normalize_plugin_url(str(value))
|
| continue
|
| if isinstance(value, str):
|
| entry[key] = value.strip()
|
| elif value is None:
|
| entry[key] = ""
|
| else:
|
| entry[key] = str(value)
|
| legacy_version = entry.get("wangp_version")
|
| if not _has_value(entry.get("wan2gp_version")) and _has_value(legacy_version):
|
| entry["wan2gp_version"] = str(legacy_version).strip()
|
| entry.pop("wangp_version", None)
|
| return entry
|
|
|
| def _merge_entry_fields(self, primary: Dict[str, Any], secondary: Optional[Dict[str, Any]]) -> Dict[str, Any]:
|
| result = dict(primary) if primary else {}
|
| if not secondary:
|
| return result
|
| for key, value in secondary.items():
|
| if _has_value(result.get(key)):
|
| continue
|
| if _has_value(value):
|
| result[key] = value
|
| return result
|
|
|
| def _merge_catalog_entries(
|
| self,
|
| base_entries: List[Dict[str, Any]],
|
| local_entries: List[Dict[str, Any]],
|
| ) -> Dict[str, Dict[str, Any]]:
|
| base_map: Dict[str, Dict[str, Any]] = {}
|
| local_map: Dict[str, Dict[str, Any]] = {}
|
| for entry in base_entries:
|
| plugin_id = entry.get("id") or plugin_id_from_url(entry.get("url", ""))
|
| if not plugin_id:
|
| continue
|
| base_map[plugin_id] = self._normalize_catalog_entry(entry)
|
| for entry in local_entries:
|
| plugin_id = entry.get("id") or plugin_id_from_url(entry.get("url", ""))
|
| if not plugin_id:
|
| continue
|
| local_map[plugin_id] = self._normalize_catalog_entry(entry)
|
|
|
| merged: Dict[str, Dict[str, Any]] = {}
|
| all_ids = set(base_map.keys()) | set(local_map.keys())
|
| for plugin_id in all_ids:
|
| base_entry = base_map.get(plugin_id)
|
| local_entry = local_map.get(plugin_id)
|
| if base_entry and local_entry:
|
| comparison = compare_release_metadata(local_entry, base_entry)
|
| if comparison > 0:
|
| merged_entry = dict(local_entry)
|
| else:
|
| merged_entry = dict(base_entry)
|
| if _has_value(local_entry.get("last_check")):
|
| merged_entry["last_check"] = local_entry.get("last_check")
|
| merged[plugin_id] = merged_entry
|
| else:
|
| merged[plugin_id] = base_entry or local_entry
|
| return merged
|
|
|
| def _fetch_remote_catalog_entries(self) -> List[Dict[str, Any]]:
|
| try:
|
| response = requests.get(COMMUNITY_PLUGINS_URL, timeout=10)
|
| response.raise_for_status()
|
| payload = response.json()
|
| if isinstance(payload, list):
|
| return [self._normalize_catalog_entry(entry) for entry in payload if isinstance(entry, dict)]
|
| except Exception as e:
|
| print(f"[PluginManager] Could not fetch community plugins info: {e}")
|
| return []
|
|
|
| def load_catalog_entries(self, use_remote: bool = True) -> List[Dict[str, Any]]:
|
| entries = None
|
| if not entries:
|
| payload = self._load_json_file(self.catalog_path)
|
| if isinstance(payload, list):
|
| entries = [self._normalize_catalog_entry(entry) for entry in payload if isinstance(entry, dict)]
|
| return entries
|
|
|
| def load_local_catalog_entries(self) -> List[Dict[str, Any]]:
|
| payload = self._load_json_file(self.local_catalog_path)
|
| if isinstance(payload, list):
|
| return [self._normalize_catalog_entry(entry) for entry in payload if isinstance(entry, dict)]
|
| return []
|
|
|
| def get_merged_catalog_entries(self, use_remote: bool = True) -> Dict[str, Dict[str, Any]]:
|
| base_entries = self.load_catalog_entries(use_remote=use_remote)
|
| local_entries = self.load_local_catalog_entries()
|
| return self._merge_catalog_entries(base_entries, local_entries)
|
|
|
| def _build_plugin_json_url(self, url: str) -> str:
|
| repo_info = _split_github_repo(url)
|
| if not repo_info:
|
| return ""
|
| owner, repo = repo_info
|
| return f"https://github.com/{owner}/{repo}/raw/HEAD/{PLUGIN_METADATA_FILENAME}"
|
|
|
| def _fetch_plugin_json(self, url: str, quiet: bool = False) -> Optional[Dict[str, Any]]:
|
| plugin_json_url = self._build_plugin_json_url(url)
|
| if not plugin_json_url:
|
| return None
|
| try:
|
| response = requests.get(plugin_json_url, timeout=10)
|
| if response.status_code != 200:
|
| return None
|
| payload = response.json()
|
| if not isinstance(payload, dict):
|
| return None
|
| return self._normalize_plugin_metadata(payload)
|
| except Exception as e:
|
| if not quiet:
|
| print(f"[PluginManager] Could not fetch {PLUGIN_METADATA_FILENAME} for {url}: {e}")
|
| return None
|
|
|
| def _metadata_to_catalog_entry(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
|
| entry = {}
|
| for key in ("name", "author", "version", "description", "date", "wan2gp_version"):
|
| entry[key] = metadata.get(key, "")
|
| return self._normalize_catalog_entry(entry)
|
|
|
| def _extract_catalog_metadata(self, entry: Dict[str, Any]) -> Dict[str, Any]:
|
| payload = {}
|
| for key in ("name", "author", "version", "description", "url", "date", "wan2gp_version"):
|
| payload[key] = entry.get(key, "")
|
| return self._normalize_catalog_entry(payload)
|
|
|
| def _find_catalog_entry(self, plugin_id: str, url: str = "", use_remote: bool = False) -> Optional[Dict[str, Any]]:
|
| entries = self.load_catalog_entries(use_remote=use_remote)
|
| target_id = plugin_id.strip().lower() if isinstance(plugin_id, str) else ""
|
| target_url = normalize_plugin_url(url).lower() if isinstance(url, str) else ""
|
| for entry in entries:
|
| entry_url = entry.get("url", "")
|
| entry_id = entry.get("id") or plugin_id_from_url(entry_url)
|
| entry_id = entry_id.strip().lower() if isinstance(entry_id, str) else ""
|
| entry_url_norm = normalize_plugin_url(entry_url).lower() if isinstance(entry_url, str) else ""
|
| if target_id and entry_id and entry_id == target_id:
|
| return self._normalize_catalog_entry(entry)
|
| if target_url and entry_url_norm and entry_url_norm == target_url:
|
| return self._normalize_catalog_entry(entry)
|
| return None
|
|
|
| def _get_git_remote_url(self, plugin_path: str) -> str:
|
| if not plugin_path:
|
| return ""
|
| git_dir = os.path.join(plugin_path, ".git")
|
| if not os.path.isdir(git_dir):
|
| return ""
|
| try:
|
| repo = git.Repo(plugin_path)
|
| if repo.remotes:
|
| return repo.remotes.origin.url
|
| except Exception:
|
| return ""
|
| return ""
|
|
|
    def refresh_catalog(self, installed_only: bool = True, use_remote: bool = True) -> Dict[str, int]:
        """Re-fetch plugin_info.json for catalog targets and update plugins_local.json.

        With *installed_only* the targets are the installed non-system
        plugins; otherwise every merged catalog entry with a usable URL.
        Returns {"checked", "updated", "updates_available"} counters.

        NOTE(review): *use_remote* is never read in this body — the base
        catalog is always loaded with use_remote=False; confirm intent.
        """
        base_entries = self.load_catalog_entries(use_remote=False)
        local_entries = self.load_local_catalog_entries()
        merged_catalog = self._merge_catalog_entries(base_entries, local_entries)
        # Start from the existing local overrides; refreshed entries replace them.
        local_map: Dict[str, Dict[str, Any]] = {}
        for entry in local_entries:
            plugin_id = entry.get("id") or plugin_id_from_url(entry.get("url", ""))
            if plugin_id:
                local_map[plugin_id] = self._normalize_catalog_entry(entry)

        def _info_to_entry(info: Dict[str, Any]) -> Dict[str, Any]:
            # Project an installed-plugin info dict onto catalog-entry fields.
            return {
                "name": info.get("name", ""),
                "author": info.get("author", ""),
                "version": info.get("version", ""),
                "description": info.get("description", ""),
                "url": info.get("url", ""),
                "date": info.get("date", ""),
                "wan2gp_version": info.get("wan2gp_version", ""),
            }

        targets: List[Dict[str, Any]] = []
        if installed_only:
            installed_plugins = [info for info in self.get_plugins_info() if not info.get("system")]
            for info in installed_plugins:
                plugin_id = info.get("id", "")
                if not plugin_id:
                    continue
                base_entry = merged_catalog.get(plugin_id, {})
                git_url = self._get_git_remote_url(info.get("path", ""))
                # URL precedence: catalog, then plugin info, then git remote...
                url = normalize_plugin_url(base_entry.get("url") or info.get("url") or git_url or "")
                # ...but a git remote that matches the plugin id overrides all.
                if git_url and plugin_id_from_url(git_url) == plugin_id:
                    url = normalize_plugin_url(git_url)
                if not _has_value(url):
                    continue
                targets.append(
                    {
                        "id": plugin_id,
                        "url": url,
                        "base_entry": base_entry,
                        "info_entry": _info_to_entry(info),
                    }
                )
        else:
            for plugin_id, entry in merged_catalog.items():
                url = normalize_plugin_url(entry.get("url") or "")
                if not _has_value(url):
                    continue
                targets.append({"id": plugin_id, "url": url, "base_entry": entry, "info_entry": {}})

        checked = 0
        updated = 0
        updates_available = 0
        for target in targets:
            plugin_id = target["id"]
            url = target["url"]
            base_entry = target.get("base_entry", {})
            info_entry = target.get("info_entry", {})
            checked += 1
            metadata = self._fetch_plugin_json(url, quiet=True)
            if not metadata:
                continue
            catalog_entry = self._metadata_to_catalog_entry(metadata)
            # Keep the known URL when the fetched metadata lacks one or points elsewhere.
            if _has_value(url):
                catalog_entry_url = catalog_entry.get("url", "")
                if not _has_value(catalog_entry_url) or plugin_id_from_url(catalog_entry_url) != plugin_id:
                    catalog_entry["url"] = url
            # Backfill empty fields from the old catalog entry, then from the installed info.
            catalog_entry = self._merge_entry_fields(catalog_entry, base_entry)
            if info_entry:
                catalog_entry = self._merge_entry_fields(catalog_entry, info_entry)
            catalog_entry["last_check"] = datetime.datetime.now().isoformat(timespec="seconds")
            catalog_entry = self._normalize_catalog_entry(catalog_entry)
            existing_entry = local_map.get(plugin_id, {})
            # An update is "available" when the fetched release is newer than
            # what is installed (or, failing that, than the local catalog entry).
            if info_entry:
                if compare_release_metadata(catalog_entry, info_entry) > 0:
                    updates_available += 1
            elif existing_entry:
                if compare_release_metadata(catalog_entry, existing_entry) > 0:
                    updates_available += 1
            local_map[plugin_id] = catalog_entry
            updated += 1

        if updated > 0:
            self._write_json_file(self.local_catalog_path, list(local_map.values()))
        return {"checked": checked, "updated": updated, "updates_available": updates_available}
|
|
|
    def record_plugin_metadata(self, plugin_id: str, url: str = "") -> bool:
        """Record (or refresh) a plugin's entry in plugins_local.json.

        Metadata precedence: local plugin_info.json, then the file
        fetched from *url*, then the installed-plugin info. Returns False
        when no metadata could be assembled, True after writing.
        """
        if not plugin_id:
            return False
        url = normalize_plugin_url(url)
        plugin_path = os.path.join(self.plugins_dir, plugin_id)
        metadata = self._load_plugin_metadata(plugin_path) if os.path.isdir(plugin_path) else None
        # Track whether a real plugin_info.json was seen — only then is
        # "last_check" stamped below.
        plugin_json_found = metadata is not None
        if not metadata and _has_value(url):
            metadata = self._fetch_plugin_json(url)
            plugin_json_found = metadata is not None
        if not metadata:
            # Last resort: synthesize metadata from the installed-plugin info.
            plugin_info = next((info for info in self.get_plugins_info() if info.get("id") == plugin_id), None)
            if plugin_info:
                metadata = {
                    "name": plugin_info.get("name", ""),
                    "version": plugin_info.get("version", ""),
                    "description": plugin_info.get("description", ""),
                    "author": plugin_info.get("author", ""),
                    "url": plugin_info.get("url", ""),
                    "date": plugin_info.get("date", ""),
                    "wan2gp_version": plugin_info.get("wan2gp_version", ""),
                }
        if not metadata:
            return False

        catalog_entry = self._metadata_to_catalog_entry(metadata)
        # Keep the caller's URL when the metadata lacks one or points elsewhere.
        if _has_value(url):
            catalog_entry_url = catalog_entry.get("url", "")
            if not _has_value(catalog_entry_url) or plugin_id_from_url(catalog_entry_url) != plugin_id:
                catalog_entry["url"] = url

        # Backfill empty fields from the merged catalog, then the bundled catalog.
        merged_catalog = self.get_merged_catalog_entries(use_remote=False)
        catalog_entry = self._merge_entry_fields(catalog_entry, merged_catalog.get(plugin_id, {}))
        base_entry = self._find_catalog_entry(plugin_id, url=url, use_remote=False)
        if base_entry:
            catalog_entry = self._merge_entry_fields(catalog_entry, base_entry)
        if plugin_json_found:
            catalog_entry["last_check"] = datetime.datetime.now().isoformat(timespec="seconds")
        catalog_entry = self._normalize_catalog_entry(catalog_entry)

        # Rewrite plugins_local.json with this entry replacing any previous one.
        local_entries = self.load_local_catalog_entries()
        local_map: Dict[str, Dict[str, Any]] = {}
        for entry in local_entries:
            existing_id = entry.get("id") or plugin_id_from_url(entry.get("url", ""))
            if existing_id:
                local_map[existing_id] = self._normalize_catalog_entry(entry)
        local_map[plugin_id] = catalog_entry
        self._write_json_file(self.local_catalog_path, list(local_map.values()))
        return True
|
|
|
| def merge_local_catalog(self) -> str:
|
| local_entries = self.load_local_catalog_entries()
|
| if not local_entries:
|
| return "[Info] No local catalog entries to merge."
|
| base_entries = self.load_catalog_entries(use_remote=False)
|
| merged = self._merge_catalog_entries(base_entries, local_entries)
|
| merged_list = []
|
| for entry in merged.values():
|
| entry.pop("last_check", None)
|
| merged_list.append(entry)
|
| merged_list.sort(key=lambda item: item.get("name", ""))
|
| self._write_json_file(self.catalog_path, merged_list)
|
| try:
|
| os.remove(self.local_catalog_path)
|
| except FileNotFoundError:
|
| pass
|
| except Exception as e:
|
| return f"[Warning] Catalog merged, but failed to remove {self.local_catalog_path}: {e}"
|
| return "[Success] Catalog merged and local overrides removed."
|
|
|
| def get_plugins_info(self) -> List[Dict[str, str]]:
|
| plugins_info = []
|
| for dir_name in self.discover_plugins():
|
| plugin_path = os.path.join(self.plugins_dir, dir_name)
|
| is_system = dir_name in SYSTEM_PLUGINS
|
| info = {
|
| 'id': dir_name,
|
| 'name': dir_name,
|
| 'version': 'N/A',
|
| 'description': 'No description provided.',
|
| 'author': '',
|
| 'url': '',
|
| 'date': '',
|
| 'wan2gp_version': '',
|
| 'path': plugin_path,
|
| 'system': is_system,
|
| 'uninstallable': True,
|
| }
|
| metadata = self._load_plugin_metadata(plugin_path)
|
| if metadata:
|
| info['name'] = metadata.get('name', info['name'])
|
| info['version'] = metadata.get('version', info['version'])
|
| info['description'] = metadata.get('description', info['description'])
|
| info['author'] = metadata.get('author', info['author'])
|
| info['url'] = metadata.get('url', info['url'])
|
| info['date'] = metadata.get('date', info['date'])
|
| info['wan2gp_version'] = metadata.get('wan2gp_version', info['wan2gp_version'])
|
| info['uninstallable'] = bool(metadata.get('uninstallable', info['uninstallable']))
|
| else:
|
| try:
|
| module = importlib.import_module(f"{dir_name}.plugin")
|
| for name, obj in inspect.getmembers(module, inspect.isclass):
|
| if issubclass(obj, WAN2GPPlugin) and obj != WAN2GPPlugin:
|
| instance = obj()
|
| info['name'] = instance.name
|
| info['version'] = instance.version
|
| info['description'] = instance.description
|
| info['uninstallable'] = bool(getattr(instance, 'uninstallable', True))
|
| break
|
| except Exception as e:
|
| print(f"Could not load metadata for plugin {dir_name}: {e}")
|
| if is_system:
|
| info['uninstallable'] = False
|
| if info['id'] in BUNDLED_PLUGINS:
|
| info['uninstallable'] = False
|
| if not is_system:
|
| merged_catalog = self.get_merged_catalog_entries(use_remote=False)
|
| catalog_entry = merged_catalog.get(info.get("id", ""))
|
| if catalog_entry:
|
| info = self._merge_entry_fields(info, self._extract_catalog_metadata(catalog_entry))
|
| plugins_info.append(info)
|
|
|
| plugins_info.sort(key=lambda p: (not p['system'], p['name']))
|
| return plugins_info
|
|
|
| def _remove_readonly(self, func, path, exc_info):
|
| if not os.access(path, os.W_OK):
|
| os.chmod(path, stat.S_IWRITE)
|
| func(path)
|
| else:
|
| raise
|
|
|
def _is_plugin_uninstallable(self, plugin_id: str) -> bool:
    """Return False for plugins protected from uninstall.

    System and bundled plugins are always protected; otherwise the
    plugin's own 'uninstallable' flag (default True) decides. Errors while
    reading plugin info fall back to True (best effort).
    """
    if plugin_id in SYSTEM_PLUGINS or plugin_id in BUNDLED_PLUGINS:
        return False
    try:
        for entry in self.get_plugins_info():
            if entry.get('id') == plugin_id:
                return bool(entry.get('uninstallable', True))
    except Exception:
        pass
    return True
|
|
|
def uninstall_plugin(self, plugin_id: str):
    """Delete a user plugin's directory after protection checks.

    Returns a user-facing status string; a restart is required for the
    removal to take full effect.
    """
    if not plugin_id:
        return "[Error] No plugin selected for uninstallation."
    if plugin_id in SYSTEM_PLUGINS:
        return f"[Error] Cannot uninstall system plugin '{plugin_id}'."
    if not self._is_plugin_uninstallable(plugin_id):
        return f"[Error] Cannot uninstall protected plugin '{plugin_id}'."

    target_dir = os.path.join(self.plugins_dir, plugin_id)
    if not os.path.isdir(target_dir):
        return f"[Error] Plugin '{plugin_id}' directory not found."

    # Record the deletion first — presumably so a partially-removed
    # directory can be cleaned up on the next start if rmtree fails midway.
    self._add_pending_deletion(plugin_id)

    try:
        shutil.rmtree(target_dir, onerror=self._remove_readonly)
    except Exception as e:
        return f"[Error] Failed to remove plugin '{plugin_id}': {e}"
    return f"[Success] Plugin '{plugin_id}' uninstalled. Please restart WanGP."
|
|
|
def update_plugin(self, plugin_id: str, progress=None):
    """Fast-forward a git-installed plugin to its remote HEAD.

    Fetches the remote, compares the local commit with the tracked remote
    branch, pulls when behind, and re-installs requirements.txt
    dependencies when the file is present. Returns a user-facing status
    string prefixed with [Success]/[Info]/[Error].

    progress: optional gradio-style callable progress(fraction, desc=...).
    """
    if not plugin_id:
        return "[Error] No plugin selected for update."

    target_dir = os.path.join(self.plugins_dir, plugin_id)
    # Only plugins that were cloned via git can be updated in place.
    if not os.path.isdir(os.path.join(target_dir, '.git')):
        return f"[Error] '{plugin_id}' is not a git repository and cannot be updated automatically."

    try:
        if progress is not None: progress(0, desc=f"Updating '{plugin_id}'...")
        repo = git.Repo(target_dir)
        if not repo.remotes:
            return f"[Error] Update failed: no git remote configured for '{plugin_id}'."
        origin = repo.remotes.origin

        if progress is not None: progress(0.2, desc=f"Fetching updates for '{plugin_id}'...")
        origin.fetch()

        # Compare local HEAD with the remote branch of the same name to
        # detect whether a pull is actually needed (avoids a no-op pull).
        local_commit = repo.head.commit
        try:
            branch_name = repo.active_branch.name
            remote_ref = origin.refs[branch_name]
            remote_commit = remote_ref.commit
        except Exception:
            # e.g. detached HEAD, or the branch does not exist on the remote.
            return f"[Error] Update failed: could not resolve remote branch for '{plugin_id}'."

        if local_commit == remote_commit:
            return f"[Info] Plugin '{plugin_id}' is already up to date."

        if progress is not None: progress(0.6, desc=f"Pulling updates for '{plugin_id}'...")
        origin.pull()

        # Dependencies may have changed between versions.
        requirements_path = os.path.join(target_dir, 'requirements.txt')
        if os.path.exists(requirements_path):
            if progress is not None: progress(0.8, desc=f"Re-installing dependencies for '{plugin_id}'...")
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-r', requirements_path])

        if progress is not None: progress(1.0, desc="Update complete.")
        return f"[Success] Plugin '{plugin_id}' updated. Please restart WanGP for changes to take effect."
    except git.exc.GitCommandError as e:
        traceback.print_exc()
        # Map common git failure modes onto friendlier messages.
        stderr = (e.stderr or str(e)).strip()
        lowered = stderr.lower()
        if any(token in lowered for token in ("not found", "repository", "could not read from remote")):
            return f"[Error] Update failed: remote repository not found or unreachable for '{plugin_id}'."
        if any(token in lowered for token in ("authentication", "access denied", "permission denied")):
            return f"[Error] Update failed: access denied to remote for '{plugin_id}'."
        return f"[Error] Git update failed for '{plugin_id}': {stderr}"
    except Exception as e:
        traceback.print_exc()
        return f"[Error] An unexpected error occurred during update of '{plugin_id}': {str(e)}"
|
|
|
def reinstall_plugin(self, plugin_id: str, progress=None):
    """Re-clone a git-installed plugin from its recorded origin URL.

    The existing directory is renamed to '<dir>.bak' before installing;
    on failure the backup is restored, on success it is deleted. Returns a
    user-facing status string.

    progress: optional gradio-style callable progress(fraction, desc=...).
    """
    if not plugin_id:
        return "[Error] No plugin selected for reinstallation."

    target_dir = os.path.join(self.plugins_dir, plugin_id)
    if not os.path.isdir(target_dir):
        return f"[Error] Plugin '{plugin_id}' not found."

    # Recover the clone URL from the plugin's own git metadata.
    git_url = None
    if os.path.isdir(os.path.join(target_dir, '.git')):
        try:
            repo = git.Repo(target_dir)
            git_url = repo.remotes.origin.url
        except Exception as e:
            traceback.print_exc()
            return f"[Error] Could not get remote URL for '{plugin_id}': {e}"

    if not git_url:
        return f"[Error] Could not determine remote URL for '{plugin_id}'. Cannot reinstall."

    if progress is not None: progress(0, desc=f"Reinstalling '{plugin_id}'...")

    # Clear any stale backup left by a previous failed reinstall.
    backup_dir = f"{target_dir}.bak"
    if os.path.exists(backup_dir):
        try:
            shutil.rmtree(backup_dir, onerror=self._remove_readonly)
        except Exception as e:
            return f"[Error] Could not remove old backup directory '{backup_dir}'. Please remove it manually and try again. Error: {e}"

    # Move (not delete) the current install so it can be restored on failure.
    try:
        if progress is not None: progress(0.2, desc=f"Moving old version of '{plugin_id}' aside...")
        os.rename(target_dir, backup_dir)
    except OSError as e:
        traceback.print_exc()
        return f"[Error] Could not move the existing plugin directory for '{plugin_id}'. It may be in use by another process. Please close any file explorers or editors in that folder and try again. Error: {e}"

    install_msg = self.install_plugin_from_url(git_url, progress=progress)

    if "[Success]" in install_msg:
        # Fresh install succeeded: drop the backup (best effort).
        try:
            shutil.rmtree(backup_dir, onerror=self._remove_readonly)
        except Exception:
            pass
        return f"[Success] Plugin '{plugin_id}' reinstalled. Please restart WanGP."
    else:
        # Roll back by restoring the backup under the original name.
        try:
            os.rename(backup_dir, target_dir)
            return f"[Error] Reinstallation failed during install step: {install_msg}. The original plugin has been restored."
        except Exception as restore_e:
            return f"[CRITICAL ERROR] Reinstallation failed AND could not restore backup. Plugin '{plugin_id}' is now in a broken state. Please manually rename '{backup_dir}' back to '{target_dir}'. Original error: {install_msg}. Restore error: {restore_e}"
|
|
|
def install_plugin_from_url(self, git_url: str, progress=None):
    """Clone a plugin from a GitHub HTTPS URL into the plugins directory.

    Validates the URL, clones the repo, verifies it ships a plugin.py
    entry point (otherwise the clone is deleted), installs
    requirements.txt / setup.py when present, and ensures the directory is
    importable as a package. Returns a user-facing status string.

    progress: optional gradio-style callable progress(fraction, desc=...).
    """
    cleaned_url = normalize_plugin_url(git_url)
    # Only GitHub HTTPS URLs are accepted.
    if not cleaned_url or not cleaned_url.startswith("https://github.com/"):
        return "[Error] Invalid URL."

    try:
        repo_name = plugin_id_from_url(cleaned_url)
        if not repo_name:
            return "[Error] Invalid URL."
        target_dir = os.path.join(self.plugins_dir, repo_name)

        if os.path.exists(target_dir):
            return f"[Warning] Plugin '{repo_name}' already exists. Please remove it manually to reinstall."

        if progress is not None: progress(0.1, desc=f"Cloning '{repo_name}'...")
        git.Repo.clone_from(cleaned_url, target_dir)

        # A valid plugin must provide a plugin.py module at its root;
        # otherwise remove the clone again.
        plugin_entry = os.path.join(target_dir, "plugin.py")
        if not os.path.isfile(plugin_entry):
            shutil.rmtree(target_dir, onerror=self._remove_readonly)
            return "[Error] Invalid Plugin."

        requirements_path = os.path.join(target_dir, 'requirements.txt')
        if os.path.exists(requirements_path):
            if progress is not None: progress(0.5, desc=f"Installing dependencies for '{repo_name}'...")
            try:
                subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-r', requirements_path])
            except subprocess.CalledProcessError as e:
                traceback.print_exc()
                return f"[Error] Failed to install dependencies for {repo_name}. Check console for details. Error: {e}"

        setup_path = os.path.join(target_dir, 'setup.py')
        if os.path.exists(setup_path):
            if progress is not None: progress(0.8, desc=f"Running setup for '{repo_name}'...")
            try:
                # Editable install so the plugin package resolves in place.
                subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-e', target_dir])
            except subprocess.CalledProcessError as e:
                traceback.print_exc()
                return f"[Error] Failed to run setup.py for {repo_name}. Check console for details. Error: {e}"

        # Make the plugin directory importable as a package.
        init_path = os.path.join(target_dir, '__init__.py')
        if not os.path.exists(init_path):
            with open(init_path, 'w') as f:
                pass

        # User-installed plugins may not mark themselves as protected.
        self._strip_uninstallable_flag(target_dir)

        if progress is not None: progress(1.0, desc="Installation complete.")
        # Re-installing cancels any deletion scheduled for this plugin id.
        self._clear_pending_deletion(repo_name)
        return f"[Success] Plugin '{repo_name}' installed. Please enable it in the list and restart WanGP."

    except git.exc.GitCommandError as e:
        traceback.print_exc()
        # Map common clone failures onto friendlier messages.
        stderr = (e.stderr or str(e)).strip()
        lowered = stderr.lower()
        if any(token in lowered for token in ("not found", "repository", "fatal", "could not read from remote")):
            return "[Error] Invalid URL."
        return f"[Error] Git clone failed: {stderr}"
    except Exception as e:
        traceback.print_exc()
        return f"[Error] An unexpected error occurred: {str(e)}"
|
|
|
def _strip_uninstallable_flag(self, plugin_dir: str) -> None:
    """Drop a falsy 'uninstallable' flag from an installed plugin's metadata.

    A freshly installed third-party plugin must not ship
    uninstallable=False in its plugin_info.json (that protection is
    presumably reserved for system/bundled plugins), so the key is removed
    and the file rewritten. Write failures are logged, not raised.
    """
    if not plugin_dir or not os.path.isdir(plugin_dir):
        return
    metadata_path = os.path.join(plugin_dir, PLUGIN_METADATA_FILENAME)
    payload = self._load_json_file(metadata_path)
    if not isinstance(payload, dict) or "uninstallable" not in payload:
        return
    if self._coerce_bool(payload.get("uninstallable"), default=True):
        # A truthy flag is harmless; leave the file untouched.
        return
    payload.pop("uninstallable", None)
    try:
        with open(metadata_path, "w", encoding="utf-8") as writer:
            json.dump(payload, writer, indent=2, ensure_ascii=True)
    except Exception as e:
        print(f"[PluginManager] Failed to update {metadata_path}: {e}")
|
|
|
def discover_plugins(self) -> List[str]:
    """Return the sorted names of plugin package directories.

    A plugin candidate is any subdirectory of plugins_dir that contains an
    __init__.py file.
    """
    found = [
        entry
        for entry in os.listdir(self.plugins_dir)
        if os.path.isdir(os.path.join(self.plugins_dir, entry))
        and os.path.exists(os.path.join(self.plugins_dir, entry, '__init__.py'))
    ]
    return sorted(found)
|
|
|
def load_plugins_from_directory(self, enabled_user_plugins: List[str], safe_mode: bool = False) -> None:
    """Import and instantiate system plugins plus the enabled user plugins.

    For each selected plugin directory, imports '<dir>.plugin',
    instantiates the first WAN2GPPlugin subclass found, applies metadata,
    runs setup_ui(), and registers the plugin's JS snippets and data
    hooks. A failure in one plugin is logged and does not stop the others.
    """
    self.custom_js_snippets = []
    if safe_mode:
        print("[Safe Mode] User plugins are disabled. Only system plugins will be loaded.")
        plugins_to_load = SYSTEM_PLUGINS
    else:
        # System plugins always load; user plugins are appended, deduped
        # against the system list.
        plugins_to_load = SYSTEM_PLUGINS + [p for p in enabled_user_plugins if p not in SYSTEM_PLUGINS]

    for plugin_dir_name in self.discover_plugins():
        if plugin_dir_name not in plugins_to_load:
            continue
        try:
            module = importlib.import_module(f"{plugin_dir_name}.plugin")
            plugin_path = os.path.join(self.plugins_dir, plugin_dir_name)
            metadata = self._load_plugin_metadata(plugin_path)
            is_bundled = plugin_dir_name in BUNDLED_PLUGINS

            for name, obj in inspect.getmembers(module, inspect.isclass):
                if issubclass(obj, WAN2GPPlugin) and obj != WAN2GPPlugin:
                    plugin = obj()
                    self._apply_metadata_to_plugin(plugin, metadata, plugin_dir_name in SYSTEM_PLUGINS)
                    # Bundled plugins are always protected from uninstall.
                    if is_bundled:
                        plugin.uninstallable = False
                    plugin.setup_ui()
                    self.plugins[plugin_dir_name] = plugin
                    if plugin.custom_js_snippets:
                        self.custom_js_snippets.extend(plugin.custom_js_snippets)
                    # Merge the plugin's data hooks into the shared registry.
                    for hook_name, callbacks in plugin._data_hooks.items():
                        if hook_name not in self.data_hooks:
                            self.data_hooks[hook_name] = []
                        self.data_hooks[hook_name].extend(callbacks)
                    if plugin_dir_name not in SYSTEM_PLUGINS:
                        print(f"Loaded plugin: {plugin.name} (from {plugin_dir_name})")
                    # Only the first plugin class per module is loaded.
                    break
        except Exception as e:
            print(f"Error loading plugin from directory {plugin_dir_name}: {e}")
            traceback.print_exc()
|
|
|
def get_all_plugins(self) -> Dict[str, WAN2GPPlugin]:
    """Return a shallow copy of the loaded-plugin registry keyed by directory name."""
    return dict(self.plugins)
|
|
|
def get_custom_js(self) -> str:
    """Concatenate all registered plugin JS snippets, newline-separated."""
    snippets = self.custom_js_snippets
    if not snippets:
        return ""
    return "\n".join(snippets)
|
|
|
def inject_globals(self, global_references: Dict[str, Any]) -> None:
    """Give every plugin the host globals it asked for via global_requests.

    Restricted globals are injected as None; the 'set_wgp_global'
    callback, when present in the references, is attached to each plugin.
    Per-plugin failures are logged and skipped.
    """
    setter = global_references.get('set_wgp_global')
    for plugin_id, plugin in self.plugins.items():
        try:
            if 'set_wgp_global' in global_references:
                plugin._set_wgp_global_func = setter
            for requested in plugin.global_requests:
                if requested in self.restricted_globals:
                    setattr(plugin, requested, None)
                elif requested in global_references:
                    setattr(plugin, requested, global_references[requested])
        except Exception as e:
            print(f" [!] ERROR injecting globals for {plugin_id}: {str(e)}")
|
|
|
def update_global_reference(self, global_name: str, new_value: Any) -> None:
    """Push a refreshed global value to every plugin that requested it.

    Restricted globals are propagated as None instead of the real value.
    Per-plugin failures are logged and skipped.
    """
    if global_name in self.restricted_globals:
        safe_value = None
    else:
        safe_value = new_value
    for plugin_id, plugin in self.plugins.items():
        try:
            wants_it = hasattr(plugin, '_global_requests') and global_name in plugin._global_requests
            if wants_it:
                setattr(plugin, global_name, safe_value)
        except Exception as e:
            print(f" [!] ERROR updating global '{global_name}' for plugin {plugin_id}: {str(e)}")
|
|
|
def setup_ui(self) -> Dict[str, Dict[str, Any]]:
    """Collect tab descriptors from every loaded plugin.

    Returns {'tabs': {tab_id: {'label', 'component_constructor',
    'position'}}}. A failing plugin is logged and its tabs skipped.
    """
    collected: Dict[str, Dict[str, Any]] = {}
    for plugin_id, plugin in self.plugins.items():
        try:
            for tab_id, tab in plugin.tabs.items():
                collected[tab_id] = {
                    'label': tab.label,
                    'component_constructor': tab.component_constructor,
                    'position': tab.position,
                }
        except Exception as e:
            print(f"Error in setup_ui for plugin {plugin_id}: {str(e)}")
    return {'tabs': collected}
|
|
|
def run_data_hooks(self, hook_name: str, *args, **kwargs):
    """Pipe the 'configs' payload through every callback for hook_name.

    Each callback receives the current payload plus the remaining keyword
    arguments and returns the (possibly transformed) payload. Callback
    failures are logged and that callback's change skipped. Returns the
    final payload (the untouched 'configs' value if no hook is
    registered).
    """
    payload = kwargs.pop('configs', None)
    if hook_name not in self.data_hooks:
        return payload

    for hook in self.data_hooks[hook_name]:
        try:
            payload = hook(payload, **kwargs)
        except Exception as e:
            print(f"[PluginManager] Error running hook '{hook_name}' from {hook.__module__}: {e}")
            traceback.print_exc()
    return payload
|
|
|
def run_component_insertion_and_setup(self, all_components: Dict[str, Any]):
    """Hand requested gradio components to plugins and process insertions.

    Phase 1: for every plugin, bind each requested component id as an
    attribute, call post_ui_setup() with the resolved components, and
    drain its queued InsertAfterRequest items.
    Phase 2: for each insertion request, construct the new component
    inside the target's parent container and move it to sit right after
    the target child.
    """
    all_insert_requests: List[InsertAfterRequest] = []

    for plugin_id, plugin in self.plugins.items():
        try:
            # Bind requested components as plugin attributes, without
            # clobbering ones the plugin already set to a real value.
            for comp_id in plugin.component_requests:
                if comp_id in all_components and (not hasattr(plugin, comp_id) or getattr(plugin, comp_id) is None):
                    setattr(plugin, comp_id, all_components[comp_id])

            requested_components = {
                comp_id: all_components[comp_id]
                for comp_id in plugin.component_requests
                if comp_id in all_components
            }

            plugin.post_ui_setup(requested_components)

            # Drain this plugin's insertion queue into the shared list.
            insert_requests = getattr(plugin, '_insert_after_requests', [])
            if insert_requests:
                all_insert_requests.extend(insert_requests)
                plugin._insert_after_requests.clear()

        except Exception as e:
            print(f"[PluginManager] ERROR in post_ui_setup for {plugin_id}: {str(e)}")
            traceback.print_exc()

    if all_insert_requests:
        for request in all_insert_requests:
            try:
                target = all_components.get(request.target_component_id)
                parent = getattr(target, 'parent', None)
                # NOTE(review): the truthiness tests assume gradio
                # components/containers are truthy objects — confirm for
                # the gradio version in use.
                if not target or not parent or not hasattr(parent, 'children'):
                    print(f"[PluginManager] ERROR: Target '{request.target_component_id}' for insertion not found or invalid.")
                    continue

                target_index = parent.children.index(target)
                # Constructing inside the parent context appends the new
                # component at the end of parent.children...
                with parent:
                    new_component = request.new_component_constructor()

                # ...so pop it from the tail and reinsert it immediately
                # after the target component.
                newly_added = parent.children.pop(-1)
                parent.children.insert(target_index + 1, newly_added)

            except Exception as e:
                print(f"[PluginManager] ERROR processing insert_after for {request.target_component_id}: {str(e)}")
                traceback.print_exc()
|
|
|
class WAN2GPApplication:
    """Top-level glue between the WanGP host app and its plugin system.

    Owns the PluginManager, renders each plugin's tabs into the main
    gradio Tabs component, and routes tab select/deselect events to the
    owning plugins.
    """

    def __init__(self):
        self.plugin_manager = PluginManager()
        # Maps a tab *label* to the plugin that owns it (labels are used as
        # keys, so they are presumably unique across plugins — TODO confirm).
        self.tab_to_plugin_map: Dict[str, WAN2GPPlugin] = {}
        self.all_rendered_tabs: List[gr.Tab] = []
        # Plugin ids enabled in server_config; also drives user-tab ordering.
        self.enabled_plugins: List[str] = []

    def initialize_plugins(self, wgp_globals: dict):
        """Load and wire all plugins using the host's globals dict."""
        if not hasattr(self, 'plugin_manager'):
            return

        safe_mode = wgp_globals.get("SAFE_MODE", False)

        # In safe mode no user plugins run, so skip default auto-install too.
        if not safe_mode:
            auto_install_and_enable_default_plugins(self.plugin_manager, wgp_globals)

        server_config = wgp_globals.get("server_config")
        server_config_filename = wgp_globals.get("server_config_filename", "")
        if not server_config:
            print("[PluginManager] ERROR: server_config not found in globals.")
            return
        self.plugin_manager.set_server_config(server_config, server_config_filename)
        # Finish removing plugin directories whose deletion was recorded as
        # pending on a previous run.
        self.plugin_manager.cleanup_pending_deletions()

        self.enabled_plugins = server_config.get("enabled_plugins", [])

        self.plugin_manager.load_plugins_from_directory(self.enabled_plugins, safe_mode=safe_mode)
        self.plugin_manager.inject_globals(wgp_globals)

    def setup_ui_tabs(self, main_tabs_component: gr.Tabs, state_component: gr.State, set_save_form_event):
        """Render plugin tabs and attach their selection event handlers."""
        self._create_plugin_tabs(main_tabs_component, state_component)
        self._setup_tab_events(main_tabs_component, state_component, set_save_form_event)

    def _create_plugin_tabs(self, main_tabs, state):
        """Render one gr.Tab per plugin tab; user tabs sit between system tabs."""
        if not hasattr(self, 'plugin_manager'):
            return

        loaded_plugins = self.plugin_manager.get_all_plugins()
        system_tabs, user_tabs = [], []
        # Rank system plugins by their index in SYSTEM_PLUGINS.
        system_order = {pid: idx for idx, pid in enumerate(SYSTEM_PLUGINS)}

        for plugin_id, plugin in loaded_plugins.items():
            for tab_id, tab in plugin.tabs.items():
                self.tab_to_plugin_map[tab.label] = plugin
                tab_info = {
                    'id': tab_id,
                    'label': tab.label,
                    'component_constructor': tab.component_constructor,
                    'position': system_order.get(plugin_id, tab.position),
                    'plugin_id': plugin_id,
                }
                if plugin_id in SYSTEM_PLUGINS:
                    system_tabs.append(tab_info)
                else:
                    user_tabs.append((plugin_id, tab_info))

        system_tabs_sorted = sorted(
            system_tabs,
            key=lambda t: (system_order.get(t['plugin_id'], 1_000_000), t['label']),
        )
        # User tabs are wedged between the first N system tabs and the rest.
        pre_user_tabs = system_tabs_sorted[:USER_PLUGIN_INSERT_POSITION]
        post_user_tabs = system_tabs_sorted[USER_PLUGIN_INSERT_POSITION:]

        # Order user tabs by their position in the enabled_plugins list.
        sorted_user_tabs = [tab_info for plugin_id in self.enabled_plugins for pid, tab_info in user_tabs if pid == plugin_id]

        all_tabs_to_render = pre_user_tabs + sorted_user_tabs + post_user_tabs

        def goto_video_tab(state):
            # Handed to plugins so they can jump back to the main video
            # generation tab; evt=None marks a programmatic switch.
            self._handle_tab_selection(state, None)
            return gr.Tabs(selected="video_gen")

        for tab_info in all_tabs_to_render:
            with gr.Tab(tab_info['label'], id=f"plugin_{tab_info['id']}") as new_tab:
                self.all_rendered_tabs.append(new_tab)
                plugin = self.tab_to_plugin_map[new_tab.label]
                plugin.goto_video_tab = goto_video_tab
                # Build the tab's contents inside the gr.Tab context.
                tab_info['component_constructor']()

    def _setup_tab_events(self, main_tabs_component: gr.Tabs, state_component: gr.State, set_save_form_event):
        """Wire gradio select events to plugin on_tab_select/on_tab_deselect."""
        if main_tabs_component and state_component:
            # Container-level select: used to detect which tab was *left*.
            main_tabs_component.select(
                fn=self._handle_tab_selection,
                inputs=[state_component],
                outputs=None,
                show_progress="hidden",
            )

        for tab in self.all_rendered_tabs:
            plugin = self.tab_to_plugin_map[tab.label]

            # Per-tab select: save the form first, then notify the plugin.
            event = set_save_form_event(tab.select)
            event.then(
                fn=self._handle_one_tab_selection,
                inputs=[state_component, gr.State(tab.label)],
                outputs=plugin.on_tab_outputs if hasattr(plugin, "on_tab_outputs") else None,
                show_progress="hidden",
                trigger_mode="multiple",
            )

    def _handle_tab_selection(self, state: dict, evt: gr.SelectData):
        """Fire on_tab_deselect for the tab being left and track the new one."""
        if not hasattr(self, 'previous_tab_id'):
            # Lazily initialised; the app starts on the video generator tab.
            self.previous_tab_id = video_gen_label

        # evt is None for programmatic switches (goto_video_tab).
        new_tab_id = video_gen_label if evt is None else evt.value

        if self.previous_tab_id == new_tab_id:
            return

        if self.previous_tab_id and self.previous_tab_id in self.tab_to_plugin_map:
            plugin_to_deselect = self.tab_to_plugin_map[self.previous_tab_id]
            try:
                plugin_to_deselect.on_tab_deselect(state)
            except Exception as e:
                print(f"[PluginManager] Error in on_tab_deselect for plugin {plugin_to_deselect.name}: {e}")
                traceback.print_exc()

        self.previous_tab_id = new_tab_id

    def _handle_one_tab_selection(self, state: dict, new_tab_id):
        """Fire on_tab_select for the plugin owning the newly selected tab."""
        plugin_to_select = self.tab_to_plugin_map.get(new_tab_id, None)
        try:
            # NOTE(review): if new_tab_id is not in the map, plugin_to_select
            # is None and the except handler below raises again on `.name` —
            # confirm every selectable tab label is registered.
            ret = plugin_to_select.on_tab_select(state)
        except Exception as e:
            print(f"[PluginManager] Error in on_tab_select for plugin {plugin_to_select.name}: {e}")
            traceback.print_exc()
            ret = None
        return ret

    def run_component_insertion(self, components_dict: Dict[str, Any]):
        """Delegate component injection/insertion to the plugin manager."""
        if hasattr(self, 'plugin_manager'):
            self.plugin_manager.run_component_insertion_and_setup(components_dict)
|
|
|