"""
app.py — POWERGRID Document Auditor (single-file HuggingFace Spaces build)
=============================================================================
Single-file Gradio app for AI-powered engineering drawing comparison.
Designed for POWERGRID (765/400/132kV AIS/GIS vendor drawing audits).
Pipeline:
Stage 1 — Global Alignment : Phase Correlation + ORB/RANSAC homography
Stage 2 — Region Extraction : Content-aware morphology (no pretrained detector)
Stage 3 — Semantic Matching : ResNet50 embeddings + cosine similarity (position-agnostic)
Stage 4 — Siamese Comparison : ResNet50 patch embeddings + GradCAM heatmaps
Run locally:
python app.py
"""
# ══════════════════════════════════════════════════════════════════════
# IMPORTS
# ══════════════════════════════════════════════════════════════════════
import base64
import io
import logging
import os
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
import cv2
import fitz # PyMuPDF
import gradio as gr
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from PIL import Image
from scipy.optimize import linear_sum_assignment
from skimage.metrics import structural_similarity as ssim
from torchvision import models, transforms
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
# ── Logo: embed as base64 so it works on HuggingFace Spaces (no static folder) ──
def _load_logo_b64(filename: str = "logo_0.png") -> str:
    """Return a ``data:`` URI embedding *filename*, or ``""`` if not found.

    The logo is inlined as base64 so the app stays single-file (HuggingFace
    Spaces build — no static asset folder is available).

    Parameters
    ----------
    filename : str
        Image file name, resolved relative to this module's directory.

    Returns
    -------
    str
        ``data:<mime>;base64,<payload>`` string, or the empty string when
        the file does not exist.
    """
    logo_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), filename)
    if not os.path.exists(logo_path):
        return ""
    with open(logo_path, "rb") as f:
        payload = base64.b64encode(f.read()).decode("utf-8")
    ext = filename.rsplit(".", 1)[-1].lower()
    # BUG FIX: the previous f"image/{ext}" fallback produced invalid MIME
    # types for common extensions ("image/jpg" instead of "image/jpeg",
    # "image/svg" instead of "image/svg+xml"); browsers may refuse to render
    # such data URIs. Map the well-known extensions explicitly.
    mime = {
        "png": "image/png",
        "jpg": "image/jpeg",
        "jpeg": "image/jpeg",
        "gif": "image/gif",
        "webp": "image/webp",
        "svg": "image/svg+xml",
    }.get(ext, f"image/{ext}")
    return f"data:{mime};base64,{payload}"
_LOGO_URI = _load_logo_b64("logo_0.png")
# ══════════════════════════════════════════════════════════════════════
# DATA STRUCTURES
# ══════════════════════════════════════════════════════════════════════
@dataclass
class Region:
    """A detected layout region (axis-aligned bounding box)."""
    x: int
    y: int
    w: int
    h: int
    label: str = "text_block"  # one of: text_block | figure | table | margin
    confidence: float = 1.0

    @property
    def bbox(self) -> Tuple[int, int, int, int]:
        """Corner form (x0, y0, x1, y1), exclusive lower-right."""
        return (self.x, self.y, self.x + self.w, self.y + self.h)

    @property
    def area(self) -> int:
        """Bounding-box area in pixels."""
        return self.w * self.h

    @property
    def center(self) -> Tuple[float, float]:
        """Box centroid as (cx, cy) floats."""
        return (self.x + self.w / 2.0, self.y + self.h / 2.0)

    def iou(self, other: "Region") -> float:
        """Intersection-over-union with another region, in [0, 1]."""
        ax0, ay0, ax1, ay1 = self.bbox
        bx0, by0, bx1, by1 = other.bbox
        iw = min(ax1, bx1) - max(ax0, bx0)
        ih = min(ay1, by1) - max(ay0, by0)
        if iw <= 0 or ih <= 0:
            return 0.0
        inter = iw * ih
        union = self.area + other.area - inter
        return inter / union if union > 0 else 0.0
@dataclass
class MatchedPair:
    """A matched region pair between old and new documents.

    Stage 3 (the matcher) fills the matching fields; Stage 4
    (SiamesePatchComparator.compare_pair) fills the change metrics and
    the optional Grad-CAM heatmap.
    """
    region_old: Region            # region in the (aligned) OLD page
    region_new: Region            # corresponding region in the NEW page
    match_score: float            # cosine similarity of the two patch embeddings
    position_cost: float          # spatial cost (0.0 for the position-agnostic matcher)
    appearance_cost: float        # max(0, 1 - match_score)
    pixel_diff: float = 0.0       # fraction of pixels whose gray-level diff exceeds threshold
    ssim_score: float = 1.0       # structural similarity, 1.0 = visually identical
    semantic_diff: float = 0.0    # embedding L2 distance scaled into [0, 1]
    total_change: float = 0.0     # weighted blend of the three metrics, clamped to [0, 1]
    heatmap: Optional[np.ndarray] = None  # (region_new.h, region_new.w) Grad-CAM map in [0, 1]
@dataclass
class ComparisonResult:
    """Full comparison result for one document page."""
    matched_pairs: List[MatchedPair]
    unmatched_old: List[Region]
    unmatched_new: List[Region]
    global_transform: Optional[np.ndarray]
    total_change_pct: float
    heatmap: np.ndarray
    img_old_aligned: Optional[np.ndarray] = None  # aligned OLD, same coord-space as NEW

    def summary(self) -> str:
        """Render a short plain-text report of this page's comparison."""
        alignment = "Applied" if self.global_transform is not None else "Skipped"
        report = [
            f" Global Alignment : {alignment}",
            f" Matched Pairs : {len(self.matched_pairs)}",
            f" Deleted Regions : {len(self.unmatched_old)}",
            f" Added Regions : {len(self.unmatched_new)}",
            f" Total Change : {self.total_change_pct:.1f}%",
        ]
        # Only pairs above the 5% change threshold count as "modified".
        modified = [p.total_change for p in self.matched_pairs if p.total_change > 0.05]
        if modified:
            report.append(f" Avg Change (modified regions): {np.mean(modified):.2f}")
        return "\n".join(report)
