|
|
import torch |
|
|
import torch.nn as nn |
|
|
import math |
|
|
from typing import Tuple |
|
|
|
|
|
def cache_init(cache_interval, max_order, num_steps=None, |
|
|
enable_first_enhance=False, first_enhance_steps=3, |
|
|
enable_tailing_enhance=False, tailing_enhance_steps=1, |
|
|
low_freqs_order=0, high_freqs_order=2): |
|
|
cache_dic = {} |
|
|
cache_dic['counter']= 0 |
|
|
cache_dic['current_step'] = 0 |
|
|
cache_dic['cache_interval']= cache_interval |
|
|
cache_dic['max_order'] = max_order |
|
|
cache_dic['num_steps'] = num_steps |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cache_dic['enable_first_enhance'] = enable_first_enhance |
|
|
cache_dic['first_enhance_steps'] = first_enhance_steps |
|
|
|
|
|
|
|
|
cache_dic['enable_tailing_enhance'] = enable_tailing_enhance |
|
|
cache_dic['tailing_enhance_steps'] = tailing_enhance_steps |
|
|
|
|
|
|
|
|
cache_dic['low_freqs_order'] = low_freqs_order |
|
|
cache_dic['high_freqs_order'] = high_freqs_order |
|
|
|
|
|
|
|
|
cache_dic['enable_force_control']= False |
|
|
cache_dic['force_compute']=False |
|
|
return cache_dic |
|
|
|
|
|
class TaylorCacheContainer(nn.Module): |
|
|
def __init__(self, max_order): |
|
|
super().__init__() |
|
|
self.max_order = max_order |
|
|
|
|
|
for i in range(max_order + 1): |
|
|
self.register_buffer(f"derivative_{i}", None, persistent=False) |
|
|
self.register_buffer(f"temp_derivative_{i}", None, persistent=False) |
|
|
|
|
|
def get_derivative(self, order): |
|
|
return getattr(self, f"derivative_{order}") |
|
|
|
|
|
def set_derivative(self, order, tensor): |
|
|
setattr(self, f"derivative_{order}", tensor) |
|
|
|
|
|
def set_temp_derivative(self, order, tensor): |
|
|
setattr(self, f"temp_derivative_{order}", tensor) |
|
|
|
|
|
def get_temp_derivative(self, order): |
|
|
return getattr(self, f"temp_derivative_{order}") |
|
|
|
|
|
def clear_temp_derivative(self): |
|
|
for i in range(self.max_order + 1): |
|
|
setattr(self, f"temp_derivative_{i}", None) |
|
|
|
|
|
def move_temp_to_derivative(self): |
|
|
for i in range(self.max_order + 1): |
|
|
if self.get_temp_derivative(i) is not None: |
|
|
setattr(self, f"derivative_{i}", self.get_temp_derivative(i)) |
|
|
else: |
|
|
break |
|
|
self.clear_temp_derivative() |
|
|
|
|
|
def get_all_derivatives(self): |
|
|
return [getattr(self, f"derivative_{i}") for i in range(self.max_order + 1)] |
|
|
|
|
|
def get_all_filled_derivatives(self): |
|
|
return [self.get_derivative(i) for i in range(self.max_order + 1) if self.get_derivative(i) is not None] |
|
|
|
|
|
def taylor_formula(self, distance): |
|
|
output = 0 |
|
|
for i in range(len(self.get_all_filled_derivatives())): |
|
|
output += (1 / math.factorial(i)) * self.get_derivative(i) * (distance ** i) |
|
|
return output |
|
|
|
|
|
def derivatives_computation(self, x, distance): |
|
|
''' |
|
|
x: tensor, the new x_0 |
|
|
distance: int, the distance between the current step and the last full computation step |
|
|
''' |
|
|
self.set_temp_derivative(0, x) |
|
|
for i in range(self.max_order): |
|
|
if self.get_derivative(i) is not None: |
|
|
self.set_temp_derivative(i+1, (self.get_temp_derivative(i) - self.get_derivative(i)) / distance) |
|
|
else: |
|
|
break |
|
|
self.move_temp_to_derivative() |
|
|
|
|
|
def clear_derivatives(self): |
|
|
for i in range(self.max_order + 1): |
|
|
setattr(self, f"derivative_{i}", None) |
|
|
setattr(self, f"temp_derivative_{i}", None) |
|
|
|
|
|
|
|
|
@torch.compile |
|
|
def decomposition_FFT(x: torch.Tensor, cutoff_ratio: float = 0.1) -> Tuple[torch.Tensor, torch.Tensor]: |
|
|
""" |
|
|
Fast Fourier Transform frequency domain decomposition |
|
|
|
|
|
Args: |
|
|
x: Input tensor [B, H*W, D] |
|
|
cutoff_ratio: Cutoff frequency ratio (0~0.5) |
|
|
|
|
|
Returns: |
|
|
Tuple of (low_freq, high_freq) tensors with same dtype as input |
|
|
""" |
|
|
orig_dtype = x.dtype |
|
|
device = x.device |
|
|
|
|
|
x_fp32 = x.to(torch.float32) |
|
|
|
|
|
B, HW, D = x_fp32.shape |
|
|
freq = torch.fft.fft(x_fp32, dim=1) |
|
|
|
|
|
freqs = torch.fft.fftfreq(HW, d=1.0, device=device) |
|
|
cutoff = cutoff_ratio * freqs.abs().max() |
|
|
|
|
|
|
|
|
low_mask = freqs.abs() <= cutoff |
|
|
high_mask = ~low_mask |
|
|
|
|
|
low_mask = low_mask[None, :, None] |
|
|
high_mask = high_mask[None, :, None] |
|
|
|
|
|
low_freq_complex = freq * low_mask |
|
|
high_freq_complex = freq * high_mask |
|
|
|
|
|
|
|
|
low_fp32 = torch.fft.ifft(low_freq_complex, dim=1).real |
|
|
high_fp32 = torch.fft.ifft(high_freq_complex, dim=1).real |
|
|
|
|
|
low = low_fp32.to(device=device, dtype=orig_dtype) |
|
|
high = high_fp32.to(device=device, dtype=orig_dtype) |
|
|
|
|
|
return low, high |
|
|
|
|
|
@torch.compile |
|
|
def reconstruction(low_freq: torch.Tensor, high_freq: torch.Tensor) -> torch.Tensor: |
|
|
return low_freq + high_freq |
|
|
|
|
|
class CacheWithFreqsContainer(nn.Module): |
|
|
def __init__(self, max_order): |
|
|
super().__init__() |
|
|
self.max_order = max_order |
|
|
|
|
|
for i in range(max_order + 1): |
|
|
self.register_buffer(f"derivative_{i}_low_freqs", None, persistent=False) |
|
|
self.register_buffer(f"derivative_{i}_high_freqs", None, persistent=False) |
|
|
self.register_buffer(f"temp_derivative_{i}_low_freqs", None, persistent=False) |
|
|
self.register_buffer(f"temp_derivative_{i}_high_freqs", None, persistent=False) |
|
|
|
|
|
def get_derivative(self, order, freqs): |
|
|
return getattr(self, f"derivative_{order}_{freqs}") |
|
|
|
|
|
def set_derivative(self, order, freqs, tensor): |
|
|
setattr(self, f"derivative_{order}_{freqs}", tensor) |
|
|
|
|
|
def set_temp_derivative(self, order, freqs, tensor): |
|
|
setattr(self, f"temp_derivative_{order}_{freqs}", tensor) |
|
|
|
|
|
def get_temp_derivative(self, order, freqs): |
|
|
return getattr(self, f"temp_derivative_{order}_{freqs}") |
|
|
|
|
|
def move_temp_to_derivative(self): |
|
|
for i in range(self.max_order + 1): |
|
|
if self.get_temp_derivative(i, "low_freqs") is not None: |
|
|
setattr(self, f"derivative_{i}_low_freqs", self.get_temp_derivative(i, "low_freqs")) |
|
|
if self.get_temp_derivative(i, "high_freqs") is not None: |
|
|
setattr(self, f"derivative_{i}_high_freqs", self.get_temp_derivative(i, "high_freqs")) |
|
|
else: |
|
|
break |
|
|
self.clear_temp_derivative() |
|
|
|
|
|
def get_all_filled_derivatives(self, freqs): |
|
|
return [self.get_derivative(i, freqs) for i in range(self.max_order + 1) if self.get_derivative(i, freqs) is not None] |
|
|
|
|
|
def taylor_formula(self, distance): |
|
|
low_freqs_output = 0 |
|
|
high_freqs_output = 0 |
|
|
for i in range(len(self.get_all_filled_derivatives("low_freqs"))): |
|
|
low_freqs_output += (1 / math.factorial(i)) * self.get_derivative(i, "low_freqs") * (distance ** i) |
|
|
for i in range(len(self.get_all_filled_derivatives("high_freqs"))): |
|
|
high_freqs_output += (1 / math.factorial(i)) * self.get_derivative(i, "high_freqs") * (distance ** i) |
|
|
return reconstruction(low_freqs_output, high_freqs_output) |
|
|
|
|
|
def hermite_formula(self, distance): |
|
|
low_freqs_output = 0 |
|
|
high_freqs_output = 0 |
|
|
for i in range(len(self.get_all_filled_derivatives("low_freqs"))): |
|
|
low_freqs_output += (1 / math.factorial(i)) * self.get_derivative(i, "low_freqs") * (distance ** i) |
|
|
for i in range(len(self.get_all_filled_derivatives("high_freqs"))): |
|
|
high_freqs_output += (1 / math.factorial(i)) * self.get_derivative(i, "high_freqs") * (distance ** i) |
|
|
return reconstruction(low_freqs_output, high_freqs_output) |
|
|
|
|
|
def derivatives_computation(self, x, distance, low_freqs_order, high_freqs_order): |
|
|
''' |
|
|
x: tensor, the new x_0 |
|
|
distance: int, the distance between the current step and the last full computation step |
|
|
''' |
|
|
x_low, x_high = decomposition_FFT(x, cutoff_ratio=0.1) |
|
|
self.set_temp_derivative(0, "low_freqs", x_low) |
|
|
self.set_temp_derivative(0, "high_freqs", x_high) |
|
|
for i in range(low_freqs_order): |
|
|
if self.get_derivative(i, "low_freqs") is not None: |
|
|
self.set_temp_derivative(i+1, "low_freqs", (self.get_temp_derivative(i, "low_freqs") - self.get_derivative(i, "low_freqs")) / distance) |
|
|
for i in range(high_freqs_order): |
|
|
if self.get_derivative(i, "high_freqs") is not None: |
|
|
self.set_temp_derivative(i+1, "high_freqs", (self.get_temp_derivative(i, "high_freqs") - self.get_derivative(i, "high_freqs")) / distance) |
|
|
self.move_temp_to_derivative() |
|
|
|
|
|
def clear_temp_derivative(self): |
|
|
for i in range(self.max_order + 1): |
|
|
setattr(self, f"temp_derivative_{i}_low_freqs", None) |
|
|
setattr(self, f"temp_derivative_{i}_high_freqs", None) |
|
|
|
|
|
def clear_derivatives(self): |
|
|
for i in range(self.max_order + 1): |
|
|
setattr(self, f"derivative_{i}_low_freqs", None) |
|
|
setattr(self, f"derivative_{i}_high_freqs", None) |
|
|
setattr(self, f"temp_derivative_{i}_low_freqs", None) |
|
|
setattr(self, f"temp_derivative_{i}_high_freqs", None) |