| import hashlib |
| import os |
| from pathlib import Path |
| from typing import Callable |
| from time import time |
|
|
| import numpy as np |
|
|
| import folder_paths |
| from comfy.model_base import SD21UNCLIP, SDXL, BaseModel, SDXLRefiner, SVD_img2vid, model_sampling |
| from comfy.model_management import xformers_enabled |
| from comfy.model_patcher import ModelPatcher |
|
|
|
|
| class IsChangedHelper: |
| def __init__(self): |
| self.val = 0 |
| |
| def no_change(self): |
| return self.val |
| |
| def change(self): |
| self.val = (self.val + 1) % 100 |
|
|
|
|
| class ModelSamplingConfig: |
| def __init__(self, beta_schedule: str): |
| self.sampling_settings = {"beta_schedule": beta_schedule} |
| self.beta_schedule = beta_schedule |
|
|
|
|
| class BetaSchedules: |
| SQRT_LINEAR = "sqrt_linear (AnimateDiff)" |
| LINEAR_ADXL = "linear (AnimateDiff-SDXL)" |
| LINEAR = "linear (HotshotXL/default)" |
| USE_EXISTING = "use existing" |
| SQRT = "sqrt" |
| COSINE = "cosine" |
| SQUAREDCOS_CAP_V2 = "squaredcos_cap_v2" |
|
|
| ALIAS_LIST = [SQRT_LINEAR, LINEAR_ADXL, LINEAR, USE_EXISTING, SQRT, COSINE, SQUAREDCOS_CAP_V2] |
|
|
| ALIAS_MAP = { |
| SQRT_LINEAR: "sqrt_linear", |
| LINEAR_ADXL: "linear", |
| LINEAR: "linear", |
| SQRT: "sqrt", |
| COSINE: "cosine", |
| SQUAREDCOS_CAP_V2: "squaredcos_cap_v2", |
| } |
|
|
| @classmethod |
| def to_name(cls, alias: str): |
| return cls.ALIAS_MAP[alias] |
| |
| @classmethod |
| def to_config(cls, alias: str) -> ModelSamplingConfig: |
| return ModelSamplingConfig(cls.to_name(alias)) |
| |
| @classmethod |
| def to_model_sampling(cls, alias: str, model: ModelPatcher): |
| if alias == cls.USE_EXISTING: |
| return None |
| ms_obj = model_sampling(cls.to_config(alias), model_type=model.model.model_type) |
| if alias == cls.LINEAR_ADXL: |
| |
| ms_obj._register_schedule(given_betas=None, beta_schedule=cls.to_name(alias), timesteps=1000, linear_start=0.00085, linear_end=0.020, cosine_s=8e-3) |
| return ms_obj |
|
|
| @staticmethod |
| def get_alias_list_with_first_element(first_element: str): |
| new_list = BetaSchedules.ALIAS_LIST.copy() |
| element_index = new_list.index(first_element) |
| new_list[0], new_list[element_index] = new_list[element_index], new_list[0] |
| return new_list |
|
|
|
|
| class BetaScheduleCache: |
| def __init__(self, model: ModelPatcher): |
| self.model_sampling = model.model.model_sampling |
|
|
| def use_cached_beta_schedule_and_clean(self, model: ModelPatcher): |
| model.model.model_sampling = self.model_sampling |
| self.clean() |
|
|
| def clean(self): |
| self.model_sampling = None |
|
|
|
|
| class Folders: |
| ANIMATEDIFF_MODELS = "AnimateDiffEvolved_Models" |
| MOTION_LORA = "AnimateDiffMotion_LoRA" |
| VIDEO_FORMATS = "AnimateDiffEvolved_video_formats" |
|
|
|
|
| |
| folder_paths.folder_names_and_paths[Folders.ANIMATEDIFF_MODELS] = ( |
| [ |
| str(Path(__file__).parent.parent / "models") |
| ], |
| folder_paths.supported_pt_extensions |
| ) |
|
|
| |
| folder_paths.folder_names_and_paths[Folders.MOTION_LORA] = ( |
| [ |
| str(Path(__file__).parent.parent / "motion_lora") |
| ], |
| folder_paths.supported_pt_extensions |
| ) |
|
|
|
|
| |
| folder_paths.folder_names_and_paths[Folders.VIDEO_FORMATS] = ( |
| [ |
| str(Path(__file__).parent.parent / "video_formats") |
| ], |
| [".json"] |
| ) |
|
|
|
|
| def get_available_motion_models(): |
| return folder_paths.get_filename_list(Folders.ANIMATEDIFF_MODELS) |
|
|
|
|
| def get_motion_model_path(model_name: str): |
| return folder_paths.get_full_path(Folders.ANIMATEDIFF_MODELS, model_name) |
|
|
|
|
| def get_available_motion_loras(): |
| return folder_paths.get_filename_list(Folders.MOTION_LORA) |
|
|
|
|
| def get_motion_lora_path(lora_name: str): |
| return folder_paths.get_full_path(Folders.MOTION_LORA, lora_name) |
|
|
|
|
| |
| def calculate_file_hash(filename: str, hash_every_n: int = 50): |
| h = hashlib.sha256() |
| b = bytearray(1024*1024) |
| mv = memoryview(b) |
| with open(filename, 'rb', buffering=0) as f: |
| i = 0 |
| |
| while n := f.readinto(mv): |
| if i%hash_every_n == 0: |
| h.update(mv[:n]) |
| i += 1 |
| return h.hexdigest() |
|
|
|
|
| def calculate_model_hash(model: ModelPatcher): |
| unet = model.model.diff |
| t = unet.input_blocks[1] |
| m = hashlib.sha256() |
| for buf in t.buffers(): |
| m.update(buf.cpu().numpy().view(np.uint8)) |
| return m.hexdigest() |
|
|
|
|
| class ModelTypeSD: |
| SD1_5 = "SD1.5" |
| SD2_1 = "SD2.1" |
| SDXL = "SDXL" |
| SDXL_REFINER = "SDXL_Refiner" |
| SVD = "SVD" |
|
|
|
|
| def get_sd_model_type(model: ModelPatcher) -> str: |
| if model is None: |
| return None |
| elif type(model.model) == BaseModel: |
| return ModelTypeSD.SD1_5 |
| elif type(model.model) == SDXL: |
| return ModelTypeSD.SDXL |
| elif type(model.model) == SD21UNCLIP: |
| return ModelTypeSD.SD2_1 |
| elif type(model.model) == SDXLRefiner: |
| return ModelTypeSD.SDXL_REFINER |
| elif type(model.model) == SVD_img2vid: |
| return ModelTypeSD.SVD |
| else: |
| return str(type(model.model).__name__) |
|
|
| def is_checkpoint_sd1_5(model: ModelPatcher): |
| return False if model is None else type(model.model) == BaseModel |
|
|
| def is_checkpoint_sdxl(model: ModelPatcher): |
| return False if model is None else type(model.model) == SDXL |
|
|
|
|
| def raise_if_not_checkpoint_sd1_5(model: ModelPatcher): |
| if not is_checkpoint_sd1_5(model): |
| raise ValueError(f"For AnimateDiff, SD Checkpoint (model) is expected to be SD1.5-based (BaseModel), but was: {type(model.model).__name__}") |
|
|
|
|
| |
| def wrap_function_to_inject_xformers_bug_info(function_to_wrap: Callable) -> Callable: |
| if not xformers_enabled: |
| return function_to_wrap |
| else: |
| def wrapped_function(*args, **kwargs): |
| try: |
| return function_to_wrap(*args, **kwargs) |
| except RuntimeError as e: |
| if str(e).startswith("CUDA error: invalid configuration argument"): |
| raise RuntimeError(f"An xformers bug was encountered in AnimateDiff - this is unexpected, \ |
| report this to Kosinkadink/ComfyUI-AnimateDiff-Evolved repo as an issue, \ |
| and a workaround for now is to run ComfyUI with the --disable-xformers argument.") |
| raise |
| return wrapped_function |
|
|
|
|
| class Timer(object): |
| __slots__ = ("start_time", "end_time") |
|
|
| def __init__(self) -> None: |
| self.start_time = 0.0 |
| self.end_time = 0.0 |
|
|
| def start(self) -> None: |
| self.start_time = time() |
|
|
| def update(self) -> None: |
| self.start() |
|
|
| def stop(self) -> float: |
| self.end_time = time() |
| return self.get_time_diff() |
|
|
| def get_time_diff(self) -> float: |
| return self.end_time - self.start_time |
|
|
| def get_time_current(self) -> float: |
| return time() - self.start_time |
|
|
|
|
| |
| |
| |
| |
|
|
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|