| from __future__ import annotations |
|
|
| import json |
| import zipfile |
| from pathlib import Path |
|
|
| import gradio as gr |
|
|
| from shared.utils.settings_bundle import SETTINGS_BUNDLE_ATTACHMENT_KEYS, WAN_GP_SETTINGS_SUFFIXES, is_wangp_settings_filename, load_first_settings_from_queue_zip |
|
|
| from . import common |
| from . import constants |
| from . import frame_planning as frames |
| from . import process_catalog as catalog |
| from . import process_validation |
| from . import system_handlers |
|
|
|
|
| class ProcessLibrary: |
| def __init__(self, *, get_model_def, get_lora_dir, get_base_model_type) -> None: |
| self.get_model_def = get_model_def |
| self.get_lora_dir = get_lora_dir |
| self.get_base_model_type = get_base_model_type |
|
|
| def model_type_label(self, model_type: str) -> str: |
| if len(str(model_type or "").strip()) == 0: |
| return "Unknown Model" |
| handler = self.system_handler_for_model_type(str(model_type)) |
| if handler is not None: |
| return str(getattr(handler, "model_label", str(model_type))) |
| try: |
| model_def = frames.require_model_def(str(model_type), self.get_model_def) |
| except gr.Error: |
| return str(model_type) |
| model_block = model_def.get("model") |
| if isinstance(model_block, dict): |
| model_name = str(model_block.get("name") or "").strip() |
| if len(model_name) > 0: |
| return model_name |
| model_name = str(model_def.get("name") or "").strip() |
| return model_name if len(model_name) > 0 else str(model_type) |
|
|
| def base_model_type_for_ref(self, model_type: str) -> str: |
| model_type = str(model_type or "").strip() |
| base_model_type = str(self.get_base_model_type(model_type) or "").strip() |
| if len(base_model_type) > 0: |
| return base_model_type |
| return model_type |
|
|
| def resolve_user_settings_ref(self, ref: str) -> Path | None: |
| ref = catalog.normalize_user_settings_ref(ref) |
| if len(ref) == 0: |
| return None |
| base_model_type, filename = ref.split("/", 1) |
| lora_dir = Path(self.get_lora_dir(base_model_type)) |
| settings_path = (lora_dir / Path(filename).name).resolve() |
| if settings_path.is_file() and settings_path.suffix.lower() in WAN_GP_SETTINGS_SUFFIXES: |
| return settings_path |
| return None |
|
|
| @staticmethod |
| def load_settings_payload(settings_path: Path) -> dict | None: |
| try: |
| if settings_path.suffix.lower() == ".zip": |
| payload, source_task_count = load_first_settings_from_queue_zip(settings_path, SETTINGS_BUNDLE_ATTACHMENT_KEYS) |
| if source_task_count > 1: |
| print(f"[Process Full Video] Settings bundle {settings_path.name} contains {source_task_count} tasks; only the first task was extracted.") |
| else: |
| payload = json.loads(settings_path.read_text(encoding="utf-8")) |
| except (OSError, json.JSONDecodeError, zipfile.BadZipFile): |
| return None |
| return payload if isinstance(payload, dict) else None |
|
|
| def build_user_process_definition(self, ref: str) -> dict | None: |
| normalized_ref = catalog.normalize_user_settings_ref(ref) |
| settings_path = self.resolve_user_settings_ref(normalized_ref) |
| if settings_path is None: |
| return None |
| payload = self.load_settings_payload(settings_path) |
| if not isinstance(payload, dict): |
| return None |
| model_type = str(payload.get("model_type") or "").strip() |
| if len(model_type) == 0: |
| return None |
| return { |
| "settings": payload, |
| "path": str(settings_path), |
| "source": "user", |
| "ref": normalized_ref, |
| "name": settings_path.stem, |
| "value": catalog.user_process_value(normalized_ref), |
| } |
|
|
| def user_process_definitions(self, user_refs: list[str]) -> dict[str, dict]: |
| definitions: dict[str, dict] = {} |
| for ref in catalog.get_saved_user_settings_refs({catalog.USER_SETTINGS_STORAGE_KEY: user_refs}): |
| definition = self.build_user_process_definition(ref) |
| if definition is None: |
| continue |
| value = str(definition.get("value") or "") |
| if len(value) > 0: |
| definitions[value] = definition |
| return definitions |
|
|
| @staticmethod |
| def system_process_definition(process_name: str) -> dict | None: |
| process_definition = catalog.PROCESS_DEFINITIONS.get(str(process_name or "").strip()) |
| if not isinstance(process_definition, dict): |
| return None |
| return { |
| "settings": process_definition.get("settings", {}), |
| "path": process_definition.get("path", ""), |
| "source": "system", |
| "name": str(process_name or "").strip(), |
| "value": str(process_name or "").strip(), |
| } |
|
|
| @staticmethod |
| def main_model_type(main_state: dict | None) -> str: |
| if not isinstance(main_state, dict): |
| return "" |
| key = "model_type" if main_state.get("active_form", "add") == "add" else "edit_model_type" |
| return str(main_state.get(key) or main_state.get("model_type") or "").strip() |
|
|
| @staticmethod |
| def current_user_settings_filenames(main_state: dict | None) -> list[str]: |
| if not isinstance(main_state, dict): |
| return [] |
| loras_presets = main_state.get("loras_presets", []) |
| filenames: list[str] = [] |
| seen: set[str] = set() |
| for item in loras_presets: |
| filename = str(item or "").strip() |
| if "/" in filename or "\\" in filename or not is_wangp_settings_filename(filename): |
| continue |
| if filename.casefold() in seen: |
| continue |
| filenames.append(filename) |
| seen.add(filename.casefold()) |
| return sorted(filenames, key=lambda name: Path(name).stem.casefold()) |
|
|
| def normalize_main_lset_selection(self, main_state: dict | None, main_lset_name: str | None) -> str: |
| selection = str(main_lset_name or "").strip() |
| filenames = self.current_user_settings_filenames(main_state) |
| if selection in filenames: |
| return selection |
| normalized_label = selection.replace("\u2500", "").replace(chr(160), " ").strip().casefold() |
| for filename in filenames: |
| if Path(filename).stem.casefold() == normalized_label: |
| return filename |
| return "" |
|
|
| def resolve_current_user_settings_file(self, main_state: dict | None, settings_filename: str) -> Path | None: |
| filename = str(settings_filename or "").strip() |
| if filename not in self.current_user_settings_filenames(main_state): |
| return None |
| model_type = self.main_model_type(main_state) |
| if len(model_type) == 0: |
| return None |
| lora_dir = Path(self.get_lora_dir(model_type)) |
| settings_path = (lora_dir / Path(filename).name).resolve() |
| if settings_path.is_file() and settings_path.suffix.lower() in WAN_GP_SETTINGS_SUFFIXES: |
| return settings_path |
| return None |
|
|
| def build_current_user_settings_ref(self, main_state: dict | None, settings_filename: str) -> str: |
| model_type = self.main_model_type(main_state) |
| base_model_type = self.base_model_type_for_ref(model_type) |
| filename = Path(str(settings_filename or "").strip()).name |
| return catalog.normalize_user_settings_ref(f"{base_model_type}/{filename}") |
|
|
| def build_candidate_user_process_definition(self, main_state: dict | None, settings_filename: str) -> dict | None: |
| settings_path = self.resolve_current_user_settings_file(main_state, settings_filename) |
| if settings_path is None: |
| return None |
| payload = self.load_settings_payload(settings_path) |
| if not isinstance(payload, dict): |
| return None |
| model_type = str(payload.get("model_type") or self.main_model_type(main_state)).strip() |
| if len(model_type) == 0: |
| return None |
| payload = payload.copy() |
| payload["model_type"] = model_type |
| ref = self.build_current_user_settings_ref(main_state, settings_path.name) |
| return { |
| "settings": payload, |
| "path": str(settings_path), |
| "source": "user", |
| "ref": ref, |
| "name": settings_path.stem, |
| "value": settings_path.name, |
| } |
|
|
| def process_definition(self, process_value: str, main_state: dict | None = None, user_refs: list[str] | None = None) -> dict | None: |
| process_value = str(process_value or "").strip() |
| if len(process_value) == 0 or process_value == constants.NO_USER_SETTINGS_VALUE: |
| return None |
| system_definition = self.system_process_definition(process_value) |
| if system_definition is not None: |
| return system_definition |
| if catalog.is_user_process_value(process_value): |
| ref = catalog.user_process_ref_from_value(process_value) |
| if len(ref) == 0: |
| return None |
| if user_refs is not None and ref.casefold() not in {item.casefold() for item in catalog.get_saved_user_settings_refs({catalog.USER_SETTINGS_STORAGE_KEY: user_refs})}: |
| return None |
| return self.build_user_process_definition(ref) |
| if is_wangp_settings_filename(process_value): |
| return self.build_candidate_user_process_definition(main_state, process_value) |
| return None |
|
|
| @staticmethod |
| def process_definition_model_type(process_definition: dict | None) -> str: |
| settings = process_definition.get("settings") if isinstance(process_definition, dict) else None |
| return str(settings.get("model_type") or "").strip() if isinstance(settings, dict) else "" |
|
|
| @staticmethod |
| def system_handler_for_definition(process_definition: dict | None): |
| settings = process_definition.get("settings") if isinstance(process_definition, dict) else None |
| if not isinstance(settings, dict): |
| return None |
| return system_handlers.get_system_handler(settings.get("system_handler")) |
|
|
| @staticmethod |
| def system_handler_for_model_type(model_type: str): |
| model_type = str(model_type or "").strip() |
| for process_definition in catalog.PROCESS_DEFINITIONS.values(): |
| settings = process_definition.get("settings", {}) |
| if str(settings.get("model_type") or "").strip() != model_type: |
| continue |
| handler = system_handlers.get_system_handler(settings.get("system_handler")) |
| if handler is not None: |
| return handler |
| return None |
|
|
| def system_handler_for_process(self, process_name: str, main_state: dict | None = None, user_refs: list[str] | None = None): |
| return self.system_handler_for_definition(self.process_definition(process_name, main_state, user_refs)) |
|
|
| def target_control_choices(self, process_name: str, main_state: dict | None = None, user_refs: list[str] | None = None) -> list[tuple[str, str]]: |
| process_definition = self.process_definition(process_name, main_state, user_refs) |
| handler = self.system_handler_for_definition(process_definition) |
| if handler is not None and callable(getattr(handler, "target_control_choices_for_process", None)): |
| return handler.target_control_choices_for_process(process_definition.get("settings", {})) |
| return list(getattr(handler, "target_control_choices", [])) if handler is not None else [] |
|
|
| def target_control_default(self, process_name: str, main_state: dict | None = None, user_refs: list[str] | None = None) -> str: |
| process_definition = self.process_definition(process_name, main_state, user_refs) |
| handler = self.system_handler_for_definition(process_definition) |
| if handler is not None and callable(getattr(handler, "target_control_default_for_process", None)): |
| return handler.target_control_default_for_process(process_definition.get("settings", {})) |
| return str(getattr(handler, "default_target_control", "")) if handler is not None else "" |
|
|
| def has_target_control(self, process_name: str, main_state: dict | None = None, user_refs: list[str] | None = None) -> bool: |
| return len(self.target_control_choices(process_name, main_state, user_refs)) > 0 |
|
|
| def target_control_label(self, process_name: str, main_state: dict | None = None, user_refs: list[str] | None = None) -> str: |
| handler = self.system_handler_for_process(process_name, main_state, user_refs) |
| return str(getattr(handler, "target_control_label", "Target")) if handler is not None else "Target" |
|
|
| def default_chunk_size_seconds(self, process_name: str, main_state: dict | None = None, user_refs: list[str] | None = None) -> float: |
| handler = self.system_handler_for_process(process_name, main_state, user_refs) |
| if handler is not None: |
| return float(getattr(handler, "default_chunk_size_seconds", 10.0)) |
| return 10.0 |
|
|
| def hides_sliding_window_overlap(self, process_name: str, main_state: dict | None = None, user_refs: list[str] | None = None) -> bool: |
| handler = self.system_handler_for_process(process_name, main_state, user_refs) |
| return bool(getattr(handler, "hide_sliding_window_overlap", False)) if handler is not None else False |
|
|
| def hides_output_resolution(self, process_name: str, main_state: dict | None = None, user_refs: list[str] | None = None) -> bool: |
| handler = self.system_handler_for_process(process_name, main_state, user_refs) |
| return bool(getattr(handler, "hide_output_resolution", False)) if handler is not None else False |
|
|
| def hides_prompt(self, process_name: str, main_state: dict | None = None, user_refs: list[str] | None = None) -> bool: |
| handler = self.system_handler_for_process(process_name, main_state, user_refs) |
| return bool(getattr(handler, "hide_prompt", False)) if handler is not None else False |
|
|
| def process_frame_rules(self, process_name: str, main_state: dict | None = None, user_refs: list[str] | None = None) -> frames.FramePlanRules: |
| process_definition = self.process_definition_or_default(process_name, main_state, user_refs) |
| handler = self.system_handler_for_definition(process_definition) |
| if handler is not None: |
| return frames.FramePlanRules(frame_step=int(getattr(handler, "frame_step", 1)), minimum_requested_frames=int(getattr(handler, "minimum_requested_frames", 1))) |
| model_type = self.process_definition_model_type(process_definition) |
| return frames.get_frame_plan_rules(model_type, self.get_model_def) |
|
|
| def process_values_by_model_type(self, user_refs: list[str]) -> dict[str, list[str]]: |
| values_by_model_type: dict[str, list[str]] = {} |
| for process_name, process_definition in catalog.PROCESS_DEFINITIONS.items(): |
| model_type = str(process_definition.get("settings", {}).get("model_type") or "").strip() |
| if len(model_type) > 0: |
| values_by_model_type.setdefault(model_type, []).append(process_name) |
| for value, definition in self.user_process_definitions(user_refs).items(): |
| model_type = self.process_definition_model_type(definition) |
| if len(model_type) > 0: |
| values_by_model_type.setdefault(model_type, []).append(value) |
| return values_by_model_type |
|
|
| def model_type_choices(self, user_refs: list[str]) -> list[tuple[str, str]]: |
| model_types = sorted(self.process_values_by_model_type(user_refs), key=lambda item: self.model_type_label(item).casefold()) |
| choices = [(self.model_type_label(model_type), model_type) for model_type in model_types] |
| choices.append((constants.ADD_USER_SETTINGS_LABEL, constants.ADD_USER_SETTINGS_MODEL_TYPE)) |
| return choices |
|
|
| def normal_process_choices(self, model_type: str, user_refs: list[str]) -> list[tuple[str, str]]: |
| model_type = str(model_type or "").strip() |
| entries: list[tuple[str, str, str]] = [] |
| for process_name, process_definition in catalog.PROCESS_DEFINITIONS.items(): |
| if str(process_definition.get("settings", {}).get("model_type") or "").strip() == model_type: |
| entries.append((process_name, process_name, "system")) |
| for value, definition in self.user_process_definitions(user_refs).items(): |
| if self.process_definition_model_type(definition) == model_type: |
| label_name = str(definition.get("name") or "").strip() |
| if len(label_name) == 0: |
| ref = catalog.user_process_ref_from_value(value) |
| label_name = Path(ref or value).stem |
| label = f"{label_name} *" |
| entries.append((label, value, "user")) |
| entries.sort(key=lambda item: (item[0].removesuffix(" *").casefold(), 0 if item[2] == "system" else 1)) |
| return [(label, value) for label, value, _source in entries] |
|
|
| def current_user_settings_choices(self, main_state: dict | None, main_lset_name: str | None) -> tuple[list[tuple[str, str]], str]: |
| filenames = self.current_user_settings_filenames(main_state) |
| if len(filenames) == 0: |
| return [(constants.NO_USER_SETTINGS_LABEL, constants.NO_USER_SETTINGS_VALUE)], constants.NO_USER_SETTINGS_VALUE |
| choices = [(Path(filename).stem, filename) for filename in filenames] |
| selected = self.normalize_main_lset_selection(main_state, main_lset_name) |
| value = selected if selected in filenames else filenames[0] |
| return choices, value |
|
|
| def process_choices(self, process_model_type: str, main_state: dict | None, main_lset_name: str | None, user_refs: list[str]) -> tuple[list[tuple[str, str]], str | None]: |
| if str(process_model_type or "").strip() == constants.ADD_USER_SETTINGS_MODEL_TYPE: |
| return self.current_user_settings_choices(main_state, main_lset_name) |
| choices = self.normal_process_choices(str(process_model_type or "").strip(), user_refs) |
| return choices, choices[0][1] if len(choices) > 0 else None |
|
|
| @staticmethod |
| def process_choices_have_user_settings(process_choices: list[tuple[str, str]]) -> bool: |
| return any(catalog.is_user_process_value(value) for _label, value in list(process_choices or [])) |
|
|
| def default_process_definition(self) -> dict: |
| definition = self.system_process_definition(catalog.DEFAULT_PROCESS_NAME) |
| if definition is not None: |
| return definition |
| for process_name in catalog.PROCESS_DEFINITIONS: |
| definition = self.system_process_definition(process_name) |
| if definition is not None: |
| return definition |
| return {"settings": {}, "path": "", "source": "system", "name": "", "value": ""} |
|
|
| def process_definition_or_default(self, process_name: str, main_state: dict | None, user_refs: list[str] | None) -> dict: |
| return self.process_definition(process_name, main_state, user_refs) or self.default_process_definition() |
|
|
| @staticmethod |
| def uses_builtin_outpaint_ui(process_definition: dict | None) -> bool: |
| settings = process_definition.get("settings") if isinstance(process_definition, dict) else None |
| return isinstance(process_definition, dict) and process_definition.get("source") != "user" and isinstance(settings, dict) and "video_guide_outpainting" in settings |
|
|
| def has_process_outpaint(self, process_name: str, main_state: dict | None = None, user_refs: list[str] | None = None) -> bool: |
| if self.has_target_control(process_name, main_state, user_refs): |
| return False |
| process_definition = self.process_definition(process_name, main_state, user_refs) |
| return self.uses_builtin_outpaint_ui(process_definition) |
|
|
| @staticmethod |
| def user_lora_strength_override_default(process_definition: dict | None) -> float | None: |
| if not isinstance(process_definition, dict) or process_definition.get("source") != "user": |
| return None |
| settings = process_definition.get("settings") |
| if not isinstance(settings, dict): |
| return None |
| return common.get_single_lora_simple_multiplier(settings) |
|
|
| def is_process_strength_visible(self, process_name: str, main_state: dict | None = None, user_refs: list[str] | None = None) -> bool: |
| process_definition = self.process_definition(process_name, main_state, user_refs) |
| if self.system_handler_for_definition(process_definition) is not None: |
| return False |
| settings = process_definition.get("settings") if isinstance(process_definition, dict) else None |
| if not isinstance(settings, dict) or self.uses_builtin_outpaint_ui(process_definition): |
| return False |
| if process_definition.get("source") == "user": |
| return self.user_lora_strength_override_default(process_definition) is not None |
| return True |
|
|
| def validate_user_process_definition(self, process_definition: dict | None) -> list[str]: |
| return process_validation.validate_user_process_definition(process_definition, self.get_model_def) |
|
|
| def format_user_process_validation_error(self, process_definition: dict | None, problems: list[str]) -> str: |
| return process_validation.format_user_process_validation_error(process_definition, problems) |
|
|
| def select_after_user_process_delete(self, deleted_process_value: str, deleted_model_type: str, old_refs: list[str], new_refs: list[str]) -> tuple[str, str, list[tuple[str, str]]]: |
| old_choices = self.normal_process_choices(deleted_model_type, old_refs) |
| deleted_index = next((index for index, (_label, value) in enumerate(old_choices) if value == deleted_process_value), -1) |
| new_choices = self.normal_process_choices(deleted_model_type, new_refs) |
| new_values = {value for _label, value in new_choices} |
| if deleted_index >= 0: |
| for _label, value in old_choices[deleted_index + 1:]: |
| if catalog.is_user_process_value(value) and value in new_values: |
| return deleted_model_type, value, new_choices |
| first_system_value = next((value for _label, value in new_choices if not catalog.is_user_process_value(value)), None) |
| if first_system_value is not None: |
| return deleted_model_type, first_system_value, new_choices |
| for model_type in sorted(self.process_values_by_model_type(new_refs), key=lambda item: self.model_type_label(item).casefold()): |
| choices = self.normal_process_choices(model_type, new_refs) |
| first_system_value = next((value for _label, value in choices if not catalog.is_user_process_value(value)), None) |
| if first_system_value is not None: |
| return model_type, first_system_value, choices |
| if len(new_choices) > 0: |
| return deleted_model_type, new_choices[0][1], new_choices |
| return catalog.DEFAULT_MODEL_TYPE, catalog.DEFAULT_PROCESS_NAME, self.normal_process_choices(catalog.DEFAULT_MODEL_TYPE, new_refs) |
|
|