|
|
|
|
|
|
|
|
import gradio as gr |
|
|
import torch |
|
|
import torch.nn as nn |
|
|
import torch.nn.functional as F |
|
|
from torchvision import transforms |
|
|
from PIL import Image |
|
|
import numpy as np |
|
|
import os |
|
|
import math |
|
|
import warnings |
|
|
from compressai.layers import GDN, conv3x3, subpel_conv3x3 |
|
|
from compressai.entropy_models import EntropyBottleneck, GaussianConditional |
|
|
from skimage.metrics import structural_similarity as ssim |
|
|
from skimage.metrics import peak_signal_noise_ratio as psnr |
|
|
|
|
|
''' |
|
|
01 - Best for Low Bit Rates ModelvLowBit |
|
|
005 - Mid Level for Low Bit Rates ModelvMidBit |
|
|
001 - Mid Level for High Bit Rates ModelvHighBit |
|
|
0001 - Best for High Bit Rates ModelvBestHighBit |
|
|
''' |
|
|
|
|
|
''' |
|
|
QP - A smaller value gives worse quality but needs less storage
|
|
''' |
|
|
|
|
|
# Suppress the "Inputs have mismatched dtype" UserWarning.
# NOTE(review): presumably raised by the skimage metric functions - confirm.
warnings.filterwarnings(
    action="ignore",
    message="Inputs have mismatched dtype",
    category=UserWarning,
)

# Channel width of the intermediate convolution layers in the encoders/decoder.
filt_n = 128

# Channel count of the latent fed to the decoder.
latent_channels = 128

# Run on the first CUDA device when one is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = "cuda:0"
else:
    device = "cpu"

# Directory in which the model checkpoints are looked up.
save_path = "./checkpoint/"
|
|
|
|
|
|
|
|
def get_scale_table(min_val=0.11, max_val=256, levels=64):
    """Return `levels` scale values spaced geometrically in [min_val, max_val].

    The values are evenly spaced in log-space and then exponentiated, so
    consecutive entries share a constant ratio. The result is a plain list of
    Python floats.
    """
    log_low = math.log(min_val)
    log_high = math.log(max_val)
    return torch.linspace(log_low, log_high, levels).exp().tolist()
|
|
|
|
|
class SpatialEncoder(nn.Module):
    """Spatial analysis branch: five Conv2d + GDN stages applied in sequence.

    Maps a 3-channel input to a 64-channel latent. The first stage is a
    dilated (dilation=3), stride-1 convolution; the following stages use
    strides 2, 2, 2 and 3.
    """

    def __init__(self):
        super().__init__()
        # The attribute names double as state_dict keys, so they must not change.
        self.conv_layers_S1 = nn.Sequential(
            nn.Conv2d(3, filt_n, kernel_size=5, stride=1, padding=1, dilation=3),
            GDN(filt_n),
        )
        self.conv_layers_S2 = nn.Sequential(
            nn.Conv2d(filt_n, filt_n, kernel_size=5, stride=2, padding=1),
            GDN(filt_n),
        )
        self.conv_layers_S3 = nn.Sequential(
            nn.Conv2d(filt_n, filt_n, kernel_size=5, stride=2, padding=1),
            GDN(filt_n),
        )
        self.conv_layers_S4 = nn.Sequential(
            nn.Conv2d(filt_n, filt_n, kernel_size=5, stride=2, padding=1),
            GDN(filt_n),
        )
        self.conv_layers_S5 = nn.Sequential(
            nn.Conv2d(filt_n, 64, kernel_size=5, stride=3, padding=1),
            GDN(64),
        )

    def forward(self, x):
        """Run `x` through the five stages in order and return the result."""
        stages = (
            self.conv_layers_S1,
            self.conv_layers_S2,
            self.conv_layers_S3,
            self.conv_layers_S4,
            self.conv_layers_S5,
        )
        for stage in stages:
            x = stage(x)
        return x
