import gradio as gr
import json
import traceback
from shared.utils.plugins import WAN2GPPlugin, compare_release_metadata, is_wangp_compatible, plugin_id_from_url
class PluginManagerUIPlugin(WAN2GPPlugin):
def __init__(self):
super().__init__()
self.name = "Plugin Manager UI"
self.version = "1.8.0"
self.description = "A built-in UI for managing, installing, and updating Wan2GP plugins"
self.WanGP_version = ""
self.quit_application = None
self.restart_application = None
def setup_ui(self):
self.request_global("app")
self.request_global("server_config")
self.request_global("server_config_filename")
self.request_global("quit_application")
self.request_global("restart_application")
self.request_global("WanGP_version")
self.request_component("main")
self.request_component("main_tabs")
self.add_tab(
tab_id="plugin_manager_tab",
label="Plugins",
component_constructor=self.create_plugin_manager_ui,
)
def _get_js_script_html(self):
js_code = """
() => {
function pluginRoot() {
if (window.gradioApp) {
return window.gradioApp();
}
const app = document.querySelector('gradio-app');
return app ? (app.shadowRoot || app) : document;
}
function updateGradioInput(elem_id, value) {
const root = pluginRoot();
const input = root.querySelector(`#${elem_id} textarea, #${elem_id} input`);
if (input) {
input.value = value;
input.dispatchEvent(new Event('input', { bubbles: true }));
return true;
}
return false;
}
function makeSortable() {
const root = pluginRoot();
const userPluginList = root.querySelector('#user-plugin-list');
if (!userPluginList) return;
if (userPluginList.dataset.sortableBound === '1') return;
userPluginList.dataset.sortableBound = '1';
let draggedItem = null;
userPluginList.addEventListener('dragstart', e => {
draggedItem = e.target.closest('.plugin-item');
if (!draggedItem) return;
draggedItem.classList.add('dragging');
if (e.dataTransfer) {
e.dataTransfer.effectAllowed = 'move';
e.dataTransfer.setData('text/plain', draggedItem.dataset.pluginId || '');
}
setTimeout(() => {
if (draggedItem) draggedItem.style.opacity = '0.5';
}, 0);
});
userPluginList.addEventListener('dragend', e => {
setTimeout(() => {
if (draggedItem) {
draggedItem.style.opacity = '1';
draggedItem.classList.remove('dragging');
draggedItem = null;
}
}, 0);
});
userPluginList.addEventListener('dragover', e => {
e.preventDefault();
if (e.dataTransfer) {
e.dataTransfer.dropEffect = 'move';
}
const afterElement = getDragAfterElement(userPluginList, e.clientY);
if (draggedItem) {
if (afterElement === draggedItem) return;
if (afterElement == null) {
userPluginList.appendChild(draggedItem);
} else {
userPluginList.insertBefore(draggedItem, afterElement);
}
}
});
userPluginList.addEventListener('drop', e => {
e.preventDefault();
});
function getDragAfterElement(container, y) {
const draggableElements = [...container.querySelectorAll('.plugin-item:not(.dragging)')];
return draggableElements.reduce((closest, child) => {
const box = child.getBoundingClientRect();
const offset = y - box.top - box.height / 2;
if (offset < 0 && offset > closest.offset) {
return { offset: offset, element: child };
} else {
return closest;
}
}, { offset: Number.NEGATIVE_INFINITY }).element;
}
}
function observeUserPluginList() {
const root = pluginRoot();
if (!root) {
setTimeout(observeUserPluginList, 400);
return;
}
if (root.dataset.pluginSortableObserver === '1') {
makeSortable();
return;
}
root.dataset.pluginSortableObserver = '1';
const observer = new MutationObserver(() => {
makeSortable();
});
observer.observe(root, { childList: true, subtree: true });
makeSortable();
}
setTimeout(observeUserPluginList, 200);
setTimeout(makeSortable, 500);
window.handlePluginAction = function(button, action) {
const pluginItem = button.closest('.plugin-item');
const pluginId = pluginItem.dataset.pluginId;
const payload = JSON.stringify({ action: action, plugin_id: pluginId });
updateGradioInput('plugin_action_input', payload);
};
window.handleStoreInstall = function(button, url) {
const payload = JSON.stringify({ action: 'install_from_store', url: url });
updateGradioInput('plugin_action_input', payload);
};
window.handleSave = function(restart) {
const root = pluginRoot();
const user_container = root.querySelector('#user-plugin-list');
if (!user_container) return;
const user_plugins = user_container.querySelectorAll('.plugin-item');
const enabledUserPlugins = Array.from(user_plugins)
.filter(item => item.querySelector('.plugin-enable-checkbox').checked)
.map(item => item.dataset.pluginId);
const payload = JSON.stringify({ restart: restart, enabled_plugins: enabledUserPlugins });
updateGradioInput('save_action_input', payload);
if (restart) {
setTimeout(() => {
document.body.innerHTML = "
WanGP is restarting... You can safely close this tab. A new tab will open shortly. ";
window.open('', '_self', '');
window.close();
}, 1000);
}
};
}
"""
return f"{js_code}"
def _get_community_plugins_info(self):
if hasattr(self, '_community_plugins_cache') and self._community_plugins_cache is not None:
return self._community_plugins_cache
try:
self._community_plugins_cache = self.app.plugin_manager.get_merged_catalog_entries(use_remote=True)
return self._community_plugins_cache
except Exception as e:
print(f"[PluginManager] Could not fetch community plugins info: {e}")
self._community_plugins_cache = {}
return {}
def _build_community_plugins_html(self):
try:
installed_plugin_ids = {p['id'] for p in self.app.plugin_manager.get_plugins_info()}
remote_plugins = self._get_community_plugins_info()
base_entries = self.app.plugin_manager.load_catalog_entries(use_remote=False)
base_ids = {
plugin_id_from_url(entry.get('url', ''))
for entry in base_entries
if entry.get('url')
}
community_plugins = [
p for plugin_id, p in remote_plugins.items()
if plugin_id not in installed_plugin_ids
]
community_plugins.sort(
key=lambda p: (
plugin_id_from_url(p.get('url', '')) not in base_ids,
(p.get('name') or '').lower()
)
)
except Exception as e:
gr.Warning(f"Could not process community plugins list: {e}")
return "Failed to load community plugins.
"
if not community_plugins:
return "All available community plugins are already installed.
"
items_html = ""
for plugin in community_plugins:
name = plugin.get('name')
author = plugin.get('author') or "Unknown"
version = plugin.get('version', 'N/A')
description = plugin.get('description') or "No description provided."
url = plugin.get('url')
wan2gp_version = plugin.get('wan2gp_version') or plugin.get('wangp_version', '')
if not url:
continue
if not name:
name = plugin_id_from_url(url) or "Unknown Plugin"
safe_url = url.replace("'", "\\'")
incompatible = not is_wangp_compatible(wan2gp_version, self.WanGP_version)
incompat_html = ""
if incompatible and wan2gp_version:
incompat_html = (
f""
f"Requires WanGP v{wan2gp_version}+"
" "
)
items_html += f"""
"""
return f"{items_html}
"
def _build_plugins_html(self):
plugins_info = self.app.plugin_manager.get_plugins_info()
enabled_user_plugins = self.server_config.get("enabled_plugins", [])
all_user_plugins_info = [p for p in plugins_info if not p.get('system')]
remote_plugins_info = self.app.plugin_manager.get_merged_catalog_entries(use_remote=False)
css = """
"""
instruction_html = "Please Select the Plugins you want to enable
"
if not all_user_plugins_info:
user_html = "No user-installed plugins found.
"
else:
user_plugins_map = {p['id']: p for p in all_user_plugins_info}
user_plugins = []
for plugin_id in enabled_user_plugins:
if plugin_id in user_plugins_map:
user_plugins.append(user_plugins_map.pop(plugin_id))
user_plugins.extend(sorted(user_plugins_map.values(), key=lambda p: p['name']))
user_items_html = ""
for plugin in user_plugins:
plugin_id = plugin['id']
checked = "checked" if plugin_id in enabled_user_plugins else ""
uninstallable = plugin.get('uninstallable', True)
author = plugin.get('author') or "Unknown"
update_notice_html = ''
item_classes = []
if uninstallable and plugin_id in remote_plugins_info:
remote_entry = remote_plugins_info[plugin_id]
if compare_release_metadata(remote_entry, plugin) > 0:
remote_version = remote_entry.get('version') or remote_entry.get('date') or "unknown"
update_notice_html = (
f'New version {remote_version} is available ! '
)
item_classes.append('update-available')
bundled_badge_html = ''
if not uninstallable:
bundled_badge_html = 'Bundled '
wan2gp_version = plugin.get('wan2gp_version') or plugin.get('wangp_version', '')
incompatible = not is_wangp_compatible(wan2gp_version, self.WanGP_version)
incompat_html = ''
if incompatible and wan2gp_version:
incompat_html = (
f""
f"Requires WanGP v{wan2gp_version}+"
" "
)
item_classes.append('incompatible')
actions_html = ""
if uninstallable:
actions_html = """
Update
Reinstall
Uninstall
"""
actions_container_html = f'{actions_html}
' if actions_html else ""
user_items_html += f"""
"""
user_html = f'{user_items_html}
'
return f"{css}{instruction_html}{user_html}
"
def create_plugin_manager_ui(self):
with gr.Blocks() as plugin_blocks:
with gr.Row(equal_height=False, variant='panel'):
with gr.Column(scale=2, min_width=600):
gr.Markdown("### Plugins Available Locally (Drag to reorder tabs)")
self.plugins_html_display = gr.HTML()
with gr.Row(elem_classes="save-buttons-container"):
self.save_plugins_button = gr.Button("Save", variant="secondary", size="sm", scale=0, elem_classes="stylish-save-btn")
self.save_and_restart_button = gr.Button("Save and Restart", variant="primary", size="sm", scale=0, elem_classes="stylish-save-btn")
self.refresh_catalog_button = gr.Button("Check for Updates", variant="secondary", size="sm", scale=0, elem_classes="stylish-save-btn")
with gr.Column(scale=2, min_width=300):
gr.Markdown("### Discover & Install")
self.community_plugins_html = gr.HTML()
with gr.Accordion("Install from URL", open=True):
with gr.Group():
self.plugin_url_textbox = gr.Textbox(label="GitHub URL", placeholder="https://github.com/user/wan2gp-plugin-repo")
self.install_plugin_button = gr.Button("Download and Install from URL")
with gr.Column(visible=False):
self.plugin_action_input = gr.Textbox(elem_id="plugin_action_input")
self.save_action_input = gr.Textbox(elem_id="save_action_input")
js = self._get_js_script_html()
plugin_blocks.load(fn=None, js=js)
self.main_tabs.select(
self._on_tab_select_refresh,
None,
[self.plugins_html_display, self.community_plugins_html],
show_progress="hidden"
)
self.save_plugins_button.click(fn=None, js="handleSave(false)")
self.save_and_restart_button.click(fn=None, js="handleSave(true)")
self.refresh_catalog_button.click(
fn=self._refresh_catalog,
inputs=[],
outputs=[self.plugins_html_display, self.community_plugins_html],
show_progress="full"
)
self.save_action_input.change(
fn=self._handle_save_action,
inputs=[self.save_action_input],
outputs=[self.plugins_html_display]
)
self.plugin_action_input.change(
fn=self._handle_plugin_action_from_json,
inputs=[self.plugin_action_input],
outputs=[self.plugins_html_display, self.community_plugins_html],
show_progress="full"
)
self.install_plugin_button.click(
fn=self._install_plugin_and_refresh,
inputs=[self.plugin_url_textbox],
outputs=[self.plugins_html_display, self.community_plugins_html, self.plugin_url_textbox],
show_progress="full"
)
return plugin_blocks
def _on_tab_select_refresh(self, evt: gr.SelectData):
if evt.value != "Plugins":
return gr.update(), gr.update()
if hasattr(self, '_community_plugins_cache'):
del self._community_plugins_cache
installed_html = self._build_plugins_html()
community_html = self._build_community_plugins_html()
return gr.update(value=installed_html), gr.update(value=community_html)
def _refresh_catalog(self, progress=gr.Progress()):
self.app.plugin_manager.refresh_catalog(installed_only=True, use_remote=False)
if hasattr(self, '_community_plugins_cache'):
del self._community_plugins_cache
updates_available = self._count_available_updates()
if updates_available <= 0:
gr.Info("No Plugin Update is available")
elif updates_available == 1:
gr.Info("One Plugin Update is available")
else:
gr.Info(f"{updates_available} Plugin Updates are available")
return self._build_plugins_html(), self._build_community_plugins_html()
def _count_available_updates(self) -> int:
try:
plugins_info = self.app.plugin_manager.get_plugins_info()
remote_plugins_info = self.app.plugin_manager.get_merged_catalog_entries(use_remote=False)
count = 0
for plugin in plugins_info:
if plugin.get('system'):
continue
if not plugin.get('uninstallable', True):
continue
plugin_id = plugin.get('id')
if not plugin_id or plugin_id not in remote_plugins_info:
continue
remote_entry = remote_plugins_info[plugin_id]
if compare_release_metadata(remote_entry, plugin) > 0:
count += 1
return count
except Exception:
return 0
def _enable_plugin_after_install(self, url: str):
try:
plugin_id = plugin_id_from_url(url)
enabled_plugins = self.server_config.get("enabled_plugins", [])
if plugin_id not in enabled_plugins:
enabled_plugins.append(plugin_id)
self.server_config["enabled_plugins"] = enabled_plugins
with open(self.server_config_filename, "w", encoding="utf-8") as writer:
writer.write(json.dumps(self.server_config, indent=4))
return True
except Exception as e:
gr.Warning(f"Failed to auto-enable plugin {plugin_id}: {e}")
return False
def _save_plugin_settings(self, enabled_plugins: list):
self.server_config["enabled_plugins"] = enabled_plugins
with open(self.server_config_filename, "w", encoding="utf-8") as writer:
writer.write(json.dumps(self.server_config, indent=4))
gr.Info("Plugin settings saved. Please restart WanGP for changes to take effect.")
return gr.update(value=self._build_plugins_html())
def _save_and_restart(self, enabled_plugins: list):
self.server_config["enabled_plugins"] = enabled_plugins
with open(self.server_config_filename, "w", encoding="utf-8") as writer:
writer.write(json.dumps(self.server_config, indent=4))
gr.Info("Settings saved. Restarting application...")
if callable(getattr(self, "restart_application", None)):
self.restart_application()
return
elif callable(getattr(self, "quit_application", None)):
gr.Warning("Restart hook is unavailable. WAN2GP will now quit. Please start WAN2GP again manually.")
self.quit_application()
return
def _handle_save_action(self, payload_str: str):
if not payload_str:
return gr.update(value=self._build_plugins_html())
try:
payload = json.loads(payload_str)
enabled_plugins = payload.get("enabled_plugins", [])
if payload.get("restart", False):
self._save_and_restart(enabled_plugins)
return gr.update(value=self._build_plugins_html())
else:
return self._save_plugin_settings(enabled_plugins)
except (json.JSONDecodeError, TypeError):
gr.Warning("Could not process save action due to invalid data.")
return gr.update(value=self._build_plugins_html())
def _install_plugin_and_refresh(self, url, progress=gr.Progress()):
progress(0, desc="Starting installation...")
result_message = self.app.plugin_manager.install_plugin_from_url(url, progress=progress)
if "[Success]" in result_message:
was_enabled = self._enable_plugin_after_install(url)
if was_enabled:
result_message = result_message.replace("Please enable it", "It has been auto-enabled")
plugin_id = plugin_id_from_url(url)
if plugin_id:
self.app.plugin_manager.record_plugin_metadata(plugin_id, url=url)
if hasattr(self, '_community_plugins_cache'):
del self._community_plugins_cache
gr.Info(result_message)
else:
gr.Warning(result_message)
return self._build_plugins_html(), self._build_community_plugins_html(), ""
def _handle_plugin_action_from_json(self, payload_str: str, progress=gr.Progress()):
if not payload_str:
return gr.update(), gr.update()
try:
payload = json.loads(payload_str)
action = payload.get("action")
plugin_id = payload.get("plugin_id")
if action == 'install_from_store':
url = payload.get("url")
if not url:
raise ValueError("URL is required for install_from_store action.")
result_message = self.app.plugin_manager.install_plugin_from_url(url, progress=progress)
if "[Success]" in result_message:
was_enabled = self._enable_plugin_after_install(url)
if was_enabled:
result_message = result_message.replace("Please enable it", "It has been auto-enabled")
else:
if not action or not plugin_id:
raise ValueError("Action and plugin_id are required.")
result_message = ""
if action == 'uninstall':
result_message = self.app.plugin_manager.uninstall_plugin(plugin_id)
current_enabled = self.server_config.get("enabled_plugins", [])
if plugin_id in current_enabled:
current_enabled.remove(plugin_id)
self.server_config["enabled_plugins"] = current_enabled
with open(self.server_config_filename, "w", encoding="utf-8") as writer:
writer.write(json.dumps(self.server_config, indent=4))
elif action == 'update':
result_message = self.app.plugin_manager.update_plugin(plugin_id, progress=progress)
elif action == 'reinstall':
result_message = self.app.plugin_manager.reinstall_plugin(plugin_id, progress=progress)
if "[Success]" in result_message:
gr.Info(result_message)
elif "[Error]" in result_message or "[Warning]" in result_message:
gr.Warning(result_message)
else:
gr.Info(result_message)
except (json.JSONDecodeError, ValueError) as e:
gr.Warning(f"Could not perform plugin action: {e}")
traceback.print_exc()
if hasattr(self, '_community_plugins_cache'):
del self._community_plugins_cache
return self._build_plugins_html(), self._build_community_plugins_html()