Eji-Sensei14's picture
Upload folder using huggingface_hub
c6535db verified
import torch
import numpy as np
import comfy.utils, comfy.sample, comfy.samplers, comfy.controlnet, comfy.model_base, comfy.model_management, comfy.sampler_helpers, comfy.supported_models
from comfy.model_patcher import ModelPatcher
from nodes import RepeatLatentBatch, CLIPTextEncode, VAEEncodeForInpaint
from ..modules.layer_diffuse import LayerMethod
from ..config import *
from .. import easyCache, sampler
# 预采样设置(基础)
class samplerSettings:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {"required":
{"pipe": ("PIPE_LINE",),
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0}),
"sampler_name": (comfy.samplers.KSampler.SAMPLERS,),
"scheduler": (comfy.samplers.KSampler.SCHEDULERS + NEW_SCHEDULERS,),
"denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}),
"seed": ("INT", {"default": 0, "min": 0, "max": MAX_SEED_NUM}),
},
"optional": {
"image_to_latent": ("IMAGE",),
"latent": ("LATENT",),
},
"hidden":
{"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO", "my_unique_id": "UNIQUE_ID"},
}
RETURN_TYPES = ("PIPE_LINE", )
RETURN_NAMES = ("pipe",)
FUNCTION = "settings"
CATEGORY = "EasyUse/PreSampling"
def settings(self, pipe, steps, cfg, sampler_name, scheduler, denoise, seed, image_to_latent=None, latent=None, prompt=None, extra_pnginfo=None, my_unique_id=None):
# 图生图转换
vae = pipe["vae"]
batch_size = pipe["loader_settings"]["batch_size"] if "batch_size" in pipe["loader_settings"] else 1
if image_to_latent is not None:
_, height, width, _ = image_to_latent.shape
if height == 1 and width == 1:
samples = pipe["samples"]
images = pipe["images"]
else:
samples = {"samples": vae.encode(image_to_latent[:, :, :, :3])}
samples = RepeatLatentBatch().repeat(samples, batch_size)[0]
images = image_to_latent
elif latent is not None:
samples = latent
images = pipe["images"]
else:
samples = pipe["samples"]
images = pipe["images"]
new_pipe = {
"model": pipe['model'],
"positive": pipe['positive'],
"negative": pipe['negative'],
"vae": pipe['vae'],
"clip": pipe['clip'],
"samples": samples,
"images": images,
"seed": seed,
"loader_settings": {
**pipe["loader_settings"],
"steps": steps,
"cfg": cfg,
"sampler_name": sampler_name,
"scheduler": scheduler,
"denoise": denoise,
"add_noise": "enabled"
}
}
del pipe
return {"ui": {"value": [seed]}, "result": (new_pipe,)}
# 预采样设置(高级)
class samplerSettingsAdvanced:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {"required":
{"pipe": ("PIPE_LINE",),
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0}),
"sampler_name": (comfy.samplers.KSampler.SAMPLERS,),
"scheduler": (comfy.samplers.KSampler.SCHEDULERS + NEW_SCHEDULERS,),
"start_at_step": ("INT", {"default": 0, "min": 0, "max": 10000}),
"end_at_step": ("INT", {"default": 10000, "min": 0, "max": 10000}),
"add_noise": (["enable (CPU)", "enable (GPU=A1111)", "disable"], {"default": "enable (CPU)"}),
"seed": ("INT", {"default": 0, "min": 0, "max": MAX_SEED_NUM}),
"return_with_leftover_noise": (["disable", "enable"], ),
},
"optional": {
"image_to_latent": ("IMAGE",),
"latent": ("LATENT",)
},
"hidden":
{"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO", "my_unique_id": "UNIQUE_ID"},
}
RETURN_TYPES = ("PIPE_LINE", )
RETURN_NAMES = ("pipe",)
FUNCTION = "settings"
CATEGORY = "EasyUse/PreSampling"
def settings(self, pipe, steps, cfg, sampler_name, scheduler, start_at_step, end_at_step, add_noise, seed, return_with_leftover_noise, image_to_latent=None, latent=None, prompt=None, extra_pnginfo=None, my_unique_id=None):
# 图生图转换
vae = pipe["vae"]
batch_size = pipe["loader_settings"]["batch_size"] if "batch_size" in pipe["loader_settings"] else 1
if image_to_latent is not None:
_, height, width, _ = image_to_latent.shape
if height == 1 and width == 1:
samples = pipe["samples"]
images = pipe["images"]
else:
samples = {"samples": vae.encode(image_to_latent[:, :, :, :3])}
samples = RepeatLatentBatch().repeat(samples, batch_size)[0]
images = image_to_latent
elif latent is not None:
samples = latent
images = pipe["images"]
else:
samples = pipe["samples"]
images = pipe["images"]
force_full_denoise = True
if return_with_leftover_noise == "enable":
force_full_denoise = False
new_pipe = {
"model": pipe['model'],
"positive": pipe['positive'],
"negative": pipe['negative'],
"vae": pipe['vae'],
"clip": pipe['clip'],
"samples": samples,
"images": images,
"seed": seed,
"loader_settings": {
**pipe["loader_settings"],
"steps": steps,
"cfg": cfg,
"sampler_name": sampler_name,
"scheduler": scheduler,
"start_step": start_at_step,
"last_step": end_at_step,
"denoise": 1.0,
"add_noise": add_noise,
"force_full_denoise": force_full_denoise
}
}
del pipe
return {"ui": {"value": [seed]}, "result": (new_pipe,)}
# 预采样设置(噪声注入)
class samplerSettingsNoiseIn:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {"required":
{"pipe": ("PIPE_LINE",),
"factor": ("FLOAT", {"default": 0.1, "min": 0.0, "max": 1.0, "step":0.01, "round": 0.01}),
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0}),
"sampler_name": (comfy.samplers.KSampler.SAMPLERS,),
"scheduler": (comfy.samplers.KSampler.SCHEDULERS+NEW_SCHEDULERS,),
"denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}),
"seed": ("INT", {"default": 0, "min": 0, "max": MAX_SEED_NUM}),
},
"optional": {
"optional_noise_seed": ("INT",{"forceInput": True}),
"optional_latent": ("LATENT",),
},
"hidden":
{"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO", "my_unique_id": "UNIQUE_ID"},
}
RETURN_TYPES = ("PIPE_LINE", )
RETURN_NAMES = ("pipe",)
FUNCTION = "settings"
CATEGORY = "EasyUse/PreSampling"
def slerp(self, val, low, high):
dims = low.shape
low = low.reshape(dims[0], -1)
high = high.reshape(dims[0], -1)
low_norm = low / torch.norm(low, dim=1, keepdim=True)
high_norm = high / torch.norm(high, dim=1, keepdim=True)
low_norm[low_norm != low_norm] = 0.0
high_norm[high_norm != high_norm] = 0.0
omega = torch.acos((low_norm * high_norm).sum(1))
so = torch.sin(omega)
res = (torch.sin((1.0 - val) * omega) / so).unsqueeze(1) * low + (torch.sin(val * omega) / so).unsqueeze(
1) * high
return res.reshape(dims)
def prepare_mask(self, mask, shape):
mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])),
size=(shape[2], shape[3]), mode="bilinear")
mask = mask.expand((-1, shape[1], -1, -1))
if mask.shape[0] < shape[0]:
mask = mask.repeat((shape[0] - 1) // mask.shape[0] + 1, 1, 1, 1)[:shape[0]]
return mask
def expand_mask(self, mask, expand, tapered_corners):
try:
import scipy
c = 0 if tapered_corners else 1
kernel = np.array([[c, 1, c],
[1, 1, 1],
[c, 1, c]])
mask = mask.reshape((-1, mask.shape[-2], mask.shape[-1]))
out = []
for m in mask:
output = m.numpy()
for _ in range(abs(expand)):
if expand < 0:
output = scipy.ndimage.grey_erosion(output, footprint=kernel)
else:
output = scipy.ndimage.grey_dilation(output, footprint=kernel)
output = torch.from_numpy(output)
out.append(output)
return torch.stack(out, dim=0)
except:
return None
def settings(self, pipe, factor, steps, cfg, sampler_name, scheduler, denoise, seed, optional_noise_seed=None, optional_latent=None, prompt=None, extra_pnginfo=None, my_unique_id=None):
latent = optional_latent if optional_latent is not None else pipe["samples"]
model = pipe["model"]
# generate base noise
batch_size, _, height, width = latent["samples"].shape
generator = torch.manual_seed(seed)
base_noise = torch.randn((1, 4, height, width), dtype=torch.float32, device="cpu", generator=generator).repeat(batch_size, 1, 1, 1).cpu()
# generate variation noise
if optional_noise_seed is None or optional_noise_seed == seed:
optional_noise_seed = seed+1
generator = torch.manual_seed(optional_noise_seed)
variation_noise = torch.randn((batch_size, 4, height, width), dtype=torch.float32, device="cpu",
generator=generator).cpu()
slerp_noise = self.slerp(factor, base_noise, variation_noise)
end_at_step = steps # min(steps, end_at_step)
start_at_step = round(end_at_step - end_at_step * denoise)
device = comfy.model_management.get_torch_device()
comfy.model_management.load_model_gpu(model)
model_patcher = comfy.model_patcher.ModelPatcher(model.model, load_device=device, offload_device=comfy.model_management.unet_offload_device())
sampler = comfy.samplers.KSampler(model_patcher, steps=steps, device=device, sampler=sampler_name,
scheduler=scheduler, denoise=1.0, model_options=model.model_options)
sigmas = sampler.sigmas
sigma = sigmas[start_at_step] - sigmas[end_at_step]
sigma /= model.model.latent_format.scale_factor
sigma = sigma.cpu().numpy()
work_latent = latent.copy()
work_latent["samples"] = latent["samples"].clone() + slerp_noise * sigma
if "noise_mask" in latent:
noise_mask = self.prepare_mask(latent["noise_mask"], latent['samples'].shape)
work_latent["samples"] = noise_mask * work_latent["samples"] + (1-noise_mask) * latent["samples"]
work_latent['noise_mask'] = self.expand_mask(latent["noise_mask"].clone(), 5, True)
if pipe is None:
pipe = {}
new_pipe = {
"model": pipe['model'],
"positive": pipe['positive'],
"negative": pipe['negative'],
"vae": pipe['vae'],
"clip": pipe['clip'],
"samples": work_latent,
"images": pipe['images'],
"seed": seed,
"loader_settings": {
**pipe["loader_settings"],
"steps": steps,
"cfg": cfg,
"sampler_name": sampler_name,
"scheduler": scheduler,
"denoise": denoise,
"add_noise": "disable"
}
}
return (new_pipe,)
# 预采样设置(自定义)
class samplerCustomSettings:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {"required": {
"pipe": ("PIPE_LINE",),
"guider": (['CFG','DualCFG','Basic', 'IP2P+CFG', 'IP2P+DualCFG','IP2P+Basic'],{"default":"Basic"}),
"cfg": ("FLOAT", {"default": 3.5, "min": 0.0, "max": 100.0}),
"cfg_negative": ("FLOAT", {"default": 1.5, "min": 0.0, "max": 100.0}),
"sampler_name": (comfy.samplers.KSampler.SAMPLERS + ['inversed_euler'],),
"scheduler": (comfy.samplers.KSampler.SCHEDULERS + ['karrasADV','exponentialADV','polyExponential', 'sdturbo', 'vp', 'alignYourSteps', 'gits'],),
"coeff": ("FLOAT", {"default": 1.20, "min": 0.80, "max": 1.50, "step": 0.05}),
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
"sigma_max": ("FLOAT", {"default": 14.614642, "min": 0.0, "max": 1000.0, "step": 0.01, "round": False}),
"sigma_min": ("FLOAT", {"default": 0.0291675, "min": 0.0, "max": 1000.0, "step": 0.01, "round": False}),
"rho": ("FLOAT", {"default": 7.0, "min": 0.0, "max": 100.0, "step": 0.01, "round": False}),
"beta_d": ("FLOAT", {"default": 19.9, "min": 0.0, "max": 1000.0, "step": 0.01, "round": False}),
"beta_min": ("FLOAT", {"default": 0.1, "min": 0.0, "max": 1000.0, "step": 0.01, "round": False}),
"eps_s": ("FLOAT", {"default": 0.001, "min": 0.0, "max": 1.0, "step": 0.0001, "round": False}),
"flip_sigmas": ("BOOLEAN", {"default": False}),
"denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}),
"add_noise": (["enable (CPU)", "enable (GPU=A1111)", "disable"], {"default": "enable (CPU)"}),
"seed": ("INT", {"default": 0, "min": 0, "max": MAX_SEED_NUM}),
},
"optional": {
"image_to_latent": ("IMAGE",),
"latent": ("LATENT",),
"optional_sampler":("SAMPLER",),
"optional_sigmas":("SIGMAS",),
},
"hidden":
{"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO", "my_unique_id": "UNIQUE_ID"},
}
RETURN_TYPES = ("PIPE_LINE", )
RETURN_NAMES = ("pipe",)
FUNCTION = "settings"
CATEGORY = "EasyUse/PreSampling"
def ip2p(self, positive, negative, vae, pixels, latent=None):
if latent is not None:
concat_latent = latent
else:
x = (pixels.shape[1] // 8) * 8
y = (pixels.shape[2] // 8) * 8
if pixels.shape[1] != x or pixels.shape[2] != y:
x_offset = (pixels.shape[1] % 8) // 2
y_offset = (pixels.shape[2] % 8) // 2
pixels = pixels[:,x_offset:x + x_offset, y_offset:y + y_offset,:]
concat_latent = vae.encode(pixels)
out_latent = {}
out_latent["samples"] = torch.zeros_like(concat_latent)
out = []
for conditioning in [positive, negative]:
c = []
for t in conditioning:
d = t[1].copy()
d["concat_latent_image"] = concat_latent
n = [t[0], d]
c.append(n)
out.append(c)
return (out[0], out[1], out_latent)
def settings(self, pipe, guider, cfg, cfg_negative, sampler_name, scheduler, coeff, steps, sigma_max, sigma_min, rho, beta_d, beta_min, eps_s, flip_sigmas, denoise, add_noise, seed, image_to_latent=None, latent=None, optional_sampler=None, optional_sigmas=None, prompt=None, extra_pnginfo=None, my_unique_id=None):
# 图生图转换
vae = pipe["vae"]
model = pipe["model"]
positive = pipe['positive']
negative = pipe['negative']
batch_size = pipe["loader_settings"]["batch_size"] if "batch_size" in pipe["loader_settings"] else 1
if image_to_latent is not None:
_, height, width, _ = image_to_latent.shape
if height == 1 and width == 1:
samples = pipe["samples"]
images = pipe["images"]
else:
if "IP2P" in guider:
positive, negative, latent = self.ip2p(pipe['positive'], pipe['negative'], vae, image_to_latent)
samples = latent
else:
samples = {"samples": vae.encode(image_to_latent[:, :, :, :3])}
samples = RepeatLatentBatch().repeat(samples, batch_size)[0]
images = image_to_latent
elif latent is not None:
if "IP2P" in guider:
positive, negative, latent = self.ip2p(pipe['positive'], pipe['negative'], latent=latent)
samples = latent
else:
samples = latent
images = pipe["images"]
else:
samples = pipe["samples"]
images = pipe["images"]
new_pipe = {
"model": model,
"positive": positive,
"negative": negative,
"vae": pipe['vae'],
"clip": pipe['clip'],
"samples": samples,
"images": images,
"seed": seed,
"loader_settings": {
**pipe["loader_settings"],
"middle": pipe['negative'],
"steps": steps,
"cfg": cfg,
"cfg_negative": cfg_negative,
"sampler_name": sampler_name,
"scheduler": scheduler,
"denoise": denoise,
"add_noise": add_noise,
"custom": {
"guider": guider,
"coeff": coeff,
"sigma_max": sigma_max,
"sigma_min": sigma_min,
"rho": rho,
"beta_d": beta_d,
"beta_min": beta_min,
"eps_s": beta_min,
"flip_sigmas": flip_sigmas
},
"optional_sampler": optional_sampler,
"optional_sigmas": optional_sigmas
}
}
del pipe
return {"ui": {"value": [seed]}, "result": (new_pipe,)}
# 预采样设置(SDTurbo)
from ..libs.gradual_latent_hires_fix import sample_dpmpp_2s_ancestral, sample_dpmpp_2m_sde, sample_lcm, sample_euler_ancestral
class sdTurboSettings:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {"required": {
"pipe": ("PIPE_LINE",),
"steps": ("INT", {"default": 1, "min": 1, "max": 10}),
"cfg": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 100.0}),
"sampler_name": (comfy.samplers.SAMPLER_NAMES,),
"eta": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01, "round": False}),
"s_noise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01, "round": False}),
"upscale_ratio": ("FLOAT", {"default": 2.0, "min": 0.0, "max": 16.0, "step": 0.01, "round": False}),
"start_step": ("INT", {"default": 5, "min": 0, "max": 1000, "step": 1}),
"end_step": ("INT", {"default": 15, "min": 0, "max": 1000, "step": 1}),
"upscale_n_step": ("INT", {"default": 3, "min": 0, "max": 1000, "step": 1}),
"unsharp_kernel_size": ("INT", {"default": 3, "min": 1, "max": 21, "step": 1}),
"unsharp_sigma": ("FLOAT", {"default": 0.5, "min": 0.0, "max": 10.0, "step": 0.01, "round": False}),
"unsharp_strength": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 10.0, "step": 0.01, "round": False}),
"seed": ("INT", {"default": 0, "min": 0, "max": MAX_SEED_NUM}),
},
"hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO", "my_unique_id": "UNIQUE_ID"},
}
RETURN_TYPES = ("PIPE_LINE",)
RETURN_NAMES = ("pipe",)
FUNCTION = "settings"
CATEGORY = "EasyUse/PreSampling"
def settings(self, pipe, steps, cfg, sampler_name, eta, s_noise, upscale_ratio, start_step, end_step, upscale_n_step, unsharp_kernel_size, unsharp_sigma, unsharp_strength, seed, prompt=None, extra_pnginfo=None, my_unique_id=None):
model = pipe['model']
# sigma
timesteps = torch.flip(torch.arange(1, 11) * 100 - 1, (0,))[:steps]
sigmas = model.model.model_sampling.sigma(timesteps)
sigmas = torch.cat([sigmas, sigmas.new_zeros([1])])
#sampler
sample_function = None
extra_options = {
"eta": eta,
"s_noise": s_noise,
"upscale_ratio": upscale_ratio,
"start_step": start_step,
"end_step": end_step,
"upscale_n_step": upscale_n_step,
"unsharp_kernel_size": unsharp_kernel_size,
"unsharp_sigma": unsharp_sigma,
"unsharp_strength": unsharp_strength,
}
if sampler_name == "euler_ancestral":
sample_function = sample_euler_ancestral
elif sampler_name == "dpmpp_2s_ancestral":
sample_function = sample_dpmpp_2s_ancestral
elif sampler_name == "dpmpp_2m_sde":
sample_function = sample_dpmpp_2m_sde
elif sampler_name == "lcm":
sample_function = sample_lcm
if sample_function is not None:
unsharp_kernel_size = unsharp_kernel_size if unsharp_kernel_size % 2 == 1 else unsharp_kernel_size + 1
extra_options["unsharp_kernel_size"] = unsharp_kernel_size
_sampler = comfy.samplers.KSAMPLER(sample_function, extra_options)
else:
_sampler = comfy.samplers.sampler_object(sampler_name)
extra_options = None
new_pipe = {
"model": pipe['model'],
"positive": pipe['positive'],
"negative": pipe['negative'],
"vae": pipe['vae'],
"clip": pipe['clip'],
"samples": pipe["samples"],
"images": pipe["images"],
"seed": seed,
"loader_settings": {
**pipe["loader_settings"],
"extra_options": extra_options,
"sampler": _sampler,
"sigmas": sigmas,
"steps": steps,
"cfg": cfg,
"add_noise": "enabled"
}
}
del pipe
return {"ui": {"value": [seed]}, "result": (new_pipe,)}
# cascade预采样参数
class cascadeSettings:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {"required":
{"pipe": ("PIPE_LINE",),
"encode_vae_name": (["None"] + folder_paths.get_filename_list("vae"),),
"decode_vae_name": (["None"] + folder_paths.get_filename_list("vae"),),
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
"cfg": ("FLOAT", {"default": 4.0, "min": 0.0, "max": 100.0}),
"sampler_name": (comfy.samplers.KSampler.SAMPLERS, {"default":"euler_ancestral"}),
"scheduler": (comfy.samplers.KSampler.SCHEDULERS, {"default":"simple"}),
"denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}),
"seed": ("INT", {"default": 0, "min": 0, "max": MAX_SEED_NUM}),
},
"optional": {
"image_to_latent_c": ("IMAGE",),
"latent_c": ("LATENT",),
},
"hidden":{"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO", "my_unique_id": "UNIQUE_ID"},
}
RETURN_TYPES = ("PIPE_LINE",)
RETURN_NAMES = ("pipe",)
FUNCTION = "settings"
CATEGORY = "EasyUse/PreSampling"
def settings(self, pipe, encode_vae_name, decode_vae_name, steps, cfg, sampler_name, scheduler, denoise, seed, model=None, image_to_latent_c=None, latent_c=None, prompt=None, extra_pnginfo=None, my_unique_id=None):
images, samples_c = None, None
samples = pipe['samples']
batch_size = pipe["loader_settings"]["batch_size"] if "batch_size" in pipe["loader_settings"] else 1
encode_vae_name = encode_vae_name if encode_vae_name is not None else pipe['loader_settings']['encode_vae_name']
decode_vae_name = decode_vae_name if decode_vae_name is not None else pipe['loader_settings']['decode_vae_name']
if image_to_latent_c is not None:
if encode_vae_name != 'None':
encode_vae = easyCache.load_vae(encode_vae_name)
else:
encode_vae = pipe['vae'][0]
if "compression" not in pipe["loader_settings"]:
raise Exception("compression is not found")
compression = pipe["loader_settings"]['compression']
width = image_to_latent_c.shape[-2]
height = image_to_latent_c.shape[-3]
out_width = (width // compression) * encode_vae.downscale_ratio
out_height = (height // compression) * encode_vae.downscale_ratio
s = comfy.utils.common_upscale(image_to_latent_c.movedim(-1, 1), out_width, out_height, "bicubic",
"center").movedim(1,
-1)
c_latent = encode_vae.encode(s[:, :, :, :3])
b_latent = torch.zeros([c_latent.shape[0], 4, height // 4, width // 4])
samples_c = {"samples": c_latent}
samples_c = RepeatLatentBatch().repeat(samples_c, batch_size)[0]
samples_b = {"samples": b_latent}
samples_b = RepeatLatentBatch().repeat(samples_b, batch_size)[0]
samples = (samples_c, samples_b)
images = image_to_latent_c
elif latent_c is not None:
samples_c = latent_c
samples = (samples_c, samples[1])
images = pipe["images"]
if samples_c is not None:
samples = (samples_c, samples[1])
new_pipe = {
"model": pipe['model'],
"positive": pipe['positive'],
"negative": pipe['negative'],
"vae": pipe['vae'],
"clip": pipe['clip'],
"samples": samples,
"images": images,
"seed": seed,
"loader_settings": {
**pipe["loader_settings"],
"encode_vae_name": encode_vae_name,
"decode_vae_name": decode_vae_name,
"steps": steps,
"cfg": cfg,
"sampler_name": sampler_name,
"scheduler": scheduler,
"denoise": denoise,
"add_noise": "enabled"
}
}
sampler.update_value_by_id("pipe_line", my_unique_id, new_pipe)
del pipe
return {"ui": {"value": [seed]}, "result": (new_pipe,)}
# layerDiffusion预采样参数
class layerDiffusionSettings:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {"required":
{
"pipe": ("PIPE_LINE",),
"method": ([LayerMethod.FG_ONLY_ATTN.value, LayerMethod.FG_ONLY_CONV.value, LayerMethod.EVERYTHING.value, LayerMethod.FG_TO_BLEND.value, LayerMethod.BG_TO_BLEND.value],),
"weight": ("FLOAT",{"default": 1.0, "min": -1, "max": 3, "step": 0.05},),
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0}),
"sampler_name": (comfy.samplers.KSampler.SAMPLERS, {"default": "euler"}),
"scheduler": (comfy.samplers.KSampler.SCHEDULERS+ NEW_SCHEDULERS, {"default": "normal"}),
"denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}),
"seed": ("INT", {"default": 0, "min": 0, "max": MAX_SEED_NUM}),
},
"optional": {
"image": ("IMAGE",),
"blended_image": ("IMAGE",),
"mask": ("MASK",),
# "latent": ("LATENT",),
# "blended_latent": ("LATENT",),
},
"hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO", "my_unique_id": "UNIQUE_ID"},
}
RETURN_TYPES = ("PIPE_LINE",)
RETURN_NAMES = ("pipe",)
FUNCTION = "settings"
CATEGORY = "EasyUse/PreSampling"
def get_layer_diffusion_method(self, method, has_blend_latent):
method = LayerMethod(method)
if has_blend_latent:
if method == LayerMethod.BG_TO_BLEND:
method = LayerMethod.BG_BLEND_TO_FG
elif method == LayerMethod.FG_TO_BLEND:
method = LayerMethod.FG_BLEND_TO_BG
return method
def settings(self, pipe, method, weight, steps, cfg, sampler_name, scheduler, denoise, seed, image=None, blended_image=None, mask=None, prompt=None, extra_pnginfo=None, my_unique_id=None):
blend_samples = pipe['blend_samples'] if "blend_samples" in pipe else None
vae = pipe["vae"]
batch_size = pipe["loader_settings"]["batch_size"] if "batch_size" in pipe["loader_settings"] else 1
method = self.get_layer_diffusion_method(method, blend_samples is not None or blended_image is not None)
if image is not None or "image" in pipe:
image = image if image is not None else pipe['image']
if mask is not None:
print('inpaint')
samples, = VAEEncodeForInpaint().encode(vae, image, mask)
else:
samples = {"samples": vae.encode(image[:,:,:,:3])}
samples = RepeatLatentBatch().repeat(samples, batch_size)[0]
images = image
elif "samp_images" in pipe:
samples = {"samples": vae.encode(pipe["samp_images"][:,:,:,:3])}
samples = RepeatLatentBatch().repeat(samples, batch_size)[0]
images = pipe["samp_images"]
else:
if method not in [LayerMethod.FG_ONLY_ATTN, LayerMethod.FG_ONLY_CONV, LayerMethod.EVERYTHING]:
raise Exception("image is missing")
samples = pipe["samples"]
images = pipe["images"]
if method in [LayerMethod.BG_BLEND_TO_FG, LayerMethod.FG_BLEND_TO_BG]:
if blended_image is None and blend_samples is None:
raise Exception("blended_image is missing")
elif blended_image is not None:
blend_samples = {"samples": vae.encode(blended_image[:,:,:,:3])}
blend_samples = RepeatLatentBatch().repeat(blend_samples, batch_size)[0]
new_pipe = {
"model": pipe['model'],
"positive": pipe['positive'],
"negative": pipe['negative'],
"vae": pipe['vae'],
"clip": pipe['clip'],
"samples": samples,
"blend_samples": blend_samples,
"images": images,
"seed": seed,
"loader_settings": {
**pipe["loader_settings"],
"steps": steps,
"cfg": cfg,
"sampler_name": sampler_name,
"scheduler": scheduler,
"denoise": denoise,
"add_noise": "enabled",
"layer_diffusion_method": method,
"layer_diffusion_weight": weight,
}
}
del pipe
return {"ui": {"value": [seed]}, "result": (new_pipe,)}
# 预采样设置(layerDiffuse附加)
class layerDiffusionSettingsADDTL:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {"required":
{
"pipe": ("PIPE_LINE",),
"foreground_prompt": ("STRING", {"default": "", "placeholder": "Foreground Additional Prompt", "multiline": True}),
"background_prompt": ("STRING", {"default": "", "placeholder": "Background Additional Prompt", "multiline": True}),
"blended_prompt": ("STRING", {"default": "", "placeholder": "Blended Additional Prompt", "multiline": True}),
},
"optional": {
"optional_fg_cond": ("CONDITIONING",),
"optional_bg_cond": ("CONDITIONING",),
"optional_blended_cond": ("CONDITIONING",),
},
"hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO", "my_unique_id": "UNIQUE_ID"},
}
RETURN_TYPES = ("PIPE_LINE",)
RETURN_NAMES = ("pipe",)
FUNCTION = "settings"
CATEGORY = "EasyUse/PreSampling"
def settings(self, pipe, foreground_prompt, background_prompt, blended_prompt, optional_fg_cond=None, optional_bg_cond=None, optional_blended_cond=None, prompt=None, extra_pnginfo=None, my_unique_id=None):
fg_cond, bg_cond, blended_cond = None, None, None
clip = pipe['clip']
if optional_fg_cond is not None:
fg_cond = optional_fg_cond
elif foreground_prompt != "":
fg_cond, = CLIPTextEncode().encode(clip, foreground_prompt)
if optional_bg_cond is not None:
bg_cond = optional_bg_cond
elif background_prompt != "":
bg_cond, = CLIPTextEncode().encode(clip, background_prompt)
if optional_blended_cond is not None:
blended_cond = optional_blended_cond
elif blended_prompt != "":
blended_cond, = CLIPTextEncode().encode(clip, blended_prompt)
new_pipe = {
**pipe,
"loader_settings": {
**pipe["loader_settings"],
"layer_diffusion_cond": (fg_cond, bg_cond, blended_cond)
}
}
del pipe
return (new_pipe,)
# 预采样设置(动态CFG)
from ..libs.dynthres_core import DynThresh
class dynamicCFGSettings:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {"required":
{"pipe": ("PIPE_LINE",),
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0}),
"cfg_mode": (DynThresh.Modes,),
"cfg_scale_min": ("FLOAT", {"default": 3.5, "min": 0.0, "max": 100.0, "step": 0.5}),
"sampler_name": (comfy.samplers.KSampler.SAMPLERS,),
"scheduler": (comfy.samplers.KSampler.SCHEDULERS+NEW_SCHEDULERS,),
"denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}),
"seed": ("INT", {"default": 0, "min": 0, "max": MAX_SEED_NUM}),
},
"optional":{
"image_to_latent": ("IMAGE",),
"latent": ("LATENT",)
},
"hidden":
{"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO", "my_unique_id": "UNIQUE_ID"},
}
RETURN_TYPES = ("PIPE_LINE",)
RETURN_NAMES = ("pipe",)
FUNCTION = "settings"
CATEGORY = "EasyUse/PreSampling"
def settings(self, pipe, steps, cfg, cfg_mode, cfg_scale_min,sampler_name, scheduler, denoise, seed, image_to_latent=None, latent=None, prompt=None, extra_pnginfo=None, my_unique_id=None):
dynamic_thresh = DynThresh(7.0, 1.0,"CONSTANT", 0, cfg_mode, cfg_scale_min, 0, 0, 999, False,
"MEAN", "AD", 1)
def sampler_dyn_thresh(args):
input = args["input"]
cond = input - args["cond"]
uncond = input - args["uncond"]
cond_scale = args["cond_scale"]
time_step = args["timestep"]
dynamic_thresh.step = 999 - time_step[0]
return input - dynamic_thresh.dynthresh(cond, uncond, cond_scale, None)
model = pipe['model']
m = model.clone()
m.set_model_sampler_cfg_function(sampler_dyn_thresh)
# 图生图转换
vae = pipe["vae"]
batch_size = pipe["loader_settings"]["batch_size"] if "batch_size" in pipe["loader_settings"] else 1
if image_to_latent is not None:
samples = {"samples": vae.encode(image_to_latent[:, :, :, :3])}
samples = RepeatLatentBatch().repeat(samples, batch_size)[0]
images = image_to_latent
elif latent is not None:
samples = RepeatLatentBatch().repeat(latent, batch_size)[0]
images = pipe["images"]
else:
samples = pipe["samples"]
images = pipe["images"]
new_pipe = {
"model": m,
"positive": pipe['positive'],
"negative": pipe['negative'],
"vae": pipe['vae'],
"clip": pipe['clip'],
"samples": samples,
"images": images,
"seed": seed,
"loader_settings": {
**pipe["loader_settings"],
"steps": steps,
"cfg": cfg,
"sampler_name": sampler_name,
"scheduler": scheduler,
"denoise": denoise
},
}
del pipe
return {"ui": {"value": [seed]}, "result": (new_pipe,)}
# 动态CFG
class dynamicThresholdingFull:
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"model": ("MODEL",),
"mimic_scale": ("FLOAT", {"default": 7.0, "min": 0.0, "max": 100.0, "step": 0.5}),
"threshold_percentile": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}),
"mimic_mode": (DynThresh.Modes,),
"mimic_scale_min": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 100.0, "step": 0.5}),
"cfg_mode": (DynThresh.Modes,),
"cfg_scale_min": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 100.0, "step": 0.5}),
"sched_val": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 100.0, "step": 0.01}),
"separate_feature_channels": (["enable", "disable"],),
"scaling_startpoint": (DynThresh.Startpoints,),
"variability_measure": (DynThresh.Variabilities,),
"interpolate_phi": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}),
}
}
RETURN_TYPES = ("MODEL",)
FUNCTION = "patch"
CATEGORY = "EasyUse/PreSampling"
def patch(self, model, mimic_scale, threshold_percentile, mimic_mode, mimic_scale_min, cfg_mode, cfg_scale_min,
sched_val, separate_feature_channels, scaling_startpoint, variability_measure, interpolate_phi):
dynamic_thresh = DynThresh(mimic_scale, threshold_percentile, mimic_mode, mimic_scale_min, cfg_mode,
cfg_scale_min, sched_val, 0, 999, separate_feature_channels == "enable",
scaling_startpoint, variability_measure, interpolate_phi)
def sampler_dyn_thresh(args):
input = args["input"]
cond = input - args["cond"]
uncond = input - args["uncond"]
cond_scale = args["cond_scale"]
time_step = args["timestep"]
dynamic_thresh.step = 999 - time_step[0]
return input - dynamic_thresh.dynthresh(cond, uncond, cond_scale, None)
m = model.clone()
m.set_model_sampler_cfg_function(sampler_dyn_thresh)
return (m,)
NODE_CLASS_MAPPINGS = {
"easy preSampling": samplerSettings,
"easy preSamplingAdvanced": samplerSettingsAdvanced,
"easy preSamplingNoiseIn": samplerSettingsNoiseIn,
"easy preSamplingCustom": samplerCustomSettings,
"easy preSamplingSdTurbo": sdTurboSettings,
"easy preSamplingDynamicCFG": dynamicCFGSettings,
"easy preSamplingCascade": cascadeSettings,
"easy preSamplingLayerDiffusion": layerDiffusionSettings,
"easy preSamplingLayerDiffusionADDTL": layerDiffusionSettingsADDTL,
"dynamicThresholdingFull": dynamicThresholdingFull,
}
NODE_DISPLAY_NAME_MAPPINGS = {
"easy preSampling": "PreSampling",
"easy preSamplingAdvanced": "PreSampling (Advanced)",
"easy preSamplingNoiseIn": "PreSampling (NoiseIn)",
"easy preSamplingCustom": "PreSampling (Custom)",
"easy preSamplingSdTurbo": "PreSampling (SDTurbo)",
"easy preSamplingDynamicCFG": "PreSampling (DynamicCFG)",
"easy preSamplingCascade": "PreSampling (Cascade)",
"easy preSamplingLayerDiffusion": "PreSampling (LayerDiffuse)",
"easy preSamplingLayerDiffusionADDTL": "PreSampling (LayerDiffuse ADDTL)",
"dynamicThresholdingFull": "DynamicThresholdingFull",
}