| from __future__ import annotations |
|
|
| import copy |
| import json |
| import math |
| import os |
| import sys |
| from functools import lru_cache |
| from pathlib import Path |
| from typing import Any |
|
|
| from shared.utils.settings_bundle import SETTINGS_BUNDLE_ATTACHMENT_KEYS, WAN_GP_SETTINGS_SUFFIXES, is_wangp_settings_filename, load_first_settings_from_queue_zip |
| from shared.utils.loras_mutipliers import merge_loras_settings |
| from shared.deepy.config import ( |
| DEEPY_DEFAULT_EDIT_IMAGE, |
| DEEPY_DEFAULT_GEN_IMAGE, |
| DEEPY_DEFAULT_GEN_SPEECH_FROM_DESCRIPTION, |
| DEEPY_DEFAULT_GEN_SPEECH_FROM_SAMPLE, |
| DEEPY_DEFAULT_GEN_VIDEO, |
| DEEPY_DEFAULT_GEN_VIDEO_WITH_SPEECH, |
| DEEPY_TOOL_EDIT_IMAGE_KEY, |
| DEEPY_TOOL_GEN_IMAGE_KEY, |
| DEEPY_TOOL_GEN_SPEECH_FROM_DESCRIPTION_KEY, |
| DEEPY_TOOL_GEN_SPEECH_FROM_SAMPLE_KEY, |
| DEEPY_TOOL_GEN_VIDEO_KEY, |
| DEEPY_TOOL_GEN_VIDEO_WITH_SPEECH_KEY, |
| get_deepy_config_value, |
| normalize_deepy_tool_edit_image, |
| normalize_deepy_tool_gen_image, |
| normalize_deepy_tool_gen_speech_from_description, |
| normalize_deepy_tool_gen_speech_from_sample, |
| normalize_deepy_tool_gen_video, |
| normalize_deepy_tool_gen_video_with_speech, |
| ) |
|
|
|
|
# Directory holding this module; built-in presets are looked up in its
# "settings" subfolder.
_DEEPY_DIR = Path(__file__).resolve().parent
SETTINGS_DIR = _DEEPY_DIR / "settings"
# Module-level aliases of the default variants declared in shared.deepy.config.
DEFAULT_IMAGE_EDITOR_VARIANT = DEEPY_DEFAULT_EDIT_IMAGE
DEFAULT_VIDEO_WITH_SPEECH_VARIANT = DEEPY_DEFAULT_GEN_VIDEO_WITH_SPEECH
DEFAULT_SPEECH_FROM_DESCRIPTION_VARIANT = DEEPY_DEFAULT_GEN_SPEECH_FROM_DESCRIPTION
DEFAULT_SPEECH_FROM_SAMPLE_VARIANT = DEEPY_DEFAULT_GEN_SPEECH_FROM_SAMPLE
# Human-readable name for each generation tool id.
TOOL_DISPLAY_NAMES = {
    "gen_image": "Image Generator",
    "edit_image": "Image Editor",
    "gen_video": "Video Generator",
    "gen_video_with_speech": "Video With Speech",
    "gen_speech_from_description": "Speech From Description",
    "gen_speech_from_sample": "Speech From Sample",
}
| _TOOL_TEMPLATE_VALIDATION_ERRORS = { |
| "gen_video": "The settings should generate a video", |
| "gen_video_with_speech": "The settings should generate a Video and accept an Audio Prompt", |
| "gen_image": "The settings of the model must generate an Image", |
| "edit_image": "The settings of the model must generate an Image and accept an Image Ref", |
| "gen_speech_from_description": "The model should generate only an audio output", |
| "gen_speech_from_sample": "The model should generate only an audio output and a sample audio ois expected", |
| } |
# Per-tool configuration lookup: the config key to read, the default used when
# the key is unset, and the normalizer applied to the stored value.
_TOOL_CONFIG_SPECS = {
    "gen_image": {"key": DEEPY_TOOL_GEN_IMAGE_KEY, "default": DEEPY_DEFAULT_GEN_IMAGE, "normalize": normalize_deepy_tool_gen_image},
    "edit_image": {"key": DEEPY_TOOL_EDIT_IMAGE_KEY, "default": DEEPY_DEFAULT_EDIT_IMAGE, "normalize": normalize_deepy_tool_edit_image},
    "gen_video": {"key": DEEPY_TOOL_GEN_VIDEO_KEY, "default": DEEPY_DEFAULT_GEN_VIDEO, "normalize": normalize_deepy_tool_gen_video},
    "gen_video_with_speech": {
        "key": DEEPY_TOOL_GEN_VIDEO_WITH_SPEECH_KEY,
        "default": DEEPY_DEFAULT_GEN_VIDEO_WITH_SPEECH,
        "normalize": normalize_deepy_tool_gen_video_with_speech,
    },
    "gen_speech_from_description": {
        "key": DEEPY_TOOL_GEN_SPEECH_FROM_DESCRIPTION_KEY,
        "default": DEEPY_DEFAULT_GEN_SPEECH_FROM_DESCRIPTION,
        "normalize": normalize_deepy_tool_gen_speech_from_description,
    },
    "gen_speech_from_sample": {
        "key": DEEPY_TOOL_GEN_SPEECH_FROM_SAMPLE_KEY,
        "default": DEEPY_DEFAULT_GEN_SPEECH_FROM_SAMPLE,
        "normalize": normalize_deepy_tool_gen_speech_from_sample,
    },
}
# Preset provenance tags. When two entries share a variant name, the incoming
# entry replaces the existing one if its source priority is higher or equal.
_PRESET_SOURCE_BUILTIN = "builtin"
_PRESET_SOURCE_LINKED = "linked"
_PRESET_SOURCE_PRIORITY = {
    _PRESET_SOURCE_BUILTIN: 0,
    _PRESET_SOURCE_LINKED: 1,
}
# Per-tool mapping of old variant names to the variant that replaces them.
_LEGACY_VARIANT_ALIASES = {
    "edit_image": {"Qwen_Edit": DEEPY_DEFAULT_EDIT_IMAGE},
    "gen_image": {"Z_Image_Turbo": DEEPY_DEFAULT_GEN_IMAGE},
    "gen_video": {"ltx2_22B_distilled": DEEPY_DEFAULT_GEN_VIDEO},
}
# Cache for linked presets keyed by (tool name, variant); each entry records the
# file path and modification time so a changed file is reloaded.
_LIVE_FILE_PRESET_CACHE: dict[tuple[str, str], dict[str, Any]] = {}
# Generation tool ids, in the declaration order of _TOOL_CONFIG_SPECS.
GENERATION_TOOL_IDS = tuple(_TOOL_CONFIG_SPECS.keys())
|
|
|
|
| def _canonical_variant(tool_name: str, variant: Any) -> str: |
| text = str(variant or "").strip() |
| if len(text) == 0: |
| return "" |
| return _LEGACY_VARIANT_ALIASES.get(str(tool_name or "").strip(), {}).get(text, text) |
|
|
|
|
def _tool_config_spec(tool_name: str) -> dict[str, Any]:
    """Return the config spec registered for a tool, or an empty dict if unknown."""
    lookup_key = str(tool_name or "").strip()
    return _TOOL_CONFIG_SPECS.get(lookup_key, {})
