| | |
| |
|
| | from typing_extensions import override |
| |
|
| | import torch |
| | from comfy_api.latest import ComfyExtension, io |
| |
|
| |
|
| | class DifferentialDiffusion(io.ComfyNode): |
| | @classmethod |
| | def define_schema(cls): |
| | return io.Schema( |
| | node_id="DifferentialDiffusion", |
| | display_name="Differential Diffusion", |
| | category="_for_testing", |
| | inputs=[ |
| | io.Model.Input("model"), |
| | io.Float.Input( |
| | "strength", |
| | default=1.0, |
| | min=0.0, |
| | max=1.0, |
| | step=0.01, |
| | optional=True, |
| | ), |
| | ], |
| | outputs=[io.Model.Output()], |
| | is_experimental=True, |
| | ) |
| |
|
| | @classmethod |
| | def execute(cls, model, strength=1.0) -> io.NodeOutput: |
| | model = model.clone() |
| | model.set_model_denoise_mask_function(lambda *args, **kwargs: cls.forward(*args, **kwargs, strength=strength)) |
| | return io.NodeOutput(model) |
| |
|
| | @classmethod |
| | def forward(cls, sigma: torch.Tensor, denoise_mask: torch.Tensor, extra_options: dict, strength: float): |
| | model = extra_options["model"] |
| | step_sigmas = extra_options["sigmas"] |
| | sigma_to = model.inner_model.model_sampling.sigma_min |
| | if step_sigmas[-1] > sigma_to: |
| | sigma_to = step_sigmas[-1] |
| | sigma_from = step_sigmas[0] |
| |
|
| | ts_from = model.inner_model.model_sampling.timestep(sigma_from) |
| | ts_to = model.inner_model.model_sampling.timestep(sigma_to) |
| | current_ts = model.inner_model.model_sampling.timestep(sigma[0]) |
| |
|
| | threshold = (current_ts - ts_to) / (ts_from - ts_to) |
| |
|
| | |
| | binary_mask = (denoise_mask >= threshold).to(denoise_mask.dtype) |
| |
|
| | |
| | if strength and strength < 1: |
| | blended_mask = strength * binary_mask + (1 - strength) * denoise_mask |
| | return blended_mask |
| | else: |
| | return binary_mask |
| |
|
| |
|
| | class DifferentialDiffusionExtension(ComfyExtension): |
| | @override |
| | async def get_node_list(self) -> list[type[io.ComfyNode]]: |
| | return [ |
| | DifferentialDiffusion, |
| | ] |
| |
|
| |
|
| | async def comfy_entrypoint() -> DifferentialDiffusionExtension: |
| | return DifferentialDiffusionExtension() |
| |
|