# Source: extenew / extensions / SchedRide / scripts / forge_overSched.py
# Uploaded by dikdimon using the SD-Hub extension (commit f4a41d8, verified).
import gradio as gr
import inspect, os
from modules import scripts
import modules.shared as shared
from modules.shared import opts
import modules.sd_samplers_kdiffusion as K
import k_diffusion.sampling
import k_diffusion.external
import modules.sd_samplers
import modules.sd_samplers_common
import modules.sd_samplers_extra
import modules.sd_samplers_lcm
import modules.sd_samplers_timesteps
import torch, math, random
import torchvision.transforms.functional as TF
from PIL import Image
from modules.processing import get_fixed_seed
import numpy
from modules.sd_samplers_common import images_tensor_to_samples, approximation_indexes
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from modules.ui_components import ToolButton
from modules_forge.forge_sampler import sampling_prepare, sampling_cleanup
import colourPresets
class patchedKDiffusionSampler(modules.sd_samplers_common.Sampler):
    """Namespace for the replacement sampler functions and the custom sigma schedules.

    ``get_sigmas``, ``sample`` and ``setup_img2img_steps`` defined on this class
    are assigned onto the stock WebUI objects by
    ``OverSchedForge.process_before_every_sampling``; the remaining functions
    are helpers called through the class name.
    """

    # (label, sampler function, options) for the optional mid-run sampler switch.
    # The UI dropdown returns an index into this list; index 0 keeps the sampler
    # selected in the main UI. ``sample`` relies on the positions of Euler (3),
    # Heun (5), DPM2 (6) and the Euler Dy family (16-19).
    samplers_list = [
        ('No change',   None,                                           {}  ),
        ('DPM++ 2M',    k_diffusion.sampling.sample_dpmpp_2m,           {}  ),
        ('Euler a',     k_diffusion.sampling.sample_euler_ancestral,    {"uses_ensd": True} ),
        ('Euler',       k_diffusion.sampling.sample_euler,              {}  ),
        ('LMS',         k_diffusion.sampling.sample_lms,                {}  ),
        ('Heun',        k_diffusion.sampling.sample_heun,               {"second_order": True}  ),
        ('DPM2',        k_diffusion.sampling.sample_dpm_2,              {'discard_next_to_last_sigma': True, "second_order": True}  ),
        ('DPM2 a',      k_diffusion.sampling.sample_dpm_2_ancestral,    {'discard_next_to_last_sigma': True, "uses_ensd": True, "second_order": True}   ),
        ('DPM++ 2S a',  k_diffusion.sampling.sample_dpmpp_2s_ancestral, {"uses_ensd": True, "second_order": True}   ),
        ('DPM++ SDE',   k_diffusion.sampling.sample_dpmpp_sde,          {"second_order": True, "brownian_noise": True}  ),
        ('DPM++ 2M SDE',k_diffusion.sampling.sample_dpmpp_2m_sde,       {"brownian_noise": True}    ),
        ('DPM++ 3M SDE',k_diffusion.sampling.sample_dpmpp_3m_sde,       {'discard_next_to_last_sigma': True, "brownian_noise": True}    ),
        ('DPM fast',    k_diffusion.sampling.sample_dpm_fast,           {"uses_ensd": True} ),
        ('DPM adaptive',k_diffusion.sampling.sample_dpm_adaptive,       {"uses_ensd": True} ),
        ('Restart',     modules.sd_samplers_extra.restart_sampler,      {"second_order": True}  ),
        ('LCM',         modules.sd_samplers_lcm.sample_lcm,             {}  ),
    ]

    # Optional samplers from the Euler-Smea-Dyn-Sampler extension. Each entry is
    # appended separately so that whatever loaded before a failure is kept.
    try:
        import importlib
        EulerDy = importlib.import_module("extensions.Euler-Smea-Dyn-Sampler.smea_sampling")
        samplers_list.extend([ ("Euler Dy",         EulerDy.sample_euler_dy,            {}  ), ])
        samplers_list.extend([ ("Euler SMEA Dy",    EulerDy.sample_euler_smea_dy,       {}  ), ])
        samplers_list.extend([ ("Euler Negative",   EulerDy.sample_euler_negative,      {}  ), ])
        samplers_list.extend([ ("Euler Negative Dy",EulerDy.sample_euler_dy_negative,   {}  ), ])
    except Exception:
        # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit are not
        # swallowed while the class body is executing.
        print ("Scheduler Override: Smea sampling extension not found.")

    #also get function name from all_samplers?
    #20 next
    #    ('DDIM', modules.sd_samplers_timesteps.sd_samplers_timesteps_impl.ddim, {} ),
    #    ('PLMS', modules.sd_samplers_timesteps.sd_samplers_timesteps_impl.plms, {} ),
    #    ('UniPC', modules.sd_samplers_timesteps.sd_samplers_timesteps_impl.unipc, {} ),
def __init__(self, funcname, sd_model, options=None):
    """Set up the sampler wrapper state for ``funcname``.

    ``funcname`` is either a callable sampler or the name of a function in
    ``k_diffusion.sampling``. ``sd_model`` is accepted for signature
    compatibility and is not used here.
    """
    super().__init__(funcname)

    # Fix: ``sampler_extra_params`` and ``CFGDenoiserKDiffusion`` were referenced
    # as bare names that this file never imports (NameError on instantiation).
    # NOTE(review): assumes modules.sd_samplers_kdiffusion (imported as K)
    # exposes both names - confirm against the installed WebUI version.
    self.extra_params = K.sampler_extra_params.get(funcname, [])

    self.options = options or {}
    self.func = funcname if callable(funcname) else getattr(k_diffusion.sampling, self.funcname)

    self.model_wrap_cfg = K.CFGDenoiserKDiffusion(self)
    self.model_wrap = self.model_wrap_cfg.inner_model
def get_sigmas_fibonacci (n, sigma_min, sigma_max, device='cpu'):
    """Build a schedule shaped by a Fibonacci-style recurrence.

    A linear ramp from ``sigma_min`` to ``sigma_max`` seeds the first two
    terms; every later term is the sum of the previous two, and term ``i`` is
    then divided by ``i + 1``. The sequence is reversed, shifted by its first
    term, normalised by its last term and mapped onto the sigma range.

    Returns a 1-D tensor of ``n`` values followed by a trailing zero, placed
    on ``device`` (the argument used to be ignored). ``n == 1`` now returns
    ``[sigma_min, 0]`` instead of raising IndexError.
    """
    # Built on the CPU: the recurrence is inherently sequential.
    seq = torch.linspace(sigma_min, sigma_max, n)
    for i in range(2, n):
        seq[i] = seq[i - 2] + seq[i - 1]
    seq /= torch.arange(1, n + 1, dtype=seq.dtype)

    sigmas = sigma_min + (sigma_max - sigma_min) * ((seq.flip(0) - seq[0]) / seq[-1])
    sigmas = sigmas.to(device)
    return torch.cat([sigmas, sigmas.new_zeros([1])])
