from __future__ import annotations

import json
import zipfile
from pathlib import Path

import gradio as gr

from shared.utils.settings_bundle import (
    SETTINGS_BUNDLE_ATTACHMENT_KEYS,
    WAN_GP_SETTINGS_SUFFIXES,
    is_wangp_settings_filename,
    load_first_settings_from_queue_zip,
)

from . import common
from . import constants
from . import frame_planning as frames
from . import process_catalog as catalog
from . import process_validation
from . import system_handlers


class ProcessLibrary:
    """Look up process definitions (system catalog entries and user settings files).

    A "process definition" here is a dict with the keys ``settings``, ``path``,
    ``source`` (``"system"`` or ``"user"``), ``name`` and ``value``; user
    definitions additionally carry ``ref``.

    The three callables passed to the constructor are stored as-is and are
    used to resolve model definitions, LoRA directories and base model types.
    """

    def __init__(self, *, get_model_def, get_lora_dir, get_base_model_type) -> None:
        self.get_model_def = get_model_def
        self.get_lora_dir = get_lora_dir
        self.get_base_model_type = get_base_model_type

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _existing_settings_file(directory: Path, filename: str) -> Path | None:
        """Return the resolved settings file *filename* inside *directory*, or ``None``.

        Only the final path component of *filename* is used; any directory
        parts it contains are ignored. The file must exist and have a suffix
        listed in ``WAN_GP_SETTINGS_SUFFIXES`` (compared lower-cased).
        """
        candidate = (directory / Path(filename).name).resolve()
        if candidate.is_file() and candidate.suffix.lower() in WAN_GP_SETTINGS_SUFFIXES:
            return candidate
        return None

    def _handler_attribute(self, process_name: str, main_state: dict | None, user_refs: list[str] | None, attribute: str, default):
        """Return *attribute* of the process's system handler, or *default*.

        *default* is returned both when the process has no system handler and
        when the handler lacks the attribute.
        """
        handler = self.system_handler_for_process(process_name, main_state, user_refs)
        if handler is None:
            return default
        return getattr(handler, attribute, default)

    # ------------------------------------------------------------------
    # Model type helpers
    # ------------------------------------------------------------------

    def model_type_label(self, model_type: str) -> str:
        """Return a display label for *model_type*.

        Resolution order: ``"Unknown Model"`` for a blank value, the system
        handler's ``model_label``, the model definition's ``model.name``, the
        model definition's top-level ``name``, and finally the model type
        itself (also used when the model definition lookup raises ``gr.Error``).
        """
        if not str(model_type or "").strip():
            return "Unknown Model"
        fallback = str(model_type)
        handler = self.system_handler_for_model_type(fallback)
        if handler is not None:
            return str(getattr(handler, "model_label", fallback))
        try:
            model_def = frames.require_model_def(fallback, self.get_model_def)
        except gr.Error:
            return fallback
        model_block = model_def.get("model")
        if isinstance(model_block, dict):
            nested_name = str(model_block.get("name") or "").strip()
            if nested_name:
                return nested_name
        top_level_name = str(model_def.get("name") or "").strip()
        return top_level_name or fallback

    def base_model_type_for_ref(self, model_type: str) -> str:
        """Return the base model type for *model_type*, or the stripped input if none is reported."""
        model_type = str(model_type or "").strip()
        base_model_type = str(self.get_base_model_type(model_type) or "").strip()
        return base_model_type or model_type

    # ------------------------------------------------------------------
    # Saved user settings (referenced as "<base_model_type>/<filename>")
    # ------------------------------------------------------------------

    def resolve_user_settings_ref(self, ref: str) -> Path | None:
        """Resolve a user settings reference to an existing settings file.

        Returns ``None`` when the normalized reference is empty, has no
        ``/`` separator, or does not point to an existing file with a
        recognised settings suffix.
        """
        ref = catalog.normalize_user_settings_ref(ref)
        if not ref:
            return None
        # partition() instead of a two-target split(): a reference without a
        # "/" yields None here rather than an unpacking ValueError.
        base_model_type, separator, filename = ref.partition("/")
        if not separator:
            return None
        lora_dir = Path(self.get_lora_dir(base_model_type))
        return self._existing_settings_file(lora_dir, filename)

    @staticmethod
    def load_settings_payload(settings_path: Path) -> dict | None:
        """Load the settings dict stored at *settings_path*.

        ``.zip`` files are read through ``load_first_settings_from_queue_zip``
        (a notice is printed when the bundle holds more than one task); any
        other file is parsed as UTF-8 JSON.

        Returns ``None`` when the file cannot be read, is not valid UTF-8, is
        not valid JSON / a valid zip, or does not contain a dict.
        """
        try:
            if settings_path.suffix.lower() == ".zip":
                payload, source_task_count = load_first_settings_from_queue_zip(settings_path, SETTINGS_BUNDLE_ATTACHMENT_KEYS)
                if source_task_count > 1:
                    print(f"[Process Full Video] Settings bundle {settings_path.name} contains {source_task_count} tasks; only the first task was extracted.")
            else:
                payload = json.loads(settings_path.read_text(encoding="utf-8"))
        except (OSError, UnicodeDecodeError, json.JSONDecodeError, zipfile.BadZipFile):
            # UnicodeDecodeError: read_text() fails on non-UTF-8 bytes before
            # json.loads() ever runs, and it is not a JSONDecodeError.
            return None
        return payload if isinstance(payload, dict) else None

    def build_user_process_definition(self, ref: str) -> dict | None:
        """Build the process definition for a saved user settings reference.

        Returns ``None`` when the reference cannot be resolved, the file
        cannot be loaded, or the payload has no ``model_type``.
        """
        normalized_ref = catalog.normalize_user_settings_ref(ref)
        settings_path = self.resolve_user_settings_ref(normalized_ref)
        if settings_path is None:
            return None
        payload = self.load_settings_payload(settings_path)
        if not isinstance(payload, dict):
            return None
        if not str(payload.get("model_type") or "").strip():
            return None
        return {
            "settings": payload,
            "path": str(settings_path),
            "source": "user",
            "ref": normalized_ref,
            "name": settings_path.stem,
            "value": catalog.user_process_value(normalized_ref),
        }

    def user_process_definitions(self, user_refs: list[str]) -> dict[str, dict]:
        """Return loadable user process definitions keyed by their process value."""
        definitions: dict[str, dict] = {}
        saved_refs = catalog.get_saved_user_settings_refs({catalog.USER_SETTINGS_STORAGE_KEY: user_refs})
        for ref in saved_refs:
            definition = self.build_user_process_definition(ref)
            if definition is None:
                continue
            value = str(definition.get("value") or "")
            if value:
                definitions[value] = definition
        return definitions

    @staticmethod
    def system_process_definition(process_name: str) -> dict | None:
        """Return the catalog definition for *process_name*, or ``None`` if it is not a dict entry."""
        name = str(process_name or "").strip()
        catalog_entry = catalog.PROCESS_DEFINITIONS.get(name)
        if not isinstance(catalog_entry, dict):
            return None
        return {
            "settings": catalog_entry.get("settings", {}),
            "path": catalog_entry.get("path", ""),
            "source": "system",
            "name": name,
            "value": name,
        }

    # ------------------------------------------------------------------
    # Settings files of the model currently selected in the main UI state
    # ------------------------------------------------------------------

    @staticmethod
    def main_model_type(main_state: dict | None) -> str:
        """Return the model type recorded in *main_state*.

        ``model_type`` is read when ``active_form`` is ``"add"`` (the default),
        otherwise ``edit_model_type`` with ``model_type`` as a fallback.
        Returns ``""`` when *main_state* is not a dict.
        """
        if not isinstance(main_state, dict):
            return ""
        key = "model_type" if main_state.get("active_form", "add") == "add" else "edit_model_type"
        return str(main_state.get(key) or main_state.get("model_type") or "").strip()

    @staticmethod
    def current_user_settings_filenames(main_state: dict | None) -> list[str]:
        """Return the settings filenames listed in ``main_state["loras_presets"]``.

        Entries containing a path separator or not recognised by
        ``is_wangp_settings_filename`` are skipped; duplicates are removed
        case-insensitively (first occurrence wins). The result is sorted by
        case-folded file stem.
        """
        if not isinstance(main_state, dict):
            return []
        # "or []" also covers the key being present with a None value.
        presets = main_state.get("loras_presets", []) or []
        filenames: list[str] = []
        seen: set[str] = set()
        for item in presets:
            filename = str(item or "").strip()
            if "/" in filename or "\\" in filename or not is_wangp_settings_filename(filename):
                continue
            folded = filename.casefold()
            if folded in seen:
                continue
            seen.add(folded)
            filenames.append(filename)
        return sorted(filenames, key=lambda name: Path(name).stem.casefold())

    def normalize_main_lset_selection(self, main_state: dict | None, main_lset_name: str | None) -> str:
        """Map the main UI's selection to one of the current settings filenames.

        An exact filename match wins. Otherwise the selection is treated as a
        display label: U+2500 characters are removed, non-breaking spaces
        become spaces, and the result is compared case-insensitively against
        each file stem. Returns ``""`` when nothing matches.
        """
        selection = str(main_lset_name or "").strip()
        filenames = self.current_user_settings_filenames(main_state)
        if selection in filenames:
            return selection
        normalized_label = selection.replace("\u2500", "").replace(chr(160), " ").strip().casefold()
        for filename in filenames:
            if Path(filename).stem.casefold() == normalized_label:
                return filename
        return ""

    def resolve_current_user_settings_file(self, main_state: dict | None, settings_filename: str) -> Path | None:
        """Resolve *settings_filename* inside the LoRA directory of the current model.

        Returns ``None`` when the filename is not among the current settings
        filenames, no model type is known, or the file does not exist with a
        recognised settings suffix.
        """
        filename = str(settings_filename or "").strip()
        if filename not in self.current_user_settings_filenames(main_state):
            return None
        model_type = self.main_model_type(main_state)
        if not model_type:
            return None
        lora_dir = Path(self.get_lora_dir(model_type))
        return self._existing_settings_file(lora_dir, filename)

    def build_current_user_settings_ref(self, main_state: dict | None, settings_filename: str) -> str:
        """Build the normalized ``<base_model_type>/<filename>`` reference for a current settings file."""
        base_model_type = self.base_model_type_for_ref(self.main_model_type(main_state))
        filename = Path(str(settings_filename or "").strip()).name
        return catalog.normalize_user_settings_ref(f"{base_model_type}/{filename}")

    def build_candidate_user_process_definition(self, main_state: dict | None, settings_filename: str) -> dict | None:
        """Build a user process definition for a not-yet-saved settings file.

        The payload's ``model_type`` falls back to the current main model
        type; the returned ``settings`` is a shallow copy with that value set.
        The definition's ``value`` is the bare filename.
        """
        settings_path = self.resolve_current_user_settings_file(main_state, settings_filename)
        if settings_path is None:
            return None
        payload = self.load_settings_payload(settings_path)
        if not isinstance(payload, dict):
            return None
        model_type = str(payload.get("model_type") or self.main_model_type(main_state)).strip()
        if not model_type:
            return None
        settings = payload.copy()
        settings["model_type"] = model_type
        return {
            "settings": settings,
            "path": str(settings_path),
            "source": "user",
            "ref": self.build_current_user_settings_ref(main_state, settings_path.name),
            "name": settings_path.stem,
            "value": settings_path.name,
        }

    # ------------------------------------------------------------------
    # Process lookup
    # ------------------------------------------------------------------

    def process_definition(self, process_value: str, main_state: dict | None = None, user_refs: list[str] | None = None) -> dict | None:
        """Return the definition for *process_value*, or ``None`` if it cannot be resolved.

        Lookup order: system catalog entry, saved user process value (when
        *user_refs* is given, the reference must be among the saved ones,
        compared case-insensitively), then a settings filename of the current
        model.
        """
        process_value = str(process_value or "").strip()
        if not process_value or process_value == constants.NO_USER_SETTINGS_VALUE:
            return None
        system_definition = self.system_process_definition(process_value)
        if system_definition is not None:
            return system_definition
        if catalog.is_user_process_value(process_value):
            ref = catalog.user_process_ref_from_value(process_value)
            if not ref:
                return None
            if user_refs is not None:
                saved_refs = catalog.get_saved_user_settings_refs({catalog.USER_SETTINGS_STORAGE_KEY: user_refs})
                if ref.casefold() not in {item.casefold() for item in saved_refs}:
                    return None
            return self.build_user_process_definition(ref)
        if is_wangp_settings_filename(process_value):
            return self.build_candidate_user_process_definition(main_state, process_value)
        return None

    @staticmethod
    def process_definition_model_type(process_definition: dict | None) -> str:
        """Return the stripped ``settings["model_type"]`` of a definition, or ``""``."""
        settings = process_definition.get("settings") if isinstance(process_definition, dict) else None
        if not isinstance(settings, dict):
            return ""
        return str(settings.get("model_type") or "").strip()

    @staticmethod
    def system_handler_for_definition(process_definition: dict | None):
        """Return the system handler named by ``settings["system_handler"]``, or ``None``."""
        settings = process_definition.get("settings") if isinstance(process_definition, dict) else None
        if not isinstance(settings, dict):
            return None
        return system_handlers.get_system_handler(settings.get("system_handler"))

    @staticmethod
    def system_handler_for_model_type(model_type: str):
        """Return the first system handler of a catalog process whose model type matches."""
        model_type = str(model_type or "").strip()
        for process_definition in catalog.PROCESS_DEFINITIONS.values():
            settings = process_definition.get("settings", {})
            if str(settings.get("model_type") or "").strip() != model_type:
                continue
            handler = system_handlers.get_system_handler(settings.get("system_handler"))
            if handler is not None:
                return handler
        return None

    def system_handler_for_process(self, process_name: str, main_state: dict | None = None, user_refs: list[str] | None = None):
        """Return the system handler of the resolved process, or ``None``."""
        return self.system_handler_for_definition(self.process_definition(process_name, main_state, user_refs))

    # ------------------------------------------------------------------
    # Handler-driven UI properties
    # ------------------------------------------------------------------

    def target_control_choices(self, process_name: str, main_state: dict | None = None, user_refs: list[str] | None = None) -> list[tuple[str, str]]:
        """Return the handler's target-control choices (``[]`` without a handler).

        A callable ``target_control_choices_for_process`` on the handler takes
        precedence over its static ``target_control_choices`` attribute.
        """
        process_definition = self.process_definition(process_name, main_state, user_refs)
        handler = self.system_handler_for_definition(process_definition)
        if handler is None:
            return []
        per_process = getattr(handler, "target_control_choices_for_process", None)
        if callable(per_process):
            # A non-None handler implies process_definition is a dict.
            return per_process(process_definition.get("settings", {}))
        return list(getattr(handler, "target_control_choices", []))

    def target_control_default(self, process_name: str, main_state: dict | None = None, user_refs: list[str] | None = None) -> str:
        """Return the handler's default target control (``""`` without a handler).

        A callable ``target_control_default_for_process`` on the handler takes
        precedence over its static ``default_target_control`` attribute.
        """
        process_definition = self.process_definition(process_name, main_state, user_refs)
        handler = self.system_handler_for_definition(process_definition)
        if handler is None:
            return ""
        per_process = getattr(handler, "target_control_default_for_process", None)
        if callable(per_process):
            return per_process(process_definition.get("settings", {}))
        return str(getattr(handler, "default_target_control", ""))

    def has_target_control(self, process_name: str, main_state: dict | None = None, user_refs: list[str] | None = None) -> bool:
        """Return whether the process offers at least one target-control choice."""
        return len(self.target_control_choices(process_name, main_state, user_refs)) > 0

    def target_control_label(self, process_name: str, main_state: dict | None = None, user_refs: list[str] | None = None) -> str:
        """Return the handler's ``target_control_label``, defaulting to ``"Target"``."""
        return str(self._handler_attribute(process_name, main_state, user_refs, "target_control_label", "Target"))

    def default_chunk_size_seconds(self, process_name: str, main_state: dict | None = None, user_refs: list[str] | None = None) -> float:
        """Return the handler's ``default_chunk_size_seconds``, defaulting to ``10.0``."""
        return float(self._handler_attribute(process_name, main_state, user_refs, "default_chunk_size_seconds", 10.0))

    def hides_sliding_window_overlap(self, process_name: str, main_state: dict | None = None, user_refs: list[str] | None = None) -> bool:
        """Return the handler's ``hide_sliding_window_overlap`` flag, defaulting to ``False``."""
        return bool(self._handler_attribute(process_name, main_state, user_refs, "hide_sliding_window_overlap", False))

    def hides_output_resolution(self, process_name: str, main_state: dict | None = None, user_refs: list[str] | None = None) -> bool:
        """Return the handler's ``hide_output_resolution`` flag, defaulting to ``False``."""
        return bool(self._handler_attribute(process_name, main_state, user_refs, "hide_output_resolution", False))

    def hides_prompt(self, process_name: str, main_state: dict | None = None, user_refs: list[str] | None = None) -> bool:
        """Return the handler's ``hide_prompt`` flag, defaulting to ``False``."""
        return bool(self._handler_attribute(process_name, main_state, user_refs, "hide_prompt", False))

    def process_frame_rules(self, process_name: str, main_state: dict | None = None, user_refs: list[str] | None = None) -> frames.FramePlanRules:
        """Return frame planning rules for the process (or the default process if unresolved).

        With a system handler the rules come from its ``frame_step`` and
        ``minimum_requested_frames`` attributes (each defaulting to 1);
        otherwise they are derived from the definition's model type.
        """
        process_definition = self.process_definition_or_default(process_name, main_state, user_refs)
        handler = self.system_handler_for_definition(process_definition)
        if handler is not None:
            return frames.FramePlanRules(
                frame_step=int(getattr(handler, "frame_step", 1)),
                minimum_requested_frames=int(getattr(handler, "minimum_requested_frames", 1)),
            )
        model_type = self.process_definition_model_type(process_definition)
        return frames.get_frame_plan_rules(model_type, self.get_model_def)

    # ------------------------------------------------------------------
    # Dropdown choices
    # ------------------------------------------------------------------

    def process_values_by_model_type(self, user_refs: list[str]) -> dict[str, list[str]]:
        """Group process values by model type: catalog processes first, then user processes."""
        values_by_model_type: dict[str, list[str]] = {}
        for process_name, process_definition in catalog.PROCESS_DEFINITIONS.items():
            model_type = str(process_definition.get("settings", {}).get("model_type") or "").strip()
            if model_type:
                values_by_model_type.setdefault(model_type, []).append(process_name)
        for value, definition in self.user_process_definitions(user_refs).items():
            model_type = self.process_definition_model_type(definition)
            if model_type:
                values_by_model_type.setdefault(model_type, []).append(value)
        return values_by_model_type

    def model_type_choices(self, user_refs: list[str]) -> list[tuple[str, str]]:
        """Return ``(label, model_type)`` choices sorted by label, followed by the add-user-settings entry."""
        model_types = sorted(
            self.process_values_by_model_type(user_refs),
            key=lambda item: self.model_type_label(item).casefold(),
        )
        choices = [(self.model_type_label(model_type), model_type) for model_type in model_types]
        choices.append((constants.ADD_USER_SETTINGS_LABEL, constants.ADD_USER_SETTINGS_MODEL_TYPE))
        return choices

    def normal_process_choices(self, model_type: str, user_refs: list[str]) -> list[tuple[str, str]]:
        """Return ``(label, value)`` choices for all processes of *model_type*.

        User processes are labelled ``"<name> *"``. Entries are sorted by
        case-folded label (ignoring the ``" *"`` suffix), with system entries
        ahead of user entries on ties.
        """
        model_type = str(model_type or "").strip()
        entries: list[tuple[str, str, str]] = []
        for process_name, process_definition in catalog.PROCESS_DEFINITIONS.items():
            if str(process_definition.get("settings", {}).get("model_type") or "").strip() == model_type:
                entries.append((process_name, process_name, "system"))
        for value, definition in self.user_process_definitions(user_refs).items():
            if self.process_definition_model_type(definition) != model_type:
                continue
            label_name = str(definition.get("name") or "").strip()
            if not label_name:
                ref = catalog.user_process_ref_from_value(value)
                label_name = Path(ref or value).stem
            entries.append((f"{label_name} *", value, "user"))
        entries.sort(key=lambda item: (item[0].removesuffix(" *").casefold(), 0 if item[2] == "system" else 1))
        return [(label, value) for label, value, _source in entries]

    def current_user_settings_choices(self, main_state: dict | None, main_lset_name: str | None) -> tuple[list[tuple[str, str]], str]:
        """Return ``(choices, selected_value)`` for the current model's settings files.

        With no files, a single "no user settings" placeholder choice is
        returned. Otherwise the main UI's selection is preselected when it
        maps to a file, else the first file.
        """
        filenames = self.current_user_settings_filenames(main_state)
        if not filenames:
            placeholder = [(constants.NO_USER_SETTINGS_LABEL, constants.NO_USER_SETTINGS_VALUE)]
            return placeholder, constants.NO_USER_SETTINGS_VALUE
        choices = [(Path(filename).stem, filename) for filename in filenames]
        selected = self.normalize_main_lset_selection(main_state, main_lset_name)
        value = selected if selected in filenames else filenames[0]
        return choices, value

    def process_choices(self, process_model_type: str, main_state: dict | None, main_lset_name: str | None, user_refs: list[str]) -> tuple[list[tuple[str, str]], str | None]:
        """Return ``(choices, default_value)`` for the process dropdown.

        The add-user-settings pseudo model type lists the current model's
        settings files; any other model type lists its processes and selects
        the first one (``None`` when there are none).
        """
        model_type = str(process_model_type or "").strip()
        if model_type == constants.ADD_USER_SETTINGS_MODEL_TYPE:
            return self.current_user_settings_choices(main_state, main_lset_name)
        choices = self.normal_process_choices(model_type, user_refs)
        default_value = choices[0][1] if choices else None
        return choices, default_value

    @staticmethod
    def process_choices_have_user_settings(process_choices: list[tuple[str, str]]) -> bool:
        """Return whether any choice value is a user process value."""
        return any(catalog.is_user_process_value(value) for _label, value in process_choices or [])

    # ------------------------------------------------------------------
    # Defaults
    # ------------------------------------------------------------------

    def default_process_definition(self) -> dict:
        """Return the default catalog process, else the first valid catalog entry, else an empty definition."""
        definition = self.system_process_definition(catalog.DEFAULT_PROCESS_NAME)
        if definition is not None:
            return definition
        for process_name in catalog.PROCESS_DEFINITIONS:
            definition = self.system_process_definition(process_name)
            if definition is not None:
                return definition
        return {"settings": {}, "path": "", "source": "system", "name": "", "value": ""}

    def process_definition_or_default(self, process_name: str, main_state: dict | None, user_refs: list[str] | None) -> dict:
        """Return the resolved process definition, falling back to the default definition."""
        return self.process_definition(process_name, main_state, user_refs) or self.default_process_definition()

    # ------------------------------------------------------------------
    # Outpainting and strength controls
    # ------------------------------------------------------------------

    @staticmethod
    def uses_builtin_outpaint_ui(process_definition: dict | None) -> bool:
        """Return whether a non-user definition's settings contain ``video_guide_outpainting``."""
        if not isinstance(process_definition, dict):
            return False
        if process_definition.get("source") == "user":
            return False
        settings = process_definition.get("settings")
        return isinstance(settings, dict) and "video_guide_outpainting" in settings

    def has_process_outpaint(self, process_name: str, main_state: dict | None = None, user_refs: list[str] | None = None) -> bool:
        """Return whether the process uses the built-in outpaint UI (never when it has a target control)."""
        if self.has_target_control(process_name, main_state, user_refs):
            return False
        process_definition = self.process_definition(process_name, main_state, user_refs)
        return self.uses_builtin_outpaint_ui(process_definition)

    @staticmethod
    def user_lora_strength_override_default(process_definition: dict | None) -> float | None:
        """Return ``common.get_single_lora_simple_multiplier`` of a user definition's settings.

        Returns ``None`` for non-user definitions or when settings is not a dict.
        """
        if not isinstance(process_definition, dict) or process_definition.get("source") != "user":
            return None
        settings = process_definition.get("settings")
        if not isinstance(settings, dict):
            return None
        return common.get_single_lora_simple_multiplier(settings)

    def is_process_strength_visible(self, process_name: str, main_state: dict | None = None, user_refs: list[str] | None = None) -> bool:
        """Return whether the strength control should be shown for the process.

        Hidden for handler-driven processes, unresolved processes and
        built-in outpaint processes. For user processes it is shown only when
        a LoRA strength override default is available; otherwise it is shown.
        """
        process_definition = self.process_definition(process_name, main_state, user_refs)
        if self.system_handler_for_definition(process_definition) is not None:
            return False
        settings = process_definition.get("settings") if isinstance(process_definition, dict) else None
        if not isinstance(settings, dict) or self.uses_builtin_outpaint_ui(process_definition):
            return False
        if process_definition.get("source") == "user":
            return self.user_lora_strength_override_default(process_definition) is not None
        return True

    # ------------------------------------------------------------------
    # Validation (delegated to process_validation)
    # ------------------------------------------------------------------

    def validate_user_process_definition(self, process_definition: dict | None) -> list[str]:
        """Delegate to ``process_validation.validate_user_process_definition``."""
        return process_validation.validate_user_process_definition(process_definition, self.get_model_def)

    def format_user_process_validation_error(self, process_definition: dict | None, problems: list[str]) -> str:
        """Delegate to ``process_validation.format_user_process_validation_error``."""
        return process_validation.format_user_process_validation_error(process_definition, problems)

    # ------------------------------------------------------------------
    # Selection after deleting a user process
    # ------------------------------------------------------------------

    def select_after_user_process_delete(self, deleted_process_value: str, deleted_model_type: str, old_refs: list[str], new_refs: list[str]) -> tuple[str, str, list[tuple[str, str]]]:
        """Pick the ``(model_type, process_value, choices)`` to show after a user process is deleted.

        Preference order:
        1. the next surviving user process after the deleted one, same model type;
        2. the first system process of the same model type;
        3. the first system process of any model type (model types in label order);
        4. the first remaining choice of the same model type;
        5. the catalog's default model type and process.
        """
        old_choices = self.normal_process_choices(deleted_model_type, old_refs)
        new_choices = self.normal_process_choices(deleted_model_type, new_refs)
        new_values = {value for _label, value in new_choices}

        deleted_index = next(
            (index for index, (_label, value) in enumerate(old_choices) if value == deleted_process_value),
            -1,
        )
        if deleted_index >= 0:
            for _label, value in old_choices[deleted_index + 1:]:
                if catalog.is_user_process_value(value) and value in new_values:
                    return deleted_model_type, value, new_choices

        first_system_value = next(
            (value for _label, value in new_choices if not catalog.is_user_process_value(value)),
            None,
        )
        if first_system_value is not None:
            return deleted_model_type, first_system_value, new_choices

        model_types = sorted(
            self.process_values_by_model_type(new_refs),
            key=lambda item: self.model_type_label(item).casefold(),
        )
        for model_type in model_types:
            choices = self.normal_process_choices(model_type, new_refs)
            first_system_value = next(
                (value for _label, value in choices if not catalog.is_user_process_value(value)),
                None,
            )
            if first_system_value is not None:
                return model_type, first_system_value, choices

        if new_choices:
            return deleted_model_type, new_choices[0][1], new_choices
        return (
            catalog.DEFAULT_MODEL_TYPE,
            catalog.DEFAULT_PROCESS_NAME,
            self.normal_process_choices(catalog.DEFAULT_MODEL_TYPE, new_refs),
        )