|
|
|
|
def _get_configured_tool_variant(tool_name: str) -> str:
    """Read the variant configured for a tool and normalize it.

    Returns ``""`` when the tool has no config spec or the normalized value is
    empty.
    """
    spec = _tool_config_spec(tool_name)
    if not spec:
        return ""
    configured = get_deepy_config_value(spec["key"], spec["default"])
    normalized = spec["normalize"](configured)
    return str(normalized or "").strip()
|
|
|
|
| def _looks_like_linked_variant(value: Any) -> bool: |
| text = str(value or "").strip().strip('"').replace("\\", "/") |
| if len(text) == 0 or not is_wangp_settings_filename(text): |
| return False |
| if text.startswith("/") or text.startswith("./") or text.startswith("../"): |
| return False |
| if len(text) >= 2 and text[1] == ":": |
| return False |
| parts = [part.strip() for part in text.split("/")] |
| return len(parts) == 2 and all(parts) |
|
|
|
|
def _normalize_linked_variant(value: Any) -> str | None:
    """Return the canonical ``base_model_type/filename`` form, or None.

    None is returned when the value does not look like a linked variant.
    """
    if not _looks_like_linked_variant(value):
        return None
    cleaned = str(value or "").strip().strip('"').replace("\\", "/")
    head, _separator, tail = cleaned.partition("/")
    return f"{head.strip()}/{Path(tail.strip()).name}"
|
|
|
|
def _parse_linked_variant(value: Any) -> tuple[str, str] | None:
    """Split a linked variant into ``(base_model_type, filename)``, or return None."""
    normalized = _normalize_linked_variant(value)
    return None if normalized is None else tuple(normalized.split("/", 1))
|
|
|
|
| def _get_main_callable(name: str) -> Any: |
| main_module = sys.modules.get("__main__") |
| return None if main_module is None else getattr(main_module, str(name or "").strip(), None) |
|
|
|
|
| def _get_base_model_type_name(model_type: Any) -> str: |
| text = str(model_type or "").strip() |
| if len(text) == 0: |
| return "" |
| get_base_model_type = _get_main_callable("get_base_model_type") |
| if callable(get_base_model_type): |
| try: |
| resolved = str(get_base_model_type(text) or "").strip() |
| except Exception: |
| resolved = "" |
| if len(resolved) > 0: |
| return resolved |
| return text |
|
|
|
|
def _resolve_linked_variant_path(value: Any) -> Path | None:
    """Locate the settings file a linked variant refers to.

    The file is searched in the directory returned by ``get_lora_dir`` (taken
    from ``__main__``) for the variant's base model type. Returns None when the
    value is not a linked variant, the helper is unavailable or raises, or the
    file is missing or has an unsupported suffix.
    """
    parsed = _parse_linked_variant(value)
    if parsed is None:
        return None
    base_model_type, filename = parsed
    lora_dir_getter = _get_main_callable("get_lora_dir")
    if not callable(lora_dir_getter):
        return None
    try:
        lora_dir = Path(lora_dir_getter(base_model_type))
    except Exception:
        return None
    resolved = (lora_dir / filename).resolve()
    is_settings_file = resolved.is_file() and resolved.suffix.lower() in WAN_GP_SETTINGS_SUFFIXES
    return resolved if is_settings_file else None
|
|
|
|
def _iter_builtin_settings_dirs() -> tuple[Path, ...]:
    """Return the sorted sub-directories of SETTINGS_DIR (empty if it is missing)."""
    if not SETTINGS_DIR.is_dir():
        return ()
    subdirs = [child for child in SETTINGS_DIR.iterdir() if child.is_dir()]
    subdirs.sort()
    return tuple(subdirs)
