import os
import re
import shutil
import tempfile
import zipfile
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen

import numpy as np
import torch
from PIL import Image

# folder_paths is provided by ComfyUI at runtime; keep the module importable
# outside ComfyUI (tests, tooling) by degrading to None when it is absent.
try:
    import folder_paths
except Exception:
    folder_paths = None
def _get_cache_dir() -> str:
    """Return the on-disk cache directory for downloaded ZIPs, creating it if absent.

    Prefers ComfyUI's temp directory when ``folder_paths`` is available;
    otherwise falls back to the system temp directory.
    """
    root = None
    if folder_paths is not None:
        try:
            root = folder_paths.get_temp_directory()
        except Exception:
            root = None

    # Empty string or None both mean "ComfyUI temp dir unavailable".
    target = os.path.join(root or tempfile.gettempdir(), "hf_zip_cache")
    os.makedirs(target, exist_ok=True)
    return target
def _download_file(url: str, dest_path: str, timeout_sec: int = 60) -> None:
    """Stream *url* into *dest_path*, translating all failures into ValueError.

    Raises:
        ValueError: on HTTP errors, network errors, or any other failure,
            chained to the underlying exception.
    """
    request = Request(url, headers={"User-Agent": "ComfyUI-HFZipLoader/1.0"})
    try:
        with urlopen(request, timeout=timeout_sec) as response:
            with open(dest_path, "wb") as sink:
                shutil.copyfileobj(response, sink)
    except HTTPError as exc:
        raise ValueError(f"HTTP error while downloading: {url} (status={exc.code})") from exc
    except URLError as exc:
        raise ValueError(f"Network error while downloading: {url} ({exc.reason})") from exc
    except Exception as exc:
        raise ValueError(f"Unexpected error while downloading: {url} ({exc})") from exc
def _pil_to_tensor_rgb(pil_img: Image.Image) -> torch.Tensor:
    """Convert a PIL image into a ComfyUI IMAGE tensor: [H, W, 3] float32 in [0..1]."""
    rgb = pil_img if pil_img.mode == "RGB" else pil_img.convert("RGB")
    pixels = np.asarray(rgb, dtype=np.float32)
    return torch.from_numpy(pixels / 255.0)
| | class _ImageSizeMismatchError(ValueError):
|
| | """Raised when images in the zip do not share the same dimensions."""
|
| |
|
| |
|
| | def _alphanum_key(s: str):
|
| | """
|
| | Natural/alphanumeric sort key for filenames/paths.
|
| | Example: img_2.png comes before img_10.png.
|
| |
|
| | Sorts by the full zip member name (including folders), case-insensitive.
|
| | """
|
| | s = (s or "").replace("\\", "/")
|
| | parts = re.split(r"(\d+)", s)
|
| |
|
| |
|
| | key = []
|
| | for p in parts:
|
| | if p.isdigit():
|
| | key.append((0, int(p)))
|
| | else:
|
| | key.append((1, p.lower()))
|
| | return key
|
| |
|
| |
|
def _load_images_from_zip(zip_path: str) -> torch.Tensor:
    """Load every readable image in a ZIP into one batch tensor.

    Forgiving loader:
    - Accepts all filenames (any depth) in the archive
    - Sorts members in alphanumeric (natural) order
    - Skips members PIL cannot decode
    - Enforces that all decoded images share the same dimensions

    Returns:
        [B,H,W,3] float32 in [0..1]

    Raises:
        ValueError: if the ZIP has no files or no decodable images.
        _ImageSizeMismatchError: if decoded images differ in size.
    """
    tensors = []
    expected_shape = None
    unreadable = []

    with zipfile.ZipFile(zip_path, "r") as archive:
        names = [n for n in archive.namelist() if n and not n.endswith("/")]
        if not names:
            raise ValueError("ZIP is empty (no files found).")
        names.sort(key=_alphanum_key)

        for name in names:
            try:
                with archive.open(name) as raw, Image.open(raw) as img:
                    # Force a full decode while the member stream is still open.
                    img.load()
                    tensor = _pil_to_tensor_rgb(img)
                if expected_shape is None:
                    expected_shape = tuple(tensor.shape)
                elif tuple(tensor.shape) != expected_shape:
                    raise _ImageSizeMismatchError(
                        f"Image size mismatch in ZIP. Expected {expected_shape}, got {tuple(tensor.shape)} "
                        f"for {name}. All images must share the same dimensions."
                    )
                tensors.append(tensor)
            except _ImageSizeMismatchError:
                # Size mismatch is a hard error, never a skip.
                raise
            except Exception:
                unreadable.append(name)

    if not tensors:
        raise ValueError(
            "No loadable images found in ZIP. Ensure the archive contains valid image files "
            "(png/jpg/webp/etc.)."
        )

    if unreadable:
        print(f"[HFLoadZipImageBatch] Skipped {len(unreadable)} non-image/unreadable file(s) in ZIP.")

    return torch.stack(tensors, dim=0)
