llir / restormer.py
linxin02's picture
Upload portable Low_light_rainy_new code export
4336727 verified
## PromptIR: Prompting for All-in-One Blind Image Restoration
## Vaishnav Potlapalli, Syed Waqas Zamir, Salman Khan, and Fahad Shahbaz Khan
## https://arxiv.org/abs/2306.13090
import torch, torchvision
# print(torch.__version__)
import torch.nn as nn
import torch.nn.functional as F
from pdb import set_trace as stx
import numbers
from einops import rearrange
from einops.layers.torch import Rearrange
import time
import os
import sys
from net.arch_util import LayerNorm2d
from net.local_arch import Local_Base
from net.gbp_prior import rgb_to_hue
import torchvision.transforms as transforms
##########################################################################
## Layer Norm
def to_3d(x):
    """Flatten a (b, c, h, w) feature map into a (b, h*w, c) token tensor."""
    b, c, h, w = x.shape
    # Merge the two spatial axes into one token axis, then move channels last.
    tokens = x.reshape(b, c, h * w)
    return tokens.transpose(1, 2)
def to_4d(x,h,w):
    """Turn a (b, h*w, c) token tensor back into a (b, c, h, w) feature map."""
    b, _, c = x.shape
    # Channels first, then split the token axis into (h, w); reshape raises
    # if h * w does not match the number of tokens.
    channels_first = x.transpose(1, 2)
    return channels_first.reshape(b, c, h, w)
class BiasFree_LayerNorm(nn.Module):
    """Scale-only normalisation over the last axis.

    The input is divided by the square root of its (biased) variance along
    the last axis and multiplied by a learnable per-feature weight. The mean
    is not subtracted and there is no bias term.
    """

    def __init__(self, normalized_shape):
        super().__init__()
        shape = normalized_shape
        # A bare integer is shorthand for a one-element shape.
        if isinstance(shape, numbers.Integral):
            shape = (shape,)
        shape = torch.Size(shape)

        # Only a single normalised axis is supported.
        assert len(shape) == 1

        self.weight = nn.Parameter(torch.ones(shape))
        self.normalized_shape = shape

    def forward(self, x):
        variance = torch.var(x, dim=-1, keepdim=True, unbiased=False)
        # 1e-5 keeps the division stable when the variance is ~0.
        denom = torch.sqrt(variance + 1e-5)
        return x / denom * self.weight
class WithBias_LayerNorm(nn.Module):
    """Layer normalisation over the last axis with learnable scale and shift.

    The input is centred by its mean, divided by the square root of its
    (biased) variance along the last axis, then scaled by ``weight`` and
    shifted by ``bias``.
    """

    def __init__(self, normalized_shape):
        super().__init__()
        shape = normalized_shape
        # A bare integer is shorthand for a one-element shape.
        if isinstance(shape, numbers.Integral):
            shape = (shape,)
        shape = torch.Size(shape)

        # Only a single normalised axis is supported.
        assert len(shape) == 1

        self.weight = nn.Parameter(torch.ones(shape))
        self.bias = nn.Parameter(torch.zeros(shape))
        self.normalized_shape = shape

    def forward(self, x):
        mean = torch.mean(x, dim=-1, keepdim=True)
        variance = torch.var(x, dim=-1, keepdim=True, unbiased=False)
        # 1e-5 keeps the division stable when the variance is ~0.
        denom = torch.sqrt(variance + 1e-5)
        return (x - mean) / denom * self.weight + self.bias
class LayerNorm(nn.Module):
    """Channel-wise layer norm for (b, c, h, w) feature maps.

    The map is flattened to tokens with ``to_3d``, normalised over the
    channel axis by the selected variant, and restored with ``to_4d``.

    Args:
        dim: number of channels to normalise over.
        LayerNorm_type: ``'BiasFree'`` selects ``BiasFree_LayerNorm``; any
            other value selects ``WithBias_LayerNorm``.
    """

    def __init__(self, dim, LayerNorm_type):
        super().__init__()
        norm_cls = BiasFree_LayerNorm if LayerNorm_type == 'BiasFree' else WithBias_LayerNorm
        self.body = norm_cls(dim)

    def forward(self, x):
        height, width = x.shape[-2:]
        tokens = to_3d(x)
        normed = self.body(tokens)
        return to_4d(normed, height, width)
##########################################################################
## Gated-Dconv Feed-Forward Network (GDFN)
class FeedForward(nn.Module):
    """Gated depth-wise-conv feed-forward network.

    A 1x1 conv expands to twice the hidden width, a 3x3 depth-wise conv mixes
    spatially, the result is split into two halves where one half (after
    GELU) gates the other, and a final 1x1 conv projects back to ``dim``.

    Args:
        dim: input and output channel count.
        ffn_expansion_factor: hidden width is ``int(dim * ffn_expansion_factor)``.
        bias: whether the convolutions carry a bias term.
    """

    def __init__(self, dim, ffn_expansion_factor, bias):
        super().__init__()

        hidden = int(dim * ffn_expansion_factor)
        doubled = hidden * 2  # gate branch + value branch

        self.project_in = nn.Conv2d(dim, doubled, kernel_size=1, bias=bias)
        self.dwconv = nn.Conv2d(
            doubled, doubled,
            kernel_size=3, stride=1, padding=1,
            groups=doubled, bias=bias,
        )
        self.project_out = nn.Conv2d(hidden, dim, kernel_size=1, bias=bias)

    def forward(self, x):
        expanded = self.dwconv(self.project_in(x))
        gate, value = expanded.chunk(2, dim=1)
        gated = F.gelu(gate) * value
        return self.project_out(gated)