|
|
|
|
| def _build_preset_entry(path: Path, *, variant: str | None = None, label: str | None = None, source: str | None = None) -> dict[str, Any] | None: |
| variant_value = str(variant or path.stem or "").strip() |
| if len(variant_value) == 0: |
| return None |
| return { |
| "variant": variant_value, |
| "label": str(label or variant_value).strip() or variant_value, |
| "path": path, |
| "source": str(source or _PRESET_SOURCE_BUILTIN).strip() or _PRESET_SOURCE_BUILTIN, |
| } |
|
|
|
|
def _build_linked_variant_entry(variant: Any) -> dict[str, Any] | None:
    """Build a preset entry for a linked variant whose file exists.

    Returns None when the value is not a linked variant or its settings file
    cannot be resolved. The label is the file name without its suffix.
    """
    normalized = _normalize_linked_variant(variant)
    if normalized is None:
        return None
    preset_path = _resolve_linked_variant_path(normalized)
    if preset_path is None:
        return None
    filename = normalized.split("/", 1)[1]
    return _build_preset_entry(
        preset_path,
        variant=normalized,
        label=Path(filename).stem,
        source=_PRESET_SOURCE_LINKED,
    )
|
|
|
|
| def _add_or_replace_preset_entry(tool_entries: list[dict[str, Any]], entry: dict[str, Any]) -> None: |
| variant = str(entry.get("variant", "")).strip() |
| if len(variant) == 0: |
| return |
| for index, existing in enumerate(tool_entries): |
| if str(existing.get("variant", "")).strip() != variant: |
| continue |
| current_priority = _PRESET_SOURCE_PRIORITY.get(str(existing.get("source", _PRESET_SOURCE_BUILTIN)).strip(), 0) |
| next_priority = _PRESET_SOURCE_PRIORITY.get(str(entry.get("source", _PRESET_SOURCE_BUILTIN)).strip(), 0) |
| if next_priority >= current_priority: |
| tool_entries[index] = entry |
| return |
| tool_entries.append(entry) |
|
|
|
|
@lru_cache(maxsize=1)
def _preset_index() -> dict[str, tuple[dict[str, Any], ...]]:
    """Scan the built-in settings directories and index their ``*.json`` presets.

    The result maps each tool directory name to a tuple of preset entries and
    is memoized until ``cache_clear()`` is called.
    """
    collected: dict[str, list[dict[str, Any]]] = {}
    for tool_dir in _iter_builtin_settings_dirs():
        tool_name = str(tool_dir.name or "").strip()
        if not tool_name:
            continue
        bucket = collected.setdefault(tool_name, [])
        for preset_file in sorted(tool_dir.glob("*.json")):
            entry = _build_preset_entry(preset_file)
            if entry is None:
                continue
            _add_or_replace_preset_entry(bucket, entry)
    return {name: tuple(entries) for name, entries in collected.items()}
|
|
|
|
def _tool_entries(tool_name: str, current_variant: Any = None) -> list[dict[str, Any]]:
    """List preset entries for a tool: built-ins plus resolvable linked variants.

    The configured variant and ``current_variant`` are each appended when they
    resolve to a linked settings file that is not already listed.
    """
    lookup = str(tool_name or "").strip()
    entries = list(_preset_index().get(lookup, ()))
    known = {str(entry.get("variant", "")).strip() for entry in entries}
    for candidate in (_get_configured_tool_variant(tool_name), current_variant):
        linked = _build_linked_variant_entry(candidate)
        if linked is None:
            continue
        name = str(linked.get("variant", "")).strip()
        if name in known:
            continue
        entries.append(linked)
        known.add(name)
    return entries
|
|
|
|
def list_tool_variants(tool_name: str, current_variant: Any = None) -> list[str]:
    """Return the non-empty variant names available for a tool."""
    names = (str(entry.get("variant", "")).strip() for entry in _tool_entries(tool_name, current_variant=current_variant))
    return [name for name in names if name]
|
|
|
|
def list_tool_variant_choices(tool_name: str, current_variant: Any = None) -> list[tuple[str, str]]:
    """Return ``(label, variant)`` pairs for a tool.

    Entries without a variant are skipped; an empty label falls back to the
    variant name.
    """
    choices: list[tuple[str, str]] = []
    for entry in _tool_entries(tool_name, current_variant=current_variant):
        variant = str(entry.get("variant", "")).strip()
        if not variant:
            continue
        label = str(entry.get("label", "")).strip() or variant
        choices.append((label, variant))
    return choices
|
|
|
|
def find_tool_variant(tool_name: str, requested_variant: Any, current_variant: Any = None) -> str | None:
    """Resolve a requested variant name to a known variant, or None.

    Linked variants are accepted only when their settings file resolves. Other
    names are matched exactly first, then case-insensitively, against the
    variants listed for the tool.
    """
    tool_name = str(tool_name or "").strip()
    requested = _canonical_variant(tool_name, requested_variant)
    if not requested:
        return None
    linked = _normalize_linked_variant(requested)
    if linked is not None:
        if _resolve_linked_variant_path(linked) is None:
            return None
        return linked
    variants = list_tool_variants(tool_name, current_variant=current_variant)
    if requested in variants:
        return requested
    wanted = requested.casefold()
    return next((name for name in variants if name.casefold() == wanted), None)
|
|
|
|
def get_tool_variant_path(tool_name: str, requested_variant: Any, current_variant: Any = None) -> Path | None:
    """Return the settings file path backing a variant, or None if unknown.

    A linked variant that resolves to a file wins immediately; otherwise the
    variant is resolved by name and looked up among the tool's entries.
    """
    direct = _resolve_linked_variant_path(requested_variant)
    if direct is not None:
        return direct
    resolved = find_tool_variant(tool_name, requested_variant, current_variant=current_variant)
    if resolved is None:
        return None
    matches = (
        Path(entry["path"])
        for entry in _tool_entries(tool_name, current_variant=current_variant)
        if str(entry.get("variant", "")).strip() == resolved
    )
    return next(matches, None)
