| import gradio as gr |
| import cv2 |
| import numpy as np |
| import os |
| import pickle |
| import logging |
| import torch |
| from torchvision import models, transforms |
| from PIL import Image |
| from sklearn.decomposition import PCA |
| from sklearn.preprocessing import StandardScaler |
|
|
# ── Logging ────────────────────────────────────────────────────────────────
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ── Persistence / feature-space configuration ──────────────────────────────
# A saved template file is discarded on load when its version, texture
# weight or PCA size differs from the values below.
TEMPLATE_FILE: str = "templates_v5.pkl"
CLUSTER_VERSION: str = "v5"
TEXTURE_WEIGHT: float = 1.6      # scale applied to the texture part of a feature vector
PCA_COMPONENTS: int = 64         # target dimensionality of the PCA projection

# ── Sample-count limits ────────────────────────────────────────────────────
MIN_SAMPLES_WARN: int = 5        # below this, the UI warns that a class is under-sampled
MIN_MATCH_SAMPLES: int = 3       # classes with fewer samples are skipped when matching

# ── Verdict configuration ──────────────────────────────────────────────────
ANOMALY_THRESHOLD: float = 2.5   # anomaly z below this counts as a pass
PERFECT_CLASS: str = "Perfect"
UNKNOWN_CLASS: str = "Unknown"

# A match is reported as Unknown when the best cosine similarity is below
# this value ...
MIN_COSINE_THRESHOLD: float = 0.30
# ... or when the top two class probabilities are closer than this.
MIN_CONFIDENCE_GAP: float = 0.05
# An anomaly z at or above this ceiling is also reported as Unknown.
ANOMALY_UNKNOWN_CEILING: float = 5.0
|
|
|
|
| |
| |
| |
|
|
class CLAHEProcessor:
    """Four-stage image enhancement used before feature extraction.

    The stages, in order, are: log-domain illumination flattening of the
    LAB lightness channel, CLAHE on that channel, bilateral denoising of
    the recombined colour image, and an unsharp mask.
    """

    CLAHE_CLIP_LIMIT = 3.0
    CLAHE_TILE_SIZE = (8, 8)
    BILATERAL_D = 9
    BILATERAL_SIGMA_C = 75
    BILATERAL_SIGMA_S = 75
    UNSHARP_STRENGTH = 0.6

    @classmethod
    def process(cls, rgb: np.ndarray) -> np.ndarray:
        """Return the enhanced version of *rgb* as a ``uint8`` RGB array.

        Args:
            rgb: 3-channel RGB image (passed straight to
                ``cv2.cvtColor(..., COLOR_RGB2LAB)``).
        """
        lightness, chan_a, chan_b = cv2.split(
            cv2.cvtColor(rgb, cv2.COLOR_RGB2LAB))

        # Stage 1: subtract a heavily blurred copy of log-lightness so that
        # slow illumination changes are removed, then rescale to 0..255.
        log_light = np.log(np.float64(lightness) + 1.0)
        illumination = cv2.GaussianBlur(log_light, (31, 31), 0)
        reflectance = cv2.normalize(
            log_light - illumination, None, 0, 255, cv2.NORM_MINMAX)
        flattened = np.uint8(reflectance)

        # Stage 2: local contrast equalisation on the flattened lightness.
        equaliser = cv2.createCLAHE(
            clipLimit=cls.CLAHE_CLIP_LIMIT,
            tileGridSize=cls.CLAHE_TILE_SIZE,
        )
        equalised = equaliser.apply(flattened)

        # Stage 3: rebuild the colour image and denoise it edge-preservingly.
        # The bilateral filter is run on a BGR copy, then converted back.
        recombined = cv2.cvtColor(
            cv2.merge((equalised, chan_a, chan_b)), cv2.COLOR_LAB2RGB)
        denoised_bgr = cv2.bilateralFilter(
            cv2.cvtColor(recombined, cv2.COLOR_RGB2BGR),
            cls.BILATERAL_D,
            cls.BILATERAL_SIGMA_C,
            cls.BILATERAL_SIGMA_S,
        )
        denoised = cv2.cvtColor(denoised_bgr, cv2.COLOR_BGR2RGB)

        # Stage 4: unsharp mask — (1 + s) * image - s * blurred.
        strength = cls.UNSHARP_STRENGTH
        softened = cv2.GaussianBlur(denoised, (5, 5), 0)
        sharpened = cv2.addWeighted(
            denoised, 1.0 + strength, softened, -strength, 0)
        return np.clip(sharpened, 0, 255).astype(np.uint8)

    @classmethod
    def preview(cls, rgb: np.ndarray) -> np.ndarray:
        """Return the original and enhanced images side by side, labelled."""

        def _captioned(picture, caption):
            # Draw on a copy so the caller's array is left untouched.
            canvas = picture.copy()
            cv2.putText(canvas, caption, (10, 30), cv2.FONT_HERSHEY_SIMPLEX,
                        0.9, (255, 255, 0), 2, cv2.LINE_AA)
            return canvas

        enhanced = cls.process(rgb)
        height = max(rgb.shape[0], enhanced.shape[0])
        original_fit = cv2.resize(rgb, (rgb.shape[1], height))
        enhanced_fit = cv2.resize(enhanced, (enhanced.shape[1], height))
        return np.hstack([
            _captioned(original_fit, "Original"),
            _captioned(enhanced_fit, "Enhanced"),
        ])
