| """ |
| U-Net based DIE model for cleaning document. |
| """ |
|
|
| import os |
| from typing import Callable |
|
|
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| import torchvision.transforms as T |
| from PIL import Image |
|
|
|
|
| class DoubleConv(nn.Module): |
| """(convolution => [BN] => ReLU) * 2""" |
|
|
| def __init__(self, in_channels, out_channels, mid_channels=None): |
| super().__init__() |
| if not mid_channels: |
| mid_channels = out_channels |
| self.double_conv = nn.Sequential( |
| nn.Conv2d(in_channels, mid_channels, kernel_size=3, padding=1, bias=False), |
| nn.BatchNorm2d(mid_channels), |
| nn.ReLU(inplace=True), |
| nn.Conv2d(mid_channels, out_channels, kernel_size=3, padding=1, bias=False), |
| nn.BatchNorm2d(out_channels), |
| nn.ReLU(inplace=True) |
| ) |
|
|
| def forward(self, x): |
| return self.double_conv(x) |
|
|
|
|
| class Down(nn.Module): |
| """Downscaling with maxpool then double conv""" |
|
|
| def __init__(self, in_channels, out_channels): |
| super().__init__() |
| self.maxpool_conv = nn.Sequential( |
| nn.MaxPool2d(2), |
| DoubleConv(in_channels, out_channels) |
| ) |
|
|
| def forward(self, x): |
| return self.maxpool_conv(x) |
|
|
|
|
| class Up(nn.Module): |
| """Upscaling then double conv""" |
|
|
| def __init__(self, in_channels, out_channels, bilinear=True): |
| super().__init__() |
|
|
| |
| if bilinear: |
| self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True) |
| self.conv = DoubleConv(in_channels, out_channels, in_channels // 2) |
| else: |
| self.up = nn.ConvTranspose2d(in_channels, in_channels // 2, kernel_size=2, stride=2) |
| self.conv = DoubleConv(in_channels, out_channels) |
|
|
| def forward(self, x1, x2): |
| x1 = self.up(x1) |
| |
| diffY = x2.size()[2] - x1.size()[2] |
| diffX = x2.size()[3] - x1.size()[3] |
|
|
| x1 = F.pad(x1, [diffX // 2, diffX - diffX // 2, |
| diffY // 2, diffY - diffY // 2]) |
| |
| |
| |
| x = torch.cat([x2, x1], dim=1) |
| return self.conv(x) |
|
|
|
|
| class OutConv(nn.Module): |
| def __init__(self, in_channels, out_channels): |
| super(OutConv, self).__init__() |
| self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1) |
|
|
| def forward(self, x): |
| x = self.conv(x) |
| x = torch.sigmoid(x) |
| return x |
|
|
|
|
| class UNet(nn.Module): |
| def __init__(self, n_channels, output_channel_dim=1, bilinear=False): |
| super(UNet, self).__init__() |
| self.n_channels = n_channels |
| self.n_classes = output_channel_dim |
| self.bilinear = bilinear |
|
|
| self.inc = DoubleConv(n_channels, 64) |
| self.down1 = Down(64, 128) |
| self.down2 = Down(128, 256) |
| self.down3 = Down(256, 512) |
| factor = 2 if bilinear else 1 |
| self.down4 = Down(512, 1024 // factor) |
| self.up1 = Up(1024, 512 // factor, bilinear) |
| self.up2 = Up(512, 256 // factor, bilinear) |
| self.up3 = Up(256, 128 // factor, bilinear) |
| self.up4 = Up(128, 64, bilinear) |
| self.outc = OutConv(64, output_channel_dim) |
|
|
| def forward(self, x): |
| x1 = self.inc(x) |
| x2 = self.down1(x1) |
| x3 = self.down2(x2) |
| x4 = self.down3(x3) |
| x5 = self.down4(x4) |
| x = self.up1(x5, x4) |
| x = self.up2(x, x3) |
| x = self.up3(x, x2) |
| x = self.up4(x, x1) |
| logits = self.outc(x) |
| return logits |
|
|
|
|
| def add_gaussian_noise( |
| data: torch.Tensor |
| ) -> torch.Tensor: |
| """ |
| Adding gaussian noise to torch tensor. |
| :param data: torch tensor |
| :return: noise perturbed tensor |
| """ |
|
|
| data_with_noise = data.clone() |
| data_with_noise += torch.normal(mean=0, std=0.05, size=data_with_noise.shape).to(data_with_noise.device) |
| data_with_noise = data_with_noise.clip(min=0, max=1) |
|
|
| return data_with_noise |
|
|
|
|
| def inference_model( |
| model: Callable, |
| model_input: torch.Tensor, |
| device: str | torch.device, |
| num_of_iterations: int = 1 |
| ) -> list[torch.Tensor, ...]: |
| """ |
| Performing model inference. |
| :param model: image pre-processing model |
| :param model_input: data to model |
| :param device: cuda device |
| :param num_of_iterations: defines how many times feed the network (recursively) |
| :return: predictions |
| """ |
|
|
| |
| with torch.no_grad(): |
|
|
| prediction_list = [] |
|
|
| model_input = model_input.to(device) |
|
|
| if len(model_input.shape) == 3: |
| model_input = model_input.unsqueeze(dim=0) |
|
|
| model_input_original_part = model_input[:, 0:3, ...] |
|
|
| for i in range(num_of_iterations): |
|
|
| if i == 0: |
| model_input = add_gaussian_noise(model_input) |
| prediction = model(model_input) |
| prediction_list.append(prediction) |
| model_input_new = torch.cat((model_input_original_part, prediction.detach()), dim=1) |
| else: |
| model_input_perturbed = add_gaussian_noise(model_input_new) |
| prediction = model(model_input_perturbed) |
| prediction_list.append(prediction) |
| model_input_new = torch.cat((model_input_original_part, prediction.detach()), dim=1) |
|
|
| return prediction_list |
|
|
|
|
| def load_unet( |
| model_path: str, |
| device: str = 'cpu', |
| eval_mode: bool = False, |
| n_channels: int = 4, |
| bilinear: bool = False, |
| output_channel_dim: int = 1 |
| ): |
|
|
| print("Loading UNet model...") |
|
|
| |
| model = UNet( |
| n_channels=n_channels, |
| bilinear=bilinear, |
| output_channel_dim=output_channel_dim |
| ) |
|
|
| |
| state_dict = torch.load(os.path.join(model_path), map_location=device) |
| new_state_dict = {key.replace('module.', ''): value for key, value in state_dict.items()} |
| model.load_state_dict(new_state_dict) |
| model.to(device) |
|
|
| if eval_mode: |
| model.eval() |
|
|
| return model |
|
|
|
|
| class UNetDIEModel: |
| """ |
| Class for Document Image Enhancement with U-Net. |
| """ |
|
|
| def __init__( |
| self, |
| *args, |
| **kwargs |
| ): |
| """ |
| Initialization. |
| """ |
|
|
| self.args = kwargs['args'] |
|
|
| |
| self.die = load_unet( |
| model_path=self.args.die_model_path, |
| device=self.args.device, |
| eval_mode=True, |
| ) |
|
|
| def enhance_document_image( |
| self, |
| image_raw_list: list[Image.Image], |
| num_of_die_iterations: int = 1, |
| ) -> list[Image.Image]: |
| """" |
| Enhance document image by removing noise. |
| :param image_raw_list: original document page to process |
| :param num_of_die_iterations: number of DIE iterations |
| :return: cleaned document page to process |
| """ |
|
|
| with torch.no_grad(): |
|
|
| |
| image_die = torch.stack(image_raw_list, dim=0) |
|
|
| |
| prediction_list = inference_model( |
| model=self.die, |
| model_input=image_die, |
| num_of_iterations=num_of_die_iterations, |
| device=self.args.device |
| ) |
|
|
| |
| last_prediction = prediction_list[-1] |
| batch_size = last_prediction.size(0) |
| image_die_list = [T.ToPILImage()(last_prediction[idx, ...]).convert('RGB') for idx in range(batch_size)] |
|
|
| return image_die_list |
|
|