|
|
|
|
def get_tool_variant_model_def(tool_name: str, variant: Any) -> dict[str, Any] | None:
    """Return a shallow copy of the model definition behind a tool variant.

    Returns None when the preset cannot be loaded or no model definition dict
    is available for its model type.
    """
    try:
        payload = load_tool_preset(tool_name, str(variant or "").strip())
    except Exception:
        return None
    model_def = _get_model_def_from_settings_payload(payload)
    if not isinstance(model_def, dict):
        return None
    return dict(model_def)
|
|
|
|
def resolve_wangp_settings_file(state: Any, selected_value: Any) -> Path | None:
    """Resolve a bare settings file name inside the LoRA dir of the state's model.

    The value must be a plain file name (no path separators) recognised as a
    WanGP settings file. ``get_state_model_type`` and ``get_lora_dir`` are taken
    from ``__main__``. Returns None when any lookup fails or the file is absent.
    """
    value = str(selected_value or "").strip()
    if not value:
        return None
    if any(separator in value for separator in ("/", "\\")):
        return None
    if not is_wangp_settings_filename(value):
        return None
    state_model_type_getter = _get_main_callable("get_state_model_type")
    lora_dir_getter = _get_main_callable("get_lora_dir")
    if not (callable(state_model_type_getter) and callable(lora_dir_getter)):
        return None
    try:
        model_type = state_model_type_getter(state)
        lora_dir = Path(lora_dir_getter(model_type))
    except Exception:
        return None
    candidate = (lora_dir / Path(value).name).resolve()
    if not candidate.is_file():
        return None
    return candidate if candidate.suffix.lower() in WAN_GP_SETTINGS_SUFFIXES else None
|
|
|
|
| def _load_wangp_settings_payload(source_path: Path) -> dict[str, Any]: |
| source_path = Path(source_path).resolve() |
| if source_path.suffix.lower() == ".zip": |
| payload, source_task_count = load_first_settings_from_queue_zip(source_path, SETTINGS_BUNDLE_ATTACHMENT_KEYS) |
| if source_task_count > 1: |
| print(f"[Deepy] Settings bundle {source_path.name} contains {source_task_count} tasks; only the first task was extracted.") |
| else: |
| with source_path.open("r", encoding="utf-8") as reader: |
| payload = json.load(reader) |
| if not isinstance(payload, dict): |
| raise TypeError(f"WanGP settings file '{Path(source_path).name}' must contain a JSON object.") |
| return payload |
|
|
|
|
| def _get_model_def_from_settings_payload(payload: dict[str, Any]) -> dict[str, Any] | None: |
| model_type = str(payload.get("model_type", "")).strip() |
| if len(model_type) == 0: |
| return None |
| get_model_def = _get_main_callable("get_model_def") |
| if not callable(get_model_def): |
| return None |
| try: |
| model_def = get_model_def(model_type) |
| except Exception: |
| return None |
| return model_def if isinstance(model_def, dict) else None |
|
|
|
|
| def _basename_lora_key(value: Any) -> str: |
| return Path(str(value or "").strip().replace("\\", "/")).name.casefold() |
|
|
|
|
| def _normalize_lora_cache_path(value: Any) -> str: |
| path = str(value or "").strip().replace("\\", "/") |
| while "//" in path: |
| path = path.replace("//", "/") |
| return path.casefold() |
|
|
|
|
def _read_loras_url_cache() -> dict[str, str]:
    """Read ``loras_url_cache.json`` and return its entries with normalized keys.

    The file is looked up two directory levels above this module's directory.
    A missing, unreadable, or non-object file yields an empty dict; entries
    with a blank key or value are dropped.
    """
    cache_file = _DEEPY_DIR.parents[1] / "loras_url_cache.json"
    if not cache_file.is_file():
        return {}
    try:
        raw = json.loads(cache_file.read_text(encoding="utf-8"))
    except Exception:
        return {}
    if not isinstance(raw, dict):
        return {}
    cleaned: dict[str, str] = {}
    for key, value in raw.items():
        value_text = str(value).strip()
        if not str(key).strip() or not value_text:
            continue
        cleaned[_normalize_lora_cache_path(key)] = value_text
    return cleaned
|
|
|
|
| def _format_lora_multiplier(value: Any) -> str: |
| if value is None: |
| return "1" |
| if isinstance(value, bool): |
| raise TypeError("LoRA multiplier values must be strings or numbers.") |
| if isinstance(value, (int, float)): |
| number = float(value) |
| if not math.isfinite(number): |
| raise ValueError("LoRA multiplier values must be finite.") |
| return f"{number:g}" |
| text = str(value).strip() |
| return text if len(text) > 0 else "1" |
|
|
|
|
def _resolve_tool_lora_dir(tool_name: str, variant: str) -> tuple[str, Path]:
    """Return ``(model_type, lora_dir)`` for the preset behind a tool variant.

    Raises:
        ValueError: if the preset does not define a ``model_type``.
        RuntimeError: if ``get_lora_dir`` is not available in ``__main__``.
    """
    preset = load_tool_preset(tool_name, variant)
    model_type = str(preset.get("model_type", "") or "").strip()
    if not model_type:
        raise ValueError(f"Deepy preset '{variant}' for tool '{tool_name}' does not define a model_type.")
    lora_dir_getter = _get_main_callable("get_lora_dir")
    if callable(lora_dir_getter):
        return model_type, Path(lora_dir_getter(model_type))
    raise RuntimeError("WanGP get_lora_dir(model_type) is not available.")
