| |
| |
| |
|
|
|
|
| import torch, torchvision |
| |
| import torch.nn as nn |
| import torch.nn.functional as F |
| from pdb import set_trace as stx |
| import numbers |
|
|
| from einops import rearrange |
| from einops.layers.torch import Rearrange |
| import time |
| import os |
| import sys |
| from net.arch_util import LayerNorm2d |
| from net.local_arch import Local_Base |
| from net.gbp_prior import rgb_to_hue |
| import torchvision.transforms as transforms |
|
|
| |
| |
|
|
def to_3d(x):
    """Turn a ``(b, c, h, w)`` feature map into a ``(b, h*w, c)`` token sequence.

    The two spatial axes are merged into one axis and the channel axis is moved last.
    """
    pattern = 'b c h w -> b (h w) c'
    return rearrange(x, pattern)
|
|
def to_4d(x,h,w):
    """Turn a ``(b, h*w, c)`` token sequence back into a ``(b, c, h, w)`` feature map.

    ``h`` and ``w`` give the spatial size used to split the merged token axis.
    """
    pattern = 'b (h w) c -> b c h w'
    return rearrange(x, pattern, h=h, w=w)
|
|
class BiasFree_LayerNorm(nn.Module):
    """Scale-only normalisation over the last axis.

    The input is divided by ``sqrt(var + 1e-5)`` (biased variance along the last
    axis) and multiplied by a learnable per-feature weight. The mean is not
    subtracted and there is no additive bias.
    """

    def __init__(self, normalized_shape):
        super().__init__()
        shape = normalized_shape
        if isinstance(shape, numbers.Integral):
            shape = (shape,)
        shape = torch.Size(shape)

        # Only a single normalised axis is supported.
        assert len(shape) == 1

        self.weight = nn.Parameter(torch.ones(shape))
        self.normalized_shape = shape

    def forward(self, x):
        variance = x.var(-1, keepdim=True, unbiased=False)
        scaled = x / torch.sqrt(variance + 1e-5)
        return scaled * self.weight
|
|
class WithBias_LayerNorm(nn.Module):
    """Layer normalisation over the last axis with learnable weight and bias.

    The input is centred by its mean, divided by ``sqrt(var + 1e-5)`` (biased
    variance along the last axis), then scaled by ``weight`` and shifted by ``bias``.
    """

    def __init__(self, normalized_shape):
        super().__init__()
        shape = normalized_shape
        if isinstance(shape, numbers.Integral):
            shape = (shape,)
        shape = torch.Size(shape)

        # Only a single normalised axis is supported.
        assert len(shape) == 1

        self.weight = nn.Parameter(torch.ones(shape))
        self.bias = nn.Parameter(torch.zeros(shape))
        self.normalized_shape = shape

    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        variance = x.var(-1, keepdim=True, unbiased=False)
        centred = x - mean
        return centred / torch.sqrt(variance + 1e-5) * self.weight + self.bias
|
|
class LayerNorm(nn.Module):
    """Channel-wise layer normalisation for 4-D feature maps.

    ``LayerNorm_type == 'BiasFree'`` selects ``BiasFree_LayerNorm``; any other value
    selects ``WithBias_LayerNorm``. The map is converted to a token sequence with
    ``to_3d``, normalised over its last axis, and converted back with ``to_4d``.
    """

    def __init__(self, dim, LayerNorm_type):
        super().__init__()
        norm_cls = BiasFree_LayerNorm if LayerNorm_type == 'BiasFree' else WithBias_LayerNorm
        self.body = norm_cls(dim)

    def forward(self, x):
        height, width = x.shape[-2:]
        tokens = to_3d(x)
        normalised = self.body(tokens)
        return to_4d(normalised, height, width)
|
|
| |
| |
class FeedForward(nn.Module):
    """Gated depth-wise convolutional feed-forward block.

    A 1x1 conv expands ``dim`` to ``2 * hidden`` channels, where
    ``hidden = int(dim * ffn_expansion_factor)``. A 3x3 depth-wise conv follows, and
    the result is split into two halves. The GELU of the first half gates the second,
    and a 1x1 conv projects the product back to ``dim`` channels.
    """

    def __init__(self, dim, ffn_expansion_factor, bias):
        super().__init__()
        hidden = int(dim * ffn_expansion_factor)
        doubled = hidden * 2

        self.project_in = nn.Conv2d(dim, doubled, kernel_size=1, bias=bias)
        # groups == channels makes this a depth-wise convolution.
        self.dwconv = nn.Conv2d(doubled, doubled, kernel_size=3, stride=1, padding=1,
                                groups=doubled, bias=bias)
        self.project_out = nn.Conv2d(hidden, dim, kernel_size=1, bias=bias)

    def forward(self, x):
        expanded = self.dwconv(self.project_in(x))
        gate, value = expanded.chunk(2, dim=1)
        return self.project_out(F.gelu(gate) * value)
|
|
| |
| |
class Attention(nn.Module):
    """Multi-head self-attention computed across channels.

    Queries, keys and values come from a 1x1 conv followed by a 3x3 depth-wise conv.
    Each head holds ``c`` channels flattened over the spatial positions, so the
    attention matrix per head is ``c x c``. Queries and keys are L2-normalised along
    the spatial axis and the logits are scaled by a learnable per-head temperature.
    """

    def __init__(self, dim, num_heads, bias):
        super().__init__()
        self.num_heads = num_heads
        self.temperature = nn.Parameter(torch.ones(num_heads, 1, 1))

        qkv_channels = dim * 3
        self.qkv = nn.Conv2d(dim, qkv_channels, kernel_size=1, bias=bias)
        self.qkv_dwconv = nn.Conv2d(qkv_channels, qkv_channels, kernel_size=3, stride=1,
                                    padding=1, groups=qkv_channels, bias=bias)
        self.project_out = nn.Conv2d(dim, dim, kernel_size=1, bias=bias)

    def forward(self, x):
        _, _, h, w = x.shape
        q, k, v = self.qkv_dwconv(self.qkv(x)).chunk(3, dim=1)

        # Split channels into heads and flatten the spatial grid.
        split = 'b (head c) h w -> b head c (h w)'
        q = rearrange(q, split, head=self.num_heads)
        k = rearrange(k, split, head=self.num_heads)
        v = rearrange(v, split, head=self.num_heads)

        q = F.normalize(q, dim=-1)
        k = F.normalize(k, dim=-1)

        weights = ((q @ k.transpose(-2, -1)) * self.temperature).softmax(dim=-1)
        merged = rearrange(weights @ v, 'b head c (h w) -> b (head c) h w',
                           head=self.num_heads, h=h, w=w)
        return self.project_out(merged)