|
|
|
|
| |
| |
| |
|
|
class FeatureExtractor:
    """Builds one unit-length descriptor per ROI from CNN and texture cues.

    The CNN part is the spatial mean of ResNet-50 ``layer2`` and ``layer3``
    activations; the texture part is computed by :meth:`_texture` on the
    enhanced grayscale ROI.
    """

    def __init__(self):
        network = models.resnet50(weights="IMAGENET1K_V1")
        network.eval()
        self.backbone = network
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225]),
        ])

    @staticmethod
    def _texture(gray: np.ndarray) -> np.ndarray:
        """Return the hand-crafted texture descriptor for a grayscale image.

        Layout (order matters, stored vectors depend on it):
        32-bin gradient-magnitude histogram, 36-bin gradient-orientation
        histogram, four statistics for each cell of a 4x4 grid, then the
        mean and standard deviation of eight Gabor responses.
        """
        gray_f = gray.astype(np.float64)
        grad_x = cv2.Sobel(gray_f, cv2.CV_64F, 1, 0, ksize=3)
        grad_y = cv2.Sobel(gray_f, cv2.CV_64F, 0, 1, ksize=3)
        magnitude = np.sqrt(grad_x**2 + grad_y**2)
        orientation = np.arctan2(grad_y, grad_x)

        mag_hist, _ = np.histogram(magnitude, bins=32, density=True)
        ang_hist, _ = np.histogram(orientation, bins=36,
                                   range=(-np.pi, np.pi), density=True)
        descriptor = [*mag_hist, *ang_hist]

        # 4x4 grid: std-dev, histogram entropy, edge density, mean gradient.
        rows, cols = gray.shape
        cell_h, cell_w = max(1, rows // 4), max(1, cols // 4)
        for r in range(4):
            for c in range(4):
                cell = gray[r * cell_h:(r + 1) * cell_h,
                            c * cell_w:(c + 1) * cell_w]
                if cell.size == 0:
                    # Image too small for this cell: keep the slot count.
                    descriptor.extend([0.] * 4)
                    continue
                cell_f = cell.astype(np.float64)
                descriptor.append(float(np.std(cell_f)))

                cell_hist, _ = np.histogram(cell, bins=32, range=(0, 256),
                                            density=True)
                cell_hist = cell_hist[cell_hist > 0]
                descriptor.append(
                    -float(np.sum(cell_hist * np.log2(cell_hist + 1e-10))))

                descriptor.append(float(np.mean(cv2.Canny(cell, 50, 150)) / 255.))

                cell_gx = cv2.Sobel(cell_f, cv2.CV_64F, 1, 0, ksize=3)
                cell_gy = cv2.Sobel(cell_f, cv2.CV_64F, 0, 1, ksize=3)
                descriptor.append(
                    float(np.mean(np.sqrt(cell_gx**2 + cell_gy**2))))

        # Gabor bank: four orientations x two scales.
        for theta in [0, np.pi / 4, np.pi / 2, 3 * np.pi / 4]:
            for sigma in [3., 5.]:
                kernel = cv2.getGaborKernel((21, 21), sigma, theta, 10., 0.5, 0,
                                            ktype=cv2.CV_64F)
                response = cv2.filter2D(gray_f, cv2.CV_64F, kernel)
                descriptor.extend([float(np.mean(response)),
                                   float(np.std(response))])

        return np.array(descriptor, dtype=np.float64)

    def extract_raw(self, rgb) -> tuple:
        """Return raw (un-projected) feature vector + attention overlay.

        Args:
            rgb: PIL image or NumPy array; 2-D arrays are expanded to RGB.

        Returns:
            ``(vector, overlay)`` — the L2-normalised feature vector and
            the input image blended with a JET heat-map of the summed
            ``layer3`` activations.
        """
        if isinstance(rgb, Image.Image):
            rgb = np.array(rgb.convert("RGB"))
        rgb = rgb.astype(np.uint8)
        if len(rgb.shape) == 2:
            rgb = cv2.cvtColor(rgb, cv2.COLOR_GRAY2RGB)

        enhanced = CLAHEProcessor.process(rgb)

        net = self.backbone
        batch = self.transform(Image.fromarray(enhanced)).unsqueeze(0)
        with torch.no_grad():
            stem = net.maxpool(net.relu(net.bn1(net.conv1(batch))))
            maps2 = net.layer2(net.layer1(stem))
            maps3 = net.layer3(maps2)
            pooled2 = torch.mean(maps2, dim=[2, 3]).squeeze().cpu().numpy()
            pooled3 = torch.mean(maps3, dim=[2, 3]).squeeze().cpu().numpy()

        # Attention overlay: channel-summed layer3 map, clipped at zero,
        # scaled to 0..1 and stretched to the input size.
        heat = torch.sum(maps3, dim=1).squeeze().cpu().numpy()
        heat = np.maximum(heat, 0)
        heat /= (np.max(heat) + 1e-8)
        heat = cv2.resize(heat, (rgb.shape[1], rgb.shape[0]))
        heat_bgr = cv2.applyColorMap(np.uint8(255 * heat), cv2.COLORMAP_JET)
        overlay = cv2.addWeighted(
            rgb, 0.6, cv2.cvtColor(heat_bgr, cv2.COLOR_BGR2RGB), 0.4, 0)

        texture = self._texture(cv2.cvtColor(enhanced, cv2.COLOR_RGB2GRAY))

        def _unit(vec):
            # Leave near-zero vectors untouched rather than dividing by ~0.
            length = np.linalg.norm(vec)
            return vec / length if length > 1e-8 else vec

        # Each part is normalised on its own so TEXTURE_WEIGHT alone sets
        # their relative influence; the joined vector is normalised again.
        cnn_unit = _unit(np.concatenate([pooled2, pooled3]))
        tex_unit = _unit(texture)
        combined = np.concatenate([cnn_unit, tex_unit * TEXTURE_WEIGHT])
        return _unit(combined), overlay
|
|
|
|
| |
| |
| |
|
|
class PCAProjector:
    """Standardise-then-PCA projection that is a no-op until fitted."""

    def __init__(self, n_components: int = PCA_COMPONENTS):
        self.n_components = n_components
        self.pca = None
        self.scaler = None
        self.fitted = False

    def fit(self, all_vectors: list[np.ndarray]) -> None:
        """Fit scaler and PCA on *all_vectors*.

        Does nothing (beyond a warning) when there are fewer than
        ``n_components + 1`` vectors; any earlier fit is left in place.
        """
        enough = len(all_vectors) >= self.n_components + 1
        if not enough:
            logger.warning("Not enough vectors to fit PCA yet.")
            return

        matrix = np.array(all_vectors)
        self.scaler = StandardScaler()
        scaled = self.scaler.fit_transform(matrix)

        # PCA cannot return more components than samples-1 or features.
        n_comp = min(self.n_components, scaled.shape[0] - 1, scaled.shape[1])
        self.pca = PCA(n_components=n_comp, svd_solver="full")
        self.pca.fit(scaled)

        var_exp = np.sum(self.pca.explained_variance_ratio_) * 100
        logger.info(f"PCA fitted: {n_comp} components, {var_exp:.1f}% variance explained.")
        self.fitted = True

    def project(self, vec: np.ndarray) -> np.ndarray:
        """Project one vector and L2-normalise it; return it as-is if unfitted."""
        if not self.fitted:
            return vec
        scaled = self.scaler.transform(vec.reshape(1, -1))
        projected = self.pca.transform(scaled).squeeze()
        length = np.linalg.norm(projected)
        if length > 1e-8:
            return projected / length
        return projected

    def project_many(self, vecs: list[np.ndarray]) -> np.ndarray:
        """Project a batch row-wise; rows with near-zero norm are not rescaled."""
        stacked = np.array(vecs)
        if not self.fitted:
            return stacked
        projected = self.pca.transform(self.scaler.transform(stacked))
        lengths = np.linalg.norm(projected, axis=1, keepdims=True)
        safe_lengths = np.where(lengths > 1e-8, lengths, 1.0)
        return projected / safe_lengths
|
|
|
|
| |
| |
| |
|
|
class EnginePartDetector:
    """Few-shot surface inspector built on class centroids.

    Training images are cropped to a region of interest (ROI) located from
    detected bolt holes, turned into feature vectors, and stored per class.
    Matching compares a query against each class centroid (cosine
    similarity) and against the ``PERFECT_CLASS`` cluster (anomaly z-score).
    All state is persisted to ``TEMPLATE_FILE`` with pickle.
    """

    # ROI localisation reports failure by prefixing its message with one of
    # these glyphs; success messages start with a check mark instead.
    _FAILURE_MARKS = ("❌", "⚠️")
    # Softmax temperature used to turn adjusted cosines into probabilities.
    _SOFTMAX_TEMPERATURE = 0.05

    def __init__(self):
        self.fe = FeatureExtractor()
        self.projector = PCAProjector(PCA_COMPONENTS)

        # class name -> list of raw (un-projected) feature vectors
        self.classes: dict[str, list[np.ndarray]] = {}
        # class name -> unit-length centroid in the active feature space
        self.centroids: dict[str, np.ndarray] = {}
        # class name -> std-dev of sample-to-centroid distances
        self.class_spread: dict[str, float] = {}
        # class name -> per-dimension inverse variance, or None (< 4 samples)
        self.class_cov_inv: dict[str, np.ndarray] = {}
        # class name -> enhanced ROI of the most recently added sample
        self.class_rois: dict[str, np.ndarray] = {}
        self._load_data()

    # ── small shared helpers ───────────────────────────────────────────────

    @classmethod
    def _is_failure(cls, log: str) -> bool:
        """Return True when a ``detect_and_crop`` message reports a failure."""
        return log.startswith(cls._FAILURE_MARKS)

    def _total_samples(self) -> int:
        """Return the number of stored feature vectors across all classes."""
        return sum(len(vecs) for vecs in self.classes.values())

    def _reset_state(self) -> None:
        """Drop every trained class and start from an unfitted projector."""
        self.classes = {}
        self.centroids = {}
        self.class_spread = {}
        self.class_cov_inv = {}
        self.class_rois = {}
        self.projector = PCAProjector(PCA_COMPONENTS)

    # ── statistics ─────────────────────────────────────────────────────────

    def _refit_pca_and_centroids(self) -> None:
        """Call after any class modification — keeps PCA up to date."""
        all_vecs = [v for vecs in self.classes.values() for v in vecs]
        if len(all_vecs) >= PCA_COMPONENTS + 1:
            self.projector.fit(all_vecs)
        self._rebuild_all_centroids()

    def _rebuild_all_centroids(self) -> None:
        """Recompute centroid, spread and inverse variance of every class."""
        for name in self.classes:
            self._compute_centroid(name)

    def _compute_centroid(self, name: str) -> None:
        """Recompute the statistics of class *name* from its stored vectors."""
        raw_vecs = self.classes[name]
        if len(raw_vecs) == 0:
            # A mean over zero vectors would be NaN; keep no stats instead.
            for table in (self.centroids, self.class_spread, self.class_cov_inv):
                table.pop(name, None)
            return

        if self.projector.fitted:
            vecs = self.projector.project_many(raw_vecs)
        else:
            vecs = np.array(raw_vecs)

        centroid = np.mean(vecs, axis=0)
        n = np.linalg.norm(centroid)
        self.centroids[name] = centroid / n if n > 1e-8 else centroid

        if len(vecs) > 1:
            # Spread is measured around the un-normalised mean.
            dists = [float(np.linalg.norm(v - centroid)) for v in vecs]
            self.class_spread[name] = float(np.std(dists)) + 1e-6
        else:
            self.class_spread[name] = 1.0

        # Diagonal inverse variance only; needs a few samples to mean anything.
        if len(vecs) >= 4:
            var = np.var(vecs, axis=0) + 1e-6
            self.class_cov_inv[name] = 1.0 / var
        else:
            self.class_cov_inv[name] = None

    # ── persistence ────────────────────────────────────────────────────────

    def _persist_data(self) -> None:
        """Write classes, ROIs and the projector to ``TEMPLATE_FILE``.

        The payload is written to a sibling temp file and swapped in with
        ``os.replace`` so an interrupted save cannot truncate the old file.
        Failures are logged, never raised.
        """
        tmp_path = f"{TEMPLATE_FILE}.tmp"
        try:
            with open(tmp_path, "wb") as f:
                pickle.dump({
                    "version": CLUSTER_VERSION,
                    "texture_weight": TEXTURE_WEIGHT,
                    "pca_components": PCA_COMPONENTS,
                    "classes": self.classes,
                    "rois": self.class_rois,
                    "projector": self.projector,
                }, f)
            os.replace(tmp_path, TEMPLATE_FILE)
        except Exception as e:
            logger.error(f"Save failed: {e}")
            try:
                if os.path.exists(tmp_path):
                    os.remove(tmp_path)
            except OSError:
                pass  # best effort: a stray temp file is harmless

    def _load_data(self) -> None:
        """Restore state from ``TEMPLATE_FILE`` if it exists and is current.

        A file written with a different version, texture weight or PCA size
        is deleted. Any load error leaves the detector completely empty.
        """
        if not os.path.exists(TEMPLATE_FILE):
            return
        try:
            # NOTE(security): pickle.load runs arbitrary code from the file.
            # Only load template files this application wrote itself.
            with open(TEMPLATE_FILE, "rb") as f:
                data = pickle.load(f)
            if (data.get("version") != CLUSTER_VERSION or
                    data.get("texture_weight") != TEXTURE_WEIGHT or
                    data.get("pca_components") != PCA_COMPONENTS):
                logger.warning("Stale cluster file — discarding.")
                os.remove(TEMPLATE_FILE)
                return

            self.classes = data.get("classes", {})
            self.class_rois = data.get("rois", {})
            self.projector = data.get("projector", PCAProjector(PCA_COMPONENTS))
            self._rebuild_all_centroids()
            logger.info(f"Loaded {len(self.classes)} class(es).")
        except Exception as e:
            logger.error(f"Load failed: {e}")
            # Reset everything, not just the classes, so no half-loaded
            # ROIs, projector or centroids survive a failed load.
            self._reset_state()

    # ── ROI localisation ───────────────────────────────────────────────────

    @staticmethod
    def detect_and_crop(img_rgb: np.ndarray) -> tuple:
        """Locate two rows of bolt holes and crop the band between them.

        Returns:
            ``(visualisation, crop, message)``. On failure *crop* is the
            input image itself and *message* starts with ❌ or ⚠️; on
            success *message* starts with ✅.
        """
        img_h, img_w = img_rgb.shape[:2]
        gray = cv2.GaussianBlur(
            cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY), (7, 7), 0)
        # Radii and spacing are tuned for a 1000 px wide image and scaled.
        sc = img_w / 1000.0
        circles = cv2.HoughCircles(
            gray, cv2.HOUGH_GRADIENT, dp=1.2,
            minDist=max(30, int(60 * sc)), param1=100, param2=35,
            minRadius=max(5, int(12 * sc)), maxRadius=max(20, int(45 * sc)))

        if circles is None:
            return img_rgb, img_rgb, "❌ No bolt holes detected."

        circles = np.round(circles[0]).astype(int)
        # Split the holes into an upper and a lower row around the median y.
        y_med = np.median([c[1] for c in circles])
        top_row = sorted([c for c in circles if c[1] < y_med], key=lambda c: c[0])
        bot_row = sorted([c for c in circles if c[1] >= y_med], key=lambda c: c[0])

        if len(top_row) < 2 or len(bot_row) < 2:
            return img_rgb, img_rgb, "⚠️ Insufficient hole rows."

        y_top = int(np.mean([c[1] for c in top_row]))
        y_bot = int(np.mean([c[1] for c in bot_row]))
        xs = [c[0] for c in circles]
        x0 = max(0, min(xs) - 60)
        x1 = min(img_w, max(xs) + 60)
        y0 = max(0, min(y_top, y_bot) - 20)
        y1 = min(img_h, max(y_top, y_bot) + 20)

        vis = img_rgb.copy()
        cv2.line(vis, (0, y_top), (img_w, y_top), (0, 255, 0), 3)
        cv2.line(vis, (0, y_bot), (img_w, y_bot), (0, 255, 0), 3)
        for (x, y, r) in circles:
            cv2.circle(vis, (x, y), r, (255, 0, 0), 3)
            cv2.circle(vis, (x, y), 2, (255, 255, 255), -1)

        crop = img_rgb[y0:y1, x0:x1]
        if crop.size == 0:
            return vis, img_rgb, "⚠️ ROI crop failed."

        stats = (f"✅ ROI: {len(circles)} holes | "
                 f"{len(top_row)} top, {len(bot_row)} bottom | "
                 f"{crop.shape[1]}×{crop.shape[0]} px")
        return vis, crop, stats

    # ── distances ──────────────────────────────────────────────────────────

    @staticmethod
    def _cosine(a, b) -> float:
        """Return cosine similarity, or 0.0 if either vector is near zero."""
        na, nb = np.linalg.norm(a), np.linalg.norm(b)
        return float(np.dot(a, b) / (na * nb)) if na > 1e-8 and nb > 1e-8 else 0.

    def _mahalanobis(self, query: np.ndarray, name: str) -> float:
        """Return the distance from *query* to the centroid of class *name*.

        Uses the per-dimension inverse variance when available, otherwise
        falls back to plain Euclidean distance.
        """
        diff = query - self.centroids[name]
        cov_inv = self.class_cov_inv.get(name)
        if cov_inv is not None:
            return float(np.sqrt(np.dot(diff**2, cov_inv)))
        return float(np.linalg.norm(diff))

    def _anomaly_score(self, query_proj: np.ndarray) -> dict:
        """Score how far *query_proj* lies from the ``PERFECT_CLASS`` cluster.

        Returns a dict with ``anomaly_z`` (``None`` when no Perfect class
        exists) and a ``verdict`` string.
        """
        if PERFECT_CLASS not in self.centroids:
            return {"anomaly_z": None, "verdict": "no_perfect_class"}

        dist = self._mahalanobis(query_proj, PERFECT_CLASS)
        spread = self.class_spread.get(PERFECT_CLASS, 1.0)
        z = dist / (spread + 1e-8)
        return {"anomaly_z": z, "perfect_dist": dist, "spread": spread,
                "verdict": "pass" if z < ANOMALY_THRESHOLD else "fail"}

    # ── training ───────────────────────────────────────────────────────────

    def add_to_class(self, image: np.ndarray, class_name: str) -> tuple:
        """Add one training image to *class_name*.

        Returns:
            ``(status_message, roi)``; *roi* is ``None`` when nothing was added.
        """
        if image is None:
            return "❌ No image supplied.", None
        if not class_name or not class_name.strip():
            return "❌ Class name empty.", None

        class_name = class_name.strip()
        _, roi, log = self.detect_and_crop(image)
        if self._is_failure(log):
            return log, None

        raw, _ = self.fe.extract_raw(roi)
        self.classes.setdefault(class_name, []).append(raw)

        self.class_rois[class_name] = CLAHEProcessor.process(roi)
        self._refit_pca_and_centroids()
        self._persist_data()

        n = len(self.classes[class_name])
        pca_note = (f" PCA fitted on {self._total_samples()} "
                    f"total vectors → {PCA_COMPONENTS}-D."
                    if self.projector.fitted else
                    f" ⚠️ Need {PCA_COMPONENTS+1} total samples to activate PCA.")
        warn = (f"\n⚠️ Only {n} sample(s) for '{class_name}'. "
                f"Add ≥{MIN_SAMPLES_WARN}." if n < MIN_SAMPLES_WARN else "")
        return (f"✅ Added to '{class_name}' ({n} sample(s)){warn}\n"
                f"{pca_note}\n{log}"), roi

    def _ingest_file(self, path: str, class_name: str) -> tuple:
        """Load one image file and append its features to *class_name*.

        Returns ``(roi, mark, detail)``: *roi* is ``None`` on failure,
        *mark* is the glyph for the log line, and *detail* is the text to
        append after the file name (``None`` on success).
        """
        try:
            # The context manager closes the file handle once pixels are copied.
            with Image.open(path) as handle:
                image = np.array(handle.convert("RGB"))
        except Exception as e:
            return None, "❌", f"load error: {e}"

        _, roi, loc = self.detect_and_crop(image)
        if self._is_failure(loc):
            return None, "⚠️", loc

        try:
            raw, _ = self.fe.extract_raw(roi)
            self.classes.setdefault(class_name, []).append(raw)
        except Exception as e:
            return None, "❌", f"{e}"
        return roi, "✅", None

    def add_bulk_to_class(self, file_paths, class_name, progress_cb=None) -> tuple:
        """Add many image files to *class_name* in one pass.

        Args:
            file_paths: paths as strings, dicts with a ``"name"`` key, or
                objects with a ``name`` attribute.
            class_name: target class.
            progress_cb: optional ``callable(done, total)`` invoked once
                per file, whether or not that file was accepted.

        Returns:
            ``(markdown_summary, per_file_log_lines, last_roi)``.
        """
        if not file_paths:
            return "❌ No files.", [], None
        if not class_name or not class_name.strip():
            return "❌ Class name empty.", [], None

        class_name = class_name.strip()
        total, ok, fail = len(file_paths), 0, 0
        log_lines, last_roi = [], None

        for idx, fp in enumerate(file_paths):
            if isinstance(fp, str):
                path = fp
            elif isinstance(fp, dict):
                path = fp.get("name", str(fp))
            else:
                path = getattr(fp, "name", str(fp))
            fname = os.path.basename(path)

            roi, mark, detail = self._ingest_file(path, class_name)
            line = f"{mark} [{idx+1}/{total}] {fname}"
            if roi is None:
                fail += 1
                line += f" — {detail}"
            else:
                ok += 1
                last_roi = roi
            log_lines.append(line)

            # Report progress for failures too, so the bar reaches 100 %.
            if progress_cb:
                progress_cb(idx + 1, total)

        if ok > 0:
            self.class_rois[class_name] = CLAHEProcessor.process(last_roi)
            self._refit_pca_and_centroids()
            self._persist_data()

        n = len(self.classes.get(class_name, []))
        pca = (f"PCA active: {PCA_COMPONENTS}-D projection."
               if self.projector.fitted else
               f"PCA pending: need {max(0, PCA_COMPONENTS+1 - self._total_samples())} more total samples.")
        summary = (
            f"### Bulk Upload\n"
            f"- **Class**: `{class_name}` | **Total**: {total} | "
            f"✅ {ok} ❌ {fail}\n"
            f"- **'{class_name}' total samples**: {n}\n"
            f"- {pca}"
        )
        return summary, log_lines, last_roi

    # ── matching ───────────────────────────────────────────────────────────

    def _class_probabilities(self, q: np.ndarray, eligible: dict) -> tuple:
        """Rank *eligible* classes for query *q*.

        Returns ``(class_probs, cosine_gap)`` where *class_probs* is a list
        of ``(name, probability, cosine)`` sorted by probability, and
        *cosine_gap* is the difference between the two best
        spread-adjusted cosines.
        """
        scored = []
        for name, centroid in eligible.items():
            cos = self._cosine(q, centroid)
            # Penalise loose clusters: a wide class must match more closely.
            adj = cos / (1.0 + self.class_spread.get(name, 1.0))
            scored.append((name, adj, cos))
        scored.sort(key=lambda s: s[1], reverse=True)

        second_adj = scored[1][1] if len(scored) > 1 else 0.
        cosine_gap = scored[0][1] - second_adj

        adj_arr = np.array([s[1] for s in scored])
        exp_s = np.exp((adj_arr - np.max(adj_arr)) / self._SOFTMAX_TEMPERATURE)
        probs = exp_s / np.sum(exp_s)

        # Re-weight by inverse class frequency so large classes do not win
        # on sample count alone, then renormalise.
        total_s = sum(len(self.classes[n]) for n in eligible)
        n_cls = len(eligible)
        weighted = [
            (name, p * total_s / (n_cls * len(self.classes[name])), cos)
            for (name, _adj, cos), p in zip(scored, probs)
        ]
        total_w = sum(w for _, w, _ in weighted)
        class_probs = [(n, w / total_w, c) for n, w, c in weighted]
        class_probs.sort(key=lambda s: s[1], reverse=True)
        return class_probs, cosine_gap

    def _decide_verdict(self, class_probs: list, az, threshold: float) -> tuple:
        """Combine class ranking and anomaly z into ``(class, status_text)``."""
        top_class, top_prob, top_cos = class_probs[0]
        second_prob = class_probs[1][1] if len(class_probs) > 1 else 0.0
        prob_gap = top_prob - second_prob

        # Weak match: report the condition that actually triggered it.
        if top_cos < MIN_COSINE_THRESHOLD:
            return UNKNOWN_CLASS, (
                f"❓ UNKNOWN "
                f"(best cosine: {top_cos:.4f}, threshold: {MIN_COSINE_THRESHOLD})")
        if prob_gap < MIN_CONFIDENCE_GAP:
            return UNKNOWN_CLASS, (
                f"❓ UNKNOWN "
                f"(probability gap: {prob_gap:.4f}, minimum: {MIN_CONFIDENCE_GAP})")

        if az is None:
            # No Perfect class trained: fall back to the class ranking alone.
            if "perfect" in top_class.lower():
                status = "✅ PASS" if top_prob >= threshold else "❓ UNCERTAIN"
            else:
                status = f"❌ FAIL — {top_class}"
            return top_class, status

        if az >= ANOMALY_UNKNOWN_CEILING:
            return UNKNOWN_CLASS, (
                f"❓ UNKNOWN "
                f"(z={az:.2f}, ceiling: {ANOMALY_UNKNOWN_CEILING})")
        if az < ANOMALY_THRESHOLD:
            return PERFECT_CLASS, "✅ PASS — surface matches Perfect cluster"

        # Anomalous but not extreme: name the best non-Perfect class.
        non_perfect = [n for n, _, _ in class_probs if n.lower() != "perfect"]
        verdict_class = non_perfect[0] if non_perfect else top_class
        return verdict_class, f"❌ FAIL — anomaly detected ({verdict_class})"

    def match_part(self, image: np.ndarray, threshold: float = 0.75) -> tuple:
        """Inspect *image* and classify its surface.

        Args:
            image: RGB image array, or ``None``.
            threshold: minimum top probability for a PASS when no
                ``PERFECT_CLASS`` cluster is available.

        Returns:
            ``(markdown_report, label_dict, visualisation, attention_map,
            edge_map)``; entries are ``None`` when a step could not run.
        """
        if image is None:
            return "❌ No image.", None, None, None, None
        if not self.classes:
            return ("⚠️ No classes trained yet.", None, None, None, None)

        vis, roi, log = self.detect_and_crop(image)
        if self._is_failure(log):
            return f"❌ {log}", None, vis, None, None

        raw_feat, attn_map = self.fe.extract_raw(roi)

        if self.projector.fitted:
            q = self.projector.project(raw_feat)
            pca_note = f"✅ PCA active ({PCA_COMPONENTS}-D projection)"
        else:
            q = raw_feat
            missing = max(0, PCA_COMPONENTS + 1 - self._total_samples())
            pca_note = (f"⚠️ PCA not yet fitted — need "
                        f"{missing} more total samples. "
                        f"Results may be unreliable.")

        anomaly = self._anomaly_score(q)

        eligible = {n: c for n, c in self.centroids.items()
                    if len(self.classes[n]) >= MIN_MATCH_SAMPLES}
        skipped = [n for n in self.classes if n not in eligible]

        if not eligible:
            return (f"⚠️ No class has ≥{MIN_MATCH_SAMPLES} samples.",
                    None, vis, None, None)

        class_probs, cosine_gap = self._class_probabilities(q, eligible)
        top_cos = class_probs[0][2]
        az = anomaly.get("anomaly_z")
        verdict_class, final_status = self._decide_verdict(class_probs, az, threshold)

        az_bar = ""
        if az is not None:
            # 20-cell gauge that saturates at 1.5x the anomaly threshold.
            filled = int(min(az / (ANOMALY_THRESHOLD * 1.5), 1.0) * 20)
            az_bar = "█" * filled + "░" * (20 - filled)
            az_bar = f"`[{az_bar}]` {az:.2f} (threshold: {ANOMALY_THRESHOLD})"

        lines = [
            f"## {final_status}",
            "",
            "### 🔬 Anomaly Score (primary signal)",
            f"Distance from Perfect cluster: {az_bar}" if az_bar else "*(No Perfect class trained)*",
            "",
            "### 📊 Class Probabilities (PCA cosine, secondary signal)",
        ]
        for name, prob, cos in class_probs:
            marker = "👉 " if name == verdict_class else "   "
            lines.append(f"{marker}`{name}`: **{prob:.1%}** (cosine: {cos:.4f})")

        ranked_names = [n for n, _, _ in class_probs]
        if verdict_class == UNKNOWN_CLASS and UNKNOWN_CLASS not in ranked_names:
            lines.append(f"👉 `{UNKNOWN_CLASS}`: **(default — no match)**")

        lines += [
            "",
            f"**Cosine gap**: {cosine_gap:.4f} | "
            f"**Best cosine**: {top_cos:.4f} | {pca_note}",
            "",
            "### Pipeline",
            "1. ROI localisation 2. CLAHE 3. ResNet-50 features",
            "4. PCA projection 5. Anomaly z-score + centroid cosine",
            "---", log,
        ]
        if skipped:
            lines.append(f"\n⚠️ Skipped (too few samples): {', '.join(skipped)}")

        label_dict = {n: float(p) for n, p, _ in class_probs}
        if verdict_class == UNKNOWN_CLASS and UNKNOWN_CLASS not in label_dict:
            label_dict[UNKNOWN_CLASS] = 0.0

        roi_e = CLAHEProcessor.process(roi)
        gray_e = cv2.cvtColor(roi_e, cv2.COLOR_RGB2GRAY)
        edges = cv2.cvtColor(cv2.Canny(gray_e, 50, 150), cv2.COLOR_GRAY2RGB)

        return "\n".join(lines), label_dict, vis, attn_map, edges

    # ── library management ─────────────────────────────────────────────────

    def get_template_roi(self, name):
        """Return the stored reference ROI for *name*, or ``None``."""
        return self.class_rois.get(name)

    def list_templates(self) -> str:
        """Return a plain-text summary of all trained classes."""
        if not self.classes:
            return "No classes trained yet."
        total = self._total_samples()
        pca_s = (f"PCA: ✅ active ({PCA_COMPONENTS}-D)"
                 if self.projector.fitted else
                 f"PCA: ⏳ need {max(0, PCA_COMPONENTS+1-total)} more samples")
        lines = [f"Classes: {len(self.classes)} | Samples: {total} | {pca_s}",
                 f"Version: {CLUSTER_VERSION}", "─" * 45]
        for name, vecs in sorted(self.classes.items()):
            pct = 100 * len(vecs) / total if total else 0
            warn = (f" ⚠️ need {MIN_SAMPLES_WARN-len(vecs)} more"
                    if len(vecs) < MIN_SAMPLES_WARN else "")
            spread = self.class_spread.get(name, 0)
            lines.append(f" • {name}: {len(vecs)} samples ({pct:.0f}%) spread={spread:.4f}{warn}")
        return "\n".join(lines)

    def delete_class(self, name: str) -> bool:
        """Remove class *name*; return ``False`` if it does not exist."""
        if name not in self.classes:
            return False
        del self.classes[name]
        for table in (self.centroids, self.class_spread,
                      self.class_cov_inv, self.class_rois):
            table.pop(name, None)
        self._refit_pca_and_centroids()
        self._persist_data()
        return True

    def reset_all(self) -> str:
        """Forget every class, reset PCA and delete the template file."""
        self._reset_state()
        if os.path.exists(TEMPLATE_FILE):
            os.remove(TEMPLATE_FILE)
        return "✅ All classes cleared. PCA reset."