##########################################################################
## Multi-DConv Head Transposed Self-Attention (MDTA)
class Attention(nn.Module):
    """Multi-head transposed self-attention over the channel axis.

    Queries, keys and values come from a 1x1 conv followed by a 3x3
    depth-wise conv. Queries and keys are L2-normalised along the flattened
    spatial axis, so the attention map is (channels-per-head x
    channels-per-head) and is scaled by a learnable per-head temperature.

    Args:
        dim: number of input/output channels (split evenly across heads).
        num_heads: number of attention heads.
        bias: whether the convolutions carry a bias term.
    """

    def __init__(self, dim, num_heads, bias):
        super().__init__()
        self.num_heads = num_heads
        self.temperature = nn.Parameter(torch.ones(num_heads, 1, 1))

        triple = dim * 3  # q, k and v stacked along channels
        self.qkv = nn.Conv2d(dim, triple, kernel_size=1, bias=bias)
        self.qkv_dwconv = nn.Conv2d(
            triple, triple,
            kernel_size=3, stride=1, padding=1,
            groups=triple, bias=bias,
        )
        self.project_out = nn.Conv2d(dim, dim, kernel_size=1, bias=bias)

    def forward(self, x):
        _, _, h, w = x.shape

        stacked = self.qkv_dwconv(self.qkv(x))
        q, k, v = (
            rearrange(part, 'b (head c) h w -> b head c (h w)', head=self.num_heads)
            for part in stacked.chunk(3, dim=1)
        )

        q = F.normalize(q, dim=-1)
        k = F.normalize(k, dim=-1)

        # Channel-to-channel similarity, one map per head.
        scores = (q @ k.transpose(-2, -1)) * self.temperature
        weights = scores.softmax(dim=-1)

        mixed = weights @ v
        mixed = rearrange(mixed, 'b head c (h w) -> b (head c) h w', head=self.num_heads, h=h, w=w)
        return self.project_out(mixed)
class resblock(nn.Module):
    """Residual block: conv3x3 -> PReLU -> conv3x3, added to the input.

    Both convolutions are bias-free and keep the channel count at ``dim``.
    """

    def __init__(self, dim):
        super().__init__()

        def conv3x3():
            return nn.Conv2d(dim, dim, kernel_size=3, stride=1, padding=1, bias=False)

        layers = [conv3x3(), nn.PReLU(), conv3x3()]
        self.body = nn.Sequential(*layers)

    def forward(self, x):
        # In-place add on the freshly computed branch output; ``x`` itself is
        # left untouched.
        return self.body(x).add_(x)
