File size: 2,939 Bytes
7344bef
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from __future__ import annotations

import json
import re
import zipfile
from pathlib import Path
from typing import Any

from shared.utils import files_locator as fl


WAN_GP_SETTINGS_SUFFIXES = {".json", ".zip"}
SETTINGS_BUNDLE_ATTACHMENT_KEYS = ("image_start", "image_end", "image_refs", "image_guide", "image_mask", "video_guide", "video_mask", "video_source", "audio_guide", "audio_guide2", "audio_source", "seedvc_voice_sample", "seedvc_voice_sample2", "custom_guide")


def is_wangp_settings_filename(value: Any) -> bool:
    return Path(str(value or "").strip()).suffix.lower() in WAN_GP_SETTINGS_SUFFIXES


def _cache_root() -> Path:
    return Path(__file__).resolve().parents[2] / "settings" / "_settings_bundle_cache"


def _safe_zip_name(name: Any) -> str:
    text = str(name or "").strip().replace("\\", "/")
    if not fl.is_relative_down_path(text):
        return ""
    return text


def _extract_bundle_file(zf: zipfile.ZipFile, member_name: str, cache_dir: Path) -> str:
    safe_name = _safe_zip_name(member_name)
    if len(safe_name) == 0 or safe_name not in zf.namelist():
        return member_name
    target_name = re.sub(r"[^A-Za-z0-9._-]+", "_", safe_name)
    target_path = cache_dir / target_name
    target_path.parent.mkdir(parents=True, exist_ok=True)
    with zf.open(safe_name) as source, target_path.open("wb") as target:
        target.write(source.read())
    return str(target_path.resolve())


def _extract_attachment_value(zf: zipfile.ZipFile, value: Any, cache_dir: Path) -> Any:
    if isinstance(value, str):
        if Path(value).is_absolute():
            return value
        return _extract_bundle_file(zf, value, cache_dir)
    if isinstance(value, list):
        return [_extract_attachment_value(zf, item, cache_dir) for item in value]
    return value


def load_first_settings_from_queue_zip(zip_path: str | Path, attachment_keys: list[str] | tuple[str, ...]) -> tuple[dict[str, Any] | None, int]:
    source_path = Path(zip_path).resolve()
    stat = source_path.stat()
    cache_dir = _cache_root() / f"{source_path.stem}_{int(getattr(stat, 'st_mtime_ns', int(stat.st_mtime * 1_000_000_000)))}"
    with zipfile.ZipFile(source_path, "r") as zf:
        if "queue.json" not in zf.namelist():
            return None, 0
        manifest = json.loads(zf.read("queue.json").decode("utf-8"))
        if not isinstance(manifest, list) or len(manifest) == 0:
            return None, 0
        task = manifest[0]
        if not isinstance(task, dict):
            return None, len(manifest)
        params = task.get("params", task)
        if not isinstance(params, dict):
            return None, len(manifest)
        payload = dict(params)
        cache_dir.mkdir(parents=True, exist_ok=True)
        for key in attachment_keys:
            if key in payload:
                payload[key] = _extract_attachment_value(zf, payload[key], cache_dir)
        return payload, len(manifest)