|
|
|
|
def _list_tool_lora_entries(tool_name: str, variant: str) -> list[tuple[str, str]]:
    """List the LoRA files available for a tool variant.

    Returns ``(filename, original_entry)`` pairs sorted case-insensitively by
    file name. ``original_entry`` is the value stored in the LoRA URL cache for
    that file, or the file name itself when the cache has no entry. Files are
    de-duplicated by case-folded file name, keeping the first one found. An
    empty list is returned when the LoRA directory does not exist.

    Raises:
        ValueError: if ``tool_name`` is not one of the generation tools.
    """
    lookup_name = str(tool_name or "").strip()
    if lookup_name not in GENERATION_TOOL_IDS:
        # The count is derived from the tuple so the message cannot drift when
        # tools are added or removed.
        raise ValueError(
            f"LoRAs are only available for the {len(GENERATION_TOOL_IDS)} generation tools: {', '.join(GENERATION_TOOL_IDS)}."
        )
    _model_type, lora_dir = _resolve_tool_lora_dir(lookup_name, variant)
    if not lora_dir.is_dir():
        return []
    url_cache = _read_loras_url_cache()
    discovered: dict[str, tuple[str, str]] = {}
    for pattern in ("*.safetensors", "*.sft"):
        for path in sorted(lora_dir.glob(pattern)):
            if not path.is_file():
                continue
            filename = path.name
            cache_key = _normalize_lora_cache_path(lora_dir / filename)
            original_entry = url_cache.get(cache_key, filename)
            discovered.setdefault(_basename_lora_key(filename), (filename, original_entry))
    return sorted(discovered.values(), key=lambda item: item[0].casefold())
|
|
|
|
| def _int_setting(payload: dict[str, Any], key: str) -> int | None: |
| value = payload.get(key, None) |
| if isinstance(value, bool): |
| return int(value) |
| if isinstance(value, int): |
| return value |
| try: |
| return int(str(value).strip()) |
| except Exception: |
| return None |
|
|
|
|
| def _sequence_setting_has_value(payload: dict[str, Any], key: str) -> bool: |
| value = payload.get(key, None) |
| if isinstance(value, list): |
| return any(len(str(item).strip()) > 0 for item in value if item is not None) |
| return len(str(value or "").strip()) > 0 |
|
|
|
|
| def _add_unique_flags(value: Any, flags: str) -> str: |
| text = str(value or "").strip() |
| for flag in str(flags or ""): |
| if len(flag.strip()) == 0 or flag in text: |
| continue |
| text += flag |
| return text |
|
|
|
|
def validate_wangp_settings_payload_for_tool(tool_name: str, payload: dict[str, Any]) -> str | None:
    """Check that a settings payload is eligible for a tool.

    Returns None when the payload is acceptable (or the tool has no validation
    rule); otherwise returns the error message registered for the tool.
    """
    lookup_name = str(tool_name or "").strip()
    if lookup_name not in _TOOL_TEMPLATE_VALIDATION_ERRORS:
        return None
    image_mode = _int_setting(payload, "image_mode")
    produces_video = image_mode == 0
    produces_image = image_mode == 1
    wants_audio_prompt = "A" in str(payload.get("audio_prompt_type", "") or "")
    has_image_refs = _sequence_setting_has_value(payload, "image_refs")
    model_def = _get_model_def_from_settings_payload(payload)
    audio_only = isinstance(model_def, dict) and bool(model_def.get("audio_only", False))
    eligibility = {
        "gen_video": produces_video,
        "gen_video_with_speech": produces_video and wants_audio_prompt,
        "gen_image": produces_image,
        "edit_image": produces_image and has_image_refs,
        "gen_speech_from_description": audio_only,
        "gen_speech_from_sample": audio_only and wants_audio_prompt,
    }
    if eligibility.get(lookup_name, True):
        return None
    return _TOOL_TEMPLATE_VALIDATION_ERRORS[lookup_name]
|
|
|
|
def validate_wangp_settings_for_tool(tool_name: str, source_path: Path) -> str | None:
    """Load a settings file and validate its payload for a tool.

    Returns None when eligible, otherwise the tool's validation error message.
    """
    payload = _load_wangp_settings_payload(source_path)
    return validate_wangp_settings_payload_for_tool(tool_name, payload)
|
|
|
|
def build_linked_tool_variant(state: Any, source_path: Path) -> str:
    """Build the ``base_model_type/filename`` variant for a settings file.

    The model type comes from the file's payload, falling back to the model
    type of ``state`` (via ``get_state_model_type`` from ``__main__``).

    Raises:
        FileNotFoundError: if the file is missing or has an unsupported suffix.
        ValueError: if no base model type can be determined.
    """
    source_file = Path(source_path).resolve()
    if not (source_file.is_file() and source_file.suffix.lower() in WAN_GP_SETTINGS_SUFFIXES):
        raise FileNotFoundError(f"Deepy source settings file not found: {source_file}")
    payload = _load_wangp_settings_payload(source_file)
    model_type = str(payload.get("model_type", "")).strip()
    if not model_type:
        state_model_type_getter = _get_main_callable("get_state_model_type")
        if callable(state_model_type_getter):
            try:
                model_type = str(state_model_type_getter(state) or "").strip()
            except Exception:
                model_type = ""
    base_model_type = _get_base_model_type_name(model_type)
    if not base_model_type:
        raise ValueError(f"Unable to resolve base model type for {source_file.name}.")
    return f"{base_model_type}/{source_file.name}"
|
|
|
|
def is_linked_tool_variant(requested_variant: Any) -> bool:
    """Tell whether the value has the shape of a linked variant.

    Only the shape is checked; the referenced file does not need to exist.
    """
    parsed = _parse_linked_variant(requested_variant)
    return parsed is not None