|
|
|
|
|
class AngularEncoder(nn.Module):
    """Angular analysis branch: four Conv2d + GDN stages applied in sequence.

    Maps a 3-channel input to a 64-channel latent. The first stage is a 3x3
    convolution with stride 3; the remaining stages are 5x5 convolutions with
    stride 2.
    """

    def __init__(self):
        super().__init__()
        # The attribute names double as state_dict keys, so they must not change.
        self.conv_layers_A1 = nn.Sequential(
            nn.Conv2d(3, filt_n, kernel_size=3, stride=3, padding=1),
            GDN(filt_n),
        )
        self.conv_layers_A2 = nn.Sequential(
            nn.Conv2d(filt_n, filt_n, kernel_size=5, stride=2, padding=1),
            GDN(filt_n),
        )
        self.conv_layers_A3 = nn.Sequential(
            nn.Conv2d(filt_n, filt_n, kernel_size=5, stride=2, padding=1),
            GDN(filt_n),
        )
        self.conv_layers_A4 = nn.Sequential(
            nn.Conv2d(filt_n, 64, kernel_size=5, stride=2, padding=1),
            GDN(64),
        )

    def forward(self, x):
        """Run `x` through the four stages in order and return the result."""
        stages = (
            self.conv_layers_A1,
            self.conv_layers_A2,
            self.conv_layers_A3,
            self.conv_layers_A4,
        )
        for stage in stages:
            x = stage(x)
        return x
|
|
|
|
|
class HyperpriorNetwork(nn.Module):
    """Hyperprior that predicts per-element scales for a latent tensor.

    `h_a` reduces the latent with two stride-2 convolutions, the result goes
    through an `EntropyBottleneck`, and `h_s` upsamples it again with two
    sub-pixel convolutions. Input and output both have `channels` channels.
    """

    def __init__(self, channels):
        super().__init__()
        self.entropy_bottleneck = EntropyBottleneck(channels)

        # Hyper-analysis transform.
        analysis_layers = [
            conv3x3(channels, channels),
            nn.LeakyReLU(inplace=True),
            conv3x3(channels, channels, stride=2),
            nn.LeakyReLU(inplace=True),
            conv3x3(channels, channels, stride=2),
        ]
        self.h_a = nn.Sequential(*analysis_layers)

        # Hyper-synthesis transform.
        synthesis_layers = [
            conv3x3(channels, channels),
            nn.LeakyReLU(inplace=True),
            subpel_conv3x3(channels, channels, 2),
            nn.LeakyReLU(inplace=True),
            subpel_conv3x3(channels, channels, 2),
            nn.LeakyReLU(inplace=True),
            conv3x3(channels, channels),
        ]
        self.h_s = nn.Sequential(*synthesis_layers)

    def forward(self, x):
        """Return `(scales, z_likelihoods)` for the latent `x`.

        The scales are the exponential of the synthesis output, which keeps
        them strictly positive.
        """
        hyper_latent = self.h_a(x)
        hyper_latent_hat, z_likelihoods = self.entropy_bottleneck(hyper_latent)
        log_scales = self.h_s(hyper_latent_hat)
        return torch.exp(log_scales), z_likelihoods