|
|
class resblock(nn.Module):
    """Residual block: 3x3 conv -> PReLU -> 3x3 conv, added to the input.

    Both convolutions keep ``dim`` channels and the spatial size, and have no bias.
    """

    def __init__(self, dim):
        super().__init__()

        def conv3x3():
            return nn.Conv2d(dim, dim, kernel_size=3, stride=1, padding=1, bias=False)

        self.body = nn.Sequential(conv3x3(), nn.PReLU(), conv3x3())

    def forward(self, x):
        return self.body(x) + x
|
|
|
|
| |
| |
class Downsample(nn.Module):
    """Halve the spatial size and double the channel count.

    A bias-free 3x3 conv reduces ``n_feat`` to ``n_feat // 2`` channels, then
    ``PixelUnshuffle(2)`` folds each 2x2 patch into channels (x4), giving
    ``2 * n_feat`` channels for even ``n_feat``.
    """

    def __init__(self, n_feat):
        super().__init__()
        squeeze = nn.Conv2d(n_feat, n_feat // 2, kernel_size=3, stride=1, padding=1, bias=False)
        self.body = nn.Sequential(squeeze, nn.PixelUnshuffle(2))

    def forward(self, x):
        out = self.body(x)
        return out
|
|
class Upsample(nn.Module):
    """Double the spatial size and halve the channel count.

    A bias-free 3x3 conv expands ``n_feat`` to ``2 * n_feat`` channels, then
    ``PixelShuffle(2)`` unfolds channels into 2x2 patches (/4), giving
    ``n_feat / 2`` channels.
    """

    def __init__(self, n_feat):
        super().__init__()
        expand = nn.Conv2d(n_feat, n_feat * 2, kernel_size=3, stride=1, padding=1, bias=False)
        self.body = nn.Sequential(expand, nn.PixelShuffle(2))

    def forward(self, x):
        out = self.body(x)
        return out
|
|
| |
| |
class TransformerBlock(nn.Module):
    """Pre-norm transformer block: attention and feed-forward, each with a residual.

    ``x <- x + attn(norm1(x))`` followed by ``x <- x + ffn(norm2(x))``.
    """

    def __init__(self, dim, num_heads, ffn_expansion_factor, bias, LayerNorm_type):
        super().__init__()
        self.norm1 = LayerNorm(dim, LayerNorm_type)
        self.attn = Attention(dim, num_heads, bias)
        self.norm2 = LayerNorm(dim, LayerNorm_type)
        self.ffn = FeedForward(dim, ffn_expansion_factor, bias)

    def forward(self, x):
        attended = x + self.attn(self.norm1(x))
        return attended + self.ffn(self.norm2(attended))
|
|
| |
| |
class OverlapPatchEmbed(nn.Module):
    """Embed an image with a single 3x3, stride-1 convolution.

    Maps ``in_c`` input channels to ``embed_dim`` channels and keeps the spatial size.
    """

    def __init__(self, in_c=3, embed_dim=48, bias=False):
        super().__init__()
        self.proj = nn.Conv2d(in_c, embed_dim, kernel_size=3, stride=1, padding=1, bias=bias)

    def forward(self, x):
        return self.proj(x)
|
|
| |
| |
class PromptGenBlock(nn.Module):
    """Generate an input-conditioned prompt from a bank of learnable prompt components.

    The bank ``prompt_param`` has shape ``(1, prompt_len, prompt_dim, prompt_size,
    prompt_size)``. The input is globally average-pooled, a linear layer plus softmax
    yields ``prompt_len`` mixing weights per sample, and the weighted sum of the
    components is resized to the input's spatial size and passed through a 3x3 conv.

    Args:
        prompt_dim: Channel count of each prompt component (and of the output).
        prompt_len: Number of prompt components in the bank.
        prompt_size: Spatial side length of the stored components.
        lin_dim: Channel count of the input feature map fed to ``forward``.
    """

    def __init__(self,prompt_dim=128,prompt_len=5,prompt_size = 96,lin_dim = 192):
        super().__init__()
        self.prompt_param = nn.Parameter(torch.rand(1, prompt_len, prompt_dim, prompt_size, prompt_size))
        self.linear_layer = nn.Linear(lin_dim, prompt_len)
        self.conv3x3 = nn.Conv2d(prompt_dim, prompt_dim, kernel_size=3, stride=1, padding=1, bias=False)

    def forward(self,x):
        """Return a ``(B, prompt_dim, H, W)`` prompt for a ``(B, lin_dim, H, W)`` input."""
        _, _, H, W = x.shape
        emb = x.mean(dim=(-2, -1))
        prompt_weights = F.softmax(self.linear_layer(emb), dim=1)
        # (B, L, 1, 1, 1) * (1, L, D, S, S) broadcasts to (B, L, D, S, S); this gives
        # the same values as repeating the bank per sample, without the extra copy.
        weighted = prompt_weights[:, :, None, None, None] * self.prompt_param
        prompt = weighted.sum(dim=1)
        prompt = F.interpolate(prompt, (H, W), mode="bilinear")
        return self.conv3x3(prompt)
|
|
|
|
|
|
|
|
|
|
| |
| |
|
|
class PromptIR(nn.Module):
    """Four-level encoder/decoder restoration network with optional prompt blocks.

    With ``decoder=True`` a ``PromptGenBlock`` output is concatenated to the features
    before each up-sampling step, processed by one ``TransformerBlock`` and reduced by
    a 1x1 conv. The prompt widths (64/128/320), the ``lin_dim`` values (96/192/384)
    and the ``+512`` / ``+224`` offsets are hard-coded, so the channel arithmetic of
    that path only lines up for ``dim=48``.

    With ``decoder=False`` the prompt blocks are skipped. ``up4_3`` and
    ``reduce_chan_level3`` are then sized for the un-reduced ``8 * dim`` latent;
    the sizes used for the prompt path would reject that latent with a channel
    mismatch.

    Args:
        inp_channels: Channels of the input image.
        out_channels: Channels of the restored image; the input is added to the
            output, so it must be broadcast-compatible with ``inp_channels``.
        dim: Base feature width at level 1.
        num_blocks: Transformer blocks per level (levels 1-4).
        num_refinement_blocks: Transformer blocks in the refinement stage.
        heads: Attention heads per level (levels 1-4).
        ffn_expansion_factor: Hidden expansion ratio of the feed-forward blocks.
        bias: Whether the convolutions built here use a bias term.
        LayerNorm_type: ``'BiasFree'`` or anything else for the with-bias variant.
        decoder: Enable the prompt blocks in the decoder.
    """

    def __init__(self,
                 inp_channels=3,
                 out_channels=3,
                 dim = 48,
                 num_blocks = [4,6,6,8],
                 num_refinement_blocks = 4,
                 heads = [1,2,4,8],
                 ffn_expansion_factor = 2.66,
                 bias = False,
                 LayerNorm_type = 'WithBias',
                 decoder = False,
                 ):
        super(PromptIR, self).__init__()

        c1 = dim
        c2 = int(dim * 2 ** 1)
        c3 = int(dim * 2 ** 2)
        c4 = int(dim * 2 ** 3)

        def block(width, n_heads):
            return TransformerBlock(dim=width, num_heads=n_heads,
                                    ffn_expansion_factor=ffn_expansion_factor,
                                    bias=bias, LayerNorm_type=LayerNorm_type)

        def stage(width, n_heads, depth):
            return nn.Sequential(*[block(width, n_heads) for _ in range(depth)])

        self.patch_embed = OverlapPatchEmbed(inp_channels, dim)
        self.decoder = decoder
        if self.decoder:
            self.prompt1 = PromptGenBlock(prompt_dim=64, prompt_len=5, prompt_size=64, lin_dim=96)
            self.prompt2 = PromptGenBlock(prompt_dim=128, prompt_len=5, prompt_size=32, lin_dim=192)
            self.prompt3 = PromptGenBlock(prompt_dim=320, prompt_len=5, prompt_size=16, lin_dim=384)

        # NOTE: the chnl_reduce* and reduce_noise_channel_* layers are not used in
        # forward(); they are kept so existing state_dict keys remain loadable.
        self.chnl_reduce1 = nn.Conv2d(64, 64, kernel_size=1, bias=bias)
        self.chnl_reduce2 = nn.Conv2d(128, 128, kernel_size=1, bias=bias)
        self.chnl_reduce3 = nn.Conv2d(320, 256, kernel_size=1, bias=bias)

        self.reduce_noise_channel_1 = nn.Conv2d(c1 + 64, c1, kernel_size=1, bias=bias)
        self.encoder_level1 = stage(c1, heads[0], num_blocks[0])

        self.down1_2 = Downsample(c1)

        self.reduce_noise_channel_2 = nn.Conv2d(c2 + 128, c2, kernel_size=1, bias=bias)
        self.encoder_level2 = stage(c2, heads[1], num_blocks[1])

        self.down2_3 = Downsample(c2)

        self.reduce_noise_channel_3 = nn.Conv2d(c3 + 256, c3, kernel_size=1, bias=bias)
        self.encoder_level3 = stage(c3, heads[2], num_blocks[2])

        self.down3_4 = Downsample(c3)
        self.latent = stage(c4, heads[3], num_blocks[3])

        if self.decoder:
            # Prompt path: reduce_noise_level3 has already shrunk the latent to c3.
            self.up4_3 = Upsample(c3)
            self.reduce_chan_level3 = nn.Conv2d(c2 + 192, c3, kernel_size=1, bias=bias)
        else:
            # Prompt-free path: the latent still has c4 channels when it reaches up4_3.
            self.up4_3 = Upsample(c4)
            self.reduce_chan_level3 = nn.Conv2d(c4, c3, kernel_size=1, bias=bias)
        self.noise_level3 = block(c3 + 512, heads[2])
        self.reduce_noise_level3 = nn.Conv2d(c3 + 512, c3, kernel_size=1, bias=bias)

        self.decoder_level3 = stage(c3, heads[2], num_blocks[2])

        self.up3_2 = Upsample(c3)
        self.reduce_chan_level2 = nn.Conv2d(c3, c2, kernel_size=1, bias=bias)
        self.noise_level2 = block(c2 + 224, heads[2])
        self.reduce_noise_level2 = nn.Conv2d(c2 + 224, c3, kernel_size=1, bias=bias)

        self.decoder_level2 = stage(c2, heads[1], num_blocks[1])

        self.up2_1 = Upsample(c2)

        self.noise_level1 = block(c2 + 64, heads[2])
        self.reduce_noise_level1 = nn.Conv2d(c2 + 64, c2, kernel_size=1, bias=bias)

        self.decoder_level1 = stage(c2, heads[0], num_blocks[0])

        self.refinement = stage(c2, heads[0], num_refinement_blocks)

        self.output = nn.Conv2d(c2, out_channels, kernel_size=3, stride=1, padding=1, bias=bias)

    def forward(self, inp_img,noise_emb = None):
        """Restore ``inp_img``; ``noise_emb`` is accepted but not used."""
        inp_enc_level1 = self.patch_embed(inp_img)
        out_enc_level1 = self.encoder_level1(inp_enc_level1)
        inp_enc_level2 = self.down1_2(out_enc_level1)
        out_enc_level2 = self.encoder_level2(inp_enc_level2)
        inp_enc_level3 = self.down2_3(out_enc_level2)
        out_enc_level3 = self.encoder_level3(inp_enc_level3)
        inp_enc_level4 = self.down3_4(out_enc_level3)
        latent = self.latent(inp_enc_level4)
        if self.decoder:
            dec3_param = self.prompt3(latent)
            latent = torch.cat([latent, dec3_param], 1)
            latent = self.noise_level3(latent)
            latent = self.reduce_noise_level3(latent)

        inp_dec_level3 = self.up4_3(latent)
        inp_dec_level3 = torch.cat([inp_dec_level3, out_enc_level3], 1)
        inp_dec_level3 = self.reduce_chan_level3(inp_dec_level3)

        out_dec_level3 = self.decoder_level3(inp_dec_level3)
        if self.decoder:
            dec2_param = self.prompt2(out_dec_level3)
            out_dec_level3 = torch.cat([out_dec_level3, dec2_param], 1)
            out_dec_level3 = self.noise_level2(out_dec_level3)
            out_dec_level3 = self.reduce_noise_level2(out_dec_level3)

        inp_dec_level2 = self.up3_2(out_dec_level3)
        inp_dec_level2 = torch.cat([inp_dec_level2, out_enc_level2], 1)
        inp_dec_level2 = self.reduce_chan_level2(inp_dec_level2)

        out_dec_level2 = self.decoder_level2(inp_dec_level2)
        if self.decoder:
            dec1_param = self.prompt1(out_dec_level2)
            out_dec_level2 = torch.cat([out_dec_level2, dec1_param], 1)
            out_dec_level2 = self.noise_level1(out_dec_level2)
            out_dec_level2 = self.reduce_noise_level1(out_dec_level2)

        inp_dec_level1 = self.up2_1(out_dec_level2)
        inp_dec_level1 = torch.cat([inp_dec_level1, out_enc_level1], 1)
        out_dec_level1 = self.decoder_level1(inp_dec_level1)
        out_dec_level1 = self.refinement(out_dec_level1)
        # Global residual: the network predicts a correction to the input image.
        out_dec_level1 = self.output(out_dec_level1) + inp_img
        return out_dec_level1
|
|
|
|
class ch_shuffle_high_text(nn.Module):
    """Reorder expanded feature channels by a text-derived ranking, then cross-attend.

    The text code is mapped by two linear layers to ``2 * ch_dim`` scores. The image
    features are expanded to ``2 * ch_dim`` channels by a 1x1 conv, and those channels
    are permuted into descending-score order per sample. A 1x1 conv brings the
    permuted features back to ``ch_dim``; they act as the query of a
    ``Topm_CrossAttention_Restormer`` whose key/value is the original feature map.
    A feed-forward block with a residual follows.

    Args:
        ch_dim: Channel count of the incoming image features.
        num_heads: Attention heads of the cross-attention.
        LayerNorm_type: ``'BiasFree'`` or anything else for the with-bias variant.
        ffn_expansion_factor: Hidden expansion ratio of the feed-forward block.
        bias: Bias flag of the feed-forward block (the attention is built bias-free).
        lin_ch: Width of the incoming text code.
    """

    def __init__(self, ch_dim,num_heads,LayerNorm_type,ffn_expansion_factor, bias,lin_ch=512):
        super(ch_shuffle_high_text, self).__init__()
        self.dim = ch_dim
        self.linear_layer1 = nn.Linear(lin_ch, lin_ch)
        self.linear_layer3 = nn.Linear(lin_ch, 2 * ch_dim)

        self.conv1x1 = nn.Conv2d(ch_dim, 2 * ch_dim, kernel_size=1, stride=1, padding=0)
        self.conv_out = nn.Conv2d(2 * ch_dim, ch_dim, kernel_size=1, stride=1, padding=0)
        self.norm1 = LayerNorm(ch_dim, LayerNorm_type)
        self.norm2 = LayerNorm(ch_dim, LayerNorm_type)
        self.norm3 = LayerNorm(ch_dim, LayerNorm_type)
        self.select_attn = Topm_CrossAttention_Restormer(ch_dim, num_heads, bias=False)
        self.ffn = FeedForward(ch_dim, ffn_expansion_factor, bias)

    def forward(self, img_featur, text_code):
        """Return ``(output, img_featur)``; the second item is the untouched input."""
        b, _, _, _ = img_featur.shape
        img_feature2 = img_featur

        text_code = self.linear_layer3(self.linear_layer1(text_code))
        # k equals the full width, so the top-k indices are a descending-score
        # permutation of all 2 * ch_dim channels.
        _, channel_order = torch.topk(text_code, k=2 * self.dim)

        expanded = self.conv1x1(img_featur)
        # Build the batch index on the same device as the channel indices so the
        # advanced-indexing call does not mix CPU and accelerator index tensors.
        batch_index = torch.arange(b, device=channel_order.device).unsqueeze(1)
        shuffled_img = expanded[batch_index, channel_order, :, :]

        q = self.conv_out(shuffled_img)
        att = self.select_attn(self.norm1(q), self.norm2(img_feature2))
        output = att + self.ffn(self.norm3(att))
        return output, img_feature2
|
|
class Topm_CrossAttention_Restormer(nn.Module):
    """Channel-wise cross-attention that keeps only the largest logits per row.

    Queries come from ``x_q`` and keys/values from ``x_kv``, each through a 1x1 conv
    followed by a 3x3 depth-wise conv. For every query channel only the top
    ``int(C * 9 / 10)`` logits (at least one) are kept; the others are set to ``-inf``
    before the softmax. The attended values are scaled by the learnable scalar
    ``attn4`` (initialised to 0.2) and projected by a 1x1 conv.

    ``attn_drop`` is registered but not applied in ``forward``.
    """

    def __init__(self, dim, num_heads, bias):
        super(Topm_CrossAttention_Restormer, self).__init__()
        self.num_heads = num_heads
        self.temperature = nn.Parameter(torch.ones(num_heads, 1, 1))
        self.kv = nn.Conv2d(dim, dim * 2, kernel_size=1, bias=bias)
        self.kv_dwconv = nn.Conv2d(dim * 2, dim * 2, kernel_size=3, stride=1, padding=1, groups=dim * 2, bias=bias)
        self.q = nn.Conv2d(dim, dim, kernel_size=1, bias=bias)
        self.q_dwconv = nn.Conv2d(dim, dim, kernel_size=3, stride=1, padding=1, groups=dim, bias=bias)

        self.project_out = nn.Conv2d(dim, dim, kernel_size=1, bias=bias)
        self.attn_drop = nn.Dropout(0.)
        self.attn4 = torch.nn.Parameter(torch.tensor([0.2]), requires_grad=True)

    def forward(self, x_q, x_kv):
        """Attend from ``x_q`` to ``x_kv``; the output has the shape of ``x_q``."""
        b, c, h, w = x_q.shape
        q = self.q_dwconv(self.q(x_q))
        kv = self.kv_dwconv(self.kv(x_kv))
        k, v = kv.chunk(2, dim=1)
        q = rearrange(q, 'b (head c) h w -> b head c (h w)', head=self.num_heads)
        k = rearrange(k, 'b (head c) h w -> b head c (h w)', head=self.num_heads)
        v = rearrange(v, 'b (head c) h w -> b head c (h w)', head=self.num_heads)
        q = torch.nn.functional.normalize(q, dim=-1)
        k = torch.nn.functional.normalize(k, dim=-1)
        _, _, C, _ = q.shape
        mask4 = torch.zeros(b, self.num_heads, C, C, device=x_q.device, requires_grad=False)

        attn = (q @ k.transpose(-2, -1)) * self.temperature
        # Keep at least one logit per row: with a single channel per head the 90%
        # rule rounds down to zero, which would mask the whole row and make the
        # softmax return NaN.
        keep = max(1, int(C * 9 / 10))
        index = torch.topk(attn, k=keep, dim=-1, largest=True)[1]
        mask4.scatter_(-1, index, 1.)
        attn4 = torch.where(mask4 > 0, attn, torch.full_like(attn, float('-inf')))
        attn4 = attn4.softmax(dim=-1)
        out4 = (attn4 @ v)
        out = out4 * self.attn4
        out = rearrange(out, 'b head c (h w) -> b (head c) h w', head=self.num_heads, h=h, w=w)
        out = self.project_out(out)
        return out
| |
class ChannelShuffle_skip_textguaid(nn.Module):
    """ChannelShuffle model. Use forward(inp_img) only; text_code is replaced by a learnable embedding (no CLIP).

    Four-level encoder/decoder of ``TransformerBlock`` stages. The latent and each
    encoder skip connection pass through a ``ch_shuffle_high_text`` module before
    being used by the decoder. When ``text_code`` is omitted, the learnable
    ``learned_text`` row (shape ``(1, 512)``, initialised to zeros) is expanded over
    the batch. ``device`` is only stored; ``dual_pixel_task`` is accepted but unused.
    """

    def __init__(self,
                 inp_channels=3,
                 out_channels=3,
                 dim = 48,
                 num_blocks = [4,6,6,8],
                 num_refinement_blocks = 4,
                 heads = [1,2,4,8],
                 ffn_expansion_factor = 2.66,
                 bias = False,
                 LayerNorm_type = 'WithBias',
                 device = "cuda:0",
                 dual_pixel_task = False
                 ):
        super().__init__()
        self.device = device
        self.learned_text = nn.Parameter(torch.zeros(1, 512))

        c1 = dim
        c2 = int(dim * 2 ** 1)
        c3 = int(dim * 2 ** 2)
        c4 = int(dim * 2 ** 3)
        shared = dict(ffn_expansion_factor=ffn_expansion_factor, bias=bias, LayerNorm_type=LayerNorm_type)

        def stage(width, n_heads, depth):
            return nn.Sequential(*[TransformerBlock(dim=width, num_heads=n_heads, **shared)
                                   for _ in range(depth)])

        def shuffle(width, n_heads):
            return ch_shuffle_high_text(ch_dim=width, num_heads=n_heads, **shared)

        # Encoder
        self.patch_embed = OverlapPatchEmbed(inp_channels, dim)
        self.encoder_level1 = stage(c1, heads[0], num_blocks[0])
        self.encoder_shuffle_channel1 = shuffle(c1, heads[0])

        self.down1_2 = Downsample(c1)
        self.encoder_level2 = stage(c2, heads[1], num_blocks[1])
        self.encoder_shuffle_channel2 = shuffle(c2, heads[1])

        self.down2_3 = Downsample(c2)
        self.encoder_level3 = stage(c3, heads[2], num_blocks[2])
        self.encoder_shuffle_channel3 = shuffle(c3, heads[2])

        self.down3_4 = Downsample(c3)
        self.latent = stage(c4, heads[3], num_blocks[3])
        self.latent_shuffle_channel = shuffle(c4, heads[3])

        # Decoder
        self.up4_3 = Upsample(c4)
        self.reduce_chan_level3 = nn.Conv2d(c4, c3, kernel_size=1, bias=bias)
        self.decoder_level3 = stage(c3, heads[2], num_blocks[2])

        self.up3_2 = Upsample(c3)
        self.reduce_chan_level2 = nn.Conv2d(c3, c2, kernel_size=1, bias=bias)
        self.decoder_level2 = stage(c2, heads[1], num_blocks[1])

        self.up2_1 = Upsample(c2)
        self.decoder_level1 = stage(c2, heads[0], num_blocks[0])

        self.refinement = stage(c2, heads[0], num_refinement_blocks)

        self.output = nn.Conv2d(c2, out_channels, kernel_size=3, stride=1, padding=1, bias=bias)

    def forward(self, inp_img, text_code=None):
        if text_code is None:
            text_code = self.learned_text.expand(inp_img.size(0), -1)

        # Encoder
        enc1 = self.encoder_level1(self.patch_embed(inp_img))
        enc2 = self.encoder_level2(self.down1_2(enc1))
        enc3 = self.encoder_level3(self.down2_3(enc2))
        bottleneck = self.latent(self.down3_4(enc3))
        bottleneck, _ = self.latent_shuffle_channel(bottleneck, text_code)

        # Decoder level 3
        up3 = self.up4_3(bottleneck)
        skip3, _ = self.encoder_shuffle_channel3(enc3, text_code)
        dec3 = self.reduce_chan_level3(torch.cat([up3, skip3], 1))
        dec3 = self.decoder_level3(dec3)

        # Decoder level 2
        up2 = self.up3_2(dec3)
        skip2, _ = self.encoder_shuffle_channel2(enc2, text_code)
        dec2 = self.reduce_chan_level2(torch.cat([up2, skip2], 1))
        dec2 = self.decoder_level2(dec2)

        # Decoder level 1 (no channel reduction; stays at 2 * dim)
        up1 = self.up2_1(dec2)
        skip1, _ = self.encoder_shuffle_channel1(enc1, text_code)
        dec1 = self.decoder_level1(torch.cat([up1, skip1], 1))

        refined = self.refinement(dec1)

        # For a 4-channel input only the first three channels form the residual.
        if inp_img.shape[1] == 4:
            res = inp_img[:, :3]
        else:
            res = inp_img
        return self.output(refined) + res
|
|
|
|
| |
| |
| |
class PriorEncoder(nn.Module):
    """Multi-scale encoder for a 1-channel hue map.

    The outputs are aligned with the resolution of each encoder level of the main
    network so they can be fused with it later. A 3x3 conv embeds the map to ``dim``
    channels, and three ``Downsample`` steps produce the coarser levels.
    """

    def __init__(self, dim=48, bias=False):
        super().__init__()
        self.embed = nn.Conv2d(1, dim, kernel_size=3, stride=1, padding=1, bias=bias)
        self.down1 = Downsample(dim)
        self.down2 = Downsample(int(dim * 2 ** 1))
        self.down3 = Downsample(int(dim * 2 ** 2))

    def forward(self, hue):
        # Each level is computed from the previous one, from fine to coarse.
        level1 = self.embed(hue)
        level2 = self.down1(level1)
        level3 = self.down2(level2)
        level4 = self.down3(level3)
        return level1, level2, level3, level4
|
|
|
|
def _prior_fuse(main_feat, prior_feat, fuse_conv):
    """Fuse prior features into the main features.

    Concatenates ``main_feat`` and ``prior_feat`` along the channel axis, applies
    ``fuse_conv`` (a 1x1 conv at the call sites), and adds the result to
    ``main_feat`` as a residual so the prior guides the main features.
    """
    stacked = torch.cat([main_feat, prior_feat], dim=1)
    correction = fuse_conv(stacked)
    return main_feat + correction
|
|
|
|
| |
| |
| |
class ChannelShuffleWithGBP(nn.Module):
    """
    ChannelShuffle_skip_textguaid plus a degradation-free prior: the HSV hue channel
    (GBP) guides the restoration.
    Input is RGB (B,3,H,W); internally concat(RGB, hue) forms a 4-channel tensor that
    is fed to ChannelShuffle_skip_textguaid.

    The wrapped network is always built with 4 input and 3 output channels; the
    ``inp_channels`` and ``out_channels`` arguments are accepted but not used.
    """

    def __init__(self, inp_channels=3, out_channels=3, dim=48, num_blocks=[4,6,6,8],
                 num_refinement_blocks=4, heads=[1,2,4,8], ffn_expansion_factor=2.66,
                 bias=False, LayerNorm_type='WithBias', device='cuda:0', dual_pixel_task=False):
        super().__init__()
        backbone_kwargs = dict(
            inp_channels=4,
            out_channels=3,
            dim=dim,
            num_blocks=num_blocks,
            num_refinement_blocks=num_refinement_blocks,
            heads=heads,
            ffn_expansion_factor=ffn_expansion_factor,
            bias=bias,
            LayerNorm_type=LayerNorm_type,
            device=device,
            dual_pixel_task=dual_pixel_task,
        )
        self.net = ChannelShuffle_skip_textguaid(**backbone_kwargs)

    def forward(self, inp_img, text_code=None):
        # rgb_to_hue comes from net.gbp_prior; presumably it returns one channel,
        # since the backbone expects 4 input channels -- verify against that module.
        hue_map = rgb_to_hue(inp_img)
        stacked = torch.cat([inp_img, hue_map], dim=1)
        return self.net(stacked, text_code=text_code)
|
|
|
|
| |
| |
| |
class ChannelShuffleWithGBPDeep(nn.Module):
    """
    The degradation-free prior (hue) goes through its own PriorEncoder to obtain
    multi-scale features. At every encoder level these are fused with the
    ChannelShuffle features (concat + 1x1 conv residual) to guide the restoration.

    When ``text_code`` is omitted, the learnable ``learned_text`` row (shape
    ``(1, 512)``, initialised to zeros) is expanded over the batch. ``device`` is only
    stored; ``dual_pixel_task`` is accepted but unused.
    """

    def __init__(self, inp_channels=3, out_channels=3, dim=48, num_blocks=[4,6,6,8],
                 num_refinement_blocks=4, heads=[1,2,4,8], ffn_expansion_factor=2.66,
                 bias=False, LayerNorm_type='WithBias', device='cuda:0', dual_pixel_task=False):
        super().__init__()
        self.device = device
        self.learned_text = nn.Parameter(torch.zeros(1, 512))

        c1 = dim
        c2 = int(dim * 2 ** 1)
        c3 = int(dim * 2 ** 2)
        c4 = int(dim * 2 ** 3)
        shared = dict(ffn_expansion_factor=ffn_expansion_factor, bias=bias, LayerNorm_type=LayerNorm_type)

        def stage(width, n_heads, depth):
            return nn.Sequential(*[TransformerBlock(dim=width, num_heads=n_heads, **shared)
                                   for _ in range(depth)])

        def shuffle(width, n_heads):
            return ch_shuffle_high_text(ch_dim=width, num_heads=n_heads, **shared)

        # Prior branch and the 1x1 convs that fuse it into each encoder level.
        self.prior_encoder = PriorEncoder(dim=dim, bias=bias)
        self.fuse_conv1 = nn.Conv2d(c1 * 2, c1, kernel_size=1, bias=bias)
        self.fuse_conv2 = nn.Conv2d(c2 * 2, c2, kernel_size=1, bias=bias)
        self.fuse_conv3 = nn.Conv2d(c3 * 2, c3, kernel_size=1, bias=bias)
        self.fuse_conv4 = nn.Conv2d(c4 * 2, c4, kernel_size=1, bias=bias)

        # Encoder
        self.patch_embed = OverlapPatchEmbed(inp_channels, dim)
        self.encoder_level1 = stage(c1, heads[0], num_blocks[0])
        self.encoder_shuffle_channel1 = shuffle(c1, heads[0])
        self.down1_2 = Downsample(c1)
        self.encoder_level2 = stage(c2, heads[1], num_blocks[1])
        self.encoder_shuffle_channel2 = shuffle(c2, heads[1])
        self.down2_3 = Downsample(c2)
        self.encoder_level3 = stage(c3, heads[2], num_blocks[2])
        self.encoder_shuffle_channel3 = shuffle(c3, heads[2])
        self.down3_4 = Downsample(c3)
        self.latent = stage(c4, heads[3], num_blocks[3])
        self.latent_shuffle_channel = shuffle(c4, heads[3])

        # Decoder
        self.up4_3 = Upsample(c4)
        self.reduce_chan_level3 = nn.Conv2d(c4, c3, kernel_size=1, bias=bias)
        self.decoder_level3 = stage(c3, heads[2], num_blocks[2])
        self.up3_2 = Upsample(c3)
        self.reduce_chan_level2 = nn.Conv2d(c3, c2, kernel_size=1, bias=bias)
        self.decoder_level2 = stage(c2, heads[1], num_blocks[1])
        self.up2_1 = Upsample(c2)
        self.decoder_level1 = stage(c2, heads[0], num_blocks[0])
        self.refinement = stage(c2, heads[0], num_refinement_blocks)
        self.output = nn.Conv2d(c2, out_channels, kernel_size=3, stride=1, padding=1, bias=bias)

    def forward(self, inp_img, text_code=None):
        if text_code is None:
            text_code = self.learned_text.expand(inp_img.size(0), -1)

        # rgb_to_hue comes from net.gbp_prior; presumably it returns one channel,
        # since PriorEncoder.embed takes 1 input channel -- verify against that module.
        hue_map = rgb_to_hue(inp_img)
        prior_1, prior_2, prior_3, prior_4 = self.prior_encoder(hue_map)

        # Encoder: each level's output is fused with the prior of matching scale.
        enc1 = self.encoder_level1(self.patch_embed(inp_img))
        enc1 = _prior_fuse(enc1, prior_1, self.fuse_conv1)

        enc2 = self.encoder_level2(self.down1_2(enc1))
        enc2 = _prior_fuse(enc2, prior_2, self.fuse_conv2)

        enc3 = self.encoder_level3(self.down2_3(enc2))
        enc3 = _prior_fuse(enc3, prior_3, self.fuse_conv3)

        bottleneck = self.latent(self.down3_4(enc3))
        bottleneck = _prior_fuse(bottleneck, prior_4, self.fuse_conv4)
        bottleneck, _ = self.latent_shuffle_channel(bottleneck, text_code)

        # Decoder level 3
        up3 = self.up4_3(bottleneck)
        skip3, _ = self.encoder_shuffle_channel3(enc3, text_code)
        dec3 = self.reduce_chan_level3(torch.cat([up3, skip3], 1))
        dec3 = self.decoder_level3(dec3)

        # Decoder level 2
        up2 = self.up3_2(dec3)
        skip2, _ = self.encoder_shuffle_channel2(enc2, text_code)
        dec2 = self.reduce_chan_level2(torch.cat([up2, skip2], 1))
        dec2 = self.decoder_level2(dec2)

        # Decoder level 1 (no channel reduction; stays at 2 * dim)
        up1 = self.up2_1(dec2)
        skip1, _ = self.encoder_shuffle_channel1(enc1, text_code)
        dec1 = self.decoder_level1(torch.cat([up1, skip1], 1))

        refined = self.refinement(dec1)
        return self.output(refined) + inp_img
|
|
|
|
| |
class Restormer(nn.Module):
    """Four-level encoder/decoder of ``TransformerBlock`` stages with skip connections.

    Levels 3 and 2 reduce the concatenated skip features with a 1x1 conv; level 1
    keeps the concatenated ``2 * dim`` channels. With ``dual_pixel_task=True`` the
    shallow embedding is added through ``skip_conv`` before the output conv and no
    image residual is used. Otherwise the input image is added to the output, using
    only its first three channels when it has four or more.
    """

    def __init__(self,
                 inp_channels=3,
                 out_channels=3,
                 dim = 48,
                 num_blocks = [4,6,6,8],
                 num_refinement_blocks = 4,
                 heads = [1,2,4,8],
                 ffn_expansion_factor = 2.66,
                 bias = False,
                 LayerNorm_type = 'WithBias',
                 dual_pixel_task = False
                 ):
        super().__init__()

        c1 = dim
        c2 = int(dim * 2 ** 1)
        c3 = int(dim * 2 ** 2)
        c4 = int(dim * 2 ** 3)

        def stage(width, n_heads, depth):
            return nn.Sequential(*[TransformerBlock(dim=width, num_heads=n_heads,
                                                    ffn_expansion_factor=ffn_expansion_factor,
                                                    bias=bias, LayerNorm_type=LayerNorm_type)
                                   for _ in range(depth)])

        # Encoder
        self.patch_embed = OverlapPatchEmbed(inp_channels, dim)
        self.encoder_level1 = stage(c1, heads[0], num_blocks[0])

        self.down1_2 = Downsample(c1)
        self.encoder_level2 = stage(c2, heads[1], num_blocks[1])

        self.down2_3 = Downsample(c2)
        self.encoder_level3 = stage(c3, heads[2], num_blocks[2])

        self.down3_4 = Downsample(c3)
        self.latent = stage(c4, heads[3], num_blocks[3])

        # Decoder
        self.up4_3 = Upsample(c4)
        self.reduce_chan_level3 = nn.Conv2d(c4, c3, kernel_size=1, bias=bias)
        self.decoder_level3 = stage(c3, heads[2], num_blocks[2])

        self.up3_2 = Upsample(c3)
        self.reduce_chan_level2 = nn.Conv2d(c3, c2, kernel_size=1, bias=bias)
        self.decoder_level2 = stage(c2, heads[1], num_blocks[1])

        self.up2_1 = Upsample(c2)
        self.decoder_level1 = stage(c2, heads[0], num_blocks[0])

        self.refinement = stage(c2, heads[0], num_refinement_blocks)

        self.dual_pixel_task = dual_pixel_task
        if self.dual_pixel_task:
            self.skip_conv = nn.Conv2d(dim, c2, kernel_size=1, bias=bias)

        self.output = nn.Conv2d(c2, out_channels, kernel_size=3, stride=1, padding=1, bias=bias)

    def forward(self, inp_img):
        # Encoder
        shallow = self.patch_embed(inp_img)
        enc1 = self.encoder_level1(shallow)
        enc2 = self.encoder_level2(self.down1_2(enc1))
        enc3 = self.encoder_level3(self.down2_3(enc2))
        bottleneck = self.latent(self.down3_4(enc3))

        # Decoder
        dec3 = torch.cat([self.up4_3(bottleneck), enc3], 1)
        dec3 = self.decoder_level3(self.reduce_chan_level3(dec3))

        dec2 = torch.cat([self.up3_2(dec3), enc2], 1)
        dec2 = self.decoder_level2(self.reduce_chan_level2(dec2))

        dec1 = torch.cat([self.up2_1(dec2), enc1], 1)
        features = self.refinement(self.decoder_level1(dec1))

        if self.dual_pixel_task:
            return self.output(features + self.skip_conv(shallow))

        # With four or more input channels only the first three form the residual.
        if inp_img.shape[1] >= 4:
            res = inp_img[:, :3]
        else:
            res = inp_img
        return self.output(features) + res
|
|
|
|
| |
| |
| |
class RestormerWithGBP(nn.Module):
    """
    Restormer guided by degradation-free prior: HSV hue channel (GBP).
    Paper: GBPG-Net: Global Background Prior-Guided Rain and Snow Image Restoration (Xiao Liu et al., IEEE TNNLS 2025).
    Forward: input RGB (B,3,H,W) -> concat(RGB, Hue) -> Restormer(4ch) -> clean RGB.

    The wrapped Restormer is always built with 4 input and 3 output channels; the
    ``inp_channels`` and ``out_channels`` arguments are accepted but not used.
    """

    def __init__(self, inp_channels=3, out_channels=3, dim=48, num_blocks=[4,6,6,8], num_refinement_blocks=4,
                 heads=[1,2,4,8], ffn_expansion_factor=2.66, bias=False, LayerNorm_type='WithBias', dual_pixel_task=False):
        super().__init__()
        backbone_kwargs = dict(
            inp_channels=4,
            out_channels=3,
            dim=dim,
            num_blocks=num_blocks,
            num_refinement_blocks=num_refinement_blocks,
            heads=heads,
            ffn_expansion_factor=ffn_expansion_factor,
            bias=bias,
            LayerNorm_type=LayerNorm_type,
            dual_pixel_task=dual_pixel_task,
        )
        self.restormer = Restormer(**backbone_kwargs)

    def forward(self, inp_img):
        # rgb_to_hue comes from net.gbp_prior; presumably it returns one channel,
        # since the backbone expects 4 input channels -- verify against that module.
        hue_map = rgb_to_hue(inp_img)
        stacked = torch.cat([inp_img, hue_map], dim=1)
        return self.restormer(stacked)
|
|
|
|
|
|