|
|
|
|
def resolve_tool_variant(tool_name: str, requested_variant: Any, default_variant: str | None = None) -> str:
    """Resolve a variant for a tool, falling back when the request is unusable.

    The requested variant is tried first, then ``default_variant``; if neither
    resolves, the first built-in variant of the tool is returned.

    Raises:
        FileNotFoundError: if the tool has no built-in presets at all.
    """
    tool_name = str(tool_name or "").strip()
    static_variants: list[str] = []
    for entry in _preset_index().get(tool_name, ()):
        name = str(entry.get("variant", "")).strip()
        if name:
            static_variants.append(name)
    if not static_variants:
        raise FileNotFoundError(f"No Deepy presets found for tool '{tool_name}' in {SETTINGS_DIR}.")
    # Same resolution steps for the request and then for the default.
    for raw_candidate in (requested_variant, default_variant):
        candidate = _canonical_variant(tool_name, raw_candidate)
        if not candidate:
            continue
        linked = _normalize_linked_variant(candidate)
        if linked is None:
            resolved = find_tool_variant(tool_name, candidate)
        elif _resolve_linked_variant_path(linked) is not None:
            resolved = linked
        else:
            resolved = None
        if resolved is not None:
            return resolved
    return static_variants[0]
|
|
|
|
def get_default_image_generator_variant() -> str:
    """Return the variant to use for ``gen_image`` (configured, else default)."""
    return resolve_tool_variant(
        "gen_image",
        _get_configured_tool_variant("gen_image"),
        default_variant=DEEPY_DEFAULT_GEN_IMAGE,
    )
|
|
|
|
def get_default_video_generator_variant() -> str:
    """Return the variant to use for ``gen_video`` (configured, else default)."""
    return resolve_tool_variant(
        "gen_video",
        _get_configured_tool_variant("gen_video"),
        default_variant=DEEPY_DEFAULT_GEN_VIDEO,
    )
|
|
|
|
def get_default_image_editor_variant() -> str:
    """Return the variant to use for ``edit_image`` (configured, else default)."""
    return resolve_tool_variant(
        "edit_image",
        _get_configured_tool_variant("edit_image"),
        default_variant=DEEPY_DEFAULT_EDIT_IMAGE,
    )
|
|
|
|
def get_default_video_with_speech_variant() -> str:
    """Return the variant to use for ``gen_video_with_speech`` (configured, else default)."""
    return resolve_tool_variant(
        "gen_video_with_speech",
        _get_configured_tool_variant("gen_video_with_speech"),
        default_variant=DEEPY_DEFAULT_GEN_VIDEO_WITH_SPEECH,
    )
|
|
|
|
def get_default_speech_from_description_variant() -> str:
    """Return the variant to use for ``gen_speech_from_description`` (configured, else default)."""
    return resolve_tool_variant(
        "gen_speech_from_description",
        _get_configured_tool_variant("gen_speech_from_description"),
        default_variant=DEEPY_DEFAULT_GEN_SPEECH_FROM_DESCRIPTION,
    )
|
|
|
|
def get_default_speech_from_sample_variant() -> str:
    """Return the variant to use for ``gen_speech_from_sample`` (configured, else default)."""
    return resolve_tool_variant(
        "gen_speech_from_sample",
        _get_configured_tool_variant("gen_speech_from_sample"),
        default_variant=DEEPY_DEFAULT_GEN_SPEECH_FROM_SAMPLE,
    )
|
|
|
|
| def _format_ineligible_tool_settings_error(tool_name: str, error_text: str) -> str: |
| return f"settings no eligible for tool {str(tool_name or '').strip()}: {str(error_text or '').strip()}" |
|
|
|
|
@lru_cache(maxsize=None)
def _load_static_tool_preset(tool_name: str, variant: str) -> dict[str, Any]:
    """Load a built-in preset from the preset index (memoized per tool/variant).

    The returned dict is the cached object itself, so callers that need to
    modify it should copy it first.

    Raises:
        FileNotFoundError: if no indexed preset file matches the variant.
        TypeError: if the preset file does not contain a JSON object.
    """
    wanted_tool = str(tool_name or "").strip()
    wanted_variant = str(variant or "").strip()
    preset_path = next(
        (
            Path(entry["path"])
            for entry in _preset_index().get(wanted_tool, ())
            if str(entry.get("variant", "")).strip() == wanted_variant
        ),
        None,
    )
    if preset_path is None or not preset_path.is_file():
        raise FileNotFoundError(f"Deepy preset file not found for tool '{tool_name}' variant '{variant}'.")
    payload = json.loads(preset_path.read_text(encoding="utf-8"))
    if isinstance(payload, dict):
        return payload
    raise TypeError(f"Deepy preset '{preset_path.name}' must contain a JSON object.")
|
|
|
|
def _load_live_tool_preset(tool_name: str, variant: str, preset_path: Path) -> dict[str, Any]:
    """Load a linked preset from disk, caching it by file path and mtime.

    The cache entry for ``(tool_name, variant)`` is reused only while it points
    at the same resolved path and the file's modification time is unchanged.
    Eligibility failures are cached too, so an unchanged ineligible file is not
    re-read.

    Raises:
        FileNotFoundError: if ``preset_path`` is not an existing file.
        ValueError: if the settings are not eligible for ``tool_name``.
    """
    resolved_path = Path(preset_path).resolve()
    if not resolved_path.is_file():
        raise FileNotFoundError(f"Deepy preset file not found for tool '{tool_name}' variant '{variant}'.")
    cache_key = (str(tool_name or "").strip(), str(variant or "").strip())
    stat = resolved_path.stat()
    # Prefer the nanosecond mtime; derive it from st_mtime when absent.
    mtime_ns = int(getattr(stat, "st_mtime_ns", int(stat.st_mtime * 1_000_000_000)))
    cached = _LIVE_FILE_PRESET_CACHE.get(cache_key)
    if isinstance(cached, dict) and cached.get("path") == str(resolved_path) and int(cached.get("mtime_ns", -1)) == mtime_ns:
        # Cache hit: replay the stored eligibility error, if any.
        cached_error = str(cached.get("eligibility_error", "") or "").strip()
        if len(cached_error) > 0:
            raise ValueError(_format_ineligible_tool_settings_error(tool_name, cached_error))
        cached_payload = cached.get("payload", None)
        if isinstance(cached_payload, dict):
            # NOTE: the cached dict itself is returned, not a copy.
            return cached_payload
    # Cache miss or stale entry: reload from disk and validate again.
    payload = _load_wangp_settings_payload(resolved_path)
    eligibility_error = str(validate_wangp_settings_payload_for_tool(tool_name, payload) or "").strip()
    _LIVE_FILE_PRESET_CACHE[cache_key] = {
        "path": str(resolved_path),
        "mtime_ns": mtime_ns,
        # Only eligible payloads are stored; a deep copy keeps the cache
        # independent of the dict returned below.
        "payload": copy.deepcopy(payload) if len(eligibility_error) == 0 else None,
        "eligibility_error": eligibility_error,
    }
    if len(eligibility_error) > 0:
        raise ValueError(_format_ineligible_tool_settings_error(tool_name, eligibility_error))
    return payload