class HF_to_Batch:
    """
    Download public ZIP from Hugging Face resolve URL and output IMAGE batch.

    URL format:
        https://huggingface.co/{owner}/{repo}/resolve/{revision}/{index}.zip

    Example:
        owner=saliacoel, repo=pov_fs, revision=main, index=0
        -> https://huggingface.co/saliacoel/pov_fs/resolve/main/0.zip
    """

    CATEGORY = "HuggingFace"
    RETURN_TYPES = ("IMAGE", "STRING", "INT", "STRING")
    RETURN_NAMES = ("images", "source_url", "count", "local_zip_path")
    FUNCTION = "load"

    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "repo": ("STRING", {"default": "pov_fs", "multiline": False}),
                "index": ("INT", {"default": 0, "min": 0, "max": 1000000, "step": 1}),
            },
            "optional": {
                "owner": ("STRING", {"default": "saliacoel", "multiline": False}),
                "revision": ("STRING", {"default": "main", "multiline": False}),
                "force_redownload": ("BOOLEAN", {"default": False}),
            },
        }

    def load(
        self,
        repo: str,
        index: int,
        owner: str = "saliacoel",
        revision: str = "main",
        force_redownload: bool = False,
    ):
        """Resolve the URL, download the ZIP (with caching) and return the batch.

        Returns:
            (images [B,H,W,3] float32 in [0..1], source_url, count, local_zip_path)

        Raises:
            ValueError: on invalid inputs, download failure, or an unreadable ZIP.
        """
        repo = (repo or "").strip()
        owner = (owner or "").strip()
        # An empty revision would yield a malformed ".../resolve//N.zip" URL;
        # fall back to the documented default instead.
        revision = (revision or "").strip() or "main"

        if not repo:
            raise ValueError("repo must be a non-empty string (e.g., 'pov_fs' or 'car').")
        if not owner:
            raise ValueError("owner must be a non-empty string (e.g., 'saliacoel').")
        if index is None or int(index) < 0:
            raise ValueError("index must be an integer >= 0.")

        index = int(index)

        source_url = f"https://huggingface.co/{owner}/{repo}/resolve/{revision}/{index}.zip"

        cache_dir = _get_cache_dir()
        local_zip_path = os.path.join(cache_dir, f"{owner}__{repo}__{revision}__{index}.zip")

        needs_download = (
            force_redownload
            or not os.path.exists(local_zip_path)
            or os.path.getsize(local_zip_path) == 0
        )
        if needs_download:
            # Download to a side file and atomically move it into place so an
            # interrupted transfer can never leave a partial ZIP at
            # local_zip_path — the size-based cache check above would otherwise
            # treat that corrupt file as valid on every subsequent run.
            partial_path = local_zip_path + ".part"
            try:
                _download_file(source_url, partial_path)
                os.replace(partial_path, local_zip_path)
            finally:
                # Clean up the partial file if the download (or replace) failed.
                if os.path.exists(partial_path):
                    os.remove(partial_path)

        images = _load_images_from_zip(local_zip_path)
        count = int(images.shape[0])

        print(f"[HFLoadZipImageBatch] Loaded {count} image(s) from {source_url}")
        return (images, source_url, count, local_zip_path)
# Registration tables ComfyUI reads at import time to discover this node.
NODE_CLASS_MAPPINGS = {"HF_to_Batch": HF_to_Batch}

NODE_DISPLAY_NAME_MAPPINGS = {"HF_to_Batch": "HF_to_Batch"}