|
|
import os |
|
|
import comfy.samplers |
|
|
import comfy.sample |
|
|
import torch |
|
|
from nodes import common_ksampler |
|
|
from .utils import expand_mask, FONTS_DIR, parse_string_to_list |
|
|
import torchvision.transforms.v2 as T |
|
|
import torch.nn.functional as F |
|
|
|
|
|
class KSamplerVariationsWithNoise:
    """KSampler node that injects a blend of two seeded noises into the latent.

    A base noise (from ``main_seed``) and a variation noise (from
    ``variation_seed``) are mixed with spherical interpolation, scaled by the
    sigma difference between the first and last sampled step, added to the
    input latent, and the result is sampled with noise generation disabled.
    """

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification of the node."""
        return {"required": {
            "model": ("MODEL", ),
            "latent_image": ("LATENT", ),
            "main_seed": ("INT:seed", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
            "steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
            "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.1, "round": 0.01}),
            "sampler_name": (comfy.samplers.KSampler.SAMPLERS, ),
            "scheduler": (comfy.samplers.KSampler.SCHEDULERS, ),
            "positive": ("CONDITIONING", ),
            "negative": ("CONDITIONING", ),
            "variation_strength": ("FLOAT", {"default": 0.17, "min": 0.0, "max": 1.0, "step":0.01, "round": 0.01}),
            "variation_seed": ("INT:seed", {"default": 12345, "min": 0, "max": 0xffffffffffffffff}),
            "denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step":0.01, "round": 0.01}),
        }}

    RETURN_TYPES = ("LATENT",)
    FUNCTION = "execute"
    CATEGORY = "essentials/sampling"

    def slerp(self, val, low, high):
        """Spherically interpolate between ``low`` and ``high``, row by row.

        Args:
            val: Interpolation factor; 0 yields ``low`` and 1 yields ``high``.
            low: Tensor whose first dimension is the batch.
            high: Tensor with the same shape as ``low``.

        Returns:
            A tensor with the shape of ``low``. Rows for which the spherical
            formula is undefined (the two directions are parallel, so
            ``sin(omega)`` is zero) fall back to linear interpolation instead
            of propagating NaN/inf.
        """
        dims = low.shape

        # Flatten everything except the batch dimension.
        low = low.reshape(dims[0], -1)
        high = high.reshape(dims[0], -1)

        low_norm = low / torch.norm(low, dim=1, keepdim=True)
        high_norm = high / torch.norm(high, dim=1, keepdim=True)

        # A zero-norm row divides by zero above; NaN is the only value that
        # differs from itself, so this zeroes exactly those entries.
        low_norm[low_norm != low_norm] = 0.0
        high_norm[high_norm != high_norm] = 0.0

        omega = torch.acos((low_norm * high_norm).sum(1))
        so = torch.sin(omega)
        res = (torch.sin((1.0 - val) * omega) / so).unsqueeze(1) * low + (torch.sin(val * omega) / so).unsqueeze(1) * high

        # Only rows that came out non-finite are replaced, so every result
        # that was already well defined is left bit-for-bit untouched.
        degenerate = ~torch.isfinite(res).all(dim=1, keepdim=True)
        if degenerate.any():
            lerp = (1.0 - val) * low + val * high
            res = torch.where(degenerate, lerp, res)

        return res.reshape(dims)

    def prepare_mask(self, mask, shape):
        """Resize ``mask`` to the 4-D ``shape`` (batch, channels, height, width).

        The mask is bilinearly resized to the target height/width, expanded
        over the channel dimension and tiled along the batch dimension when it
        has fewer entries than ``shape[0]``.
        """
        mask = torch.nn.functional.interpolate(
            mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])),
            size=(shape[2], shape[3]),
            mode="bilinear",
        )
        mask = mask.expand((-1, shape[1], -1, -1))
        if mask.shape[0] < shape[0]:
            # Ceil-divide to get enough repeats, then trim to the batch size.
            mask = mask.repeat((shape[0] - 1) // mask.shape[0] + 1, 1, 1, 1)[:shape[0]]
        return mask

    def execute(self, model, latent_image, main_seed, steps, cfg, sampler_name, scheduler, positive, negative, variation_strength, variation_seed, denoise):
        """Inject the blended noise into ``latent_image`` and sample it.

        Returns:
            Whatever ``common_ksampler`` returns for the noised latent.
        """
        # Equal seeds would produce identical noises (nothing to interpolate),
        # so move the variation seed by one while staying inside the seed range.
        if main_seed == variation_seed:
            if variation_seed < 0xffffffffffffffff:
                variation_seed += 1
            else:
                variation_seed -= 1

        end_at_step = steps
        start_at_step = round(end_at_step - end_at_step * denoise)

        force_full_denoise = True
        disable_noise = True

        device = comfy.model_management.get_torch_device()

        samples = latent_image["samples"]
        # Use the latent's own channel count rather than assuming 4, so latents
        # with a different number of channels get matching noise.
        batch_size, channels, height, width = samples.shape

        # Base noise: one sample shared by the whole batch.
        generator = torch.manual_seed(main_seed)
        base_noise = torch.randn((1, channels, height, width), dtype=torch.float32, device="cpu", generator=generator).repeat(batch_size, 1, 1, 1).cpu()

        # Variation noise: an independent sample per batch item.
        generator = torch.manual_seed(variation_seed)
        variation_noise = torch.randn((batch_size, channels, height, width), dtype=torch.float32, device="cpu", generator=generator).cpu()

        slerp_noise = self.slerp(variation_strength, base_noise, variation_noise)

        # Scale the noise by the sigma span of the steps that will be sampled.
        comfy.model_management.load_model_gpu(model)
        sampler = comfy.samplers.KSampler(model, steps=steps, device=device, sampler=sampler_name, scheduler=scheduler, denoise=1.0, model_options=model.model_options)
        sigmas = sampler.sigmas
        sigma = sigmas[start_at_step] - sigmas[end_at_step]
        sigma /= model.model.latent_format.scale_factor
        sigma = sigma.detach().cpu().item()

        work_latent = latent_image.copy()
        work_latent["samples"] = samples.clone() + slerp_noise * sigma

        # With a noise mask, keep the original latent wherever the mask is 0.
        if "noise_mask" in latent_image:
            noise_mask = self.prepare_mask(latent_image["noise_mask"], samples.shape)
            work_latent["samples"] = noise_mask * work_latent["samples"] + (1 - noise_mask) * samples
            # NOTE(review): expand_mask comes from .utils; presumably it grows
            # the mask by 5 px before sampling - confirm against that helper.
            work_latent['noise_mask'] = expand_mask(latent_image["noise_mask"].clone(), 5, True)

        return common_ksampler(model, main_seed, steps, cfg, sampler_name, scheduler, positive, negative, work_latent, denoise=1.0, disable_noise=disable_noise, start_step=start_at_step, last_step=end_at_step, force_full_denoise=force_full_denoise)
|
|
|
|
|
|
|
|
class KSamplerVariationsStochastic:
    """Two-pass sampler that produces variations of a single base image.

    The first pass samples only the first latent of the batch up to a split
    step derived from ``variation_strength``. The partial result is then
    repeated over the batch and finished by a second pass that uses
    ``variation_seed`` and ``variation_sampler``.
    """

    RETURN_TYPES = ("LATENT", )
    FUNCTION = "execute"
    CATEGORY = "essentials/sampling"

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification of the node."""
        required = {
            "model": ("MODEL",),
            "latent_image": ("LATENT", ),
            "noise_seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
            "steps": ("INT", {"default": 25, "min": 1, "max": 10000}),
            "cfg": ("FLOAT", {"default": 7.0, "min": 0.0, "max": 100.0, "step":0.1, "round": 0.01}),
            "sampler": (comfy.samplers.KSampler.SAMPLERS, ),
            "scheduler": (comfy.samplers.KSampler.SCHEDULERS, ),
            "positive": ("CONDITIONING", ),
            "negative": ("CONDITIONING", ),
            "variation_seed": ("INT:seed", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
            "variation_strength": ("FLOAT", {"default": 0.2, "min": 0.0, "max": 1.0, "step":0.05, "round": 0.01}),
            "cfg_scale": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step":0.05, "round": 0.01}),
        }
        return {"required": required}

    def execute(self, model, latent_image, noise_seed, steps, cfg, sampler, scheduler, positive, negative, variation_seed, variation_strength, cfg_scale, variation_sampler="dpmpp_2m_sde"):
        """Run the base pass and then the variation pass.

        Returns:
            Whatever ``common_ksampler`` returns for the second pass.
        """
        # Step at which the base pass stops and the variation pass takes over.
        split_step = max(int(steps * (1-variation_strength)), 1)

        # Pass 1: sample only the first latent of the batch, with fresh noise
        # and without forcing a full denoise.
        base_latent = latent_image.copy()
        all_samples = base_latent["samples"]
        batch_size = all_samples.shape[0]
        base_latent["samples"] = all_samples[0].unsqueeze(0)

        stage1 = common_ksampler(model, noise_seed, steps, cfg, sampler, scheduler, positive, negative, base_latent, denoise=1.0, disable_noise=False, start_step=0, last_step=split_step, force_full_denoise=False)[0]

        # Every batch item continues from the same partially denoised latent.
        if batch_size > 1:
            stage1["samples"] = stage1["samples"].clone().repeat(batch_size, 1, 1, 1)

        # Pass 2: finish from the split step with no added noise; cfg is
        # scaled but never drops below 1.0.
        variation_cfg = max(cfg * cfg_scale, 1.0)
        return common_ksampler(model, variation_seed, steps, variation_cfg, variation_sampler, scheduler, positive, negative, stage1, denoise=1.0, disable_noise=True, start_step=split_step, last_step=steps, force_full_denoise=True)
|
|
|
|
|
class InjectLatentNoise:
    """Node that adds seeded Gaussian noise to a latent."""

    RETURN_TYPES = ("LATENT",)
    FUNCTION = "execute"
    CATEGORY = "essentials/sampling"

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification of the node."""
        required = {
            "latent": ("LATENT", ),
            "noise_seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
            "noise_strength": ("FLOAT", {"default": 1.0, "min": -20.0, "max": 20.0, "step":0.01, "round": 0.01}),
        }
        return {"required": required}

    def execute(self, latent, noise_seed, noise_strength):
        """Return a shallow copy of ``latent`` whose samples carry extra noise.

        The global torch RNG is seeded with ``noise_seed``; the noise has the
        shape of the samples and is multiplied by ``noise_strength``. The
        input dict and its samples tensor are not modified.
        """
        torch.manual_seed(noise_seed)

        samples = latent["samples"]
        scaled_noise = torch.randn_like(samples) * noise_strength

        noised = latent.copy()
        noised["samples"] = samples.clone() + scaled_noise
        return (noised, )
|
|
|
|
|
class FluxSamplerParams:
    """Run a Flux sampling sweep over every combination of the given parameters.

    Each input is a string holding one or more values separated by commas or
    newlines. One image is sampled per combination; the node returns the
    batched latents together with a list of dicts describing the parameters
    (and elapsed time) of each sample, in sampling order.
    """

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification of the node."""
        return {"required": {
            "model": ("MODEL", ),
            "conditioning": ("CONDITIONING", ),
            "latent_image": ("LATENT", ),

            "noise": ("STRING", { "multiline": False, "dynamicPrompts": False, "default": "?" }),
            "sampler": ("STRING", { "multiline": False, "dynamicPrompts": False, "default": "ipndm" }),
            "scheduler": ("STRING", { "multiline": False, "dynamicPrompts": False, "default": "simple" }),
            "steps": ("STRING", { "multiline": False, "dynamicPrompts": False, "default": "20" }),
            "guidance": ("STRING", { "multiline": False, "dynamicPrompts": False, "default": "3.5" }),
            "max_shift": ("STRING", { "multiline": False, "dynamicPrompts": False, "default": "1.15" }),
            "base_shift": ("STRING", { "multiline": False, "dynamicPrompts": False, "default": "0.5" }),
            "split_sigmas": ("STRING", { "multiline": False, "dynamicPrompts": False, "default": "1.0" }),
            "denoise": ("STRING", { "multiline": False, "dynamicPrompts": False, "default": "1.0" }),
        }}

    RETURN_TYPES = ("LATENT","SAMPLER_PARAMS")
    RETURN_NAMES = ("latent", "params")
    FUNCTION = "execute"
    CATEGORY = "essentials/sampling"

    @staticmethod
    def _split_tokens(text):
        """Split ``text`` on commas/newlines, strip tokens and drop empty ones."""
        pieces = (piece.strip() for piece in text.replace("\n", ",").split(","))
        return [piece for piece in pieces if piece]

    @staticmethod
    def _parse_seeds(text):
        """Parse seeds; a token containing ``?`` becomes a random seed.

        Falls back to a single random seed when ``text`` holds no tokens.
        """
        import random

        seeds = [
            random.randint(0, 999999) if "?" in token else int(token)
            for token in FluxSamplerParams._split_tokens(text)
        ]
        return seeds or [random.randint(0, 999999)]

    @staticmethod
    def _parse_steps(text):
        """Parse integer step counts, falling back to ``[20]`` when empty."""
        return [int(token) for token in FluxSamplerParams._split_tokens(text)] or [20]

    @staticmethod
    def _select_names(spec, available, default):
        """Resolve a sampler/scheduler spec against the ``available`` names.

        ``*`` selects every name, a spec starting with ``!`` selects every
        name except the listed ones, and anything else selects the listed
        names that are available. ``[default]`` is returned when nothing is
        selected.
        """
        if spec == '*':
            chosen = list(available)
        elif spec.startswith("!"):
            excluded = {token.strip("! ") for token in spec.replace("\n", ",").split(",")}
            chosen = [name for name in available if name not in excluded]
        else:
            chosen = [token for token in FluxSamplerParams._split_tokens(spec) if token in available]
        return chosen or [default]

    def execute(self, model, conditioning, latent_image, noise, sampler, scheduler, steps, guidance, max_shift, base_shift, split_sigmas, denoise):
        """Sample once per parameter combination.

        Returns:
            A tuple ``(latent, params)`` where ``latent`` is the batch of all
            sampled latents and ``params`` is a list with one dict per sample
            (keys: time, seed, sampler, scheduler, steps, guidance, max_shift,
            base_shift, denoise, split_sigmas).

        Raises:
            ValueError: If a seed or step token is not an integer.
        """
        import time
        from comfy_extras.nodes_custom_sampler import Noise_RandomNoise, BasicScheduler, BasicGuider, SamplerCustomAdvanced, SplitSigmasDenoise
        from comfy_extras.nodes_latent import LatentBatch
        from comfy_extras.nodes_model_advanced import ModelSamplingFlux
        from node_helpers import conditioning_set_values

        # Empty tokens (empty field, trailing comma) are ignored so that the
        # defaults below are actually reachable instead of failing in int('').
        noise = self._parse_seeds(noise)
        sampler = self._select_names(sampler, comfy.samplers.KSampler.SAMPLERS, 'ipndm')
        scheduler = self._select_names(scheduler, comfy.samplers.KSampler.SCHEDULERS, 'simple')
        steps = self._parse_steps(steps)

        # parse_string_to_list is a helper from .utils; an empty result falls
        # back to the same default the UI shows.
        denoise = parse_string_to_list(denoise) or [1.0]
        guidance = parse_string_to_list(guidance) or [3.5]
        max_shift = parse_string_to_list(max_shift) or [1.15]
        base_shift = parse_string_to_list(base_shift) or [0.5]
        split_sigmas = parse_string_to_list(split_sigmas) or [1.0]

        out_latent = None
        out_params = []

        scheduler_node = BasicScheduler()
        guider_node = BasicGuider()
        sampler_node = SamplerCustomAdvanced()
        batch_node = LatentBatch()
        flux_node = ModelSamplingFlux()
        splitter_node = SplitSigmasDenoise()

        # Latent size times 8 is passed to ModelSamplingFlux as width/height.
        width = latent_image["samples"].shape[3]*8
        height = latent_image["samples"].shape[2]*8

        for n in noise:
            randnoise = Noise_RandomNoise(n)
            for ms in max_shift:
                for bs in base_shift:
                    work_model = flux_node.patch(model, ms, bs, width, height)[0]
                    for g in guidance:
                        cond = conditioning_set_values(conditioning, {"guidance": g})
                        guider = guider_node.get_guider(work_model, cond)[0]
                        for s in sampler:
                            samplerobj = comfy.samplers.sampler_object(s)
                            for sc in scheduler:
                                for st in steps:
                                    for d in denoise:
                                        full_sigmas = scheduler_node.get_sigmas(work_model, sc, st, d)[0]
                                        for ss in split_sigmas:
                                            # Always split the full schedule: reusing the
                                            # previous split would compound the splits of
                                            # successive values.
                                            sigmas = splitter_node.get_sigmas(full_sigmas, ss)[1]

                                            start_time = time.time()
                                            latent = sampler_node.sample(randnoise, guider, samplerobj, sigmas, latent_image)[1]
                                            elapsed_time = time.time() - start_time

                                            out_params.append({
                                                "time": elapsed_time,
                                                "seed": n,
                                                "sampler": s,
                                                "scheduler": sc,
                                                "steps": st,
                                                "guidance": g,
                                                "max_shift": ms,
                                                "base_shift": bs,
                                                "denoise": d,
                                                "split_sigmas": ss,
                                            })

                                            if out_latent is None:
                                                out_latent = latent
                                            else:
                                                out_latent = batch_node.batch(out_latent, latent)[0]

        return (out_latent, out_params)
|
|
|
|
|
class PlotParameters:
    """Caption each image with its sampler parameters and optionally tile them.

    Images are expected as a (batch, height, width, channels) tensor, paired
    one-to-one with the parameter dicts produced by the sampler-params node.
    """

    @classmethod
    def INPUT_TYPES(s):
        """Return the input specification of the node."""
        return {"required": {
            "images": ("IMAGE", ),
            "params": ("SAMPLER_PARAMS", ),
            "order_by": (["none", "time", "seed", "steps", "denoise", "sampler", "scheduler"], ),
            "cols_value": (["none", "time", "seed", "steps", "denoise", "sampler", "scheduler"], ),
            "cols_num": ("INT", {"default": -1, "min": -1, "max": 1024 }),
        }}

    RETURN_TYPES = ("IMAGE", )
    FUNCTION = "execute"
    CATEGORY = "essentials/sampling"

    @staticmethod
    def _build_mosaic(out_image, cols_num):
        """Tile a (b, h, w, c) batch into a single (1, rows*h, cols*w, c) image.

        ``cols_num`` greater than 0 is the requested number of columns; 0
        picks ``floor(sqrt(b))`` columns. The column count is clamped to
        ``[1, min(1024, b)]`` and missing cells of the last row are zero
        filled.
        """
        import math

        b, h, w, c = out_image.shape
        columns = cols_num if cols_num > 0 else int(math.sqrt(b))
        columns = max(1, min(columns, 1024))

        cols = min(columns, b)
        rows = math.ceil(b / cols)

        # Pad the batch with blank images so it fills the grid exactly.
        if b % cols != 0:
            padding = cols - (b % cols)
            out_image = F.pad(out_image, (0, 0, 0, 0, 0, 0, 0, padding))

        # (rows, cols, h, w, c) -> (rows, h, cols, w, c) -> one big image.
        out_image = out_image.reshape(rows, cols, h, w, c)
        out_image = out_image.permute(0, 2, 1, 3, 4)
        return out_image.reshape(rows * h, cols * w, c).unsqueeze(0)

    def execute(self, images, params, order_by, cols_value, cols_num):
        """Render the parameter caption under every image.

        Args:
            images: Image batch, one image per entry of ``params``.
            params: List of parameter dicts (time, seed, steps, ...).
            order_by: Parameter key to sort by, or ``"none"``.
            cols_value: When sorting and ``cols_num`` < 1, the number of
                distinct values of this key becomes the column count.
            cols_num: -1 keeps the images as a batch, 0 builds a roughly
                square mosaic, a positive value sets the mosaic column count.

        Returns:
            A 1-tuple with the captioned batch or the mosaic image. An empty
            batch is returned unchanged.

        Raises:
            ValueError: If the number of images and parameters differ.
        """
        from PIL import Image, ImageDraw, ImageFont

        if images.shape[0] != len(params):
            raise ValueError("Number of images and number of parameters do not match.")

        if not params:
            return (images, )

        if order_by != "none":
            if cols_value != "none" and cols_num < 1:
                cols_num = len({p[cols_value] for p in params})
            # Sort positions rather than looking dicts up with list.index():
            # that lookup is quadratic and maps equal dicts to the same image.
            order = sorted(range(len(params)), key=lambda i: params[i][order_by])
            params = [params[i] for i in order]
            images = images[torch.tensor(order)]

        width = images.shape[2]

        # Scale the font with the image width; keep it at least 1 because a
        # zero font size is rejected by the font loader.
        font_size = max(1, min(48, int(32*(width/1024))))
        font = ImageFont.truetype(os.path.join(FONTS_DIR, 'ShareTechMono-Regular.ttf'), font_size)
        text_padding = 3
        line_height = font.getmask('WwMmQqlL1234567890').getbbox()[3] + font.getmetrics()[1] + text_padding*2

        to_tensor = T.ToTensor()
        captioned = []
        for (image, param) in zip(images, params):
            text = f"time: {param['time']:.2f}s, seed: {param['seed']}, steps: {param['steps']}, denoise: {param['denoise']}\nsampler: {param['sampler']}, sched: {param['scheduler']}, sigmas at: {param['split_sigmas']}\nguidance: {param['guidance']}, max/base shift: {param['max_shift']}/{param['base_shift']}"
            lines = text.split("\n")
            text_height = line_height * len(lines)
            text_image = Image.new('RGB', (width, text_height), color=(0, 0, 0))

            draw = ImageDraw.Draw(text_image)
            for i, line in enumerate(lines):
                draw.text((text_padding, i * line_height + text_padding), line, font=font, fill=(255, 255, 255))

            # PIL image -> (1, h, w, c) tensor, stacked below the image.
            text_tensor = to_tensor(text_image).unsqueeze(0).permute([0,2,3,1]).to(image.device)
            captioned.append(torch.cat([image.unsqueeze(0), text_tensor], 1))

        out_image = torch.cat(captioned, 0)

        if cols_num > -1:
            out_image = self._build_mosaic(out_image, cols_num)

        return (out_image, )
|
|
|
|
|
# One row per node: (registry key, implementing class, display name).
# Both public mappings are derived from this table, so they always share the
# same keys in the same order.
_SAMPLING_NODES = (
    ("KSamplerVariationsStochastic+", KSamplerVariationsStochastic, "🔧 KSampler Stochastic Variations"),
    ("KSamplerVariationsWithNoise+", KSamplerVariationsWithNoise, "🔧 KSampler Variations with Noise Injection"),
    ("InjectLatentNoise+", InjectLatentNoise, "🔧 Inject Latent Noise"),
    ("FluxSamplerParams+", FluxSamplerParams, "🔧 Flux Sampler Parameters"),
    ("PlotParameters+", PlotParameters, "🔧 Plot Sampler Parameters"),
)

# Registry key -> node class.
SAMPLING_CLASS_MAPPINGS = {key: node_class for key, node_class, _ in _SAMPLING_NODES}

# Registry key -> display name.
SAMPLING_NAME_MAPPINGS = {key: title for key, _, title in _SAMPLING_NODES}