|
|
|
|
|
class Encoder(nn.Module):
    """Two-branch encoder (spatial + angular), each with its own hyperprior.

    `latent_channels` is accepted for interface symmetry with `Decoder` but is
    not read here: each branch produces a fixed 64-channel latent.
    """

    def __init__(self, latent_channels):
        super().__init__()
        self.spatial_encoder = SpatialEncoder()
        self.angular_encoder = AngularEncoder()
        self.spatial_hyperprior = HyperpriorNetwork(64)
        self.angular_hyperprior = HyperpriorNetwork(64)
        self.entropy_model_s = GaussianConditional(get_scale_table())
        self.entropy_model_a = GaussianConditional(get_scale_table())

    def forward(self, x):
        """Encode `x` with both branches.

        Returns a dict with:
            "y_hat": the outputs of the two conditional entropy models,
                concatenated along the channel dimension (spatial first);
            "latents": the raw branch outputs under "y_s" / "y_a";
            "likelihoods": likelihoods of both latents ("y_s", "y_a") and of
                both hyper-latents ("z_s", "z_a").
        """
        spatial_latent = self.spatial_encoder(x)
        angular_latent = self.angular_encoder(x)

        spatial_scales, spatial_z_lik = self.spatial_hyperprior(spatial_latent)
        angular_scales, angular_z_lik = self.angular_hyperprior(angular_latent)

        spatial_hat, spatial_lik = self.entropy_model_s(spatial_latent, spatial_scales)
        angular_hat, angular_lik = self.entropy_model_a(angular_latent, angular_scales)

        latents = {"y_s": spatial_latent, "y_a": angular_latent}
        likelihoods = {
            "y_s": spatial_lik,
            "y_a": angular_lik,
            "z_s": spatial_z_lik,
            "z_a": angular_z_lik,
        }
        return {
            "y_hat": torch.cat((spatial_hat, angular_hat), dim=1),
            "latents": latents,
            "likelihoods": likelihoods,
        }
|
|
|
|
|
class Decoder(nn.Module):
    """Synthesis network: four transposed-convolution stages.

    Takes a `latent_channels`-channel latent and produces a 3-channel image.
    The first three stages are followed by an inverse GDN; the last one ends
    in a sigmoid, so outputs lie in [0, 1].
    """

    def __init__(self, latent_channels):
        super().__init__()
        # The attribute names double as state_dict keys, so they must not change.
        self.initial_layer = nn.Sequential(
            nn.ConvTranspose2d(latent_channels, filt_n, kernel_size=5, stride=3, padding=0),
            GDN(filt_n, inverse=True),
        )
        self.conv_layers_D1 = nn.Sequential(
            nn.ConvTranspose2d(filt_n, filt_n, kernel_size=4, stride=2, padding=0),
            GDN(filt_n, inverse=True),
        )
        self.conv_layers_D2 = nn.Sequential(
            nn.ConvTranspose2d(filt_n, filt_n, kernel_size=4, stride=2, padding=1),
            GDN(filt_n, inverse=True),
        )
        self.conv_layers_D3 = nn.Sequential(
            nn.ConvTranspose2d(filt_n, 3, kernel_size=4, stride=2, padding=1),
            nn.Sigmoid(),
        )

    def forward(self, z):
        """Decode the latent `z` by applying the four stages in order."""
        out = z
        for stage in (
            self.initial_layer,
            self.conv_layers_D1,
            self.conv_layers_D2,
            self.conv_layers_D3,
        ):
            out = stage(out)
        return out
|
|
|
|
|
class VAE(nn.Module):
    """Full codec: `Encoder` followed by `Decoder`."""

    def __init__(self, latent_channels):
        super().__init__()
        self.encoder = Encoder(latent_channels)
        self.decoder = Decoder(latent_channels)

    def forward(self, x):
        """Encode and decode `x`.

        Returns a dict with the reconstruction under "x_hat" plus the
        encoder's "likelihoods" and "latents" entries passed through unchanged.
        """
        encoded = self.encoder(x)
        return {
            "x_hat": self.decoder(encoded["y_hat"]),
            "likelihoods": encoded["likelihoods"],
            "latents": encoded["latents"],
        }
