saik0s's picture
Add files using upload-large-folder tool
f1e06e5 verified
Raw
History Blame Contribute Delete
3.23 kB
import asyncio
import contextlib
import os
import re
import time
from collections.abc import Callable
from io import BytesIO
from yarl import URL
from comfy.cli_args import args
from comfy.model_management import processing_interrupted
from comfy_api.latest import IO
from .common_exceptions import ProcessingInterrupted
_HAS_PCT_ESC = re.compile(r"%[0-9A-Fa-f]{2}") # any % followed by 2 hex digits
_HAS_BAD_PCT = re.compile(r"%(?![0-9A-Fa-f]{2})") # any % not followed by 2 hex digits
def is_processing_interrupted() -> bool:
"""Return True if user/runtime requested interruption."""
return processing_interrupted()
def get_node_id(node_cls: type[IO.ComfyNode]) -> str:
return node_cls.hidden.unique_id
def get_auth_header(node_cls: type[IO.ComfyNode]) -> dict[str, str]:
if node_cls.hidden.auth_token_comfy_org:
return {"Authorization": f"Bearer {node_cls.hidden.auth_token_comfy_org}"}
if node_cls.hidden.api_key_comfy_org:
return {"X-API-KEY": node_cls.hidden.api_key_comfy_org}
return {}
def default_base_url() -> str:
return getattr(args, "comfy_api_base", "https://api.comfy.org")
async def sleep_with_interrupt(
seconds: float,
node_cls: type[IO.ComfyNode] | None,
label: str | None = None,
start_ts: float | None = None,
estimated_total: int | None = None,
*,
display_callback: Callable[[type[IO.ComfyNode], str, int, int | None], None] | None = None,
):
"""
Sleep in 1s slices while:
- Checking for interruption (raises ProcessingInterrupted).
- Optionally emitting time progress via display_callback (if provided).
"""
end = time.monotonic() + seconds
while True:
if is_processing_interrupted():
raise ProcessingInterrupted("Task cancelled")
now = time.monotonic()
if start_ts is not None and label and display_callback:
with contextlib.suppress(Exception):
display_callback(node_cls, label, int(now - start_ts), estimated_total)
if now >= end:
break
await asyncio.sleep(min(1.0, end - now))
def mimetype_to_extension(mime_type: str) -> str:
"""Converts a MIME type to a file extension."""
return mime_type.split("/")[-1].lower()
def get_fs_object_size(path_or_object: str | BytesIO) -> int:
if isinstance(path_or_object, str):
return os.path.getsize(path_or_object)
return len(path_or_object.getvalue())
def to_aiohttp_url(url: str) -> URL:
"""If `url` appears to be already percent-encoded (contains at least one valid %HH
escape and no malformed '%' sequences) and contains no raw whitespace/control
characters preserve the original encoding byte-for-byte (important for signed/presigned URLs).
Otherwise, return `URL(url)` and allow yarl to normalize/quote as needed."""
if any(c.isspace() for c in url) or any(ord(c) < 0x20 for c in url):
# Avoid encoded=True if URL contains raw whitespace/control chars
return URL(url)
if _HAS_PCT_ESC.search(url) and not _HAS_BAD_PCT.search(url):
# Preserve encoding only if it appears pre-encoded AND has no invalid % sequences
return URL(url, encoded=True)
return URL(url)