# Source: HuggingFace Spaces upload by abhivsh — app.py (commit baf8731, verified)
"""
app.py β€” POWERGRID Document Auditor (single-file HuggingFace Spaces build)
=============================================================================
Single-file Gradio app for AI-powered engineering drawing comparison.
Designed for POWERGRID (765/400/132kV AIS/GIS vendor drawing audits).
Pipeline:
Stage 1 β€” Global Alignment : Phase Correlation + ORB/RANSAC homography
Stage 2 β€” Region Extraction : Content-aware morphology (no pretrained detector)
Stage 3 β€” Semantic Matching : ResNet50 embeddings + cosine similarity (position-agnostic)
Stage 4 β€” Siamese Comparison : ResNet50 patch embeddings + GradCAM heatmaps
Run locally:
python app.py
"""
# ══════════════════════════════════════════════════════════════════════
# IMPORTS
# ══════════════════════════════════════════════════════════════════════
import base64
import io
import logging
import os
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
import cv2
import fitz # PyMuPDF
import gradio as gr
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from PIL import Image
from scipy.optimize import linear_sum_assignment
from skimage.metrics import structural_similarity as ssim
from torchvision import models, transforms
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
# ── Logo: embed as base64 so it works on HuggingFace Spaces (no static folder) ──
def _load_logo_b64(filename: str = "logo_0.png") -> str:
"""Return a data-URI string for the logo, or empty string if file not found."""
logo_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), filename)
if os.path.exists(logo_path):
with open(logo_path, "rb") as f:
b64 = base64.b64encode(f.read()).decode("utf-8")
ext = filename.rsplit(".", 1)[-1].lower()
mime = "image/png" if ext == "png" else f"image/{ext}"
return f"data:{mime};base64,{b64}"
return ""
_LOGO_URI = _load_logo_b64("logo_0.png")
# ══════════════════════════════════════════════════════════════════════
# DATA STRUCTURES
# ══════════════════════════════════════════════════════════════════════
@dataclass
class Region:
"""A detected layout region (axis-aligned bounding box)."""
x: int
y: int
w: int
h: int
label: str = "text_block" # text_block | figure | table | margin
confidence: float = 1.0
@property
def bbox(self) -> Tuple[int, int, int, int]:
return (self.x, self.y, self.x + self.w, self.y + self.h)
@property
def area(self) -> int:
return self.w * self.h
@property
def center(self) -> Tuple[float, float]:
return (self.x + self.w / 2.0, self.y + self.h / 2.0)
def iou(self, other: "Region") -> float:
xa = max(self.x, other.x)
ya = max(self.y, other.y)
xb = min(self.x + self.w, other.x + other.w)
yb = min(self.y + self.h, other.y + other.h)
inter = max(0, xb - xa) * max(0, yb - ya)
union = self.area + other.area - inter
return inter / union if union > 0 else 0.0
@dataclass
class MatchedPair:
"""A matched region pair between old and new documents."""
region_old: Region
region_new: Region
match_score: float
position_cost: float
appearance_cost: float
pixel_diff: float = 0.0
ssim_score: float = 1.0
semantic_diff: float = 0.0
total_change: float = 0.0
heatmap: Optional[np.ndarray] = None
@dataclass
class ComparisonResult:
"""Full comparison result for one document page."""
matched_pairs: List[MatchedPair]
unmatched_old: List[Region]
unmatched_new: List[Region]
global_transform: Optional[np.ndarray]
total_change_pct: float
heatmap: np.ndarray
img_old_aligned: Optional[np.ndarray] = None # aligned OLD, same coord-space as NEW
def summary(self) -> str:
lines = [
f" Global Alignment : {'Applied' if self.global_transform is not None else 'Skipped'}",
f" Matched Pairs : {len(self.matched_pairs)}",
f" Deleted Regions : {len(self.unmatched_old)}",
f" Added Regions : {len(self.unmatched_new)}",
f" Total Change : {self.total_change_pct:.1f}%",
]
changed = [p for p in self.matched_pairs if p.total_change > 0.05]
if changed:
avg_chg = np.mean([p.total_change for p in changed])
lines.append(f" Avg Change (modified regions): {avg_chg:.2f}")
return "\n".join(lines)
# ══════════════════════════════════════════════════════════════════════
# STAGE 1 β€” GLOBAL ALIGNER
# ══════════════════════════════════════════════════════════════════════
class GlobalAligner:
def __init__(self, orb_features: int = 2000, ransac_threshold: float = 5.0):
self.orb_features = orb_features
self.ransac_threshold = ransac_threshold
def _phase_correlation_shift(self, gray1: np.ndarray, gray2: np.ndarray) -> Tuple[float, float]:
f1 = np.fft.fft2(gray1.astype(np.float32))
f2 = np.fft.fft2(gray2.astype(np.float32))
denom = np.abs(f1 * np.conj(f2)) + 1e-10
cross = (f1 * np.conj(f2)) / denom
corr = np.fft.ifft2(cross).real
y_shift, x_shift = np.unravel_index(np.argmax(corr), corr.shape)
h, w = gray1.shape
if y_shift > h // 2:
y_shift -= h
if x_shift > w // 2:
x_shift -= w
return float(-x_shift), float(-y_shift)
def _orb_affine(self, gray_old: np.ndarray, gray_new: np.ndarray) -> Optional[np.ndarray]:
orb = cv2.ORB_create(nfeatures=self.orb_features)
kp1, des1 = orb.detectAndCompute(gray_old, None)
kp2, des2 = orb.detectAndCompute(gray_new, None)
if des1 is None or des2 is None or len(kp1) < 10 or len(kp2) < 10:
return None
bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
matches = sorted(bf.match(des1, des2), key=lambda m: m.distance)
if len(matches) < 10:
return None
top_k = min(200, len(matches))
# src = OLD keypoints, dst = NEW keypoints
# → M maps OLD→NEW (forward transform), which is what warpAffine expects:
# warpAffine(img_old, M, size) correctly places OLD pixels at their NEW positions.
# BUG that was here: src/dst were swapped (kp2/NEW as src, kp1/OLD as dst),
# giving M that mapped NEW→OLD. warpAffine then doubled the displacement
# instead of correcting it, causing the full-image red/cyan fringe seen in
# the Alignment Check view.
src_pts = np.float32([kp1[m.queryIdx].pt for m in matches[:top_k]]).reshape(-1, 1, 2)
dst_pts = np.float32([kp2[m.trainIdx].pt for m in matches[:top_k]]).reshape(-1, 1, 2)
M, mask = cv2.estimateAffinePartial2D(
src_pts, dst_pts, method=cv2.RANSAC,
ransacReprojThreshold=self.ransac_threshold,
)
return M
def align(self, img_old: np.ndarray, img_new: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
g_old = cv2.cvtColor(img_old, cv2.COLOR_RGB2GRAY)
g_new = cv2.cvtColor(img_new, cv2.COLOR_RGB2GRAY)
dx, dy = self._phase_correlation_shift(g_old, g_new)
M = self._orb_affine(g_old, g_new)
if M is None:
M = np.array([[1.0, 0.0, dx], [0.0, 1.0, dy]], dtype=np.float32)
h, w = img_old.shape[:2]
aligned = cv2.warpAffine(
img_old, M, (w, h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=(255, 255, 255),
)
return aligned, M
# ══════════════════════════════════════════════════════════════════════
# STAGE 2 β€” LAYOUT REGION EXTRACTOR
# ══════════════════════════════════════════════════════════════════════
class LayoutRegionExtractor:
def __init__(
self,
min_area_ratio: float = 0.0003,
max_area_ratio: float = 0.92,
dilation_kernel: Tuple[int, int] = (8, 2),
dilation_iters: int = 2,
merge_iou_threshold: float = 0.40,
):
self.min_area_ratio = min_area_ratio
self.max_area_ratio = max_area_ratio
self.dilation_kernel = dilation_kernel
self.dilation_iters = dilation_iters
self.merge_iou_threshold = merge_iou_threshold
def _binarise(self, gray: np.ndarray) -> np.ndarray:
blurred = cv2.GaussianBlur(gray, (5, 5), 0)
_, binary = cv2.threshold(blurred, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
return binary
def _dilate(self, binary: np.ndarray) -> np.ndarray:
k = cv2.getStructuringElement(cv2.MORPH_RECT, self.dilation_kernel)
dilated = cv2.dilate(binary, k, iterations=self.dilation_iters)
k_line = cv2.getStructuringElement(cv2.MORPH_RECT, (15, 1))
dilated = cv2.dilate(dilated, k_line, iterations=1)
k_vert = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 8))
return cv2.morphologyEx(dilated, cv2.MORPH_CLOSE, k_vert)
def _classify(self, patch_gray: np.ndarray, w: int, h: int) -> str:
aspect = w / max(h, 1)
_, binary = cv2.threshold(patch_gray, 127, 255, cv2.THRESH_BINARY_INV)
density = np.sum(binary > 0) / max(w * h, 1)
if density < 0.02:
contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if len(contours) < 3:
return "margin"
if aspect > 4.0 and density > 0.06:
return "text_block"
if 0.4 < aspect < 2.8 and density < 0.25:
return "figure"
if density > 0.18 and aspect > 1.0:
return "table"
return "text_block"
def _merge_overlapping(self, regions: List[Region]) -> List[Region]:
changed = True
while changed:
changed = False
used = [False] * len(regions)
merged: List[Region] = []
for i, r1 in enumerate(regions):
if used[i]:
continue
x0, y0 = r1.x, r1.y
x1, y1 = r1.x + r1.w, r1.y + r1.h
for j, r2 in enumerate(regions):
if i == j or used[j]:
continue
expanded = Region(x0, y0, x1 - x0, y1 - y0)
if expanded.iou(r2) > self.merge_iou_threshold:
x0 = min(x0, r2.x)
y0 = min(y0, r2.y)
x1 = max(x1, r2.x + r2.w)
y1 = max(y1, r2.y + r2.h)
used[j] = True
changed = True
merged.append(Region(x0, y0, x1 - x0, y1 - y0))
used[i] = True
regions = merged
return regions
def extract(self, img_rgb: np.ndarray) -> List[Region]:
h, w = img_rgb.shape[:2]
page_area = h * w
gray = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY)
binary = self._binarise(gray)
dilated = self._dilate(binary)
contours, _ = cv2.findContours(dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
candidates: List[Region] = []
for cnt in contours:
rx, ry, rw, rh = cv2.boundingRect(cnt)
area = rw * rh
if area < page_area * self.min_area_ratio:
continue
if area > page_area * self.max_area_ratio:
continue
patch = gray[ry: ry + rh, rx: rx + rw]
label = self._classify(patch, rw, rh)
if label == "margin":
continue
candidates.append(Region(rx, ry, rw, rh, label=label))
regions = self._merge_overlapping(candidates)
regions.sort(key=lambda r: (r.y // 50, r.x))
logger.info("LayoutExtractor: %d regions detected", len(regions))
return regions
# ══════════════════════════════════════════════════════════════════════
# STAGE 3 β€” HUNGARIAN REGION MATCHER
# ══════════════════════════════════════════════════════════════════════
# ══════════════════════════════════════════════════════════════════════
# STAGE 3 β€” SEMANTIC RETRIEVAL MATCHER (position-agnostic)
# ══════════════════════════════════════════════════════════════════════
class SemanticRetrievalMatcher:
"""
Replaces HungarianRegionMatcher for layout-shift-robust document comparison.
Strategy
--------
For every region in the NEW page:
1. Extract the patch image from the NEW document.
2. Encode it with the shared ResNet50 backbone β†’ 128-d L2-normalised vector.
Simultaneously encode every OLD region patch.
Build an (N_new Γ— N_old) cosine-similarity matrix.
Run scipy.linear_sum_assignment on βˆ’similarity (maximise similarity).
Accept a pair only when similarity β‰₯ min_similarity.
This means a region that has *moved* (different x/y) but is otherwise
identical will still get similarity β‰ˆ 1.0 and be matched correctly.
"""
def __init__(
self,
encoder: "_SiameseEncoder",
device: torch.device,
min_similarity: float = 0.50,
thumbnail_size: Tuple[int, int] = (224, 224),
):
self.encoder = encoder
self.device = device
self.min_similarity = min_similarity
self._transform = transforms.Compose([
transforms.Resize(thumbnail_size),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),
])
# ------------------------------------------------------------------
def _patch(self, region: Region, img: np.ndarray) -> np.ndarray:
"""Crop a region from the image; returns white 64Γ—64 if empty."""
p = img[region.y: region.y + region.h, region.x: region.x + region.w]
if p.size == 0:
p = np.full((64, 64, 3), 255, dtype=np.uint8)
return p
def _embed(self, patches: List[np.ndarray]) -> torch.Tensor:
"""
Batch-encode a list of patches β†’ (N, 128) normalised embedding tensor.
Runs entirely on self.device with no gradient.
"""
tensors = [
self._transform(Image.fromarray(p)) for p in patches
]
batch = torch.stack(tensors).to(self.device) # (N, 3, 224, 224)
with torch.no_grad():
embeddings, _ = self.encoder.encode(batch) # (N, 128) β€” already L2-normed
return embeddings
# ------------------------------------------------------------------
def match(
self,
regions_old: List[Region],
regions_new: List[Region],
img_old: np.ndarray,
img_new: np.ndarray,
) -> Tuple[List[MatchedPair], List[Region], List[Region]]:
n_old, n_new = len(regions_old), len(regions_new)
if n_old == 0 or n_new == 0:
return [], list(regions_old), list(regions_new)
# ── 1. Encode both sets of patches ─────────────────────────
patches_old = [self._patch(r, img_old) for r in regions_old]
patches_new = [self._patch(r, img_new) for r in regions_new]
emb_old = self._embed(patches_old) # (n_old, 128)
emb_new = self._embed(patches_new) # (n_new, 128)
# ── 2. Cosine similarity matrix: rows=NEW, cols=OLD ─────────
# L2-normed β†’ dot product == cosine similarity
sim_mat = torch.mm(emb_new, emb_old.T).cpu().numpy() # (n_new, n_old)
# ── 3. Hungarian assignment on βˆ’similarity ──────────────────
row_ind, col_ind = linear_sum_assignment(-sim_mat) # maximise sim
matched_pairs: List[MatchedPair] = []
matched_old_idx: set = set()
matched_new_idx: set = set()
for ri, ci in zip(row_ind, col_ind):
sim = float(sim_mat[ri, ci])
if sim < self.min_similarity:
continue # below threshold β†’ treat as unmatched
matched_pairs.append(MatchedPair(
region_old = regions_old[ci],
region_new = regions_new[ri],
match_score = sim,
position_cost = 0.0, # no position penalty
appearance_cost= max(0.0, 1.0 - sim),
))
matched_old_idx.add(ci)
matched_new_idx.add(ri)
unmatched_old = [regions_old[i] for i in range(n_old) if i not in matched_old_idx]
unmatched_new = [regions_new[j] for j in range(n_new) if j not in matched_new_idx]
logger.info(
"SemanticRetrieval: %d matched | %d deleted | %d added "
"(min_sim=%.2f)",
len(matched_pairs), len(unmatched_old), len(unmatched_new),
self.min_similarity,
)
return matched_pairs, unmatched_old, unmatched_new
# ══════════════════════════════════════════════════════════════════════
# STAGE 4 β€” SIAMESE PATCH COMPARATOR
# ══════════════════════════════════════════════════════════════════════
class _SiameseEncoder(nn.Module):
def __init__(self):
super().__init__()
resnet = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
self.features = nn.Sequential(*list(resnet.children())[:-2])
self.pool = resnet.avgpool
self.embed = nn.Sequential(
nn.Linear(2048, 512), nn.ReLU(),
nn.Linear(512, 128),
)
def encode(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
feat_map = self.features(x)
pooled = torch.flatten(self.pool(feat_map), 1)
embed = F.normalize(self.embed(pooled), p=2, dim=1)
return embed, feat_map
def forward(self, x1: torch.Tensor, x2: torch.Tensor):
e1, f1 = self.encode(x1)
e2, f2 = self.encode(x2)
return e1, e2, f1, f2
class SiamesePatchComparator:
def __init__(
self,
device: Optional[torch.device] = None,
encoder: Optional[_SiameseEncoder] = None, # ← shared encoder
):
if device is None:
if torch.cuda.is_available():
device = torch.device("cuda")
elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
device = torch.device("mps")
else:
device = torch.device("cpu")
self.device = device
# Reuse the encoder from SemanticRetrievalMatcher if provided β€”
# avoids loading ResNet50 weights a second time.
if encoder is not None:
self.model = encoder
logger.info("SiamesePatchComparator: reusing shared encoder on %s", device)
else:
self.model = _SiameseEncoder().to(device).eval()
logger.info("SiamesePatchComparator: created new encoder on %s", device)
self.transform = transforms.Compose([
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
def _to_tensor(self, patch_rgb: np.ndarray) -> torch.Tensor:
return self.transform(Image.fromarray(patch_rgb)).unsqueeze(0).to(self.device)
def _grad_cam(
self,
patch_old: np.ndarray,
patch_new: np.ndarray,
target_hw: Tuple[int, int],
) -> np.ndarray:
"""
Grad-CAM spatial change map β€” WHERE inside the patch the embedding differs.
Method
------
1. Forward patch_old (no grad) β†’ embedding e_old.
2. Forward patch_new (with grad, hooks on last conv block) β†’ embedding e_new
+ feature map F captured by forward hook.
3. Scalar loss = pairwise_distance(e_old.detach(), e_new).
4. loss.backward() β†’ βˆ‚loss/βˆ‚F captured by backward hook.
5. Grad-CAM = ReLU( mean_c(βˆ‚loss/βˆ‚F) Β· F ) β†’ (7Γ—7) β†’ upsample to patch size.
Pixels with HIGH activation changed the embedding the most β†’ the actual edits.
Returns
-------
np.ndarray shape (target_hw[0], target_hw[1]), float32, values in [0, 1].
"""
t_old = self._to_tensor(patch_old)
t_new = self._to_tensor(patch_new)
feat_store: Dict[str, torch.Tensor] = {}
grad_store: Dict[str, torch.Tensor] = {}
# Hook on the last convolutional block of the shared ResNet50
last_block = self.model.features[-1]
def _fwd(module, inp, out):
feat_store["f"] = out # (1, 2048, 7, 7)
def _bwd(module, grad_in, grad_out):
grad_store["g"] = grad_out[0] # (1, 2048, 7, 7)
h_fwd = last_block.register_forward_hook(_fwd)
h_bwd = last_block.register_full_backward_hook(_bwd)
try:
# e_old β€” no gradient needed, just a reference point
with torch.no_grad():
e_old, _ = self.model.encode(t_old)
# e_new β€” gradient flows through this path only
with torch.enable_grad():
self.model.zero_grad()
e_new, _ = self.model.encode(t_new)
dist = F.pairwise_distance(e_old.detach(), e_new)
dist.backward()
finally:
h_fwd.remove()
h_bwd.remove()
if "f" not in feat_store or "g" not in grad_store:
return np.zeros(target_hw, dtype=np.float32)
# Grad-CAM: global-average-pool the gradients, weight feature maps
weights = grad_store["g"].mean(dim=[2, 3], keepdim=True) # (1,2048,1,1)
cam = (weights * feat_store["f"]).sum(dim=1).squeeze() # (7, 7)
cam = F.relu(cam)
cam_max = cam.max()
if cam_max < 1e-8:
return np.zeros(target_hw, dtype=np.float32)
cam = (cam / cam_max).detach().cpu().numpy() # (7, 7) in [0, 1]
# Upsample to original patch resolution
h, w = target_hw
cam_up = cv2.resize(cam, (w, h), interpolation=cv2.INTER_LINEAR)
return np.clip(cam_up, 0.0, 1.0).astype(np.float32)
def compare(self, patch_old: np.ndarray, patch_new: np.ndarray) -> Dict[str, object]:
g_old = cv2.cvtColor(patch_old, cv2.COLOR_RGB2GRAY).astype(np.float32)
g_new = cv2.cvtColor(patch_new, cv2.COLOR_RGB2GRAY).astype(np.float32)
diff_map = np.abs(g_old - g_new)
# Threshold of 8 (was 15) β€” CAD drawings have fine lines and small
# text; a dimension change may shift only a handful of pixels slightly.
changed_pixels = np.sum(diff_map > 8.0)
pixel_diff = float(changed_pixels) / max(g_old.size, 1)
ssim_val = float(ssim(g_old, g_new, data_range=255.0))
ssim_cost = max(0.0, 1.0 - ssim_val)
with torch.no_grad():
t1 = self._to_tensor(patch_old)
t2 = self._to_tensor(patch_new)
e1, e2, _, _ = self.model(t1, t2)
l2_dist = float(F.pairwise_distance(e1, e2).item())
semantic_diff = min(l2_dist / 10.0, 1.0)
total = 0.30 * pixel_diff + 0.40 * ssim_cost + 0.30 * semantic_diff
# Grad-CAM: spatial map showing WHERE inside this patch the change is
h, w = patch_new.shape[:2]
grad_cam_map = self._grad_cam(patch_old, patch_new, (h, w))
return {
"pixel_diff": pixel_diff,
"ssim_score": ssim_val,
"semantic_diff":semantic_diff,
"total_change": min(float(total), 1.0),
"grad_cam": grad_cam_map, # (h, w) float32 [0,1] ← new
}
def compare_pair(self, pair: MatchedPair, img_old: np.ndarray, img_new: np.ndarray) -> MatchedPair:
ro, rn = pair.region_old, pair.region_new
patch_old = img_old[ro.y: ro.y + ro.h, ro.x: ro.x + ro.w]
patch_new = img_new[rn.y: rn.y + rn.h, rn.x: rn.x + rn.w]
if patch_old.size == 0 or patch_new.size == 0:
return pair
target_h = max(patch_old.shape[0], patch_new.shape[0])
target_w = max(patch_old.shape[1], patch_new.shape[1])
def _pad_white(patch: np.ndarray, th: int, tw: int) -> np.ndarray:
canvas = np.full((th, tw, patch.shape[2]), 255, dtype=np.uint8)
canvas[:patch.shape[0], :patch.shape[1]] = patch
return canvas
patch_old_p = _pad_white(patch_old, target_h, target_w)
patch_new_p = _pad_white(patch_new, target_h, target_w)
metrics = self.compare(patch_old_p, patch_new_p)
pair.pixel_diff = metrics["pixel_diff"]
pair.ssim_score = metrics["ssim_score"]
pair.semantic_diff = metrics["semantic_diff"]
pair.total_change = metrics["total_change"]
# Store Grad-CAM map (sized to the new patch, not the padded version)
raw_cam = metrics.get("grad_cam")
if raw_cam is not None:
rn = pair.region_new
pair.heatmap = cv2.resize(raw_cam, (rn.w, rn.h),
interpolation=cv2.INTER_LINEAR)
return pair
# ══════════════════════════════════════════════════════════════════════
# HEATMAP GENERATOR
# ══════════════════════════════════════════════════════════════════════
class HeatmapGenerator:
_COLOUR_CHANGED = np.array([255, 220, 0], dtype=np.float32)
_COLOUR_MAJOR = np.array([230, 30, 30], dtype=np.float32)
_COLOUR_ADDED = np.array([ 30, 200, 60], dtype=np.float32)
_COLOUR_DELETED = np.array([200, 30, 200], dtype=np.float32)
@staticmethod
def _project_region(r: Region, M_inv: Optional[np.ndarray], w: int, h: int) -> Tuple[int, int, int, int]:
if M_inv is not None:
corners = np.array([
[r.x, r.y ],
[r.x + r.w, r.y ],
[r.x, r.y + r.h],
[r.x + r.w, r.y + r.h],
], dtype=np.float32)
ones = np.ones((4, 1), dtype=np.float32)
projected = (M_inv @ np.hstack([corners, ones]).T).T
x0 = int(np.clip(projected[:, 0].min(), 0, w - 1))
y0 = int(np.clip(projected[:, 1].min(), 0, h - 1))
x1 = int(np.clip(projected[:, 0].max(), 0, w - 1))
y1 = int(np.clip(projected[:, 1].max(), 0, h - 1))
else:
x0, y0, x1, y1 = r.x, r.y, r.x + r.w, r.y + r.h
return x0, y0, x1, y1
@staticmethod
def generate(
img_shape: Tuple[int, int],
matched_pairs: List[MatchedPair],
unmatched_old: List[Region],
unmatched_new: List[Region],
smooth_kernel: int = 11,
M_inv: Optional[np.ndarray] = None,
change_threshold: float = 0.05,
) -> np.ndarray:
h, w = img_shape
layers = np.zeros((h, w, 4), dtype=np.float32)
for pair in matched_pairs:
chg = float(pair.total_change)
if chg <= change_threshold:
continue
r = pair.region_new
ch = 0 if chg <= 0.40 else 1 # yellow channel vs red channel
if pair.heatmap is not None:
# ── Grad-CAM path: paint only the pixels that actually changed ──
# pair.heatmap is (r.h, r.w) float32 in [0,1]
# Scale by total_change so brighter = more changed
cam = pair.heatmap
if cam.shape != (r.h, r.w):
cam = cv2.resize(cam, (r.w, r.h),
interpolation=cv2.INTER_LINEAR)
intensity = np.clip(cam * chg, 0.0, 1.0)
layers[r.y:r.y + r.h, r.x:r.x + r.w, ch] = np.maximum(
layers[r.y:r.y + r.h, r.x:r.x + r.w, ch], intensity)
else:
# ── Fallback: flood the whole bounding box (no Grad-CAM available) ──
layers[r.y:r.y + r.h, r.x:r.x + r.w, ch] = np.maximum(
layers[r.y:r.y + r.h, r.x:r.x + r.w, ch], chg)
# Channels 2 (added/green) and 3 (deleted/purple) intentionally omitted.
# The Heatmap tab shows only modification intensity via yellow gradient.
# Added / deleted regions are visible in the Match Canvas thermal view.
if smooth_kernel > 0:
ksize = smooth_kernel if smooth_kernel % 2 == 1 else smooth_kernel + 1
for ch in range(4):
if layers[:, :, ch].max() > 0:
layers[:, :, ch] = cv2.GaussianBlur(layers[:, :, ch], (ksize, ksize), sigmaX=3.0)
for ch in range(2):
if layers[:, :, ch].max() > 0:
layers[:, :, ch] = np.power(layers[:, :, ch], 0.6)
return layers
# ══════════════════════════════════════════════════════════════════════
# VISUALISER
# ══════════════════════════════════════════════════════════════════════
class Visualiser:
COLOURS: Dict[str, Tuple[int, int, int]] = {
"text_block": (30, 144, 255),
"figure": (255, 165, 0),
"table": (50, 205, 50),
"unknown": (180, 180, 180),
"deleted": (220, 50, 50),
"added": (50, 220, 80),
"changed": (255, 200, 0),
"unchanged": (80, 220, 80),
}
@staticmethod
def draw_alignment_check(
img_old_aligned: np.ndarray,
img_new: np.ndarray,
) -> np.ndarray:
"""
Red-cyan overlay β€” Alignment Check tab.
How to read it
--------------
OLD aligned β†’ Red channel
NEW doc β†’ Green + Blue channels (= Cyan)
β€’ Lines present at the SAME pixel in both β†’ gray (Rβ‰ˆGβ‰ˆB)
β€’ Lines in OLD that drifted β†’ RED fringe
β€’ Lines in NEW that drifted β†’ CYAN fringe
β€’ White background on both β†’ white
If the overlay looks mostly gray/white with no fringes, alignment is
good. Red/cyan colour fringes indicate residual misalignment.
"""
g_old = cv2.cvtColor(img_old_aligned, cv2.COLOR_RGB2GRAY)
g_new = cv2.cvtColor(img_new, cv2.COLOR_RGB2GRAY)
# Stack: R = old, G = new, B = new β†’ cyan for new, red for old
return np.stack([g_old, g_new, g_new], axis=2)
# ══════════════════════════════════════════════════════════════════════
# HELPER β€” unmatched region visual-change check
# ══════════════════════════════════════════════════════════════════════
# Mean-abs pixel diff below this threshold β†’ region is visually identical
# despite not being paired by the matcher; excluded from the change score.
_UNMATCHED_PIXEL_THR: float = 12.0 # on 0–255 grayscale scale
def _region_mean_diff(
r: Region,
img_a: np.ndarray,
candidates: List[Region],
img_b: np.ndarray,
thumb: int = 64,
) -> float:
"""
Return the *minimum* mean-abs-diff (grayscale, 0–255) between region `r`
in `img_a` and the spatially closest candidate region in `img_b`.
"Spatially closest" = smallest Euclidean centre-to-centre distance.
If there are no candidates, return 255.0 (maximally different).
"""
if not candidates:
return 255.0
pa = img_a[r.y: r.y + r.h, r.x: r.x + r.w]
if pa.size == 0:
return 255.0
ga = cv2.resize(cv2.cvtColor(pa, cv2.COLOR_RGB2GRAY), (thumb, thumb)).astype(np.float32)
cx_r, cy_r = r.center
# Sort candidates by centre distance β€” only check the 3 nearest for speed
candidates_sorted = sorted(
candidates,
key=lambda c: (c.center[0] - cx_r) ** 2 + (c.center[1] - cy_r) ** 2,
)[:3]
best = 255.0
for cand in candidates_sorted:
pb = img_b[cand.y: cand.y + cand.h, cand.x: cand.x + cand.w]
if pb.size == 0:
continue
gb = cv2.resize(
cv2.cvtColor(pb, cv2.COLOR_RGB2GRAY), (thumb, thumb)
).astype(np.float32)
diff = float(np.mean(np.abs(ga - gb)))
if diff < best:
best = diff
return best
def _is_truly_changed(
r: Region,
candidates: List[Region],
img_a: np.ndarray,
img_b: np.ndarray,
) -> bool:
"""
Return True only when region `r` (from img_a) is visually *different*
from its nearest spatial counterpart in candidates (from img_b).
Used to distinguish "matcher failed to pair identical regions" from
"content was genuinely added or deleted."
"""
return _region_mean_diff(r, img_a, candidates, img_b) >= _UNMATCHED_PIXEL_THR
# ══════════════════════════════════════════════════════════════════════
# MAIN PIPELINE
# ══════════════════════════════════════════════════════════════════════
class CoarseToFinePipeline:
def __init__(
self,
align: bool = True,
device: Optional[torch.device] = None,
region_extractor: Optional[LayoutRegionExtractor] = None,
matcher=None, # SemanticRetrievalMatcher or HungarianRegionMatcher
comparator: Optional[SiamesePatchComparator] = None,
min_similarity: float = 0.50, # used only when matcher=None (auto-build)
):
# Resolve device once here so both sub-modules share it
if device is None:
if torch.cuda.is_available():
device = torch.device("cuda")
elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
device = torch.device("mps")
else:
device = torch.device("cpu")
self._device = device
self.aligner = GlobalAligner() if align else None
self.extractor = region_extractor or LayoutRegionExtractor()
if matcher is not None:
# Caller supplied a custom matcher β€” use it as-is
self.matcher = matcher
self.comparator = comparator or SiamesePatchComparator(device=device)
else:
# ── Default path: shared ResNet50 encoder ──────────────
# Build the encoder once; hand the same object to both
# SemanticRetrievalMatcher (Stage 3) and SiamesePatchComparator (Stage 4).
# This halves model-load time and GPU/CPU RAM usage.
shared_encoder = _SiameseEncoder().to(device).eval()
logger.info("Pipeline: shared ResNet50 encoder on %s", device)
self.matcher = SemanticRetrievalMatcher(
encoder = shared_encoder,
device = device,
min_similarity = min_similarity,
)
self.comparator = comparator or SiamesePatchComparator(
device = device,
encoder = shared_encoder, # ← reuse, no second load
)
def compare(self, img_old: np.ndarray, img_new: np.ndarray, verbose: bool = True) -> ComparisonResult:
timings: Dict[str, float] = {}
t = time.time()
M = None
if self.aligner is not None:
img_old_aligned, M = self.aligner.align(img_old, img_new)
else:
img_old_aligned = img_old.copy()
timings["alignment"] = time.time() - t
t = time.time()
regions_old = self.extractor.extract(img_old_aligned)
regions_new = self.extractor.extract(img_new)
timings["extraction"] = time.time() - t
t = time.time()
matched, unmatched_old, unmatched_new = self.matcher.match(
regions_old, regions_new, img_old_aligned, img_new)
timings["matching"] = time.time() - t
t = time.time()
for i, pair in enumerate(matched):
matched[i] = self.comparator.compare_pair(pair, img_old_aligned, img_new)
timings["siamese"] = time.time() - t
if verbose:
logger.info("Timings β†’ align: %.2fs | extract: %.2fs | match: %.2fs | siamese: %.2fs",
timings["alignment"], timings["extraction"],
timings["matching"], timings["siamese"])
h, w = img_new.shape[:2]
# After the ORB fix, M maps OLD→NEW (forward).
# _project_region uses this matrix to map unmatched OLD region corners
# into NEW-page coordinates for heatmap rendering β€” so pass M directly,
# NOT its inverse. (Previously M mapped NEW→OLD so the inverse was
# needed; now the roles are corrected.)
heatmap = HeatmapGenerator.generate(
(h, w), matched, unmatched_old, unmatched_new,
M_inv=M, change_threshold=0.05,
)
# ── Change % calculation (two-part fix) ────────────────────────
#
# Part A β€” pixel-diff gate on unmatched regions
# Unmatched regions are NOT automatically "added/deleted".
# They may simply be regions the matcher failed to pair even though
# the content is identical. We compare each unmatched region to its
# nearest spatial counterpart in the opposite list; only those whose
# pixel diff exceeds _UNMATCHED_PIXEL_THR are counted as truly changed.
#
# Part B β€” normalise against full page area (not just detected regions)
# Using content_area as denominator collapses to 100% when all regions
# are unmatched. Using h*w gives a stable baseline independent of
# how many regions were detected or matched.
truly_deleted = [
r for r in unmatched_old
if _is_truly_changed(r, unmatched_new, img_old_aligned, img_new)
]
truly_added = [
r for r in unmatched_new
if _is_truly_changed(r, unmatched_old, img_new, img_old_aligned)
]
page_area = max(h * w, 1) # Part B denominator
changed_area = sum(p.region_new.area for p in matched if p.total_change > 0.05)
deleted_area = sum(r.area for r in truly_deleted)
added_area = sum(r.area for r in truly_added)
total_pct = min(100.0 * (changed_area + added_area + deleted_area) / page_area, 100.0)
return ComparisonResult(
matched_pairs=matched,
unmatched_old=unmatched_old,
unmatched_new=unmatched_new,
global_transform=M,
total_change_pct=total_pct,
heatmap=heatmap,
img_old_aligned=img_old_aligned, # ← stored for thermal overlay
)
# ══════════════════════════════════════════════════════════════════════
# GRADIO APP β€” HELPERS
# ══════════════════════════════════════════════════════════════════════
def _pick_device() -> torch.device:
if torch.cuda.is_available():
return torch.device("cuda")
if hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
return torch.device("mps")
return torch.device("cpu")
def _page_to_rgb(doc: fitz.Document, idx: int, dpi: int) -> np.ndarray:
pix = doc[idx].get_pixmap(dpi=dpi)
return np.frombuffer(pix.samples, np.uint8).reshape(pix.height, pix.width, 3)
def _build_summary(
page_results: list,
aligned: bool,
skip_old_p1: bool = False,
skip_new_p1: bool = False,
) -> str:
total_changes = [pr["total_change_pct"] for pr in page_results]
lines = [
"╔══════════════════════════════════════════════════════════╗",
"β•‘ POWERGRID DOCUMENT AUDIT β€” CHANGE REPORT β•‘",
"β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•",
"",
f" Total Pages Analysed : {len(page_results)}",
f" Overall Avg Change : {np.mean(total_changes):.2f}%",
"",
"──────────────────────────────────────────────────────────",
" PAGE-WISE CHANGE SUMMARY",
"──────────────────────────────────────────────────────────",
]
for pr in page_results:
pct = pr["total_change_pct"]
status = "βœ… MINIMAL" if pct < 5 else "⚠️ MODERATE" if pct < 20 else "πŸ”΄ SIGNIFICANT"
lines.append(f" Page {pr['page']:>3} β”‚ {pct:>5.1f}% β”‚ {status}")
significant = [pr["page"] for pr in page_results if pr["total_change_pct"] > 20]
if significant:
lines += [
"",
f" ⚠️ Pages with significant changes (>20%): {significant}",
]
return "\n".join(lines)
def _build_output_pdf(page_results: list, output_path: str,
                      process_dpi: int = 400) -> str:
    """
    Write every page overlay into a lossless PDF at full pixel depth.

    PyMuPDF measures pages in points (1 pt = 1/72 inch).  Each overlay was
    rendered at ``process_dpi``, so sizing the page as

        page_pts = image_px * 72 / process_dpi

    maps one image pixel onto exactly one page unit.  insert_image() then
    places the PNG 1:1 onto the page rect, so no downsampling or upsampling
    occurs — full pixel depth is preserved.
    """
    px_to_pt = 72.0 / process_dpi
    doc_out = fitz.open()
    for pr in page_results:
        overlay = pr["align_check"].convert("RGB")
        width_px, height_px = overlay.size
        # Convert pixel dimensions to PDF points at the process DPI.
        page = doc_out.new_page(width=width_px * px_to_pt,
                                height=height_px * px_to_pt)
        png_buf = io.BytesIO()
        overlay.save(png_buf, format="PNG", optimize=True)  # lossless — no JPEG ringing
        page.insert_image(page.rect, stream=png_buf.getvalue())
    doc_out.save(output_path, deflate=True, garbage=4, clean=True)
    doc_out.close()
    return output_path
# ══════════════════════════════════════════════════════════════════════
# SPECIFIC-REGION HELPER β€” semantic global search in OLD document
# ══════════════════════════════════════════════════════════════════════
# ImageNet normalisation reused from SemanticRetrievalMatcher
# ImageNet preprocessing reused from SemanticRetrievalMatcher: resize any RGB
# patch to the 224x224 input ResNet50 expects, convert to a tensor, then apply
# the standard ImageNet channel mean/std so patch embeddings live in the same
# distribution the encoder was trained on.
_REGION_TRANSFORM = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])
def _embed_patch(patch_rgb: np.ndarray,
                 encoder: "_SiameseEncoder",
                 device: torch.device) -> torch.Tensor:
    """Encode one RGB numpy patch into a (128,) L2-normalised embedding."""
    batch = _REGION_TRANSFORM(Image.fromarray(patch_rgb)).unsqueeze(0).to(device)
    with torch.no_grad():
        embedding, _ = encoder.encode(batch)  # (1, 128)
    return embedding[0]  # (128,)
def _find_matching_region_in_old(
    new_crop: np.ndarray,
    img_old_full: np.ndarray,
    encoder: "_SiameseEncoder",
    device: torch.device,
) -> Tuple[int, int, int, int]:
    """
    Locate where ``new_crop`` (user-selected patch from the NEW page) sits
    inside ``img_old_full`` (the complete OLD page).

    Method — semantic sliding-window search
    ----------------------------------------
    1. Encode ``new_crop`` with the shared ResNet50 encoder → 128-d embedding.
    2. Slide a window across ``img_old_full`` at multiple scales (±30 % of the
       crop size, preserving aspect ratio).  Step = 50 % of window size, and a
       final flush-to-edge window is always included so the far right/bottom
       strip of the page is never left uncovered.
    3. Encode every window patch and compute cosine similarity with the
       query embedding.  Pick the window with the highest similarity.
    4. Clamp the winning box to page bounds and return it.

    Why semantic (not pixel-level):
      • ResNet50 encodes *what* is in a region (shapes, structure, symbols),
        not pixel values, so two revisions of the same table/panel/diagram
        have near-identical embeddings even if text values changed slightly.
      • Scale-invariant: the multi-scale sweep handles resized content.
      • Position-invariant: the full-page sweep finds content anywhere.

    Returns (x1, y1, x2, y2) in ``img_old_full`` pixel space.
    """
    crop_h, crop_w = new_crop.shape[:2]
    old_h, old_w = img_old_full.shape[:2]

    def _clamp_box(bx: int, by: int, bw: int, bh: int
                   ) -> Tuple[int, int, int, int]:
        # Keep the box inside the OLD page with at least 1 px extent.
        bx = max(0, min(bx, old_w - 1))
        by = max(0, min(by, old_h - 1))
        bw = max(1, min(bw, old_w - bx))
        bh = max(1, min(bh, old_h - by))
        return bx, by, bx + bw, by + bh

    def _starts(limit: int, win: int, step: int) -> List[int]:
        # All window origins along one axis.
        # FIX: the original range(0, limit - win + 1, step) could stop short
        # of the far edge whenever (limit - win) is not a multiple of step,
        # leaving a strip never covered by any window.  Always append the
        # flush-to-edge origin so full coverage is guaranteed.
        origins = list(range(0, limit - win + 1, step))
        last = limit - win
        if origins and origins[-1] != last:
            origins.append(last)
        return origins

    # ── Step 1: encode the query (NEW crop) ──────────────────────────
    q_emb = _embed_patch(new_crop, encoder, device)  # (128,)

    # ── Step 2: build candidate windows across scales ────────────────
    # Scales relative to the crop's own size.  For a 400-DPI page a crop
    # that is, say, 600 px wide is tested at 420 … 780 px widths.
    scales = (0.70, 0.85, 1.00, 1.15, 1.30)
    candidates: List[Tuple[int, int, int, int]] = []  # (x, y, w, h)
    for sc in scales:
        win_w = max(32, int(crop_w * sc))
        win_h = max(32, int(crop_h * sc))
        if win_w > old_w or win_h > old_h:
            continue  # this scale does not fit on the OLD page
        step_x = max(1, win_w // 2)
        step_y = max(1, win_h // 2)
        for y in _starts(old_h, win_h, step_y):
            for x in _starts(old_w, win_w, step_x):
                candidates.append((x, y, win_w, win_h))
    logger.info(
        "_find_matching_region_in_old: %d candidate windows across %d scales",
        len(candidates), len(scales),
    )
    if not candidates:
        # Entire crop is bigger than the old page — return full page
        logger.warning("_find_matching_region_in_old: crop >= page; returning full page box.")
        return _clamp_box(0, 0, old_w, old_h)

    # ── Step 3: batch-encode all windows, find best cosine similarity ─
    # Process in mini-batches of 64 to avoid OOM on large pages.
    BATCH = 64
    best_sim: float = -1.0
    best_box: Tuple[int, int, int, int] = candidates[0]
    for start in range(0, len(candidates), BATCH):
        batch_cands = candidates[start: start + BATCH]
        patches = [img_old_full[cy: cy + ch, cx: cx + cw]
                   for (cx, cy, cw, ch) in batch_cands]
        tensors = [_REGION_TRANSFORM(Image.fromarray(p)) for p in patches]
        batch_t = torch.stack(tensors).to(device)  # (B, 3, 224, 224)
        with torch.no_grad():
            embs, _ = encoder.encode(batch_t)      # (B, 128)
        # q_emb and embs are already L2-normalised, so the plain dot
        # product equals cosine similarity.
        sims = (embs @ q_emb).cpu().numpy()        # (B,)
        idx = int(sims.argmax())
        if sims[idx] > best_sim:
            best_sim = float(sims[idx])
            best_box = batch_cands[idx]
    bx, by, bw, bh = best_box
    x1o, y1o, x2o, y2o = _clamp_box(bx, by, bw, bh)
    logger.info(
        "_find_matching_region_in_old: best cosine=%.4f OLD box (%d,%d)–(%d,%d)",
        best_sim, x1o, y1o, x2o, y2o,
    )
    return (x1o, y1o, x2o, y2o)
# ══════════════════════════════════════════════════════════════════════
# CORE PROCESSING
# ══════════════════════════════════════════════════════════════════════
def run_comparison(
    pdf_old_file,
    pdf_new_file,
    skip_old_p1: bool,
    skip_new_p1: bool,
    enable_align: bool,
    compare_mode: str,
    page_old_input: int,
    page_new_input: int,
    page_compare_mode: str = "Full Page",
    region_coords=None,
    display_dpi: int = 72,
    progress=gr.Progress(),
):
    """Run the full coarse-to-fine audit over the two uploaded PDFs.

    Parameters
    ----------
    pdf_old_file, pdf_new_file : Gradio file objects (``.name`` is the path)
        for the Previous Revision and New Document PDFs.
    skip_old_p1, skip_new_p1 : skip the cover page of each document
        (Full Document mode only).
    enable_align : run global alignment inside the pipeline.
    compare_mode : "Full Document" or "Specific Pages".
    page_old_input, page_new_input : 1-based page numbers (Specific Pages mode).
    page_compare_mode : "Full Page" or "Specific Region" (Specific Pages mode).
    region_coords : {x, y, width, height} dict in preview pixels, or None
        for the full page.
    display_dpi : DPI the region preview was rendered at — used to rescale
        the drag box to process-DPI pixels.
    progress : Gradio progress callback.

    Returns
    -------
    (page_results, summary_text, output_pdf_path, 1, page_slider_update)

    Raises
    ------
    gr.Error : when either PDF is missing.
    """
    dpi = 400  # process DPI — higher = more pixel depth in overlay output
    if pdf_old_file is None or pdf_new_file is None:
        raise gr.Error("Please upload both Previous Revision and New Document PDF files.")
    device = _pick_device()
    pipeline = CoarseToFinePipeline(
        align = enable_align,
        device = device,
        min_similarity = 0.50,
    )
    progress(0, desc="Opening PDF files …")
    doc_old = fitz.open(pdf_old_file.name)
    doc_new = fitz.open(pdf_new_file.name)
    # ── Build the list of (old_page_idx, new_page_idx) pairs to process ──
    if compare_mode == "Specific Pages":
        # Convert 1-based user input to 0-based index
        old_idx_req = int(page_old_input or 1) - 1
        new_idx_req = int(page_new_input or 1) - 1
        # Clamp to valid range
        old_idx_req = max(0, min(old_idx_req, len(doc_old) - 1))
        new_idx_req = max(0, min(new_idx_req, len(doc_new) - 1))
        page_pairs = [(old_idx_req, new_idx_req)]
    else:
        # Full document mode — pair pages positionally after optional
        # cover-page skips, processing only the shared page count.
        old_start = 1 if skip_old_p1 else 0
        new_start = 1 if skip_new_p1 else 0
        old_pages = len(doc_old) - old_start
        new_pages = len(doc_new) - new_start
        num_pages = min(old_pages, new_pages)
        if skip_old_p1:
            gr.Info("Skipping cover page of Previous Revision.")
        if skip_new_p1:
            gr.Info("Skipping cover page of New Document.")
        if old_pages != new_pages:
            gr.Warning(
                f"Page count mismatch: Previous Revision={old_pages}, New Document={new_pages}. "
                f"Processing {num_pages} pages."
            )
        page_pairs = [(pg + old_start, pg + new_start) for pg in range(num_pages)]
    num_pairs = len(page_pairs)
    page_results = []
    for i, (old_idx, new_idx) in enumerate(page_pairs):
        progress(i / num_pairs, desc=f"Processing page {i + 1} / {num_pairs} …")
        img_old = _page_to_rgb(doc_old, old_idx, dpi)
        img_new = _page_to_rgb(doc_new, new_idx, dpi)
        # ── Normalise page dimensions before any cropping ─────────────
        # Both pages must have the same native DPI dimensions so that the
        # same pixel box selects the same physical region in both docs.
        if img_old.shape != img_new.shape:
            img_old = cv2.resize(img_old, (img_new.shape[1], img_new.shape[0]))
        # ── Specific-region crop ──────────────────────────────────────
        # The user drew a box on the NEW-doc preview (at display_dpi).
        # Steps:
        #   1. Scale the drag coordinates from preview pixels → process DPI pixels.
        #   2. Crop the same pixel box from BOTH old and new pages.
        #      (Engineering drawings keep the same layout between revisions —
        #       same position = same physical area.  The ORB aligner inside
        #       pipeline.compare() handles any sub-pixel drift between the two.)
        #   3. Replace img_old / img_new with the two crops → overlay is
        #      scoped to only the selected region.
        if (compare_mode == "Specific Pages"
                and page_compare_mode == "Specific Region"
                and region_coords):
            rx = region_coords.get("x", 0)
            ry = region_coords.get("y", 0)
            rw = region_coords.get("width", img_new.shape[1])
            rh = region_coords.get("height", img_new.shape[0])
            sf = dpi / float(display_dpi or 72)  # preview px → process DPI px
            x1 = max(0, int(rx * sf))
            y1 = max(0, int(ry * sf))
            x2 = min(img_new.shape[1], int((rx + rw) * sf))
            y2 = min(img_new.shape[0], int((ry + rh) * sf))
            logger.info(
                "Specific Region: display_dpi=%d sf=%.3f "
                "preview-box (%d,%d,%d,%d) → process-px (%d,%d)–(%d,%d)",
                display_dpi, sf, rx, ry, rw, rh, x1, y1, x2, y2,
            )
            if x2 > x1 and y2 > y1:
                # Step 1 — crop the selected region from NEW page
                img_new_crop = img_new[y1:y2, x1:x2]
                # Step 2 — semantic global search: encode the NEW crop with
                #          ResNet50, slide windows over the FULL OLD page at
                #          multiple scales, pick the highest cosine-similarity
                #          window as the matching region in OLD.
                ox1, oy1, ox2, oy2 = _find_matching_region_in_old(
                    new_crop     = img_new_crop,
                    img_old_full = img_old,
                    encoder      = pipeline.matcher.encoder,
                    device       = device,
                )
                logger.info(
                    "Specific Region: NEW (%d,%d)–(%d,%d) → OLD (%d,%d)–(%d,%d)",
                    x1, y1, x2, y2, ox1, oy1, ox2, oy2,
                )
                # Step 3 — crop OLD at found location; resize to exactly match
                #          NEW crop so pipeline.compare() gets equal-size inputs
                img_old_raw = img_old[oy1:oy2, ox1:ox2]
                nh, nw = img_new_crop.shape[:2]
                if img_old_raw.shape[:2] != (nh, nw):
                    img_old_crop = cv2.resize(
                        img_old_raw, (nw, nh), interpolation=cv2.INTER_LINEAR,
                    )
                else:
                    img_old_crop = img_old_raw
                # Step 4 — overlay is scoped to the selected region only
                img_old = img_old_crop
                img_new = img_new_crop
        result = pipeline.compare(img_old, img_new)
        # Fall back to the raw OLD page if the pipeline produced no aligned copy.
        old_aligned_for_check = (
            result.img_old_aligned if result.img_old_aligned is not None
            else img_old
        )
        align_check = Visualiser.draw_alignment_check(old_aligned_for_check, img_new)
        page_results.append({
            "page": i + 1,
            "result": result,
            "align_check": Image.fromarray(align_check),
            "original": Image.fromarray(img_old),
            "revised": Image.fromarray(img_new),
            "total_change_pct": result.total_change_pct,
        })
    doc_old.close()
    doc_new.close()
    progress(0.95, desc="Generating report PDF …")
    output_pdf = _build_output_pdf(page_results, "ctf_output.pdf", process_dpi=dpi)
    summary = _build_summary(page_results, enable_align, skip_old_p1, skip_new_p1)
    progress(1.0, desc="Done!")
    return page_results, summary, output_pdf, 1, gr.update(maximum=num_pairs, value=1)
def get_page_view(page_num, pages_data, view_mode, rotation: int = 0,
                  nudge_x: int = 0, nudge_y: int = 0, nudge_scale: float = 1.0):
    """Return the requested page image for the given view mode.

    Applies the manual nudge (Auto-Overlay view only) and any display
    rotation.  Returns None when no results exist or the image is missing.
    """
    if not pages_data:
        return None
    index = min(max(int(page_num) - 1, 0), len(pages_data) - 1)
    record = pages_data[index]
    view_key = {
        "Auto-Overlay": "align_check",
        "Previous Revision": "original",
        "New Document": "revised",
    }.get(view_mode, "align_check")
    img = record.get(view_key)
    if img is None:
        return None
    # Manual fine-tune only affects the Auto-Overlay composite.
    scale = float(nudge_scale) if nudge_scale else 1.0
    has_nudge = nudge_x != 0 or nudge_y != 0 or abs(scale - 1.0) > 1e-4
    if view_mode == "Auto-Overlay" and has_nudge:
        img = _apply_nudge_overlay(record, nudge_x, nudge_y, scale)
    if img is not None and rotation % 360 != 0:
        img = img.rotate(rotation, expand=True)
    return img
def _apply_nudge_overlay(pr: dict, dx: int, dy: int, scale: float = 1.0) -> Image.Image:
    """
    Re-render the Auto-Overlay with the NEW (red) layer shifted by (dx, dy)
    pixels and scaled by ``scale`` about the image centre.

    The cyan channel (Previous Revision, aligned) stays fixed; only the
    red layer (New Doc) receives the translate + scale.
    """
    if pr.get("align_check") is None:
        return None
    # Recover the two source layers from the stored composite:
    # R channel carries the old (cyan) layer, G carries the new (red) layer.
    composite = np.array(pr["align_check"].convert("RGB"))
    layer_old = composite[:, :, 0]  # cyan source (Previous Revision)
    layer_new = composite[:, :, 1]  # red source (New Doc)
    h, w = layer_old.shape
    cx, cy = w / 2.0, h / 2.0
    scale = float(scale) if scale and scale > 0 else 1.0
    # Combined 2×3 affine: scale about the centre, then translate —
    # M = T(cx,cy) · S(scale) · T(-cx,-cy) · T(dx,dy)
    affine = np.float32([
        [scale, 0, dx + cx * (1 - scale)],
        [0, scale, dy + cy * (1 - scale)],
    ])
    layer_shifted = cv2.warpAffine(
        layer_new, affine, (w, h),
        flags=cv2.INTER_LINEAR,
        borderMode=cv2.BORDER_CONSTANT,
        borderValue=255,
    )
    # Stack: R=old (cyan base), G=B=shifted new → red/cyan fringes where they differ.
    merged = np.stack([layer_old, layer_shifted, layer_shifted], axis=2)
    return Image.fromarray(merged.astype(np.uint8))
# ══════════════════════════════════════════════════════════════════════
# GRADIO UI
# ══════════════════════════════════════════════════════════════════════
# Load the UI stylesheet that ships alongside app.py (file must exist).
with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), "styles.css"),
          encoding="utf-8") as _css_f:
    _CSS = _css_f.read()
# Shared Gradio theme: blue primary on a neutral gray base, Inter typeface.
_THEME = gr.themes.Base(
    primary_hue=gr.themes.colors.blue,
    neutral_hue=gr.themes.colors.gray,
    font=[gr.themes.GoogleFont("Inter"), "sans-serif"],
)
# Gradio 6+: theme & css are passed to launch(), not Blocks()
with gr.Blocks(title="POWERGRID Document Auditor") as demo:
# ── Header ─────────────────────────────────────────────────────────
_logo_tag = (
f'<img src="{_LOGO_URI}" alt="POWERGRID Logo" />'
if _LOGO_URI else
'<span style="font-size:1.4rem;font-weight:900;color:#003087;letter-spacing:-1px;">PG</span>'
)
gr.HTML(f"""
<div id="app-header">
<div id="app-header-inner">
<div id="app-header-logo">{_logo_tag}</div>
<div id="app-header-text">
<h1>POWERGRID Document Auditor</h1>
<p>Power Grid Corporation of India Limited &nbsp;&mdash;&nbsp; AI-Powered Document Comparison</p>
</div>
</div>
</div>
""")
# (JS injected via demo.load below β€” see end of Blocks context)
# ── Shared State ───────────────────────────────────────────────────
pages_state = gr.State(value=None)
rotation_state = gr.State(value=0)
nudge_x_state = gr.State(value=0) # manual X offset for red (New Doc) layer
nudge_y_state = gr.State(value=0) # manual Y offset for red (New Doc) layer
nudge_scale_state = gr.State(value=1.0) # manual scale for red (New Doc) layer
region_coords_state = gr.State(value=None) # {x,y,width,height} in preview px; None = full page
display_dpi_state = gr.State(value=72) # DPI used when rendering the region preview
# ── Layout ─────────────────────────────────────────────────────────
with gr.Row(equal_height=False):
# ════════════════════════════════════════════════════════════
# LEFT PANE β€” inputs
# ════════════════════════════════════════════════════════════
with gr.Column(scale=1, min_width=290, elem_id="left-panel"):
gr.HTML('<div class="section-label">Documents</div>')
pdf_old = gr.File(label="Previous Revision PDF", file_types=[".pdf"])
skip_old_p1 = gr.Checkbox(
value=False,
label="Skip cover page of Previous Revision",
interactive=False,
elem_classes=["skip-cb"],
)
gr.HTML('<div class="section-divider"></div>')
pdf_new = gr.File(label="Revised (New) PDF", file_types=[".pdf"])
skip_new_p1 = gr.Checkbox(
value=False,
label="Skip cover page of New Revision",
interactive=False,
elem_classes=["skip-cb"],
)
gr.HTML('<div class="section-divider"></div>')
gr.HTML('<div class="section-label">Options</div>')
enable_align = gr.Checkbox(
value=True,
label="Auto-align pages before comparing",
info="Enable if documents were scanned or printed at different positions or scales.",
)
gr.HTML('<div class="section-divider"></div>')
gr.HTML('<div class="section-label">Compare Mode</div>')
compare_mode = gr.Radio(
choices=["Full Document", "Specific Pages"],
value="Full Document",
label="Compare Mode",
show_label=False,
elem_id="compare-mode-radio",
)
with gr.Row(visible=False, elem_id="specific-pages-row") as specific_pages_row:
page_old_input = gr.Number(
value=1, minimum=1, step=1, precision=0,
label="Prev. Revision Page",
elem_id="page-old-input",
)
page_new_input = gr.Number(
value=1, minimum=1, step=1, precision=0,
label="New Document Page",
elem_id="page-new-input",
)
# Sub-options shown when "Specific Pages" is selected
with gr.Column(visible=False, elem_id="region-col") as region_col:
page_compare_mode = gr.Radio(
choices=["Full Page", "Specific Region"],
value="Full Page",
label="Page Comparison",
show_label=True,
elem_id="page-compare-mode-radio",
)
# Region selection β€” gr.Image shows the page; canvas overlay captures bbox drag
with gr.Column(visible=False, elem_id="region-preview-col") as region_preview_col:
region_readout = gr.HTML(
value='<div id="region-readout">No region selected β€” full page will be used</div>',
elem_id="region-readout",
)
# gr.Image: Python pushes the page PIL image here (always visible in DOM)
region_page_img = gr.Image(
value=None,
label=None,
show_label=False,
type="pil",
interactive=False,
elem_id="region-page-img",
height=380,
)
# Coords textbox: JS→Python bridge — visible but CSS-collapsed to 0px
region_coords_txt = gr.Textbox(
value="",
label=None,
show_label=False,
elem_id="region-coords-txt",
elem_classes=["region-coords-hidden"],
)
clear_region_btn = gr.Button(
"βœ• Clear Region",
size="sm",
elem_id="clear-region-btn",
)
gr.HTML('<div class="section-divider"></div>')
run_btn = gr.Button("Run Audit", variant="primary", size="lg", elem_id="run-btn")
gr.HTML('<div class="section-divider"></div>')
gr.HTML('<div class="section-label">Fine-Tune Alignment</div>')
# ── MacBook-style arrow key D-pad ─────────────────────────
# Row 1: [ β–² ] (centred, half-row)
with gr.Row(equal_height=True, elem_id="nudge-row-top"):
gr.HTML('<div style="flex:1;min-width:0"></div>')
nudge_up_btn = gr.Button("β–²", elem_id="nudge-up", min_width=44, scale=0)
gr.HTML('<div style="flex:1;min-width:0"></div>')
# Row 2: [ β—€ ][ β–Ό ][ β–Ά ]
with gr.Row(equal_height=True, elem_id="nudge-row-bot"):
nudge_left_btn = gr.Button("β—€", elem_id="nudge-left", min_width=44, scale=0)
nudge_down_btn = gr.Button("β–Ό", elem_id="nudge-down", min_width=44, scale=0)
nudge_right_btn = gr.Button("β–Ά", elem_id="nudge-right", min_width=44, scale=0)
gr.HTML('<p class="nudge-tip">Tip: Run Audit resets alignment</p>')
nudge_step = gr.Number(
value=1, minimum=1, maximum=100, step=1,
label="Step Size (px)", precision=0,
elem_id="nudge-step",
)
nudge_scale = gr.Number(
value=1.0, minimum=0.10, maximum=10.0, step=0.005,
label="Scale β€” Red Layer", precision=3,
elem_id="nudge-scale",
)
nudge_readout = gr.HTML(
value='<div id="nudge-readout-wrap">x&nbsp;=&nbsp;+0 px<br>y&nbsp;=&nbsp;+0 px<br>scale&nbsp;=&nbsp;1.000</div>',
elem_id="nudge-readout",
)
# ════════════════════════════════════════════════════════════
# RIGHT PANE β€” results
# ════════════════════════════════════════════════════════════
with gr.Column(scale=3, elem_id="right-panel"):
# ── Toolbar: view tabs | rotation buttons ──
with gr.Row(elem_id="toolbar-row"):
view_mode = gr.Radio(
choices=["Auto-Overlay", "Previous Revision", "New Document"],
value="Auto-Overlay",
label="View",
show_label=False,
scale=1,
min_width=320,
elem_id="view-mode-radio",
)
gr.HTML('<div class="toolbar-sep"></div>')
rot_left_btn = gr.Button("β†Ί", scale=0, elem_id="rot-left", min_width=38)
rot_right_btn = gr.Button("↻", scale=0, elem_id="rot-right", min_width=38)
# ── Page slider (shown only after audit runs) ──────────────
page_slider = gr.Slider(
minimum=1, maximum=1, value=1, step=1,
label="Page",
visible=False,
elem_id="page-slider",
)
# Hidden state
page_num_state = gr.State(value=1)
total_pages_state = gr.State(value=1)
result_image = gr.Image(
label="",
type="pil",
height=720,
interactive=False,
elem_id="result-image",
)
gr.HTML("""
<div id="legend-bar" style="display:flex; gap:18px; flex-wrap:wrap; align-items:center;">
<span style="font-size:0.60rem;font-weight:700;color:#8BA0BB;text-transform:uppercase;
letter-spacing:0.11em;white-space:nowrap;flex-shrink:0;">Overlay Legend</span>
<span style="display:flex;align-items:center;gap:6px;">
<span style="width:12px;height:12px;border-radius:3px;background:#7A7A7A;
flex-shrink:0;display:inline-block;box-shadow:0 1px 2px rgba(0,0,0,0.15);"></span>
<span style="font-size:0.75rem;color:#4A6585;white-space:nowrap;">
<b style="color:#0F1C2E;font-weight:600;">Gray</b>&nbsp;&mdash;&nbsp;Unchanged</span>
</span>
<span style="display:flex;align-items:center;gap:6px;">
<span style="width:12px;height:12px;border-radius:3px;background:#00BBBB;
flex-shrink:0;display:inline-block;box-shadow:0 1px 2px rgba(0,0,0,0.15);"></span>
<span style="font-size:0.75rem;color:#4A6585;white-space:nowrap;">
<b style="color:#007070;font-weight:600;">Cyan</b>&nbsp;&mdash;&nbsp;Previous Revision</span>
</span>
<span style="display:flex;align-items:center;gap:6px;">
<span style="width:12px;height:12px;border-radius:3px;background:#EE3333;
flex-shrink:0;display:inline-block;box-shadow:0 1px 2px rgba(0,0,0,0.15);"></span>
<span style="font-size:0.75rem;color:#4A6585;white-space:nowrap;">
<b style="color:#BB0000;font-weight:600;">Red</b>&nbsp;&mdash;&nbsp;New Document</span>
</span>
</div>
""")
with gr.Row():
pdf_output = gr.File(label="⬇️ Download Result PDF")
# ══════════════════════════════════════════════════════════════════
# EVENT HANDLERS
# ══════════════════════════════════════════════════════════════════
def on_pdf_upload(pdf_file):
    """Enable the skip-cover-page checkbox only for multi-page uploads.

    Returns a gr.update: disabled + unchecked when no file is present or the
    PDF has a single page; enabled otherwise.  If the PDF cannot be inspected
    the checkbox is enabled so the user is never locked out by a probe failure.
    """
    if pdf_file is None:
        return gr.update(interactive=False, value=False)
    try:
        doc = fitz.open(pdf_file.name)
        try:
            n = len(doc)
        finally:
            # FIX: close even if len() raises — the original leaked the
            # document handle on the exception path.
            doc.close()
        if n <= 1:
            return gr.update(interactive=False, value=False)
        return gr.update(interactive=True)
    except Exception:
        return gr.update(interactive=True)
def _readout_html(nx: int, ny: int, ns: float) -> str:
return (
f'<div id="nudge-readout-wrap">'
f'x&nbsp;=&nbsp;{nx:+d}&thinsp;px<br>'
f'y&nbsp;=&nbsp;{ny:+d}&thinsp;px<br>'
f'scale&nbsp;=&nbsp;{ns:.3f}'
f'</div>'
)
def on_compare_mode_change(mode):
    """Show the page-number inputs and region sub-options only in Specific Pages mode."""
    visible = mode == "Specific Pages"
    return gr.update(visible=visible), gr.update(visible=visible)
def on_load_preview(pdf_new_f, pg_new):
    """Render the requested New Doc page at 72 DPI for the inline region picker.

    Returns (pil_img, coords_txt_reset, coords_state_reset, display_dpi, readout).
    Raises gr.Error when the New Document PDF has not been uploaded yet.
    """
    if pdf_new_f is None:
        raise gr.Error("Please upload the New Document PDF first.")
    preview_dpi = 72
    doc = fitz.open(pdf_new_f.name)
    try:
        idx = max(0, int(pg_new or 1) - 1)
        idx = min(idx, len(doc) - 1)
        arr = _page_to_rgb(doc, idx, preview_dpi)
    finally:
        # FIX: close even if rendering raises — the original leaked the
        # document handle on the exception path.
        doc.close()
    pil_img = Image.fromarray(arr)
    readout = '<div id="region-readout">Draw a box on the image below to select a region</div>'
    # returns: pil_img, coords_txt_reset, coords_state_reset, display_dpi, readout
    return pil_img, "", None, preview_dpi, readout
def on_region_coords_change(coords_txt):
    """Parse the 'x,y,w,h' string written by the JS canvas overlay.

    Returns (coords_dict_or_None, readout_html).  Tiny (<5 px) or malformed
    boxes are rejected so a stray click never selects a sliver.
    """
    if not coords_txt or not coords_txt.strip():
        return None, '<div id="region-readout">No region selected — full page will be used</div>'
    try:
        parts = [float(v) for v in coords_txt.strip().split(",")]
        x, y, w, h = (int(p) for p in parts[:4])
        if w < 5 or h < 5:
            return None, '<div id="region-readout">Region too small — drag a larger area</div>'
        readout = (
            f'<div id="region-readout">'
            f'✅ Region: ({x}, {y}) → ({x+w}, {y+h})'
            f'&nbsp;|&nbsp;{w}&times;{h} px'
            f'</div>'
        )
        return {"x": x, "y": y, "width": w, "height": h}, readout
    except Exception:
        return None, '<div id="region-readout">Invalid region — drag again</div>'
def on_clear_region():
    """Reset the region selection: blank coords textbox, clear state, reset readout.

    The page image stays in place — the JS overlay clears itself on its next poll.
    """
    readout = '<div id="region-readout">Draw a box on the image below to select a region</div>'
    return "", None, readout
def on_run(pdf_old_f, pdf_new_f, skip_old, skip_new, align,
           cmp_mode, pg_old, pg_new,
           pg_cmp_mode, region_coords, display_dpi,
           progress=gr.Progress()):
    """Run the audit, then reset every view control (rotation, nudge, page)."""
    page_results, _summary, pdf_path, _, _ = run_comparison(
        pdf_old_f, pdf_new_f, skip_old, skip_new, align,
        cmp_mode, pg_old, pg_new,
        pg_cmp_mode, region_coords, display_dpi,
        progress
    )
    n_pages = len(page_results)
    first_img = page_results[0]["align_check"] if page_results else None
    slider_update = gr.update(visible=n_pages > 1, minimum=1,
                              maximum=n_pages, value=1)
    return (
        page_results,
        0,              # rotation reset
        0,              # nudge_x reset
        0,              # nudge_y reset
        1.0,            # nudge_scale reset
        1,              # page_num reset to 1
        n_pages,        # total_pages
        pdf_path,
        first_img,
        _readout_html(0, 0, 1.0),
        slider_update,
    )
def on_view_change(view, pg, total, pages_data, rot, nx, ny, ns):
    """Switching view tabs re-renders the current page and resets rotation to 0."""
    img = get_page_view(pg, pages_data, view, 0, nx, ny, ns)
    return img, 0
def on_rot_left(pg, total, pages_data, view, rot, nx, ny, ns):
    """Rotate the view 90° counter-clockwise (PIL rotation is CCW-positive)."""
    new_rot = (rot + 90) % 360
    img = get_page_view(pg, pages_data, view, new_rot, nx, ny, ns)
    return img, new_rot
def on_rot_right(pg, total, pages_data, view, rot, nx, ny, ns):
    """Rotate the view 90° clockwise (negative angle in PIL's CCW convention)."""
    new_rot = (rot - 90) % 360
    img = get_page_view(pg, pages_data, view, new_rot, nx, ny, ns)
    return img, new_rot
def on_pg_slide(pg, total, pages_data, view, rot, nx, ny, ns):
    """Page slider moved — render the selected page with the current view settings."""
    page = int(pg or 1)
    return get_page_view(page, pages_data, view, rot, nx, ny, ns), page
# ── Nudge handlers (arrow buttons + scale change) ─────────────────
def on_nudge(direction: str, pg, total, pages_data, view, rot, nx, ny, ns, step):
    """Shift the red (New Doc) overlay layer by one step in the given direction."""
    delta = int(step or 1)
    moves = {
        "left": (-delta, 0),
        "right": (delta, 0),
        "up": (0, -delta),
        "down": (0, delta),
    }
    # Unknown direction → no movement (matches the original if/elif chain).
    mx, my = moves.get(direction, (0, 0))
    nx += mx
    ny += my
    img = get_page_view(pg, pages_data, view, rot, nx, ny, ns)
    return img, nx, ny, ns, _readout_html(nx, ny, ns)
def on_scale_change(sc, pg, total, pages_data, view, rot, nx, ny):
    """Scale input changed — re-render with the new red-layer scale factor."""
    ns = float(sc) if sc else 1.0
    img = get_page_view(pg, pages_data, view, rot, nx, ny, ns)
    return img, ns, _readout_html(nx, ny, ns)
pdf_old.change(fn=on_pdf_upload, inputs=[pdf_old], outputs=[skip_old_p1])
pdf_new.change(fn=on_pdf_upload, inputs=[pdf_new], outputs=[skip_new_p1])
# Show / hide specific-page inputs and region sub-options when compare mode changes
compare_mode.change(
fn=on_compare_mode_change,
inputs=[compare_mode],
outputs=[specific_pages_row, region_col],
)
# Show / hide the region preview block AND auto-load the preview
# _preview_outputs: [region_page_img, region_coords_txt, coords_state, display_dpi_state, region_readout]
_preview_outputs = [region_page_img, region_coords_txt,
region_coords_state, display_dpi_state, region_readout]
def on_page_compare_mode_change(sub_mode, pdf_new_f, pg_new):
    """Toggle the region-preview block; auto-load the preview when enabling it."""
    show = sub_mode == "Specific Region"
    col_update = gr.update(visible=show)
    if show:
        try:
            pil_img, ctxt, coords, dpi, rdout = on_load_preview(pdf_new_f, pg_new)
            return col_update, pil_img, ctxt, coords, dpi, rdout
        except Exception:
            pass  # best-effort: fall through to the blank preview below
    blank_readout = '<div id="region-readout">No region selected — full page will be used</div>'
    return col_update, None, "", None, 72, blank_readout
page_compare_mode.change(
fn=on_page_compare_mode_change,
inputs=[page_compare_mode, pdf_new, page_new_input],
outputs=[region_preview_col] + _preview_outputs,
)
# Re-load preview when the New Doc page number changes (if Specific Region is active)
def on_page_new_change(pg_new, pdf_new_f, sub_mode):
    """Re-load the region preview when the New Doc page number changes
    (only while Specific Region mode is active and a PDF is present)."""
    if sub_mode == "Specific Region" and pdf_new_f is not None:
        try:
            return on_load_preview(pdf_new_f, pg_new)
        except Exception:
            pass  # best-effort: fall back to the blank preview below
    blank_readout = '<div id="region-readout">No region selected — full page will be used</div>'
    return None, "", None, 72, blank_readout
page_new_input.change(
fn=on_page_new_change,
inputs=[page_new_input, pdf_new, page_compare_mode],
outputs=_preview_outputs,
)
# JS canvas overlay writes "x,y,w,h" into region_coords_txt when drag ends β†’ parse to dict
region_coords_txt.change(
fn=on_region_coords_change,
inputs=[region_coords_txt],
outputs=[region_coords_state, region_readout],
show_progress="hidden",
show_progress_on=[],
)
# Clear region button β€” clear coords, JS overlay self-clears on next poll
clear_region_btn.click(
fn=on_clear_region,
inputs=None,
outputs=[region_coords_txt, region_coords_state, region_readout],
)
run_btn.click(
fn=on_run,
inputs=[pdf_old, pdf_new, skip_old_p1, skip_new_p1, enable_align,
compare_mode, page_old_input, page_new_input,
page_compare_mode, region_coords_state, display_dpi_state],
outputs=[pages_state, rotation_state, nudge_x_state, nudge_y_state, nudge_scale_state,
page_num_state, total_pages_state,
pdf_output, result_image, nudge_readout, page_slider],
)
# View-mode tab change
view_mode.change(
fn=on_view_change,
inputs=[view_mode, page_num_state, total_pages_state, pages_state, rotation_state,
nudge_x_state, nudge_y_state, nudge_scale_state],
outputs=[result_image, rotation_state],
show_progress="hidden",
show_progress_on=[],
)
# Rotation buttons
rot_left_btn.click(
fn=on_rot_left,
inputs=[page_num_state, total_pages_state, pages_state, view_mode, rotation_state,
nudge_x_state, nudge_y_state, nudge_scale_state],
outputs=[result_image, rotation_state],
show_progress="hidden",
show_progress_on=[],
)
rot_right_btn.click(
fn=on_rot_right,
inputs=[page_num_state, total_pages_state, pages_state, view_mode, rotation_state,
nudge_x_state, nudge_y_state, nudge_scale_state],
outputs=[result_image, rotation_state],
show_progress="hidden",
show_progress_on=[],
)
# Page slider
page_slider.change(
fn=on_pg_slide,
inputs=[page_slider, total_pages_state, pages_state, view_mode,
rotation_state, nudge_x_state, nudge_y_state, nudge_scale_state],
outputs=[result_image, page_num_state],
show_progress="hidden",
show_progress_on=[],
)
# ── Nudge arrow buttons ───────────────────────────────────────────
_nudge_inputs = [page_num_state, total_pages_state, pages_state, view_mode, rotation_state,
nudge_x_state, nudge_y_state, nudge_scale_state, nudge_step]
_nudge_outputs = [result_image, nudge_x_state, nudge_y_state,
nudge_scale_state, nudge_readout]
nudge_left_btn.click(
fn=lambda *a: on_nudge("left", *a), inputs=_nudge_inputs, outputs=_nudge_outputs,
show_progress="hidden", show_progress_on=[])
nudge_right_btn.click(
fn=lambda *a: on_nudge("right", *a), inputs=_nudge_inputs, outputs=_nudge_outputs,
show_progress="hidden", show_progress_on=[])
nudge_up_btn.click(
fn=lambda *a: on_nudge("up", *a), inputs=_nudge_inputs, outputs=_nudge_outputs,
show_progress="hidden", show_progress_on=[])
nudge_down_btn.click(
fn=lambda *a: on_nudge("down", *a), inputs=_nudge_inputs, outputs=_nudge_outputs,
show_progress="hidden", show_progress_on=[])
# ── Scale number input (live update on change) ────────────────────
nudge_scale.change(
fn=on_scale_change,
inputs=[nudge_scale, page_num_state, total_pages_state, pages_state, view_mode,
rotation_state, nudge_x_state, nudge_y_state],
outputs=[result_image, nudge_scale_state, nudge_readout],
show_progress="hidden",
show_progress_on=[],
)
# ── Inline canvas JS — overlays a transparent draw canvas on the gr.Image ──
# NOTE(review): this string is executed client-side via demo.load(js=...).
# It is runtime data, so it is kept byte-identical here; explanation lives
# in these Python comments and in the // comments inside the script itself.
# What the script does (as written below):
#   * finds the <img> inside the component with elem_id 'region-page-img'
#   * lays an absolutely-positioned <canvas> over it; user drags a rectangle
#   * on mouseup, converts the selection from display pixels to natural-image
#     pixels and writes "x,y,w,h" into the textarea under elem_id
#     'region-coords-txt', dispatching a synthetic 'input' event (via the
#     HTMLTextAreaElement value setter) so the framework registers the change
#   * a 300 ms setInterval keeps the overlay aligned with image resizes and
#     clears the selection when the coords textbox is wiped externally
#   * selections under 5x5 display px are discarded as accidental clicks
_INLINE_CANVAS_JS = """
() => {
let _overlay = null, _ctx = null;
let _dragging = false, _sx = 0, _sy = 0, _sel = null;
let _lastCoords = '';
function getImgEl() {
// The rendered <img> inside the gr.Image component
const wrap = document.getElementById('region-page-img');
return wrap ? wrap.querySelector('img') : null;
}
function getCoordsEl() {
const wrap = document.getElementById('region-coords-txt');
return wrap ? wrap.querySelector('textarea') : null;
}
function syncOverlay() {
if (!_overlay) return;
const img = getImgEl();
if (!img || !img.src || img.src.startsWith('data:image/gif')) return;
const r = img.getBoundingClientRect();
const pr = img.parentElement.getBoundingClientRect();
_overlay.style.left = (r.left - pr.left) + 'px';
_overlay.style.top = (r.top - pr.top) + 'px';
_overlay.style.width = r.width + 'px';
_overlay.style.height = r.height + 'px';
if (_overlay.width !== Math.round(r.width) || _overlay.height !== Math.round(r.height)) {
_overlay.width = Math.round(r.width);
_overlay.height = Math.round(r.height);
redraw();
}
}
function toCanvas(cx, cy) {
const r = _overlay.getBoundingClientRect();
return { x: (cx - r.left) * _overlay.width / r.width,
y: (cy - r.top) * _overlay.height / r.height };
}
function redraw() {
if (!_ctx || !_overlay.width) return;
_ctx.clearRect(0, 0, _overlay.width, _overlay.height);
if (_sel) {
_ctx.strokeStyle = '#00BBBB';
_ctx.lineWidth = Math.max(2, _overlay.width / 400);
_ctx.strokeRect(_sel.x, _sel.y, _sel.w, _sel.h);
_ctx.fillStyle = 'rgba(0,187,187,0.15)';
_ctx.fillRect(_sel.x, _sel.y, _sel.w, _sel.h);
}
}
function pushCoords() {
const el = getCoordsEl();
if (!el || !_sel) return;
// Scale from display px back to natural image px
const img = getImgEl();
if (!img) return;
const scaleX = img.naturalWidth / _overlay.width;
const scaleY = img.naturalHeight / _overlay.height;
const val = Math.round(_sel.x * scaleX) + ',' +
Math.round(_sel.y * scaleY) + ',' +
Math.round(_sel.w * scaleX) + ',' +
Math.round(_sel.h * scaleY);
const setter = Object.getOwnPropertyDescriptor(HTMLTextAreaElement.prototype, 'value').set;
setter.call(el, val);
el.dispatchEvent(new Event('input', { bubbles: true }));
}
function setupOverlay() {
const imgWrap = document.getElementById('region-page-img');
if (!imgWrap) return false;
// Make sure parent is positioned
const parent = imgWrap.querySelector('.image-container') || imgWrap;
if (getComputedStyle(parent).position === 'static') parent.style.position = 'relative';
if (!_overlay) {
_overlay = document.createElement('canvas');
_overlay.id = 'region-draw-overlay';
_overlay.style.cssText = 'position:absolute;top:0;left:0;cursor:crosshair;z-index:10;pointer-events:all;';
parent.appendChild(_overlay);
_ctx = _overlay.getContext('2d');
_overlay.addEventListener('mousedown', function(e) {
const p = toCanvas(e.clientX, e.clientY);
_sx = p.x; _sy = p.y; _sel = null; _dragging = true; e.preventDefault();
});
_overlay.addEventListener('mousemove', function(e) {
if (!_dragging) return;
const p = toCanvas(e.clientX, e.clientY);
_sel = { x: Math.min(_sx, p.x), y: Math.min(_sy, p.y),
w: Math.abs(p.x - _sx), h: Math.abs(p.y - _sy) };
redraw(); e.preventDefault();
});
_overlay.addEventListener('mouseup', function(e) {
if (!_dragging) return; _dragging = false;
if (!_sel || _sel.w < 5 || _sel.h < 5) { _sel = null; redraw(); return; }
redraw(); pushCoords(); e.preventDefault();
});
}
return true;
}
// Poll every 300ms: sync overlay size, watch for cleared coords
setInterval(function() {
setupOverlay();
syncOverlay();
// Clear overlay when coords textbox is wiped by Clear button
const el = getCoordsEl();
if (el) {
const cur = el.value;
if (cur !== _lastCoords) {
_lastCoords = cur;
if (cur === '') { _sel = null; redraw(); }
}
}
}, 300);
}
"""
# Run the canvas-overlay script once when the UI loads (fn=None -> JS only).
demo.load(fn=None, js=_INLINE_CANVAS_JS)
# ══════════════════════════════════════════════════════════════════════
# ENTRY POINT
# ══════════════════════════════════════════════════════════════════════
if __name__ == "__main__":
import socket as _socket
def _find_free_port(start: int = 7860, end: int = 7880) -> int:
for p in range(start, end + 1):
with _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) as s:
try:
s.bind(("", p))
return p
except OSError:
continue
return start # fallback β€” Gradio will error with a clear message
_port = _find_free_port()
print(f"\nπŸš€ POWERGRID Document Auditor β†’ http://localhost:{_port}\n")
demo.queue(default_concurrency_limit=20).launch(
server_name="0.0.0.0",
server_port=_port,
share=False,
show_error=True,
theme=_THEME,
css=_CSS,
)