|
|
|
|
|
def extract_patches(image, patch_size=(216, 312), step_size=(180, 260), max_patches=49):
    """Cut `image` into overlapping patches in row-major order.

    Args:
        image: Object exposing `size` as `(width, height)` and
            `crop((left, upper, right, lower))`, e.g. a PIL image.
        patch_size: `(height, width)` of every patch.
        step_size: `(vertical, horizontal)` stride between patch origins.
        max_patches: Upper bound on the number of patches returned; `None`
            removes the bound. The default of 49 keeps the previous
            hard-coded limit.

    Returns:
        List of cropped patches. Only patches lying completely inside the
        image are produced, so the list is empty when the image is smaller
        than one patch.
    """
    patch_h, patch_w = patch_size[0], patch_size[1]
    img_width, img_height = image.size

    patches = []
    for y in range(0, img_height - patch_h + 1, step_size[0]):
        for x in range(0, img_width - patch_w + 1, step_size[1]):
            # Stop as soon as the cap is reached (checked before cropping).
            if max_patches is not None and len(patches) >= max_patches:
                return patches
            patches.append(image.crop((x, y, x + patch_w, y + patch_h)))
    return patches
|
|
|
|
|
def reassemble_image(patches, original_size, patch_size, step_size):
    """Stitch patches back into one image, averaging where they overlap.

    Args:
        patches: Sequence of `(3, patch_h, patch_w)` CPU tensors in the
            row-major order of the patch grid.
        original_size: `(width, height)` of the target image.
        patch_size: `(height, width)` of each patch.
        step_size: `(vertical, horizontal)` stride of the patch grid.

    Returns:
        A `(3, height, width)` CPU tensor. Pixels not covered by any patch
        stay at zero.
    """
    width, height = original_size
    patch_h, patch_w = patch_size[0], patch_size[1]

    accumulated = torch.zeros((3, height, width), device='cpu')
    coverage = torch.zeros_like(accumulated)

    # Grid origins in the same row-major order the patches were extracted in.
    origins = (
        (top, left)
        for top in range(0, height - patch_h + 1, step_size[0])
        for left in range(0, width - patch_w + 1, step_size[1])
    )
    # zip stops at the shorter sequence, so surplus grid cells are left empty.
    for (top, left), tile in zip(origins, patches):
        rows = slice(top, top + patch_h)
        cols = slice(left, left + patch_w)
        accumulated[:, rows, cols] += tile
        coverage[:, rows, cols] += 1

    # Clamp so uncovered pixels divide by 1 instead of 0.
    accumulated /= coverage.clamp(min=1)
    return accumulated
|
|
|
|
|
def rgb_to_ycbcr(rgb_image):
    """Return the luma (Y) plane of an RGB image.

    Despite the name, only Y is computed (weights 0.299 / 0.587 / 0.114); no
    chroma planes are returned.

    Args:
        rgb_image: Torch tensor or NumPy array. An input whose first dimension
            is 3 is treated as channels-first and moved to channels-last;
            anything else is taken to be channels-last already.

    Returns:
        2-D NumPy array holding the Y value of every pixel.
    """
    pixels = rgb_image.cpu().numpy() if isinstance(rgb_image, torch.Tensor) else rgb_image

    if pixels.shape[0] == 3:
        pixels = np.transpose(pixels, (1, 2, 0))

    red = pixels[:, :, 0]
    green = pixels[:, :, 1]
    blue = pixels[:, :, 2]
    return 0.299 * red + 0.587 * green + 0.114 * blue
|
|
|
|
|
def calculate_metrics(original, reconstructed):
    """Compute PSNR and SSIM between two images, on RGB and on luma.

    Args:
        original: Reference image tensor with values in [0, 1].
        reconstructed: Image tensor of the same shape to compare against it.

    Returns:
        Dict with the keys 'PSNR_RGB', 'SSIM_RGB', 'PSNR_Y' and 'SSIM_Y'.
    """
    reference = original.cpu().numpy()
    candidate = reconstructed.cpu().numpy()

    # The layout decision is taken from the reference image only: a leading
    # dimension of 3 means channels-first, which the metrics need as HWC.
    channels_first = reference.shape[0] == 3
    reference_hwc = np.transpose(reference, (1, 2, 0)) if channels_first else reference
    candidate_hwc = np.transpose(candidate, (1, 2, 0)) if channels_first else candidate

    results = {
        'PSNR_RGB': psnr(reference_hwc, candidate_hwc, data_range=1.0),
        'SSIM_RGB': ssim(reference_hwc, candidate_hwc, channel_axis=2, data_range=1.0, win_size=11),
    }

    # The luma helper does its own layout handling, so it gets the raw arrays.
    reference_luma = rgb_to_ycbcr(reference)
    candidate_luma = rgb_to_ycbcr(candidate)
    results['PSNR_Y'] = psnr(reference_luma, candidate_luma, data_range=1.0)
    results['SSIM_Y'] = ssim(reference_luma, candidate_luma, data_range=1.0, win_size=11)
    return results
