# app.py # 1. IMPORTS import gradio as gr import torch import torch.nn as nn import torch.nn.functional as F from torchvision import transforms from PIL import Image import numpy as np import os import math import warnings from compressai.layers import GDN, conv3x3, subpel_conv3x3 from compressai.entropy_models import EntropyBottleneck, GaussianConditional from skimage.metrics import structural_similarity as ssim from skimage.metrics import peak_signal_noise_ratio as psnr ''' 01 - Best for Low Bit Rates ModelvLowBit 005 - Mid Level for Low Bit Rates ModelvMidBit 001 - Mid Level for High Bit Rates ModelvHighBit 0001 - Best for High Bit Rates ModelvBestHighBit ''' ''' QP - Smaller value is worst quality but best for storage ''' warnings.filterwarnings("ignore", "Inputs have mismatched dtype", UserWarning) filt_n = 128 latent_channels = 128 device = "cuda:0" if torch.cuda.is_available() else "cpu" save_path = "./checkpoint/" # 3. MODEL DEFINITIONS (from model.py) def get_scale_table(min_val=0.11, max_val=256, levels=64): """Get the scale table as a list of floats.""" return [float(f) for f in torch.exp(torch.linspace(math.log(min_val), math.log(max_val), levels))] class SpatialEncoder(nn.Module): def __init__(self): super(SpatialEncoder, self).__init__() self.conv_layers_S1 = nn.Sequential(nn.Conv2d(3, filt_n, kernel_size=5, stride=1, padding=1, dilation=3), GDN(filt_n)) self.conv_layers_S2 = nn.Sequential(nn.Conv2d(filt_n, filt_n, kernel_size=5, stride=2, padding=1), GDN(filt_n)) self.conv_layers_S3 = nn.Sequential(nn.Conv2d(filt_n, filt_n, kernel_size=5, stride=2, padding=1), GDN(filt_n)) self.conv_layers_S4 = nn.Sequential(nn.Conv2d(filt_n, filt_n, kernel_size=5, stride=2, padding=1), GDN(filt_n)) self.conv_layers_S5 = nn.Sequential(nn.Conv2d(filt_n, 64, kernel_size=5, stride=3, padding=1), GDN(64)) def forward(self, x): x = self.conv_layers_S1(x) x = self.conv_layers_S2(x) x = self.conv_layers_S3(x) x = self.conv_layers_S4(x) x = self.conv_layers_S5(x) return x class AngularEncoder(nn.Module): def __init__(self): super(AngularEncoder, self).__init__() self.conv_layers_A1 = nn.Sequential(nn.Conv2d(3, filt_n, kernel_size=3, stride=3, padding=1), GDN(filt_n)) self.conv_layers_A2 = nn.Sequential(nn.Conv2d(filt_n, filt_n, kernel_size=5, stride=2, padding=1), GDN(filt_n)) self.conv_layers_A3 = nn.Sequential(nn.Conv2d(filt_n, filt_n, kernel_size=5, stride=2, padding=1), GDN(filt_n)) self.conv_layers_A4 = nn.Sequential(nn.Conv2d(filt_n, 64, kernel_size=5, stride=2, padding=1), GDN(64)) def forward(self, x): x = self.conv_layers_A1(x) x = self.conv_layers_A2(x) x = self.conv_layers_A3(x) x = self.conv_layers_A4(x) return x class HyperpriorNetwork(nn.Module): def __init__(self, channels): super().__init__() self.entropy_bottleneck = EntropyBottleneck(channels) self.h_a = nn.Sequential( conv3x3(channels, channels), nn.LeakyReLU(inplace=True), conv3x3(channels, channels, stride=2), nn.LeakyReLU(inplace=True), conv3x3(channels, channels, stride=2), ) self.h_s = nn.Sequential( conv3x3(channels, channels), nn.LeakyReLU(inplace=True), subpel_conv3x3(channels, channels, 2), nn.LeakyReLU(inplace=True), subpel_conv3x3(channels, channels, 2), nn.LeakyReLU(inplace=True), conv3x3(channels, channels), ) def forward(self, x): z = self.h_a(x) z_hat, z_likelihoods = self.entropy_bottleneck(z) scales = torch.exp(self.h_s(z_hat)) return scales, z_likelihoods class Encoder(nn.Module): def __init__(self, latent_channels): super().__init__() self.spatial_encoder = SpatialEncoder() self.angular_encoder = AngularEncoder() self.spatial_hyperprior = HyperpriorNetwork(64) self.angular_hyperprior = HyperpriorNetwork(64) self.entropy_model_s = GaussianConditional(get_scale_table()) self.entropy_model_a = GaussianConditional(get_scale_table()) def forward(self, x): y_s = self.spatial_encoder(x) y_a = self.angular_encoder(x) scales_s, z_likelihood_s = self.spatial_hyperprior(y_s) scales_a, z_likelihood_a = self.angular_hyperprior(y_a) z_s, likelihood_s = self.entropy_model_s(y_s, scales_s) z_a, likelihood_a = self.entropy_model_a(y_a, scales_a) concatenated = torch.cat((z_s, z_a), dim=1) return { "y_hat": concatenated, "latents": {"y_s": y_s, "y_a": y_a}, "likelihoods": {"y_s": likelihood_s, "y_a": likelihood_a, "z_s": z_likelihood_s, "z_a": z_likelihood_a} } class Decoder(nn.Module): def __init__(self, latent_channels): super().__init__() self.initial_layer = nn.Sequential(nn.ConvTranspose2d(latent_channels, filt_n, kernel_size=5, stride=3, padding=0), GDN(filt_n, inverse=True)) self.conv_layers_D1 = nn.Sequential(nn.ConvTranspose2d(filt_n, filt_n, kernel_size=4, stride=2, padding=0), GDN(filt_n, inverse=True)) self.conv_layers_D2 = nn.Sequential(nn.ConvTranspose2d(filt_n, filt_n, kernel_size=4, stride=2, padding=1), GDN(filt_n, inverse=True)) self.conv_layers_D3 = nn.Sequential(nn.ConvTranspose2d(filt_n, 3, kernel_size=4, stride=2, padding=1), nn.Sigmoid()) def forward(self, z): x = self.initial_layer(z) x = self.conv_layers_D1(x) x = self.conv_layers_D2(x) x = self.conv_layers_D3(x) return x class VAE(nn.Module): def __init__(self, latent_channels): super().__init__() self.encoder = Encoder(latent_channels) self.decoder = Decoder(latent_channels) def forward(self, x): enc_out = self.encoder(x) dec_out = self.decoder(enc_out["y_hat"]) return {"x_hat": dec_out, "likelihoods": enc_out["likelihoods"], "latents": enc_out["latents"]} def extract_patches(image, patch_size=(216, 312), step_size=(180, 260)): patches = [] img_width, img_height = image.size for y in range(0, img_height - patch_size[0] + 1, step_size[0]): for x in range(0, img_width - patch_size[1] + 1, step_size[1]): box = (x, y, x + patch_size[1], y + patch_size[0]) patch = image.crop(box) patches.append(patch) if len(patches) == 49: return patches return patches def reassemble_image(patches, original_size, patch_size, step_size): original_width, original_height = original_size reconstructed = torch.zeros((3, original_height, original_width), device='cpu') counts = torch.zeros_like(reconstructed) patch_idx = 0 for y in range(0, original_height - patch_size[0] + 1, step_size[0]): for x in range(0, original_width - patch_size[1] + 1, step_size[1]): if patch_idx >= len(patches): break patch = patches[patch_idx] reconstructed[:, y:y + patch_size[0], x:x + patch_size[1]] += patch counts[:, y:y + patch_size[0], x:x + patch_size[1]] += 1 patch_idx += 1 reconstructed /= counts.clamp(min=1) return reconstructed def rgb_to_ycbcr(rgb_image): if isinstance(rgb_image, torch.Tensor): rgb_image = rgb_image.cpu().numpy() if rgb_image.shape[0] == 3: rgb_image = np.transpose(rgb_image, (1, 2, 0)) R, G, B = rgb_image[:, :, 0], rgb_image[:, :, 1], rgb_image[:, :, 2] Y = 0.299 * R + 0.587 * G + 0.114 * B return Y def calculate_metrics(original, reconstructed): original_np = original.cpu().numpy() reconstructed_np = reconstructed.cpu().numpy() if original_np.shape[0] == 3: original_np_hwc = np.transpose(original_np, (1, 2, 0)) reconstructed_np_hwc = np.transpose(reconstructed_np, (1, 2, 0)) else: original_np_hwc = original_np reconstructed_np_hwc = reconstructed_np psnr_rgb = psnr(original_np_hwc, reconstructed_np_hwc, data_range=1.0) ssim_rgb = ssim(original_np_hwc, reconstructed_np_hwc, channel_axis=2, data_range=1.0, win_size=11) y_original = rgb_to_ycbcr(original_np) y_reconstructed = rgb_to_ycbcr(reconstructed_np) psnr_y = psnr(y_original, y_reconstructed, data_range=1.0) ssim_y = ssim(y_original, y_reconstructed, data_range=1.0, win_size=11) return {'PSNR_RGB': psnr_rgb, 'SSIM_RGB': ssim_rgb, 'PSNR_Y': psnr_y, 'SSIM_Y': ssim_y} def calculate_entropy(tensor): symbols = tensor.flatten() _, counts = torch.unique(symbols, return_counts=True) probs = counts.float() / symbols.numel() entropy = -torch.sum(probs * torch.log2(probs + 1e-10)) return entropy * symbols.numel() MODEL_LIST = ['DUALF_D_v_Best_High_Bit_Rate.pth', 'DUALF_D_v_Low_Bit_Rate.pth', 'DUALF_D_v_High_Bit_Rate.pth', 'DUALF_D_v_Mid_Bit_Rate.pth'] QP_LIST = [0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9, 2.0, 2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.7, 2.8, 2.9, 3.0] model_cache = {} def load_model_for_gradio(model_filename): if model_filename in model_cache: return model_cache[model_filename] model = VAE(latent_channels).to(device) model_path = os.path.join(save_path, model_filename) if not os.path.exists(model_path): raise FileNotFoundError(f"Model file not found at {model_path}. Please place models in the '{save_path}' directory.") state_dict = torch.load(model_path, map_location=device) try: spatial_cdf_size = state_dict['encoder.spatial_hyperprior.entropy_bottleneck._quantized_cdf'].size(1) angular_cdf_size = state_dict['encoder.angular_hyperprior.entropy_bottleneck._quantized_cdf'].size(1) model.encoder.spatial_hyperprior.entropy_bottleneck._offset = torch.zeros(64, device=device) model.encoder.spatial_hyperprior.entropy_bottleneck._quantized_cdf = torch.zeros(64, spatial_cdf_size, device=device) model.encoder.spatial_hyperprior.entropy_bottleneck._cdf_length = torch.zeros(64, dtype=torch.int32, device=device) model.encoder.angular_hyperprior.entropy_bottleneck._offset = torch.zeros(64, device=device) model.encoder.angular_hyperprior.entropy_bottleneck._quantized_cdf = torch.zeros(64, angular_cdf_size, device=device) model.encoder.angular_hyperprior.entropy_bottleneck._cdf_length = torch.zeros(64, dtype=torch.int32, device=device) except KeyError as e: print(f"Warning: Could not find key {e} in state_dict. This may happen with older models. Trying to load without it.") model.load_state_dict(state_dict, strict=False) model.eval() model_cache[model_filename] = model return model def compress_and_display(image_pil, model_filename, qp_value): print(f"Processing with model: {model_filename} and QP: {qp_value}") model = load_model_for_gradio(model_filename) original_tensor = transforms.ToTensor()(image_pil) patch_size_config = (216, 312) step_size_config = (180, 260) patches = extract_patches(image_pil, patch_size=patch_size_config, step_size=step_size_config) patches_tensor = [transforms.ToTensor()(p) for p in patches] total_bits = 0 reconstructed_patches = [] with torch.no_grad(): for patch in patches_tensor: patch = patch.unsqueeze(0).to(device) enc_out = model.encoder(patch) y_s = enc_out["latents"]["y_s"] y_a = enc_out["latents"]["y_a"] step_size = 1.0 / qp_value y_s_quantized = torch.round(y_s / step_size) y_a_quantized = torch.round(y_a / step_size) y_s_dequantized = y_s_quantized * step_size y_a_dequantized = y_a_quantized * step_size latents_dequantized = torch.cat((y_s_dequantized, y_a_dequantized), dim=1) reconstructed = model.decoder(latents_dequantized) reconstructed_patches.append(reconstructed.squeeze(0).cpu()) bits_spatial = calculate_entropy(y_s_quantized) bits_angular = calculate_entropy(y_a_quantized) total_bits += bits_spatial.item() + bits_angular.item() * 0.8 reconstructed_tensor = reassemble_image(reconstructed_patches, image_pil.size, patch_size_config, step_size_config) reconstructed_tensor = reconstructed_tensor.clamp(0, 1) total_pixels = image_pil.width * image_pil.height * 3 bpp = total_bits / total_pixels metrics_dict = calculate_metrics(original_tensor, reconstructed_tensor) metrics_dict['BPP'] = bpp original_np = (original_tensor.cpu().numpy().transpose(1, 2, 0) * 255).astype(np.uint8) reconstructed_np = (reconstructed_tensor.cpu().numpy().transpose(1, 2, 0) * 255).astype(np.uint8) comparison_image = np.hstack((original_np, reconstructed_np)) metrics_str = ( f"Bits Per Pixel (BPP): {metrics_dict['BPP']:.4f}\n\n" f"--- RGB Metrics ---\n" f"PSNR (RGB): {metrics_dict['PSNR_RGB']:.2f} dB\n" f"SSIM (RGB): {metrics_dict['SSIM_RGB']:.4f}\n\n" f"--- Luma (Y) Metrics ---\n" f"PSNR (Y): {metrics_dict['PSNR_Y']:.2f} dB\n" f"SSIM (Y): {metrics_dict['SSIM_Y']:.4f}" ) return comparison_image, metrics_str title = "Light Field Image Compression with DUALF_D VAE" description = """ Upload a macropixel image (e.g., a 3x3 view light field image taken with Lytro Illum 2.0) to compress and decompress it using a VAE-based neural network. * You can select different pre-trained model checkpoints and adjust the Quantization Parameter (QP) to see its effect on quality and bitrate. * A lower QP generally results in lower quality and a lower storage requirement, while a higher QP means better quality but requires more storage for image. * Please refer to our [GitHub Page](https://takhtardeshirsoheib.github.io/DUALF_D/index.html) for more details (it will be public after acceptance of our paper) """ with gr.Blocks() as demo: gr.Markdown(f"