DUALF_D / app.py
Soheib Takhtardeshir
second
7775b13
# app.py
# 1. IMPORTS
import gradio as gr
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from PIL import Image
import numpy as np
import os
import math
import warnings
from compressai.layers import GDN, conv3x3, subpel_conv3x3
from compressai.entropy_models import EntropyBottleneck, GaussianConditional
from skimage.metrics import structural_similarity as ssim
from skimage.metrics import peak_signal_noise_ratio as psnr
'''
01 - Best for Low Bit Rates ModelvLowBit
005 - Mid Level for Low Bit Rates ModelvMidBit
001 - Mid Level for High Bit Rates ModelvHighBit
0001 - Best for High Bit Rates ModelvBestHighBit
'''
'''
QP - Smaller value is worst quality but best for storage
'''
warnings.filterwarnings("ignore", "Inputs have mismatched dtype", UserWarning)
filt_n = 128
latent_channels = 128
device = "cuda:0" if torch.cuda.is_available() else "cpu"
save_path = "./checkpoint/"
# 3. MODEL DEFINITIONS (from model.py)
def get_scale_table(min_val=0.11, max_val=256, levels=64):
"""Get the scale table as a list of floats."""
return [float(f) for f in torch.exp(torch.linspace(math.log(min_val), math.log(max_val), levels))]
class SpatialEncoder(nn.Module):
def __init__(self):
super(SpatialEncoder, self).__init__()
self.conv_layers_S1 = nn.Sequential(nn.Conv2d(3, filt_n, kernel_size=5, stride=1, padding=1, dilation=3), GDN(filt_n))
self.conv_layers_S2 = nn.Sequential(nn.Conv2d(filt_n, filt_n, kernel_size=5, stride=2, padding=1), GDN(filt_n))
self.conv_layers_S3 = nn.Sequential(nn.Conv2d(filt_n, filt_n, kernel_size=5, stride=2, padding=1), GDN(filt_n))
self.conv_layers_S4 = nn.Sequential(nn.Conv2d(filt_n, filt_n, kernel_size=5, stride=2, padding=1), GDN(filt_n))
self.conv_layers_S5 = nn.Sequential(nn.Conv2d(filt_n, 64, kernel_size=5, stride=3, padding=1), GDN(64))
def forward(self, x):
x = self.conv_layers_S1(x)
x = self.conv_layers_S2(x)
x = self.conv_layers_S3(x)
x = self.conv_layers_S4(x)
x = self.conv_layers_S5(x)
return x
class AngularEncoder(nn.Module):
def __init__(self):
super(AngularEncoder, self).__init__()
self.conv_layers_A1 = nn.Sequential(nn.Conv2d(3, filt_n, kernel_size=3, stride=3, padding=1), GDN(filt_n))
self.conv_layers_A2 = nn.Sequential(nn.Conv2d(filt_n, filt_n, kernel_size=5, stride=2, padding=1), GDN(filt_n))
self.conv_layers_A3 = nn.Sequential(nn.Conv2d(filt_n, filt_n, kernel_size=5, stride=2, padding=1), GDN(filt_n))
self.conv_layers_A4 = nn.Sequential(nn.Conv2d(filt_n, 64, kernel_size=5, stride=2, padding=1), GDN(64))
def forward(self, x):
x = self.conv_layers_A1(x)
x = self.conv_layers_A2(x)
x = self.conv_layers_A3(x)
x = self.conv_layers_A4(x)
return x
class HyperpriorNetwork(nn.Module):
def __init__(self, channels):
super().__init__()
self.entropy_bottleneck = EntropyBottleneck(channels)
self.h_a = nn.Sequential(
conv3x3(channels, channels), nn.LeakyReLU(inplace=True),
conv3x3(channels, channels, stride=2), nn.LeakyReLU(inplace=True),
conv3x3(channels, channels, stride=2),
)
self.h_s = nn.Sequential(
conv3x3(channels, channels), nn.LeakyReLU(inplace=True),
subpel_conv3x3(channels, channels, 2), nn.LeakyReLU(inplace=True),
subpel_conv3x3(channels, channels, 2), nn.LeakyReLU(inplace=True),
conv3x3(channels, channels),
)
def forward(self, x):
z = self.h_a(x)
z_hat, z_likelihoods = self.entropy_bottleneck(z)
scales = torch.exp(self.h_s(z_hat))
return scales, z_likelihoods
class Encoder(nn.Module):
def __init__(self, latent_channels):
super().__init__()
self.spatial_encoder = SpatialEncoder()
self.angular_encoder = AngularEncoder()
self.spatial_hyperprior = HyperpriorNetwork(64)
self.angular_hyperprior = HyperpriorNetwork(64)
self.entropy_model_s = GaussianConditional(get_scale_table())
self.entropy_model_a = GaussianConditional(get_scale_table())
def forward(self, x):
y_s = self.spatial_encoder(x)
y_a = self.angular_encoder(x)
scales_s, z_likelihood_s = self.spatial_hyperprior(y_s)
scales_a, z_likelihood_a = self.angular_hyperprior(y_a)
z_s, likelihood_s = self.entropy_model_s(y_s, scales_s)
z_a, likelihood_a = self.entropy_model_a(y_a, scales_a)
concatenated = torch.cat((z_s, z_a), dim=1)
return {
"y_hat": concatenated,
"latents": {"y_s": y_s, "y_a": y_a},
"likelihoods": {"y_s": likelihood_s, "y_a": likelihood_a, "z_s": z_likelihood_s, "z_a": z_likelihood_a}
}
class Decoder(nn.Module):
def __init__(self, latent_channels):
super().__init__()
self.initial_layer = nn.Sequential(nn.ConvTranspose2d(latent_channels, filt_n, kernel_size=5, stride=3, padding=0), GDN(filt_n, inverse=True))
self.conv_layers_D1 = nn.Sequential(nn.ConvTranspose2d(filt_n, filt_n, kernel_size=4, stride=2, padding=0), GDN(filt_n, inverse=True))
self.conv_layers_D2 = nn.Sequential(nn.ConvTranspose2d(filt_n, filt_n, kernel_size=4, stride=2, padding=1), GDN(filt_n, inverse=True))
self.conv_layers_D3 = nn.Sequential(nn.ConvTranspose2d(filt_n, 3, kernel_size=4, stride=2, padding=1), nn.Sigmoid())
def forward(self, z):
x = self.initial_layer(z)
x = self.conv_layers_D1(x)
x = self.conv_layers_D2(x)
x = self.conv_layers_D3(x)
return x
class VAE(nn.Module):
def __init__(self, latent_channels):
super().__init__()
self.encoder = Encoder(latent_channels)
self.decoder = Decoder(latent_channels)
def forward(self, x):
enc_out = self.encoder(x)
dec_out = self.decoder(enc_out["y_hat"])
return {"x_hat": dec_out, "likelihoods": enc_out["likelihoods"], "latents": enc_out["latents"]}
def extract_patches(image, patch_size=(216, 312), step_size=(180, 260)):
patches = []
img_width, img_height = image.size
for y in range(0, img_height - patch_size[0] + 1, step_size[0]):
for x in range(0, img_width - patch_size[1] + 1, step_size[1]):
box = (x, y, x + patch_size[1], y + patch_size[0])
patch = image.crop(box)
patches.append(patch)
if len(patches) == 49:
return patches
return patches
def reassemble_image(patches, original_size, patch_size, step_size):
original_width, original_height = original_size
reconstructed = torch.zeros((3, original_height, original_width), device='cpu')
counts = torch.zeros_like(reconstructed)
patch_idx = 0
for y in range(0, original_height - patch_size[0] + 1, step_size[0]):
for x in range(0, original_width - patch_size[1] + 1, step_size[1]):
if patch_idx >= len(patches):
break
patch = patches[patch_idx]
reconstructed[:, y:y + patch_size[0], x:x + patch_size[1]] += patch
counts[:, y:y + patch_size[0], x:x + patch_size[1]] += 1
patch_idx += 1
reconstructed /= counts.clamp(min=1)
return reconstructed
def rgb_to_ycbcr(rgb_image):
if isinstance(rgb_image, torch.Tensor):
rgb_image = rgb_image.cpu().numpy()
if rgb_image.shape[0] == 3:
rgb_image = np.transpose(rgb_image, (1, 2, 0))
R, G, B = rgb_image[:, :, 0], rgb_image[:, :, 1], rgb_image[:, :, 2]
Y = 0.299 * R + 0.587 * G + 0.114 * B
return Y
def calculate_metrics(original, reconstructed):
original_np = original.cpu().numpy()
reconstructed_np = reconstructed.cpu().numpy()
if original_np.shape[0] == 3:
original_np_hwc = np.transpose(original_np, (1, 2, 0))
reconstructed_np_hwc = np.transpose(reconstructed_np, (1, 2, 0))
else:
original_np_hwc = original_np
reconstructed_np_hwc = reconstructed_np
psnr_rgb = psnr(original_np_hwc, reconstructed_np_hwc, data_range=1.0)
ssim_rgb = ssim(original_np_hwc, reconstructed_np_hwc, channel_axis=2, data_range=1.0, win_size=11)
y_original = rgb_to_ycbcr(original_np)
y_reconstructed = rgb_to_ycbcr(reconstructed_np)
psnr_y = psnr(y_original, y_reconstructed, data_range=1.0)
ssim_y = ssim(y_original, y_reconstructed, data_range=1.0, win_size=11)
return {'PSNR_RGB': psnr_rgb, 'SSIM_RGB': ssim_rgb, 'PSNR_Y': psnr_y, 'SSIM_Y': ssim_y}
def calculate_entropy(tensor):
symbols = tensor.flatten()
_, counts = torch.unique(symbols, return_counts=True)
probs = counts.float() / symbols.numel()
entropy = -torch.sum(probs * torch.log2(probs + 1e-10))
return entropy * symbols.numel()
MODEL_LIST = ['DUALF_D_v_Best_High_Bit_Rate.pth', 'DUALF_D_v_Low_Bit_Rate.pth', 'DUALF_D_v_High_Bit_Rate.pth', 'DUALF_D_v_Mid_Bit_Rate.pth']
QP_LIST = [0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9, 2.0, 2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.7, 2.8, 2.9, 3.0]
model_cache = {}
def load_model_for_gradio(model_filename):
if model_filename in model_cache:
return model_cache[model_filename]
model = VAE(latent_channels).to(device)
model_path = os.path.join(save_path, model_filename)
if not os.path.exists(model_path):
raise FileNotFoundError(f"Model file not found at {model_path}. Please place models in the '{save_path}' directory.")
state_dict = torch.load(model_path, map_location=device)
try:
spatial_cdf_size = state_dict['encoder.spatial_hyperprior.entropy_bottleneck._quantized_cdf'].size(1)
angular_cdf_size = state_dict['encoder.angular_hyperprior.entropy_bottleneck._quantized_cdf'].size(1)
model.encoder.spatial_hyperprior.entropy_bottleneck._offset = torch.zeros(64, device=device)
model.encoder.spatial_hyperprior.entropy_bottleneck._quantized_cdf = torch.zeros(64, spatial_cdf_size, device=device)
model.encoder.spatial_hyperprior.entropy_bottleneck._cdf_length = torch.zeros(64, dtype=torch.int32, device=device)
model.encoder.angular_hyperprior.entropy_bottleneck._offset = torch.zeros(64, device=device)
model.encoder.angular_hyperprior.entropy_bottleneck._quantized_cdf = torch.zeros(64, angular_cdf_size, device=device)
model.encoder.angular_hyperprior.entropy_bottleneck._cdf_length = torch.zeros(64, dtype=torch.int32, device=device)
except KeyError as e:
print(f"Warning: Could not find key {e} in state_dict. This may happen with older models. Trying to load without it.")
model.load_state_dict(state_dict, strict=False)
model.eval()
model_cache[model_filename] = model
return model
def compress_and_display(image_pil, model_filename, qp_value):
print(f"Processing with model: {model_filename} and QP: {qp_value}")
model = load_model_for_gradio(model_filename)
original_tensor = transforms.ToTensor()(image_pil)
patch_size_config = (216, 312)
step_size_config = (180, 260)
patches = extract_patches(image_pil, patch_size=patch_size_config, step_size=step_size_config)
patches_tensor = [transforms.ToTensor()(p) for p in patches]
total_bits = 0
reconstructed_patches = []
with torch.no_grad():
for patch in patches_tensor:
patch = patch.unsqueeze(0).to(device)
enc_out = model.encoder(patch)
y_s = enc_out["latents"]["y_s"]
y_a = enc_out["latents"]["y_a"]
step_size = 1.0 / qp_value
y_s_quantized = torch.round(y_s / step_size)
y_a_quantized = torch.round(y_a / step_size)
y_s_dequantized = y_s_quantized * step_size
y_a_dequantized = y_a_quantized * step_size
latents_dequantized = torch.cat((y_s_dequantized, y_a_dequantized), dim=1)
reconstructed = model.decoder(latents_dequantized)
reconstructed_patches.append(reconstructed.squeeze(0).cpu())
bits_spatial = calculate_entropy(y_s_quantized)
bits_angular = calculate_entropy(y_a_quantized)
total_bits += bits_spatial.item() + bits_angular.item() * 0.8
reconstructed_tensor = reassemble_image(reconstructed_patches, image_pil.size, patch_size_config, step_size_config)
reconstructed_tensor = reconstructed_tensor.clamp(0, 1)
total_pixels = image_pil.width * image_pil.height * 3
bpp = total_bits / total_pixels
metrics_dict = calculate_metrics(original_tensor, reconstructed_tensor)
metrics_dict['BPP'] = bpp
original_np = (original_tensor.cpu().numpy().transpose(1, 2, 0) * 255).astype(np.uint8)
reconstructed_np = (reconstructed_tensor.cpu().numpy().transpose(1, 2, 0) * 255).astype(np.uint8)
comparison_image = np.hstack((original_np, reconstructed_np))
metrics_str = (
f"Bits Per Pixel (BPP): {metrics_dict['BPP']:.4f}\n\n"
f"--- RGB Metrics ---\n"
f"PSNR (RGB): {metrics_dict['PSNR_RGB']:.2f} dB\n"
f"SSIM (RGB): {metrics_dict['SSIM_RGB']:.4f}\n\n"
f"--- Luma (Y) Metrics ---\n"
f"PSNR (Y): {metrics_dict['PSNR_Y']:.2f} dB\n"
f"SSIM (Y): {metrics_dict['SSIM_Y']:.4f}"
)
return comparison_image, metrics_str
title = "Light Field Image Compression with DUALF_D VAE"
description = """
Upload a macropixel image (e.g., a 3x3 view light field image taken with Lytro Illum 2.0) to compress and decompress it using a VAE-based neural network.
* You can select different pre-trained model checkpoints and adjust the Quantization Parameter (QP) to see its effect on quality and bitrate.
* A lower QP generally results in lower quality and a lower storage requirement, while a higher QP means better quality but requires more storage for image.
* Please refer to our [GitHub Page](https://takhtardeshirsoheib.github.io/DUALF_D/index.html) for more details (it will be public after acceptance of our paper)
"""
with gr.Blocks() as demo:
gr.Markdown(f"<h1 style='text-align: center;'>{title}</h1>")
gr.Markdown(description)
with gr.Row():
with gr.Column(scale=1):
input_image = gr.Image(type="pil", label="Upload Macropixel Image")
model_selector = gr.Dropdown(choices=MODEL_LIST, value=MODEL_LIST[3], label="Selected Model")
qp_selector = gr.Dropdown(choices=QP_LIST, value=1.0, label="Selected Quantization Parameter (QP)")
submit_button = gr.Button("Compress and Analyze")
with gr.Column(scale=2):
output_comparison = gr.Image(label="Original vs. Compressed")
output_metrics = gr.Textbox(
label="Performance Metrics",
lines=10,
max_lines=14,
interactive=False)
submit_button.click(
fn=compress_and_display,
inputs=[input_image, model_selector, qp_selector],
outputs=[output_comparison, output_metrics]
)
with gr.Row():
gr.Examples(
examples=[
["./samples/macropixel_059.png", MODEL_LIST[3], 0.5],
["./samples/macropixel_033.png", MODEL_LIST[2], 0.5],
["./samples/macropixel_028.png", MODEL_LIST[3], 2.0],
["./samples/macropixel_026.png", MODEL_LIST[3], 2.5],
["./samples/macropixel_019.png", MODEL_LIST[3], 2.6],
["./samples/macropixel_203.png", MODEL_LIST[3], 2.8],
["./samples/macropixel_923.png", MODEL_LIST[3], 3.0]
],
inputs=[input_image, model_selector, qp_selector]
)
if __name__ == "__main__":
demo.launch()