|
|
|
|
|
def calculate_entropy(tensor):
    """Estimate the total number of bits needed to code `tensor`.

    The empirical (zeroth-order) entropy of the tensor's values is measured
    in bits per symbol and multiplied by the number of symbols.

    Returns:
        0-dimensional tensor holding the estimated bit count.
    """
    flat = tensor.flatten()
    n_symbols = flat.numel()
    frequencies = torch.unique(flat, return_counts=True)[1]
    probabilities = frequencies.float() / n_symbols
    # The small epsilon guards log2 against a zero argument.
    bits_per_symbol = -(probabilities * torch.log2(probabilities + 1e-10)).sum()
    return bits_per_symbol * n_symbols
|
|
|
|
|
# Checkpoint file names offered in the UI. Other code picks entries by index,
# so the order must stay as it is.
MODEL_LIST = [
    'DUALF_D_v_Best_High_Bit_Rate.pth',
    'DUALF_D_v_Low_Bit_Rate.pth',
    'DUALF_D_v_High_Bit_Rate.pth',
    'DUALF_D_v_Mid_Bit_Rate.pth',
]

# Selectable quantization parameters: 0.05 steps up to 0.5, then 0.1 steps up to 3.0.
QP_LIST = [
    0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5,
    0.6, 0.7, 0.8, 0.9, 1.0,
    1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9, 2.0,
    2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.7, 2.8, 2.9, 3.0,
]

# Loaded models keyed by checkpoint file name, so each is read only once.
model_cache = dict()
|
|
|
|
|
def load_model_for_gradio(model_filename):
    """Return the VAE for `model_filename`, loading and caching it on first use.

    Args:
        model_filename: Checkpoint file name, resolved relative to `save_path`.

    Returns:
        The model in eval mode, placed on `device`.

    Raises:
        FileNotFoundError: If the checkpoint file does not exist.
    """
    cached = model_cache.get(model_filename)
    if cached is not None:
        return cached

    # Check for the file first so a missing checkpoint fails before the
    # network is built.
    model_path = os.path.join(save_path, model_filename)
    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Model file not found at {model_path}. Please place models in the '{save_path}' directory.")

    # NOTE(security): torch.load unpickles the file; only load checkpoints
    # that come from a trusted source.
    state_dict = torch.load(model_path, map_location=device)
    model = VAE(latent_channels).to(device)

    hyperpriors = {
        "spatial_hyperprior": model.encoder.spatial_hyperprior,
        "angular_hyperprior": model.encoder.angular_hyperprior,
    }
    try:
        # Read both CDF widths before touching the model, so a missing key
        # leaves every entropy bottleneck untouched.
        cdf_sizes = {
            name: state_dict[f"encoder.{name}.entropy_bottleneck._quantized_cdf"].size(1)
            for name in hyperpriors
        }
    except KeyError as e:
        print(f"Warning: Could not find key {e} in state_dict. This may happen with older models. Trying to load without it.")
    else:
        # Replace the entropy-bottleneck tables with zero placeholders sized
        # from the checkpoint; presumably needed because load_state_dict
        # rejects tensors whose shapes differ - confirm.
        n_channels = 64  # matches HyperpriorNetwork(64) in Encoder
        for name, hyperprior in hyperpriors.items():
            bottleneck = hyperprior.entropy_bottleneck
            bottleneck._offset = torch.zeros(n_channels, device=device)
            bottleneck._quantized_cdf = torch.zeros(n_channels, cdf_sizes[name], device=device)
            bottleneck._cdf_length = torch.zeros(n_channels, dtype=torch.int32, device=device)

    # strict=False tolerates key mismatches; report them instead of hiding
    # them, since missing weights would silently stay randomly initialised.
    incompatible = model.load_state_dict(state_dict, strict=False)
    if incompatible.missing_keys or incompatible.unexpected_keys:
        print(
            f"Warning: checkpoint '{model_filename}' does not match the model exactly. "
            f"Missing keys: {list(incompatible.missing_keys)}; "
            f"unexpected keys: {list(incompatible.unexpected_keys)}"
        )

    model.eval()
    model_cache[model_filename] = model
    return model