def get_sigmas_phi(n, sigma_min, sigma_max, device='cpu'):
    """Power-curve schedule whose exponent is the golden ratio squared.

    Step ``k`` of ``n`` takes the value
    ``sigma_min + (sigma_max - sigma_min) * (1 - k/(n-1)) ** phi**2``.
    Returns the ``n`` values followed by a trailing zero.
    """
    golden = (1 + 5**0.5) / 2
    exponent = golden * golden
    span = sigma_max - sigma_min
    values = [sigma_min + span * ((1 - k / (n - 1)) ** exponent) for k in range(n)]
    sigmas = torch.tensor(values, device=device)
    return torch.cat([sigmas, sigmas.new_zeros([1])])
def get_sigmas_cosine(n, sigma_min, sigma_max, device='cpu'):
    """Cosine-shaped schedule between ``sigma_max`` and ``sigma_min``.

    For progress ``p = k/(n-1)`` the value is
    ``sigma_min + 0.5 * (sigma_max - sigma_min) * (1 - cos(pi * (1 - sqrt(p))))``.
    Returns the ``n`` values followed by a trailing zero.
    """
    half_span = 0.5 * (sigma_max - sigma_min)

    def value_at(k):
        progress = k / (n - 1)
        return sigma_min + half_span * (1 - math.cos(math.pi * (1 - progress**0.5)))

    sigmas = torch.tensor([value_at(k) for k in range(n)], device=device)
    return torch.cat([sigmas, sigmas.new_zeros([1])])
def get_sigmas_fourth(n, sigma_min, sigma_max, device='cpu'):
    """Fourth-power schedule: a linear 1 -> 0 ramp raised to the 4th power,
    scaled by ``sigma_max - sigma_min`` and offset by ``sigma_min``.

    Returns the ``n`` values followed by a trailing zero.
    """
    ramp = torch.linspace(1, 0, n, device=device)
    sigmas = ramp.pow(4).mul(sigma_max - sigma_min).add(sigma_min)
    return torch.cat([sigmas, sigmas.new_zeros([1])])
