import gradio as gr
import json
import traceback
from wgp import quit_application, WanGP_version
from shared.utils.plugins import WAN2GPPlugin, compare_release_metadata, is_wangp_compatible, plugin_id_from_url
class PluginManagerUIPlugin(WAN2GPPlugin):
def __init__(self):
    """Initialise the base plugin and set this plugin's descriptive metadata."""
    super().__init__()

    # Descriptive fields assigned after the base initialiser has run.
    self.name = "Plugin Manager UI"
    self.version = "1.8.0"
    self.description = "A built-in UI for managing, installing, and updating Wan2GP plugins"
def setup_ui(self):
    """Request the host resources this plugin uses and register its tab.

    Calls ``request_global`` for ``app``, ``server_config`` and
    ``server_config_filename``, then ``request_component`` for ``main`` and
    ``main_tabs``, and finally adds the "Plugins" tab whose content is built
    by ``create_plugin_manager_ui``.
    """
    for global_name in ("app", "server_config", "server_config_filename"):
        self.request_global(global_name)

    for component_name in ("main", "main_tabs"):
        self.request_component(component_name)

    tab_options = {
        "tab_id": "plugin_manager_tab",
        "label": "Plugins",
        "component_constructor": self.create_plugin_manager_ui,
    }
    self.add_tab(**tab_options)
def _get_js_script_html(self):
    """Return the client-side JavaScript used by the plugin manager tab.

    The returned string is the source of a single JavaScript arrow function
    (despite the method name, it is not HTML). When executed in the browser
    it:

    * makes the ``#user-plugin-list`` element drag-sortable and re-binds it
      whenever the DOM under the Gradio root changes (MutationObserver);
    * defines ``window.handlePluginAction(button, action)`` and
      ``window.handleStoreInstall(button, url)``, which write a JSON payload
      into the ``#plugin_action_input`` field;
    * defines ``window.handleSave(restart)``, which writes the ordered list
      of checked plugin ids into the ``#save_action_input`` field.

    The call site is outside this block; presumably the string is passed to
    a Gradio ``js=`` hook -- confirm against the caller.

    Returns:
        str: JavaScript source text.
    """
    # Fixes relative to the previous script:
    #  * The "observer already installed" marker was stored on
    #    ``root.dataset``, but pluginRoot() may return a ShadowRoot or the
    #    Document, neither of which has ``dataset``; that raised a TypeError
    #    and the observer was never installed. The marker now lives on an
    #    element: the root itself when it has ``dataset``, otherwise the
    #    shadow host or the <html> element.
    #  * handlePluginAction / handleSave no longer dereference a null result
    #    of closest() / querySelector().
    js_code = """
    () => {
        function pluginRoot() {
            if (window.gradioApp) {
                return window.gradioApp();
            }
            const app = document.querySelector('gradio-app');
            return app ? (app.shadowRoot || app) : document;
        }
        function updateGradioInput(elem_id, value) {
            const root = pluginRoot();
            const input = root.querySelector(`#${elem_id} textarea, #${elem_id} input`);
            if (input) {
                input.value = value;
                input.dispatchEvent(new Event('input', { bubbles: true }));
                return true;
            }
            return false;
        }
        function makeSortable() {
            const root = pluginRoot();
            const userPluginList = root.querySelector('#user-plugin-list');
            if (!userPluginList) return;
            if (userPluginList.dataset.sortableBound === '1') return;
            userPluginList.dataset.sortableBound = '1';
            let draggedItem = null;
            userPluginList.addEventListener('dragstart', e => {
                draggedItem = e.target.closest('.plugin-item');
                if (!draggedItem) return;
                draggedItem.classList.add('dragging');
                if (e.dataTransfer) {
                    e.dataTransfer.effectAllowed = 'move';
                    e.dataTransfer.setData('text/plain', draggedItem.dataset.pluginId || '');
                }
                setTimeout(() => {
                    if (draggedItem) draggedItem.style.opacity = '0.5';
                }, 0);
            });
            userPluginList.addEventListener('dragend', e => {
                setTimeout(() => {
                    if (draggedItem) {
                        draggedItem.style.opacity = '1';
                        draggedItem.classList.remove('dragging');
                        draggedItem = null;
                    }
                }, 0);
            });
            userPluginList.addEventListener('dragover', e => {
                e.preventDefault();
                if (e.dataTransfer) {
                    e.dataTransfer.dropEffect = 'move';
                }
                const afterElement = getDragAfterElement(userPluginList, e.clientY);
                if (draggedItem) {
                    if (afterElement === draggedItem) return;
                    if (afterElement == null) {
                        userPluginList.appendChild(draggedItem);
                    } else {
                        userPluginList.insertBefore(draggedItem, afterElement);
                    }
                }
            });
            userPluginList.addEventListener('drop', e => {
                e.preventDefault();
            });
            function getDragAfterElement(container, y) {
                const draggableElements = [...container.querySelectorAll('.plugin-item:not(.dragging)')];
                return draggableElements.reduce((closest, child) => {
                    const box = child.getBoundingClientRect();
                    const offset = y - box.top - box.height / 2;
                    if (offset < 0 && offset > closest.offset) {
                        return { offset: offset, element: child };
                    } else {
                        return closest;
                    }
                }, { offset: Number.NEGATIVE_INFINITY }).element;
            }
        }
        function observeUserPluginList() {
            const root = pluginRoot();
            if (!root) {
                setTimeout(observeUserPluginList, 400);
                return;
            }
            const marker = root.dataset ? root : (root.host || document.documentElement);
            if (marker.dataset.pluginSortableObserver === '1') {
                makeSortable();
                return;
            }
            marker.dataset.pluginSortableObserver = '1';
            const observer = new MutationObserver(() => {
                makeSortable();
            });
            observer.observe(root, { childList: true, subtree: true });
            makeSortable();
        }
        setTimeout(observeUserPluginList, 200);
        setTimeout(makeSortable, 500);
        window.handlePluginAction = function(button, action) {
            const pluginItem = button.closest('.plugin-item');
            if (!pluginItem) return;
            const pluginId = pluginItem.dataset.pluginId;
            const payload = JSON.stringify({ action: action, plugin_id: pluginId });
            updateGradioInput('plugin_action_input', payload);
        };
        window.handleStoreInstall = function(button, url) {
            const payload = JSON.stringify({ action: 'install_from_store', url: url });
            updateGradioInput('plugin_action_input', payload);
        };
        window.handleSave = function(restart) {
            const root = pluginRoot();
            const user_container = root.querySelector('#user-plugin-list');
            if (!user_container) return;
            const user_plugins = user_container.querySelectorAll('.plugin-item');
            const enabledUserPlugins = Array.from(user_plugins)
                .filter(item => {
                    const checkbox = item.querySelector('.plugin-enable-checkbox');
                    return Boolean(checkbox && checkbox.checked);
                })
                .map(item => item.dataset.pluginId);
            const payload = JSON.stringify({ restart: restart, enabled_plugins: enabledUserPlugins });
            updateGradioInput('save_action_input', payload);
        };
    }
    """
    return js_code