|
|
|
|
|
def compress_and_display(image_pil, model_filename, qp_value):
    """Compress an image patch by patch and report quality and bitrate.

    Args:
        image_pil: Input PIL image; converted to RGB when it is in another mode.
        model_filename: Checkpoint file name passed to `load_model_for_gradio`.
        qp_value: Quantization parameter; latents are rounded on a grid with
            step `1 / qp_value`, so larger values quantize more finely.

    Returns:
        `(comparison_image, metrics_str)`: a uint8 array showing the original
        and the reconstruction side by side, and a formatted metrics report.

    Raises:
        gr.Error: If no image was supplied or the image is smaller than one patch.
    """
    if image_pil is None:
        raise gr.Error("Please upload an image before starting the compression.")
    # The network and the reassembly buffer both expect exactly three channels.
    if image_pil.mode != "RGB":
        image_pil = image_pil.convert("RGB")

    print(f"Processing with model: {model_filename} and QP: {qp_value}")

    model = load_model_for_gradio(model_filename)

    to_tensor = transforms.ToTensor()
    original_tensor = to_tensor(image_pil)
    patch_size_config = (216, 312)
    step_size_config = (180, 260)
    patches = extract_patches(image_pil, patch_size=patch_size_config, step_size=step_size_config)
    if not patches:
        # Without this guard the output would be an all-black image with
        # meaningless metrics.
        raise gr.Error(
            f"Image is too small: at least {patch_size_config[1]}x{patch_size_config[0]} "
            f"pixels (width x height) are required."
        )

    # The quantization step is the same for every patch, so compute it once.
    quant_step = 1.0 / qp_value

    total_bits = 0
    reconstructed_patches = []
    with torch.no_grad():
        for patch_pil in patches:
            patch = to_tensor(patch_pil).unsqueeze(0).to(device)
            latents = model.encoder(patch)["latents"]

            # Uniform scalar quantization of both latents.
            y_s_quantized = torch.round(latents["y_s"] / quant_step)
            y_a_quantized = torch.round(latents["y_a"] / quant_step)

            latents_dequantized = torch.cat(
                (y_s_quantized * quant_step, y_a_quantized * quant_step), dim=1
            )
            reconstructed = model.decoder(latents_dequantized)
            reconstructed_patches.append(reconstructed.squeeze(0).cpu())

            # Bit cost is estimated from the empirical entropy of the symbols.
            # NOTE(review): the 0.8 weight on the angular bits has no visible
            # rationale in this file - confirm it is intended.
            bits_spatial = calculate_entropy(y_s_quantized)
            bits_angular = calculate_entropy(y_a_quantized)
            total_bits += bits_spatial.item() + bits_angular.item() * 0.8

    reconstructed_tensor = reassemble_image(reconstructed_patches, image_pil.size, patch_size_config, step_size_config)
    reconstructed_tensor = reconstructed_tensor.clamp(0, 1)

    # NOTE(review): the divisor counts colour samples (width * height * 3),
    # not pixels, so the reported BPP is per sample - confirm this is the
    # intended definition.
    total_pixels = image_pil.width * image_pil.height * 3
    bpp = total_bits / total_pixels
    metrics_dict = calculate_metrics(original_tensor, reconstructed_tensor)
    metrics_dict['BPP'] = bpp

    def to_uint8_hwc(chw_tensor):
        # Round before casting: a bare astype truncates, which can turn e.g.
        # 2.9999998 into 2 and shift pixel values down by one.
        hwc = chw_tensor.cpu().numpy().transpose(1, 2, 0)
        return np.rint(hwc * 255).astype(np.uint8)

    comparison_image = np.hstack((to_uint8_hwc(original_tensor), to_uint8_hwc(reconstructed_tensor)))
    metrics_str = (
        f"Bits Per Pixel (BPP): {metrics_dict['BPP']:.4f}\n\n"
        f"--- RGB Metrics ---\n"
        f"PSNR (RGB): {metrics_dict['PSNR_RGB']:.2f} dB\n"
        f"SSIM (RGB): {metrics_dict['SSIM_RGB']:.4f}\n\n"
        f"--- Luma (Y) Metrics ---\n"
        f"PSNR (Y): {metrics_dict['PSNR_Y']:.2f} dB\n"
        f"SSIM (Y): {metrics_dict['SSIM_Y']:.4f}"
    )
    return comparison_image, metrics_str
