| """ |
| app.py β POWERGRID Document Auditor (single-file HuggingFace Spaces build) |
| ============================================================================= |
| Single-file Gradio app for AI-powered engineering drawing comparison. |
| Designed for POWERGRID (765/400/132kV AIS/GIS vendor drawing audits). |
| |
| Pipeline: |
| Stage 1 β Global Alignment : Phase Correlation + ORB/RANSAC homography |
| Stage 2 β Region Extraction : Content-aware morphology (no pretrained detector) |
| Stage 3 β Semantic Matching : ResNet50 embeddings + cosine similarity (position-agnostic) |
| Stage 4 β Siamese Comparison : ResNet50 patch embeddings + GradCAM heatmaps |
| |
| Run locally: |
| python app.py |
| """ |
|
|
| |
| |
| |
|
|
| import base64 |
| import io |
| import logging |
| import os |
| import time |
| from dataclasses import dataclass, field |
| from typing import Dict, List, Optional, Tuple |
|
|
| import cv2 |
| import fitz |
| import gradio as gr |
| import numpy as np |
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| from PIL import Image |
| from scipy.optimize import linear_sum_assignment |
| from skimage.metrics import structural_similarity as ssim |
| from torchvision import models, transforms |
|
|
# Configure root logging once at import time; every pipeline stage logs
# through this module-level logger.
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
|
|
| |
def _load_logo_b64(filename: str = "logo_0.png") -> str:
    """Encode the logo next to this file as a data URI; "" when absent."""
    here = os.path.dirname(os.path.abspath(__file__))
    path = os.path.join(here, filename)
    if not os.path.exists(path):
        return ""
    with open(path, "rb") as fh:
        encoded = base64.b64encode(fh.read()).decode("utf-8")
    suffix = filename.rsplit(".", 1)[-1].lower()
    mime = "image/png" if suffix == "png" else f"image/{suffix}"
    return f"data:{mime};base64,{encoded}"


_LOGO_URI = _load_logo_b64("logo_0.png")
|
|
|
|
| |
| |
| |
|
|
@dataclass
class Region:
    """An axis-aligned bounding box produced by layout extraction."""
    x: int
    y: int
    w: int
    h: int
    label: str = "text_block"
    confidence: float = 1.0

    @property
    def bbox(self) -> Tuple[int, int, int, int]:
        """Corner form (x1, y1, x2, y2)."""
        return (self.x, self.y, self.x + self.w, self.y + self.h)

    @property
    def area(self) -> int:
        """Box area in pixels."""
        return self.h * self.w

    @property
    def center(self) -> Tuple[float, float]:
        """Box centre as (cx, cy)."""
        return (self.x + self.w / 2.0, self.y + self.h / 2.0)

    def iou(self, other: "Region") -> float:
        """Intersection-over-union with another region (0.0 when disjoint)."""
        left = max(self.x, other.x)
        top = max(self.y, other.y)
        right = min(self.x + self.w, other.x + other.w)
        bottom = min(self.y + self.h, other.y + other.h)
        overlap = max(0, right - left) * max(0, bottom - top)
        combined = self.area + other.area - overlap
        if combined <= 0:
            return 0.0
        return overlap / combined
|
|
|
|
@dataclass
class MatchedPair:
    """A matched region pair between old and new documents.

    The matcher fills in the match/appearance scores; the Siamese
    comparator later populates the pixel/SSIM/semantic metrics and the
    Grad-CAM heatmap via compare_pair().
    """
    region_old: Region            # region in the (aligned) OLD page
    region_new: Region            # counterpart region in the NEW page
    match_score: float            # cosine similarity of the two embeddings
    position_cost: float          # kept for API compatibility; 0.0 from the semantic matcher
    appearance_cost: float        # max(0, 1 - match_score)
    pixel_diff: float = 0.0       # fraction of pixels with gray diff > 8/255
    ssim_score: float = 1.0       # structural similarity (1.0 = identical)
    semantic_diff: float = 0.0    # embedding L2 distance scaled into [0, 1]
    total_change: float = 0.0     # weighted blend of the three metrics
    heatmap: Optional[np.ndarray] = None  # Grad-CAM map resized to region_new (h, w)
|
|
|
|
@dataclass
class ComparisonResult:
    """Full comparison result for one document page."""
    matched_pairs: List[MatchedPair]
    unmatched_old: List[Region]
    unmatched_new: List[Region]
    global_transform: Optional[np.ndarray]
    total_change_pct: float
    heatmap: np.ndarray
    img_old_aligned: Optional[np.ndarray] = None

    def summary(self) -> str:
        """Render the headline statistics as an indented multi-line string."""
        alignment = "Applied" if self.global_transform is not None else "Skipped"
        report = [
            f" Global Alignment : {alignment}",
            f" Matched Pairs : {len(self.matched_pairs)}",
            f" Deleted Regions : {len(self.unmatched_old)}",
            f" Added Regions : {len(self.unmatched_new)}",
            f" Total Change : {self.total_change_pct:.1f}%",
        ]
        modified = [p.total_change for p in self.matched_pairs if p.total_change > 0.05]
        if modified:
            avg_chg = np.mean(modified)
            report.append(f" Avg Change (modified regions): {avg_chg:.2f}")
        return "\n".join(report)