def get_sigmas_4xlinear(n, sigma_min, sigma_max, device='cpu'):
    """Piecewise-linear schedule made of four segments.

    The normalised curve starts at 1.0 and falls to successive levels given by
    the drop rates below; the last segment runs down to 0. The curve is then
    scaled by ``sigma_max - sigma_min`` and offset by ``sigma_min``.

    Segment boundaries are integers so that exactly ``n`` values (plus the
    trailing zero) are produced. The previous float boundaries (``n/4`` ...)
    produced ``4 * ceil(n/4)`` values whenever ``n`` was not a multiple of 4.
    Results for multiples of 4 are unchanged.
    """
    dropRate1 = 0.75#825
    dropRate2 = 0.75#55
    dropRate3 = 0.01

    # Normalised level at the start of each segment.
    levels = [1.0]
    for rate in (dropRate1, dropRate2, dropRate3):
        levels.append(levels[-1] * (1.0 - rate))

    bounds = [0, n // 4, n // 2, 3 * n // 4, n]

    ramp = []
    # First three segments: start at the upper level, stop one step short of the lower one.
    for seg in range(3):
        length = bounds[seg + 1] - bounds[seg]
        upper, lower = levels[seg], levels[seg + 1]
        ramp.extend(lower + (upper - lower) * (length - i) / length for i in range(length))

    # Final segment: linear descent that ends exactly at 0.
    length = bounds[4] - bounds[3]
    ramp.extend(levels[3] * (length - (i + 1)) / length for i in range(length))

    sigmas = torch.tensor(ramp, device=device)
    sigmas *= (sigma_max - sigma_min)
    sigmas += sigma_min
    return torch.cat([sigmas, sigmas.new_zeros([1])])
def get_sigmas_4xnonlinear(n, sigma_min, sigma_max, device='cpu'):
    """Four-segment schedule: two geometric decays, a plateau, then a linear fall to 0.

    Normalised curve (scaled to the sigma range afterwards):
      1. geometric decay from 1.0 that reaches ``target1`` on the segment's last step;
      2. geometric decay by a further overall factor of ``target1 - target2``;
      3. constant plateau;
      4. linear descent from the plateau value to 0.

    Fixes relative to the previous version:
      * removed ``K=(sigmin/sigmax)**(1/n)``, which referenced undefined names
        and made every call raise NameError (the value was unused);
      * the first segment now spans ``b2start - b1start`` steps, so the result
        always has ``n`` values plus the trailing zero;
      * one-step segments no longer divide by zero when computing the ratio.
    """
    target1 = 0.32
    target2 = 0.08

    b1start = 0
    b2start = int(n/4)
    b3start = int(n/2)
    b4start = int(3*n/4)

    def step_ratio(total, length):
        # Per-step factor such that the segment's last value is ``total`` times
        # its first; a single-step segment applies the whole factor at once.
        return total ** (1 / (length - 1)) if length > 1 else total

    curve = []
    r = 1.0

    length = b2start - b1start
    ratio = step_ratio(target1, length)
    for _ in range(length):
        curve.append(r)
        r *= ratio

    length = b3start - b2start
    ratio = step_ratio(target1 - target2, length)
    for _ in range(length):
        curve.append(r)
        r *= ratio

    # Plateau at whatever value the second decay left behind.
    curve.extend(r for _ in range(b4start - b3start))

    length = n - b4start
    plateau = r
    curve.extend(plateau * (length - (i + 1)) / length for i in range(length))

    sigmas = torch.tensor(curve, device=device)
    sigmas *= (sigma_max - sigma_min)
    sigmas += sigma_min
    return torch.cat([sigmas, sigmas.new_zeros([1])])
def get_sigmas_custom(n, sigma_min, sigma_max, device='cpu'):
    """Build a schedule from the user's text in ``OverSchedForge.custom``.

    Three cases:
      * text containing a blocked word -> plain linear schedule;
      * text of the form ``[a, b, c, ...]`` -> the listed sigmas, resampled to
        ``n`` points by linear interpolation in log space;
      * anything else -> evaluated as a Python expression once per step, with
        ``x`` (progress 0..1), ``s`` (step index), ``n``, ``m``/``M`` (sigma
        min/max), ``phi``, ``pi`` and ``sigmas`` available as local names.

    Returns the ``n`` values followed by a trailing zero.
    """
    import re

    custom = OverSchedForge.custom

    # SECURITY NOTE: the expression branch runs eval() on text typed into the UI
    # (or pasted from infotext). The word filter below is a courtesy check, not
    # a sandbox - do not expose this to untrusted users.
    # 'os' is matched as a whole word: the previous substring test also
    # rejected legitimate expressions such as math.cos(...).
    blocked = (any(word in custom for word in ('import', 'eval', 'scripts'))
               or re.search(r'\bos\b', custom) is not None)

    if blocked:
        sigmas = torch.linspace(sigma_max, sigma_min, n, device=device)
    elif custom[0] == '[' and custom[-1] == ']':
        sigmasList = [float(x) for x in custom.strip('[]').split(',')]

        # Interpolate in log space over ascending values, then flip back to descending.
        xs = numpy.linspace(0, 1, len(sigmasList))
        ys = numpy.log(sigmasList[::-1])

        new_xs = numpy.linspace(0, 1, n)
        new_ys = numpy.interp(new_xs, xs, ys)

        interpolated_ys = numpy.exp(new_ys)[::-1].copy()
        sigmas = torch.tensor(interpolated_ys, device=device)
    else:
        sigmas = torch.linspace(sigma_max, sigma_min, n, device=device)

        # These local names are the vocabulary of the user expression; do not rename.
        phi = (1 + 5**0.5) / 2
        pi = math.pi
        M = sigma_max
        m = sigma_min

        s = 0
        while (s < n):
            x = (s) / (n - 1)
            sigmas[s] = eval(custom)
            s += 1

    return torch.cat([sigmas, sigmas.new_zeros([1])])
def get_sigmas_AYS_sd15(n, sigma_min, sigma_max, device='cpu'):
    """'Align Your Steps' schedule using the SD 1.5 table.

    The 11-entry table is resampled to ``n`` points by linear interpolation in
    log space. ``sigma_min`` and ``sigma_max`` are accepted for signature
    parity with the other schedules but are not used. Returns the ``n`` values
    followed by a trailing zero.
    """
    table = (14.615, 6.475, 3.861, 2.697, 1.886, 1.396, 0.963, 0.652, 0.399, 0.152, 0.029)

    # numpy.interp needs ascending sample values, so work on the reversed table.
    knots = numpy.linspace(0, 1, len(table))
    log_ascending = numpy.log(table[::-1])

    resampled = numpy.exp(numpy.interp(numpy.linspace(0, 1, n), knots, log_ascending))

    # Back to descending order; a contiguous copy is needed for torch.tensor.
    descending = numpy.ascontiguousarray(resampled[::-1])
    sigmas = torch.tensor(descending, device=device)
    return torch.cat([sigmas, sigmas.new_zeros([1])])
def get_sigmas_AYS_sdXL(n, sigma_min, sigma_max, device='cpu'):
    """'Align Your Steps' schedule using the SDXL table.

    The 11-entry table is resampled to ``n`` points by linear interpolation in
    log space. ``sigma_min`` and ``sigma_max`` are accepted for signature
    parity with the other schedules but are not used. Returns the ``n`` values
    followed by a trailing zero.
    """
    table = (14.615, 6.315, 3.771, 2.181, 1.342, 0.862, 0.555, 0.380, 0.234, 0.113, 0.029)

    # numpy.interp needs ascending sample values, so work on the reversed table.
    knots = numpy.linspace(0, 1, len(table))
    log_ascending = numpy.log(table[::-1])

    resampled = numpy.exp(numpy.interp(numpy.linspace(0, 1, n), knots, log_ascending))

    # Back to descending order; a contiguous copy is needed for torch.tensor.
    descending = numpy.ascontiguousarray(resampled[::-1])
    sigmas = torch.tensor(descending, device=device)
    return torch.cat([sigmas, sigmas.new_zeros([1])])
def scale_sigmas (sigmas, sigma_min, sigma_max, device='cpu'):
    """Linearly remap ``sigmas`` so its first value becomes ``sigma_max`` and
    its last value becomes ``sigma_min``.

    The first and last entries are assumed to be the current maximum and
    minimum. Returns a new tensor on ``device``.
    """
    #scales sigmas to between given min/max - correction for default, ideally only a temp. fix
    #better to find and patch the used get_sigmas functions - but where is it? kdiffusion.external
    values = sigmas.tolist()

    #assume min/max at end/start
    low = values[-1]
    high = values[0]

    rescaled = [
        (v - low) / (high - low) * (sigma_max - sigma_min) + sigma_min
        for v in values
    ]
    return torch.tensor(rescaled, device=device)
def setup_img2img_steps(p, steps=None):
    """Return ``(steps, t_enc)`` for img2img without scaling by denoising strength.

    ``steps`` defaults to ``p.steps`` when not given (or falsy); ``t_enc`` is
    always ``steps - 1``.
    """
    total = steps if steps else p.steps
    return total, total - 1
def calculate_sigmas (self, scheduler, steps, sigmaMin, sigmaMax):   #scheduler is a parameter to enable previews (does it matter?)
    """Return the sigma schedule named by ``scheduler`` for ``steps`` steps.

    Unknown names, an empty custom expression, and 'Align Your Steps' on a
    model that is neither SDXL nor SD1 all fall back to the model's own
    ``self.model_wrap.get_sigmas`` rescaled to ``sigmaMin``/``sigmaMax``.
    """
    own = patchedKDiffusionSampler

    def ranged(builder):
        # Every min/max-driven schedule is called with the same keyword set.
        return builder(n=steps, sigma_min=sigmaMin, sigma_max=sigmaMax, device=shared.device)

    def model_default():
        #evenly spaced timesteps from 999 to 0
        #uses table of log sigmas for all possible timesteps, interpolates
        raw = self.model_wrap.get_sigmas(steps)
        return own.scale_sigmas (raw, sigmaMin, sigmaMax)

    if scheduler == 'karras':
        if opts.use_old_karras_scheduler_sigmas:
            sigmaMin, sigmaMax = (0.1, 10)
        return ranged(k_diffusion.sampling.get_sigmas_karras)

    if scheduler == 'continuous VP':
        return k_diffusion.sampling.get_sigmas_vp (n=steps, device=shared.device)

    if scheduler == 'Align Your Steps':
        if shared.sd_model.is_sdxl == True:
            return ranged(own.get_sigmas_AYS_sdXL)
        if shared.sd_model.is_sd1 == True:
            return ranged(own.get_sigmas_AYS_sd15)
        #   fall back to default
        return model_default()

    if scheduler == 'custom' and OverSchedForge.custom != "":
        return ranged(own.get_sigmas_custom)

    simple_builders = {
        'exponential':  k_diffusion.sampling.get_sigmas_exponential,
        'cosine':       own.get_sigmas_cosine,
        'phi':          own.get_sigmas_phi,
        'fibonacci':    own.get_sigmas_fibonacci,
        '4th power':    own.get_sigmas_fourth,
    }
    if scheduler in simple_builders:
        return ranged(simple_builders[scheduler])

    return model_default()
#DiscreteSchedule.get_sigmas   CompVisDenoiser
def apply_action (action, sigmas):
    """Apply the optional post-processing ``action`` to a sigma schedule.

    ``sigmas`` is expected to end with the trailing zero, which is never
    modified. ``action`` may be ``None`` (tensor returned as-is) or one of
    "blend to exponential", "blend to linear" or "threshold"; any other string
    leaves the values unchanged. For every non-``None`` action the result is a
    new tensor on ``shared.device``.

    Schedules with fewer than two real steps are returned unmodified; the
    blend formulas divide by ``steps - 1`` and used to raise
    ZeroDivisionError for a single-step schedule.
    """
    if action is None:
        return sigmas

    values = sigmas.tolist()
    steps = len(values) - 1     # -1 to ignore the zero

    if steps > 1:
        low = values[-2]
        high = values[0]
        last = steps - 1

        if action == "blend to exponential":
            # Cross-fade from the input curve to a geometric max -> min curve.
            ratio = (low / high) ** (1 / last)
            target = high
            for i in range(steps):
                weight = i / last
                values[i] = values[i] + weight * (target - values[i])
                target *= ratio
        elif action == "blend to linear":
            # Cross-fade to a straight line that starts at sqrt(max).
            target = high ** 0.5
            delta = (target - low) / last
            for i in range(steps):
                weight = i / last
                values[i] = values[i] + weight * (target - values[i])
                target -= delta
        elif action == "threshold":
            # Never drop below the straight line that starts at sqrt(max).
            floor = high ** 0.5
            delta = (floor - low) / last
            for i in range(steps):
                values[i] = max(values[i], floor)
                floor -= delta

    return torch.tensor(values, device=shared.device)
def get_sigmas(self, p, steps):
    """Patched replacement for ``KDiffusionSampler.get_sigmas``.

    Un-patches itself first, then builds the schedule selected in the
    extension UI, honouring the hires-fix alternative modes, the
    'discard penultimate sigma' option and the SGM toggle.
    """
    # restore original functions ASAP, in case of problem later
    K.KDiffusionSampler.get_sigmas = OverSchedForge.get_sigmas_backup

    hires_mode = OverSchedForge.hiresAlt
    if hires_mode != "default":
        if p.hr_second_pass_steps > 0:
            steps = p.hr_second_pass_steps
        modules.sd_samplers_common.setup_img2img_steps = OverSchedForge.setup_img2img_steps_backup

    low = OverSchedForge.sigmaMin
    high = OverSchedForge.sigmaMax

    drop_penultimate = self.config is not None and self.config.options.get('discard_next_to_last_sigma', False)
    if opts.always_discard_next_to_last_sigma and not drop_penultimate:
        drop_penultimate = True
        p.extra_generation_params["Discard penultimate sigma"] = True

    use_sgm = OverSchedForge.sgm == True

    # One extra step is generated for each value that gets removed again below.
    if drop_penultimate:
        steps += 1
    if use_sgm:
        steps += 1

    if hires_mode == "scale max sigma":
        high *= p.denoising_strength
    elif hires_mode == "linear":
        high *= p.denoising_strength
        low *= p.denoising_strength
        linear = torch.linspace(high, low, steps, device=shared.device)
        return torch.cat([linear, linear.new_zeros([1])])
    #other methods?

    if steps == 1:
        sigmas = torch.tensor([high**0.5, 0.0], device=shared.device)
    else:
        sigmas = patchedKDiffusionSampler.calculate_sigmas (self, OverSchedForge.scheduler, steps, low, high)

    sigmas = patchedKDiffusionSampler.apply_action (OverSchedForge.action, sigmas)

    if drop_penultimate:
        sigmas = torch.cat([sigmas[:-2], sigmas[-1:]])
    if use_sgm:
        sigmas = sigmas[:-1]

    return sigmas
# via extraltodeus
# found at https://gist.github.com/vadimkantorov/ac1b097753f217c5c11bc2ff396e0a57
# which was ported from https://github.com/pvigier/perlin-numpy/blob/master/perlin2d.py
def rand_perlin_2d(shape, res, fade = lambda t: 6*t**5 - 15*t**4 + 10*t**3):
    """Generate one layer of 2-D Perlin noise.

    ``shape`` is the output size ``(rows, cols)``; ``res`` is the number of
    gradient cells along each axis; ``fade`` is the interpolation easing
    curve. Random gradients come from a single ``torch.rand`` call, so the
    result is reproducible under ``torch.manual_seed``.
    """
    rows, cols = shape[0], shape[1]
    step_y, step_x = res[0] / rows, res[1] / cols
    rep_y, rep_x = rows // res[0], cols // res[1]

    # Fractional position of every output pixel inside its gradient cell.
    axes = torch.meshgrid(torch.arange(0, res[0], step_y), torch.arange(0, res[1], step_x))
    grid = torch.stack(axes, dim = -1) % 1

    angles = 2*math.pi*torch.rand(res[0]+1, res[1]+1)
    gradients = torch.stack((torch.cos(angles), torch.sin(angles)), dim = -1)

    cell = grid[:rows, :cols]

    def corner(row_range, col_range, shift):
        # Dot product of one corner's gradient with the offset from that corner.
        grads = gradients[row_range[0]:row_range[1], col_range[0]:col_range[1]]
        grads = grads.repeat_interleave(rep_y, 0).repeat_interleave(rep_x, 1)
        offset = torch.stack((cell[..., 0] + shift[0], cell[..., 1] + shift[1]), dim = -1)
        return (offset * grads[:rows, :cols]).sum(dim = -1)

    n00 = corner([0, -1], [0, -1], [0, 0])
    n10 = corner([1, None], [0, -1], [-1, 0])
    n01 = corner([0, -1], [1, None], [0, -1])
    n11 = corner([1, None], [1, None], [-1, -1])

    t = fade(cell)
    along_first = torch.lerp(n00, n10, t[..., 0])
    along_second = torch.lerp(n01, n11, t[..., 0])
    return math.sqrt(2) * torch.lerp(along_first, along_second, t[..., 1])
def rand_perlin_2d_octaves(shape, res, octaves=1, persistence=0.5):
    """Sum up to ``octaves`` Perlin layers, then scramble the result into [0, 1).

    Each layer doubles the frequency and scales the amplitude by
    ``persistence``. Layering stops early once the doubled frequency no longer
    divides both dimensions of ``shape``.
    """
    total = torch.zeros(shape)
    frequency = 1
    amplitude = 1

    for _ in range(octaves):
        layer_res = (frequency*res[0], frequency*res[1])
        total += amplitude * patchedKDiffusionSampler.rand_perlin_2d(shape, layer_res)
        frequency *= 2
        if shape[0] % frequency != 0 or shape[1] % frequency != 0:
            break
        amplitude *= persistence

    scrambled = torch.remainder(torch.abs(total)*1000000,17)/17
    return scrambled
def create_noisy_latents_perlin(seed, width, height, batch_size, detail_level):
    """Create a ``(batch_size, 4, height, width)`` float32 CPU tensor of
    Perlin-derived initial noise.

    The octave count is read from the label in ``OverSchedForge.noise``. Batch
    item ``i`` is seeded with ``seed + i``. Values pass through ``erfinv`` and
    are clamped to [-4, 4].
    """
    latents = torch.zeros((batch_size, 4, height, width), dtype=torch.float32, device="cpu")

    # First matching label wins; anything else uses a single octave.
    octaves = 1
    for label, count in (("(1 octave)", 1), ("(2 octaves)", 2), ("(4 octaves)", 4), ("(max octaves)", 99)):
        if label in OverSchedForge.noise:
            octaves = count
            break

    gain = 1 + detail_level/10
    for item in range(batch_size):
        torch.manual_seed(seed + item)
        for channel in range(4):
            values = patchedKDiffusionSampler.rand_perlin_2d_octaves((height, width), (1,1), octaves, 0.5)
            values -= 0.5 * values.max()
            values *= 2
            shaped = gain*torch.erfinv(values) * (2 ** 0.5)
            latents[item, channel, :, :] = torch.clamp(shaped,-4,4)

    return latents
def sample(self, p, x, conditioning, unconditional_conditioning, steps=None, image_conditioning=None):
    """Patched replacement for ``KDiffusionSampler.sample``.

    Un-patches itself first, optionally reshapes the initial noise ``x``
    (Perlin, centring, blur mix, sharpening, colour tint), then runs the
    sampler. When a second sampler is selected (``OverSchedForge.samplerIndex
    != 0``) the schedule is split at ``OverSchedForge.step`` and the second
    part is run with that sampler on the output of the first.

    Tensors created here are placed on the device of the incoming latent
    ``x``. The previous hard-coded ``'cuda'`` / ``'cuda:0'`` failed on CPU or
    MPS and could mismatch on non-default GPUs.
    """
    # restore original function immediately: if anything fails later the main extension can't remove the patch
    K.KDiffusionSampler.sample = OverSchedForge.sample_backup

    unet_patcher = self.model_wrap.inner_model.forge_objects.unet
    sampling_prepare(unet_patcher, x=x)

    self.model_wrap.log_sigmas = self.model_wrap.log_sigmas.to(x.device)
    self.model_wrap.sigmas = self.model_wrap.sigmas.to(x.device)

    steps = steps or p.steps
    sigmas = K.KDiffusionSampler.get_sigmas(self, p, steps).to(x.device)

    # Captured before x may be replaced by generated noise below.
    device = x.device

    w = x.size(3)
    h = x.size(2)
    n = x.size(0)
    seed = p.seed
    detail = 0.0    # range -1 to 1

    # can modify initial noise here? yep
    if "Perlin" in OverSchedForge.noise:
        x = patchedKDiffusionSampler.create_noisy_latents_perlin (seed, w, h, n, detail).to(device)

    if OverSchedForge.centreNoise:
        for b in range(len(x)):
            for c in range(4):
                x[b][c] -= x[b][c].mean()

    if OverSchedForge.lowDNoise:
        for b in range(len(x)):     #3,5,9
            blur2 = TF.gaussian_blur(x[b], 3)
            blur4 = TF.gaussian_blur(x[b], 5)
            blur8 = TF.gaussian_blur(x[b], 9)
            x[b] = (0.0375 * blur8) + (0.0375 * blur4) + (0.075 * blur2) + (0.985 * x[b])

    # sharpen the initial noise, using trial derived values
    if OverSchedForge.sharpNoise:
        minDim = 1 + 2 * (min(w, h) // 2)   # odd kernel size, as gaussian_blur requires
        for b in range(len(x)):
            blurred = TF.gaussian_blur(x[b], minDim)
            x[b] = 1.04*x[b] - 0.04*blurred

    # clamp noise
    # set all latent channels to same value

    # colour the initial noise
    if OverSchedForge.noiseStrength != 0.0:
        nr = ((OverSchedForge.initialNoiseR ** 0.5) * 2) - 1.0
        ng = ((OverSchedForge.initialNoiseG ** 0.5) * 2) - 1.0
        nb = ((OverSchedForge.initialNoiseB ** 0.5) * 2) - 1.0

        # Encode a flat 8x8 image of the chosen colour, then tile it to the latent size.
        imageR = torch.tensor(numpy.full((8,8), (nr), dtype=numpy.float32))
        imageG = torch.tensor(numpy.full((8,8), (ng), dtype=numpy.float32))
        imageB = torch.tensor(numpy.full((8,8), (nb), dtype=numpy.float32))
        image = torch.stack((imageR, imageG, imageB), dim=0)
        image = image.unsqueeze(0)

        latent = images_tensor_to_samples(image, approximation_indexes.get(opts.sd_vae_encode_method), p.sd_model)
        if shared.sd_model.is_sd1 == True:
            latent *= 3.5
        latent = latent.repeat (x.size(0), 1, h, w)

        # effect seems reduced with sdxl, so here's a hack
        strength = OverSchedForge.noiseStrength
        if shared.sd_model.is_sdxl == True:
            strength *= 2.0 ** 0.5

        # method 0: mean stays approximately the same
        torch.lerp (x, latent, strength, out=x)
        # method 1: mean moves toward colour
        #x += latent * OverSchedForge.noiseStrength

        del imageR, imageG, imageB, image, latent

    if opts.sgm_noise_multiplier:
        p.extra_generation_params["SGM noise multiplier"] = True
        x = x * torch.sqrt(1.0 + sigmas[0] ** 2.0)
    else:
        x = x * sigmas[0]

    extra_params_kwargs = self.initialize(p)
    #p is modules.processing.StableDiffusionProcessingTxt2Img object

    self.last_latent = x
    self.sampler_extra_args = {
        'cond': conditioning,
        'image_cond': image_conditioning,
        'uncond': unconditional_conditioning,
        'cond_scale': p.cfg_scale,
        's_min_uncond': self.s_min_uncond
    }

    listSigmas = sigmas.tolist()
    samplerIndex = OverSchedForge.samplerIndex

    if samplerIndex == 0:
        s1 = sigmas.to(device)
    else:
        # s1 and s2 share the boundary sigma so the second sampler resumes where the first stopped.
        stepToChange = int(OverSchedForge.step * len(sigmas))
        s1 = torch.tensor(listSigmas[0:stepToChange+1], device=device)
        s2 = torch.tensor(listSigmas[stepToChange:len(sigmas)], device=device)

    parameters = inspect.signature(self.func).parameters

    if 'n' in parameters:
        extra_params_kwargs['n'] = steps
    if 'sigma_min' in parameters:
        extra_params_kwargs['sigma_min'] = OverSchedForge.sigmaMin  #self.model_wrap.sigmas[-1].item()
        extra_params_kwargs['sigma_max'] = OverSchedForge.sigmaMax  #self.model_wrap.sigmas[0].item()
    if 'sigmas' in parameters:
        extra_params_kwargs['sigmas'] = s1
    if self.config.options.get('brownian_noise', False):
        noise_sampler = self.create_noise_sampler(x, sigmas, p)
        extra_params_kwargs['noise_sampler'] = noise_sampler
    if self.config.options.get('solver_type', None) == 'heun':
        extra_params_kwargs['solver_type'] = 'heun'

    samples = self.launch_sampling(steps, lambda: self.func(self.model_wrap_cfg, x, extra_args=self.sampler_extra_args,
                                                            disable=False, callback=self.callback_state, **extra_params_kwargs))

    ### this is correct, but is it complete??
    #self.sampler_extra_args might need updating
    #some samplers might need more arguments to be set
    #but how useful are they anyway?
    #euler dy slightly different results with switch, and dpm2 (uses sigma[i-1])
    if samplerIndex != 0:
        self.func = patchedKDiffusionSampler.samplers_list[samplerIndex][1]
        extraParams = patchedKDiffusionSampler.samplers_list[samplerIndex][2]

        parameters = inspect.signature(self.func).parameters

        # Re-derive the keyword arguments for the second sampler, dropping any
        # the first sampler needed but this one does not accept.
        if 'n' in parameters:
            extra_params_kwargs['n'] = steps
        if 'sigma_min' in parameters:
            extra_params_kwargs['sigma_min'] = OverSchedForge.sigmaMin  #self.model_wrap.sigmas[-1].item()
            extra_params_kwargs['sigma_max'] = OverSchedForge.sigmaMax  #self.model_wrap.sigmas[0].item()
        if 'sigmas' in parameters:
            extra_params_kwargs['sigmas'] = s2
        else:
            extra_params_kwargs.pop('sigmas', None)

        if extraParams.get('brownian_noise', False):
            noise_sampler = self.create_noise_sampler(x, sigmas, p) # should this use samples instead of x?
            extra_params_kwargs['noise_sampler'] = noise_sampler
            extra_params_kwargs['s_noise'] = 1.0
            extra_params_kwargs['eta'] = 1.0
        else:
            extra_params_kwargs.pop('eta', None)
            extra_params_kwargs.pop('s_noise', None)
            extra_params_kwargs.pop('noise_sampler', None)

        if extraParams.get('solver_type', None) == 'heun':
            extra_params_kwargs['solver_type'] = 'heun'

        if samplerIndex == 3 or samplerIndex == 5 or samplerIndex == 6 :    #euler, heun, dpm2
            extra_params_kwargs['s_churn'] = shared.opts.s_churn
            extra_params_kwargs['s_tmin'] = shared.opts.s_tmin
            extra_params_kwargs['s_tmax'] = shared.opts.s_tmax
        elif samplerIndex >= 16 and samplerIndex <= 19:     #euler dy *4
            extra_params_kwargs['s_churn'] = shared.opts.s_churn
            extra_params_kwargs['s_tmin'] = shared.opts.s_tmin
            extra_params_kwargs['s_tmax'] = shared.opts.s_tmax
            extra_params_kwargs['s_noise'] = shared.opts.s_noise
        else:
            extra_params_kwargs.pop('s_churn', None)
            extra_params_kwargs.pop('s_tmin', None)
            extra_params_kwargs.pop('s_tmax', None)

        samples = self.launch_sampling(steps, lambda: self.func(self.model_wrap_cfg, samples, extra_args=self.sampler_extra_args,
                                                                disable=False, callback=self.callback_state, **extra_params_kwargs))

    ## #save
    ##import pickle
    ##    with open("c:\\temp\\latent.pkl", "wb") as file:
    ##        pickle.dump(samples, file, pickle.HIGHEST_PROTOCOL)

    self.add_infotext(p)
    sampling_cleanup(unet_patcher)
    return samples
# WebUI script object for the "Scheduler Override" accordion. The class-level
# attributes below are shared state: process() writes them for each generation
# and the patched sampler functions read them.
class OverSchedForge(scripts.Script):
# text of the custom schedule (expression or [list]); stored stripped
custom = ""
# scheduler name chosen in the UI; "None" leaves get_sigmas unpatched
scheduler = "None"
# index into patchedKDiffusionSampler.samplers_list; 0 = no sampler switch
samplerIndex = 0
# fraction of the schedule after which the second sampler takes over
step = 0.5
# originals captured in __init__ so the patched functions can restore them
sample_backup = None
get_sigmas_backup = None
setup_img2img_steps_backup = None
# UI toggles read by the patched get_sigmas / sample
sgm = False
centreNoise = False
sharpNoise = False
lowDNoise = False
## last_scheduler = None
# colour presets shown in the "Initial noise" section, loaded from colourPresets.py
from colourPresets import presetList
def __init__(self):
    """Remember the stock WebUI functions so the patched versions can restore them."""
    stock_sampler = K.KDiffusionSampler
    OverSchedForge.get_sigmas_backup = stock_sampler.get_sigmas
    OverSchedForge.sample_backup = stock_sampler.sample
    OverSchedForge.setup_img2img_steps_backup = modules.sd_samplers_common.setup_img2img_steps
def title(self):
    """Return the label used for this script's accordion."""
    label = "Scheduler Override"
    return label
def show(self, is_img2img):
    """Make this extension visible in both the txt2img and img2img tabs."""
    visibility = scripts.AlwaysVisible
    return visibility
# Build the Gradio controls for the extension and wire up their callbacks.
# Returns the components whose values are passed, in this order, as script_args
# to process(), before_process() and process_before_every_sampling().
def ui(self, *args, **kwargs):
# labels for the sampler-switch dropdown, in samplers_list order
samplerList = [x[0] for x in patchedKDiffusionSampler.samplers_list]
with gr.Accordion(open=False, label=self.title()):
# --- master switches ---
with gr.Row(equalHeight=True):
enabled = gr.Checkbox(value=False, label='Enabled')
sgm = gr.Checkbox(value=False, label='SGM')
hiresAlt = gr.Dropdown(["default", "scale max sigma", "linear"], value="default", type="value", label='HiRes method')
# --- scheduler selection ---
with gr.Row(equalHeight=True):
scheduler = gr.Dropdown(["None", "simple", "karras", "exponential", "cosine",
"phi", "fibonacci", "continuous VP", "4th power",
"Align Your Steps", "custom"],
value="None", type='value', label='Scheduler choice', scale=1)
action = gr.Dropdown(["None", "blend to exponential", "blend to linear", "threshold"],
value="None", type="value", label="extra action")
# hidden until the "custom" scheduler is selected (see toggleCustom below)
custom = gr.Textbox(value='', label='custom function/list', lines=1.1, visible=False)
# --- sigma range, with buttons that reset each slider to its default ---
with gr.Row():
defMin = ToolButton (value='\U000027F3')
sigmaMin = gr.Slider (label="sigma minimum", value=0.029168,
minimum=0.001, maximum=2.0, step=0.001);
sigmaMax = gr.Slider (label="sigma maximum", value=14.614642,
minimum=2.0, maximum=30.0, step=0.001);
defMax = ToolButton (value='\U000027F3')
# --- preview plot, filled in by visualize() ---
with gr.Accordion (open=False, label="Sigmas graph"):
z_vis = gr.Plot(value=None, elem_id='schedride-vis', show_label=False, scale=2)
# --- optional mid-run sampler switch; the dropdown yields an index ---
with gr.Row(equalHeight=True):
sampler = gr.Dropdown(samplerList, value="No change", type='index', label='Sampler choice', scale=1)
step = gr.Slider(minimum=0.01, maximum=0.99, value=0.5, label='Step to change sampler')
# --- initial noise shaping and colouring ---
with gr.Accordion (open=False, label="Initial noise"):
with gr.Row(equalHeight=True):
delPreset = ToolButton(value="-", variant='secondary', tooltip='remove preset')
preset = gr.Dropdown([i[0] for i in self.presetList], value="(None)", type='value', label='Colour presets', allow_custom_value=True)
addPreset = ToolButton(value="+", variant='secondary', tooltip='add preset')
savePreset = ToolButton(value="save", variant='secondary', tooltip='save presets')
noise = gr.Dropdown(["default", "Perlin (1 octave)", "Perlin (2 octaves)", "Perlin (4 octaves)", "Perlin (max octaves)"], type="value", value="default", label='noise type', scale=0)
centreNoise = ToolButton(value="c", variant='secondary', tooltip='Centre initial noise')
lowDNoise = ToolButton(value="d", variant='secondary', tooltip='low discrepancy noise')
sharpNoise = ToolButton(value="s", variant='secondary', tooltip='Sharpen initial noise')
with gr.Row():
initialNoiseR = gr.Slider(minimum=0, maximum=1.0, value=0.0, label='red')
initialNoiseG = gr.Slider(minimum=0, maximum=1.0, value=0.0, label='green')
initialNoiseB = gr.Slider(minimum=0, maximum=1.0, value=0.0, label='blue')
noiseStrength = gr.Slider(minimum=0, maximum=0.1, value=0.0, step=0.001, label='strength')
# redraw the sigma graph whenever any input that affects the schedule changes
for i in [scheduler, action, custom, sigmaMin, sigmaMax]:
i.change(
fn=self.visualize,
inputs=[scheduler, action, sigmaMin, sigmaMax, custom],
outputs=[z_vis],
show_progress=False
)
# show the custom text box only for the "custom" scheduler
def toggleCustom (scheduler):
if scheduler == "custom":
return gr.update(visible=True)
else:
return gr.update(visible=False)
# load a preset's colour/strength into the sliders; unknown names leave them unchanged
def updateColours (preset, nR, nG, nB, nS):
for i in range(len(self.presetList)):
p = self.presetList[i]
if p[0] == preset:
return p[1], p[2], p[3], p[4]
return nR, nG, nB, nS
scheduler.change(fn=toggleCustom, inputs=[scheduler], outputs=[custom], show_progress=False)
preset.change(fn=updateColours, inputs=[preset, initialNoiseR, initialNoiseG, initialNoiseB, noiseStrength], outputs=[initialNoiseR, initialNoiseG, initialNoiseB, noiseStrength], show_progress=False)
# reset values match the sliders' initial values above
def defaultSigmaMin ():
return 0.029168
def defaultSigmaMax ():
return 14.614642
defMin.click(defaultSigmaMin, inputs=[], outputs=sigmaMin, show_progress=False)
defMax.click(defaultSigmaMax, inputs=[], outputs=sigmaMax, show_progress=False)
# the three noise toggles flip a class-level flag and restyle their button
# (upper-case label + 'primary' variant when active)
def toggleCentre ():
OverSchedForge.centreNoise ^= True
return gr.Button.update(value=['c', 'C'][OverSchedForge.centreNoise],
variant=['secondary', 'primary'][OverSchedForge.centreNoise])
def togglelowD ():
OverSchedForge.lowDNoise ^= True
return gr.Button.update(value=['d', 'D'][OverSchedForge.lowDNoise],
variant=['secondary', 'primary'][OverSchedForge.lowDNoise])
def toggleSharp ():
OverSchedForge.sharpNoise ^= True
return gr.Button.update(value=['s', 'S'][OverSchedForge.sharpNoise],
variant=['secondary', 'primary'][OverSchedForge.sharpNoise])
# add the current slider values as a preset under the name typed into the dropdown
def addColourPreset (name, r, g, b, s):
namelist = [i[0] for i in self.presetList]
if name not in namelist:
self.presetList.append((name, r, g, b, s))
self.presetList = sorted(self.presetList)
return gr.Dropdown.update(choices=[i[0] for i in self.presetList])
# remove the named preset; "(None)" is never removed
def delColourPreset (name):
for i in range(len(self.presetList)):
if name != "(None)" and self.presetList[i][0] == name:
del (self.presetList[i])
break
return gr.Dropdown.update(choices=[i[0] for i in self.presetList])
# overwrite the colourPresets module file with the current preset list
def saveColourPreset ():
#sort alphabetically? or button for that
file = os.path.abspath(colourPresets.__file__)
text = "presetList = [\n\t" + ',\n\t'.join(map(str, self.presetList)) +"\n]"
with open(file, 'w') as f:
f.write(text)
centreNoise.click(toggleCentre, inputs=[], outputs=centreNoise)
lowDNoise.click(togglelowD, inputs=[], outputs=lowDNoise)
sharpNoise.click(toggleSharp, inputs=[], outputs=sharpNoise)
addPreset.click(addColourPreset, inputs=[preset, initialNoiseR, initialNoiseG, initialNoiseB, noiseStrength], outputs=preset)
delPreset.click(delColourPreset, inputs=preset, outputs=preset)
savePreset.click(saveColourPreset, inputs=[], outputs=[])
# maps infotext keys (written by process()) back onto the controls
self.infotext_fields = [
(enabled, lambda d: enabled.update(value=("os_enabled" in d))),
(hiresAlt, "os_hiresAlt"),
(sgm, "os_sgm"),
(scheduler, "os_scheduler"),
(action, "os_action"),
(custom, "os_custom"),
(sigmaMin, "os_sigmaMin"),
(sigmaMax, "os_sigmaMax"),
(sampler, "os_sampler"),
(step, "os_step"),
(noiseStrength, "os_noiseStr"),
(initialNoiseR, "os_nR"),
(initialNoiseG, "os_nG"),
(initialNoiseB, "os_nB"),
(noise, "os_noise"),
]
# order must match the unpacking at the top of process()
return enabled, hiresAlt, sgm, scheduler, action, custom, sigmaMin, sigmaMax, initialNoiseR, initialNoiseG, initialNoiseB, noiseStrength, sampler, step, noise
def visualize(self, scheduler, action, sigmaMin, sigmaMax, custom):
    """Plot a preview of the selected schedule for the 'Sigmas graph' panel.

    Returns a matplotlib figure, or ``None`` (plot left unchanged) when the
    scheduler is "None"/"simple" or the custom text is empty or fails a trial
    evaluation. A custom text that evaluates successfully is stored, stripped,
    in ``OverSchedForge.custom`` so the schedule builder can read it.
    """
    if scheduler == "None" or scheduler == "simple":
        return

    if scheduler == "custom":
        if custom == "":
            return
        try:
            # Dummy bindings for the names a custom expression may reference.
            m, M, x, pi, phi, n, s = 1, 1, 1, 1, 1, 1, 1
            # NOTE: eval() of UI text - acceptable only because the UI is
            # operated by a trusted local user.
            eval(custom.strip())
            OverSchedForge.custom = custom.strip()
        except Exception:
            # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit
            # are not swallowed; an invalid expression just skips the redraw.
            return

    steps = 35  #   shared.state.sampling_steps not updated until Generate

    plot_color = (1, 1, 0.8, 1.0)
    plt.rcParams.update({
        "text.color":  plot_color,
        "axes.labelcolor":  plot_color,
        "axes.edgecolor":  plot_color,
        "figure.facecolor":  (0.0, 0.0, 0.0, 0.0),
        "axes.facecolor":    (0.0, 0.0, 0.0, 0.0),
        "ytick.labelsize": 6,
        "ytick.labelcolor": plot_color,
        "ytick.color": plot_color,
        "figure.figsize": [5, 2.5]
    })

    fig, ax = plt.subplots(layout="constrained")

    if steps == 1:
        values = [sigmaMax**0.5, 0.0]
    else:
        # steps-1 schedule values plus the trailing zero gives ``steps`` points.
        values = patchedKDiffusionSampler.calculate_sigmas (self, scheduler, steps-1, sigmaMin, sigmaMax)
        values = patchedKDiffusionSampler.apply_action (action, values).tolist()

    ax.plot(range(steps), values, color=plot_color)

    ## better to specify a comparison scheduler, but if custom, should also remember those settings
    ##    if scheduler != OverSchedForge.last_scheduler and OverSchedForge.last_scheduler != None:
    ##        values2 = patchedKDiffusionSampler.calculate_sigmas (OverSchedForge.last_scheduler, steps-1, sigmaMin, sigmaMax).tolist()
    ##        plot_color2 = (0.8, 0.4, 0.4, 1.0)
    ##        ax.plot(range(steps), values2, color=plot_color2)
    ##    OverSchedForge.last_scheduler = scheduler

    ax.tick_params(right=False, color=plot_color)
    ax.set_xticks([i * (steps - 1) / 10 for i in range(10)][1:])
    ax.set_xticklabels([])
    ax.set_ylim([0,sigmaMax])
    ax.set_xlim([0,steps-1])

    # Close this figure explicitly (rather than "the current figure") so
    # pyplot does not keep it alive; the returned object is still renderable.
    plt.close(fig)
    return fig
def process(self, params, *script_args, **kwargs):
    """Copy the UI values into class-level state and record them as infotext.

    Does nothing when the extension is disabled. ``hiresAlt`` is always reset
    to "default" here; the value chosen in the UI is recorded in the infotext
    only.
    """
    (enabled, hiresAlt, sgm, scheduler, action, custom, sigmaMin, sigmaMax,
     initialNoiseR, initialNoiseG, initialNoiseB, noiseStrength, sampler, step, noise) = script_args

    if not enabled:
        return

    state = OverSchedForge
    state.hiresAlt = "default"
    state.sgm = sgm
    state.scheduler = scheduler
    state.action = action
    state.custom = custom.strip()
    state.sigmaMin = sigmaMin
    state.sigmaMax = sigmaMax
    state.initialNoiseR = initialNoiseR
    state.initialNoiseG = initialNoiseG
    state.initialNoiseB = initialNoiseB
    state.noiseStrength = noiseStrength
    state.samplerIndex = sampler
    state.step = step
    state.noise = noise

    info = params.extra_generation_params
    info.update({
        "os_enabled"        :   enabled,
        "os_hiresAlt"       :   hiresAlt,
        "os_sgm"            :   sgm,
        "os_scheduler"      :   scheduler,
        "os_action"         :   action,
        "os_sigmaMin"       :   sigmaMin,
        "os_sigmaMax"       :   sigmaMax,
        "os_sampler"        :   patchedKDiffusionSampler.samplers_list[sampler][0],
        "os_noise"          :   state.noise,
        "os_centreNoise"    :   state.centreNoise,
        "os_lowDNoise"      :   state.lowDNoise,
        "os_sharpNoise"     :   state.sharpNoise,
        "os_noiseStr"       :   noiseStrength,
    })

    # Optional keys are only written when the matching feature is in use.
    if scheduler == "custom":
        info["os_custom"] = custom
    if sampler != 0:
        info["os_step"] = step
    if noiseStrength != 0:
        info["os_nR"] = initialNoiseR
        info["os_nG"] = initialNoiseG
        info["os_nB"] = initialNoiseB

    return
def before_process (self, params, *args):
    """Replace a random seed (-1) with a concrete one when the extension is enabled."""
    enabled = args[0]
    if not enabled:
        return
    if params.seed == -1:
        params.seed = get_fixed_seed(params.seed)
def process_before_every_sampling(self, params, *script_args, **kwargs):
    """Install the patched sampler functions just before a sampling pass.

    This will be called before every sampling; with highres fix it is called
    twice. The patched functions restore the originals as soon as they run.
    """
    enabled, hiresAlt = script_args[0], script_args[1]
    if not enabled:
        return

    if OverSchedForge.scheduler != "None":
        K.KDiffusionSampler.get_sigmas = patchedKDiffusionSampler.get_sigmas

    # The alternative hires modes only apply during the hires pass.
    if hiresAlt != "default" and params.is_hr_pass == True:
        K.KDiffusionSampler.get_sigmas = patchedKDiffusionSampler.get_sigmas
        OverSchedForge.hiresAlt = hiresAlt
        modules.sd_samplers_common.setup_img2img_steps = patchedKDiffusionSampler.setup_img2img_steps

    K.KDiffusionSampler.sample = patchedKDiffusionSampler.sample
    return