def _get_community_plugins_info(self):
    """Return the merged plugin catalog, reusing a per-instance cache.

    If ``self._community_plugins_cache`` exists and is not ``None`` it is
    returned as-is. Otherwise the catalog is obtained from
    ``self.app.plugin_manager.get_merged_catalog_entries(use_remote=True)``,
    stored in the cache attribute and returned.

    If obtaining the catalog raises, the error is printed, an empty dict is
    stored as the cached value and an empty dict is returned.
    """
    cached = getattr(self, "_community_plugins_cache", None)
    if cached is not None:
        return cached

    try:
        # The attribute lookups stay inside the try so that any failure on
        # the way to the catalog is reported the same way.
        catalog = self.app.plugin_manager.get_merged_catalog_entries(use_remote=True)
        self._community_plugins_cache = catalog
    except Exception as exc:
        print(f"[PluginManager] Could not fetch community plugins info: {exc}")
        self._community_plugins_cache = {}
        return {}
    return catalog
def _build_community_plugins_html(self):
# Build the markup string listing catalog plugins that are not installed yet.
# Returns a string on every path: a failure message, an "all installed"
# message, or the concatenated per-plugin items.
# NOTE(review): several string literals in this method look truncated in this
# copy (markup appears to be missing) -- confirm against the repository version
# before editing them.
try:
# Ids of everything the plugin manager currently reports as installed.
installed_plugin_ids = {p['id'] for p in self.app.plugin_manager.get_plugins_info()}
# Mapping of plugin id -> catalog entry (iterated with .items() below).
remote_plugins = self._get_community_plugins_info()
# Entries loaded with use_remote=False; only used to decide sort priority.
base_entries = self.app.plugin_manager.load_catalog_entries(use_remote=False)
base_ids = {
plugin_id_from_url(entry.get('url', ''))
for entry in base_entries
if entry.get('url')
}
# Keep only catalog entries whose id is not already installed.
community_plugins = [
p for plugin_id, p in remote_plugins.items()
if plugin_id not in installed_plugin_ids
]
# Sort key: entries whose URL-derived id is in base_ids come first
# (False sorts before True), then case-insensitive name order.
community_plugins.sort(
key=lambda p: (
plugin_id_from_url(p.get('url', '')) not in base_ids,
(p.get('name') or '').lower()
)
)
except Exception as e:
# Any failure while collecting or sorting is surfaced through gr.Warning
# and replaced by a static failure message.
gr.Warning(f"Could not process community plugins list: {e}")
return "
Failed to load community plugins.
"
if not community_plugins:
return "
All available community plugins are already installed.
"
items_html = ""
for plugin in community_plugins:
name = plugin.get('name')
author = plugin.get('author') or "Unknown"
version = plugin.get('version', 'N/A')
description = plugin.get('description') or "No description provided."
url = plugin.get('url')
# Either key may carry the required host version.
wan2gp_version = plugin.get('wan2gp_version') or plugin.get('wangp_version', '')
# Entries without a URL are skipped.
if not url:
continue
# Fall back to the URL-derived id when the entry has no name.
if not name:
name = plugin_id_from_url(url) or "Unknown Plugin"
# Only single quotes are backslash-escaped here.
# NOTE(review): presumably safe_url is embedded in a quoted JS argument in
# the item markup; backslashes, double quotes and HTML metacharacters are not
# escaped -- confirm this is sufficient for catalog-supplied URLs.
safe_url = url.replace("'", "\\'")
incompatible = not is_wangp_compatible(wan2gp_version, WanGP_version)
incompat_html = ""
# The notice is only built when the entry declares a required version and
# is_wangp_compatible reports it as not compatible with WanGP_version.
if incompatible and wan2gp_version:
incompat_html = (
f""
f"Requires WanGP v{wan2gp_version}+"
""
)
# NOTE(review): name, version, author and description come from catalog
# entries and are interpolated without HTML escaping -- verify whether they
# are sanitized upstream.
items_html += f"""
{name}version {version} by {author}
{incompat_html}
{description}
"""
return f"
{items_html}
"
def _build_plugins_html(self):
plugins_info = self.app.plugin_manager.get_plugins_info()
enabled_user_plugins = self.server_config.get("enabled_plugins", [])
all_user_plugins_info = [p for p in plugins_info if not p.get('system')]
remote_plugins_info = self.app.plugin_manager.get_merged_catalog_entries(use_remote=False)
css = """
"""
instruction_html = "
Please Select the Plugins you want to enable
"
if not all_user_plugins_info:
user_html = "
No user-installed plugins found.
"
else:
user_plugins_map = {p['id']: p for p in all_user_plugins_info}
user_plugins = []
for plugin_id in enabled_user_plugins:
if plugin_id in user_plugins_map:
user_plugins.append(user_plugins_map.pop(plugin_id))
user_plugins.extend(sorted(user_plugins_map.values(), key=lambda p: p['name']))
user_items_html = ""
for plugin in user_plugins:
plugin_id = plugin['id']
checked = "checked" if plugin_id in enabled_user_plugins else ""
uninstallable = plugin.get('uninstallable', True)
author = plugin.get('author') or "Unknown"
update_notice_html = ''
item_classes = []
if uninstallable and plugin_id in remote_plugins_info:
remote_entry = remote_plugins_info[plugin_id]
if compare_release_metadata(remote_entry, plugin) > 0:
remote_version = remote_entry.get('version') or remote_entry.get('date') or "unknown"
update_notice_html = (
f'New version {remote_version} is available !'
)
item_classes.append('update-available')
bundled_badge_html = ''
if not uninstallable:
bundled_badge_html = 'Bundled'
wan2gp_version = plugin.get('wan2gp_version') or plugin.get('wangp_version', '')
incompatible = not is_wangp_compatible(wan2gp_version, WanGP_version)
incompat_html = ''
if incompatible and wan2gp_version:
incompat_html = (
f""
f"Requires WanGP v{wan2gp_version}+"
""
)
item_classes.append('incompatible')
actions_html = ""
if uninstallable:
actions_html = """
"""
actions_container_html = f'