##########################################################################
## Resizing modules
class Downsample(nn.Module):
    """Halve the spatial size and double the channel count.

    A bias-free 3x3 conv maps ``n_feat`` to ``n_feat // 2`` channels, then
    ``PixelUnshuffle(2)`` folds each 2x2 spatial patch into channels (x4).
    """

    def __init__(self, n_feat):
        super().__init__()
        squeeze = nn.Conv2d(n_feat, n_feat // 2, kernel_size=3, stride=1, padding=1, bias=False)
        fold = nn.PixelUnshuffle(2)
        self.body = nn.Sequential(squeeze, fold)

    def forward(self, x):
        out = self.body(x)
        return out
class Upsample(nn.Module):
    """Double the spatial size and halve the channel count.

    A bias-free 3x3 conv maps ``n_feat`` to ``n_feat * 2`` channels, then
    ``PixelShuffle(2)`` spreads groups of four channels over 2x2 patches (/4).
    """

    def __init__(self, n_feat):
        super().__init__()
        expand = nn.Conv2d(n_feat, n_feat * 2, kernel_size=3, stride=1, padding=1, bias=False)
        unfold = nn.PixelShuffle(2)
        self.body = nn.Sequential(expand, unfold)

    def forward(self, x):
        out = self.body(x)
        return out
##########################################################################
## Transformer Block
class TransformerBlock(nn.Module):
    """Pre-norm transformer block: attention then feed-forward, both residual.

    Args:
        dim: channel count of the feature map.
        num_heads: attention heads passed to ``Attention``.
        ffn_expansion_factor: hidden-width multiplier passed to ``FeedForward``.
        bias: whether the inner convolutions carry a bias term.
        LayerNorm_type: normalisation variant passed to ``LayerNorm``.
    """

    def __init__(self, dim, num_heads, ffn_expansion_factor, bias, LayerNorm_type):
        super().__init__()

        # Attention sub-layer.
        self.norm1 = LayerNorm(dim, LayerNorm_type)
        self.attn = Attention(dim, num_heads, bias)

        # Feed-forward sub-layer.
        self.norm2 = LayerNorm(dim, LayerNorm_type)
        self.ffn = FeedForward(dim, ffn_expansion_factor, bias)

    def forward(self, x):
        attended = x + self.attn(self.norm1(x))
        return attended + self.ffn(self.norm2(attended))
##########################################################################
## Overlapped image patch embedding with 3x3 Conv
class OverlapPatchEmbed(nn.Module):
    """Embed an image with a single 3x3 convolution (stride 1, padding 1).

    The spatial size is preserved; channels go from ``in_c`` to ``embed_dim``.
    """

    def __init__(self, in_c=3, embed_dim=48, bias=False):
        super().__init__()
        self.proj = nn.Conv2d(
            in_c, embed_dim,
            kernel_size=3, stride=1, padding=1,
            bias=bias,
        )

    def forward(self, x):
        return self.proj(x)
##########################################################################
##---------- Prompt Gen Module -----------------------
class PromptGenBlock(nn.Module):
    """Generate an input-conditioned prompt feature map.

    A bank of ``prompt_len`` learnable prompt components is mixed with
    softmax weights predicted from the globally average-pooled input. The
    mixed prompt is resized to the input's spatial size with bilinear
    interpolation and refined by a bias-free 3x3 conv.

    Args:
        prompt_dim: channel count of each prompt component (and the output).
        prompt_len: number of prompt components in the bank.
        prompt_size: spatial side length of the stored prompt components.
        lin_dim: channel count of the input feature map ``x``.
    """

    def __init__(self,prompt_dim=128,prompt_len=5,prompt_size = 96,lin_dim = 192):
        super().__init__()
        self.prompt_param = nn.Parameter(torch.rand(1,prompt_len,prompt_dim,prompt_size,prompt_size))
        self.linear_layer = nn.Linear(lin_dim,prompt_len)
        self.conv3x3 = nn.Conv2d(prompt_dim,prompt_dim,kernel_size=3,stride=1,padding=1,bias=False)

    def forward(self,x):
        """Return a (B, prompt_dim, H, W) prompt for a (B, lin_dim, H, W) input."""
        _, _, H, W = x.shape

        # Global average pool -> one mixing weight per prompt component.
        emb = x.mean(dim=(-2,-1))
        prompt_weights = F.softmax(self.linear_layer(emb),dim=1)

        # (B, L, 1, 1, 1) * (1, L, D, S, S) broadcasts to (B, L, D, S, S), so
        # the bank does not have to be copied once per batch element first.
        prompt = prompt_weights[:, :, None, None, None] * self.prompt_param
        prompt = torch.sum(prompt,dim=1)

        prompt = F.interpolate(prompt,(H,W),mode="bilinear")
        return self.conv3x3(prompt)
##########################################################################
##---------- PromptIR -----------------------
class PromptIR(nn.Module):
    """Four-level encoder/decoder restoration network with optional prompts.

    The encoder stacks ``TransformerBlock`` stages separated by ``Downsample``
    layers. The decoder mirrors it with ``Upsample`` layers and skip
    connections. When ``decoder`` is true, a ``PromptGenBlock`` output is
    concatenated to the features before each upsampling step and fused by a
    ``noise_level*`` block followed by a ``reduce_noise_level*`` 1x1 conv.

    Several channel counts (64/128/192/224/256/320/512) are hard-coded and
    only line up with ``dim=48``.

    ``chnl_reduce*`` and ``reduce_noise_channel_*`` are registered but never
    called in ``forward``; they are kept so that parameter names and
    construction order stay unchanged.

    NOTE(review): with ``decoder=False`` the latent keeps ``int(dim*2**3)``
    channels while ``up4_3`` is built for ``int(dim*2**2)`` inputs, so that
    path looks channel-mismatched. Confirm whether ``decoder=False`` is ever
    meant to be used.
    """

    def __init__(self,
        inp_channels=3,
        out_channels=3,
        dim = 48,
        num_blocks = [4,6,6,8],
        num_refinement_blocks = 4,
        heads = [1,2,4,8],
        ffn_expansion_factor = 2.66,
        bias = False,
        LayerNorm_type = 'WithBias',   ## Other option 'BiasFree'
        decoder = False,
    ):
        super().__init__()

        def block(width, n_heads):
            # One transformer block sharing the network-wide settings.
            return TransformerBlock(dim=width, num_heads=n_heads, ffn_expansion_factor=ffn_expansion_factor, bias=bias, LayerNorm_type=LayerNorm_type)

        def stage(width, n_heads, depth):
            # ``depth`` transformer blocks applied in sequence.
            return nn.Sequential(*[block(width, n_heads) for _ in range(depth)])

        def pointwise(c_in, c_out):
            return nn.Conv2d(c_in, c_out, kernel_size=1, bias=bias)

        # Channel widths of levels 2-4 (level 1 uses ``dim`` directly).
        c2 = int(dim*2**1)
        c3 = int(dim*2**2)
        c4 = int(dim*2**3)

        # NOTE: modules are created in the same order as before so parameter
        # initialisation and state_dict ordering do not change.
        self.patch_embed = OverlapPatchEmbed(inp_channels, dim)

        self.decoder = decoder
        if self.decoder:
            self.prompt1 = PromptGenBlock(prompt_dim=64,prompt_len=5,prompt_size = 64,lin_dim = 96)
            self.prompt2 = PromptGenBlock(prompt_dim=128,prompt_len=5,prompt_size = 32,lin_dim = 192)
            self.prompt3 = PromptGenBlock(prompt_dim=320,prompt_len=5,prompt_size = 16,lin_dim = 384)

        self.chnl_reduce1 = pointwise(64, 64)
        self.chnl_reduce2 = pointwise(128, 128)
        self.chnl_reduce3 = pointwise(320, 256)

        # ---- encoder ----
        self.reduce_noise_channel_1 = pointwise(dim + 64, dim)
        self.encoder_level1 = stage(dim, heads[0], num_blocks[0])

        self.down1_2 = Downsample(dim)  # level 1 -> level 2
        self.reduce_noise_channel_2 = pointwise(c2 + 128, c2)
        self.encoder_level2 = stage(c2, heads[1], num_blocks[1])

        self.down2_3 = Downsample(c2)  # level 2 -> level 3
        self.reduce_noise_channel_3 = pointwise(c3 + 256, c3)
        self.encoder_level3 = stage(c3, heads[2], num_blocks[2])

        self.down3_4 = Downsample(c3)  # level 3 -> level 4
        self.latent = stage(c4, heads[3], num_blocks[3])

        # ---- decoder ----
        self.up4_3 = Upsample(c3)  # level 4 -> level 3
        self.reduce_chan_level3 = pointwise(c2 + 192, c3)
        self.noise_level3 = block(c3 + 512, heads[2])
        self.reduce_noise_level3 = pointwise(c3 + 512, c3)
        self.decoder_level3 = stage(c3, heads[2], num_blocks[2])

        self.up3_2 = Upsample(c3)  # level 3 -> level 2
        self.reduce_chan_level2 = pointwise(c3, c2)
        self.noise_level2 = block(c2 + 224, heads[2])
        self.reduce_noise_level2 = pointwise(c2 + 224, c3)
        self.decoder_level2 = stage(c2, heads[1], num_blocks[1])

        self.up2_1 = Upsample(c2)  # level 2 -> level 1 (no 1x1 channel reduction)
        self.noise_level1 = block(c2 + 64, heads[2])
        self.reduce_noise_level1 = pointwise(c2 + 64, c2)
        self.decoder_level1 = stage(c2, heads[0], num_blocks[0])

        self.refinement = stage(c2, heads[0], num_refinement_blocks)
        self.output = nn.Conv2d(c2, out_channels, kernel_size=3, stride=1, padding=1, bias=bias)

    def forward(self, inp_img,noise_emb = None):
        """Restore ``inp_img``; ``noise_emb`` is accepted but not used."""
        # ---- encoder ----
        enc1 = self.encoder_level1(self.patch_embed(inp_img))
        enc2 = self.encoder_level2(self.down1_2(enc1))
        enc3 = self.encoder_level3(self.down2_3(enc2))
        latent = self.latent(self.down3_4(enc3))

        # ---- level 3 ----
        if self.decoder:
            latent = torch.cat([latent, self.prompt3(latent)], 1)
            latent = self.reduce_noise_level3(self.noise_level3(latent))
        dec3 = torch.cat([self.up4_3(latent), enc3], 1)
        dec3 = self.decoder_level3(self.reduce_chan_level3(dec3))

        # ---- level 2 ----
        if self.decoder:
            dec3 = torch.cat([dec3, self.prompt2(dec3)], 1)
            dec3 = self.reduce_noise_level2(self.noise_level2(dec3))
        dec2 = torch.cat([self.up3_2(dec3), enc2], 1)
        dec2 = self.decoder_level2(self.reduce_chan_level2(dec2))

        # ---- level 1 ----
        if self.decoder:
            dec2 = torch.cat([dec2, self.prompt1(dec2)], 1)
            dec2 = self.reduce_noise_level1(self.noise_level1(dec2))
        dec1 = torch.cat([self.up2_1(dec2), enc1], 1)
        dec1 = self.refinement(self.decoder_level1(dec1))

        # Global residual: the network predicts a correction to the input.
        return self.output(dec1) + inp_img
class ch_shuffle_high_text(nn.Module):
    """Reorder expanded image channels by a code-derived ranking, then attend.

    The code vector goes through two linear layers to give ``2 * ch_dim``
    scores per sample. The image feature is expanded to ``2 * ch_dim``
    channels by a 1x1 conv and its channels are permuted into descending
    score order. The permuted map is projected back to ``ch_dim`` channels
    and used as the query of a cross-attention against the original feature,
    followed by a residual feed-forward.

    Only the ranking indices of the scores are used; the score values
    themselves do not enter the output.

    Args:
        ch_dim: channel count of the image feature.
        num_heads: heads of the cross-attention.
        LayerNorm_type: normalisation variant passed to ``LayerNorm``.
        ffn_expansion_factor: hidden-width multiplier of the feed-forward.
        bias: bias flag of the feed-forward convolutions.
        lin_ch: width of the incoming code vector.
    """

    def __init__(self, ch_dim,num_heads,LayerNorm_type,ffn_expansion_factor, bias,lin_ch=512):
        super().__init__()
        self.dim = ch_dim
        self.linear_layer1 = nn.Linear(lin_ch,lin_ch)
        self.linear_layer3 = nn.Linear(lin_ch,2*ch_dim)
        # -----------------------------------------------------------------------
        self.conv1x1 = nn.Conv2d(ch_dim, 2*ch_dim, kernel_size=1, stride=1, padding=0)
        self.conv_out = nn.Conv2d(2*ch_dim, ch_dim, kernel_size=1, stride=1, padding=0)
        self.norm1 = LayerNorm(ch_dim, LayerNorm_type)
        self.norm2 = LayerNorm(ch_dim, LayerNorm_type)
        self.norm3 = LayerNorm(ch_dim, LayerNorm_type)
        self.select_attn = Topm_CrossAttention_Restormer(ch_dim, num_heads, bias=False)
        self.ffn = FeedForward(ch_dim, ffn_expansion_factor, bias)

    def forward(self, img_featur, text_code):
        """Return ``(output, img_featur)``; the second item is the untouched input."""
        batch = img_featur.shape[0]
        img_feature2 = img_featur

        text_code = self.linear_layer1(text_code)
        text_code = self.linear_layer3(text_code)
        # k equals the full score width, so this is a descending argsort.
        _, soft_indices = torch.topk(text_code, k=2*self.dim)

        img_featur = self.conv1x1(img_featur)
        # Build the batch index on the same device as the channel index so
        # the advanced-indexing call does not mix CPU and device tensors.
        batch_index = torch.arange(batch, device=soft_indices.device).unsqueeze(1)
        shuffled_img = img_featur[batch_index, soft_indices, :, :]  # per-sample channel shuffle

        q = self.conv_out(shuffled_img)
        att = self.select_attn(self.norm1(q),self.norm2(img_feature2))
        output = att + self.ffn(self.norm3(att))
        return output,img_feature2
class Topm_CrossAttention_Restormer(nn.Module):
    """Channel-wise cross-attention that keeps only the top ~90% of scores.

    Queries come from ``x_q`` and keys/values from ``x_kv`` (1x1 conv followed
    by a 3x3 depth-wise conv). Queries and keys are L2-normalised along the
    flattened spatial axis, giving one (channels-per-head x channels-per-head)
    score map per head. For every query channel only the ``int(C * 9 / 10)``
    largest scores are kept (at least one); the rest are set to ``-inf``
    before the softmax. The attended values are scaled by the learnable
    scalar ``attn4`` and projected by a 1x1 conv.

    Args:
        dim: channel count of both inputs and of the output.
        num_heads: number of attention heads; must divide ``dim``.
        bias: whether the convolutions carry a bias term.
    """

    def __init__(self, dim, num_heads, bias):
        super().__init__()
        self.num_heads = num_heads
        self.temperature = nn.Parameter(torch.ones(num_heads, 1, 1))
        self.kv = nn.Conv2d(dim, dim*2, kernel_size=1, bias=bias)
        self.kv_dwconv = nn.Conv2d(dim*2, dim*2, kernel_size=3, stride=1, padding=1, groups=dim*2, bias=bias)
        self.q = nn.Conv2d(dim, dim, kernel_size=1, bias=bias)
        self.q_dwconv = nn.Conv2d(dim, dim, kernel_size=3, stride=1, padding=1, groups=dim, bias=bias)
        self.project_out = nn.Conv2d(dim, dim, kernel_size=1, bias=bias)
        # Zero-probability dropout; registered but not applied in forward.
        self.attn_drop = nn.Dropout(0.)
        self.attn4 = torch.nn.Parameter(torch.tensor([0.2]), requires_grad=True)

    def _split_heads(self, t):
        """Reshape (b, heads*c, h, w) into (b, heads, c, h*w)."""
        n, _, ht, wd = t.shape
        return t.reshape(n, self.num_heads, -1, ht * wd)

    def forward(self, x_q, x_kv):
        """Attend from ``x_q`` to ``x_kv``; the result has the shape of ``x_q``."""
        b, _, h, w = x_q.shape

        q = self.q_dwconv(self.q(x_q))
        k, v = self.kv_dwconv(self.kv(x_kv)).chunk(2, dim=1)

        q = self._split_heads(q)
        k = self._split_heads(k)
        v = self._split_heads(v)

        q = torch.nn.functional.normalize(q, dim=-1)
        k = torch.nn.functional.normalize(k, dim=-1)

        C = q.shape[2]
        attn = (q @ k.transpose(-2, -1)) * self.temperature

        # Keep the largest ~90% of scores per query channel. With a single
        # channel per head int(C * 9 / 10) is 0, which would mask every score
        # to -inf and turn the softmax into NaN, so keep at least one.
        keep = max(1, int(C*9/10))
        index = torch.topk(attn, k=keep, dim=-1, largest=True)[1]
        mask4 = torch.zeros_like(attn).scatter_(-1, index, 1.)
        attn4 = torch.where(mask4 > 0, attn, torch.full_like(attn, float('-inf')))
        attn4 = attn4.softmax(dim=-1)

        out = (attn4 @ v) * self.attn4
        # (b, heads, c, h*w) -> (b, heads*c, h, w)
        out = out.reshape(b, -1, h, w)
        return self.project_out(out)
class ChannelShuffle_skip_textguaid(nn.Module):
    """Restormer-style U-Net whose latent and skip features pass through
    ``ch_shuffle_high_text`` blocks.

    Call ``forward(inp_img)`` only: when ``text_code`` is omitted a learnable
    512-d embedding stands in for the external (CLIP) text code, so no CLIP
    dependency is needed.

    ``device`` is only stored on the instance and ``dual_pixel_task`` is
    accepted but ignored (the dual-pixel branch is disabled in this class).
    """

    def __init__(self,
        inp_channels=3,
        out_channels=3,
        dim = 48,
        num_blocks = [4,6,6,8],
        num_refinement_blocks = 4,
        heads = [1,2,4,8],
        ffn_expansion_factor = 2.66,
        bias = False,
        LayerNorm_type = 'WithBias',   ## Other option 'BiasFree'
        device = "cuda:0",
        dual_pixel_task = False
    ):
        super().__init__()

        def stage(width, n_heads, depth):
            # ``depth`` transformer blocks applied in sequence.
            return nn.Sequential(*[TransformerBlock(dim=width, num_heads=n_heads, ffn_expansion_factor=ffn_expansion_factor, bias=bias, LayerNorm_type=LayerNorm_type) for _ in range(depth)])

        def shuffle(width, n_heads):
            # Code-guided channel-shuffle block for one resolution level.
            return ch_shuffle_high_text(ch_dim=width, num_heads=n_heads, LayerNorm_type=LayerNorm_type, ffn_expansion_factor=ffn_expansion_factor, bias=bias)

        # Channel widths of levels 2-4 (level 1 uses ``dim`` directly).
        c2 = int(dim*2**1)
        c3 = int(dim*2**2)
        c4 = int(dim*2**3)

        self.device = device
        # Learnable stand-in for the 512-d CLIP text embedding.
        self.learned_text = nn.Parameter(torch.zeros(1, 512))

        # NOTE: modules are created in the same order as before so parameter
        # initialisation and state_dict ordering do not change.
        self.patch_embed = OverlapPatchEmbed(inp_channels, dim)

        # ---- encoder ----
        self.encoder_level1 = stage(dim, heads[0], num_blocks[0])
        self.encoder_shuffle_channel1 = shuffle(dim, heads[0])

        self.down1_2 = Downsample(dim)  # level 1 -> level 2
        self.encoder_level2 = stage(c2, heads[1], num_blocks[1])
        self.encoder_shuffle_channel2 = shuffle(c2, heads[1])

        self.down2_3 = Downsample(c2)  # level 2 -> level 3
        self.encoder_level3 = stage(c3, heads[2], num_blocks[2])
        self.encoder_shuffle_channel3 = shuffle(c3, heads[2])

        self.down3_4 = Downsample(c3)  # level 3 -> level 4
        self.latent = stage(c4, heads[3], num_blocks[3])
        self.latent_shuffle_channel = shuffle(c4, heads[3])

        # ---- decoder ----
        self.up4_3 = Upsample(c4)  # level 4 -> level 3
        self.reduce_chan_level3 = nn.Conv2d(c4, c3, kernel_size=1, bias=bias)
        self.decoder_level3 = stage(c3, heads[2], num_blocks[2])

        self.up3_2 = Upsample(c3)  # level 3 -> level 2
        self.reduce_chan_level2 = nn.Conv2d(c3, c2, kernel_size=1, bias=bias)
        self.decoder_level2 = stage(c2, heads[1], num_blocks[1])

        self.up2_1 = Upsample(c2)  # level 2 -> level 1 (no 1x1 channel reduction)
        self.decoder_level1 = stage(c2, heads[0], num_blocks[0])

        self.refinement = stage(c2, heads[0], num_refinement_blocks)
        self.output = nn.Conv2d(c2, out_channels, kernel_size=3, stride=1, padding=1, bias=bias)

    def forward(self, inp_img, text_code=None):
        """Restore ``inp_img``; the residual uses its first 3 channels when it has 4."""
        if text_code is None:
            # One copy of the learned embedding per batch element.
            text_code = self.learned_text.expand(inp_img.size(0), -1)

        # ---- encoder ----
        enc1 = self.encoder_level1(self.patch_embed(inp_img))
        enc2 = self.encoder_level2(self.down1_2(enc1))
        enc3 = self.encoder_level3(self.down2_3(enc2))
        latent = self.latent(self.down3_4(enc3))
        latent, _ = self.latent_shuffle_channel(latent, text_code)

        # ---- level 3 ----
        dec3 = self.up4_3(latent)
        skip3, _ = self.encoder_shuffle_channel3(enc3, text_code)
        dec3 = self.reduce_chan_level3(torch.cat([dec3, skip3], 1))
        dec3 = self.decoder_level3(dec3)

        # ---- level 2 ----
        dec2 = self.up3_2(dec3)
        skip2, _ = self.encoder_shuffle_channel2(enc2, text_code)
        dec2 = self.reduce_chan_level2(torch.cat([dec2, skip2], 1))
        dec2 = self.decoder_level2(dec2)

        # ---- level 1 ----
        dec1 = self.up2_1(dec2)
        skip1, _ = self.encoder_shuffle_channel1(enc1, text_code)
        dec1 = self.decoder_level1(torch.cat([dec1, skip1], 1))
        dec1 = self.refinement(dec1)

        # A 4-channel input contributes only its first three channels to the
        # global residual.
        if inp_img.shape[1] == 4:
            res = inp_img[:, :3]
        else:
            res = inp_img
        return self.output(dec1) + res
##########################################################################
## Prior Encoder: the degradation-free prior map (hue) goes through its own network to produce multi-scale features
##########################################################################
class PriorEncoder(nn.Module):
    """Multi-scale encoder for the 1-channel hue prior.

    Produces one feature map per resolution level of the main encoder so
    that the two can be fused level by level.
    """

    def __init__(self, dim=48, bias=False):
        super().__init__()
        self.embed = nn.Conv2d(1, dim, kernel_size=3, stride=1, padding=1, bias=bias)
        # Each Downsample halves H and W and doubles the channel count.
        self.down1 = Downsample(dim)            # dim   -> 2*dim, H/2
        self.down2 = Downsample(int(dim*2**1))  # 2*dim -> 4*dim, H/4
        self.down3 = Downsample(int(dim*2**2))  # 4*dim -> 8*dim, H/8

    def forward(self, hue):
        """Return the four pyramid levels for ``hue`` of shape (B, 1, H, W)."""
        level1 = self.embed(hue)       # (B,   dim, H,   W)
        level2 = self.down1(level1)    # (B, 2*dim, H/2, W/2)
        level3 = self.down2(level2)    # (B, 4*dim, H/4, W/4)
        level4 = self.down3(level3)    # (B, 8*dim, H/8, W/8)
        return level1, level2, level3, level4
def _prior_fuse(main_feat, prior_feat, fuse_conv):
    """Guide ``main_feat`` with ``prior_feat``.

    The two maps are concatenated along channels, passed through
    ``fuse_conv`` and the result is added back to ``main_feat`` as a residual.
    """
    stacked = torch.cat((main_feat, prior_feat), dim=1)
    correction = fuse_conv(stacked)
    return main_feat + correction
##########################################################################
## ChannelShuffle_skip_textguaid + GBP (hue prior, liuxiao.pdf / GBPG-Net)
##########################################################################
class ChannelShuffleWithGBP(nn.Module):
    """
    ChannelShuffle_skip_textguaid guided by a degradation-free prior: the hue
    channel of HSV (GBP).

    The RGB input (B,3,H,W) is concatenated with its hue map into a 4-channel
    tensor that is fed to an inner ChannelShuffle_skip_textguaid. The inner
    network is always built with 4 input and 3 output channels; the
    ``inp_channels`` / ``out_channels`` arguments are accepted but not used.
    """

    def __init__(self, inp_channels=3, out_channels=3, dim=48, num_blocks=[4,6,6,8],
                 num_refinement_blocks=4, heads=[1,2,4,8], ffn_expansion_factor=2.66,
                 bias=False, LayerNorm_type='WithBias', device='cuda:0', dual_pixel_task=False):
        super().__init__()
        backbone_kwargs = dict(
            inp_channels=4,   # RGB + hue
            out_channels=3,
            dim=dim,
            num_blocks=num_blocks,
            num_refinement_blocks=num_refinement_blocks,
            heads=heads,
            ffn_expansion_factor=ffn_expansion_factor,
            bias=bias,
            LayerNorm_type=LayerNorm_type,
            device=device,
            dual_pixel_task=dual_pixel_task,
        )
        self.net = ChannelShuffle_skip_textguaid(**backbone_kwargs)

    def forward(self, inp_img, text_code=None):
        # presumably rgb_to_hue returns a (B,1,H,W) map — confirm in net.gbp_prior
        hue = rgb_to_hue(inp_img)
        rgb_and_hue = torch.cat([inp_img, hue], dim=1)
        return self.net(rgb_and_hue, text_code=text_code)
##########################################################################
## ChannelShuffle + GBP Deep: the prior map goes through a separate network -> multi-scale features fused with the restoration network
##########################################################################
class ChannelShuffleWithGBPDeep(nn.Module):
    """
    ChannelShuffle network with deep hue-prior guidance.

    The degradation-free prior (hue) is encoded by a separate PriorEncoder
    into multi-scale features. After every encoder level (and the latent
    stage) the prior feature of the same level is fused into the main feature
    (concat + 1x1 conv, added as a residual) to guide restoration.

    ``device`` is only stored on the instance and ``dual_pixel_task`` is
    accepted but not used.
    """

    def __init__(self, inp_channels=3, out_channels=3, dim=48, num_blocks=[4,6,6,8],
                 num_refinement_blocks=4, heads=[1,2,4,8], ffn_expansion_factor=2.66,
                 bias=False, LayerNorm_type='WithBias', device='cuda:0', dual_pixel_task=False):
        super().__init__()

        def stage(width, n_heads, depth):
            # ``depth`` transformer blocks applied in sequence.
            return nn.Sequential(*[TransformerBlock(dim=width, num_heads=n_heads, ffn_expansion_factor=ffn_expansion_factor, bias=bias, LayerNorm_type=LayerNorm_type) for _ in range(depth)])

        def shuffle(width, n_heads):
            # Code-guided channel-shuffle block for one resolution level.
            return ch_shuffle_high_text(ch_dim=width, num_heads=n_heads, LayerNorm_type=LayerNorm_type, ffn_expansion_factor=ffn_expansion_factor, bias=bias)

        def fuse(width):
            # 1x1 conv that maps concat(main, prior) back to ``width`` channels.
            return nn.Conv2d(width * 2, width, kernel_size=1, bias=bias)

        # Channel widths of levels 2-4 (level 1 uses ``dim`` directly).
        c2 = int(dim*2**1)
        c3 = int(dim*2**2)
        c4 = int(dim*2**3)

        self.device = device
        # Learnable stand-in for the 512-d text embedding.
        self.learned_text = nn.Parameter(torch.zeros(1, 512))

        # NOTE: modules are created in the same order as before so parameter
        # initialisation and state_dict ordering do not change.
        self.prior_encoder = PriorEncoder(dim=dim, bias=bias)
        self.fuse_conv1 = fuse(dim)
        self.fuse_conv2 = fuse(c2)
        self.fuse_conv3 = fuse(c3)
        self.fuse_conv4 = fuse(c4)

        self.patch_embed = OverlapPatchEmbed(inp_channels, dim)

        # ---- encoder ----
        self.encoder_level1 = stage(dim, heads[0], num_blocks[0])
        self.encoder_shuffle_channel1 = shuffle(dim, heads[0])
        self.down1_2 = Downsample(dim)
        self.encoder_level2 = stage(c2, heads[1], num_blocks[1])
        self.encoder_shuffle_channel2 = shuffle(c2, heads[1])
        self.down2_3 = Downsample(c2)
        self.encoder_level3 = stage(c3, heads[2], num_blocks[2])
        self.encoder_shuffle_channel3 = shuffle(c3, heads[2])
        self.down3_4 = Downsample(c3)
        self.latent = stage(c4, heads[3], num_blocks[3])
        self.latent_shuffle_channel = shuffle(c4, heads[3])

        # ---- decoder ----
        self.up4_3 = Upsample(c4)
        self.reduce_chan_level3 = nn.Conv2d(c4, c3, kernel_size=1, bias=bias)
        self.decoder_level3 = stage(c3, heads[2], num_blocks[2])
        self.up3_2 = Upsample(c3)
        self.reduce_chan_level2 = nn.Conv2d(c3, c2, kernel_size=1, bias=bias)
        self.decoder_level2 = stage(c2, heads[1], num_blocks[1])
        self.up2_1 = Upsample(c2)
        self.decoder_level1 = stage(c2, heads[0], num_blocks[0])

        self.refinement = stage(c2, heads[0], num_refinement_blocks)
        self.output = nn.Conv2d(c2, out_channels, kernel_size=3, stride=1, padding=1, bias=bias)

    def forward(self, inp_img, text_code=None):
        """Restore ``inp_img`` with hue-prior guidance at every encoder level."""
        if text_code is None:
            # One copy of the learned embedding per batch element.
            text_code = self.learned_text.expand(inp_img.size(0), -1)

        # Prior pyramid computed from the input's hue map.
        hue = rgb_to_hue(inp_img)
        prior_1, prior_2, prior_3, prior_4 = self.prior_encoder(hue)

        # ---- encoder, each level fused with its prior ----
        enc1 = self.encoder_level1(self.patch_embed(inp_img))
        enc1 = _prior_fuse(enc1, prior_1, self.fuse_conv1)

        enc2 = self.encoder_level2(self.down1_2(enc1))
        enc2 = _prior_fuse(enc2, prior_2, self.fuse_conv2)

        enc3 = self.encoder_level3(self.down2_3(enc2))
        enc3 = _prior_fuse(enc3, prior_3, self.fuse_conv3)

        latent = self.latent(self.down3_4(enc3))
        latent = _prior_fuse(latent, prior_4, self.fuse_conv4)
        latent, _ = self.latent_shuffle_channel(latent, text_code)

        # ---- level 3 ----
        dec3 = self.up4_3(latent)
        skip3, _ = self.encoder_shuffle_channel3(enc3, text_code)
        dec3 = self.reduce_chan_level3(torch.cat([dec3, skip3], 1))
        dec3 = self.decoder_level3(dec3)

        # ---- level 2 ----
        dec2 = self.up3_2(dec3)
        skip2, _ = self.encoder_shuffle_channel2(enc2, text_code)
        dec2 = self.reduce_chan_level2(torch.cat([dec2, skip2], 1))
        dec2 = self.decoder_level2(dec2)

        # ---- level 1 ----
        dec1 = self.up2_1(dec2)
        skip1, _ = self.encoder_shuffle_channel1(enc1, text_code)
        dec1 = self.decoder_level1(torch.cat([dec1, skip1], 1))
        dec1 = self.refinement(dec1)

        # Global residual: the network predicts a correction to the input.
        return self.output(dec1) + inp_img
##---------- Restormer -----------------------
class Restormer(nn.Module):
    """Four-level encoder/decoder transformer for image restoration.

    The encoder stacks ``TransformerBlock`` stages separated by ``Downsample``
    layers. The decoder mirrors it with ``Upsample`` layers and concatenated
    skip connections, followed by refinement blocks and a 3x3 output conv.

    Args:
        dual_pixel_task: True for dual-pixel defocus deblurring only (also set
            ``inp_channels=6``). In that mode a 1x1 ``skip_conv`` of the
            embedded input is added before the output conv and no
            image-level residual is applied.
    """

    def __init__(self,
        inp_channels=3,
        out_channels=3,
        dim = 48,
        num_blocks = [4,6,6,8],
        num_refinement_blocks = 4,
        heads = [1,2,4,8],
        ffn_expansion_factor = 2.66,
        bias = False,
        LayerNorm_type = 'WithBias',   ## Other option 'BiasFree'
        dual_pixel_task = False        ## True for dual-pixel defocus deblurring only. Also set inp_channels=6
    ):
        super().__init__()

        def stage(width, n_heads, depth):
            # ``depth`` transformer blocks applied in sequence.
            return nn.Sequential(*[TransformerBlock(dim=width, num_heads=n_heads, ffn_expansion_factor=ffn_expansion_factor, bias=bias, LayerNorm_type=LayerNorm_type) for _ in range(depth)])

        # Channel widths of levels 2-4 (level 1 uses ``dim`` directly).
        c2 = int(dim*2**1)
        c3 = int(dim*2**2)
        c4 = int(dim*2**3)

        # NOTE: modules are created in the same order as before so parameter
        # initialisation and state_dict ordering do not change.
        self.patch_embed = OverlapPatchEmbed(inp_channels, dim)

        # ---- encoder ----
        self.encoder_level1 = stage(dim, heads[0], num_blocks[0])
        self.down1_2 = Downsample(dim)  # level 1 -> level 2
        self.encoder_level2 = stage(c2, heads[1], num_blocks[1])
        self.down2_3 = Downsample(c2)  # level 2 -> level 3
        self.encoder_level3 = stage(c3, heads[2], num_blocks[2])
        self.down3_4 = Downsample(c3)  # level 3 -> level 4
        self.latent = stage(c4, heads[3], num_blocks[3])

        # ---- decoder ----
        self.up4_3 = Upsample(c4)  # level 4 -> level 3
        self.reduce_chan_level3 = nn.Conv2d(c4, c3, kernel_size=1, bias=bias)
        self.decoder_level3 = stage(c3, heads[2], num_blocks[2])

        self.up3_2 = Upsample(c3)  # level 3 -> level 2
        self.reduce_chan_level2 = nn.Conv2d(c3, c2, kernel_size=1, bias=bias)
        self.decoder_level2 = stage(c2, heads[1], num_blocks[1])

        self.up2_1 = Upsample(c2)  # level 2 -> level 1 (no 1x1 channel reduction)
        self.decoder_level1 = stage(c2, heads[0], num_blocks[0])

        self.refinement = stage(c2, heads[0], num_refinement_blocks)

        # ---- dual-pixel defocus deblurring only ----
        self.dual_pixel_task = dual_pixel_task
        if self.dual_pixel_task:
            self.skip_conv = nn.Conv2d(dim, c2, kernel_size=1, bias=bias)

        self.output = nn.Conv2d(c2, out_channels, kernel_size=3, stride=1, padding=1, bias=bias)

    def forward(self, inp_img):
        """Restore ``inp_img`` and return the predicted image."""
        # ---- encoder ----
        embedded = self.patch_embed(inp_img)
        enc1 = self.encoder_level1(embedded)
        enc2 = self.encoder_level2(self.down1_2(enc1))
        enc3 = self.encoder_level3(self.down2_3(enc2))
        latent = self.latent(self.down3_4(enc3))

        # ---- decoder with skip connections ----
        dec3 = torch.cat([self.up4_3(latent), enc3], 1)
        dec3 = self.decoder_level3(self.reduce_chan_level3(dec3))

        dec2 = torch.cat([self.up3_2(dec3), enc2], 1)
        dec2 = self.decoder_level2(self.reduce_chan_level2(dec2))

        dec1 = torch.cat([self.up2_1(dec2), enc1], 1)
        dec1 = self.refinement(self.decoder_level1(dec1))

        if self.dual_pixel_task:
            # Feature-level skip instead of an image-level residual.
            dec1 = dec1 + self.skip_conv(embedded)
            return self.output(dec1)

        # With 4 or more input channels (e.g. RGB+GBP) the residual uses only
        # the first three.
        if inp_img.shape[1] >= 4:
            res = inp_img[:, :3]
        else:
            res = inp_img
        return self.output(dec1) + res
##########################################################################
## Restormer + Global Background Prior (GBP, hue channel, liuxiao.pdf / GBPG-Net)
##########################################################################
class RestormerWithGBP(nn.Module):
    """
    Restormer guided by a degradation-free prior: the HSV hue channel (GBP).
    Paper: GBPG-Net: Global Background Prior-Guided Rain and Snow Image Restoration (Xiao Liu et al., IEEE TNNLS 2025).

    Forward: RGB input (B,3,H,W) -> concat(RGB, hue) -> 4-channel Restormer
    -> clean RGB. The inner Restormer is always built with 4 input and 3
    output channels; ``inp_channels`` / ``out_channels`` are accepted but not
    used.
    """

    def __init__(self, inp_channels=3, out_channels=3, dim=48, num_blocks=[4,6,6,8], num_refinement_blocks=4,
                 heads=[1,2,4,8], ffn_expansion_factor=2.66, bias=False, LayerNorm_type='WithBias', dual_pixel_task=False):
        super().__init__()
        backbone_kwargs = dict(
            inp_channels=4,   # RGB + hue
            out_channels=3,
            dim=dim,
            num_blocks=num_blocks,
            num_refinement_blocks=num_refinement_blocks,
            heads=heads,
            ffn_expansion_factor=ffn_expansion_factor,
            bias=bias,
            LayerNorm_type=LayerNorm_type,
            dual_pixel_task=dual_pixel_task,
        )
        self.restormer = Restormer(**backbone_kwargs)

    def forward(self, inp_img):
        # presumably rgb_to_hue returns a (B,1,H,W) map — confirm in net.gbp_prior
        hue = rgb_to_hue(inp_img)
        rgb_and_hue = torch.cat([inp_img, hue], dim=1)
        return self.restormer(rgb_and_hue)