|
|
|
|
| |
| |
| |
|
|
# Single shared detector used by every Gradio callback below. Constructing
# it builds the feature extractor and loads any saved template file.
detector = EnginePartDetector()
|
|
def detect_part(image, threshold):
    """Gradio callback: run an inspection on *image* with the shared detector."""
    return detector.match_part(image, threshold)
|
|
def add_sample(image, class_name):
    """Gradio callback: add one training image to *class_name*."""
    return detector.add_to_class(image, class_name)
|
|
def add_bulk(files, class_name, progress=gr.Progress()):
    """Gradio callback: add every uploaded file to *class_name*.

    Returns the markdown summary, the per-file log joined into one string,
    and the last accepted ROI.
    """
    paths = []
    for item in files or []:
        # Uploads may arrive as file-like objects or as plain path strings.
        paths.append(item.name if hasattr(item, "name") else item)

    def report(done, total):
        progress(done / total, desc=f"{done}/{total}")

    summary, log_lines, last_roi = detector.add_bulk_to_class(
        paths, class_name, report)
    joined_log = "\n".join(log_lines)
    return summary, joined_log, last_roi
|
|
def clahe_preview(image):
    """Gradio callback: return the before/after enhancement strip, or None."""
    if image is None:
        return None
    return CLAHEProcessor.preview(image)
