|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| import os
|
| import time
|
| import zipfile
|
| import tempfile
|
|
|
| import folder_paths
|
|
|
| try:
|
| from huggingface_hub import HfApi
|
| except ImportError:
|
| HfApi = None
|
|
|
|
|
class ZipOutputToHuggingFace:
    """
    Zip the ComfyUI output folder and upload the archive to Hugging Face.

    The node walks the directory returned by
    ``folder_paths.get_output_directory()``, writes every file into a
    temporary zip archive, uploads that archive with ``HfApi.upload_file``
    and returns the resulting ``resolve/main`` download URL.
    """

    @classmethod
    def INPUT_TYPES(cls):
        """Return the node's input specification.

        ``hf_repo_id`` is required; ``hf_token``, ``zip_name_prefix`` and
        ``remote_dir`` are optional single-line strings.
        """
        return {
            "required": {
                "hf_repo_id": (
                    "STRING",
                    {"default": "saliacoel/MyCustomNodes"},
                ),
            },
            "optional": {
                "hf_token": (
                    "STRING",
                    {"default": "", "multiline": False},
                ),
                "zip_name_prefix": (
                    "STRING",
                    {"default": "comfy_output", "multiline": False},
                ),
                "remote_dir": (
                    "STRING",
                    {"default": "exports", "multiline": False},
                ),
            },
        }

    # Node metadata: one STRING output named "download_url", produced by
    # the ``zip_and_upload`` method.
    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("download_url",)
    FUNCTION = "zip_and_upload"
    CATEGORY = "utils/huggingface"
    OUTPUT_NODE = True

    @staticmethod
    def _sanitize_prefix(zip_name_prefix):
        """Return a file-name-safe archive prefix, defaulting to ``comfy_output``."""
        prefix = (zip_name_prefix or "").strip()
        # Path separators would turn the prefix into a (possibly escaping)
        # sub-path of the remote directory, so flatten them.
        prefix = prefix.replace("/", "_").replace("\\", "_")
        return prefix or "comfy_output"

    @staticmethod
    def _write_zip(source_dir, zip_path):
        """Zip every file under ``source_dir`` into ``zip_path``; return the file count."""
        zip_abs = os.path.abspath(zip_path)
        written = 0
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
            for root, _, files in os.walk(source_dir):
                for fname in files:
                    file_path = os.path.join(root, fname)
                    # Never add the archive to itself if it happens to live
                    # inside the directory being walked.
                    if os.path.abspath(file_path) == zip_abs:
                        continue
                    rel_path = os.path.relpath(file_path, source_dir)
                    zipf.write(file_path, arcname=rel_path)
                    written += 1
        return written

    def zip_and_upload(
        self,
        hf_repo_id: str,
        hf_token: str = "",
        zip_name_prefix: str = "comfy_output",
        remote_dir: str = "exports",
    ):
        """Zip the output directory, upload it and return the download URL.

        Args:
            hf_repo_id: Target repository id; surrounding whitespace is ignored.
            hf_token: Access token. Empty or blank means ``None`` is passed
                to ``upload_file``.
            zip_name_prefix: Prefix of the archive name; the final name is
                ``<prefix>_<YYYYmmdd-HHMMSS>.zip``.
            remote_dir: Directory inside the repository. Empty uploads to
                the repository root.

        Returns:
            A one-element tuple holding the ``resolve/main`` URL of the
            uploaded archive.

        Raises:
            RuntimeError: If ``huggingface_hub`` is missing, the repo id is
                empty, the output directory is missing or holds no files,
                or the upload fails.
        """
        # Local import keeps the file's top-level import block untouched.
        from urllib.parse import quote

        if HfApi is None:
            raise RuntimeError(
                "huggingface_hub is not installed.\n"
                "Install it in your ComfyUI environment, for example:\n"
                " pip install huggingface_hub"
            )

        # Validate cheap inputs before doing any expensive zipping.
        repo_id = (hf_repo_id or "").strip()
        if not repo_id:
            raise RuntimeError("hf_repo_id cannot be empty")

        output_dir = folder_paths.get_output_directory()
        if not os.path.isdir(output_dir):
            raise RuntimeError(f"Output directory does not exist: {output_dir}")

        if not any(files for _, _, files in os.walk(output_dir)):
            raise RuntimeError(
                f"No files found in output directory: {output_dir}"
            )

        timestamp = time.strftime("%Y%m%d-%H%M%S")
        zip_filename = f"{self._sanitize_prefix(zip_name_prefix)}_{timestamp}.zip"

        remote_dir = (remote_dir or "").strip().strip("/")
        path_in_repo = f"{remote_dir}/{zip_filename}" if remote_dir else zip_filename

        # mkstemp yields a unique path, so concurrent runs within the same
        # second cannot clobber each other's archive.
        fd, zip_path = tempfile.mkstemp(prefix="comfy_output_", suffix=".zip")
        os.close(fd)
        try:
            self._write_zip(output_dir, zip_path)
            try:
                HfApi().upload_file(
                    path_or_fileobj=zip_path,
                    path_in_repo=path_in_repo,
                    repo_id=repo_id,
                    token=(hf_token or "").strip() or None,
                )
            except Exception as e:
                raise RuntimeError(
                    f"Failed to upload to Hugging Face: {e}"
                ) from e
        finally:
            # Best-effort cleanup on every exit path (success or failure).
            try:
                os.remove(zip_path)
            except OSError:
                pass

        # Percent-encode the path so names with spaces or other special
        # characters still yield a usable link.
        download_url = (
            f"https://huggingface.co/{repo_id}/resolve/main/{quote(path_in_repo)}"
        )
        return (download_url,)
|
|
|
|
|
|
|
# Registration tables keyed by node identifier: the first maps the
# identifier to its implementing class, the second to the label shown for
# it (presumably consumed by ComfyUI's custom-node loader -- confirm).
NODE_CLASS_MAPPINGS = {"ZipOutputToHuggingFace": ZipOutputToHuggingFace}

NODE_DISPLAY_NAME_MAPPINGS = {"ZipOutputToHuggingFace": "Zip Output → Hugging Face"}
|
|
|