|
|
|
|
| |
| |
| |
|
|
class GlobalAligner:
    """Stage 1 — estimate and apply a global OLD→NEW page transform.

    Tries an ORB + RANSAC partial-affine fit first; falls back to a pure
    translation obtained by FFT phase correlation when feature matching
    yields too few keypoints or matches.
    """

    def __init__(self, orb_features: int = 2000, ransac_threshold: float = 5.0):
        # orb_features: maximum keypoints requested from ORB per page.
        # ransac_threshold: reprojection tolerance (px) for RANSAC inliers.
        self.orb_features = orb_features
        self.ransac_threshold = ransac_threshold

    def _phase_correlation_shift(self, gray1: np.ndarray, gray2: np.ndarray) -> Tuple[float, float]:
        """Estimate a pure (dx, dy) translation between two grayscale pages.

        Classic phase correlation: the normalised cross-power spectrum of
        the two FFTs has a sharp peak at the relative shift.
        NOTE(review): the final negation chooses the sign so the result maps
        gray1 onto gray2 — presumably; confirm against a known-offset pair.
        """
        f1 = np.fft.fft2(gray1.astype(np.float32))
        f2 = np.fft.fft2(gray2.astype(np.float32))
        denom = np.abs(f1 * np.conj(f2)) + 1e-10  # guard against division by zero
        cross = (f1 * np.conj(f2)) / denom
        corr = np.fft.ifft2(cross).real
        y_shift, x_shift = np.unravel_index(np.argmax(corr), corr.shape)
        h, w = gray1.shape
        # FFT shifts are modular: peaks beyond half the page size encode
        # small negative shifts, so wrap them around.
        if y_shift > h // 2:
            y_shift -= h
        if x_shift > w // 2:
            x_shift -= w
        return float(-x_shift), float(-y_shift)

    def _orb_affine(self, gray_old: np.ndarray, gray_new: np.ndarray) -> Optional[np.ndarray]:
        """Fit a 2x3 partial-affine (rotation + uniform scale + translation).

        Returns None when either page yields fewer than 10 keypoints or
        matches, letting the caller fall back to phase correlation.
        """
        orb = cv2.ORB_create(nfeatures=self.orb_features)
        kp1, des1 = orb.detectAndCompute(gray_old, None)
        kp2, des2 = orb.detectAndCompute(gray_new, None)
        if des1 is None or des2 is None or len(kp1) < 10 or len(kp2) < 10:
            return None
        bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
        matches = sorted(bf.match(des1, des2), key=lambda m: m.distance)
        if len(matches) < 10:
            return None
        # Keep only the strongest matches to stabilise the RANSAC fit.
        top_k = min(200, len(matches))
        src_pts = np.float32([kp1[m.queryIdx].pt for m in matches[:top_k]]).reshape(-1, 1, 2)
        dst_pts = np.float32([kp2[m.trainIdx].pt for m in matches[:top_k]]).reshape(-1, 1, 2)
        M, mask = cv2.estimateAffinePartial2D(
            src_pts, dst_pts, method=cv2.RANSAC,
            ransacReprojThreshold=self.ransac_threshold,
        )
        return M

    def align(self, img_old: np.ndarray, img_new: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Warp img_old into img_new's frame; return (aligned_old, 2x3 matrix)."""
        g_old = cv2.cvtColor(img_old, cv2.COLOR_RGB2GRAY)
        g_new = cv2.cvtColor(img_new, cv2.COLOR_RGB2GRAY)
        dx, dy = self._phase_correlation_shift(g_old, g_new)
        M = self._orb_affine(g_old, g_new)
        if M is None:
            # Feature matching failed — fall back to a translation-only warp.
            M = np.array([[1.0, 0.0, dx], [0.0, 1.0, dy]], dtype=np.float32)
        h, w = img_old.shape[:2]
        aligned = cv2.warpAffine(
            img_old, M, (w, h),
            flags=cv2.INTER_LINEAR,
            borderMode=cv2.BORDER_CONSTANT,
            borderValue=(255, 255, 255),  # white border matches the paper background
        )
        return aligned, M
|
|
|
|
| |
| |
| |
|
|
class LayoutRegionExtractor:
    """Stage 2 — detect candidate layout regions with classic morphology.

    Pipeline: Otsu binarisation → dilation to fuse nearby ink → external
    contours → area/label filtering → iterative merge of overlapping boxes.
    No pretrained detector is used.
    """

    def __init__(
        self,
        min_area_ratio: float = 0.0003,
        max_area_ratio: float = 0.92,
        dilation_kernel: Tuple[int, int] = (8, 2),
        dilation_iters: int = 2,
        merge_iou_threshold: float = 0.40,
    ):
        # min/max_area_ratio: keep contours between these fractions of page area.
        # dilation_kernel/iters: horizontally biased dilation joins characters.
        # merge_iou_threshold: IoU above which overlapping boxes are fused.
        self.min_area_ratio = min_area_ratio
        self.max_area_ratio = max_area_ratio
        self.dilation_kernel = dilation_kernel
        self.dilation_iters = dilation_iters
        self.merge_iou_threshold = merge_iou_threshold

    def _binarise(self, gray: np.ndarray) -> np.ndarray:
        """Otsu-threshold the blurred page; ink becomes white (255) on black."""
        blurred = cv2.GaussianBlur(gray, (5, 5), 0)
        _, binary = cv2.threshold(blurred, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
        return binary

    def _dilate(self, binary: np.ndarray) -> np.ndarray:
        """Grow ink blobs so words and lines fuse into connected blocks."""
        k = cv2.getStructuringElement(cv2.MORPH_RECT, self.dilation_kernel)
        dilated = cv2.dilate(binary, k, iterations=self.dilation_iters)
        # A wide horizontal kernel links words on the same text line...
        k_line = cv2.getStructuringElement(cv2.MORPH_RECT, (15, 1))
        dilated = cv2.dilate(dilated, k_line, iterations=1)
        # ...then a small vertical close bridges adjacent lines into blocks.
        k_vert = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 8))
        return cv2.morphologyEx(dilated, cv2.MORPH_CLOSE, k_vert)

    def _classify(self, patch_gray: np.ndarray, w: int, h: int) -> str:
        """Heuristic label from aspect ratio and ink density.

        Thresholds are hand-tuned; "margin" regions are discarded upstream.
        """
        aspect = w / max(h, 1)
        _, binary = cv2.threshold(patch_gray, 127, 255, cv2.THRESH_BINARY_INV)
        density = np.sum(binary > 0) / max(w * h, 1)
        if density < 0.02:
            # Nearly empty patch: margin unless it still holds several shapes.
            contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            if len(contours) < 3:
                return "margin"
        if aspect > 4.0 and density > 0.06:
            return "text_block"
        if 0.4 < aspect < 2.8 and density < 0.25:
            return "figure"
        if density > 0.18 and aspect > 1.0:
            return "table"
        return "text_block"

    def _merge_overlapping(self, regions: List[Region]) -> List[Region]:
        """Iteratively fuse boxes whose IoU with a growing cluster box exceeds
        the threshold, until a full pass makes no change.

        NOTE: merged boxes get the default "text_block" label — the source
        regions' labels are not propagated.
        """
        changed = True
        while changed:
            changed = False
            used = [False] * len(regions)
            merged: List[Region] = []
            for i, r1 in enumerate(regions):
                if used[i]:
                    continue
                # Grow a cluster box seeded at r1.
                x0, y0 = r1.x, r1.y
                x1, y1 = r1.x + r1.w, r1.y + r1.h
                for j, r2 in enumerate(regions):
                    if i == j or used[j]:
                        continue
                    expanded = Region(x0, y0, x1 - x0, y1 - y0)
                    if expanded.iou(r2) > self.merge_iou_threshold:
                        x0 = min(x0, r2.x)
                        y0 = min(y0, r2.y)
                        x1 = max(x1, r2.x + r2.w)
                        y1 = max(y1, r2.y + r2.h)
                        used[j] = True
                        changed = True
                merged.append(Region(x0, y0, x1 - x0, y1 - y0))
                used[i] = True
            regions = merged
        return regions

    def extract(self, img_rgb: np.ndarray) -> List[Region]:
        """Detect, filter, label, merge, and reading-order-sort page regions."""
        h, w = img_rgb.shape[:2]
        page_area = h * w
        gray = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY)
        binary = self._binarise(gray)
        dilated = self._dilate(binary)
        contours, _ = cv2.findContours(dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        candidates: List[Region] = []
        for cnt in contours:
            rx, ry, rw, rh = cv2.boundingRect(cnt)
            area = rw * rh
            if area < page_area * self.min_area_ratio:
                continue  # speckle / noise
            if area > page_area * self.max_area_ratio:
                continue  # page border / near-full-page frame
            patch = gray[ry: ry + rh, rx: rx + rw]
            label = self._classify(patch, rw, rh)
            if label == "margin":
                continue
            candidates.append(Region(rx, ry, rw, rh, label=label))
        regions = self._merge_overlapping(candidates)
        # Reading order: 50-px row bands top-to-bottom, then left-to-right.
        regions.sort(key=lambda r: (r.y // 50, r.x))
        logger.info("LayoutExtractor: %d regions detected", len(regions))
        return regions
|
|
|
|
| |
| |
| |
|
|
| |
| |
| |
|
|
class SemanticRetrievalMatcher:
    """Position-agnostic region matcher based on appearance embeddings.

    Every region patch (old and new) is encoded with a shared ResNet50
    backbone into a 128-d L2-normalised vector.  An (N_new x N_old)
    cosine-similarity matrix is then solved as an assignment problem via
    scipy's Hungarian solver, and pairs below ``min_similarity`` are
    rejected.  Because matching ignores coordinates entirely, a region
    that merely moved on the page still matches its counterpart with
    similarity close to 1.0.
    """

    def __init__(
        self,
        encoder: "_SiameseEncoder",
        device: torch.device,
        min_similarity: float = 0.50,
        thumbnail_size: Tuple[int, int] = (224, 224),
    ):
        self.encoder = encoder
        self.device = device
        self.min_similarity = min_similarity
        self._transform = transforms.Compose([
            transforms.Resize(thumbnail_size),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225]),
        ])

    def _patch(self, region: Region, img: np.ndarray) -> np.ndarray:
        """Crop ``region`` from ``img``; degenerate crops become a white 64x64 tile."""
        crop = img[region.y: region.y + region.h, region.x: region.x + region.w]
        if crop.size == 0:
            return np.full((64, 64, 3), 255, dtype=np.uint8)
        return crop

    def _embed(self, patches: List[np.ndarray]) -> torch.Tensor:
        """Encode patches into an (N, 128) tensor of unit-norm embeddings (no grad)."""
        stacked = torch.stack([self._transform(Image.fromarray(p)) for p in patches])
        with torch.no_grad():
            emb, _ = self.encoder.encode(stacked.to(self.device))
        return emb

    def match(
        self,
        regions_old: List[Region],
        regions_new: List[Region],
        img_old: np.ndarray,
        img_new: np.ndarray,
    ) -> Tuple[List[MatchedPair], List[Region], List[Region]]:
        """Pair NEW regions with OLD regions by embedding similarity.

        Returns (matched_pairs, unmatched_old, unmatched_new).
        """
        n_old, n_new = len(regions_old), len(regions_new)
        if n_old == 0 or n_new == 0:
            return [], list(regions_old), list(regions_new)

        emb_old = self._embed([self._patch(r, img_old) for r in regions_old])
        emb_new = self._embed([self._patch(r, img_new) for r in regions_new])

        # Rows = NEW regions, columns = OLD regions; the embeddings are unit
        # vectors, so the dot product is cosine similarity.
        sim_mat = torch.mm(emb_new, emb_old.T).cpu().numpy()

        # linear_sum_assignment minimises cost, so negate to maximise similarity.
        new_idx, old_idx = linear_sum_assignment(-sim_mat)

        pairs: List[MatchedPair] = []
        taken_old: set = set()
        taken_new: set = set()
        for ni, oi in zip(new_idx, old_idx):
            score = float(sim_mat[ni, oi])
            if score < self.min_similarity:
                continue  # the assignment exists but is too dissimilar to trust
            pairs.append(MatchedPair(
                region_old=regions_old[oi],
                region_new=regions_new[ni],
                match_score=score,
                position_cost=0.0,
                appearance_cost=max(0.0, 1.0 - score),
            ))
            taken_old.add(oi)
            taken_new.add(ni)

        leftover_old = [regions_old[i] for i in range(n_old) if i not in taken_old]
        leftover_new = [regions_new[j] for j in range(n_new) if j not in taken_new]

        logger.info(
            "SemanticRetrieval: %d matched | %d deleted | %d added "
            "(min_sim=%.2f)",
            len(pairs), len(leftover_old), len(leftover_new),
            self.min_similarity,
        )
        return pairs, leftover_old, leftover_new
|
|
|
|
| |
| |
| |
|
|
class _SiameseEncoder(nn.Module):
    """Shared ResNet50 backbone with a small projection head.

    ``encode`` returns both the 128-d L2-normalised embedding and the raw
    conv feature map so callers can attach Grad-CAM hooks to the latter.
    """

    def __init__(self):
        super().__init__()
        backbone = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
        # Everything up to (but excluding) avgpool + fc: conv feature maps only.
        self.features = nn.Sequential(*list(backbone.children())[:-2])
        self.pool = backbone.avgpool
        self.embed = nn.Sequential(
            nn.Linear(2048, 512), nn.ReLU(),
            nn.Linear(512, 128),
        )

    def encode(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return (unit-norm 128-d embedding, conv feature map) for a batch."""
        fmap = self.features(x)
        flat = torch.flatten(self.pool(fmap), 1)
        vec = F.normalize(self.embed(flat), p=2, dim=1)
        return vec, fmap

    def forward(self, x1: torch.Tensor, x2: torch.Tensor):
        """Encode two batches; return (e1, e2, fmap1, fmap2)."""
        emb_a, map_a = self.encode(x1)
        emb_b, map_b = self.encode(x2)
        return emb_a, emb_b, map_a, map_b
|
|
|
|
class SiamesePatchComparator:
    """Stage 4 — quantify how much a matched patch pair actually changed.

    Blends three signals (pixel diff, SSIM, embedding L2 distance) into a
    single total_change score and produces a Grad-CAM map localising the
    change inside the new patch.
    """

    def __init__(
        self,
        device: Optional[torch.device] = None,
        encoder: Optional[_SiameseEncoder] = None,
    ):
        # Auto-select the best device when none is given (CUDA > MPS > CPU).
        if device is None:
            if torch.cuda.is_available():
                device = torch.device("cuda")
            elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
                device = torch.device("mps")
            else:
                device = torch.device("cpu")
        self.device = device

        # Reuse the matcher's encoder when given, avoiding a second ResNet50.
        if encoder is not None:
            self.model = encoder
            logger.info("SiamesePatchComparator: reusing shared encoder on %s", device)
        else:
            self.model = _SiameseEncoder().to(device).eval()
            logger.info("SiamesePatchComparator: created new encoder on %s", device)
        # Standard ImageNet preprocessing for the ResNet backbone.
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

    def _to_tensor(self, patch_rgb: np.ndarray) -> torch.Tensor:
        """RGB numpy patch -> normalised (1, 3, 224, 224) tensor on self.device."""
        return self.transform(Image.fromarray(patch_rgb)).unsqueeze(0).to(self.device)

    def _grad_cam(
        self,
        patch_old: np.ndarray,
        patch_new: np.ndarray,
        target_hw: Tuple[int, int],
    ) -> np.ndarray:
        """
        Grad-CAM spatial change map — WHERE inside the patch the embedding differs.

        Method
        ------
        1. Forward patch_old (no grad) -> embedding e_old.
        2. Forward patch_new (with grad, hooks on the last conv block) ->
           embedding e_new + feature map F captured by the forward hook.
        3. Scalar loss = pairwise_distance(e_old.detach(), e_new).
        4. loss.backward() -> dloss/dF captured by the backward hook.
        5. Grad-CAM = ReLU( mean_c(dloss/dF) * F ), upsampled to the patch size.

        Pixels with HIGH activation moved the embedding the most — i.e. the
        actual edits.

        Returns
        -------
        np.ndarray of shape target_hw, float32, values in [0, 1]; all zeros
        when the hooks did not fire or the CAM is flat.
        """
        t_old = self._to_tensor(patch_old)
        t_new = self._to_tensor(patch_new)

        feat_store: Dict[str, torch.Tensor] = {}
        grad_store: Dict[str, torch.Tensor] = {}

        # Hook the last conv block of the backbone — the standard Grad-CAM target.
        last_block = self.model.features[-1]

        def _fwd(module, inp, out):
            feat_store["f"] = out

        def _bwd(module, grad_in, grad_out):
            grad_store["g"] = grad_out[0]

        h_fwd = last_block.register_forward_hook(_fwd)
        h_bwd = last_block.register_full_backward_hook(_bwd)

        try:
            # Reference embedding: old patch, no graph needed.
            with torch.no_grad():
                e_old, _ = self.model.encode(t_old)

            # New patch with gradients so backward reaches the hooked block.
            with torch.enable_grad():
                self.model.zero_grad()
                e_new, _ = self.model.encode(t_new)
                dist = F.pairwise_distance(e_old.detach(), e_new)
                dist.backward()
        finally:
            # Always detach the hooks, even if the forward/backward raised.
            h_fwd.remove()
            h_bwd.remove()

        if "f" not in feat_store or "g" not in grad_store:
            return np.zeros(target_hw, dtype=np.float32)

        # Channel weights = spatially averaged gradients (classic Grad-CAM).
        weights = grad_store["g"].mean(dim=[2, 3], keepdim=True)
        cam = (weights * feat_store["f"]).sum(dim=1).squeeze()
        cam = F.relu(cam)

        cam_max = cam.max()
        if cam_max < 1e-8:
            # Flat CAM — nothing moved the embedding; report "no change".
            return np.zeros(target_hw, dtype=np.float32)

        cam = (cam / cam_max).detach().cpu().numpy()

        # Upsample the low-resolution CAM to the requested patch resolution.
        h, w = target_hw
        cam_up = cv2.resize(cam, (w, h), interpolation=cv2.INTER_LINEAR)
        return np.clip(cam_up, 0.0, 1.0).astype(np.float32)

    def compare(self, patch_old: np.ndarray, patch_new: np.ndarray) -> Dict[str, object]:
        """Compute change metrics for two equally sized RGB patches.

        Returns a dict with pixel_diff, ssim_score, semantic_diff,
        total_change (all floats) and grad_cam (float32 map in [0, 1]).
        """
        g_old = cv2.cvtColor(patch_old, cv2.COLOR_RGB2GRAY).astype(np.float32)
        g_new = cv2.cvtColor(patch_new, cv2.COLOR_RGB2GRAY).astype(np.float32)
        diff_map = np.abs(g_old - g_new)

        # Fraction of pixels whose gray level moved by more than 8 (of 255).
        changed_pixels = np.sum(diff_map > 8.0)
        pixel_diff = float(changed_pixels) / max(g_old.size, 1)
        ssim_val = float(ssim(g_old, g_new, data_range=255.0))
        ssim_cost = max(0.0, 1.0 - ssim_val)
        with torch.no_grad():
            t1 = self._to_tensor(patch_old)
            t2 = self._to_tensor(patch_new)
            e1, e2, _, _ = self.model(t1, t2)
            l2_dist = float(F.pairwise_distance(e1, e2).item())
        # /10 maps the embedding distance into [0, 1] — presumably tuned
        # empirically; TODO confirm the scale.
        semantic_diff = min(l2_dist / 10.0, 1.0)
        # Weighted blend; SSIM carries the largest weight.
        total = 0.30 * pixel_diff + 0.40 * ssim_cost + 0.30 * semantic_diff

        # Localise the change inside the NEW patch.
        h, w = patch_new.shape[:2]
        grad_cam_map = self._grad_cam(patch_old, patch_new, (h, w))

        return {
            "pixel_diff": pixel_diff,
            "ssim_score": ssim_val,
            "semantic_diff":semantic_diff,
            "total_change": min(float(total), 1.0),
            "grad_cam": grad_cam_map,
        }

    def compare_pair(self, pair: MatchedPair, img_old: np.ndarray, img_new: np.ndarray) -> MatchedPair:
        """Crop both regions, pad to a common size, and fill the pair's
        metric fields in place; returns the same (mutated) pair."""
        ro, rn = pair.region_old, pair.region_new
        patch_old = img_old[ro.y: ro.y + ro.h, ro.x: ro.x + ro.w]
        patch_new = img_new[rn.y: rn.y + rn.h, rn.x: rn.x + rn.w]
        if patch_old.size == 0 or patch_new.size == 0:
            return pair  # degenerate crop — leave the default metrics untouched
        target_h = max(patch_old.shape[0], patch_new.shape[0])
        target_w = max(patch_old.shape[1], patch_new.shape[1])

        def _pad_white(patch: np.ndarray, th: int, tw: int) -> np.ndarray:
            # Anchor the patch top-left on a white canvas of the target size.
            canvas = np.full((th, tw, patch.shape[2]), 255, dtype=np.uint8)
            canvas[:patch.shape[0], :patch.shape[1]] = patch
            return canvas

        patch_old_p = _pad_white(patch_old, target_h, target_w)
        patch_new_p = _pad_white(patch_new, target_h, target_w)
        metrics = self.compare(patch_old_p, patch_new_p)
        pair.pixel_diff = metrics["pixel_diff"]
        pair.ssim_score = metrics["ssim_score"]
        pair.semantic_diff = metrics["semantic_diff"]
        pair.total_change = metrics["total_change"]

        # Resize the CAM from padded-patch space back to the region's own size.
        raw_cam = metrics.get("grad_cam")
        if raw_cam is not None:
            rn = pair.region_new
            pair.heatmap = cv2.resize(raw_cam, (rn.w, rn.h),
                                      interpolation=cv2.INTER_LINEAR)
        return pair
|
|
|
|
| |
| |
| |
|
|
class HeatmapGenerator:
    """Render per-region change intensities into an (H, W, 4) float layer stack.

    Channel usage in generate(): 0 = moderate change, 1 = major change;
    channels 2 and 3 are allocated but never written (see NOTE in generate).
    """
    # RGB colour constants.  NOTE(review): not referenced by generate() —
    # presumably consumed by rendering code elsewhere in the file; verify.
    _COLOUR_CHANGED = np.array([255, 220, 0], dtype=np.float32)
    _COLOUR_MAJOR = np.array([230, 30, 30], dtype=np.float32)
    _COLOUR_ADDED = np.array([ 30, 200, 60], dtype=np.float32)
    _COLOUR_DELETED = np.array([200, 30, 200], dtype=np.float32)

    @staticmethod
    def _project_region(r: Region, M_inv: Optional[np.ndarray], w: int, h: int) -> Tuple[int, int, int, int]:
        """Map a region's four corners through a 2x3 affine and return the
        clipped enclosing box (x0, y0, x1, y1); identity when M_inv is None.

        NOTE(review): currently unreferenced by generate().
        """
        if M_inv is not None:
            corners = np.array([
                [r.x, r.y ],
                [r.x + r.w, r.y ],
                [r.x, r.y + r.h],
                [r.x + r.w, r.y + r.h],
            ], dtype=np.float32)
            ones = np.ones((4, 1), dtype=np.float32)
            projected = (M_inv @ np.hstack([corners, ones]).T).T
            x0 = int(np.clip(projected[:, 0].min(), 0, w - 1))
            y0 = int(np.clip(projected[:, 1].min(), 0, h - 1))
            x1 = int(np.clip(projected[:, 0].max(), 0, w - 1))
            y1 = int(np.clip(projected[:, 1].max(), 0, h - 1))
        else:
            x0, y0, x1, y1 = r.x, r.y, r.x + r.w, r.y + r.h
        return x0, y0, x1, y1

    @staticmethod
    def generate(
        img_shape: Tuple[int, int],
        matched_pairs: List[MatchedPair],
        unmatched_old: List[Region],
        unmatched_new: List[Region],
        smooth_kernel: int = 11,
        M_inv: Optional[np.ndarray] = None,
        change_threshold: float = 0.05,
    ) -> np.ndarray:
        """Build an (H, W, 4) float32 intensity stack from the matched pairs.

        Only pairs whose total_change exceeds change_threshold are drawn;
        change <= 0.40 goes to channel 0 (moderate), above to channel 1 (major).

        NOTE(review): unmatched_old, unmatched_new and M_inv are accepted
        but never used — added/deleted regions are not rendered here.
        Either wire them into channels 2/3 (via _project_region for OLD
        boxes) or drop the parameters.
        """
        h, w = img_shape
        layers = np.zeros((h, w, 4), dtype=np.float32)
        for pair in matched_pairs:
            chg = float(pair.total_change)
            if chg <= change_threshold:
                continue
            r = pair.region_new
            ch = 0 if chg <= 0.40 else 1  # moderate vs major bucket

            if pair.heatmap is not None:
                # Use the Grad-CAM map to modulate intensity inside the box.
                cam = pair.heatmap
                if cam.shape != (r.h, r.w):
                    cam = cv2.resize(cam, (r.w, r.h),
                                     interpolation=cv2.INTER_LINEAR)
                intensity = np.clip(cam * chg, 0.0, 1.0)
                layers[r.y:r.y + r.h, r.x:r.x + r.w, ch] = np.maximum(
                    layers[r.y:r.y + r.h, r.x:r.x + r.w, ch], intensity)
            else:
                # No CAM available — flood the whole box at the change level.
                layers[r.y:r.y + r.h, r.x:r.x + r.w, ch] = np.maximum(
                    layers[r.y:r.y + r.h, r.x:r.x + r.w, ch], chg)

        # Feather the hard box edges, then gamma-lift (^0.6) the two change
        # channels so faint regions remain visible.
        if smooth_kernel > 0:
            ksize = smooth_kernel if smooth_kernel % 2 == 1 else smooth_kernel + 1
            for ch in range(4):
                if layers[:, :, ch].max() > 0:
                    layers[:, :, ch] = cv2.GaussianBlur(layers[:, :, ch], (ksize, ksize), sigmaX=3.0)
        for ch in range(2):
            if layers[:, :, ch].max() > 0:
                layers[:, :, ch] = np.power(layers[:, :, ch], 0.6)
        return layers
|
|
|
|
| |
| |
| |
|
|
class Visualiser:
    # Palette (RGB tuples) keyed by region label / change status.
    COLOURS: Dict[str, Tuple[int, int, int]] = {
        "text_block": (30, 144, 255),
        "figure": (255, 165, 0),
        "table": (50, 205, 50),
        "unknown": (180, 180, 180),
        "deleted": (220, 50, 50),
        "added": (50, 220, 80),
        "changed": (255, 200, 0),
        "unchanged": (80, 220, 80),
    }

    @staticmethod
    def draw_alignment_check(
        img_old_aligned: np.ndarray,
        img_new: np.ndarray,
    ) -> np.ndarray:
        """
        Red-cyan residual overlay for the Alignment Check tab.

        Channel assignment:
          OLD aligned -> Red channel
          NEW doc     -> Green + Blue channels (= Cyan)

        Reading the overlay:
          * ink at the SAME pixel in both pages -> gray (R = G = B)
          * OLD ink that drifted               -> RED fringe
          * NEW ink that drifted               -> CYAN fringe
          * white background on both           -> white

        A mostly gray/white overlay means alignment is good; red/cyan
        fringes indicate residual misalignment.
        """
        old_gray = cv2.cvtColor(img_old_aligned, cv2.COLOR_RGB2GRAY)
        new_gray = cv2.cvtColor(img_new, cv2.COLOR_RGB2GRAY)
        # dstack == stack(axis=2) for 2-D inputs: (H, W) x3 -> (H, W, 3).
        return np.dstack((old_gray, new_gray, new_gray))
|
|
|
|
| |
| |
| |
|
|
| |
| |
# Mean absolute grayscale difference (0-255 scale) above which an unmatched
# region counts as genuinely added/deleted rather than a matcher miss.
_UNMATCHED_PIXEL_THR: float = 12.0
|
|
|
|
def _region_mean_diff(
    r: Region,
    img_a: np.ndarray,
    candidates: List[Region],
    img_b: np.ndarray,
    thumb: int = 64,
) -> float:
    """Minimum mean |gray(a) - gray(b)| between region ``r`` and the three
    candidates whose centres lie closest to it.

    Each patch is compared on a ``thumb`` x ``thumb`` grayscale thumbnail.
    Returns 255.0 (maximally different) when no comparison is possible.
    """
    if not candidates:
        return 255.0
    crop_a = img_a[r.y: r.y + r.h, r.x: r.x + r.w]
    if crop_a.size == 0:
        return 255.0
    thumb_a = cv2.resize(
        cv2.cvtColor(crop_a, cv2.COLOR_RGB2GRAY), (thumb, thumb)
    ).astype(np.float32)

    cx, cy = r.center
    # Only the three spatially nearest candidates are worth testing.
    nearest = sorted(
        candidates,
        key=lambda c: (c.center[0] - cx) ** 2 + (c.center[1] - cy) ** 2,
    )[:3]

    best = 255.0
    for cand in nearest:
        crop_b = img_b[cand.y: cand.y + cand.h, cand.x: cand.x + cand.w]
        if crop_b.size == 0:
            continue
        thumb_b = cv2.resize(
            cv2.cvtColor(crop_b, cv2.COLOR_RGB2GRAY), (thumb, thumb)
        ).astype(np.float32)
        best = min(best, float(np.mean(np.abs(thumb_a - thumb_b))))
    return best
|
|
|
|
def _is_truly_changed(
    r: Region,
    candidates: List[Region],
    img_a: np.ndarray,
    img_b: np.ndarray,
) -> bool:
    """True when region ``r`` (from img_a) looks visually different from
    every nearby candidate in img_b.

    Distinguishes genuine additions/deletions from the matcher simply
    failing to pair two identical regions.
    """
    diff = _region_mean_diff(r, img_a, candidates, img_b)
    return diff >= _UNMATCHED_PIXEL_THR
|
|
|
|
| |
| |
| |
|
|
class CoarseToFinePipeline:
    """End-to-end page comparison: align -> extract -> match -> compare."""

    def __init__(
        self,
        align: bool = True,
        device: Optional[torch.device] = None,
        region_extractor: Optional[LayoutRegionExtractor] = None,
        matcher=None,
        comparator: Optional[SiamesePatchComparator] = None,
        min_similarity: float = 0.50,
    ):
        """
        Parameters
        ----------
        align            : run Stage-1 global alignment when True.
        device           : torch device; auto-detected (CUDA > MPS > CPU) when None.
        region_extractor : custom Stage-2 extractor (default LayoutRegionExtractor).
        matcher          : custom Stage-3 matcher; when None a SemanticRetrievalMatcher
                           sharing one ResNet50 encoder with the comparator is built.
        comparator       : custom Stage-4 comparator (default SiamesePatchComparator).
        min_similarity   : acceptance threshold for the default matcher.
        """
        # Single source of truth for device selection (was duplicated inline).
        if device is None:
            device = _pick_device()
        self._device = device

        self.aligner = GlobalAligner() if align else None
        self.extractor = region_extractor or LayoutRegionExtractor()

        if matcher is not None:
            # Caller supplied a matcher; build an independent comparator.
            self.matcher = matcher
            self.comparator = comparator or SiamesePatchComparator(device=device)
        else:
            # Default path: one shared encoder serves both the matcher and
            # the comparator, halving model memory and load time.
            shared_encoder = _SiameseEncoder().to(device).eval()
            logger.info("Pipeline: shared ResNet50 encoder on %s", device)

            self.matcher = SemanticRetrievalMatcher(
                encoder=shared_encoder,
                device=device,
                min_similarity=min_similarity,
            )
            self.comparator = comparator or SiamesePatchComparator(
                device=device,
                encoder=shared_encoder,
            )

    def compare(self, img_old: np.ndarray, img_new: np.ndarray, verbose: bool = True) -> ComparisonResult:
        """Run the full four-stage pipeline on one RGB page pair."""
        timings: Dict[str, float] = {}
        t = time.time()
        M = None
        if self.aligner is not None:
            img_old_aligned, M = self.aligner.align(img_old, img_new)
        else:
            img_old_aligned = img_old.copy()
        timings["alignment"] = time.time() - t

        t = time.time()
        regions_old = self.extractor.extract(img_old_aligned)
        regions_new = self.extractor.extract(img_new)
        timings["extraction"] = time.time() - t

        t = time.time()
        matched, unmatched_old, unmatched_new = self.matcher.match(
            regions_old, regions_new, img_old_aligned, img_new)
        timings["matching"] = time.time() - t

        t = time.time()
        for i, pair in enumerate(matched):
            matched[i] = self.comparator.compare_pair(pair, img_old_aligned, img_new)
        timings["siamese"] = time.time() - t

        if verbose:
            logger.info("Timings — align: %.2fs | extract: %.2fs | match: %.2fs | siamese: %.2fs",
                        timings["alignment"], timings["extraction"],
                        timings["matching"], timings["siamese"])

        h, w = img_new.shape[:2]
        # NOTE(review): M is the OLD->NEW forward transform, not its inverse;
        # harmless today because generate() ignores M_inv, but invert it
        # before re-enabling unmatched-region projection.
        heatmap = HeatmapGenerator.generate(
            (h, w), matched, unmatched_old, unmatched_new,
            M_inv=M, change_threshold=0.05,
        )

        # Filter matcher misses: only count unmatched regions that are
        # visually different from their nearest counterpart on the other page.
        truly_deleted = [
            r for r in unmatched_old
            if _is_truly_changed(r, unmatched_new, img_old_aligned, img_new)
        ]
        truly_added = [
            r for r in unmatched_new
            if _is_truly_changed(r, unmatched_old, img_new, img_old_aligned)
        ]

        page_area = max(h * w, 1)
        changed_area = sum(p.region_new.area for p in matched if p.total_change > 0.05)
        deleted_area = sum(r.area for r in truly_deleted)
        added_area = sum(r.area for r in truly_added)
        total_pct = min(100.0 * (changed_area + added_area + deleted_area) / page_area, 100.0)

        return ComparisonResult(
            matched_pairs=matched,
            unmatched_old=unmatched_old,
            unmatched_new=unmatched_new,
            global_transform=M,
            total_change_pct=total_pct,
            heatmap=heatmap,
            img_old_aligned=img_old_aligned,
        )
|
|
|
|
| |
| |
| |
|
|
| def _pick_device() -> torch.device: |
| if torch.cuda.is_available(): |
| return torch.device("cuda") |
| if hasattr(torch.backends, "mps") and torch.backends.mps.is_available(): |
| return torch.device("mps") |
| return torch.device("cpu") |
|
|
|
|
def _page_to_rgb(doc: fitz.Document, idx: int, dpi: int) -> np.ndarray:
    """Rasterise page ``idx`` of ``doc`` at ``dpi`` into an (H, W, 3) RGB array."""
    pixmap = doc[idx].get_pixmap(dpi=dpi)
    shape = (pixmap.height, pixmap.width, 3)
    return np.frombuffer(pixmap.samples, np.uint8).reshape(shape)
|
|
|
|
| def _build_summary( |
| page_results: list, |
| aligned: bool, |
| skip_old_p1: bool = False, |
| skip_new_p1: bool = False, |
| ) -> str: |
| total_changes = [pr["total_change_pct"] for pr in page_results] |
|
|
| lines = [ |
| "ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ", |
| "β POWERGRID DOCUMENT AUDIT β CHANGE REPORT β", |
| "ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ", |
| "", |
| f" Total Pages Analysed : {len(page_results)}", |
| f" Overall Avg Change : {np.mean(total_changes):.2f}%", |
| "", |
| "ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ", |
| " PAGE-WISE CHANGE SUMMARY", |
| "ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ", |
| ] |
|
|
| for pr in page_results: |
| pct = pr["total_change_pct"] |
| status = "β
MINIMAL" if pct < 5 else "β οΈ MODERATE" if pct < 20 else "π΄ SIGNIFICANT" |
| lines.append(f" Page {pr['page']:>3} β {pct:>5.1f}% β {status}") |
|
|
| significant = [pr["page"] for pr in page_results if pr["total_change_pct"] > 20] |
| if significant: |
| lines += [ |
| "", |
| f" β οΈ Pages with significant changes (>20%): {significant}", |
| ] |
|
|
| return "\n".join(lines) |
|
|
|
|
def _build_output_pdf(page_results: list, output_path: str,
                      process_dpi: int = 400) -> str:
    """Write each page's alignment-check overlay into a PDF at full resolution.

    PyMuPDF pages are sized in points (1 pt = 1/72 inch) while the overlay
    images were rendered at ``process_dpi``.  Sizing each page as
    px * 72 / process_dpi makes insert_image() map the PNG 1:1 onto the page
    rect, so nothing is resampled and full pixel depth is preserved.

    Returns ``output_path``.
    """
    doc_out = fitz.open()
    scale = 72.0 / process_dpi
    for pr in page_results:
        overlay = pr["align_check"].convert("RGB")
        px_w, px_h = overlay.size
        page_out = doc_out.new_page(width=px_w * scale, height=px_h * scale)
        png_buf = io.BytesIO()
        overlay.save(png_buf, format="PNG", optimize=True)
        page_out.insert_image(page_out.rect, stream=png_buf.getvalue())
    doc_out.save(output_path, deflate=True, garbage=4, clean=True)
    doc_out.close()
    return output_path
|
|
|
|
| |
| |
| |
|
|
| |
# Shared preprocessing for single-patch embedding: resize to the ResNet input
# size and normalise with ImageNet statistics (same recipe as the matcher's
# per-instance transform).
_REGION_TRANSFORM = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])
|
|
|
|
def _embed_patch(patch_rgb: np.ndarray,
                 encoder: "_SiameseEncoder",
                 device: torch.device) -> torch.Tensor:
    """Encode one RGB numpy patch into a (128,) L2-normalised embedding."""
    batch = _REGION_TRANSFORM(Image.fromarray(patch_rgb)).unsqueeze(0).to(device)
    with torch.no_grad():
        embeddings, _ = encoder.encode(batch)
    return embeddings[0]
|
|
|
|
def _find_matching_region_in_old(
    new_crop: np.ndarray,
    img_old_full: np.ndarray,
    encoder: "_SiameseEncoder",
    device: torch.device,
) -> Tuple[int, int, int, int]:
    """
    Locate where new_crop (user-selected patch from the NEW page) sits inside
    img_old_full (the complete OLD page).

    Method - semantic sliding-window search
    ----------------------------------------
    1. Encode new_crop with the shared ResNet50 encoder -> 128-d embedding.
    2. Slide a window across img_old_full at multiple scales (+/-30 % of the
       crop size).  Step = 50 % of the window size so adjacent windows
       overlap; windows flush with the right/bottom edges are always added
       so content near a page edge can never fall outside every candidate.
    3. Encode every window patch and compute cosine similarity with the
       query embedding.  Pick the window with the highest similarity.
    4. Clamp the winning box to page bounds and return it.

    Why semantic (not pixel-level):
      * ResNet50 encodes *what* is in a region (shapes, structure, symbols),
        not pixel values, so two revisions of the same table/panel/diagram
        embed nearly identically even when text values changed slightly.
      * The multi-scale sweep tolerates content that was enlarged or shrunk
        between revisions; the full-page sweep finds content anywhere on
        the OLD page regardless of how far it moved.

    Returns (x1, y1, x2, y2) in img_old_full pixel space.
    """
    crop_h, crop_w = new_crop.shape[:2]
    old_h, old_w = img_old_full.shape[:2]

    def _clamp_box(bx: int, by: int, bw: int, bh: int
                   ) -> Tuple[int, int, int, int]:
        # Keep the box fully inside the OLD page and at least 1 px in size.
        bx = max(0, min(bx, old_w - 1))
        by = max(0, min(by, old_h - 1))
        bw = max(1, min(bw, old_w - bx))
        bh = max(1, min(bh, old_h - by))
        return bx, by, bx + bw, by + bh

    # Query embedding for the user's NEW-page crop.
    q_emb = _embed_patch(new_crop, encoder, device)

    scales = (0.70, 0.85, 1.00, 1.15, 1.30)

    candidates: List[Tuple[int, int, int, int]] = []
    seen = set()
    for sc in scales:
        win_w = max(32, int(crop_w * sc))
        win_h = max(32, int(crop_h * sc))
        if win_w > old_w or win_h > old_h:
            continue
        step_x = max(1, win_w // 2)
        step_y = max(1, win_h // 2)
        xs = list(range(0, old_w - win_w + 1, step_x))
        ys = list(range(0, old_h - win_h + 1, step_y))
        # Bug fix: the plain range() can stop short of the page edge when
        # (dim - win) % step != 0, leaving a right/bottom strip that no
        # window covers.  Append the flush-edge positions explicitly.
        if xs[-1] != old_w - win_w:
            xs.append(old_w - win_w)
        if ys[-1] != old_h - win_h:
            ys.append(old_h - win_h)
        for y in ys:
            for x in xs:
                box = (x, y, win_w, win_h)
                if box not in seen:       # different scales can yield duplicates
                    seen.add(box)
                    candidates.append(box)

    logger.info(
        "_find_matching_region_in_old: %d candidate windows across %d scales",
        len(candidates), len(scales),
    )

    if not candidates:
        # Crop is at least as large as the page: compare against the whole page.
        logger.warning("_find_matching_region_in_old: crop >= page; returning full page box.")
        return _clamp_box(0, 0, old_w, old_h)

    # Encode candidates in batches to bound peak memory while keeping the
    # encoder's throughput high.
    BATCH = 64
    best_sim: float = -1.0
    best_box: Tuple[int, int, int, int] = candidates[0]

    for start in range(0, len(candidates), BATCH):
        batch_cands = candidates[start: start + BATCH]
        patches = [
            img_old_full[cy: cy + ch, cx: cx + cw]
            for (cx, cy, cw, ch) in batch_cands
        ]
        tensors = [_REGION_TRANSFORM(Image.fromarray(p)) for p in patches]
        batch_t = torch.stack(tensors).to(device)
        with torch.no_grad():
            embs, _ = encoder.encode(batch_t)

        # Embeddings are L2-normalised, so a dot product is cosine similarity.
        sims = (embs @ q_emb).cpu().numpy()

        idx = int(sims.argmax())
        if sims[idx] > best_sim:
            best_sim = float(sims[idx])
            best_box = batch_cands[idx]

    bx, by, bw, bh = best_box
    x1o, y1o, x2o, y2o = _clamp_box(bx, by, bw, bh)

    logger.info(
        "_find_matching_region_in_old: best cosine=%.4f OLD box (%d,%d)β(%d,%d)",
        best_sim, x1o, y1o, x2o, y2o,
    )
    return (x1o, y1o, x2o, y2o)
|
|
|
|
| |
| |
| |
|
|
def run_comparison(
    pdf_old_file,
    pdf_new_file,
    skip_old_p1: bool,
    skip_new_p1: bool,
    enable_align: bool,
    compare_mode: str,
    page_old_input: int,
    page_new_input: int,
    page_compare_mode: str = "Full Page",
    region_coords=None,
    display_dpi: int = 72,
    progress=gr.Progress(),
):
    """
    Compare two uploaded PDFs and build all result artefacts.

    Parameters
    ----------
    pdf_old_file, pdf_new_file : uploaded file objects (must expose .name).
    skip_old_p1, skip_new_p1   : skip the cover page of the respective PDF
                                 (honoured only in "Full Document" mode).
    enable_align               : run global alignment before comparing.
    compare_mode               : "Full Document" or "Specific Pages".
    page_old_input, page_new_input : 1-based page numbers for
                                 "Specific Pages" mode.
    page_compare_mode          : "Full Page" or "Specific Region"
                                 (region honoured only with "Specific Pages").
    region_coords              : dict with x/y/width/height in preview pixels
                                 (as written by the JS canvas overlay), or None.
    display_dpi                : DPI the preview image was rendered at; used
                                 to scale region_coords up to process DPI.
    progress                   : Gradio progress tracker (injected by Gradio;
                                 the mutable default is the Gradio convention).

    Returns
    -------
    (page_results, summary_text, output_pdf_path, 1, page_slider_update)

    Raises
    ------
    gr.Error if either PDF is missing.
    """
    # Fixed rasterisation resolution for processing.
    dpi = 400

    if pdf_old_file is None or pdf_new_file is None:
        raise gr.Error("Please upload both Previous Revision and New Document PDF files.")

    device = _pick_device()

    pipeline = CoarseToFinePipeline(
        align = enable_align,
        device = device,
        min_similarity = 0.50,
    )

    progress(0, desc="Opening PDF files β¦")
    doc_old = fitz.open(pdf_old_file.name)
    doc_new = fitz.open(pdf_new_file.name)

    # ---- Build the list of (old_page_index, new_page_index) pairs ---------
    if compare_mode == "Specific Pages":
        # Convert the 1-based UI inputs to 0-based indices, clamped to range.
        old_idx_req = int(page_old_input or 1) - 1
        new_idx_req = int(page_new_input or 1) - 1
        old_idx_req = max(0, min(old_idx_req, len(doc_old) - 1))
        new_idx_req = max(0, min(new_idx_req, len(doc_new) - 1))
        page_pairs = [(old_idx_req, new_idx_req)]
    else:
        # Full-document mode: optionally drop cover pages, then pair pages
        # positionally up to the shorter document's length.
        old_start = 1 if skip_old_p1 else 0
        new_start = 1 if skip_new_p1 else 0
        old_pages = len(doc_old) - old_start
        new_pages = len(doc_new) - new_start
        num_pages = min(old_pages, new_pages)

        if skip_old_p1:
            gr.Info("Skipping cover page of Previous Revision.")
        if skip_new_p1:
            gr.Info("Skipping cover page of New Document.")
        if old_pages != new_pages:
            gr.Warning(
                f"Page count mismatch: Previous Revision={old_pages}, New Document={new_pages}. "
                f"Processing {num_pages} pages."
            )
        page_pairs = [(pg + old_start, pg + new_start) for pg in range(num_pages)]

    num_pairs = len(page_pairs)
    page_results = []

    for i, (old_idx, new_idx) in enumerate(page_pairs):
        progress(i / num_pairs, desc=f"Processing page {i + 1} / {num_pairs} β¦")
        img_old = _page_to_rgb(doc_old, old_idx, dpi)
        img_new = _page_to_rgb(doc_new, new_idx, dpi)

        # Pages rendered at the same DPI can still differ in pixel size
        # (different media boxes); resize OLD to match NEW before comparing.
        if img_old.shape != img_new.shape:
            img_old = cv2.resize(img_old, (img_new.shape[1], img_new.shape[0]))

        # ---- Specific-Region mode ----------------------------------------
        # Crop the user's selected box out of the NEW page, then locate the
        # semantically corresponding content anywhere on the OLD page.
        if (compare_mode == "Specific Pages"
                and page_compare_mode == "Specific Region"
                and region_coords):
            rx = region_coords.get("x", 0)
            ry = region_coords.get("y", 0)
            rw = region_coords.get("width", img_new.shape[1])
            rh = region_coords.get("height", img_new.shape[0])
            # Scale preview-pixel coords up to process-DPI pixels.
            sf = dpi / float(display_dpi or 72)
            x1 = max(0, int(rx * sf))
            y1 = max(0, int(ry * sf))
            x2 = min(img_new.shape[1], int((rx + rw) * sf))
            y2 = min(img_new.shape[0], int((ry + rh) * sf))

            logger.info(
                "Specific Region: display_dpi=%d sf=%.3f "
                "preview-box (%d,%d,%d,%d) β process-px (%d,%d)β(%d,%d)",
                display_dpi, sf, rx, ry, rw, rh, x1, y1, x2, y2,
            )

            # Degenerate boxes (zero/negative area after scaling) fall back
            # to full-page comparison by simply not cropping.
            if x2 > x1 and y2 > y1:
                img_new_crop = img_new[y1:y2, x1:x2]

                # Semantic search for the matching region on the OLD page.
                ox1, oy1, ox2, oy2 = _find_matching_region_in_old(
                    new_crop = img_new_crop,
                    img_old_full = img_old,
                    encoder = pipeline.matcher.encoder,
                    device = device,
                )
                logger.info(
                    "Specific Region: NEW (%d,%d)β(%d,%d) β OLD (%d,%d)β(%d,%d)",
                    x1, y1, x2, y2, ox1, oy1, ox2, oy2,
                )

                # The matched OLD box may differ in size (multi-scale search);
                # resize it to the NEW crop so the pipeline sees equal shapes.
                img_old_raw = img_old[oy1:oy2, ox1:ox2]
                nh, nw = img_new_crop.shape[:2]
                if img_old_raw.shape[:2] != (nh, nw):
                    img_old_crop = cv2.resize(
                        img_old_raw, (nw, nh), interpolation=cv2.INTER_LINEAR,
                    )
                else:
                    img_old_crop = img_old_raw

                # From here on, compare only the selected region.
                img_old = img_old_crop
                img_new = img_new_crop

        result = pipeline.compare(img_old, img_new)

        # Prefer the aligned OLD image for the overlay when alignment ran.
        old_aligned_for_check = (
            result.img_old_aligned if result.img_old_aligned is not None
            else img_old
        )
        align_check = Visualiser.draw_alignment_check(old_aligned_for_check, img_new)

        page_results.append({
            "page": i + 1,
            "result": result,
            "align_check": Image.fromarray(align_check),
            "original": Image.fromarray(img_old),
            "revised": Image.fromarray(img_new),
            "total_change_pct": result.total_change_pct,
        })

    doc_old.close()
    doc_new.close()

    progress(0.95, desc="Generating report PDF β¦")
    output_pdf = _build_output_pdf(page_results, "ctf_output.pdf", process_dpi=dpi)
    summary = _build_summary(page_results, enable_align, skip_old_p1, skip_new_p1)

    progress(1.0, desc="Done!")
    # Final "1" resets the viewer to page 1; the update resizes the slider.
    return page_results, summary, output_pdf, 1, gr.update(maximum=num_pairs, value=1)
|
|
|
|
def get_page_view(page_num, pages_data, view_mode, rotation: int = 0,
                  nudge_x: int = 0, nudge_y: int = 0, nudge_scale: float = 1.0):
    """
    Return the image for one result page in the requested view.

    view_mode selects which stored image is shown ("Auto-Overlay",
    "Previous Revision" or "New Document"; anything else falls back to the
    overlay).  Manual nudge offsets/scale are applied only to the overlay
    view, and any rotation is applied last.  Returns None when there is no
    data or the stored image is missing.
    """
    if not pages_data:
        return None

    # 1-based page number -> clamped 0-based index.
    page_idx = min(max(int(page_num) - 1, 0), len(pages_data) - 1)
    record = pages_data[page_idx]

    view_key = {
        "Auto-Overlay": "align_check",
        "Previous Revision": "original",
        "New Document": "revised",
    }.get(view_mode, "align_check")
    img = record.get(view_key)
    if img is None:
        return None

    # Re-render the overlay only when the user actually nudged it.
    scale = float(nudge_scale) if nudge_scale else 1.0
    nudged = nudge_x != 0 or nudge_y != 0 or abs(scale - 1.0) > 1e-4
    if view_mode == "Auto-Overlay" and nudged:
        img = _apply_nudge_overlay(record, nudge_x, nudge_y, scale)

    if img is not None and rotation % 360 != 0:
        img = img.rotate(rotation, expand=True)
    return img
|
|
|
|
def _apply_nudge_overlay(pr: dict, dx: int, dy: int, scale: float = 1.0) -> Image.Image:
    """
    Re-render the Auto-Overlay with the NEW (red) layer shifted by (dx, dy)
    pixels and scaled by `scale` about the image centre.

    The cyan channel (aligned Previous Revision) stays fixed; only the red
    channel (New Document) receives the translate + scale.  Returns None
    when the page record has no overlay image.
    """
    base = pr.get("align_check")
    if base is None:
        return None

    # Recover the two grayscale layers baked into the stored overlay:
    # R channel = aligned OLD page, G (== B) channel = NEW page.
    rgb = np.array(base.convert("RGB"))
    old_layer = rgb[:, :, 0]
    new_layer = rgb[:, :, 1]

    h, w = old_layer.shape
    centre_x, centre_y = w / 2.0, h / 2.0

    # Guard against zero/negative scale values coming from the UI.
    factor = float(scale) if scale and scale > 0 else 1.0
    # Affine matrix: scale about the image centre, then translate by (dx, dy).
    affine = np.float32([
        [factor, 0, dx + centre_x * (1 - factor)],
        [0, factor, dy + centre_y * (1 - factor)],
    ])

    shifted = cv2.warpAffine(
        new_layer, affine, (w, h),
        flags=cv2.INTER_LINEAR,
        borderMode=cv2.BORDER_CONSTANT,
        borderValue=255,       # white border = blank paper
    )

    # Recompose: R = old layer (cyan vs red separation), G = B = nudged new.
    merged = np.stack([old_layer, shifted, shifted], axis=2)
    return Image.fromarray(merged.astype(np.uint8))
|
|
|
|
| |
| |
| |
|
|
# Load the stylesheet that ships next to this file.  A missing styles.css
# must not prevent the whole app from importing, so degrade to no custom
# CSS instead of crashing with FileNotFoundError at module import time.
_css_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "styles.css")
try:
    with open(_css_path, encoding="utf-8") as _css_f:
        _CSS = _css_f.read()
except FileNotFoundError:
    logger.warning("styles.css not found at %s; continuing without custom CSS.", _css_path)
    _CSS = ""


# App-wide Gradio theme (applied via the gr.Blocks constructor).
_THEME = gr.themes.Base(
    primary_hue=gr.themes.colors.blue,
    neutral_hue=gr.themes.colors.gray,
    font=[gr.themes.GoogleFont("Inter"), "sans-serif"],
)
|
|
| |
# Theme and CSS are Blocks-constructor options; previously they were only
# passed to launch() (which does not accept them), so they were never applied.
with gr.Blocks(title="POWERGRID Document Auditor",
               theme=_THEME, css=_CSS) as demo:

    # ------------------------------------------------------------------ #
    # Header                                                             #
    # ------------------------------------------------------------------ #
    # Fall back to a plain "PG" monogram when the logo asset is missing.
    _logo_tag = (
        f'<img src="{_LOGO_URI}" alt="POWERGRID Logo" />'
        if _LOGO_URI else
        '<span style="font-size:1.4rem;font-weight:900;color:#003087;letter-spacing:-1px;">PG</span>'
    )
    gr.HTML(f"""
    <div id="app-header">
      <div id="app-header-inner">
        <div id="app-header-logo">{_logo_tag}</div>
        <div id="app-header-text">
          <h1>POWERGRID Document Auditor</h1>
          <p>Power Grid Corporation of India Limited — AI-Powered Document Comparison</p>
        </div>
      </div>
    </div>
    """)

    # ------------------------------------------------------------------ #
    # Session state                                                      #
    # ------------------------------------------------------------------ #
    pages_state = gr.State(value=None)          # per-page result dicts
    rotation_state = gr.State(value=0)          # current view rotation (deg)
    nudge_x_state = gr.State(value=0)           # manual red-layer shift (px)
    nudge_y_state = gr.State(value=0)
    nudge_scale_state = gr.State(value=1.0)     # manual red-layer scale
    region_coords_state = gr.State(value=None)  # {x,y,width,height} in preview px
    display_dpi_state = gr.State(value=72)      # DPI of the region preview

    with gr.Row(equal_height=False):

        # -------------------------------------------------------------- #
        # Left panel: inputs and controls                                #
        # -------------------------------------------------------------- #
        with gr.Column(scale=1, min_width=290, elem_id="left-panel"):

            gr.HTML('<div class="section-label">Documents</div>')
            pdf_old = gr.File(label="Previous Revision PDF", file_types=[".pdf"])
            skip_old_p1 = gr.Checkbox(
                value=False,
                label="Skip cover page of Previous Revision",
                interactive=False,   # enabled once a multi-page PDF is uploaded
                elem_classes=["skip-cb"],
            )

            gr.HTML('<div class="section-divider"></div>')
            pdf_new = gr.File(label="Revised (New) PDF", file_types=[".pdf"])
            skip_new_p1 = gr.Checkbox(
                value=False,
                label="Skip cover page of New Revision",
                interactive=False,
                elem_classes=["skip-cb"],
            )

            gr.HTML('<div class="section-divider"></div>')
            gr.HTML('<div class="section-label">Options</div>')
            enable_align = gr.Checkbox(
                value=True,
                label="Auto-align pages before comparing",
                info="Enable if documents were scanned or printed at different positions or scales.",
            )

            gr.HTML('<div class="section-divider"></div>')
            gr.HTML('<div class="section-label">Compare Mode</div>')
            compare_mode = gr.Radio(
                choices=["Full Document", "Specific Pages"],
                value="Full Document",
                label="Compare Mode",
                show_label=False,
                elem_id="compare-mode-radio",
            )
            with gr.Row(visible=False, elem_id="specific-pages-row") as specific_pages_row:
                page_old_input = gr.Number(
                    value=1, minimum=1, step=1, precision=0,
                    label="Prev. Revision Page",
                    elem_id="page-old-input",
                )
                page_new_input = gr.Number(
                    value=1, minimum=1, step=1, precision=0,
                    label="New Document Page",
                    elem_id="page-new-input",
                )

            # Region sub-options (only relevant in "Specific Pages" mode).
            with gr.Column(visible=False, elem_id="region-col") as region_col:
                page_compare_mode = gr.Radio(
                    choices=["Full Page", "Specific Region"],
                    value="Full Page",
                    label="Page Comparison",
                    show_label=True,
                    elem_id="page-compare-mode-radio",
                )

                # Inline preview + JS drawing canvas for region selection.
                with gr.Column(visible=False, elem_id="region-preview-col") as region_preview_col:
                    region_readout = gr.HTML(
                        value='<div id="region-readout">No region selected — full page will be used</div>',
                        elem_id="region-readout",
                    )
                    region_page_img = gr.Image(
                        value=None,
                        label=None,
                        show_label=False,
                        type="pil",
                        interactive=False,
                        elem_id="region-page-img",
                        height=380,
                    )
                    # Hidden textbox the JS canvas writes "x,y,w,h" into.
                    region_coords_txt = gr.Textbox(
                        value="",
                        label=None,
                        show_label=False,
                        elem_id="region-coords-txt",
                        elem_classes=["region-coords-hidden"],
                    )
                    clear_region_btn = gr.Button(
                        "✕ Clear Region",
                        size="sm",
                        elem_id="clear-region-btn",
                    )

            gr.HTML('<div class="section-divider"></div>')
            run_btn = gr.Button("Run Audit", variant="primary", size="lg", elem_id="run-btn")

            gr.HTML('<div class="section-divider"></div>')
            gr.HTML('<div class="section-label">Fine-Tune Alignment</div>')

            # D-pad layout: spacers centre the "up" button over "down".
            with gr.Row(equal_height=True, elem_id="nudge-row-top"):
                gr.HTML('<div style="flex:1;min-width:0"></div>')
                nudge_up_btn = gr.Button("▲", elem_id="nudge-up", min_width=44, scale=0)
                gr.HTML('<div style="flex:1;min-width:0"></div>')

            with gr.Row(equal_height=True, elem_id="nudge-row-bot"):
                nudge_left_btn = gr.Button("◀", elem_id="nudge-left", min_width=44, scale=0)
                nudge_down_btn = gr.Button("▼", elem_id="nudge-down", min_width=44, scale=0)
                nudge_right_btn = gr.Button("▶", elem_id="nudge-right", min_width=44, scale=0)

            gr.HTML('<p class="nudge-tip">Tip: Run Audit resets alignment</p>')

            nudge_step = gr.Number(
                value=1, minimum=1, maximum=100, step=1,
                label="Step Size (px)", precision=0,
                elem_id="nudge-step",
            )
            nudge_scale = gr.Number(
                value=1.0, minimum=0.10, maximum=10.0, step=0.005,
                label="Scale — Red Layer", precision=3,
                elem_id="nudge-scale",
            )
            nudge_readout = gr.HTML(
                value='<div id="nudge-readout-wrap">x = +0 px<br>y = +0 px<br>scale = 1.000</div>',
                elem_id="nudge-readout",
            )

        # -------------------------------------------------------------- #
        # Right panel: result viewer                                     #
        # -------------------------------------------------------------- #
        with gr.Column(scale=3, elem_id="right-panel"):

            with gr.Row(elem_id="toolbar-row"):
                view_mode = gr.Radio(
                    choices=["Auto-Overlay", "Previous Revision", "New Document"],
                    value="Auto-Overlay",
                    label="View",
                    show_label=False,
                    scale=1,
                    min_width=320,
                    elem_id="view-mode-radio",
                )
                gr.HTML('<div class="toolbar-sep"></div>')
                rot_left_btn = gr.Button("↺", scale=0, elem_id="rot-left", min_width=38)
                rot_right_btn = gr.Button("↻", scale=0, elem_id="rot-right", min_width=38)

            page_slider = gr.Slider(
                minimum=1, maximum=1, value=1, step=1,
                label="Page",
                visible=False,   # shown only for multi-page results
                elem_id="page-slider",
            )

            page_num_state = gr.State(value=1)
            total_pages_state = gr.State(value=1)

            result_image = gr.Image(
                label="",
                type="pil",
                height=720,
                interactive=False,
                elem_id="result-image",
            )

            gr.HTML("""
            <div id="legend-bar" style="display:flex; gap:18px; flex-wrap:wrap; align-items:center;">
              <span style="font-size:0.60rem;font-weight:700;color:#8BA0BB;text-transform:uppercase;
                    letter-spacing:0.11em;white-space:nowrap;flex-shrink:0;">Overlay Legend</span>
              <span style="display:flex;align-items:center;gap:6px;">
                <span style="width:12px;height:12px;border-radius:3px;background:#7A7A7A;
                      flex-shrink:0;display:inline-block;box-shadow:0 1px 2px rgba(0,0,0,0.15);"></span>
                <span style="font-size:0.75rem;color:#4A6585;white-space:nowrap;">
                  <b style="color:#0F1C2E;font-weight:600;">Gray</b> — Unchanged</span>
              </span>
              <span style="display:flex;align-items:center;gap:6px;">
                <span style="width:12px;height:12px;border-radius:3px;background:#00BBBB;
                      flex-shrink:0;display:inline-block;box-shadow:0 1px 2px rgba(0,0,0,0.15);"></span>
                <span style="font-size:0.75rem;color:#4A6585;white-space:nowrap;">
                  <b style="color:#007070;font-weight:600;">Cyan</b> — Previous Revision</span>
              </span>
              <span style="display:flex;align-items:center;gap:6px;">
                <span style="width:12px;height:12px;border-radius:3px;background:#EE3333;
                      flex-shrink:0;display:inline-block;box-shadow:0 1px 2px rgba(0,0,0,0.15);"></span>
                <span style="font-size:0.75rem;color:#4A6585;white-space:nowrap;">
                  <b style="color:#BB0000;font-weight:600;">Red</b> — New Document</span>
              </span>
            </div>
            """)

            with gr.Row():
                pdf_output = gr.File(label="⬇️ Download Result PDF")

    # ------------------------------------------------------------------ #
    # Event handlers                                                     #
    # ------------------------------------------------------------------ #

    def on_pdf_upload(pdf_file):
        """Disable skip-cover-page checkbox when uploaded PDF has only 1 page."""
        if pdf_file is None:
            return gr.update(interactive=False, value=False)
        try:
            doc = fitz.open(pdf_file.name)
            n = len(doc)
            doc.close()
            if n <= 1:
                return gr.update(interactive=False, value=False)
            else:
                return gr.update(interactive=True)
        except Exception:
            # Unreadable PDF: leave the checkbox usable; errors surface later.
            return gr.update(interactive=True)

    def _readout_html(nx: int, ny: int, ns: float) -> str:
        """Render the current nudge offsets/scale as the readout HTML."""
        return (
            f'<div id="nudge-readout-wrap">'
            f'x = {nx:+d} px<br>'
            f'y = {ny:+d} px<br>'
            f'scale = {ns:.3f}'
            f'</div>'
        )

    def on_compare_mode_change(mode):
        """Show/hide the specific-page number inputs and region sub-options."""
        show = (mode == "Specific Pages")
        return gr.update(visible=show), gr.update(visible=show)

    def on_load_preview(pdf_new_f, pg_new):
        """Render the New Doc page at 72 DPI and return as PIL image for inline display."""
        if pdf_new_f is None:
            raise gr.Error("Please upload the New Document PDF first.")
        preview_dpi = 72
        doc = fitz.open(pdf_new_f.name)
        idx = max(0, int(pg_new or 1) - 1)
        idx = min(idx, len(doc) - 1)
        arr = _page_to_rgb(doc, idx, preview_dpi)
        doc.close()
        pil_img = Image.fromarray(arr)
        readout = '<div id="region-readout">Draw a box on the image below to select a region</div>'
        # Reset any previous selection alongside the fresh preview.
        return pil_img, "", None, preview_dpi, readout

    def on_region_coords_change(coords_txt):
        """Parse 'x,y,w,h' string written by JS canvas into region_coords_state dict."""
        if not coords_txt or coords_txt.strip() == "":
            return None, '<div id="region-readout">No region selected — full page will be used</div>'
        try:
            parts = [float(v) for v in coords_txt.strip().split(",")]
            x, y, w, h = int(parts[0]), int(parts[1]), int(parts[2]), int(parts[3])
            if w < 5 or h < 5:
                return None, '<div id="region-readout">Region too small — drag a larger area</div>'
            coords = {"x": x, "y": y, "width": w, "height": h}
            readout = (
                f'<div id="region-readout">'
                f'✓ Region: ({x}, {y}) → ({x+w}, {y+h})'
                f' | {w}×{h} px'
                f'</div>'
            )
            return coords, readout
        except Exception:
            return None, '<div id="region-readout">Invalid region — drag again</div>'

    def on_clear_region():
        """Reset region: clear coords textbox and state (image stays, JS clears the overlay)."""
        return "", None, '<div id="region-readout">Draw a box on the image below to select a region</div>'

    def on_run(pdf_old_f, pdf_new_f, skip_old, skip_new, align,
               cmp_mode, pg_old, pg_new,
               pg_cmp_mode, region_coords, display_dpi,
               progress=gr.Progress()):
        """Run the comparison and reset viewer state (rotation, nudge, page)."""
        page_results, _summary, pdf_path, _, _ = run_comparison(
            pdf_old_f, pdf_new_f, skip_old, skip_new, align,
            cmp_mode, pg_old, pg_new,
            pg_cmp_mode, region_coords, display_dpi,
            progress
        )
        n_pages = len(page_results)
        first_img = page_results[0]["align_check"] if page_results else None
        return (
            page_results,
            0,                   # rotation reset
            0,                   # nudge x reset
            0,                   # nudge y reset
            1.0,                 # nudge scale reset
            1,                   # current page
            n_pages,
            pdf_path,
            first_img,
            _readout_html(0, 0, 1.0),
            gr.update(visible=n_pages > 1, minimum=1, maximum=n_pages, value=1),
        )

    def on_view_change(view, pg, total, pages_data, rot, nx, ny, ns):
        # Switching views resets rotation to 0.
        return get_page_view(pg, pages_data, view, 0, nx, ny, ns), 0

    def on_rot_left(pg, total, pages_data, view, rot, nx, ny, ns):
        new_rot = (rot + 90) % 360
        return get_page_view(pg, pages_data, view, new_rot, nx, ny, ns), new_rot

    def on_rot_right(pg, total, pages_data, view, rot, nx, ny, ns):
        new_rot = (rot - 90) % 360
        return get_page_view(pg, pages_data, view, new_rot, nx, ny, ns), new_rot

    def on_pg_slide(pg, total, pages_data, view, rot, nx, ny, ns):
        pg = int(pg or 1)
        img = get_page_view(pg, pages_data, view, rot, nx, ny, ns)
        return img, pg

    def on_nudge(direction: str, pg, total, pages_data, view, rot, nx, ny, ns, step):
        """Shift the red (New Document) overlay layer by `step` pixels."""
        step = int(step or 1)
        if direction == "left":    nx -= step
        elif direction == "right": nx += step
        elif direction == "up":    ny -= step
        elif direction == "down":  ny += step
        img = get_page_view(pg, pages_data, view, rot, nx, ny, ns)
        return img, nx, ny, ns, _readout_html(nx, ny, ns)

    def on_scale_change(sc, pg, total, pages_data, view, rot, nx, ny):
        ns = float(sc) if sc else 1.0
        img = get_page_view(pg, pages_data, view, rot, nx, ny, ns)
        return img, ns, _readout_html(nx, ny, ns)

    # ------------------------------------------------------------------ #
    # Event wiring                                                       #
    # ------------------------------------------------------------------ #
    pdf_old.change(fn=on_pdf_upload, inputs=[pdf_old], outputs=[skip_old_p1])
    pdf_new.change(fn=on_pdf_upload, inputs=[pdf_new], outputs=[skip_new_p1])

    compare_mode.change(
        fn=on_compare_mode_change,
        inputs=[compare_mode],
        outputs=[specific_pages_row, region_col],
    )

    # Shared output list for every handler that refreshes the region preview.
    _preview_outputs = [region_page_img, region_coords_txt,
                        region_coords_state, display_dpi_state, region_readout]

    def on_page_compare_mode_change(sub_mode, pdf_new_f, pg_new):
        """Show the region preview column and auto-load the page preview."""
        show = (sub_mode == "Specific Region")
        col_update = gr.update(visible=show)
        if show:
            try:
                pil_img, ctxt, coords, dpi, rdout = on_load_preview(pdf_new_f, pg_new)
                return col_update, pil_img, ctxt, coords, dpi, rdout
            except Exception:
                pass    # no PDF yet: fall through to blank preview
        blank_readout = '<div id="region-readout">No region selected — full page will be used</div>'
        return col_update, None, "", None, 72, blank_readout

    page_compare_mode.change(
        fn=on_page_compare_mode_change,
        inputs=[page_compare_mode, pdf_new, page_new_input],
        outputs=[region_preview_col] + _preview_outputs,
    )

    def on_page_new_change(pg_new, pdf_new_f, sub_mode):
        """Refresh the preview when the target page number changes."""
        if sub_mode == "Specific Region" and pdf_new_f is not None:
            try:
                return on_load_preview(pdf_new_f, pg_new)
            except Exception:
                pass
        blank_readout = '<div id="region-readout">No region selected — full page will be used</div>'
        return None, "", None, 72, blank_readout

    page_new_input.change(
        fn=on_page_new_change,
        inputs=[page_new_input, pdf_new, page_compare_mode],
        outputs=_preview_outputs,
    )

    region_coords_txt.change(
        fn=on_region_coords_change,
        inputs=[region_coords_txt],
        outputs=[region_coords_state, region_readout],
        show_progress="hidden",
        show_progress_on=[],
    )

    clear_region_btn.click(
        fn=on_clear_region,
        inputs=None,
        outputs=[region_coords_txt, region_coords_state, region_readout],
    )

    run_btn.click(
        fn=on_run,
        inputs=[pdf_old, pdf_new, skip_old_p1, skip_new_p1, enable_align,
                compare_mode, page_old_input, page_new_input,
                page_compare_mode, region_coords_state, display_dpi_state],
        outputs=[pages_state, rotation_state, nudge_x_state, nudge_y_state, nudge_scale_state,
                 page_num_state, total_pages_state,
                 pdf_output, result_image, nudge_readout, page_slider],
    )

    view_mode.change(
        fn=on_view_change,
        inputs=[view_mode, page_num_state, total_pages_state, pages_state, rotation_state,
                nudge_x_state, nudge_y_state, nudge_scale_state],
        outputs=[result_image, rotation_state],
        show_progress="hidden",
        show_progress_on=[],
    )

    rot_left_btn.click(
        fn=on_rot_left,
        inputs=[page_num_state, total_pages_state, pages_state, view_mode, rotation_state,
                nudge_x_state, nudge_y_state, nudge_scale_state],
        outputs=[result_image, rotation_state],
        show_progress="hidden",
        show_progress_on=[],
    )
    rot_right_btn.click(
        fn=on_rot_right,
        inputs=[page_num_state, total_pages_state, pages_state, view_mode, rotation_state,
                nudge_x_state, nudge_y_state, nudge_scale_state],
        outputs=[result_image, rotation_state],
        show_progress="hidden",
        show_progress_on=[],
    )

    page_slider.change(
        fn=on_pg_slide,
        inputs=[page_slider, total_pages_state, pages_state, view_mode,
                rotation_state, nudge_x_state, nudge_y_state, nudge_scale_state],
        outputs=[result_image, page_num_state],
        show_progress="hidden",
        show_progress_on=[],
    )

    _nudge_inputs = [page_num_state, total_pages_state, pages_state, view_mode, rotation_state,
                     nudge_x_state, nudge_y_state, nudge_scale_state, nudge_step]
    _nudge_outputs = [result_image, nudge_x_state, nudge_y_state,
                      nudge_scale_state, nudge_readout]

    nudge_left_btn.click(
        fn=lambda *a: on_nudge("left", *a), inputs=_nudge_inputs, outputs=_nudge_outputs,
        show_progress="hidden", show_progress_on=[])
    nudge_right_btn.click(
        fn=lambda *a: on_nudge("right", *a), inputs=_nudge_inputs, outputs=_nudge_outputs,
        show_progress="hidden", show_progress_on=[])
    nudge_up_btn.click(
        fn=lambda *a: on_nudge("up", *a), inputs=_nudge_inputs, outputs=_nudge_outputs,
        show_progress="hidden", show_progress_on=[])
    nudge_down_btn.click(
        fn=lambda *a: on_nudge("down", *a), inputs=_nudge_inputs, outputs=_nudge_outputs,
        show_progress="hidden", show_progress_on=[])

    nudge_scale.change(
        fn=on_scale_change,
        inputs=[nudge_scale, page_num_state, total_pages_state, pages_state, view_mode,
                rotation_state, nudge_x_state, nudge_y_state],
        outputs=[result_image, nudge_scale_state, nudge_readout],
        show_progress="hidden",
        show_progress_on=[],
    )

    # Client-side canvas overlaid on the region preview image; it writes the
    # selected box as "x,y,w,h" (natural-image pixels) into the hidden
    # region-coords textbox, which on_region_coords_change then parses.
    _INLINE_CANVAS_JS = """
    () => {
        let _overlay = null, _ctx = null;
        let _dragging = false, _sx = 0, _sy = 0, _sel = null;
        let _lastCoords = '';

        function getImgEl() {
            // The rendered <img> inside the gr.Image component
            const wrap = document.getElementById('region-page-img');
            return wrap ? wrap.querySelector('img') : null;
        }

        function getCoordsEl() {
            const wrap = document.getElementById('region-coords-txt');
            return wrap ? wrap.querySelector('textarea') : null;
        }

        function syncOverlay() {
            if (!_overlay) return;
            const img = getImgEl();
            if (!img || !img.src || img.src.startsWith('data:image/gif')) return;
            const r = img.getBoundingClientRect();
            const pr = img.parentElement.getBoundingClientRect();
            _overlay.style.left = (r.left - pr.left) + 'px';
            _overlay.style.top = (r.top - pr.top) + 'px';
            _overlay.style.width = r.width + 'px';
            _overlay.style.height = r.height + 'px';
            if (_overlay.width !== Math.round(r.width) || _overlay.height !== Math.round(r.height)) {
                _overlay.width = Math.round(r.width);
                _overlay.height = Math.round(r.height);
                redraw();
            }
        }

        function toCanvas(cx, cy) {
            const r = _overlay.getBoundingClientRect();
            return { x: (cx - r.left) * _overlay.width / r.width,
                     y: (cy - r.top) * _overlay.height / r.height };
        }

        function redraw() {
            if (!_ctx || !_overlay.width) return;
            _ctx.clearRect(0, 0, _overlay.width, _overlay.height);
            if (_sel) {
                _ctx.strokeStyle = '#00BBBB';
                _ctx.lineWidth = Math.max(2, _overlay.width / 400);
                _ctx.strokeRect(_sel.x, _sel.y, _sel.w, _sel.h);
                _ctx.fillStyle = 'rgba(0,187,187,0.15)';
                _ctx.fillRect(_sel.x, _sel.y, _sel.w, _sel.h);
            }
        }

        function pushCoords() {
            const el = getCoordsEl();
            if (!el || !_sel) return;
            // Scale from display px back to natural image px
            const img = getImgEl();
            if (!img) return;
            const scaleX = img.naturalWidth / _overlay.width;
            const scaleY = img.naturalHeight / _overlay.height;
            const val = Math.round(_sel.x * scaleX) + ',' +
                        Math.round(_sel.y * scaleY) + ',' +
                        Math.round(_sel.w * scaleX) + ',' +
                        Math.round(_sel.h * scaleY);
            const setter = Object.getOwnPropertyDescriptor(HTMLTextAreaElement.prototype, 'value').set;
            setter.call(el, val);
            el.dispatchEvent(new Event('input', { bubbles: true }));
        }

        function setupOverlay() {
            const imgWrap = document.getElementById('region-page-img');
            if (!imgWrap) return false;
            // Make sure parent is positioned
            const parent = imgWrap.querySelector('.image-container') || imgWrap;
            if (getComputedStyle(parent).position === 'static') parent.style.position = 'relative';

            if (!_overlay) {
                _overlay = document.createElement('canvas');
                _overlay.id = 'region-draw-overlay';
                _overlay.style.cssText = 'position:absolute;top:0;left:0;cursor:crosshair;z-index:10;pointer-events:all;';
                parent.appendChild(_overlay);
                _ctx = _overlay.getContext('2d');

                _overlay.addEventListener('mousedown', function(e) {
                    const p = toCanvas(e.clientX, e.clientY);
                    _sx = p.x; _sy = p.y; _sel = null; _dragging = true; e.preventDefault();
                });
                _overlay.addEventListener('mousemove', function(e) {
                    if (!_dragging) return;
                    const p = toCanvas(e.clientX, e.clientY);
                    _sel = { x: Math.min(_sx, p.x), y: Math.min(_sy, p.y),
                             w: Math.abs(p.x - _sx), h: Math.abs(p.y - _sy) };
                    redraw(); e.preventDefault();
                });
                _overlay.addEventListener('mouseup', function(e) {
                    if (!_dragging) return; _dragging = false;
                    if (!_sel || _sel.w < 5 || _sel.h < 5) { _sel = null; redraw(); return; }
                    redraw(); pushCoords(); e.preventDefault();
                });
            }
            return true;
        }

        // Poll every 300ms: sync overlay size, watch for cleared coords
        setInterval(function() {
            setupOverlay();
            syncOverlay();

            // Clear overlay when coords textbox is wiped by Clear button
            const el = getCoordsEl();
            if (el) {
                const cur = el.value;
                if (cur !== _lastCoords) {
                    _lastCoords = cur;
                    if (cur === '') { _sel = null; redraw(); }
                }
            }
        }, 300);
    }
    """
    demo.load(fn=None, js=_INLINE_CANVAS_JS)
|
|
|
|
| |
| |
| |
|
|
if __name__ == "__main__":
    import socket as _socket

    def _find_free_port(start: int = 7860, end: int = 7880) -> int:
        """Return the first bindable TCP port in [start, end], else `start`."""
        for p in range(start, end + 1):
            with _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) as s:
                try:
                    s.bind(("", p))
                    return p
                except OSError:
                    continue   # port busy: try the next one
        return start

    _port = _find_free_port()
    print(f"\n🚀 POWERGRID Document Auditor → http://localhost:{_port}\n")
    # Bug fix: Blocks.launch() does not accept `theme`/`css` keyword
    # arguments (they are gr.Blocks constructor options), so passing them
    # here raised TypeError and prevented any local launch.
    demo.queue(default_concurrency_limit=20).launch(
        server_name="0.0.0.0",
        server_port=_port,
        share=False,
        show_error=True,
    )
|
|