Metaphysix2's picture
Upload folder using huggingface_hub
3e5f61c
import hashlib
import os
from pathlib import Path
from typing import Callable
from time import time
import numpy as np
import folder_paths
from comfy.model_base import SD21UNCLIP, SDXL, BaseModel, SDXLRefiner, SVD_img2vid, model_sampling
from comfy.model_management import xformers_enabled
from comfy.model_patcher import ModelPatcher
class IsChangedHelper:
def __init__(self):
self.val = 0
def no_change(self):
return self.val
def change(self):
self.val = (self.val + 1) % 100
class ModelSamplingConfig:
def __init__(self, beta_schedule: str):
self.sampling_settings = {"beta_schedule": beta_schedule}
self.beta_schedule = beta_schedule # keeping this for backwards compatibility
class BetaSchedules:
SQRT_LINEAR = "sqrt_linear (AnimateDiff)"
LINEAR_ADXL = "linear (AnimateDiff-SDXL)"
LINEAR = "linear (HotshotXL/default)"
USE_EXISTING = "use existing"
SQRT = "sqrt"
COSINE = "cosine"
SQUAREDCOS_CAP_V2 = "squaredcos_cap_v2"
ALIAS_LIST = [SQRT_LINEAR, LINEAR_ADXL, LINEAR, USE_EXISTING, SQRT, COSINE, SQUAREDCOS_CAP_V2]
ALIAS_MAP = {
SQRT_LINEAR: "sqrt_linear",
LINEAR_ADXL: "linear", # also linear, but has different linear_end (0.020)
LINEAR: "linear",
SQRT: "sqrt",
COSINE: "cosine",
SQUAREDCOS_CAP_V2: "squaredcos_cap_v2",
}
@classmethod
def to_name(cls, alias: str):
return cls.ALIAS_MAP[alias]
@classmethod
def to_config(cls, alias: str) -> ModelSamplingConfig:
return ModelSamplingConfig(cls.to_name(alias))
@classmethod
def to_model_sampling(cls, alias: str, model: ModelPatcher):
if alias == cls.USE_EXISTING:
return None
ms_obj = model_sampling(cls.to_config(alias), model_type=model.model.model_type)
if alias == cls.LINEAR_ADXL:
# uses linear_end=0.020
ms_obj._register_schedule(given_betas=None, beta_schedule=cls.to_name(alias), timesteps=1000, linear_start=0.00085, linear_end=0.020, cosine_s=8e-3)
return ms_obj
@staticmethod
def get_alias_list_with_first_element(first_element: str):
new_list = BetaSchedules.ALIAS_LIST.copy()
element_index = new_list.index(first_element)
new_list[0], new_list[element_index] = new_list[element_index], new_list[0]
return new_list
class BetaScheduleCache:
def __init__(self, model: ModelPatcher):
self.model_sampling = model.model.model_sampling
def use_cached_beta_schedule_and_clean(self, model: ModelPatcher):
model.model.model_sampling = self.model_sampling
self.clean()
def clean(self):
self.model_sampling = None
class Folders:
ANIMATEDIFF_MODELS = "AnimateDiffEvolved_Models"
MOTION_LORA = "AnimateDiffMotion_LoRA"
VIDEO_FORMATS = "AnimateDiffEvolved_video_formats"
# register motion models folder(s)
folder_paths.folder_names_and_paths[Folders.ANIMATEDIFF_MODELS] = (
[
str(Path(__file__).parent.parent / "models")
],
folder_paths.supported_pt_extensions
)
# register motion LoRA folder(s)
folder_paths.folder_names_and_paths[Folders.MOTION_LORA] = (
[
str(Path(__file__).parent.parent / "motion_lora")
],
folder_paths.supported_pt_extensions
)
#Register video_formats folder
folder_paths.folder_names_and_paths[Folders.VIDEO_FORMATS] = (
[
str(Path(__file__).parent.parent / "video_formats")
],
[".json"]
)
def get_available_motion_models():
return folder_paths.get_filename_list(Folders.ANIMATEDIFF_MODELS)
def get_motion_model_path(model_name: str):
return folder_paths.get_full_path(Folders.ANIMATEDIFF_MODELS, model_name)
def get_available_motion_loras():
return folder_paths.get_filename_list(Folders.MOTION_LORA)
def get_motion_lora_path(lora_name: str):
return folder_paths.get_full_path(Folders.MOTION_LORA, lora_name)
# modified from https://stackoverflow.com/questions/22058048/hashing-a-file-in-python
def calculate_file_hash(filename: str, hash_every_n: int = 50):
h = hashlib.sha256()
b = bytearray(1024*1024)
mv = memoryview(b)
with open(filename, 'rb', buffering=0) as f:
i = 0
# don't hash entire file, only portions of it
while n := f.readinto(mv):
if i%hash_every_n == 0:
h.update(mv[:n])
i += 1
return h.hexdigest()
def calculate_model_hash(model: ModelPatcher):
unet = model.model.diff
t = unet.input_blocks[1]
m = hashlib.sha256()
for buf in t.buffers():
m.update(buf.cpu().numpy().view(np.uint8))
return m.hexdigest()
class ModelTypeSD:
SD1_5 = "SD1.5"
SD2_1 = "SD2.1"
SDXL = "SDXL"
SDXL_REFINER = "SDXL_Refiner"
SVD = "SVD"
def get_sd_model_type(model: ModelPatcher) -> str:
if model is None:
return None
elif type(model.model) == BaseModel:
return ModelTypeSD.SD1_5
elif type(model.model) == SDXL:
return ModelTypeSD.SDXL
elif type(model.model) == SD21UNCLIP:
return ModelTypeSD.SD2_1
elif type(model.model) == SDXLRefiner:
return ModelTypeSD.SDXL_REFINER
elif type(model.model) == SVD_img2vid:
return ModelTypeSD.SVD
else:
return str(type(model.model).__name__)
def is_checkpoint_sd1_5(model: ModelPatcher):
return False if model is None else type(model.model) == BaseModel
def is_checkpoint_sdxl(model: ModelPatcher):
return False if model is None else type(model.model) == SDXL
def raise_if_not_checkpoint_sd1_5(model: ModelPatcher):
if not is_checkpoint_sd1_5(model):
raise ValueError(f"For AnimateDiff, SD Checkpoint (model) is expected to be SD1.5-based (BaseModel), but was: {type(model.model).__name__}")
# TODO: remove this filth when xformers bug gets fixed in future xformers version
def wrap_function_to_inject_xformers_bug_info(function_to_wrap: Callable) -> Callable:
if not xformers_enabled:
return function_to_wrap
else:
def wrapped_function(*args, **kwargs):
try:
return function_to_wrap(*args, **kwargs)
except RuntimeError as e:
if str(e).startswith("CUDA error: invalid configuration argument"):
raise RuntimeError(f"An xformers bug was encountered in AnimateDiff - this is unexpected, \
report this to Kosinkadink/ComfyUI-AnimateDiff-Evolved repo as an issue, \
and a workaround for now is to run ComfyUI with the --disable-xformers argument.")
raise
return wrapped_function
class Timer(object):
__slots__ = ("start_time", "end_time")
def __init__(self) -> None:
self.start_time = 0.0
self.end_time = 0.0
def start(self) -> None:
self.start_time = time()
def update(self) -> None:
self.start()
def stop(self) -> float:
self.end_time = time()
return self.get_time_diff()
def get_time_diff(self) -> float:
return self.end_time - self.start_time
def get_time_current(self) -> float:
return time() - self.start_time
# TODO: possibly add configuration file in future when needed?
# # Load config settings
# ADE_DIR = Path(__file__).parent.parent
# ADE_CONFIG_FILE = ADE_DIR / "ade_config.json"
# class ADE_Settings:
# USE_XFORMERS_IN_VERSATILE_ATTENTION = "use_xformers_in_VersatileAttention"
# # Create ADE config if not present
# ABS_CONFIG = {
# ADE_Settings.USE_XFORMERS_IN_VERSATILE_ATTENTION: True
# }
# if not ADE_CONFIG_FILE.exists():
# with ADE_CONFIG_FILE.open("w") as f:
# json.dumps(ABS_CONFIG, indent=4)
# # otherwise, load it and use values
# else:
# loaded_values: dict = None
# with ADE_CONFIG_FILE.open("r") as f:
# loaded_values = json.load(f)
# if loaded_values is not None:
# for key, value in loaded_values.items():
# if key in ABS_CONFIG:
# ABS_CONFIG[key] = value