|
|
def update_library_preview():
    """Gradio callback: return the class summary text and one reference ROI.

    The ROI shown is that of the alphabetically first class, or ``None``
    when no class has been trained.
    """
    summary = detector.list_templates()
    if not detector.classes:
        return summary, None
    first_name = sorted(detector.classes.keys())[0]
    return summary, detector.get_template_roi(first_name)
|
|
def delete_class_ui(class_name):
    """Gradio callback: delete *class_name* and refresh the library view.

    Returns the status message followed by the refreshed summary text and
    reference ROI.
    """
    ok = detector.delete_class(class_name)
    msg = f"✅ Deleted '{class_name}'." if ok else "❌ Not found."
    txt, roi = update_library_preview()
    return msg, txt, roi
|
|
def reset_all_ui():
    """Gradio callback: wipe all classes and blank the library view."""
    return detector.reset_all(), "No classes.", None
|
|
|
|
# Extra CSS handed to Gradio at launch; styles the header and footer blocks.
custom_css = """
.header{text-align:center;margin-bottom:1.5rem;}
.footer{text-align:center;margin-top:1.5rem;color:#666;}
"""
|
|
# ── Gradio layout ──────────────────────────────────────────────────────────
# Five tabs: inspection, single-image training, bulk training, enhancement
# preview, and class library management. Component variables are only used
# to wire the callbacks defined above.
with gr.Blocks(title="Engine Part CV System v5") as demo:

    # PCA figures are taken from PCA_COMPONENTS so the banner cannot drift
    # from the value the pipeline actually uses.
    gr.Markdown(f"""
<div class="header">
<h1>🔧 Engine Part CV System <code>v5</code></h1>
<p><strong>Pipeline:</strong>
ROI → CLAHE → ResNet-50 → <b>PCA ({PCA_COMPONENTS}-D)</b> → Anomaly Score + Centroid Cosine</p>
<p>⚠️ <em>Add ≥10 images per class. PCA activates after {PCA_COMPONENTS + 1} total samples.</em></p>
</div>
""")

    with gr.Tab("🔍 Inspect Part"):
        with gr.Row():
            with gr.Column():
                det_img = gr.Image(sources=["upload", "webcam"],
                                   type="numpy", label="Input Image")
                thresh = gr.Slider(0.50, 0.99, value=0.75, step=0.01,
                                   label="Confidence Threshold")
                det_btn = gr.Button("🔍 Run Inspection", variant="primary")
            with gr.Column():
                det_out = gr.Markdown()
                lbl_out = gr.Label(label="Class Probabilities", num_top_classes=5)
        with gr.Row():
            vis_out = gr.Image(label="Field Visualisation")
            attn_out = gr.Image(label="AI Attention Heatmap")
            edge_out = gr.Image(label="Edge Map")

        det_btn.click(detect_part, [det_img, thresh],
                      [det_out, lbl_out, vis_out, attn_out, edge_out],
                      api_name="detect_part")

    with gr.Tab("💾 Train — Single"):
        with gr.Row():
            with gr.Column():
                s_img = gr.Image(sources=["upload"], type="numpy",
                                 label="Training Image")
                s_cls = gr.Dropdown(["Perfect", "Defected", "Unknown"],
                                    value="Perfect", allow_custom_value=True,
                                    label="Class")
                s_btn = gr.Button("💾 Add", variant="primary")
            with gr.Column():
                s_stat = gr.Textbox(label="Status", lines=7)
                s_roi = gr.Image(label="Processed ROI", interactive=False)
        s_btn.click(add_sample, [s_img, s_cls], [s_stat, s_roi],
                    api_name="add_sample")

    with gr.Tab("📦 Train — Bulk"):
        gr.Markdown("Select multiple images. All assigned to the chosen class.")
        with gr.Row():
            with gr.Column():
                b_files = gr.File(label="Images", file_count="multiple",
                                  file_types=["image"])
                b_cls = gr.Dropdown(["Perfect", "Defected", "Unknown"],
                                    value="Perfect", allow_custom_value=True,
                                    label="Class")
                b_btn = gr.Button("📦 Add All", variant="primary")
            with gr.Column():
                b_sum = gr.Markdown()
                b_log = gr.Textbox(label="Per-Image Log", lines=14,
                                   max_lines=30, interactive=False)
                b_roi = gr.Image(label="Last ROI", interactive=False)
        b_btn.click(add_bulk, [b_files, b_cls], [b_sum, b_log, b_roi],
                    api_name="add_bulk")

    with gr.Tab("🎨 CLAHE Preview"):
        gr.Markdown("See before/after of the 4-stage CLAHE enhancement pipeline.")
        with gr.Row():
            with gr.Column():
                cp_in = gr.Image(sources=["upload"], type="numpy", label="Input")
                cp_btn = gr.Button("🎨 Preview", variant="secondary")
            with gr.Column(scale=2):
                cp_out = gr.Image(label="Original | Enhanced", interactive=False)
        cp_btn.click(clahe_preview, [cp_in], [cp_out])

    with gr.Tab("📚 Class Library"):
        with gr.Row():
            with gr.Column():
                lib_txt = gr.Textbox(label="Trained Classes", lines=14)
                ref_btn = gr.Button("🔄 Refresh", variant="secondary")
            with gr.Column():
                lib_roi = gr.Image(label="Reference ROI", interactive=False)
        gr.Markdown("### ⚠️ Danger Zone")
        with gr.Row():
            del_cls = gr.Dropdown(["Perfect", "Defected", "Unknown"],
                                  allow_custom_value=True, label="Delete")
            del_btn = gr.Button("🗑️ Delete", variant="stop")
        del_st = gr.Textbox(label="Status", lines=2)
        rst_btn = gr.Button("💥 Reset ALL", variant="stop")
        rst_st = gr.Textbox(label="Reset Status", lines=2)

        ref_btn.click(update_library_preview, [], [lib_txt, lib_roi],
                      api_name="list_classes")
        del_btn.click(delete_class_ui, [del_cls], [del_st, lib_txt, lib_roi],
                      api_name="delete_class")
        rst_btn.click(reset_all_ui, [], [rst_st, lib_txt, lib_roi])

    # Populate the library tab as soon as the page opens.
    demo.load(update_library_preview, [], [lib_txt, lib_roi])
|
|
|
|
# Start the local web UI only when this file is run as a script.
# NOTE(review): theme/css are passed to launch() here; some Gradio releases
# expect them on gr.Blocks() instead — confirm against the installed version.
if __name__ == "__main__":
    demo.launch(
        share = False,
        show_error = True,
        theme = gr.themes.Soft(),
        css = custom_css,
    )