|
|
|
|
|
# Page heading and introductory Markdown shown above the controls.
title = "Light Field Image Compression with DUALF_D VAE"
description = """
Upload a macropixel image (e.g., a 3x3 view light field image taken with Lytro Illum 2.0) to compress and decompress it using a VAE-based neural network.

* You can select different pre-trained model checkpoints and adjust the Quantization Parameter (QP) to see its effect on quality and bitrate.
* A lower QP generally results in lower quality and a lower storage requirement, while a higher QP means better quality but requires more storage for image.
* Please refer to our [GitHub Page](https://takhtardeshirsoheib.github.io/DUALF_D/index.html) for more details (it will be public after acceptance of our paper)
"""

with gr.Blocks() as demo:
    heading_html = f"<h1 style='text-align: center;'>{title}</h1>"
    gr.Markdown(heading_html)
    gr.Markdown(description)

    with gr.Row():
        # Left column: inputs and the trigger button.
        with gr.Column(scale=1):
            input_image = gr.Image(type="pil", label="Upload Macropixel Image")
            model_selector = gr.Dropdown(
                choices=MODEL_LIST,
                value=MODEL_LIST[3],
                label="Selected Model",
            )
            qp_selector = gr.Dropdown(
                choices=QP_LIST,
                value=1.0,
                label="Selected Quantization Parameter (QP)",
            )
            submit_button = gr.Button("Compress and Analyze")

        # Right column: side-by-side image and the metrics report.
        with gr.Column(scale=2):
            output_comparison = gr.Image(label="Original vs. Compressed")
            output_metrics = gr.Textbox(label="Performance Metrics", lines=10, max_lines=14, interactive=False)

    submit_button.click(
        fn=compress_and_display,
        inputs=[input_image, model_selector, qp_selector],
        outputs=[output_comparison, output_metrics],
    )

    with gr.Row():
        # Each example is (sample id, index into MODEL_LIST, QP).
        example_rows = [
            [f"./samples/macropixel_{sample_id}.png", MODEL_LIST[model_index], qp]
            for sample_id, model_index, qp in (
                ("059", 3, 0.5),
                ("033", 2, 0.5),
                ("028", 3, 2.0),
                ("026", 3, 2.5),
                ("019", 3, 2.6),
                ("203", 3, 2.8),
                ("923", 3, 3.0),
            )
        ]
        gr.Examples(
            examples=example_rows,
            inputs=[input_image, model_selector, qp_selector],
        )

# Start the web UI only when the file is executed as a script.
if __name__ == "__main__":
    demo.launch()