| | """ |
| | Adept Sampler FULL PORT for Automatic1111 WebUI |
| | Ported from ComfyUI/reForge extension |
| | |
| | COMPLETE VERSION with: |
| | - ALL Schedulers (16 types) |
| | - ALL Samplers (Euler, Euler A, Heun, DPM++ 2M, DPM++ 2S, LMS) |
| | - VAE Reflection |
| | - Dynamic Weight Scaling |
| | |
| | Version: 3.0 FULL |
| | """ |
| |
|
| | import torch |
| | import numpy as np |
| | import math |
| | from tqdm import trange |
| | from modules import scripts, shared, script_callbacks |
| | import gradio as gr |
| | import k_diffusion.sampling |
| |
|
| | |
| | |
| | |
| | ADEPT_STATE = { |
| | "enabled": False, |
| | "scale": 1.0, |
| | "shift": 0.0, |
| | "start_pct": 0.0, |
| | "end_pct": 1.0, |
| | "eta": 1.0, |
| | "s_noise": 1.0, |
| | "adaptive_eta": False, |
| | "scheduler": "Standard", |
| | "vae_reflection": False, |
| | } |
| |
|
| | |
| | ORIGINAL_SAMPLERS = {} |
| |
|
| | |
| | _vae_reflection_active = False |
| | _vae_original_padding_modes = {} |
| |
|
| | |
| | |
| | |
| |
|
| | def to_d(x, sigma, denoised): |
| | """Convert denoised prediction to derivative.""" |
| | diff = x - denoised |
| | safe_sigma = torch.clamp(sigma, min=1e-4) |
| | derivative = diff / safe_sigma |
| | |
| | sigma_adaptive_threshold = 1000.0 * (1.0 + sigma / 10.0) |
| | derivative_max = torch.abs(derivative).max() |
| | if derivative_max > sigma_adaptive_threshold: |
| | derivative = torch.clamp(derivative, -sigma_adaptive_threshold, sigma_adaptive_threshold) |
| | |
| | return derivative |
| |
|
| |
|
| | def get_ancestral_step(sigma, sigma_next, eta=1.0): |
| | """Calculate ancestral step sizes.""" |
| | if sigma_next == 0: |
| | return 0.0, 0.0 |
| | sigma_up = min(sigma_next, eta * (sigma_next ** 2 * (sigma ** 2 - sigma_next ** 2) / sigma ** 2) ** 0.5) |
| | sigma_down = (sigma_next ** 2 - sigma_up ** 2) ** 0.5 |
| | return sigma_down, sigma_up |
| |
|
| |
|
| | def compute_dynamic_scale(step_idx, total_steps, base_scale, start_pct, end_pct): |
| | """Compute dynamic scale based on progress.""" |
| | progress = step_idx / max(total_steps - 1, 1) |
| | |
| | if progress < start_pct or progress > end_pct: |
| | return 1.0 |
| | |
| | |
| | if progress < start_pct + 0.1: |
| | fade = (progress - start_pct) / 0.1 |
| | return 1.0 + (base_scale - 1.0) * fade |
| | elif progress > end_pct - 0.1: |
| | fade = (end_pct - progress) / 0.1 |
| | return 1.0 + (base_scale - 1.0) * fade |
| | else: |
| | return base_scale |
| |
|
| |
|
| | def default_noise_sampler(x): |
| | """Simple noise sampler fallback.""" |
| | def sampler(sigma, sigma_next): |
| | return torch.randn_like(x) |
| | return sampler |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class AdeptWeightPatcher: |
| | """Context manager for safe model weight modification.""" |
| | |
| | def __init__(self, model, scale, shift): |
| | self.model = model |
| | self.scale = scale |
| | self.shift = shift |
| | self.backups = {} |
| | self.target_layers = [] |
| | |
| | |
| | for name, module in model.named_modules(): |
| | if any(block in name for block in ['input_blocks', 'middle_block', 'output_blocks']): |
| | if isinstance(module, (torch.nn.Conv2d, torch.nn.Linear)): |
| | if hasattr(module, 'weight') and module.weight is not None: |
| | self.target_layers.append((name, module)) |
| | |
| | def __enter__(self): |
| | if abs(self.scale - 1.0) < 1e-6 and abs(self.shift) < 1e-6: |
| | return self |
| | |
| | try: |
| | for name, module in self.target_layers: |
| | self.backups[name] = module.weight.data.clone() |
| | module.weight.data = module.weight.data * self.scale + self.shift |
| | except Exception as e: |
| | print(f"⚠️ Weight patching failed: {e}") |
| | self.__exit__(None, None, None) |
| | raise |
| | |
| | return self |
| | |
| | def __exit__(self, exc_type, exc_val, exc_tb): |
| | try: |
| | for name, module in self.target_layers: |
| | if name in self.backups: |
| | module.weight.data.copy_(self.backups[name]) |
| | self.backups.clear() |
| | except Exception as e: |
| | print(f"❌ CRITICAL: Failed to restore weights: {e}") |
| | for name, backup_data in self.backups.items(): |
| | try: |
| | for n, m in self.target_layers: |
| | if n == name: |
| | m.weight.data.copy_(backup_data) |
| | except: |
| | pass |
| | |
| | return False |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class VAEReflectionPatcher: |
| | """Context manager for VAE reflection padding.""" |
| | |
| | def __init__(self, vae_model): |
| | self.vae_model = vae_model |
| | self.backups = {} |
| | |
| | def __enter__(self): |
| | global _vae_reflection_active, _vae_original_padding_modes |
| | |
| | if _vae_reflection_active or self.vae_model is None: |
| | return self |
| | |
| | _vae_original_padding_modes.clear() |
| | patched_count = 0 |
| | |
| | try: |
| | for name, module in self.vae_model.named_modules(): |
| | if isinstance(module, torch.nn.Conv2d): |
| | _vae_original_padding_modes[name] = module.padding_mode |
| | module.padding_mode = 'reflect' |
| | patched_count += 1 |
| | |
| | _vae_reflection_active = True |
| | print(f"🪞 VAE Reflection: Patched {patched_count} Conv2d layers") |
| | except Exception as e: |
| | print(f"❌ VAE Reflection failed: {e}") |
| | self.__exit__(None, None, None) |
| | |
| | return self |
| | |
| | def __exit__(self, exc_type, exc_val, exc_tb): |
| | global _vae_reflection_active, _vae_original_padding_modes |
| | |
| | if self.vae_model is None: |
| | _vae_reflection_active = False |
| | _vae_original_padding_modes.clear() |
| | return False |
| | |
| | restored_count = 0 |
| | try: |
| | for name, module in self.vae_model.named_modules(): |
| | if isinstance(module, torch.nn.Conv2d) and name in _vae_original_padding_modes: |
| | module.padding_mode = _vae_original_padding_modes[name] |
| | restored_count += 1 |
| | |
| | _vae_reflection_active = False |
| | _vae_original_padding_modes.clear() |
| | print(f"🔄 VAE Reflection: Restored {restored_count} layers") |
| | except Exception as e: |
| | print(f"⚠️ VAE Reflection restore warning: {e}") |
| | |
| | return False |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | def create_aos_v_sigmas(sigma_max, sigma_min, num_steps, device='cpu'): |
| | """AOS-V (Anime-Optimized Schedule for v-prediction models).""" |
| | rho = 7.0 |
| | |
| | p1_steps = int(num_steps * 0.2) |
| | p2_steps = int(num_steps * 0.6) |
| | |
| | ramp = torch.empty(num_steps, device=device, dtype=torch.float32) |
| | |
| | if p1_steps > 0: |
| | torch.linspace(0, 1, p1_steps, out=ramp[:p1_steps]) |
| | ramp[:p1_steps].pow_(0.5).mul_(0.6) |
| | |
| | if p2_steps > p1_steps: |
| | torch.linspace(0.6, 0.9, p2_steps - p1_steps, out=ramp[p1_steps:p2_steps]) |
| | |
| | if num_steps > p2_steps: |
| | torch.linspace(0, 1, num_steps - p2_steps, out=ramp[p2_steps:]) |
| | ramp[p2_steps:].pow_(3).mul_(0.1).add_(0.9) |
| | |
| | min_inv_rho = sigma_min ** (1 / rho) |
| | max_inv_rho = sigma_max ** (1 / rho) |
| | ramp.mul_(min_inv_rho - max_inv_rho).add_(max_inv_rho).pow_(rho) |
| | |
| | return torch.cat([ramp, torch.zeros(1, device=device)]) |
| |
|
| |
|
| | def create_aos_e_sigmas(sigma_max, sigma_min, num_steps, device='cpu'): |
| | """AOS-ε (Anime-Optimized Schedule for epsilon-prediction models).""" |
| | rho = 7.0 |
| | |
| | p1_frac, p2_frac = 0.35, 0.7 |
| | ramp_p1_val, ramp_p2_val = 0.4, 0.75 |
| |
|
| | p1_steps = int(num_steps * p1_frac) |
| | p2_steps = int(num_steps * p2_frac) |
| |
|
| | phase1_ramp = torch.linspace(0, 1, p1_steps, device=device) ** 1.5 * ramp_p1_val |
| | phase2_ramp = torch.linspace(ramp_p1_val, ramp_p2_val, p2_steps - p1_steps, device=device) |
| | phase3_base = torch.linspace(0, 1, num_steps - p2_steps, device=device) ** 0.7 |
| | phase3_ramp = phase3_base * (1 - ramp_p2_val) + ramp_p2_val |
| | |
| | if p1_steps == 0: phase1_ramp = torch.empty(0, device=device) |
| | if p2_steps - p1_steps == 0: phase2_ramp = torch.empty(0, device=device) |
| | if num_steps - p2_steps == 0: phase3_ramp = torch.empty(0, device=device) |
| |
|
| | ramp = torch.cat([phase1_ramp, phase2_ramp, phase3_ramp]) |
| | |
| | min_inv_rho = sigma_min ** (1 / rho) |
| | max_inv_rho = sigma_max ** (1 / rho) |
| | sigmas = (max_inv_rho + ramp * (min_inv_rho - max_inv_rho)) ** rho |
| | |
| | return torch.cat([sigmas, torch.zeros(1, device=device)]) |
| |
|
| |
|
| | def create_aos_akashic_sigmas(sigma_max, sigma_min, num_steps, device='cpu'): |
| | """AkashicAOS v2: Detail-Progressive Schedule for EQ-VAE SDXL models.""" |
| | rho = 7.0 |
| | |
| | u = torch.linspace(0, 1, num_steps, device=device) |
| | |
| | detail_power = 0.85 |
| | u_progressive = u ** detail_power |
| | |
| | mid_boost_strength = 0.08 |
| | mid_boost = mid_boost_strength * torch.sin(math.pi * u) * (1 - u * 0.5) |
| | |
| | u_modulated = u_progressive + mid_boost |
| | |
| | u_min, u_max = u_modulated.min(), u_modulated.max() |
| | if u_max - u_min > 1e-8: |
| | u_modulated = (u_modulated - u_min) / (u_max - u_min) |
| | |
| | min_inv_rho = sigma_min ** (1 / rho) |
| | max_inv_rho = sigma_max ** (1 / rho) |
| | sigmas = (max_inv_rho + u_modulated * (min_inv_rho - max_inv_rho)) ** rho |
| | |
| | for i in range(1, len(sigmas)): |
| | if sigmas[i] >= sigmas[i-1]: |
| | sigmas[i] = sigmas[i-1] * 0.995 |
| | max_ratio = 1.5 |
| | if i > 0 and sigmas[i-1] / sigmas[i] > max_ratio: |
| | sigmas[i] = sigmas[i-1] / max_ratio |
| | |
| | return torch.cat([sigmas, torch.zeros(1, device=device)]) |
| |
|
| |
|
| | def create_entropic_sigmas(sigma_max, sigma_min, num_steps, power=6.0, device='cpu'): |
| | """Entropic power schedule.""" |
| | rho = 7.0 |
| | |
| | linear_ramp = torch.linspace(0, 1, num_steps, device=device) |
| | power_ramp = 1 - torch.linspace(1, 0, num_steps, device=device) ** power |
| | |
| | ramp = (linear_ramp + power_ramp) / 2.0 |
| | |
| | min_inv_rho = sigma_min ** (1 / rho) |
| | max_inv_rho = sigma_max ** (1 / rho) |
| | sigmas = (max_inv_rho + ramp * (min_inv_rho - max_inv_rho)) ** rho |
| | |
| | return torch.cat([sigmas, torch.zeros(1, device=device)]) |
| |
|
| |
|
| | def create_snr_optimized_sigmas(sigma_max, sigma_min, num_steps, device='cpu'): |
| | """Schedule optimized around log SNR = 0 region.""" |
| | rho = 7.0 |
| | |
| | log_snr_max = 2 * torch.log(sigma_max) |
| | log_snr_min = 2 * torch.log(sigma_min) |
| | |
| | t = torch.linspace(0, 1, num_steps, device=device) |
| | |
| | concentration_power = 3.0 |
| | sigmoid_t = torch.sigmoid(concentration_power * (t - 0.5)) |
| | |
| | linear_t = t |
| | blend_factor = 0.7 |
| | combined_t = blend_factor * sigmoid_t + (1 - blend_factor) * linear_t |
| | |
| | log_snr = log_snr_max + combined_t * (log_snr_min - log_snr_max) |
| | sigmas = torch.exp(log_snr / 2) |
| | |
| | return torch.cat([sigmas, torch.zeros(1, device=device)]) |
| |
|
| |
|
| | def create_constant_rate_sigmas(sigma_max, sigma_min, num_steps, device='cpu'): |
| | """Constant rate of distributional change.""" |
| | rho = 7.0 |
| | |
| | t = torch.linspace(0, 1, num_steps, device=device) |
| | corrected_t = t + 0.3 * torch.sin(math.pi * t) * (1 - t) |
| | |
| | min_inv_rho = sigma_min ** (1 / rho) |
| | max_inv_rho = sigma_max ** (1 / rho) |
| | sigmas = (max_inv_rho + corrected_t * (min_inv_rho - max_inv_rho)) ** rho |
| | |
| | return torch.cat([sigmas, torch.zeros(1, device=device)]) |
| |
|
| |
|
| | def create_adaptive_optimized_sigmas(sigma_max, sigma_min, num_steps, device='cpu'): |
| | """Adaptive schedule combining multiple strategies.""" |
| | rho = 7.0 |
| | |
| | base_t = torch.linspace(0, 1, num_steps, device=device) |
| | |
| | strategies = [ |
| | lambda t: t, |
| | lambda t: t ** 0.8, |
| | lambda t: t + 0.2 * torch.sin(2 * math.pi * t) * (1 - t), |
| | lambda t: 1 / (1 + torch.exp(-3 * (t - 0.5))), |
| | ] |
| | |
| | weights = [0.2, 0.3, 0.2, 0.3] |
| | combined_t = sum(w * s(base_t) for w, s in zip(weights, strategies)) |
| | |
| | if (combined_t.max() - combined_t.min()) > 1e-6: |
| | combined_t = (combined_t - combined_t.min()) / (combined_t.max() - combined_t.min()) |
| | |
| | min_inv_rho = sigma_min ** (1 / rho) |
| | max_inv_rho = sigma_max ** (1 / rho) |
| | sigmas = (max_inv_rho + combined_t * (min_inv_rho - max_inv_rho)) ** rho |
| | |
| | return torch.cat([sigmas, torch.zeros(1, device=device)]) |
| |
|
| |
|
| | def create_cosine_sigmas(sigma_max, sigma_min, num_steps, device='cpu'): |
| | """Cosine-annealed schedule.""" |
| | rho = 7.0 |
| | u = torch.linspace(0, 1, num_steps, device=device) |
| | t = (1 - torch.cos(math.pi * u)) / 2 |
| | min_inv_rho = sigma_min ** (1 / rho) |
| | max_inv_rho = sigma_max ** (1 / rho) |
| | sigmas = (max_inv_rho + t * (min_inv_rho - max_inv_rho)) ** rho |
| | return torch.cat([sigmas, torch.zeros(1, device=device)]) |
| |
|
| |
|
| | def create_logsnr_uniform_sigmas(sigma_max, sigma_min, num_steps, device='cpu'): |
| | """Uniform in log-SNR space.""" |
| | u = torch.linspace(0, 1, num_steps, device=device) |
| | log_snr_max = 2 * torch.log(sigma_max) |
| | log_snr_min = 2 * torch.log(sigma_min) |
| | log_snr = log_snr_max + u * (log_snr_min - log_snr_max) |
| | sigmas = torch.exp(log_snr / 2) |
| | return torch.cat([sigmas, torch.zeros(1, device=device)]) |
| |
|
| |
|
| | def create_tanh_midboost_sigmas(sigma_max, sigma_min, num_steps, device='cpu', k=4.0): |
| | """Concentrate steps near mid-range sigmas.""" |
| | rho = 7.0 |
| | u = torch.linspace(0, 1, num_steps, device=device) |
| | k_tensor = torch.tensor(k, device=device, dtype=u.dtype) |
| | t = 0.5 * (torch.tanh(k_tensor * (u - 0.5)) / torch.tanh(k_tensor / 2) + 1.0) |
| | min_inv_rho = sigma_min ** (1 / rho) |
| | max_inv_rho = sigma_max ** (1 / rho) |
| | sigmas = (max_inv_rho + t * (min_inv_rho - max_inv_rho)) ** rho |
| | return torch.cat([sigmas, torch.zeros(1, device=device)]) |
| |
|
| |
|
| | def create_exponential_tail_sigmas(sigma_max, sigma_min, num_steps, device='cpu', pivot=0.7, gamma=0.8, beta=5.0): |
| | """Faster early lock-in with extra resolution in final steps.""" |
| | rho = 7.0 |
| | u = torch.linspace(0, 1, num_steps, device=device) |
| | |
| | early_mask = u < pivot |
| | late_mask = ~early_mask |
| | |
| | t = torch.empty_like(u) |
| | t[early_mask] = (u[early_mask] / pivot) ** gamma * pivot |
| | late_u = u[late_mask] |
| | t[late_mask] = pivot + (1 - pivot) * (1 - torch.exp(-beta * (late_u - pivot) / (1 - pivot))) |
| | |
| | min_inv_rho = sigma_min ** (1 / rho) |
| | max_inv_rho = sigma_max ** (1 / rho) |
| | sigmas = (max_inv_rho + t * (min_inv_rho - max_inv_rho)) ** rho |
| | |
| | return torch.cat([sigmas, torch.zeros(1, device=device)]) |
| |
|
| |
|
| | def create_jittered_karras_sigmas(sigma_max, sigma_min, num_steps, device='cpu'): |
| | """Karras schedule with controlled jitter.""" |
| | if num_steps <= 0: |
| | return torch.cat([sigma_max.unsqueeze(0), torch.zeros(1, device=device)]) |
| | |
| | rho = 7.0 |
| | indices = torch.arange(num_steps, device=device, dtype=torch.float32) |
| | denom = max(1, num_steps - 1) |
| | |
| | base = (indices + 0.5) / denom |
| | jitter_seed = torch.sin((indices + 1) * 2.3999632) |
| | jitter_strength = 0.35 |
| | jitter = jitter_seed * jitter_strength / denom |
| | |
| | u = torch.clamp(base + jitter, 0.0, 1.0) |
| | |
| | min_inv_rho = sigma_min ** (1 / rho) |
| | max_inv_rho = sigma_max ** (1 / rho) |
| | sigmas = (max_inv_rho + u * (min_inv_rho - max_inv_rho)) ** rho |
| | |
| | return torch.cat([sigmas, torch.zeros(1, device=device)]) |
| |
|
| |
|
| | def create_stochastic_sigmas(sigma_max, sigma_min, num_steps, device='cpu', noise_type='brownian', noise_scale=0.3, base_schedule='karras'): |
| | """Stochastic scheduler with controlled randomness.""" |
| | rho = 7.0 |
| | |
| | |
| | if base_schedule == 'karras': |
| | indices = torch.arange(num_steps, device=device, dtype=torch.float32) |
| | u = (indices / max(1, num_steps - 1)) ** (1 / rho) |
| | elif base_schedule == 'cosine': |
| | u = torch.linspace(0, 1, num_steps, device=device) |
| | u = (1 - torch.cos(math.pi * u)) / 2 |
| | else: |
| | u = torch.linspace(0, 1, num_steps, device=device) |
| | |
| | |
| | if noise_type == 'brownian': |
| | noise = torch.randn(num_steps, device=device).cumsum(0) |
| | noise = noise / noise.std() |
| | elif noise_type == 'uniform': |
| | noise = torch.rand(num_steps, device=device) * 2 - 1 |
| | else: |
| | noise = torch.randn(num_steps, device=device) |
| | |
| | u_noisy = u + noise * noise_scale / num_steps |
| | u_noisy = torch.clamp(u_noisy, 0, 1) |
| | |
| | |
| | u_noisy, _ = torch.sort(u_noisy, descending=True) |
| | |
| | min_inv_rho = sigma_min ** (1 / rho) |
| | max_inv_rho = sigma_max ** (1 / rho) |
| | sigmas = (max_inv_rho + u_noisy * (min_inv_rho - max_inv_rho)) ** rho |
| | |
| | return torch.cat([sigmas, torch.zeros(1, device=device)]) |
| |
|
| |
|
| | def create_jys_sigmas(sigma_max, sigma_min, num_steps, device='cpu'): |
| | """JYS (Jump Your Steps) dynamic scheduler.""" |
| | if num_steps <= 0: |
| | return torch.cat([sigma_max.unsqueeze(0), torch.zeros(1, device=device)]) |
| | if num_steps == 1: |
| | return torch.tensor([sigma_max.item(), 0.0], device=device) |
| | elif num_steps == 2: |
| | mid = (sigma_max + sigma_min) / 2 |
| | return torch.tensor([sigma_max.item(), mid.item(), 0.0], device=device) |
| | |
| | |
| | early_steps = max(1, int(num_steps * 0.2)) |
| | final_steps = max(1, int(num_steps * 0.2)) |
| | middle_steps = max(1, num_steps - early_steps - final_steps) |
| | |
| | sigma_max_val = sigma_max.item() if torch.is_tensor(sigma_max) else float(sigma_max) |
| | |
| | |
| | early_jump_size = max(50, (sigma_max_val - 600) // early_steps) |
| | early_sigmas = [] |
| | current_sigma = sigma_max_val |
| | for _ in range(early_steps): |
| | early_sigmas.append(current_sigma) |
| | current_sigma = max(600, current_sigma - early_jump_size) |
| | |
| | |
| | middle_sigmas = [] |
| | structure_steps = max(1, middle_steps // 2) |
| | structure_jump = max(10, (600 - 300) // structure_steps) |
| | current_sigma = 600 |
| | for _ in range(structure_steps): |
| | middle_sigmas.append(current_sigma) |
| | current_sigma = max(300, current_sigma - structure_jump) |
| | |
| | detail_steps = middle_steps - structure_steps |
| | if detail_steps > 0: |
| | detail_jump = max(5, (300 - 200) // detail_steps) |
| | current_sigma = 300 |
| | for _ in range(detail_steps): |
| | middle_sigmas.append(current_sigma) |
| | current_sigma = max(200, current_sigma - detail_jump) |
| | |
| | |
| | final_start = min(middle_sigmas) if middle_sigmas else 200 |
| | final_jump = max(5, final_start // final_steps) |
| | final_sigmas = [] |
| | current_sigma = final_start |
| | for _ in range(final_steps): |
| | final_sigmas.append(current_sigma) |
| | current_sigma = max(0, current_sigma - final_jump) |
| | |
| | all_sigmas = early_sigmas + middle_sigmas + final_sigmas |
| | unique_sigmas = list(dict.fromkeys(all_sigmas)) |
| | unique_sigmas.sort(reverse=True) |
| | |
| | |
| | while len(unique_sigmas) < num_steps: |
| | for i in range(len(unique_sigmas) - 1): |
| | mid = (unique_sigmas[i] + unique_sigmas[i + 1]) / 2 |
| | if mid not in unique_sigmas: |
| | unique_sigmas.insert(i + 1, mid) |
| | if len(unique_sigmas) >= num_steps: |
| | break |
| | |
| | if len(unique_sigmas) > num_steps: |
| | unique_sigmas = unique_sigmas[:num_steps] |
| | |
| | if unique_sigmas[-1] != 0: |
| | unique_sigmas.append(0) |
| | |
| | return torch.tensor(unique_sigmas, device=device, dtype=torch.float32) |
| |
|
| |
|
| | def create_hybrid_jys_karras_sigmas(sigma_max, sigma_min, num_steps, device='cpu'): |
| | """Hybrid: JYS mid-phase with Karras locks.""" |
| | if num_steps <= 0: |
| | return torch.cat([sigma_max.unsqueeze(0), torch.zeros(1, device=device)]) |
| |
|
| | rho = 7.0 |
| |
|
| | jys_sigmas = create_jys_sigmas(sigma_max, sigma_min, num_steps, device=device)[:-1] |
| |
|
| | indices = torch.arange(num_steps, device=device, dtype=torch.float32) |
| | denom = max(1, num_steps - 1) |
| | base = (indices + 0.5) / denom |
| | jitter_seed = torch.sin((indices + 1) * 2.3999632) |
| | jitter_strength = 0.35 |
| | jitter = jitter_seed * jitter_strength / denom |
| | u = torch.clamp(base + jitter, 0.0, 1.0) |
| |
|
| | min_inv_rho = sigma_min ** (1 / rho) |
| | max_inv_rho = sigma_max ** (1 / rho) |
| | karras_sigmas = (max_inv_rho + u * (min_inv_rho - max_inv_rho)) ** rho |
| |
|
| | positions = torch.linspace(0, 1, num_steps, device=device) |
| | jys_weight = torch.empty_like(positions) |
| | early_mask = positions < 0.3 |
| | mid_mask = (positions >= 0.3) & (positions < 0.8) |
| | late_mask = positions >= 0.8 |
| | jys_weight[early_mask] = 0.2 + 0.4 * (positions[early_mask] / 0.3) |
| | jys_weight[mid_mask] = 0.6 + 0.3 * ((positions[mid_mask] - 0.3) / 0.5) |
| | jys_weight[late_mask] = 0.9 |
| | jys_weight = jys_weight.clamp(0.2, 0.9) |
| |
|
| | log_jys = torch.log(jys_sigmas.clamp_min(1e-6)) |
| | log_karras = torch.log(karras_sigmas.clamp_min(1e-6)) |
| | log_hybrid = torch.lerp(log_karras, log_jys, jys_weight) |
| |
|
| | hybrid = torch.exp(log_hybrid) |
| |
|
| | smoothing = 1.0 - 0.05 * (1 - positions) ** 2 |
| | hybrid = hybrid * smoothing |
| |
|
| | for i in range(1, hybrid.shape[0]): |
| | if hybrid[i] > hybrid[i - 1]: |
| | hybrid[i] = hybrid[i - 1] * 0.999 |
| |
|
| | return torch.cat([hybrid, torch.zeros(1, device=device)]) |
| |
|
| |
|
| | def create_ays_sdxl_sigmas(sigma_max, sigma_min, num_steps, device='cpu'): |
| | """AYS (Align Your Steps) optimized for SDXL.""" |
| | |
| | AYS_SCHEDULES = { |
| | 10: [1.0000, 0.8751, 0.7502, 0.6254, 0.5004, 0.3755, 0.2506, 0.1253, 0.0502, 0.0000], |
| | 15: [1.0000, 0.9167, 0.8334, 0.7501, 0.6668, 0.5835, 0.5002, 0.4169, 0.3336, |
| | 0.2503, 0.1670, 0.0837, 0.0335, 0.0084, 0.0000], |
| | 20: [1.0000, 0.9375, 0.8750, 0.8125, 0.7500, 0.6875, 0.6250, 0.5625, 0.5000, |
| | 0.4375, 0.3750, 0.3125, 0.2500, 0.1875, 0.1250, 0.0625, 0.0313, 0.0156, |
| | 0.0039, 0.0000], |
| | 25: [1.0000, 0.9500, 0.9000, 0.8500, 0.8000, 0.7500, 0.7000, 0.6500, 0.6000, |
| | 0.5500, 0.5000, 0.4500, 0.4000, 0.3500, 0.3000, 0.2500, 0.2000, 0.1500, |
| | 0.1000, 0.0625, 0.0391, 0.0195, 0.0098, 0.0024, 0.0000], |
| | 30: [1.0000, 0.9583, 0.9167, 0.8750, 0.8333, 0.7917, 0.7500, 0.7083, 0.6667, |
| | 0.6250, 0.5833, 0.5417, 0.5000, 0.4583, 0.4167, 0.3750, 0.3333, 0.2917, |
| | 0.2500, 0.2083, 0.1667, 0.1250, 0.0833, 0.0521, 0.0326, 0.0163, 0.0081, |
| | 0.0041, 0.0010, 0.0000], |
| | } |
| | |
| | if num_steps in AYS_SCHEDULES: |
| | normalized = torch.tensor(AYS_SCHEDULES[num_steps], device=device, dtype=torch.float32) |
| | else: |
| | available_steps = sorted(AYS_SCHEDULES.keys()) |
| | |
| | if num_steps < available_steps[0]: |
| | ref_steps = available_steps[0] |
| | elif num_steps > available_steps[-1]: |
| | ref_steps = available_steps[-1] |
| | else: |
| | ref_steps = min([s for s in available_steps if s >= num_steps], default=available_steps[-1]) |
| | |
| | ref_schedule = np.array(AYS_SCHEDULES[ref_steps]) |
| | |
| | t_ref = np.linspace(0, 1, len(ref_schedule)) |
| | t_new = np.linspace(0, 1, num_steps + 1) |
| | |
| | log_ref = np.log(ref_schedule + 1e-8) |
| | log_ref[-1] = log_ref[-2] - 3.0 |
| | |
| | log_interp = np.interp(t_new, t_ref, log_ref) |
| | normalized_np = np.exp(log_interp) |
| | normalized_np[-1] = 0.0 |
| | |
| | normalized = torch.tensor(normalized_np, device=device, dtype=torch.float32) |
| | |
| | sigma_range = sigma_max - sigma_min |
| | sigmas = normalized * sigma_range + sigma_min |
| | |
| | sigmas[0] = sigma_max |
| | sigmas[-1] = 0.0 |
| | |
| | for i in range(1, len(sigmas) - 1): |
| | if sigmas[i] >= sigmas[i-1]: |
| | sigmas[i] = sigmas[i-1] * 0.999 |
| | |
| | return sigmas |
| |
|
| |
|
| | def apply_custom_scheduler(sigmas, scheduler_type="Standard"): |
| | """Apply custom scheduler to sigma schedule.""" |
| | if scheduler_type == "Standard" or len(sigmas) < 2: |
| | return sigmas |
| | |
| | sigma_min = sigmas[-1] if sigmas[-1] > 0 else sigmas[-2] * 0.001 |
| | sigma_max = sigmas[0] |
| | steps = len(sigmas) - 1 |
| | device = sigmas.device |
| | |
| | scheduler_map = { |
| | "AOS-V": create_aos_v_sigmas, |
| | "AOS-Epsilon": create_aos_e_sigmas, |
| | "AkashicAOS": create_aos_akashic_sigmas, |
| | "Entropic": create_entropic_sigmas, |
| | "SNR-Optimized": create_snr_optimized_sigmas, |
| | "Constant-Rate": create_constant_rate_sigmas, |
| | "Adaptive-Optimized": create_adaptive_optimized_sigmas, |
| | "Cosine-Annealed": create_cosine_sigmas, |
| | "LogSNR-Uniform": create_logsnr_uniform_sigmas, |
| | "Tanh Mid-Boost": create_tanh_midboost_sigmas, |
| | "Exponential Tail": create_exponential_tail_sigmas, |
| | "Jittered-Karras": create_jittered_karras_sigmas, |
| | "Stochastic": create_stochastic_sigmas, |
| | "JYS (Dynamic)": create_jys_sigmas, |
| | "Hybrid JYS-Karras": create_hybrid_jys_karras_sigmas, |
| | "AYS-SDXL": create_ays_sdxl_sigmas, |
| | } |
| | |
| | if scheduler_type in scheduler_map: |
| | try: |
| | return scheduler_map[scheduler_type](sigma_max, sigma_min, steps, device) |
| | except Exception as e: |
| | print(f"⚠️ Scheduler {scheduler_type} failed: {e}, using standard") |
| | return sigmas |
| | |
| | return sigmas |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | @torch.no_grad() |
| | def sample_adept_euler(model, x, sigmas, extra_args=None, callback=None, disable=None, s_churn=0., s_tmin=0., s_tmax=float('inf'), s_noise=1.): |
| | """Euler sampler with Adept weight scaling.""" |
| | |
| | if not ADEPT_STATE.get('enabled', False): |
| | global ORIGINAL_SAMPLERS |
| | if 'euler' in ORIGINAL_SAMPLERS: |
| | return ORIGINAL_SAMPLERS['euler'](model, x, sigmas, extra_args, callback, disable, s_churn, s_tmin, s_tmax, s_noise) |
| | return _basic_euler(model, x, sigmas, extra_args, callback, disable) |
| | |
| | extra_args = {} if extra_args is None else extra_args |
| | s_in = x.new_ones([x.shape[0]]) |
| | |
| | |
| | base_scale = ADEPT_STATE.get('scale', 1.0) |
| | shift = ADEPT_STATE.get('shift', 0.0) |
| | start_pct = ADEPT_STATE.get('start_pct', 0.0) |
| | end_pct = ADEPT_STATE.get('end_pct', 1.0) |
| | |
| | |
| | try: |
| | unet_model = shared.sd_model.model.diffusion_model |
| | except AttributeError: |
| | unet_model = None |
| | |
| | total_steps = len(sigmas) - 1 |
| | print(f"✅ Adept Euler active: scale={base_scale:.2f}") |
| | |
| | for i in trange(len(sigmas) - 1, disable=disable, desc="Adept Euler"): |
| | sigma = sigmas[i] |
| | |
| | |
| | current_scale = compute_dynamic_scale(i, total_steps, base_scale, start_pct, end_pct) |
| | |
| | |
| | if unet_model is not None and abs(current_scale - 1.0) > 1e-6: |
| | with AdeptWeightPatcher(unet_model, current_scale, shift): |
| | denoised = model(x, sigma * s_in, **extra_args) |
| | else: |
| | denoised = model(x, sigma * s_in, **extra_args) |
| | |
| | |
| | d = to_d(x, sigma, denoised) |
| | |
| | if torch.isnan(d).any() or torch.isinf(d).any(): |
| | d = torch.nan_to_num(d, nan=0.0, posinf=1.0, neginf=-1.0) |
| | |
| | dt = sigmas[i + 1] - sigma |
| | x = x + d * dt |
| | |
| | if callback is not None: |
| | callback({'x': x, 'i': i, 'sigma': sigma, 'denoised': denoised}) |
| | |
| | return x |
| |
|
| |
|
| | def _basic_euler(model, x, sigmas, extra_args=None, callback=None, disable=None): |
| | """Fallback basic Euler.""" |
| | extra_args = {} if extra_args is None else extra_args |
| | s_in = x.new_ones([x.shape[0]]) |
| | |
| | for i in trange(len(sigmas) - 1, disable=disable): |
| | denoised = model(x, sigmas[i] * s_in, **extra_args) |
| | d = to_d(x, sigmas[i], denoised) |
| | dt = sigmas[i + 1] - sigmas[i] |
| | x = x + d * dt |
| | if callback is not None: |
| | callback({'x': x, 'i': i, 'sigma': sigmas[i], 'denoised': denoised}) |
| | |
| | return x |
| |
|
| |
|
| | @torch.no_grad() |
| | def sample_adept_euler_ancestral(model, x, sigmas, extra_args=None, callback=None, |
| | disable=None, eta=1.0, s_noise=1.0, noise_sampler=None): |
| | """Euler Ancestral with Adept weight scaling.""" |
| | |
| | if not ADEPT_STATE.get('enabled', False): |
| | global ORIGINAL_SAMPLERS |
| | if 'euler_ancestral' in ORIGINAL_SAMPLERS: |
| | return ORIGINAL_SAMPLERS['euler_ancestral'](model, x, sigmas, extra_args, callback, disable, eta, s_noise, noise_sampler) |
| | return _basic_euler_ancestral(model, x, sigmas, extra_args, callback, disable, eta, s_noise) |
| | |
| | extra_args = {} if extra_args is None else extra_args |
| | s_in = x.new_ones([x.shape[0]]) |
| | |
| | |
| | base_scale = ADEPT_STATE.get('scale', 1.0) |
| | shift = ADEPT_STATE.get('shift', 0.0) |
| | start_pct = ADEPT_STATE.get('start_pct', 0.0) |
| | end_pct = ADEPT_STATE.get('end_pct', 1.0) |
| | use_adaptive_eta = ADEPT_STATE.get('adaptive_eta', False) |
| | current_eta = ADEPT_STATE.get('eta', eta) |
| | current_s_noise = ADEPT_STATE.get('s_noise', s_noise) |
| | |
| | |
| | try: |
| | unet_model = shared.sd_model.model.diffusion_model |
| | except AttributeError: |
| | unet_model = None |
| | |
| | if noise_sampler is None: |
| | noise_sampler = default_noise_sampler(x) |
| | |
| | total_steps = len(sigmas) - 1 |
| | print(f"✅ Adept Euler A active: scale={base_scale:.2f}, eta={current_eta:.2f}") |
| | |
| | for i in trange(len(sigmas) - 1, disable=disable, desc="Adept Euler A"): |
| | sigma = sigmas[i] |
| | sigma_next = sigmas[i + 1] |
| | |
| | progress = i / max(total_steps, 1) |
| | |
| | |
| | if use_adaptive_eta: |
| | if progress < 0.3: |
| | current_eta = eta * 1.08 |
| | elif progress < 0.7: |
| | current_eta = eta * 0.95 |
| | else: |
| | current_eta = eta * 1.02 |
| | else: |
| | current_eta = eta |
| | |
| | |
| | current_scale = compute_dynamic_scale(i, total_steps, base_scale, start_pct, end_pct) |
| | |
| | |
| | if unet_model is not None and abs(current_scale - 1.0) > 1e-6: |
| | with AdeptWeightPatcher(unet_model, current_scale, shift): |
| | denoised = model(x, sigma * s_in, **extra_args) |
| | else: |
| | denoised = model(x, sigma * s_in, **extra_args) |
| | |
| | |
| | sigma_down, sigma_up = get_ancestral_step(sigma, sigma_next, current_eta) |
| | d = to_d(x, sigma, denoised) |
| | |
| | if torch.isnan(d).any() or torch.isinf(d).any(): |
| | d = torch.nan_to_num(d, nan=0.0, posinf=1.0, neginf=-1.0) |
| | |
| | dt = sigma_down - sigma |
| | x = x + d * dt |
| | |
| | if sigma_up > 0: |
| | noise = noise_sampler(sigma, sigma_next) * current_s_noise |
| | x = x + noise * sigma_up |
| | |
| | if callback is not None: |
| | callback({'x': x, 'i': i, 'sigma': sigma, 'denoised': denoised}) |
| | |
| | return x |
| |
|
| |
|
| | def _basic_euler_ancestral(model, x, sigmas, extra_args=None, callback=None, disable=None, eta=1.0, s_noise=1.0): |
| | """Fallback basic Euler Ancestral.""" |
| | extra_args = {} if extra_args is None else extra_args |
| | s_in = x.new_ones([x.shape[0]]) |
| | noise_sampler = default_noise_sampler(x) |
| | |
| | for i in trange(len(sigmas) - 1, disable=disable): |
| | denoised = model(x, sigmas[i] * s_in, **extra_args) |
| | sigma_down, sigma_up = get_ancestral_step(sigmas[i], sigmas[i + 1], eta) |
| | d = to_d(x, sigmas[i], denoised) |
| | dt = sigma_down - sigmas[i] |
| | x = x + d * dt |
| | if sigma_up > 0: |
| | x = x + noise_sampler(sigmas[i], sigmas[i + 1]) * s_noise * sigma_up |
| | if callback is not None: |
| | callback({'x': x, 'i': i, 'sigma': sigmas[i], 'denoised': denoised}) |
| | |
| | return x |
| |
|
| |
|
| | @torch.no_grad() |
| | def sample_adept_heun(model, x, sigmas, extra_args=None, callback=None, disable=None, s_churn=0., s_tmin=0., s_tmax=float('inf'), s_noise=1.): |
| | """Heun sampler with Adept weight scaling.""" |
| | |
| | if not ADEPT_STATE.get('enabled', False): |
| | global ORIGINAL_SAMPLERS |
| | if 'heun' in ORIGINAL_SAMPLERS: |
| | return ORIGINAL_SAMPLERS['heun'](model, x, sigmas, extra_args, callback, disable, s_churn, s_tmin, s_tmax, s_noise) |
| | return _basic_heun(model, x, sigmas, extra_args, callback, disable) |
| | |
| | extra_args = {} if extra_args is None else extra_args |
| | s_in = x.new_ones([x.shape[0]]) |
| | |
| | |
| | base_scale = ADEPT_STATE.get('scale', 1.0) |
| | shift = ADEPT_STATE.get('shift', 0.0) |
| | start_pct = ADEPT_STATE.get('start_pct', 0.0) |
| | end_pct = ADEPT_STATE.get('end_pct', 1.0) |
| | |
| | |
| | try: |
| | unet_model = shared.sd_model.model.diffusion_model |
| | except AttributeError: |
| | unet_model = None |
| | |
| | total_steps = len(sigmas) - 1 |
| | print(f"✅ Adept Heun active: scale={base_scale:.2f}") |
| | |
| | for i in trange(len(sigmas) - 1, disable=disable, desc="Adept Heun"): |
| | sigma = sigmas[i] |
| | sigma_next = sigmas[i + 1] |
| | |
| | |
| | current_scale = compute_dynamic_scale(i, total_steps, base_scale, start_pct, end_pct) |
| | |
| | |
| | if unet_model is not None and abs(current_scale - 1.0) > 1e-6: |
| | with AdeptWeightPatcher(unet_model, current_scale, shift): |
| | denoised = model(x, sigma * s_in, **extra_args) |
| | else: |
| | denoised = model(x, sigma * s_in, **extra_args) |
| | |
| | d = to_d(x, sigma, denoised) |
| | |
| | if torch.isnan(d).any() or torch.isinf(d).any(): |
| | d = torch.nan_to_num(d, nan=0.0, posinf=1.0, neginf=-1.0) |
| | |
| | dt = sigma_next - sigma |
| | |
| | if sigma_next == 0: |
| | |
| | x = x + d * dt |
| | else: |
| | |
| | x_2 = x + d * dt |
| | |
| | |
| | if unet_model is not None and abs(current_scale - 1.0) > 1e-6: |
| | with AdeptWeightPatcher(unet_model, current_scale, shift): |
| | denoised_2 = model(x_2, sigma_next * s_in, **extra_args) |
| | else: |
| | denoised_2 = model(x_2, sigma_next * s_in, **extra_args) |
| | |
| | d_2 = to_d(x_2, sigma_next, denoised_2) |
| | |
| | if torch.isnan(d_2).any() or torch.isinf(d_2).any(): |
| | d_2 = torch.nan_to_num(d_2, nan=0.0, posinf=1.0, neginf=-1.0) |
| | |
| | |
| | d_prime = (d + d_2) / 2 |
| | x = x + d_prime * dt |
| | |
| | if callback is not None: |
| | callback({'x': x, 'i': i, 'sigma': sigma, 'denoised': denoised}) |
| | |
| | return x |
| |
|
| |
|
| | def _basic_heun(model, x, sigmas, extra_args=None, callback=None, disable=None): |
| | """Fallback basic Heun.""" |
| | extra_args = {} if extra_args is None else extra_args |
| | s_in = x.new_ones([x.shape[0]]) |
| | |
| | for i in trange(len(sigmas) - 1, disable=disable): |
| | denoised = model(x, sigmas[i] * s_in, **extra_args) |
| | d = to_d(x, sigmas[i], denoised) |
| | dt = sigmas[i + 1] - sigmas[i] |
| | |
| | if sigmas[i + 1] == 0: |
| | x = x + d * dt |
| | else: |
| | x_2 = x + d * dt |
| | denoised_2 = model(x_2, sigmas[i + 1] * s_in, **extra_args) |
| | d_2 = to_d(x_2, sigmas[i + 1], denoised_2) |
| | d_prime = (d + d_2) / 2 |
| | x = x + d_prime * dt |
| | |
| | if callback is not None: |
| | callback({'x': x, 'i': i, 'sigma': sigmas[i], 'denoised': denoised}) |
| | |
| | return x |
| |
|
| |
|
| | @torch.no_grad() |
| | def sample_adept_dpmpp_2m(model, x, sigmas, extra_args=None, callback=None, disable=None): |
| | """DPM++ 2M sampler with Adept weight scaling.""" |
| | |
| | if not ADEPT_STATE.get('enabled', False): |
| | global ORIGINAL_SAMPLERS |
| | if 'dpmpp_2m' in ORIGINAL_SAMPLERS: |
| | return ORIGINAL_SAMPLERS['dpmpp_2m'](model, x, sigmas, extra_args, callback, disable) |
| | return _basic_dpmpp_2m(model, x, sigmas, extra_args, callback, disable) |
| | |
| | extra_args = {} if extra_args is None else extra_args |
| | s_in = x.new_ones([x.shape[0]]) |
| | |
| | |
| | base_scale = ADEPT_STATE.get('scale', 1.0) |
| | shift = ADEPT_STATE.get('shift', 0.0) |
| | start_pct = ADEPT_STATE.get('start_pct', 0.0) |
| | end_pct = ADEPT_STATE.get('end_pct', 1.0) |
| | |
| | |
| | try: |
| | unet_model = shared.sd_model.model.diffusion_model |
| | except AttributeError: |
| | unet_model = None |
| | |
| | total_steps = len(sigmas) - 1 |
| | print(f"✅ Adept DPM++ 2M active: scale={base_scale:.2f}") |
| | |
| | old_denoised = None |
| | |
| | for i in trange(len(sigmas) - 1, disable=disable, desc="Adept DPM++ 2M"): |
| | sigma = sigmas[i] |
| | sigma_next = sigmas[i + 1] |
| | |
| | |
| | current_scale = compute_dynamic_scale(i, total_steps, base_scale, start_pct, end_pct) |
| | |
| | |
| | if unet_model is not None and abs(current_scale - 1.0) > 1e-6: |
| | with AdeptWeightPatcher(unet_model, current_scale, shift): |
| | denoised = model(x, sigma * s_in, **extra_args) |
| | else: |
| | denoised = model(x, sigma * s_in, **extra_args) |
| | |
| | |
| | t, t_next = sigma, sigma_next |
| | h = t_next - t |
| | |
| | if old_denoised is None or sigma_next == 0: |
| | |
| | x = (sigma_next / sigma) * x - (-h).expm1() * denoised |
| | else: |
| | |
| | h_last = t - sigmas[i - 1] |
| | r = h_last / h |
| | denoised_d = (1 + 1 / (2 * r)) * denoised - (1 / (2 * r)) * old_denoised |
| | x = (sigma_next / sigma) * x - (-h).expm1() * denoised_d |
| | |
| | old_denoised = denoised |
| | |
| | if callback is not None: |
| | callback({'x': x, 'i': i, 'sigma': sigma, 'denoised': denoised}) |
| | |
| | return x |
| |
|
| |
|
| | def _basic_dpmpp_2m(model, x, sigmas, extra_args=None, callback=None, disable=None): |
| | """Fallback basic DPM++ 2M.""" |
| | extra_args = {} if extra_args is None else extra_args |
| | s_in = x.new_ones([x.shape[0]]) |
| | old_denoised = None |
| | |
| | for i in trange(len(sigmas) - 1, disable=disable): |
| | denoised = model(x, sigmas[i] * s_in, **extra_args) |
| | t, t_next = sigmas[i], sigmas[i + 1] |
| | h = t_next - t |
| | |
| | if old_denoised is None or sigmas[i + 1] == 0: |
| | x = (t_next / t) * x - (-h).expm1() * denoised |
| | else: |
| | h_last = t - sigmas[i - 1] |
| | r = h_last / h |
| | denoised_d = (1 + 1 / (2 * r)) * denoised - (1 / (2 * r)) * old_denoised |
| | x = (t_next / t) * x - (-h).expm1() * denoised_d |
| | |
| | old_denoised = denoised |
| | |
| | if callback is not None: |
| | callback({'x': x, 'i': i, 'sigma': sigmas[i], 'denoised': denoised}) |
| | |
| | return x |
| |
|
| |
|
| | @torch.no_grad() |
| | def sample_adept_dpmpp_2s_ancestral(model, x, sigmas, extra_args=None, callback=None, disable=None, eta=1.0, s_noise=1.0, noise_sampler=None): |
| | """DPM++ 2S Ancestral with Adept weight scaling.""" |
| | |
| | if not ADEPT_STATE.get('enabled', False): |
| | global ORIGINAL_SAMPLERS |
| | if 'dpmpp_2s_ancestral' in ORIGINAL_SAMPLERS: |
| | return ORIGINAL_SAMPLERS['dpmpp_2s_ancestral'](model, x, sigmas, extra_args, callback, disable, eta, s_noise, noise_sampler) |
| | return _basic_dpmpp_2s_ancestral(model, x, sigmas, extra_args, callback, disable, eta, s_noise) |
| | |
| | extra_args = {} if extra_args is None else extra_args |
| | s_in = x.new_ones([x.shape[0]]) |
| | |
| | |
| | base_scale = ADEPT_STATE.get('scale', 1.0) |
| | shift = ADEPT_STATE.get('shift', 0.0) |
| | start_pct = ADEPT_STATE.get('start_pct', 0.0) |
| | end_pct = ADEPT_STATE.get('end_pct', 1.0) |
| | current_eta = ADEPT_STATE.get('eta', eta) |
| | current_s_noise = ADEPT_STATE.get('s_noise', s_noise) |
| | |
| | |
| | try: |
| | unet_model = shared.sd_model.model.diffusion_model |
| | except AttributeError: |
| | unet_model = None |
| | |
| | if noise_sampler is None: |
| | noise_sampler = default_noise_sampler(x) |
| | |
| | total_steps = len(sigmas) - 1 |
| | print(f"✅ Adept DPM++ 2S A active: scale={base_scale:.2f}") |
| | |
| | for i in trange(len(sigmas) - 1, disable=disable, desc="Adept DPM++ 2S A"): |
| | sigma = sigmas[i] |
| | sigma_next = sigmas[i + 1] |
| | |
| | |
| | current_scale = compute_dynamic_scale(i, total_steps, base_scale, start_pct, end_pct) |
| | |
| | |
| | if unet_model is not None and abs(current_scale - 1.0) > 1e-6: |
| | with AdeptWeightPatcher(unet_model, current_scale, shift): |
| | denoised = model(x, sigma * s_in, **extra_args) |
| | else: |
| | denoised = model(x, sigma * s_in, **extra_args) |
| | |
| | |
| | sigma_down, sigma_up = get_ancestral_step(sigma, sigma_next, current_eta) |
| | |
| | if sigma_down == 0: |
| | d = to_d(x, sigma, denoised) |
| | x = x + d * (sigma_down - sigma) |
| | else: |
| | |
| | t, t_next = sigma, sigma_down |
| | h = t_next - t |
| | s = t + h * 0.5 |
| | |
| | |
| | x_mid = (s / t) * x - (-(h * 0.5)).expm1() * denoised |
| | |
| | |
| | if unet_model is not None and abs(current_scale - 1.0) > 1e-6: |
| | with AdeptWeightPatcher(unet_model, current_scale, shift): |
| | denoised_mid = model(x_mid, s * s_in, **extra_args) |
| | else: |
| | denoised_mid = model(x_mid, s * s_in, **extra_args) |
| | |
| | |
| | x = (t_next / t) * x - (-h).expm1() * denoised_mid |
| | |
| | |
| | if sigma_up > 0: |
| | noise = noise_sampler(sigma, sigma_next) * current_s_noise |
| | x = x + noise * sigma_up |
| | |
| | if callback is not None: |
| | callback({'x': x, 'i': i, 'sigma': sigma, 'denoised': denoised}) |
| | |
| | return x |
| |
|
| |
|
| | def _basic_dpmpp_2s_ancestral(model, x, sigmas, extra_args=None, callback=None, disable=None, eta=1.0, s_noise=1.0): |
| | """Fallback basic DPM++ 2S Ancestral.""" |
| | extra_args = {} if extra_args is None else extra_args |
| | s_in = x.new_ones([x.shape[0]]) |
| | noise_sampler = default_noise_sampler(x) |
| | |
| | for i in trange(len(sigmas) - 1, disable=disable): |
| | denoised = model(x, sigmas[i] * s_in, **extra_args) |
| | sigma_down, sigma_up = get_ancestral_step(sigmas[i], sigmas[i + 1], eta) |
| | |
| | if sigma_down == 0: |
| | d = to_d(x, sigmas[i], denoised) |
| | x = x + d * (sigma_down - sigmas[i]) |
| | else: |
| | t, t_next = sigmas[i], sigma_down |
| | h = t_next - t |
| | s = t + h * 0.5 |
| | x_mid = (s / t) * x - (-(h * 0.5)).expm1() * denoised |
| | denoised_mid = model(x_mid, s * s_in, **extra_args) |
| | x = (t_next / t) * x - (-h).expm1() * denoised_mid |
| | |
| | if sigma_up > 0: |
| | x = x + noise_sampler(sigmas[i], sigmas[i + 1]) * s_noise * sigma_up |
| | |
| | if callback is not None: |
| | callback({'x': x, 'i': i, 'sigma': sigmas[i], 'denoised': denoised}) |
| | |
| | return x |
| |
|
| |
|
| | @torch.no_grad() |
| | def sample_adept_lms(model, x, sigmas, extra_args=None, callback=None, disable=None, order=4): |
| | """LMS sampler with Adept weight scaling.""" |
| | |
| | if not ADEPT_STATE.get('enabled', False): |
| | global ORIGINAL_SAMPLERS |
| | if 'lms' in ORIGINAL_SAMPLERS: |
| | return ORIGINAL_SAMPLERS['lms'](model, x, sigmas, extra_args, callback, disable, order) |
| | return _basic_lms(model, x, sigmas, extra_args, callback, disable, order) |
| | |
| | extra_args = {} if extra_args is None else extra_args |
| | s_in = x.new_ones([x.shape[0]]) |
| | |
| | |
| | base_scale = ADEPT_STATE.get('scale', 1.0) |
| | shift = ADEPT_STATE.get('shift', 0.0) |
| | start_pct = ADEPT_STATE.get('start_pct', 0.0) |
| | end_pct = ADEPT_STATE.get('end_pct', 1.0) |
| | |
| | |
| | try: |
| | unet_model = shared.sd_model.model.diffusion_model |
| | except AttributeError: |
| | unet_model = None |
| | |
| | total_steps = len(sigmas) - 1 |
| | print(f"✅ Adept LMS active: scale={base_scale:.2f}, order={order}") |
| | |
| | ds = [] |
| | |
| | for i in trange(len(sigmas) - 1, disable=disable, desc="Adept LMS"): |
| | sigma = sigmas[i] |
| | |
| | |
| | current_scale = compute_dynamic_scale(i, total_steps, base_scale, start_pct, end_pct) |
| | |
| | |
| | if unet_model is not None and abs(current_scale - 1.0) > 1e-6: |
| | with AdeptWeightPatcher(unet_model, current_scale, shift): |
| | denoised = model(x, sigma * s_in, **extra_args) |
| | else: |
| | denoised = model(x, sigma * s_in, **extra_args) |
| | |
| | d = to_d(x, sigma, denoised) |
| | ds.append(d) |
| | |
| | if len(ds) > order: |
| | ds.pop(0) |
| | |
| | |
| | cur_order = min(i + 1, order) |
| | coeffs = [1.0] |
| | |
| | for j in range(1, cur_order): |
| | prod = 1.0 |
| | for k in range(cur_order): |
| | if k != j: |
| | prod *= (sigmas[i] - sigmas[i - k]) / (sigmas[i - j] - sigmas[i - k]) |
| | coeffs.append(prod) |
| | |
| | |
| | d_multistep = sum(c * d_val for c, d_val in zip(coeffs, reversed(ds[-cur_order:]))) |
| | |
| | dt = sigmas[i + 1] - sigma |
| | x = x + d_multistep * dt |
| | |
| | if callback is not None: |
| | callback({'x': x, 'i': i, 'sigma': sigma, 'denoised': denoised}) |
| | |
| | return x |
| |
|
| |
|
| | def _basic_lms(model, x, sigmas, extra_args=None, callback=None, disable=None, order=4): |
| | """Fallback basic LMS.""" |
| | extra_args = {} if extra_args is None else extra_args |
| | s_in = x.new_ones([x.shape[0]]) |
| | ds = [] |
| | |
| | for i in trange(len(sigmas) - 1, disable=disable): |
| | denoised = model(x, sigmas[i] * s_in, **extra_args) |
| | d = to_d(x, sigmas[i], denoised) |
| | ds.append(d) |
| | |
| | if len(ds) > order: |
| | ds.pop(0) |
| | |
| | cur_order = min(i + 1, order) |
| | coeffs = [1.0] |
| | for j in range(1, cur_order): |
| | prod = 1.0 |
| | for k in range(cur_order): |
| | if k != j: |
| | prod *= (sigmas[i] - sigmas[i - k]) / (sigmas[i - j] - sigmas[i - k]) |
| | coeffs.append(prod) |
| | |
| | d_multistep = sum(c * d_val for c, d_val in zip(coeffs, reversed(ds[-cur_order:]))) |
| | dt = sigmas[i + 1] - sigmas[i] |
| | x = x + d_multistep * dt |
| | |
| | if callback is not None: |
| | callback({'x': x, 'i': i, 'sigma': sigmas[i], 'denoised': denoised}) |
| | |
| | return x |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | def patch_k_diffusion(): |
| | """Apply monkey patches to ALL k-diffusion samplers.""" |
| | global ORIGINAL_SAMPLERS |
| | |
| | samplers_to_patch = { |
| | 'sample_euler': sample_adept_euler, |
| | 'sample_euler_ancestral': sample_adept_euler_ancestral, |
| | 'sample_heun': sample_adept_heun, |
| | 'sample_dpmpp_2m': sample_adept_dpmpp_2m, |
| | 'sample_dpmpp_2s_ancestral': sample_adept_dpmpp_2s_ancestral, |
| | 'sample_lms': sample_adept_lms, |
| | } |
| | |
| | patched_count = 0 |
| | for original_name, adept_func in samplers_to_patch.items(): |
| | if hasattr(k_diffusion.sampling, original_name): |
| | |
| | if original_name not in ORIGINAL_SAMPLERS: |
| | original_func = getattr(k_diffusion.sampling, original_name) |
| | ORIGINAL_SAMPLERS[original_name.replace('sample_', '')] = original_func |
| | |
| | |
| | setattr(k_diffusion.sampling, original_name, adept_func) |
| | patched_count += 1 |
| | |
| | print(f"✅ Adept Sampler v3 FULL: Patched {patched_count} samplers") |
| | print(f" Samplers: Euler, Euler A, Heun, DPM++ 2M, DPM++ 2S A, LMS") |
| | print(f" Schedulers: 16 types available") |
| |
|
| |
|
| | def unpatch_k_diffusion(): |
| | """Restore original k-diffusion samplers.""" |
| | global ORIGINAL_SAMPLERS |
| | |
| | samplers_to_restore = { |
| | 'euler': 'sample_euler', |
| | 'euler_ancestral': 'sample_euler_ancestral', |
| | 'heun': 'sample_heun', |
| | 'dpmpp_2m': 'sample_dpmpp_2m', |
| | 'dpmpp_2s_ancestral': 'sample_dpmpp_2s_ancestral', |
| | 'lms': 'sample_lms', |
| | } |
| | |
| | restored_count = 0 |
| | for key, attr_name in samplers_to_restore.items(): |
| | if key in ORIGINAL_SAMPLERS: |
| | setattr(k_diffusion.sampling, attr_name, ORIGINAL_SAMPLERS[key]) |
| | restored_count += 1 |
| | |
| | print(f"🔄 Adept Sampler: Restored {restored_count} original samplers") |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class AdeptSamplerScript(scripts.Script): |
| | """Adept Sampler FULL extension for A1111.""" |
| | |
| | def title(self): |
| | return "Adept Sampler v3 FULL" |
| | |
| | def show(self, is_img2img): |
| | return scripts.AlwaysVisible |
| | |
| | def ui(self, is_img2img): |
| | """Create UI elements.""" |
| | with gr.Accordion("Adept Sampler v3 FULL", open=False): |
| | enabled = gr.Checkbox( |
| | label="Enable Adept Sampler", |
| | value=False, |
| | elem_id="adept_enabled" |
| | ) |
| | |
| | gr.HTML("<p style='color: #888;'>Works with: Euler, Euler A, Heun, DPM++ 2M, DPM++ 2S A, LMS</p>") |
| | |
| | with gr.Row(): |
| | scale = gr.Slider( |
| | minimum=0.5, |
| | maximum=2.0, |
| | step=0.05, |
| | value=1.0, |
| | label="Weight Scale", |
| | elem_id="adept_scale" |
| | ) |
| | shift = gr.Slider( |
| | minimum=-0.5, |
| | maximum=0.5, |
| | step=0.01, |
| | value=0.0, |
| | label="Weight Shift", |
| | elem_id="adept_shift" |
| | ) |
| | |
| | with gr.Row(): |
| | start_pct = gr.Slider( |
| | minimum=0.0, |
| | maximum=1.0, |
| | step=0.05, |
| | value=0.0, |
| | label="Start Percent", |
| | elem_id="adept_start" |
| | ) |
| | end_pct = gr.Slider( |
| | minimum=0.0, |
| | maximum=1.0, |
| | step=0.05, |
| | value=1.0, |
| | label="End Percent", |
| | elem_id="adept_end" |
| | ) |
| | |
| | with gr.Row(): |
| | eta = gr.Slider( |
| | minimum=0.0, |
| | maximum=2.0, |
| | step=0.01, |
| | value=1.0, |
| | label="Eta (Ancestral samplers)", |
| | elem_id="adept_eta" |
| | ) |
| | s_noise = gr.Slider( |
| | minimum=0.0, |
| | maximum=2.0, |
| | step=0.01, |
| | value=1.0, |
| | label="S-Noise", |
| | elem_id="adept_s_noise" |
| | ) |
| | |
| | adaptive_eta = gr.Checkbox( |
| | label="Adaptive Eta (dynamic eta during sampling)", |
| | value=False, |
| | elem_id="adept_adaptive_eta" |
| | ) |
| | |
| | scheduler = gr.Dropdown( |
| | choices=[ |
| | "Standard", |
| | "AOS-V", |
| | "AOS-Epsilon", |
| | "AkashicAOS", |
| | "Entropic", |
| | "SNR-Optimized", |
| | "Constant-Rate", |
| | "Adaptive-Optimized", |
| | "Cosine-Annealed", |
| | "LogSNR-Uniform", |
| | "Tanh Mid-Boost", |
| | "Exponential Tail", |
| | "Jittered-Karras", |
| | "Stochastic", |
| | "JYS (Dynamic)", |
| | "Hybrid JYS-Karras", |
| | "AYS-SDXL", |
| | ], |
| | value="Standard", |
| | label="Scheduler Type", |
| | elem_id="adept_scheduler" |
| | ) |
| | |
| | vae_reflection = gr.Checkbox( |
| | label="Enable VAE Reflection (fixes edge artifacts for EQ-VAE)", |
| | value=False, |
| | elem_id="adept_vae_reflection" |
| | ) |
| | |
| | return [enabled, scale, shift, start_pct, end_pct, eta, s_noise, adaptive_eta, scheduler, vae_reflection] |
| | |
| | def process(self, p, enabled, scale, shift, start_pct, end_pct, eta, s_noise, adaptive_eta, scheduler, vae_reflection): |
| | """Process parameters and update global state.""" |
| | global ADEPT_STATE |
| | |
| | |
| | if enabled and scheduler != "Standard": |
| | |
| | original_sigmas = p.sampler.model_wrap.sigmas |
| | |
| | |
| | new_sigmas = apply_custom_scheduler(original_sigmas, scheduler) |
| | |
| | |
| | p.sampler.model_wrap.sigmas = new_sigmas |
| | |
| | print(f"📊 Applied scheduler: {scheduler}") |
| | |
| | |
| | ADEPT_STATE.update({ |
| | "enabled": enabled, |
| | "scale": scale, |
| | "shift": shift, |
| | "start_pct": start_pct, |
| | "end_pct": end_pct, |
| | "eta": eta, |
| | "s_noise": s_noise, |
| | "adaptive_eta": adaptive_eta, |
| | "scheduler": scheduler, |
| | "vae_reflection": vae_reflection, |
| | }) |
| | |
| | |
| | if enabled: |
| | p.extra_generation_params.update({ |
| | "Adept Sampler": "v3 FULL", |
| | "Adept Scale": scale, |
| | "Adept Shift": shift, |
| | "Adept Range": f"{start_pct:.0%}-{end_pct:.0%}", |
| | "Adept Eta": eta, |
| | "Adept S-Noise": s_noise, |
| | "Adept Adaptive Eta": adaptive_eta, |
| | "Adept Scheduler": scheduler, |
| | "Adept VAE Reflection": vae_reflection, |
| | }) |
| | |
| | def process_batch(self, p, *args, **kwargs): |
| | """Wrap entire batch in VAE Reflection if enabled.""" |
| | if ADEPT_STATE.get('vae_reflection', False): |
| | try: |
| | vae_model = shared.sd_model.first_stage_model |
| | with VAEReflectionPatcher(vae_model): |
| | |
| | pass |
| | except Exception as e: |
| | print(f"⚠️ VAE Reflection error: {e}") |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | |
| | patch_k_diffusion() |
| |
|
| | |
| | def on_script_unloaded(): |
| | unpatch_k_diffusion() |
| |
|
| | try: |
| | script_callbacks.on_script_unloaded(on_script_unloaded) |
| | except AttributeError: |
| | print("⚠️ Script unload callback not available") |
| |
|
| | print("🚀 Adept Sampler v3 FULL loaded!") |
| | print(" - 6 Samplers: Euler, Euler A, Heun, DPM++ 2M, DPM++ 2S A, LMS") |
| | print(" - 16 Schedulers: AOS-V, AOS-Epsilon, AkashicAOS, Entropic, SNR-Optimized,") |
| | print(" Constant-Rate, Adaptive-Optimized, Cosine-Annealed, LogSNR-Uniform,") |
| | print(" Tanh Mid-Boost, Exponential Tail, Jittered-Karras, Stochastic,") |
| | print(" JYS (Dynamic), Hybrid JYS-Karras, AYS-SDXL") |
| | print(" - VAE Reflection support") |
| | print(" - Dynamic Weight Scaling") |
| |
|