# ══════════════════════════════════════════════════════════════════════
# STAGE 1 — GLOBAL ALIGNER
# ══════════════════════════════════════════════════════════════════════
class GlobalAligner:
    """Stage 1 — global OLD→NEW page alignment.

    Estimates a partial-affine transform (rotation + uniform scale +
    translation) from ORB feature matches refined by RANSAC, falling back
    to a translation-only matrix derived from FFT phase correlation when
    feature matching fails. The OLD page is then warped into the NEW
    page's coordinate frame.
    """

    def __init__(self, orb_features: int = 2000, ransac_threshold: float = 5.0):
        # orb_features: cap on ORB keypoints detected per image.
        # ransac_threshold: RANSAC reprojection threshold (pixels) for the affine fit.
        self.orb_features = orb_features
        self.ransac_threshold = ransac_threshold

    def _phase_correlation_shift(self, gray1: np.ndarray, gray2: np.ndarray) -> Tuple[float, float]:
        """Coarse (dx, dy) translation estimate via FFT phase correlation.

        The peak of the inverse-FFT of the unit-magnitude cross-power
        spectrum marks the circular shift between the two images; peak
        coordinates past the midpoint wrap around to negative shifts.
        NOTE(review): the final sign negation is paired with how `align`
        builds its fallback translation matrix — verify both together if
        either convention is changed.
        """
        f1 = np.fft.fft2(gray1.astype(np.float32))
        f2 = np.fft.fft2(gray2.astype(np.float32))
        denom = np.abs(f1 * np.conj(f2)) + 1e-10  # guard against divide-by-zero
        cross = (f1 * np.conj(f2)) / denom
        corr = np.fft.ifft2(cross).real
        y_shift, x_shift = np.unravel_index(np.argmax(corr), corr.shape)
        h, w = gray1.shape
        # Wrap-around: a peak in the upper half encodes a negative shift.
        if y_shift > h // 2:
            y_shift -= h
        if x_shift > w // 2:
            x_shift -= w
        return float(-x_shift), float(-y_shift)

    def _orb_affine(self, gray_old: np.ndarray, gray_new: np.ndarray) -> Optional[np.ndarray]:
        """Fit a 2×3 partial-affine OLD→NEW matrix from ORB matches.

        Returns None when either image yields too few keypoints or the
        cross-checked match count is below 10 — the caller then falls
        back to the phase-correlation translation.
        """
        orb = cv2.ORB_create(nfeatures=self.orb_features)
        kp1, des1 = orb.detectAndCompute(gray_old, None)
        kp2, des2 = orb.detectAndCompute(gray_new, None)
        if des1 is None or des2 is None or len(kp1) < 10 or len(kp2) < 10:
            return None
        bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
        matches = sorted(bf.match(des1, des2), key=lambda m: m.distance)
        if len(matches) < 10:
            return None
        top_k = min(200, len(matches))  # keep only the best (lowest-distance) matches
        # src = OLD keypoints, dst = NEW keypoints
        # → M maps OLD→NEW (forward transform), which is what warpAffine expects:
        #   warpAffine(img_old, M, size) correctly places OLD pixels at their NEW positions.
        # BUG that was here: src/dst were swapped (kp2/NEW as src, kp1/OLD as dst),
        # giving M that mapped NEW→OLD. warpAffine then doubled the displacement
        # instead of correcting it, causing the full-image red/cyan fringe seen in
        # the Alignment Check view.
        src_pts = np.float32([kp1[m.queryIdx].pt for m in matches[:top_k]]).reshape(-1, 1, 2)
        dst_pts = np.float32([kp2[m.trainIdx].pt for m in matches[:top_k]]).reshape(-1, 1, 2)
        M, mask = cv2.estimateAffinePartial2D(
            src_pts, dst_pts, method=cv2.RANSAC,
            ransacReprojThreshold=self.ransac_threshold,
        )
        return M

    def align(self, img_old: np.ndarray, img_new: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Warp OLD into NEW's frame; return (aligned_old_rgb, 2×3 matrix M)."""
        g_old = cv2.cvtColor(img_old, cv2.COLOR_RGB2GRAY)
        g_new = cv2.cvtColor(img_new, cv2.COLOR_RGB2GRAY)
        dx, dy = self._phase_correlation_shift(g_old, g_new)
        M = self._orb_affine(g_old, g_new)
        if M is None:
            # Fallback: pure translation from phase correlation.
            M = np.array([[1.0, 0.0, dx], [0.0, 1.0, dy]], dtype=np.float32)
        h, w = img_old.shape[:2]
        aligned = cv2.warpAffine(
            img_old, M, (w, h),
            flags=cv2.INTER_LINEAR,
            borderMode=cv2.BORDER_CONSTANT,
            borderValue=(255, 255, 255),  # white fill matches the page background
        )
        return aligned, M
# ══════════════════════════════════════════════════════════════════════
# STAGE 2 — LAYOUT REGION EXTRACTOR
# ══════════════════════════════════════════════════════════════════════
class LayoutRegionExtractor:
    """Stage 2 — content-aware region proposals via classic morphology.

    Binarises the page (Otsu), dilates ink into connected blobs, takes blob
    bounding boxes as candidate regions, classifies each by ink density and
    aspect ratio, then iteratively merges heavily-overlapping boxes. No
    pretrained detector is used, so this works on any drawing style.
    """

    def __init__(
        self,
        min_area_ratio: float = 0.0003,
        max_area_ratio: float = 0.92,
        dilation_kernel: Tuple[int, int] = (8, 2),
        dilation_iters: int = 2,
        merge_iou_threshold: float = 0.40,
    ):
        # min/max_area_ratio: keep only blobs between these fractions of the page area.
        # dilation_kernel: wide-short rectangle fuses characters into text lines.
        # merge_iou_threshold: IoU above which two candidate boxes are unioned.
        self.min_area_ratio = min_area_ratio
        self.max_area_ratio = max_area_ratio
        self.dilation_kernel = dilation_kernel
        self.dilation_iters = dilation_iters
        self.merge_iou_threshold = merge_iou_threshold

    def _binarise(self, gray: np.ndarray) -> np.ndarray:
        """Otsu-threshold the blurred page; ink pixels become white (255)."""
        blurred = cv2.GaussianBlur(gray, (5, 5), 0)
        _, binary = cv2.threshold(blurred, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
        return binary

    def _dilate(self, binary: np.ndarray) -> np.ndarray:
        """Grow ink blobs so nearby glyphs/lines fuse into region-sized masses."""
        k = cv2.getStructuringElement(cv2.MORPH_RECT, self.dilation_kernel)
        dilated = cv2.dilate(binary, k, iterations=self.dilation_iters)
        # Horizontal pass bridges gaps between words on the same line.
        k_line = cv2.getStructuringElement(cv2.MORPH_RECT, (15, 1))
        dilated = cv2.dilate(dilated, k_line, iterations=1)
        # Vertical close stitches adjacent text lines into one block.
        k_vert = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 8))
        return cv2.morphologyEx(dilated, cv2.MORPH_CLOSE, k_vert)

    def _classify(self, patch_gray: np.ndarray, w: int, h: int) -> str:
        """Heuristic region label from aspect ratio and ink density.

        NOTE(review): the density/aspect cut-offs below are empirical
        tuning values — confirm against representative drawings before
        changing them.
        """
        aspect = w / max(h, 1)
        _, binary = cv2.threshold(patch_gray, 127, 255, cv2.THRESH_BINARY_INV)
        density = np.sum(binary > 0) / max(w * h, 1)
        if density < 0.02:
            # Nearly-empty patch: call it margin only when it also has
            # almost no distinct ink components.
            contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            if len(contours) < 3:
                return "margin"
        if aspect > 4.0 and density > 0.06:
            return "text_block"
        if 0.4 < aspect < 2.8 and density < 0.25:
            return "figure"
        if density > 0.18 and aspect > 1.0:
            return "table"
        return "text_block"

    def _merge_overlapping(self, regions: List[Region]) -> List[Region]:
        """Union boxes whose IoU exceeds the threshold; repeat to a fixed point.

        Each outer pass greedily grows one box at a time against every
        other unused box; the loop re-runs until a full pass makes no
        merge. The result is order-dependent but convergent. Merged boxes
        lose their labels (Region defaults apply).
        """
        changed = True
        while changed:
            changed = False
            used = [False] * len(regions)
            merged: List[Region] = []
            for i, r1 in enumerate(regions):
                if used[i]:
                    continue
                x0, y0 = r1.x, r1.y
                x1, y1 = r1.x + r1.w, r1.y + r1.h
                for j, r2 in enumerate(regions):
                    if i == j or used[j]:
                        continue
                    # Test IoU against the box as grown so far this pass.
                    expanded = Region(x0, y0, x1 - x0, y1 - y0)
                    if expanded.iou(r2) > self.merge_iou_threshold:
                        x0 = min(x0, r2.x)
                        y0 = min(y0, r2.y)
                        x1 = max(x1, r2.x + r2.w)
                        y1 = max(y1, r2.y + r2.h)
                        used[j] = True
                        changed = True
                merged.append(Region(x0, y0, x1 - x0, y1 - y0))
                used[i] = True
            regions = merged
        return regions

    def extract(self, img_rgb: np.ndarray) -> List[Region]:
        """Detect layout regions on an RGB page image.

        Returns regions sorted into rough reading order (50-px row bands,
        then left-to-right); "margin" candidates are discarded.
        """
        h, w = img_rgb.shape[:2]
        page_area = h * w
        gray = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY)
        binary = self._binarise(gray)
        dilated = self._dilate(binary)
        contours, _ = cv2.findContours(dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        candidates: List[Region] = []
        for cnt in contours:
            rx, ry, rw, rh = cv2.boundingRect(cnt)
            area = rw * rh
            if area < page_area * self.min_area_ratio:
                continue
            if area > page_area * self.max_area_ratio:
                continue
            patch = gray[ry: ry + rh, rx: rx + rw]
            label = self._classify(patch, rw, rh)
            if label == "margin":
                continue
            candidates.append(Region(rx, ry, rw, rh, label=label))
        regions = self._merge_overlapping(candidates)
        regions.sort(key=lambda r: (r.y // 50, r.x))
        logger.info("LayoutExtractor: %d regions detected", len(regions))
        return regions
# ══════════════════════════════════════════════════════════════════════
# STAGE 3 — HUNGARIAN REGION MATCHER
# ══════════════════════════════════════════════════════════════════════
# ══════════════════════════════════════════════════════════════════════
# STAGE 3 — SEMANTIC RETRIEVAL MATCHER (position-agnostic)
# ══════════════════════════════════════════════════════════════════════
class SemanticRetrievalMatcher:
    """
    Position-agnostic region matcher (replacement for HungarianRegionMatcher).

    Every OLD and NEW region patch is encoded with the shared ResNet50
    backbone into a 128-d L2-normalised vector. An (N_new × N_old) cosine
    similarity matrix is then solved with the Hungarian algorithm
    (scipy.linear_sum_assignment on the negated matrix, i.e. maximising
    similarity), and a pair is accepted only when its similarity reaches
    ``min_similarity``.

    Because no positional term enters the cost, a region that merely moved
    on the page still scores ≈ 1.0 against its counterpart and is matched
    correctly — robust to layout shifts.
    """

    def __init__(
        self,
        encoder: "_SiameseEncoder",
        device: torch.device,
        min_similarity: float = 0.50,
        thumbnail_size: Tuple[int, int] = (224, 224),
    ):
        self.encoder = encoder
        self.device = device
        self.min_similarity = min_similarity
        # Standard ImageNet preprocessing — matches the encoder backbone.
        self._transform = transforms.Compose([
            transforms.Resize(thumbnail_size),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225]),
        ])

    # ------------------------------------------------------------------
    def _patch(self, region: Region, img: np.ndarray) -> np.ndarray:
        """Crop `region` from `img`; fall back to a 64×64 white tile if empty."""
        crop = img[region.y: region.y + region.h, region.x: region.x + region.w]
        return crop if crop.size else np.full((64, 64, 3), 255, dtype=np.uint8)

    def _embed(self, patches: List[np.ndarray]) -> torch.Tensor:
        """Batch-encode patches into an (N, 128) L2-normalised tensor (no grad)."""
        batch = torch.stack(
            [self._transform(Image.fromarray(p)) for p in patches]
        ).to(self.device)  # (N, 3, 224, 224)
        with torch.no_grad():
            vectors, _ = self.encoder.encode(batch)  # already L2-normed
        return vectors

    # ------------------------------------------------------------------
    def match(
        self,
        regions_old: List[Region],
        regions_new: List[Region],
        img_old: np.ndarray,
        img_new: np.ndarray,
    ) -> Tuple[List[MatchedPair], List[Region], List[Region]]:
        """Pair regions by embedding similarity; return (pairs, deleted, added)."""
        if not regions_old or not regions_new:
            return [], list(regions_old), list(regions_new)
        # 1. Embed both patch sets.
        emb_old = self._embed([self._patch(r, img_old) for r in regions_old])
        emb_new = self._embed([self._patch(r, img_new) for r in regions_new])
        # 2. Cosine similarity (rows = NEW, cols = OLD); normed → dot == cosine.
        sim_mat = torch.mm(emb_new, emb_old.T).cpu().numpy()
        # 3. Hungarian assignment maximising similarity.
        new_idx, old_idx = linear_sum_assignment(-sim_mat)
        pairs: List[MatchedPair] = []
        taken_old: set = set()
        taken_new: set = set()
        for ni, oi in zip(new_idx, old_idx):
            score = float(sim_mat[ni, oi])
            if score < self.min_similarity:
                continue  # too dissimilar → leave both sides unmatched
            pairs.append(MatchedPair(
                region_old=regions_old[oi],
                region_new=regions_new[ni],
                match_score=score,
                position_cost=0.0,  # matcher is position-agnostic by design
                appearance_cost=max(0.0, 1.0 - score),
            ))
            taken_old.add(oi)
            taken_new.add(ni)
        deleted = [r for i, r in enumerate(regions_old) if i not in taken_old]
        added = [r for j, r in enumerate(regions_new) if j not in taken_new]
        logger.info(
            "SemanticRetrieval: %d matched | %d deleted | %d added (min_sim=%.2f)",
            len(pairs), len(deleted), len(added),
            self.min_similarity,
        )
        return pairs, deleted, added
# ══════════════════════════════════════════════════════════════════════
# STAGE 4 — SIAMESE PATCH COMPARATOR
# ══════════════════════════════════════════════════════════════════════
class _SiameseEncoder(nn.Module):
    """Shared ResNet50 backbone → 128-d L2-normalised patch embeddings.

    NOTE: the submodule layout (`features`, `pool`, `embed`) is relied on
    externally — SiamesePatchComparator hooks ``model.features[-1]`` for
    Grad-CAM — so do not rename or restructure these attributes.
    """

    def __init__(self):
        super().__init__()
        # ImageNet-pretrained ResNet50; drop the final avgpool + fc layers,
        # keeping only the convolutional stack.
        resnet = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
        self.features = nn.Sequential(*list(resnet.children())[:-2])
        self.pool = resnet.avgpool
        # Projection head 2048 → 512 → 128. It is randomly initialised (no
        # fine-tuned weights loaded); similarity remains meaningful because
        # both inputs pass through the same weights.
        self.embed = nn.Sequential(
            nn.Linear(2048, 512), nn.ReLU(),
            nn.Linear(512, 128),
        )

    def encode(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return (L2-normalised 128-d embedding, last conv feature map)."""
        feat_map = self.features(x)
        pooled = torch.flatten(self.pool(feat_map), 1)
        embed = F.normalize(self.embed(pooled), p=2, dim=1)
        return embed, feat_map

    def forward(self, x1: torch.Tensor, x2: torch.Tensor):
        """Encode both inputs; returns (e1, e2, feat_map1, feat_map2)."""
        e1, f1 = self.encode(x1)
        e2, f2 = self.encode(x2)
        return e1, e2, f1, f2
class SiamesePatchComparator:
    """Stage 4 — fine-grained comparison of matched region pairs.

    Blends three signals per pair — raw pixel diff, SSIM, and L2 distance
    between ResNet50 embeddings — into a ``total_change`` score, and
    produces a Grad-CAM map localising WHERE the embedding difference
    originates inside the patch.
    """

    def __init__(
        self,
        device: Optional[torch.device] = None,
        encoder: Optional[_SiameseEncoder] = None,  # ← shared encoder
    ):
        """Resolve a device (CUDA > MPS > CPU) and create or reuse the encoder."""
        if device is None:
            if torch.cuda.is_available():
                device = torch.device("cuda")
            elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
                device = torch.device("mps")
            else:
                device = torch.device("cpu")
        self.device = device
        # Reuse the encoder from SemanticRetrievalMatcher if provided —
        # avoids loading ResNet50 weights a second time.
        if encoder is not None:
            self.model = encoder
            logger.info("SiamesePatchComparator: reusing shared encoder on %s", device)
        else:
            self.model = _SiameseEncoder().to(device).eval()
            logger.info("SiamesePatchComparator: created new encoder on %s", device)
        # Standard ImageNet preprocessing — must match the backbone's stats.
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

    def _to_tensor(self, patch_rgb: np.ndarray) -> torch.Tensor:
        """Preprocess an RGB uint8 patch into a (1, 3, 224, 224) device tensor."""
        return self.transform(Image.fromarray(patch_rgb)).unsqueeze(0).to(self.device)

    def _grad_cam(
        self,
        patch_old: np.ndarray,
        patch_new: np.ndarray,
        target_hw: Tuple[int, int],
    ) -> np.ndarray:
        """
        Grad-CAM spatial change map — WHERE inside the patch the embedding differs.
        Method
        ------
        1. Forward patch_old (no grad) → embedding e_old.
        2. Forward patch_new (with grad, hooks on last conv block) → embedding e_new
           + feature map F captured by forward hook.
        3. Scalar loss = pairwise_distance(e_old.detach(), e_new).
        4. loss.backward() → ∂loss/∂F captured by backward hook.
        5. Grad-CAM = ReLU( mean_c(∂loss/∂F) · F ) → (7×7) → upsample to patch size.
        Pixels with HIGH activation changed the embedding the most → the actual edits.
        Returns
        -------
        np.ndarray shape (target_hw[0], target_hw[1]), float32, values in [0, 1].
        """
        t_old = self._to_tensor(patch_old)
        t_new = self._to_tensor(patch_new)
        feat_store: Dict[str, torch.Tensor] = {}
        grad_store: Dict[str, torch.Tensor] = {}
        # Hook on the last convolutional block of the shared ResNet50
        last_block = self.model.features[-1]
        def _fwd(module, inp, out):
            feat_store["f"] = out  # (1, 2048, 7, 7)
        def _bwd(module, grad_in, grad_out):
            grad_store["g"] = grad_out[0]  # (1, 2048, 7, 7)
        h_fwd = last_block.register_forward_hook(_fwd)
        h_bwd = last_block.register_full_backward_hook(_bwd)
        try:
            # e_old — no gradient needed, just a reference point
            with torch.no_grad():
                e_old, _ = self.model.encode(t_old)
            # e_new — gradient flows through this path only
            with torch.enable_grad():
                self.model.zero_grad()
                e_new, _ = self.model.encode(t_new)
                dist = F.pairwise_distance(e_old.detach(), e_new)
                dist.backward()
        finally:
            # Always detach hooks, even on failure — they would otherwise
            # leak into every later forward pass through the shared encoder.
            h_fwd.remove()
            h_bwd.remove()
        if "f" not in feat_store or "g" not in grad_store:
            return np.zeros(target_hw, dtype=np.float32)
        # Grad-CAM: global-average-pool the gradients, weight feature maps
        weights = grad_store["g"].mean(dim=[2, 3], keepdim=True)  # (1,2048,1,1)
        cam = (weights * feat_store["f"]).sum(dim=1).squeeze()  # (7, 7)
        cam = F.relu(cam)
        cam_max = cam.max()
        if cam_max < 1e-8:
            # Flat CAM → no localisable change signal.
            return np.zeros(target_hw, dtype=np.float32)
        cam = (cam / cam_max).detach().cpu().numpy()  # (7, 7) in [0, 1]
        # Upsample to original patch resolution
        h, w = target_hw
        cam_up = cv2.resize(cam, (w, h), interpolation=cv2.INTER_LINEAR)
        return np.clip(cam_up, 0.0, 1.0).astype(np.float32)

    def compare(self, patch_old: np.ndarray, patch_new: np.ndarray) -> Dict[str, object]:
        """Compute change metrics for two equal-sized RGB patches.

        Returns a dict with keys: ``pixel_diff`` (fraction of changed
        pixels), ``ssim_score``, ``semantic_diff`` (scaled embedding L2),
        ``total_change`` (weighted blend, clamped to 1.0), and ``grad_cam``
        ((h, w) float32 map in [0, 1]).
        """
        g_old = cv2.cvtColor(patch_old, cv2.COLOR_RGB2GRAY).astype(np.float32)
        g_new = cv2.cvtColor(patch_new, cv2.COLOR_RGB2GRAY).astype(np.float32)
        diff_map = np.abs(g_old - g_new)
        # Threshold of 8 (was 15) — CAD drawings have fine lines and small
        # text; a dimension change may shift only a handful of pixels slightly.
        changed_pixels = np.sum(diff_map > 8.0)
        pixel_diff = float(changed_pixels) / max(g_old.size, 1)
        ssim_val = float(ssim(g_old, g_new, data_range=255.0))
        ssim_cost = max(0.0, 1.0 - ssim_val)
        with torch.no_grad():
            t1 = self._to_tensor(patch_old)
            t2 = self._to_tensor(patch_new)
            e1, e2, _, _ = self.model(t1, t2)
            l2_dist = float(F.pairwise_distance(e1, e2).item())
        # Embeddings are unit-normed, so their L2 distance is bounded; /10
        # maps it into a conservative [0, 1] range.
        semantic_diff = min(l2_dist / 10.0, 1.0)
        total = 0.30 * pixel_diff + 0.40 * ssim_cost + 0.30 * semantic_diff
        # Grad-CAM: spatial map showing WHERE inside this patch the change is
        h, w = patch_new.shape[:2]
        grad_cam_map = self._grad_cam(patch_old, patch_new, (h, w))
        return {
            "pixel_diff": pixel_diff,
            "ssim_score": ssim_val,
            "semantic_diff":semantic_diff,
            "total_change": min(float(total), 1.0),
            "grad_cam": grad_cam_map,  # (h, w) float32 [0,1] ← new
        }

    def compare_pair(self, pair: MatchedPair, img_old: np.ndarray, img_new: np.ndarray) -> MatchedPair:
        """Fill a MatchedPair's change metrics in place and return it.

        Both patches are white-padded to a common size before comparison so
        the metrics operate on equal-shaped arrays; the Grad-CAM map is then
        resized back to the (unpadded) NEW region's dimensions.
        """
        ro, rn = pair.region_old, pair.region_new
        patch_old = img_old[ro.y: ro.y + ro.h, ro.x: ro.x + ro.w]
        patch_new = img_new[rn.y: rn.y + rn.h, rn.x: rn.x + rn.w]
        if patch_old.size == 0 or patch_new.size == 0:
            # Degenerate crop — leave the pair's default metrics untouched.
            return pair
        target_h = max(patch_old.shape[0], patch_new.shape[0])
        target_w = max(patch_old.shape[1], patch_new.shape[1])
        def _pad_white(patch: np.ndarray, th: int, tw: int) -> np.ndarray:
            # Anchor the patch at the top-left of a white canvas.
            canvas = np.full((th, tw, patch.shape[2]), 255, dtype=np.uint8)
            canvas[:patch.shape[0], :patch.shape[1]] = patch
            return canvas
        patch_old_p = _pad_white(patch_old, target_h, target_w)
        patch_new_p = _pad_white(patch_new, target_h, target_w)
        metrics = self.compare(patch_old_p, patch_new_p)
        pair.pixel_diff = metrics["pixel_diff"]
        pair.ssim_score = metrics["ssim_score"]
        pair.semantic_diff = metrics["semantic_diff"]
        pair.total_change = metrics["total_change"]
        # Store Grad-CAM map (sized to the new patch, not the padded version)
        raw_cam = metrics.get("grad_cam")
        if raw_cam is not None:
            rn = pair.region_new
            pair.heatmap = cv2.resize(raw_cam, (rn.w, rn.h),
                                      interpolation=cv2.INTER_LINEAR)
        return pair
# ══════════════════════════════════════════════════════════════════════
# HEATMAP GENERATOR
# ══════════════════════════════════════════════════════════════════════
class HeatmapGenerator:
    """Render per-pixel change intensity layers for the heatmap view."""

    # Palette constants kept for downstream rendering code.
    _COLOUR_CHANGED = np.array([255, 220, 0], dtype=np.float32)
    _COLOUR_MAJOR = np.array([230, 30, 30], dtype=np.float32)
    _COLOUR_ADDED = np.array([ 30, 200, 60], dtype=np.float32)
    _COLOUR_DELETED = np.array([200, 30, 200], dtype=np.float32)

    @staticmethod
    def _project_region(r: Region, M_inv: Optional[np.ndarray], w: int, h: int) -> Tuple[int, int, int, int]:
        """Map a region's corners through a 2×3 affine and clip to the page.

        Returns the axis-aligned (x0, y0, x1, y1) hull of the projected
        corners; with M_inv=None the region's own box is returned unchanged.
        """
        if M_inv is None:
            return r.x, r.y, r.x + r.w, r.y + r.h
        corners = np.array([
            [r.x, r.y ],
            [r.x + r.w, r.y ],
            [r.x, r.y + r.h],
            [r.x + r.w, r.y + r.h],
        ], dtype=np.float32)
        homog = np.hstack([corners, np.ones((4, 1), dtype=np.float32)])
        proj = homog @ M_inv.T  # same as (M_inv @ homog.T).T
        x0 = int(np.clip(proj[:, 0].min(), 0, w - 1))
        y0 = int(np.clip(proj[:, 1].min(), 0, h - 1))
        x1 = int(np.clip(proj[:, 0].max(), 0, w - 1))
        y1 = int(np.clip(proj[:, 1].max(), 0, h - 1))
        return x0, y0, x1, y1

    @staticmethod
    def generate(
        img_shape: Tuple[int, int],
        matched_pairs: List[MatchedPair],
        unmatched_old: List[Region],
        unmatched_new: List[Region],
        smooth_kernel: int = 11,
        M_inv: Optional[np.ndarray] = None,
        change_threshold: float = 0.05,
    ) -> np.ndarray:
        """Build an (h, w, 4) float32 intensity stack from matched pairs.

        Channel 0 = moderate modification (total_change ≤ 0.40), channel 1 =
        major modification. Channels 2/3 (added/deleted) are intentionally
        left empty — the Heatmap tab shows only modification intensity; the
        Match Canvas thermal view covers additions/deletions. `M_inv` and
        the unmatched lists are accepted for interface compatibility but are
        not painted here.
        """
        h, w = img_shape
        layers = np.zeros((h, w, 4), dtype=np.float32)
        for pair in matched_pairs:
            change = float(pair.total_change)
            if change <= change_threshold:
                continue
            box = pair.region_new
            channel = 0 if change <= 0.40 else 1  # yellow vs red channel
            roi = layers[box.y:box.y + box.h, box.x:box.x + box.w, channel]
            cam = pair.heatmap
            if cam is None:
                # No Grad-CAM available — flood the whole bounding box.
                np.maximum(roi, change, out=roi)
            else:
                # Grad-CAM path: paint only the pixels that actually changed,
                # scaled by total_change so brighter == more changed.
                if cam.shape != (box.h, box.w):
                    cam = cv2.resize(cam, (box.w, box.h),
                                     interpolation=cv2.INTER_LINEAR)
                np.maximum(roi, np.clip(cam * change, 0.0, 1.0), out=roi)
        if smooth_kernel > 0:
            ksize = smooth_kernel | 1  # GaussianBlur needs an odd kernel
            for channel in range(4):
                if layers[:, :, channel].max() > 0:
                    layers[:, :, channel] = cv2.GaussianBlur(
                        layers[:, :, channel], (ksize, ksize), sigmaX=3.0)
            # Gamma-lift the two modification channels for visibility.
            for channel in range(2):
                if layers[:, :, channel].max() > 0:
                    layers[:, :, channel] = np.power(layers[:, :, channel], 0.6)
        return layers
# ══════════════════════════════════════════════════════════════════════
# VISUALISER
# ══════════════════════════════════════════════════════════════════════
class Visualiser:
    """Colour palette and overlay renderers for the UI tabs."""

    # RGB colours keyed by region label / change status.
    COLOURS: Dict[str, Tuple[int, int, int]] = {
        "text_block": (30, 144, 255),
        "figure": (255, 165, 0),
        "table": (50, 205, 50),
        "unknown": (180, 180, 180),
        "deleted": (220, 50, 50),
        "added": (50, 220, 80),
        "changed": (255, 200, 0),
        "unchanged": (80, 220, 80),
    }

    @staticmethod
    def draw_alignment_check(
        img_old_aligned: np.ndarray,
        img_new: np.ndarray,
    ) -> np.ndarray:
        """Red-cyan anaglyph overlay for the Alignment Check tab.

        The aligned OLD page fills the red channel; the NEW page fills the
        green and blue channels (cyan). Pixels where both pages agree come
        out gray/white; residual misalignment shows as red (OLD-only ink)
        or cyan (NEW-only ink) fringes. A mostly gray/white overlay means
        the alignment is good.
        """
        old_gray = cv2.cvtColor(img_old_aligned, cv2.COLOR_RGB2GRAY)
        new_gray = cv2.cvtColor(img_new, cv2.COLOR_RGB2GRAY)
        # R = old, G = B = new → equal intensities render gray.
        return np.dstack([old_gray, new_gray, new_gray])
# ══════════════════════════════════════════════════════════════════════
# HELPER — unmatched region visual-change check
# ══════════════════════════════════════════════════════════════════════
# Unmatched regions whose mean-abs pixel diff (0–255 grayscale) against their
# nearest counterpart falls below this value are treated as visually identical
# and excluded from the change score.
_UNMATCHED_PIXEL_THR: float = 12.0

def _region_mean_diff(
    r: Region,
    img_a: np.ndarray,
    candidates: List[Region],
    img_b: np.ndarray,
    thumb: int = 64,
) -> float:
    """Minimum mean-abs grayscale diff between `r` and its nearest candidates.

    Region `r` is cropped from `img_a`; each candidate is cropped from
    `img_b`. Both crops are resized to a `thumb`×`thumb` grayscale
    thumbnail before differencing. Only the three candidates with the
    smallest centre-to-centre distance are checked (speed). Returns 255.0
    (maximally different) when there are no usable candidates.
    """
    if not candidates:
        return 255.0
    crop_a = img_a[r.y: r.y + r.h, r.x: r.x + r.w]
    if crop_a.size == 0:
        return 255.0
    ref = cv2.resize(cv2.cvtColor(crop_a, cv2.COLOR_RGB2GRAY), (thumb, thumb)).astype(np.float32)
    cx, cy = r.center
    nearest = sorted(
        candidates,
        key=lambda c: (c.center[0] - cx) ** 2 + (c.center[1] - cy) ** 2,
    )[:3]
    best = 255.0
    for cand in nearest:
        crop_b = img_b[cand.y: cand.y + cand.h, cand.x: cand.x + cand.w]
        if crop_b.size == 0:
            continue
        other = cv2.resize(
            cv2.cvtColor(crop_b, cv2.COLOR_RGB2GRAY), (thumb, thumb)
        ).astype(np.float32)
        best = min(best, float(np.mean(np.abs(ref - other))))
    return best
def _is_truly_changed(
    r: Region,
    candidates: List[Region],
    img_a: np.ndarray,
    img_b: np.ndarray,
) -> bool:
    """True when region `r` (from img_a) visually differs from img_b.

    Compares `r` against its nearest spatial counterparts in `candidates`
    (from img_b) and applies the `_UNMATCHED_PIXEL_THR` cut-off. This
    separates "matcher failed to pair identical regions" from "content was
    genuinely added or deleted."
    """
    diff = _region_mean_diff(r, img_a, candidates, img_b)
    return diff >= _UNMATCHED_PIXEL_THR
# ══════════════════════════════════════════════════════════════════════
# MAIN PIPELINE
# ══════════════════════════════════════════════════════════════════════
class CoarseToFinePipeline:
    """End-to-end page comparison: align → extract → match → compare.

    Orchestrates the four stages and produces a ComparisonResult carrying
    matched/added/deleted regions, a page-level change percentage, and a
    rendered change-intensity heatmap.
    """

    def __init__(
        self,
        align: bool = True,
        device: Optional[torch.device] = None,
        region_extractor: Optional[LayoutRegionExtractor] = None,
        matcher=None,  # SemanticRetrievalMatcher or HungarianRegionMatcher
        comparator: Optional[SiamesePatchComparator] = None,
        min_similarity: float = 0.50,  # used only when matcher=None (auto-build)
    ):
        # Resolve device once here so both sub-modules share it
        if device is None:
            if torch.cuda.is_available():
                device = torch.device("cuda")
            elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
                device = torch.device("mps")
            else:
                device = torch.device("cpu")
        self._device = device
        self.aligner = GlobalAligner() if align else None
        self.extractor = region_extractor or LayoutRegionExtractor()
        if matcher is not None:
            # Caller supplied a custom matcher — use it as-is
            self.matcher = matcher
            self.comparator = comparator or SiamesePatchComparator(device=device)
        else:
            # ── Default path: shared ResNet50 encoder ──────────────
            # Build the encoder once; hand the same object to both
            # SemanticRetrievalMatcher (Stage 3) and SiamesePatchComparator (Stage 4).
            # This halves model-load time and GPU/CPU RAM usage.
            shared_encoder = _SiameseEncoder().to(device).eval()
            logger.info("Pipeline: shared ResNet50 encoder on %s", device)
            self.matcher = SemanticRetrievalMatcher(
                encoder = shared_encoder,
                device = device,
                min_similarity = min_similarity,
            )
            self.comparator = comparator or SiamesePatchComparator(
                device = device,
                encoder = shared_encoder,  # ← reuse, no second load
            )

    def compare(self, img_old: np.ndarray, img_new: np.ndarray, verbose: bool = True) -> ComparisonResult:
        """Compare two RGB page images (OLD vs NEW) end-to-end.

        Runs optional global alignment of OLD into NEW's frame, region
        extraction on both pages, semantic matching, then per-pair Siamese
        comparison; per-stage wall-clock timings are logged when `verbose`.
        Returns a ComparisonResult in NEW-page coordinates.
        """
        timings: Dict[str, float] = {}
        t = time.time()
        M = None
        if self.aligner is not None:
            img_old_aligned, M = self.aligner.align(img_old, img_new)
        else:
            img_old_aligned = img_old.copy()
        timings["alignment"] = time.time() - t
        t = time.time()
        # Both pages are now in NEW-page coordinates, so regions are comparable.
        regions_old = self.extractor.extract(img_old_aligned)
        regions_new = self.extractor.extract(img_new)
        timings["extraction"] = time.time() - t
        t = time.time()
        matched, unmatched_old, unmatched_new = self.matcher.match(
            regions_old, regions_new, img_old_aligned, img_new)
        timings["matching"] = time.time() - t
        t = time.time()
        for i, pair in enumerate(matched):
            matched[i] = self.comparator.compare_pair(pair, img_old_aligned, img_new)
        timings["siamese"] = time.time() - t
        if verbose:
            logger.info("Timings → align: %.2fs | extract: %.2fs | match: %.2fs | siamese: %.2fs",
                        timings["alignment"], timings["extraction"],
                        timings["matching"], timings["siamese"])
        h, w = img_new.shape[:2]
        # After the ORB fix, M maps OLD→NEW (forward).
        # _project_region uses this matrix to map unmatched OLD region corners
        # into NEW-page coordinates for heatmap rendering — so pass M directly,
        # NOT its inverse. (Previously M mapped NEW→OLD so the inverse was
        # needed; now the roles are corrected.)
        heatmap = HeatmapGenerator.generate(
            (h, w), matched, unmatched_old, unmatched_new,
            M_inv=M, change_threshold=0.05,
        )
        # ── Change % calculation (two-part fix) ────────────────────────
        #
        # Part A — pixel-diff gate on unmatched regions
        #   Unmatched regions are NOT automatically "added/deleted".
        #   They may simply be regions the matcher failed to pair even though
        #   the content is identical. We compare each unmatched region to its
        #   nearest spatial counterpart in the opposite list; only those whose
        #   pixel diff exceeds _UNMATCHED_PIXEL_THR are counted as truly changed.
        #
        # Part B — normalise against full page area (not just detected regions)
        #   Using content_area as denominator collapses to 100% when all regions
        #   are unmatched. Using h*w gives a stable baseline independent of
        #   how many regions were detected or matched.
        truly_deleted = [
            r for r in unmatched_old
            if _is_truly_changed(r, unmatched_new, img_old_aligned, img_new)
        ]
        truly_added = [
            r for r in unmatched_new
            if _is_truly_changed(r, unmatched_old, img_new, img_old_aligned)
        ]
        page_area = max(h * w, 1)  # Part B denominator
        changed_area = sum(p.region_new.area for p in matched if p.total_change > 0.05)
        deleted_area = sum(r.area for r in truly_deleted)
        added_area = sum(r.area for r in truly_added)
        total_pct = min(100.0 * (changed_area + added_area + deleted_area) / page_area, 100.0)
        return ComparisonResult(
            matched_pairs=matched,
            unmatched_old=unmatched_old,
            unmatched_new=unmatched_new,
            global_transform=M,
            total_change_pct=total_pct,
            heatmap=heatmap,
            img_old_aligned=img_old_aligned,  # ← stored for thermal overlay
        )
# ══════════════════════════════════════════════════════════════════════
# GRADIO APP — HELPERS
# ══════════════════════════════════════════════════════════════════════
def _pick_device() -> torch.device:
    """Choose the best available torch device: CUDA > Apple MPS > CPU."""
    if torch.cuda.is_available():
        return torch.device("cuda")
    mps_ok = hasattr(torch.backends, "mps") and torch.backends.mps.is_available()
    return torch.device("mps") if mps_ok else torch.device("cpu")
def _page_to_rgb(doc: fitz.Document, idx: int, dpi: int) -> np.ndarray:
    """Rasterise page `idx` of `doc` at `dpi` into an (H, W, 3) uint8 RGB array."""
    pixmap = doc[idx].get_pixmap(dpi=dpi)
    flat = np.frombuffer(pixmap.samples, dtype=np.uint8)
    return flat.reshape(pixmap.height, pixmap.width, 3)
def _build_summary(
page_results: list,
aligned: bool,
skip_old_p1: bool = False,
skip_new_p1: bool = False,
) -> str:
total_changes = [pr["total_change_pct"] for pr in page_results]
lines = [
"╔══════════════════════════════════════════════════════════╗",
"║ POWERGRID DOCUMENT AUDIT — CHANGE REPORT ║",
"╚══════════════════════════════════════════════════════════╝",
"",
f" Total Pages Analysed : {len(page_results)}",
f" Overall Avg Change : {np.mean(total_changes):.2f}%",
"",
"──────────────────────────────────────────────────────────",
" PAGE-WISE CHANGE SUMMARY",
"──────────────────────────────────────────────────────────",
]
for pr in page_results:
pct = pr["total_change_pct"]
status = "✅ MINIMAL" if pct < 5 else "⚠️ MODERATE" if pct < 20 else "🔴 SIGNIFICANT"
lines.append(f" Page {pr['page']:>3} │ {pct:>5.1f}% │ {status}")
significant = [pr["page"] for pr in page_results if pr["total_change_pct"] > 20]
if significant:
lines += [
"",
f" ⚠️ Pages with significant changes (>20%): {significant}",
]
return "\n".join(lines)
def _build_output_pdf(page_results: list, output_path: str,
                      process_dpi: int = 400) -> str:
    """
    Assemble the per-page overlay images into a PDF at full pixel depth.

    PyMuPDF pages are sized in points (1 pt = 1/72 inch).  Each page is
    created at exactly (pixels * 72 / process_dpi) points so one image
    pixel maps onto one page unit; insert_image() then places the PNG 1:1
    on the page rect with no resampling — every pixel is preserved.

    Returns `output_path`.
    """
    report = fitz.open()
    pts_per_px = 72.0 / process_dpi  # point size of a single image pixel
    for entry in page_results:
        overlay = entry["align_check"].convert("RGB")
        width_px, height_px = overlay.size
        page = report.new_page(width=width_px * pts_per_px,
                               height=height_px * pts_per_px)
        png_buf = io.BytesIO()
        overlay.save(png_buf, format="PNG", optimize=True)  # lossless — no JPEG ringing
        page.insert_image(page.rect, stream=png_buf.getvalue())
    report.save(output_path, deflate=True, garbage=4, clean=True)
    report.close()
    return output_path
# ══════════════════════════════════════════════════════════════════════
# SPECIFIC-REGION HELPER — semantic global search in OLD document
# ══════════════════════════════════════════════════════════════════════
# ImageNet normalisation reused from SemanticRetrievalMatcher
# Preprocessing applied to every region patch before encoding: resize to
# the encoder's fixed 224×224 input, convert to a CHW float tensor in
# [0, 1], then normalise with the standard ImageNet channel statistics so
# patches match the pretrained ResNet's training distribution.
_REGION_TRANSFORM = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])
def _embed_patch(patch_rgb: np.ndarray,
                 encoder: "_SiameseEncoder",
                 device: torch.device) -> torch.Tensor:
    """Encode a single RGB numpy patch → (128,) L2-normalised embedding."""
    pil_patch = Image.fromarray(patch_rgb)
    batch = _REGION_TRANSFORM(pil_patch).unsqueeze(0).to(device)  # (1, 3, 224, 224)
    with torch.no_grad():
        embedding, _ = encoder.encode(batch)  # (1, 128)
    return embedding.squeeze(0)  # (128,)
def _find_matching_region_in_old(
    new_crop: np.ndarray,
    img_old_full: np.ndarray,
    encoder: "_SiameseEncoder",
    device: torch.device,
) -> Tuple[int, int, int, int]:
    """
    Locate where new_crop (user-selected patch from NEW page) sits inside
    img_old_full (the complete OLD page).

    Method — semantic sliding-window search
    ----------------------------------------
    1. Encode new_crop with the shared ResNet50 encoder → 128-d embedding.
    2. Slide a window across img_old_full at multiple scales (±30 % of the
       crop size). Step = 50 % of window size so adjacent windows overlap.
       The flush right/bottom positions are always included so content at
       the far page edges is fully covered (see fix note below).
    3. Encode every window patch and compute cosine similarity with the
       query embedding. Pick the window with the highest similarity.
    4. Clamp the winning box to page bounds and return it.

    Why semantic (not pixel-level):
    • ResNet50 encodes *what* is in a region (shapes, structure, symbols),
      not pixel values, so two revisions of the same table/panel/diagram
      match even when text values changed slightly.
    • The multi-scale sweep handles content enlarged or shrunk between
      revisions; the full-page sweep finds content anywhere on the page.

    Returns (x1, y1, x2, y2) in img_old_full pixel space.
    """
    crop_h, crop_w = new_crop.shape[:2]
    old_h, old_w = img_old_full.shape[:2]

    def _clamp_box(bx: int, by: int, bw: int, bh: int
                   ) -> Tuple[int, int, int, int]:
        # Keep the origin on the page and shrink width/height to fit.
        bx = max(0, min(bx, old_w - 1))
        by = max(0, min(by, old_h - 1))
        bw = max(1, min(bw, old_w - bx))
        bh = max(1, min(bh, old_h - by))
        return bx, by, bx + bw, by + bh

    # ── Step 1: encode the query (NEW crop) ──────────────────────────
    q_emb = _embed_patch(new_crop, encoder, device)  # (128,)

    # ── Step 2: build candidate windows across scales ────────────────
    # Scales relative to the crop's own size. For a 400-DPI page a crop
    # that is, say, 600 px wide is tested at 420 … 780 px widths.
    scales = (0.70, 0.85, 1.00, 1.15, 1.30)
    candidates: List[Tuple[int, int, int, int]] = []  # (x, y, w, h)
    for sc in scales:
        win_w = max(32, int(crop_w * sc))
        win_h = max(32, int(crop_h * sc))
        if win_w > old_w or win_h > old_h:
            continue
        step_x = max(1, win_w // 2)
        step_y = max(1, win_h // 2)
        # Fix: a plain range(0, dim - win + 1, step) grid can stop up to
        # step-1 px short of the right/bottom page edges, so edge content
        # might never be fully inside any window. Append the flush-edge
        # positions explicitly (the set removes duplicates when the last
        # stride already lands there).
        xs = sorted({*range(0, old_w - win_w + 1, step_x), old_w - win_w})
        ys = sorted({*range(0, old_h - win_h + 1, step_y), old_h - win_h})
        for y in ys:
            for x in xs:
                candidates.append((x, y, win_w, win_h))
    logger.info(
        "_find_matching_region_in_old: %d candidate windows across %d scales",
        len(candidates), len(scales),
    )
    if not candidates:
        # Entire crop is bigger than the old page — return full page
        logger.warning("_find_matching_region_in_old: crop >= page; returning full page box.")
        return _clamp_box(0, 0, old_w, old_h)

    # ── Step 3: batch-encode all windows, find best cosine similarity ─
    # Process in mini-batches of 64 to avoid OOM on large pages.
    BATCH = 64
    best_sim: float = -1.0
    best_box: Tuple[int, int, int, int] = candidates[0]
    for start in range(0, len(candidates), BATCH):
        batch_cands = candidates[start: start + BATCH]
        patches = [img_old_full[cy: cy + ch, cx: cx + cw]
                   for (cx, cy, cw, ch) in batch_cands]
        tensors = [_REGION_TRANSFORM(Image.fromarray(p)) for p in patches]
        batch_t = torch.stack(tensors).to(device)  # (B, 3, 224, 224)
        with torch.no_grad():
            embs, _ = encoder.encode(batch_t)  # (B, 128)
        # Cosine similarity: q_emb and embs are already L2-normalised, so
        # the dot product IS the cosine.
        sims = (embs @ q_emb).cpu().numpy()  # (B,)
        idx = int(sims.argmax())
        if sims[idx] > best_sim:
            best_sim = float(sims[idx])
            best_box = batch_cands[idx]

    bx, by, bw, bh = best_box
    x1o, y1o, x2o, y2o = _clamp_box(bx, by, bw, bh)
    logger.info(
        "_find_matching_region_in_old: best cosine=%.4f OLD box (%d,%d)–(%d,%d)",
        best_sim, x1o, y1o, x2o, y2o,
    )
    return (x1o, y1o, x2o, y2o)
# ══════════════════════════════════════════════════════════════════════
# CORE PROCESSING
# ══════════════════════════════════════════════════════════════════════
def run_comparison(
    pdf_old_file,
    pdf_new_file,
    skip_old_p1: bool,
    skip_new_p1: bool,
    enable_align: bool,
    compare_mode: str,
    page_old_input: int,
    page_new_input: int,
    page_compare_mode: str = "Full Page",
    region_coords=None,
    display_dpi: int = 72,
    progress=gr.Progress(),
):
    """
    Gradio callback: compare two PDF revisions page-by-page.

    Renders each selected page pair at the process DPI, optionally crops a
    user-drawn region (its counterpart in the OLD page is found by semantic
    search), runs CoarseToFinePipeline.compare() per pair, and returns a
    5-tuple: (page_results list, summary text, output PDF path, initial
    page number 1, gr.update for the page slider).

    Raises gr.Error when either PDF upload is missing.
    """
    dpi = 400  # process DPI — higher = more pixel depth in overlay output
    if pdf_old_file is None or pdf_new_file is None:
        raise gr.Error("Please upload both Previous Revision and New Document PDF files.")
    device = _pick_device()
    pipeline = CoarseToFinePipeline(
        align = enable_align,
        device = device,
        min_similarity = 0.50,
    )
    progress(0, desc="Opening PDF files …")
    doc_old = fitz.open(pdf_old_file.name)
    doc_new = fitz.open(pdf_new_file.name)
    # ── Build the list of (old_page_idx, new_page_idx) pairs to process ──
    if compare_mode == "Specific Pages":
        # Convert 1-based user input to 0-based index
        old_idx_req = int(page_old_input or 1) - 1
        new_idx_req = int(page_new_input or 1) - 1
        # Clamp to valid range
        old_idx_req = max(0, min(old_idx_req, len(doc_old) - 1))
        new_idx_req = max(0, min(new_idx_req, len(doc_new) - 1))
        page_pairs = [(old_idx_req, new_idx_req)]
    else:
        # Full document mode: optionally skip a cover page in either doc,
        # then pair pages positionally up to the shorter document's length.
        old_start = 1 if skip_old_p1 else 0
        new_start = 1 if skip_new_p1 else 0
        old_pages = len(doc_old) - old_start
        new_pages = len(doc_new) - new_start
        num_pages = min(old_pages, new_pages)
        if skip_old_p1:
            gr.Info("Skipping cover page of Previous Revision.")
        if skip_new_p1:
            gr.Info("Skipping cover page of New Document.")
        if old_pages != new_pages:
            gr.Warning(
                f"Page count mismatch: Previous Revision={old_pages}, New Document={new_pages}. "
                f"Processing {num_pages} pages."
            )
        page_pairs = [(pg + old_start, pg + new_start) for pg in range(num_pages)]
    num_pairs = len(page_pairs)
    page_results = []
    for i, (old_idx, new_idx) in enumerate(page_pairs):
        progress(i / num_pairs, desc=f"Processing page {i + 1} / {num_pairs} …")
        img_old = _page_to_rgb(doc_old, old_idx, dpi)
        img_new = _page_to_rgb(doc_new, new_idx, dpi)
        # ── Normalise page dimensions before any cropping ─────────────
        # Both pages must have the same native DPI dimensions so that the
        # same pixel box selects the same physical region in both docs.
        if img_old.shape != img_new.shape:
            img_old = cv2.resize(img_old, (img_new.shape[1], img_new.shape[0]))
        # ── Specific-region crop ──────────────────────────────────────
        # The user drew a box on the NEW-doc preview (at display_dpi).
        # Steps:
        # 1. Scale the drag coordinates from preview pixels → process DPI pixels.
        # 2. Crop the same pixel box from BOTH old and new pages.
        #    (Engineering drawings keep the same layout between revisions —
        #    same position = same physical area. The ORB aligner inside
        #    pipeline.compare() handles any sub-pixel drift between the two.)
        # 3. Replace img_old / img_new with the two crops → overlay is
        #    scoped to only the selected region.
        if (compare_mode == "Specific Pages"
                and page_compare_mode == "Specific Region"
                and region_coords):
            # assumes region_coords is a dict with x/y/width/height in
            # preview pixels — TODO confirm against the UI event payload
            rx = region_coords.get("x", 0)
            ry = region_coords.get("y", 0)
            rw = region_coords.get("width", img_new.shape[1])
            rh = region_coords.get("height", img_new.shape[0])
            sf = dpi / float(display_dpi or 72)  # preview px → process DPI px
            x1 = max(0, int(rx * sf))
            y1 = max(0, int(ry * sf))
            x2 = min(img_new.shape[1], int((rx + rw) * sf))
            y2 = min(img_new.shape[0], int((ry + rh) * sf))
            logger.info(
                "Specific Region: display_dpi=%d sf=%.3f "
                "preview-box (%d,%d,%d,%d) → process-px (%d,%d)–(%d,%d)",
                display_dpi, sf, rx, ry, rw, rh, x1, y1, x2, y2,
            )
            if x2 > x1 and y2 > y1:  # ignore degenerate / empty drags
                # Step 1 — crop the selected region from NEW page
                img_new_crop = img_new[y1:y2, x1:x2]
                # Step 2 — semantic global search: encode the NEW crop with
                # ResNet50, slide windows over the FULL OLD page at
                # multiple scales, pick the highest cosine-similarity
                # window as the matching region in OLD.
                ox1, oy1, ox2, oy2 = _find_matching_region_in_old(
                    new_crop = img_new_crop,
                    img_old_full = img_old,
                    encoder = pipeline.matcher.encoder,
                    device = device,
                )
                logger.info(
                    "Specific Region: NEW (%d,%d)–(%d,%d) → OLD (%d,%d)–(%d,%d)",
                    x1, y1, x2, y2, ox1, oy1, ox2, oy2,
                )
                # Step 3 — crop OLD at found location; resize to exactly match
                # NEW crop so pipeline.compare() gets equal-size inputs
                img_old_raw = img_old[oy1:oy2, ox1:ox2]
                nh, nw = img_new_crop.shape[:2]
                if img_old_raw.shape[:2] != (nh, nw):
                    img_old_crop = cv2.resize(
                        img_old_raw, (nw, nh), interpolation=cv2.INTER_LINEAR,
                    )
                else:
                    img_old_crop = img_old_raw
                # Step 4 — overlay is scoped to the selected region only
                img_old = img_old_crop
                img_new = img_new_crop
        result = pipeline.compare(img_old, img_new)
        # Fall back to the raw OLD page when the pipeline produced no
        # aligned image (e.g. alignment disabled or it failed).
        old_aligned_for_check = (
            result.img_old_aligned if result.img_old_aligned is not None
            else img_old
        )
        align_check = Visualiser.draw_alignment_check(old_aligned_for_check, img_new)
        page_results.append({
            "page": i + 1,
            "result": result,
            "align_check": Image.fromarray(align_check),
            "original": Image.fromarray(img_old),
            "revised": Image.fromarray(img_new),
            "total_change_pct": result.total_change_pct,
        })
    doc_old.close()
    doc_new.close()
    progress(0.95, desc="Generating report PDF …")
    output_pdf = _build_output_pdf(page_results, "ctf_output.pdf", process_dpi=dpi)
    summary = _build_summary(page_results, enable_align, skip_old_p1, skip_new_p1)
    progress(1.0, desc="Done!")
    return page_results, summary, output_pdf, 1, gr.update(maximum=num_pairs, value=1)
def get_page_view(page_num, pages_data, view_mode, rotation: int = 0,
                  nudge_x: int = 0, nudge_y: int = 0, nudge_scale: float = 1.0):
    """
    Return the requested rendering of one processed page, or None.

    `view_mode` selects which stored image to show; unknown modes fall
    back to the Auto-Overlay. Manual nudge (translate/scale of the red
    layer) applies only in Auto-Overlay view; a non-zero `rotation`
    (degrees) rotates whatever image is returned.
    """
    if not pages_data:
        return None
    index = min(max(int(page_num) - 1, 0), len(pages_data) - 1)  # clamp to range
    entry = pages_data[index]
    view_keys = {
        "Auto-Overlay": "align_check",
        "Previous Revision": "original",
        "New Document": "revised",
    }
    img = entry.get(view_keys.get(view_mode, "align_check"))
    if img is None:
        return None
    # Manual fine-tune: only applies to Auto-Overlay view
    scale = float(nudge_scale) if nudge_scale else 1.0
    has_nudge = nudge_x != 0 or nudge_y != 0 or abs(scale - 1.0) > 1e-4
    if has_nudge and view_mode == "Auto-Overlay":
        img = _apply_nudge_overlay(entry, nudge_x, nudge_y, scale)
    if img is not None and rotation % 360 != 0:
        img = img.rotate(rotation, expand=True)
    return img
def _apply_nudge_overlay(pr: dict, dx: int, dy: int, scale: float = 1.0) -> Image.Image:
    """
    Re-render the Auto-Overlay with the NEW (red) layer translated by
    (dx, dy) pixels and scaled by `scale` about the image centre.
    The cyan layer (aligned Previous Revision) stays fixed.
    """
    base = pr.get("align_check")
    if base is None:
        return None
    # Split the stored overlay into its two greyscale sources.
    rgb = np.array(base.convert("RGB"))
    layer_old = rgb[:, :, 0]  # cyan source (Previous Revision)
    layer_new = rgb[:, :, 1]  # red source (New Doc)
    h, w = layer_old.shape
    cx, cy = w / 2.0, h / 2.0
    s = float(scale) if scale and scale > 0 else 1.0
    # Affine = scale about the image centre composed with the (dx, dy)
    # translation, as a single 2×3 matrix.
    affine = np.float32([
        [s, 0, dx + cx * (1 - s)],
        [0, s, dy + cy * (1 - s)],
    ])
    warped_new = cv2.warpAffine(
        layer_new, affine, (w, h),
        flags=cv2.INTER_LINEAR,
        borderMode=cv2.BORDER_CONSTANT,
        borderValue=255,
    )
    # R = old (cyan base), G = B = transformed new → red fringe where they differ.
    composite = np.dstack([layer_old, warped_new, warped_new]).astype(np.uint8)
    return Image.fromarray(composite)
# ══════════════════════════════════════════════════════════════════════
# GRADIO UI
# ══════════════════════════════════════════════════════════════════════
# Load the companion stylesheet; styles.css must sit next to this file.
with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), "styles.css"),
          encoding="utf-8") as _css_f:
    _CSS = _css_f.read()
# Shared Gradio theme: blue primary hue, gray neutrals, Inter typeface.
_THEME = gr.themes.Base(
    primary_hue=gr.themes.colors.blue,
    neutral_hue=gr.themes.colors.gray,
    font=[gr.themes.GoogleFont("Inter"), "sans-serif"],
)
# Gradio 6+: theme & css are passed to launch(), not Blocks()
with gr.Blocks(title="POWERGRID Document Auditor") as demo:
# ── Header ─────────────────────────────────────────────────────────
_logo_tag = (
f''
if _LOGO_URI else
'PG'
)
gr.HTML(f"""
Power Grid Corporation of India Limited — AI-Powered Document Comparison
Tip: Run Audit resets alignment
') nudge_step = gr.Number( value=1, minimum=1, maximum=100, step=1, label="Step Size (px)", precision=0, elem_id="nudge-step", ) nudge_scale = gr.Number( value=1.0, minimum=0.10, maximum=10.0, step=0.005, label="Scale — Red Layer", precision=3, elem_id="nudge-scale", ) nudge_readout = gr.HTML( value='