Wan2GP / shared /deepy /tool_settings.py
Egnalkram's picture
Upload folder using huggingface_hub
4689c2b verified
from __future__ import annotations
import copy
import json
import os
import sys
from functools import lru_cache
from pathlib import Path
from typing import Any
from shared.deepy.config import (
DEEPY_DEFAULT_EDIT_IMAGE,
DEEPY_DEFAULT_GEN_IMAGE,
DEEPY_DEFAULT_GEN_SPEECH_FROM_DESCRIPTION,
DEEPY_DEFAULT_GEN_SPEECH_FROM_SAMPLE,
DEEPY_DEFAULT_GEN_VIDEO,
DEEPY_DEFAULT_GEN_VIDEO_WITH_SPEECH,
DEEPY_TOOL_EDIT_IMAGE_KEY,
DEEPY_TOOL_GEN_IMAGE_KEY,
DEEPY_TOOL_GEN_SPEECH_FROM_DESCRIPTION_KEY,
DEEPY_TOOL_GEN_SPEECH_FROM_SAMPLE_KEY,
DEEPY_TOOL_GEN_VIDEO_KEY,
DEEPY_TOOL_GEN_VIDEO_WITH_SPEECH_KEY,
get_deepy_config_value,
normalize_deepy_tool_edit_image,
normalize_deepy_tool_gen_image,
normalize_deepy_tool_gen_speech_from_description,
normalize_deepy_tool_gen_speech_from_sample,
normalize_deepy_tool_gen_video,
normalize_deepy_tool_gen_video_with_speech,
)
_DEEPY_DIR = Path(__file__).resolve().parent
SETTINGS_DIR = _DEEPY_DIR / "settings"
DEFAULT_IMAGE_EDITOR_VARIANT = DEEPY_DEFAULT_EDIT_IMAGE
DEFAULT_VIDEO_WITH_SPEECH_VARIANT = DEEPY_DEFAULT_GEN_VIDEO_WITH_SPEECH
DEFAULT_SPEECH_FROM_DESCRIPTION_VARIANT = DEEPY_DEFAULT_GEN_SPEECH_FROM_DESCRIPTION
DEFAULT_SPEECH_FROM_SAMPLE_VARIANT = DEEPY_DEFAULT_GEN_SPEECH_FROM_SAMPLE
TOOL_DISPLAY_NAMES = {
"gen_image": "Image Generator",
"edit_image": "Image Editor",
"gen_video": "Video Generator",
"gen_video_with_speech": "Video With Speech",
"gen_speech_from_description": "Speech From Description",
"gen_speech_from_sample": "Speech From Sample",
}
_TOOL_TEMPLATE_VALIDATION_ERRORS = {
"gen_video": "The settings should generate a video",
"gen_video_with_speech": "The settings should generate a Video and accept an Audio Prompt",
"gen_image": "The settings of the model must generate an Image",
"edit_image": "The settings of the model must generate an Image and accept an Image Ref",
"gen_speech_from_description": "The model should generate only an audio output",
"gen_speech_from_sample": "The model should generate only an audio output and a sample audio ois expected",
}
_TOOL_CONFIG_SPECS = {
"gen_image": {"key": DEEPY_TOOL_GEN_IMAGE_KEY, "default": DEEPY_DEFAULT_GEN_IMAGE, "normalize": normalize_deepy_tool_gen_image},
"edit_image": {"key": DEEPY_TOOL_EDIT_IMAGE_KEY, "default": DEEPY_DEFAULT_EDIT_IMAGE, "normalize": normalize_deepy_tool_edit_image},
"gen_video": {"key": DEEPY_TOOL_GEN_VIDEO_KEY, "default": DEEPY_DEFAULT_GEN_VIDEO, "normalize": normalize_deepy_tool_gen_video},
"gen_video_with_speech": {
"key": DEEPY_TOOL_GEN_VIDEO_WITH_SPEECH_KEY,
"default": DEEPY_DEFAULT_GEN_VIDEO_WITH_SPEECH,
"normalize": normalize_deepy_tool_gen_video_with_speech,
},
"gen_speech_from_description": {
"key": DEEPY_TOOL_GEN_SPEECH_FROM_DESCRIPTION_KEY,
"default": DEEPY_DEFAULT_GEN_SPEECH_FROM_DESCRIPTION,
"normalize": normalize_deepy_tool_gen_speech_from_description,
},
"gen_speech_from_sample": {
"key": DEEPY_TOOL_GEN_SPEECH_FROM_SAMPLE_KEY,
"default": DEEPY_DEFAULT_GEN_SPEECH_FROM_SAMPLE,
"normalize": normalize_deepy_tool_gen_speech_from_sample,
},
}
_PRESET_SOURCE_BUILTIN = "builtin"
_PRESET_SOURCE_LINKED = "linked"
_PRESET_SOURCE_PRIORITY = {
_PRESET_SOURCE_BUILTIN: 0,
_PRESET_SOURCE_LINKED: 1,
}
_LEGACY_VARIANT_ALIASES = {
"edit_image": {"Qwen_Edit": DEEPY_DEFAULT_EDIT_IMAGE},
"gen_image": {"Z_Image_Turbo": DEEPY_DEFAULT_GEN_IMAGE},
"gen_video": {"ltx2_22B_distilled": DEEPY_DEFAULT_GEN_VIDEO},
}
_LIVE_FILE_PRESET_CACHE: dict[tuple[str, str], dict[str, Any]] = {}
def _canonical_variant(tool_name: str, variant: Any) -> str:
text = str(variant or "").strip()
if len(text) == 0:
return ""
return _LEGACY_VARIANT_ALIASES.get(str(tool_name or "").strip(), {}).get(text, text)
def _tool_config_spec(tool_name: str) -> dict[str, Any]:
return _TOOL_CONFIG_SPECS.get(str(tool_name or "").strip(), {})
def _get_configured_tool_variant(tool_name: str) -> str:
spec = _tool_config_spec(tool_name)
if len(spec) == 0:
return ""
raw_value = get_deepy_config_value(spec["key"], spec["default"])
return str(spec["normalize"](raw_value) or "").strip()
def _looks_like_linked_variant(value: Any) -> bool:
text = str(value or "").strip().strip('"').replace("\\", "/")
if len(text) == 0 or not text.lower().endswith(".json"):
return False
if text.startswith("/") or text.startswith("./") or text.startswith("../"):
return False
if len(text) >= 2 and text[1] == ":":
return False
parts = [part.strip() for part in text.split("/")]
return len(parts) == 2 and all(parts)
def _normalize_linked_variant(value: Any) -> str | None:
if not _looks_like_linked_variant(value):
return None
base_model_type, filename = [part.strip() for part in str(value or "").strip().strip('"').replace("\\", "/").split("/", 1)]
return f"{base_model_type}/{Path(filename).name}"
def _parse_linked_variant(value: Any) -> tuple[str, str] | None:
normalized = _normalize_linked_variant(value)
if normalized is None:
return None
return tuple(normalized.split("/", 1)) # type: ignore[return-value]
def _get_main_callable(name: str) -> Any:
main_module = sys.modules.get("__main__")
return None if main_module is None else getattr(main_module, str(name or "").strip(), None)
def _get_base_model_type_name(model_type: Any) -> str:
text = str(model_type or "").strip()
if len(text) == 0:
return ""
get_base_model_type = _get_main_callable("get_base_model_type")
if callable(get_base_model_type):
try:
resolved = str(get_base_model_type(text) or "").strip()
except Exception:
resolved = ""
if len(resolved) > 0:
return resolved
return text
def _resolve_linked_variant_path(value: Any) -> Path | None:
parsed = _parse_linked_variant(value)
if parsed is None:
return None
base_model_type, filename = parsed
get_lora_dir = _get_main_callable("get_lora_dir")
if not callable(get_lora_dir):
return None
try:
lora_dir = Path(get_lora_dir(base_model_type))
except Exception:
return None
candidate = (lora_dir / filename).resolve()
if candidate.is_file() and candidate.suffix.lower() == ".json":
return candidate
return None
def _iter_builtin_settings_dirs() -> tuple[Path, ...]:
if not SETTINGS_DIR.is_dir():
return ()
return tuple(sorted(path for path in SETTINGS_DIR.iterdir() if path.is_dir()))
def _build_preset_entry(path: Path, *, variant: str | None = None, label: str | None = None, source: str | None = None) -> dict[str, Any] | None:
variant_value = str(variant or path.stem or "").strip()
if len(variant_value) == 0:
return None
return {
"variant": variant_value,
"label": str(label or variant_value).strip() or variant_value,
"path": path,
"source": str(source or _PRESET_SOURCE_BUILTIN).strip() or _PRESET_SOURCE_BUILTIN,
}
def _build_linked_variant_entry(variant: Any) -> dict[str, Any] | None:
normalized = _normalize_linked_variant(variant)
if normalized is None:
return None
preset_path = _resolve_linked_variant_path(normalized)
if preset_path is None:
return None
_, filename = normalized.split("/", 1)
label = Path(filename).stem
return _build_preset_entry(preset_path, variant=normalized, label=label, source=_PRESET_SOURCE_LINKED)
def _add_or_replace_preset_entry(tool_entries: list[dict[str, Any]], entry: dict[str, Any]) -> None:
variant = str(entry.get("variant", "")).strip()
if len(variant) == 0:
return
for index, existing in enumerate(tool_entries):
if str(existing.get("variant", "")).strip() != variant:
continue
current_priority = _PRESET_SOURCE_PRIORITY.get(str(existing.get("source", _PRESET_SOURCE_BUILTIN)).strip(), 0)
next_priority = _PRESET_SOURCE_PRIORITY.get(str(entry.get("source", _PRESET_SOURCE_BUILTIN)).strip(), 0)
if next_priority >= current_priority:
tool_entries[index] = entry
return
tool_entries.append(entry)
@lru_cache(maxsize=1)
def _preset_index() -> dict[str, tuple[dict[str, Any], ...]]:
index: dict[str, list[dict[str, Any]]] = {}
for tool_dir in _iter_builtin_settings_dirs():
tool_name = str(tool_dir.name or "").strip()
if len(tool_name) == 0:
continue
tool_entries = index.setdefault(tool_name, [])
for path in sorted(tool_dir.glob("*.json")):
entry = _build_preset_entry(path)
if entry is not None:
_add_or_replace_preset_entry(tool_entries, entry)
return {tool_name: tuple(entries) for tool_name, entries in index.items()}
def _tool_entries(tool_name: str, current_variant: Any = None) -> list[dict[str, Any]]:
entries = list(_preset_index().get(str(tool_name or "").strip(), ()))
seen = {str(entry.get("variant", "")).strip() for entry in entries}
for candidate in (_get_configured_tool_variant(tool_name), current_variant):
linked_entry = _build_linked_variant_entry(candidate)
if linked_entry is not None:
variant = str(linked_entry.get("variant", "")).strip()
if variant not in seen:
entries.append(linked_entry)
seen.add(variant)
return entries
def list_tool_variants(tool_name: str, current_variant: Any = None) -> list[str]:
return [str(entry.get("variant", "")).strip() for entry in _tool_entries(tool_name, current_variant=current_variant) if len(str(entry.get("variant", "")).strip()) > 0]
def list_tool_variant_choices(tool_name: str, current_variant: Any = None) -> list[tuple[str, str]]:
return [
(str(entry.get("label", "")).strip() or str(entry.get("variant", "")).strip(), str(entry.get("variant", "")).strip())
for entry in _tool_entries(tool_name, current_variant=current_variant)
if len(str(entry.get("variant", "")).strip()) > 0
]
def find_tool_variant(tool_name: str, requested_variant: Any, current_variant: Any = None) -> str | None:
tool_name = str(tool_name or "").strip()
requested = _canonical_variant(tool_name, requested_variant)
if len(requested) == 0:
return None
linked_variant = _normalize_linked_variant(requested)
if linked_variant is not None:
return linked_variant if _resolve_linked_variant_path(linked_variant) is not None else None
variants = list_tool_variants(tool_name, current_variant=current_variant)
if requested in variants:
return requested
requested_cf = requested.casefold()
for variant in variants:
if variant.casefold() == requested_cf:
return variant
return None
def get_tool_variant_path(tool_name: str, requested_variant: Any, current_variant: Any = None) -> Path | None:
linked_path = _resolve_linked_variant_path(requested_variant)
if linked_path is not None:
return linked_path
resolved_variant = find_tool_variant(tool_name, requested_variant, current_variant=current_variant)
if resolved_variant is None:
return None
for entry in _tool_entries(tool_name, current_variant=current_variant):
if str(entry.get("variant", "")).strip() == resolved_variant:
return Path(entry["path"])
return None
def get_tool_variant_model_def(tool_name: str, variant: Any) -> dict[str, Any] | None:
try:
payload = load_tool_preset(tool_name, str(variant or "").strip())
except Exception:
return None
model_def = _get_model_def_from_settings_payload(payload)
return dict(model_def or {}) if isinstance(model_def, dict) else None
def resolve_wangp_settings_file(state: Any, selected_value: Any) -> Path | None:
value = str(selected_value or "").strip()
if len(value) == 0 or "/" in value or "\\" in value or not value.lower().endswith(".json"):
return None
get_state_model_type = _get_main_callable("get_state_model_type")
get_lora_dir = _get_main_callable("get_lora_dir")
if not callable(get_state_model_type) or not callable(get_lora_dir):
return None
try:
model_type = get_state_model_type(state)
lora_dir = Path(get_lora_dir(model_type))
except Exception:
return None
source_path = (lora_dir / Path(value).name).resolve()
if source_path.is_file() and source_path.suffix.lower() == ".json":
return source_path
return None
def _load_wangp_settings_payload(source_path: Path) -> dict[str, Any]:
with Path(source_path).resolve().open("r", encoding="utf-8") as reader:
payload = json.load(reader)
if not isinstance(payload, dict):
raise TypeError(f"WanGP settings file '{Path(source_path).name}' must contain a JSON object.")
return payload
def _get_model_def_from_settings_payload(payload: dict[str, Any]) -> dict[str, Any] | None:
model_type = str(payload.get("model_type", "")).strip()
if len(model_type) == 0:
return None
get_model_def = _get_main_callable("get_model_def")
if not callable(get_model_def):
return None
try:
model_def = get_model_def(model_type)
except Exception:
return None
return model_def if isinstance(model_def, dict) else None
def _int_setting(payload: dict[str, Any], key: str) -> int | None:
value = payload.get(key, None)
if isinstance(value, bool):
return int(value)
if isinstance(value, int):
return value
try:
return int(str(value).strip())
except Exception:
return None
def _sequence_setting_has_value(payload: dict[str, Any], key: str) -> bool:
value = payload.get(key, None)
if isinstance(value, list):
return any(len(str(item).strip()) > 0 for item in value if item is not None)
return len(str(value or "").strip()) > 0
def _add_unique_flags(value: Any, flags: str) -> str:
text = str(value or "").strip()
for flag in str(flags or ""):
if len(flag.strip()) == 0 or flag in text:
continue
text += flag
return text
def validate_wangp_settings_payload_for_tool(tool_name: str, payload: dict[str, Any]) -> str | None:
lookup_name = str(tool_name or "").strip()
if lookup_name not in _TOOL_TEMPLATE_VALIDATION_ERRORS:
return None
image_mode = _int_setting(payload, "image_mode")
accepts_audio_prompt = "A" in str(payload.get("audio_prompt_type", "") or "")
has_image_refs = _sequence_setting_has_value(payload, "image_refs")
model_def = _get_model_def_from_settings_payload(payload)
audio_only = bool(model_def.get("audio_only", False)) if isinstance(model_def, dict) else False
checks = {
"gen_video": image_mode == 0,
"gen_video_with_speech": image_mode == 0 and accepts_audio_prompt,
"gen_image": image_mode == 1,
"edit_image": image_mode == 1 and has_image_refs,
"gen_speech_from_description": audio_only,
"gen_speech_from_sample": audio_only and accepts_audio_prompt,
}
return None if checks.get(lookup_name, True) else _TOOL_TEMPLATE_VALIDATION_ERRORS[lookup_name]
def validate_wangp_settings_for_tool(tool_name: str, source_path: Path) -> str | None:
return validate_wangp_settings_payload_for_tool(tool_name, _load_wangp_settings_payload(source_path))
def build_linked_tool_variant(state: Any, source_path: Path) -> str:
source_file = Path(source_path).resolve()
if not source_file.is_file() or source_file.suffix.lower() != ".json":
raise FileNotFoundError(f"Deepy source settings file not found: {source_file}")
payload = _load_wangp_settings_payload(source_file)
model_type = str(payload.get("model_type", "")).strip()
if len(model_type) == 0:
get_state_model_type = _get_main_callable("get_state_model_type")
if callable(get_state_model_type):
try:
model_type = str(get_state_model_type(state) or "").strip()
except Exception:
model_type = ""
base_model_type = _get_base_model_type_name(model_type)
if len(base_model_type) == 0:
raise ValueError(f"Unable to resolve base model type for {source_file.name}.")
return f"{base_model_type}/{source_file.name}"
def is_linked_tool_variant(requested_variant: Any) -> bool:
return _parse_linked_variant(requested_variant) is not None
def resolve_tool_variant(tool_name: str, requested_variant: Any, default_variant: str | None = None) -> str:
tool_name = str(tool_name or "").strip()
static_variants = [str(entry.get("variant", "")).strip() for entry in _preset_index().get(tool_name, ()) if len(str(entry.get("variant", "")).strip()) > 0]
if len(static_variants) == 0:
raise FileNotFoundError(f"No Deepy presets found for tool '{tool_name}' in {SETTINGS_DIR}.")
requested = _canonical_variant(tool_name, requested_variant)
if len(requested) > 0:
linked_variant = _normalize_linked_variant(requested)
if linked_variant is not None:
if _resolve_linked_variant_path(linked_variant) is not None:
return linked_variant
else:
resolved_variant = find_tool_variant(tool_name, requested)
if resolved_variant is not None:
return resolved_variant
fallback = _canonical_variant(tool_name, default_variant)
if len(fallback) > 0:
linked_variant = _normalize_linked_variant(fallback)
if linked_variant is not None:
if _resolve_linked_variant_path(linked_variant) is not None:
return linked_variant
else:
resolved_variant = find_tool_variant(tool_name, fallback)
if resolved_variant is not None:
return resolved_variant
return static_variants[0]
def get_default_image_generator_variant() -> str:
configured = _get_configured_tool_variant("gen_image")
return resolve_tool_variant("gen_image", configured, default_variant=DEEPY_DEFAULT_GEN_IMAGE)
def get_default_video_generator_variant() -> str:
configured = _get_configured_tool_variant("gen_video")
return resolve_tool_variant("gen_video", configured, default_variant=DEEPY_DEFAULT_GEN_VIDEO)
def get_default_image_editor_variant() -> str:
configured = _get_configured_tool_variant("edit_image")
return resolve_tool_variant("edit_image", configured, default_variant=DEEPY_DEFAULT_EDIT_IMAGE)
def get_default_video_with_speech_variant() -> str:
configured = _get_configured_tool_variant("gen_video_with_speech")
return resolve_tool_variant("gen_video_with_speech", configured, default_variant=DEEPY_DEFAULT_GEN_VIDEO_WITH_SPEECH)
def get_default_speech_from_description_variant() -> str:
configured = _get_configured_tool_variant("gen_speech_from_description")
return resolve_tool_variant("gen_speech_from_description", configured, default_variant=DEEPY_DEFAULT_GEN_SPEECH_FROM_DESCRIPTION)
def get_default_speech_from_sample_variant() -> str:
configured = _get_configured_tool_variant("gen_speech_from_sample")
return resolve_tool_variant("gen_speech_from_sample", configured, default_variant=DEEPY_DEFAULT_GEN_SPEECH_FROM_SAMPLE)
def _format_ineligible_tool_settings_error(tool_name: str, error_text: str) -> str:
return f"settings no eligible for tool {str(tool_name or '').strip()}: {str(error_text or '').strip()}"
@lru_cache(maxsize=None)
def _load_static_tool_preset(tool_name: str, variant: str) -> dict[str, Any]:
preset_path = None
for entry in _preset_index().get(str(tool_name or "").strip(), ()):
if str(entry.get("variant", "")).strip() == str(variant or "").strip():
preset_path = Path(entry["path"])
break
if preset_path is None or not preset_path.is_file():
raise FileNotFoundError(f"Deepy preset file not found for tool '{tool_name}' variant '{variant}'.")
with preset_path.open("r", encoding="utf-8") as reader:
payload = json.load(reader)
if not isinstance(payload, dict):
raise TypeError(f"Deepy preset '{preset_path.name}' must contain a JSON object.")
return payload
def _load_live_tool_preset(tool_name: str, variant: str, preset_path: Path) -> dict[str, Any]:
resolved_path = Path(preset_path).resolve()
if not resolved_path.is_file():
raise FileNotFoundError(f"Deepy preset file not found for tool '{tool_name}' variant '{variant}'.")
cache_key = (str(tool_name or "").strip(), str(variant or "").strip())
stat = resolved_path.stat()
mtime_ns = int(getattr(stat, "st_mtime_ns", int(stat.st_mtime * 1_000_000_000)))
cached = _LIVE_FILE_PRESET_CACHE.get(cache_key)
if isinstance(cached, dict) and cached.get("path") == str(resolved_path) and int(cached.get("mtime_ns", -1)) == mtime_ns:
cached_error = str(cached.get("eligibility_error", "") or "").strip()
if len(cached_error) > 0:
raise ValueError(_format_ineligible_tool_settings_error(tool_name, cached_error))
cached_payload = cached.get("payload", None)
if isinstance(cached_payload, dict):
return cached_payload
payload = _load_wangp_settings_payload(resolved_path)
eligibility_error = str(validate_wangp_settings_payload_for_tool(tool_name, payload) or "").strip()
_LIVE_FILE_PRESET_CACHE[cache_key] = {
"path": str(resolved_path),
"mtime_ns": mtime_ns,
"payload": copy.deepcopy(payload) if len(eligibility_error) == 0 else None,
"eligibility_error": eligibility_error,
}
if len(eligibility_error) > 0:
raise ValueError(_format_ineligible_tool_settings_error(tool_name, eligibility_error))
return payload
def load_tool_preset(tool_name: str, variant: str) -> dict[str, Any]:
lookup_name = str(tool_name or "").strip()
resolved_variant = resolve_tool_variant(lookup_name, variant)
linked_path = _resolve_linked_variant_path(resolved_variant)
if linked_path is not None:
return _load_live_tool_preset(lookup_name, resolved_variant, linked_path)
return _load_static_tool_preset(lookup_name, resolved_variant)
def clone_tool_preset(tool_name: str, variant: str) -> dict[str, Any]:
return copy.deepcopy(load_tool_preset(tool_name, variant))
def refresh_tool_presets() -> None:
_preset_index.cache_clear()
_load_static_tool_preset.cache_clear()
_LIVE_FILE_PRESET_CACHE.clear()
def build_generation_task(
tool_name: str,
variant: str,
*,
prompt: str,
client_id: str,
alt_prompt: str | None = None,
audio_guide: str | None = None,
image_start_target: str = "image_start",
image_start: str | None = None,
image_end: str | None = None,
image_refs: list[str] | None = None,
) -> dict[str, Any]:
task = clone_tool_preset(tool_name, variant)
task["prompt"] = str(prompt or "").strip()
task["client_id"] = str(client_id or "").strip()
uses_image_refs = str(image_start_target or "image_start").strip() == "image_refs"
model_def = _get_model_def_from_settings_payload(task)
image_prompt_types_allowed = str((model_def or {}).get("image_prompt_types_allowed", "") or "").strip()
if alt_prompt is not None:
alt_prompt = str(alt_prompt).strip()
if len(alt_prompt) > 0:
task["alt_prompt"] = alt_prompt
if audio_guide is not None:
audio_guide = str(audio_guide).strip()
if len(audio_guide) > 0:
task["audio_guide"] = audio_guide
if image_start is not None:
image_start = str(image_start).strip()
if len(image_start) > 0:
if uses_image_refs:
existing_image_refs = task.get("image_refs", None)
image_refs_list = [] if not isinstance(existing_image_refs, list) else [str(path).strip() for path in existing_image_refs if len(str(path).strip()) > 0]
image_refs_list.insert(0, image_start)
task["image_refs"] = image_refs_list
task.pop("image_start", None)
else:
task["image_start"] = image_start
if image_end is not None:
image_end = str(image_end).strip()
if len(image_end) > 0:
task["image_end"] = image_end
if image_refs is not None:
image_refs_list = [str(path).strip() for path in image_refs if len(str(path).strip()) > 0]
existing_image_refs = task.get("image_refs", None)
if isinstance(existing_image_refs, list) and len(existing_image_refs) > 0:
merged_image_refs = [str(path).strip() for path in existing_image_refs if len(str(path).strip()) > 0]
merged_image_refs.extend(path for path in image_refs_list if path not in merged_image_refs)
task["image_refs"] = merged_image_refs
else:
task["image_refs"] = image_refs_list
has_image_start = len(str(task.get("image_start", "") or "").strip()) > 0
has_image_end = len(str(task.get("image_end", "") or "").strip()) > 0
has_image_refs = any(len(str(path).strip()) > 0 for path in task.get("image_refs", []) or [])
if has_image_start:
if "S" not in image_prompt_types_allowed:
raise ValueError("This preset does not support a Start Image.")
task["image_prompt_type"] = _add_unique_flags(task.get("image_prompt_type", ""), "S")
if has_image_end:
if "E" not in image_prompt_types_allowed:
raise ValueError("This preset does not support an End Image.")
task["image_prompt_type"] = _add_unique_flags(task.get("image_prompt_type", ""), "E")
if has_image_refs and str(tool_name or "").strip() in {"gen_video", "gen_video_with_speech"} and "I" not in str(task.get("video_prompt_type", "") or ""):
raise ValueError("This preset received Reference Images but its Video Prompt Type does not enable them.")
return task
__all__ = [
"DEFAULT_IMAGE_EDITOR_VARIANT",
"TOOL_DISPLAY_NAMES",
"SETTINGS_DIR",
"build_generation_task",
"build_linked_tool_variant",
"clone_tool_preset",
"find_tool_variant",
"get_default_image_editor_variant",
"get_default_image_generator_variant",
"get_default_speech_from_description_variant",
"get_default_speech_from_sample_variant",
"get_default_video_generator_variant",
"get_default_video_with_speech_variant",
"get_tool_variant_model_def",
"get_tool_variant_path",
"is_linked_tool_variant",
"list_tool_variant_choices",
"list_tool_variants",
"load_tool_preset",
"refresh_tool_presets",
"resolve_tool_variant",
"resolve_wangp_settings_file",
"validate_wangp_settings_for_tool",
"validate_wangp_settings_payload_for_tool",
]