|
|
|
|
def load_tool_preset(tool_name: str, variant: str) -> dict[str, Any]:
    """Load the preset payload for a tool variant.

    The variant is resolved first; linked variants are read from their
    settings file, everything else from the built-in presets. The returned
    dict may be a cached object; use ``clone_tool_preset`` for a private copy.
    """
    lookup_name = str(tool_name or "").strip()
    resolved_variant = resolve_tool_variant(lookup_name, variant)
    linked_path = _resolve_linked_variant_path(resolved_variant)
    if linked_path is None:
        return _load_static_tool_preset(lookup_name, resolved_variant)
    return _load_live_tool_preset(lookup_name, resolved_variant, linked_path)
|
|
|
|
def clone_tool_preset(tool_name: str, variant: str) -> dict[str, Any]:
    """Return a deep copy of a tool preset that the caller may freely modify."""
    shared_payload = load_tool_preset(tool_name, variant)
    return copy.deepcopy(shared_payload)
|
|
|
|
def refresh_tool_presets() -> None:
    """Drop every preset cache so presets are re-read from disk on next use."""
    for cached_loader in (_preset_index, _load_static_tool_preset):
        cached_loader.cache_clear()
    _LIVE_FILE_PRESET_CACHE.clear()
|
|
|
|
def list_tool_loras(tool_name: str, variant: str) -> list[str]:
    """Return the LoRA file names available for a tool variant."""
    entries = _list_tool_lora_entries(tool_name, variant)
    return [entry[0] for entry in entries]
|
|
|
|
def normalize_tool_loras(tool_name: str, variant: str, loras: Any) -> tuple[list[str], str]:
    """Validate requested LoRAs and return ``(lora_entries, multipliers)``.

    Each item is either a file name string (multiplier 1) or a dict with
    ``name`` and optional ``multiplier``. Names are matched by case-folded
    file name against the LoRAs available for the tool variant. The
    multipliers are returned as one space-separated string. ``None`` yields
    ``([], "")``.

    Raises:
        TypeError: if ``loras`` is not a list or an item has an unsupported type.
        ValueError: if a name is empty, repeated, or not an available LoRA.
    """
    if loras is None:
        return [], ""
    if not isinstance(loras, list):
        raise TypeError("loras must be an array of objects.")
    available_by_key = {
        _basename_lora_key(filename): (filename, original_entry)
        for filename, original_entry in _list_tool_lora_entries(tool_name, variant)
    }
    resolved_entries: list[str] = []
    multiplier_tokens: list[str] = []
    used_keys: set[str] = set()
    for index, item in enumerate(loras, start=1):
        if isinstance(item, str):
            raw_name, raw_multiplier = item, 1
        elif isinstance(item, dict):
            raw_name = item.get("name", "")
            raw_multiplier = item.get("multiplier", 1)
        else:
            raise TypeError(f"LoRA entry #{index} must be an object with a name.")
        # Only the file name part is kept; any directory prefix is discarded.
        raw_name = Path(str(raw_name or "").strip().replace("\\", "/")).name
        if not raw_name:
            raise ValueError(f"LoRA entry #{index} is missing a filename.")
        lora_key = _basename_lora_key(raw_name)
        if lora_key in used_keys:
            raise ValueError(f"LoRA '{raw_name}' was provided more than once.")
        match = available_by_key.get(lora_key)
        if match is None:
            raise ValueError(f"Unknown LoRA filename '{raw_name}' for tool '{tool_name}'. Call get_loras first.")
        resolved_entries.append(match[1])
        multiplier_tokens.append(_format_lora_multiplier(raw_multiplier))
        used_keys.add(lora_key)
    return resolved_entries, " ".join(multiplier_tokens).strip()
|
|
|
|
def apply_tool_loras(tool_name: str, variant: str, task: dict[str, Any], loras: Any) -> dict[str, Any]:
    """Merge requested LoRAs into a task's ``activated_loras`` settings.

    The task is modified in place and returned. When no LoRAs are requested
    the task is returned untouched.
    """
    new_loras, new_multipliers = normalize_tool_loras(tool_name, variant, loras)
    if not new_loras:
        return task
    current_loras: list[str] = []
    for value in list(task.get("activated_loras", []) or []):
        text = str(value).strip()
        if text:
            current_loras.append(text)
    current_multipliers = str(task.get("loras_multipliers", "") or "").strip()
    merged = merge_loras_settings(
        current_loras,
        current_multipliers,
        new_loras,
        new_multipliers,
        "merge after",
        path_key=_basename_lora_key,
    )
    task["activated_loras"], task["loras_multipliers"] = merged
    return task
|
|
|
|
def build_generation_task(
    tool_name: str,
    variant: str,
    *,
    prompt: str,
    client_id: str,
    alt_prompt: str | None = None,
    audio_guide: str | None = None,
    image_start_target: str = "image_start",
    image_start: str | None = None,
    image_end: str | None = None,
    image_refs: list[str] | None = None,
) -> dict[str, Any]:
    """Build a generation task dict from a tool preset and caller inputs.

    The preset is deep-copied, then the prompt, client id and any non-blank
    optional inputs are written into it. ``image_start_target`` selects where
    ``image_start`` goes: the ``image_start`` field, or the front of
    ``image_refs`` when it equals ``"image_refs"``. Finally the
    ``image_prompt_type`` flags are adjusted to match the images present.

    Raises:
        ValueError: if a start or end image is present but the model
            definition does not allow it, or if reference images are present
            for a video tool whose ``video_prompt_type`` lacks the ``I`` flag.
    """
    task = clone_tool_preset(tool_name, variant)
    task["prompt"] = str(prompt or "").strip()
    task["client_id"] = str(client_id or "").strip()
    uses_image_refs = str(image_start_target or "image_start").strip() == "image_refs"
    model_def = _get_model_def_from_settings_payload(task)
    # Empty when no model definition is available.
    image_prompt_types_allowed = str((model_def or {}).get("image_prompt_types_allowed", "") or "").strip()
    if alt_prompt is not None:
        alt_prompt = str(alt_prompt).strip()
        if len(alt_prompt) > 0:
            task["alt_prompt"] = alt_prompt
    if audio_guide is not None:
        audio_guide = str(audio_guide).strip()
        if len(audio_guide) > 0:
            task["audio_guide"] = audio_guide
    if image_start is not None:
        image_start = str(image_start).strip()
        if len(image_start) > 0:
            if uses_image_refs:
                # Route the start image to the front of the reference images
                # and drop any start image carried by the preset.
                existing_image_refs = task.get("image_refs", None)
                image_refs_list = [] if not isinstance(existing_image_refs, list) else [str(path).strip() for path in existing_image_refs if len(str(path).strip()) > 0]
                image_refs_list.insert(0, image_start)
                task["image_refs"] = image_refs_list
                task.pop("image_start", None)
            else:
                task["image_start"] = image_start
    if image_end is not None:
        image_end = str(image_end).strip()
        if len(image_end) > 0:
            task["image_end"] = image_end
    if image_refs is not None:
        image_refs_list = [str(path).strip() for path in image_refs if len(str(path).strip()) > 0]
        existing_image_refs = task.get("image_refs", None)
        if isinstance(existing_image_refs, list) and len(existing_image_refs) > 0:
            # Append new references after the existing ones, skipping any path
            # already in the merged list.
            merged_image_refs = [str(path).strip() for path in existing_image_refs if len(str(path).strip()) > 0]
            merged_image_refs.extend(path for path in image_refs_list if path not in merged_image_refs)
            task["image_refs"] = merged_image_refs
        else:
            task["image_refs"] = image_refs_list
    has_image_start = len(str(task.get("image_start", "") or "").strip()) > 0
    has_image_end = len(str(task.get("image_end", "") or "").strip()) > 0
    has_image_refs = any(len(str(path).strip()) > 0 for path in task.get("image_refs", []) or [])
    image_prompt_type = str(task.get("image_prompt_type", "") or "").strip()
    # Preset asks for a start image ("S") but none is present: swap the flag
    # for "T" when the model allows "T".
    if not uses_image_refs and not has_image_start and "S" in image_prompt_type and "T" in image_prompt_types_allowed:
        image_prompt_type = image_prompt_type.replace("S", "")
        image_prompt_type = _add_unique_flags(image_prompt_type, "T")
    # Drop the end-image flag when there is no end image.
    if not has_image_end and "E" in image_prompt_type:
        image_prompt_type = image_prompt_type.replace("E", "")
    task["image_prompt_type"] = image_prompt_type
    if has_image_start:
        if "S" not in image_prompt_types_allowed:
            raise ValueError("This preset does not support a Start Image.")
        task["image_prompt_type"] = _add_unique_flags(task.get("image_prompt_type", ""), "S")
    if has_image_end:
        if "E" not in image_prompt_types_allowed:
            raise ValueError("This preset does not support an End Image.")
        task["image_prompt_type"] = _add_unique_flags(task.get("image_prompt_type", ""), "E")
    if has_image_refs and str(tool_name or "").strip() in {"gen_video", "gen_video_with_speech"} and "I" not in str(task.get("video_prompt_type", "") or ""):
        raise ValueError("This preset received Reference Images but its Video Prompt Type does not enable them.")
    return task
|
|
|
|
# Names exported by ``from <module> import *``.
__all__ = [
    "GENERATION_TOOL_IDS",
    "apply_tool_loras",
    "DEFAULT_IMAGE_EDITOR_VARIANT",
    "TOOL_DISPLAY_NAMES",
    "SETTINGS_DIR",
    "build_generation_task",
    "build_linked_tool_variant",
    "clone_tool_preset",
    "find_tool_variant",
    "get_default_image_editor_variant",
    "get_default_image_generator_variant",
    "get_default_speech_from_description_variant",
    "get_default_speech_from_sample_variant",
    "get_default_video_generator_variant",
    "get_default_video_with_speech_variant",
    "list_tool_loras",
    "get_tool_variant_model_def",
    "get_tool_variant_path",
    "is_linked_tool_variant",
    "list_tool_variant_choices",
    "list_tool_variants",
    "normalize_tool_loras",
    "load_tool_preset",
    "refresh_tool_presets",
    "resolve_tool_variant",
    "resolve_wangp_settings_file",
    "validate_wangp_settings_for_tool",
    "validate_wangp_settings_payload_for_tool",
]
|
|