# import cv2 # import numpy as np # from PIL import Image # import io # import json # from datetime import datetime # from typing import List, Tuple, Dict, Optional # from dataclasses import dataclass # import time # import os # import warnings # import gc # from collections import OrderedDict # # ============================================================================ # # CLOUD DEPLOYMENT OPTIMIZATIONS # # ============================================================================ # IS_HF_SPACE = os.environ.get('SPACE_ID') is not None or os.environ.get('HF_SPACE') is not None # IS_RENDER = os.environ.get('RENDER') is not None # IS_CLOUD = IS_HF_SPACE or IS_RENDER # # Memory limits for cloud # MAX_IMAGE_SIZE = (1280, 960) if IS_CLOUD else (1920, 1440) # MAX_CACHE_SIZE = 50 if IS_CLOUD else 100 # ENABLE_GC = IS_CLOUD # Force garbage collection on cloud # # Lazy imports for faster cold start # _torch = None # _torchvision = None # _YOLO = None # def _get_torch(): # global _torch # if _torch is None: # import torch # _torch = torch # if IS_CLOUD: # _torch.set_num_threads(2) # return _torch # def _get_torchvision(): # global _torchvision # if _torchvision is None: # import torchvision # _torchvision = torchvision # return _torchvision # def _get_yolo(): # global _YOLO # if _YOLO is None: # try: # from ultralytics import YOLO # _YOLO = YOLO # except ImportError: # _YOLO = False # return _YOLO # def _yolo_available(): # yolo = _get_yolo() # return yolo is not False and yolo is not None # warnings.filterwarnings('ignore') # # Import optimized validators # try: # from saddle_structure_validator import SaddleStructureValidator, SaddleStructure # STRUCTURE_VALIDATOR_AVAILABLE = True # except ImportError: # STRUCTURE_VALIDATOR_AVAILABLE = False # SaddleStructure = None # print("[WARN] saddle_structure_validator not available") # try: # from geometric_validator import OptimizedGeometricValidator, PhysicalDimensions, PixelToMMCalibrator # OPTIMIZED_VALIDATOR_AVAILABLE = True # except ImportError: # OPTIMIZED_VALIDATOR_AVAILABLE = False # print("[WARN] geometric_validator not available") # if IS_HF_SPACE: # print("[HF Space] Memory-optimized mode enabled") # elif IS_RENDER: # print("[Render] Cloud-optimized mode enabled") # @dataclass # class SaddleROI: # """Saddle region of interest with rotation and structure info""" # id: int # bbox: Tuple[int, int, int, int] # crop: np.ndarray # center: Tuple[int, int] # area: int # angle: float # confidence: float # structure: Optional[any] = None # def to_dict(self): # result = { # 'id': int(self.id), # 'bbox': [int(x) for x in self.bbox], # 'center': [int(self.center[0]), int(self.center[1])], # 'area': int(self.area), # 'angle': float(self.angle), # 'confidence': float(self.confidence) # } # if self.structure is not None: # result['has_semicircle'] = bool(getattr(self.structure, 'has_semicircle', False)) # result['has_middle_arc'] = bool(getattr(self.structure, 'has_middle_arc', False)) # result['structure_confidence'] = float(getattr(self.structure, 'structure_confidence', 0.0)) # return result # def cleanup(self): # """Free memory""" # if hasattr(self, 'crop'): # del self.crop # self.crop = None # self.structure = None # @dataclass # class SaddleInspectionResult: # """Inspection result for one saddle""" # saddle_id: int # status: str # similarity_score: float # confidence: float # feature_distance: float # detected: bool # def to_dict(self): # return { # 'saddle_id': int(self.saddle_id), # 'status': str(self.status), # 'similarity_score': float(self.similarity_score), # 'confidence': float(self.confidence), # 'feature_distance': float(self.feature_distance), # 'detected': bool(self.detected) # } # @dataclass # class BlockInspectionResult: # """Complete block inspection result""" # block_status: str # total_saddles: int # detected_saddles: int # defective_saddles: int # saddle_results: List[SaddleInspectionResult] # processing_time_ms: float # alignment_status: str # block_angle: float # def to_dict(self): # return { # 'block_status': str(self.block_status), # 'total_saddles': int(self.total_saddles), # 'detected_saddles': int(self.detected_saddles), # 'defective_saddles': int(self.defective_saddles), # 'saddle_results': [r.to_dict() for r in self.saddle_results], # 'processing_time_ms': float(self.processing_time_ms), # 'alignment_status': str(self.alignment_status), # 'block_angle': float(self.block_angle) # } # class LRUCache: # """Thread-safe LRU cache with size limit""" # def __init__(self, max_size=100): # self.cache = OrderedDict() # self.max_size = max_size # def get(self, key): # if key in self.cache: # self.cache.move_to_end(key) # return self.cache[key] # return None # def put(self, key, value): # if key in self.cache: # self.cache.move_to_end(key) # self.cache[key] = value # if len(self.cache) > self.max_size: # self.cache.popitem(last=False) # def clear(self): # self.cache.clear() # if ENABLE_GC: # gc.collect() # class YOLOOBBDetector: # """YOLO-OBB Detector with lazy loading and memory management""" # def __init__(self, model_path: Optional[str] = None): # self.model = None # self.model_loaded = False # self._model_path = model_path if model_path else 'yolo26n-obb.pt' # self._initialized = False # def _lazy_init(self): # if self._initialized: # return # self._initialized = True # if not _yolo_available(): # print("⚠ YOLO not available") # return # YOLO = _get_yolo() # try: # self.model = YOLO(self._model_path) # self.model_loaded = True # print(f"✓ YOLO-OBB loaded: {self._model_path}") # except Exception as e: # try: # fallback = 'yolo11n-obb.pt' if self._model_path == 'yolo26n-obb.pt' else 'yolo26n-obb.pt' # self.model = YOLO(fallback) # self.model_loaded = True # print(f"✓ YOLO-OBB fallback: {fallback}") # except: # print(f"✗ YOLO load failed: {e}") # self.model_loaded = False # @property # def available(self): # if not self._initialized: # self._lazy_init() # return self.model_loaded # def detect_saddles(self, image: np.ndarray, conf_threshold: float = 0.25) -> List[SaddleROI]: # """Detect saddles using YOLO-OBB with memory optimization""" # if not self.available or self.model is None: # return [] # # Resize if too large # h, w = image.shape[:2] # if w > MAX_IMAGE_SIZE[0] or h > MAX_IMAGE_SIZE[1]: # scale = min(MAX_IMAGE_SIZE[0]/w, MAX_IMAGE_SIZE[1]/h) # new_w, new_h = int(w*scale), int(h*scale) # image_resized = cv2.resize(image, (new_w, new_h)) # else: # image_resized = image # scale = 1.0 # # Convert to grayscale for faster inference # if len(image_resized.shape) == 3: # gray = cv2.cvtColor(image_resized, cv2.COLOR_BGR2GRAY) # gray_3ch = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR) # else: # gray_3ch = cv2.cvtColor(image_resized, cv2.COLOR_GRAY2BGR) # results = self.model(gray_3ch, conf=conf_threshold, verbose=False) # saddles = [] # for result in results: # if hasattr(result, 'obb') and result.obb is not None: # for i, obb in enumerate(result.obb): # xyxyxyxy = obb.xyxyxyxy[0].cpu().numpy() # conf = float(obb.conf[0]) # # Scale back to original coordinates # xyxyxyxy = xyxyxyxy / scale # angle = self._calculate_angle(xyxyxyxy) # x_coords = xyxyxyxy[:, 0] # y_coords = xyxyxyxy[:, 1] # x_min, x_max = int(x_coords.min()), int(x_coords.max()) # y_min, y_max = int(y_coords.min()), int(y_coords.max()) # w = x_max - x_min # h = y_max - y_min # cx = int(np.mean(x_coords)) # cy = int(np.mean(y_coords)) # # Crop from original image # crop = image[y_min:y_max, x_min:x_max].copy() # area = w * h # saddles.append(SaddleROI( # id=i, # bbox=(x_min, y_min, w, h), # crop=crop, # center=(cx, cy), # area=area, # angle=angle, # confidence=conf # )) # elif hasattr(result, 'boxes') and result.boxes is not None: # for i, box in enumerate(result.boxes): # xyxy = box.xyxy[0].cpu().numpy() # conf = float(box.conf[0]) # # Scale back # xyxy = xyxy / scale # x_min, y_min, x_max, y_max = map(int, xyxy) # w = x_max - x_min # h = y_max - y_min # cx = (x_min + x_max) // 2 # cy = (y_min + y_max) // 2 # crop = image[y_min:y_max, x_min:x_max].copy() # area = w * h # saddles.append(SaddleROI( # id=i, # bbox=(x_min, y_min, w, h), # crop=crop, # center=(cx, cy), # area=area, # angle=0.0, # confidence=conf # )) # # Sort by position # saddles.sort(key=lambda s: (s.center[1], s.center[0])) # for i, saddle in enumerate(saddles): # saddle.id = i # print(f"✓ YOLO detected: {len(saddles)} saddles") # # Memory cleanup # if ENABLE_GC: # del gray_3ch, results # gc.collect() # return saddles # def _calculate_angle(self, xyxyxyxy: np.ndarray) -> float: # p1 = xyxyxyxy[0] # p2 = xyxyxyxy[1] # dx = p2[0] - p1[0] # dy = p2[1] - p1[1] # angle = np.degrees(np.arctan2(dy, dx)) # if angle > 180: # angle -= 360 # elif angle < -180: # angle += 360 # return float(angle) # def cleanup(self): # """Release model from memory""" # if self.model is not None: # del self.model # self.model = None # self.model_loaded = False # if ENABLE_GC: # gc.collect() # class GeometricSaddleDetector: # """Optimized geometric detector with better 4-saddle detection""" # def __init__(self, max_cache_size=None): # if max_cache_size is None: # max_cache_size = MAX_CACHE_SIZE # self._mask_cache = LRUCache(max_cache_size) # def detect_saddles(self, image: np.ndarray) -> List[SaddleROI]: # """Detect using improved geometric segmentation""" # print(f"[GeometricDetector] Input shape: {image.shape}") # h, w = image.shape[:2] # # Resize if too large # if w > MAX_IMAGE_SIZE[0] or h > MAX_IMAGE_SIZE[1]: # scale = min(MAX_IMAGE_SIZE[0]/w, MAX_IMAGE_SIZE[1]/h) # new_w, new_h = int(w*scale), int(h*scale) # image_work = cv2.resize(image, (new_w, new_h)) # print(f"[GeometricDetector] Resized to {new_w}x{new_h}") # else: # image_work = image # scale = 1.0 # # Detect circles with improved method # outer_circle, inner_circle = self._detect_circles_improved(image_work) # if outer_circle is None: # print("[GeometricDetector] ✗ Circle detection failed") # return [] # ox, oy, or_ = outer_circle # ix, iy, ir = inner_circle # # Scale back to original # if scale != 1.0: # ox, oy = int(ox/scale), int(oy/scale) # or_, ir = int(or_/scale), int(ir/scale) # h, w = image.shape[:2] # print(f"[GeometricDetector] Outer: ({ox},{oy}) r={or_}, Inner: ({ix},{iy}) r={ir}") # # Validate # if or_ < 50 or or_ > min(h, w): # print(f"[GeometricDetector] ✗ Invalid outer radius: {or_}") # return [] # if ir >= or_: # ir = int(or_ * 0.4) # print(f"[GeometricDetector] Adjusted inner radius to {ir}") # # Create annular mask # annular_mask = np.zeros((h, w), dtype=np.uint8) # cv2.circle(annular_mask, (ox, oy), or_, 255, -1) # cv2.circle(annular_mask, (ix, iy), ir, 0, -1) # mask_pixels = np.sum(annular_mask > 0) # print(f"[GeometricDetector] Annular mask pixels: {mask_pixels}") # if mask_pixels < 1000: # print(f"[GeometricDetector] ✗ Mask too small") # return [] # # Find saddles using contour detection instead of fixed quadrants # saddles = self._detect_saddles_from_annular_region(image, annular_mask, (ox, oy), or_, ir) # print(f"[GeometricDetector] Total saddles detected: {len(saddles)}") # # Memory cleanup # if ENABLE_GC: # del annular_mask, image_work # gc.collect() # return saddles # def _detect_saddles_from_annular_region( # self, # image: np.ndarray, # annular_mask: np.ndarray, # center: Tuple[int, int], # outer_r: int, # inner_r: int # ) -> List[SaddleROI]: # """ # Improved saddle detection using contour-based approach # """ # h, w = annular_mask.shape # # Apply mask to image # if len(image.shape) == 3: # gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) # else: # gray = image # masked = cv2.bitwise_and(gray, gray, mask=annular_mask) # # Enhance contrast # masked = cv2.equalizeHist(masked) # # Edge detection on masked region # edges = cv2.Canny(masked, 50, 150) # # Find contours # contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) # # Filter and group contours # valid_contours = [] # for cnt in contours: # area = cv2.contourArea(cnt) # if area > 500: # Minimum area threshold # valid_contours.append(cnt) # if len(valid_contours) < 4: # # Fallback to quadrant-based detection # print("[GeometricDetector] Using quadrant fallback") # return self._detect_quadrant_based(image, annular_mask, center, outer_r, inner_r) # # Sort contours by position (left to right, top to bottom) # valid_contours = sorted(valid_contours, key=lambda c: (cv2.boundingRect(c)[1], cv2.boundingRect(c)[0])) # # Take top 4 contours # saddles = [] # for idx, cnt in enumerate(valid_contours[:4]): # x, y, w_box, h_box = cv2.boundingRect(cnt) # # Validate size # if w_box < 10 or h_box < 10: # continue # # Extract crop # crop = image[y:y+h_box, x:x+w_box].copy() # mask_crop = annular_mask[y:y+h_box, x:x+w_box].copy() # crop_masked = cv2.bitwise_and(crop, crop, mask=mask_crop) # # Calculate center # M = cv2.moments(cnt) # if M['m00'] != 0: # cx = int(M['m10'] / M['m00']) # cy = int(M['m01'] / M['m00']) # else: # cx = x + w_box // 2 # cy = y + h_box // 2 # area = int(M['m00']) # saddles.append(SaddleROI( # id=idx, # bbox=(x, y, w_box, h_box), # crop=crop_masked, # center=(cx, cy), # area=area, # angle=0.0, # confidence=0.8 # Geometric detection confidence # )) # return saddles # def _detect_quadrant_based( # self, # image: np.ndarray, # annular_mask: np.ndarray, # center: Tuple[int, int], # outer_r: int, # inner_r: int # ) -> List[SaddleROI]: # """Fallback quadrant-based detection""" # h, w = annular_mask.shape # ox, oy = center # quadrants = [ # (0, 90, "Top-Right"), # (90, 180, "Top-Left"), # (180, 270, "Bottom-Left"), # (270, 360, "Bottom-Right") # ] # saddles = [] # for idx, (start_angle, end_angle, name) in enumerate(quadrants): # mask_key = (h, w, ox, oy, outer_r, inner_r, start_angle, end_angle) # sector_mask = self._mask_cache.get(mask_key) # if sector_mask is None: # sector_mask = self._create_sector_mask( # (h, w), (ox, oy), outer_r, inner_r, start_angle, end_angle # ) # self._mask_cache.put(mask_key, sector_mask) # saddle_mask = cv2.bitwise_and(annular_mask, sector_mask) # ys, xs = np.where(saddle_mask > 0) # if len(xs) == 0 or len(ys) == 0: # print(f"[GeometricDetector] ⚠ Quadrant {idx} ({name}): No pixels") # continue # x_min, x_max = xs.min(), xs.max() # y_min, y_max = ys.min(), ys.max() # if (x_max - x_min) < 10 or (y_max - y_min) < 10: # print(f"[GeometricDetector] ⚠ Quadrant {idx} ({name}): Box too small") # continue # crop = image[y_min:y_max+1, x_min:x_max+1].copy() # mask_crop = saddle_mask[y_min:y_max+1, x_min:x_max+1].copy() # crop_masked = cv2.bitwise_and(crop, crop, mask=mask_crop) # cx = int(np.mean(xs)) # cy = int(np.mean(ys)) # area = np.sum(saddle_mask > 0) # saddles.append(SaddleROI( # id=idx, # bbox=(x_min, y_min, x_max - x_min, y_max - y_min), # crop=crop_masked, # center=(cx, cy), # area=area, # angle=0.0, # confidence=1.0 # )) # return saddles # def _detect_circles_improved(self, image: np.ndarray) -> Tuple[Optional[Tuple], Optional[Tuple]]: # """Improved circle detection with multiple strategies""" # gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image # h, w = gray.shape # # Strategy 1: Standard HoughCircles with optimized params # gray_blur = cv2.GaussianBlur(gray, (9, 9), 2) # minRadius = int(min(h, w) * 0.2) # maxRadius = int(min(h, w) * 0.7) # # Try multiple parameter sets # param_sets = [ # (80, 25), # Standard # (60, 30), # More lenient # (100, 20), # More strict # ] # for param1, param2 in param_sets: # circles = cv2.HoughCircles( # gray_blur, # cv2.HOUGH_GRADIENT, # dp=1.0, # minDist=80, # param1=param1, # param2=param2, # minRadius=minRadius, # maxRadius=maxRadius # ) # if circles is not None and len(circles[0]) >= 1: # circles = np.round(circles[0, :]).astype(int) # circles = sorted(circles, key=lambda c: c[2], reverse=True) # outer = tuple(circles[0]) # inner_radius = int(outer[2] * 0.4) # inner = (outer[0], outer[1], inner_radius) # print(f"✓ HoughCircles success (param1={param1}, param2={param2})") # return outer, inner # # Strategy 2: Contour-based detection # print("→ Using contour fallback") # edges = cv2.Canny(gray_blur, 30, 100) # # Dilate to connect edges # kernel = np.ones((3, 3), np.uint8) # edges = cv2.dilate(edges, kernel, iterations=2) # contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) # if contours: # # Find largest contour # largest = max(contours, key=cv2.contourArea) # (cx, cy), radius = cv2.minEnclosingCircle(largest) # center_x, center_y = int(cx), int(cy) # outer_radius = int(radius * 0.85) # inner_radius = int(outer_radius * 0.4) # print(f"✓ Contour-based: center=({center_x},{center_y}), radius={outer_radius}") # return ((center_x, center_y, outer_radius), (center_x, center_y, inner_radius)) # # Strategy 3: Dimension-based estimate # center_x, center_y = w // 2, h // 2 # outer_radius = min(w, h) // 2 - 50 # inner_radius = int(outer_radius * 0.4) # print(f"→ Using dimension estimate: center=({center_x},{center_y}), radius={outer_radius}") # return ((center_x, center_y, outer_radius), (center_x, center_y, inner_radius)) # def _create_sector_mask(self, shape, center, outer_r, inner_r, start_angle, end_angle): # """Create sector mask with caching""" # h, w = shape # mask = np.zeros((h, w), dtype=np.uint8) # y, x = np.ogrid[:h, :w] # dx = x - center[0] # dy = y - center[1] # angles = np.rad2deg(np.arctan2(dy, dx)) % 360 # radius = np.sqrt(dx**2 + dy**2) # if start_angle < end_angle: # angle_mask = (angles >= start_angle) & (angles < end_angle) # else: # angle_mask = (angles >= start_angle) | (angles < end_angle) # radius_mask = (radius >= inner_r) & (radius <= outer_r) # sector = angle_mask & radius_mask # mask[sector] = 255 # return mask # def cleanup(self): # """Release cache""" # self._mask_cache.clear() # class AlignmentCorrector: # """Affine transformation for rotation correction""" # @staticmethod # def correct_rotation(image: np.ndarray, angle: float) -> np.ndarray: # h, w = image.shape[:2] # center = (w // 2, h // 2) # M = cv2.getRotationMatrix2D(center, -angle, 1.0) # cos = np.abs(M[0, 0]) # sin = np.abs(M[0, 1]) # new_w = int((h * sin) + (w * cos)) # new_h = int((h * cos) + (w * sin)) # M[0, 2] += (new_w / 2) - center[0] # M[1, 2] += (new_h / 2) - center[1] # rotated = cv2.warpAffine(image, M, (new_w, new_h), # flags=cv2.INTER_LINEAR, # borderMode=cv2.BORDER_CONSTANT, # borderValue=(0, 0, 0)) # return rotated # @staticmethod # def correct_saddle(saddle: SaddleROI) -> np.ndarray: # if abs(saddle.angle) < 1.0: # return saddle.crop # return AlignmentCorrector.correct_rotation(saddle.crop, saddle.angle) # class CNNFeatureExtractor: # """ResNet-18 feature extraction with memory optimization""" # def __init__(self, onnx_path: str = "resnet18_features.onnx"): # self.onnx_path = onnx_path # self.session = None # self.model = None # self.device = None # self.feature_dim = 1024 # self._initialized = False # def _lazy_init(self): # if self._initialized: # return # self._initialized = True # # Use PyTorch with fused features # self._init_pytorch_model() # def _init_pytorch_model(self): # torch = _get_torch() # torchvision = _get_torchvision() # from torchvision.models import resnet18, ResNet18_Weights # import torch.nn as nn # self.device = torch.device('cpu') # weights = ResNet18_Weights.IMAGENET1K_V1 # base_model = resnet18(weights=weights) # class FeatureFusionModel(nn.Module): # def __init__(self, base): # super().__init__() # self.conv1 = base.conv1 # self.bn1 = base.bn1 # self.relu = base.relu # self.maxpool = base.maxpool # self.layer1 = base.layer1 # self.layer2 = base.layer2 # self.layer3 = base.layer3 # self.layer4 = base.layer4 # self.avgpool = nn.AdaptiveAvgPool2d((1, 1)) # self.maxpool_global = nn.AdaptiveMaxPool2d((1, 1)) # def forward(self, x): # x = self.conv1(x) # x = self.bn1(x) # x = self.relu(x) # x = self.maxpool(x) # x = self.layer1(x) # x = self.layer2(x) # l3 = self.layer3(x) # l4 = self.layer4(l3) # f3_avg = self.avgpool(l3).view(l3.size(0), -1) # f3_max = self.maxpool_global(l3).view(l3.size(0), -1) # f4_avg = self.avgpool(l4).view(l4.size(0), -1) # return torch.cat([f3_avg, f3_max, f4_avg], dim=1) # self.model = FeatureFusionModel(base_model) # self.model.to(self.device) # self.model.eval() # print(f"✓ PyTorch FeatureFusion ResNet-18 (dim={self.feature_dim})") # def _preprocess_image(self, image: np.ndarray) -> np.ndarray: # if image is None or image.size == 0: # return np.zeros((3, 224, 224), dtype=np.float32) # if len(image.shape) == 2: # image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB) # elif len(image.shape) == 3 and image.shape[2] == 4: # image = cv2.cvtColor(image, cv2.COLOR_BGRA2RGB) # elif len(image.shape) == 3 and image.shape[2] == 3: # image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # image = cv2.resize(image, (224, 224)) # image = image.astype(np.float32) / 255.0 # mean = np.array([0.485, 0.456, 0.406], dtype=np.float32) # std = np.array([0.229, 0.224, 0.225], dtype=np.float32) # image = (image - mean) / std # image = image.transpose(2, 0, 1) # return image # def extract_features(self, image: np.ndarray) -> np.ndarray: # self._lazy_init() # batch = self._preprocess_image(image)[np.newaxis, ...] # features = self.extract_batch(batch) # return features[0] # def extract_batch(self, images: List[np.ndarray]) -> np.ndarray: # self._lazy_init() # if isinstance(images, np.ndarray) and len(images.shape) == 4: # batch = images.astype(np.float32) # else: # preprocessed = [self._preprocess_image(img) for img in images] # batch = np.stack(preprocessed, axis=0).astype(np.float32) # torch = _get_torch() # batch_tensor = torch.from_numpy(batch).to(self.device) # with torch.no_grad(): # features = self.model(batch_tensor) # features = features.squeeze(-1).squeeze(-1).cpu().numpy() # norms = np.linalg.norm(features, axis=1, keepdims=True) + 1e-8 # features = features / norms # # Memory cleanup # if ENABLE_GC: # del batch_tensor # gc.collect() # return features # def compute_similarity(self, features1: np.ndarray, features2: np.ndarray) -> float: # similarity = np.dot(features1, features2) / ( # np.linalg.norm(features1) * np.linalg.norm(features2) + 1e-8 # ) # return float(np.clip(similarity, 0, 1)) # def cleanup(self): # """Release model from memory""" # if self.model is not None: # del self.model # self.model = None # if ENABLE_GC: # gc.collect() # class GeometricConstraintValidator: # """Validate saddle positions using geometric constraints""" # def __init__(self, expected_count: int = 4): # self.expected_count = expected_count # self.reference_positions = None # self.reference_distances = {} # def set_reference_positions(self, saddles: List[SaddleROI]): # if len(saddles) != self.expected_count: # print(f"⚠ Reference has {len(saddles)} saddles, expected {self.expected_count}") # return False # centers = [s.center for s in saddles] # centroid = ( # int(np.mean([c[0] for c in centers])), # int(np.mean([c[1] for c in centers])) # ) # self.reference_positions = [ # (c[0] - centroid[0], c[1] - centroid[1]) for c in centers # ] # self.reference_distances = {} # for i in range(len(saddles)): # for j in range(i+1, len(saddles)): # dist = np.linalg.norm( # np.array(saddles[i].center) - np.array(saddles[j].center) # ) # self.reference_distances[(i, j)] = dist # print(f"✓ Reference geometry learned: {len(saddles)} saddles") # return True # def infer_missing_saddles( # self, # detected_saddles: List[SaddleROI], # image: np.ndarray # ) -> List[SaddleROI]: # if len(detected_saddles) >= self.expected_count: # return detected_saddles # if self.reference_positions is None: # print("⚠ No reference geometry") # return detected_saddles # if len(detected_saddles) < 2: # print("⚠ Need at least 2 saddles to infer") # return detected_saddles # centers = [s.center for s in detected_saddles] # centroid = ( # int(np.mean([c[0] for c in centers])), # int(np.mean([c[1] for c in centers])) # ) # scale = self._estimate_scale(detected_saddles) # detected_ids = {s.id for s in detected_saddles} # all_saddles = list(detected_saddles) # for expected_id in range(self.expected_count): # if expected_id not in detected_ids: # rel_x, rel_y = self.reference_positions[expected_id] # expected_x = int(centroid[0] + rel_x * scale) # expected_y = int(centroid[1] + rel_y * scale) # crop_size = 100 # x_min = max(0, expected_x - crop_size // 2) # y_min = max(0, expected_y - crop_size // 2) # x_max = min(image.shape[1], expected_x + crop_size // 2) # y_max = min(image.shape[0], expected_y + crop_size // 2) # crop = image[y_min:y_max, x_min:x_max].copy() # inferred = SaddleROI( # id=expected_id, # bbox=(x_min, y_min, x_max - x_min, y_max - y_min), # crop=crop, # center=(expected_x, expected_y), # area=(x_max - x_min) * (y_max - y_min), # angle=0.0, # confidence=0.0 # ) # all_saddles.append(inferred) # print(f" ⚙ Inferred Saddle {expected_id} at ({expected_x}, {expected_y})") # all_saddles.sort(key=lambda s: s.id) # return all_saddles # def _estimate_scale(self, saddles: List[SaddleROI]) -> float: # if len(saddles) < 2 or not self.reference_distances: # return 1.0 # if len(saddles) >= 2: # s0, s1 = saddles[0], saddles[1] # current_dist = np.linalg.norm( # np.array(s0.center) - np.array(s1.center) # ) # key = (min(s0.id, s1.id), max(s0.id, s1.id)) # if key in self.reference_distances: # ref_dist = self.reference_distances[key] # scale = current_dist / ref_dist # return scale # return 1.0 # def validate_geometry(self, saddles: List[SaddleROI], tolerance: float = 0.05) -> Tuple[bool, str]: # if len(saddles) != self.expected_count: # return False, f"Expected {self.expected_count} saddles, got {len(saddles)}" # if self.reference_distances is None or not self.reference_distances: # return True, "No reference geometry to validate" # sorted_saddles = sorted(saddles, key=lambda s: s.id) # centers = [np.array(s.center) for s in sorted_saddles] # # Collinearity check # if len(centers) >= 3: # v1 = centers[1] - centers[0] # for i in range(2, len(centers)): # v2 = centers[i] - centers[0] # cross = abs(v1[0] * v2[1] - v1[1] * v2[0]) # line_length = np.linalg.norm(v1) * np.linalg.norm(v2) # if line_length > 0: # deviation = cross / line_length # if deviation > 0.1: # return False, f"Saddles not collinear (deviation: {deviation:.2f})" # # Distance validation # for i in range(len(sorted_saddles) - 1): # s0, s1 = sorted_saddles[i], sorted_saddles[i + 1] # current_dist = np.linalg.norm(centers[i + 1] - centers[i]) # key = (min(s0.id, s1.id), max(s0.id, s1.id)) # if key in self.reference_distances: # ref_dist = self.reference_distances[key] # if ref_dist > 0: # deviation = abs(current_dist - ref_dist) / ref_dist # if deviation > tolerance: # return False, f"Distance S{s0.id}-S{s1.id} off by {deviation*100:.1f}%" # return True, "Geometry valid" # class MultiReferenceManager: # """Manage multiple golden template images with memory optimization""" # def __init__(self, feature_extractor: CNNFeatureExtractor): # self.feature_extractor = feature_extractor # self.references = [] # self.max_references = 20 if not IS_CLOUD else 10 # def add_reference(self, image: np.ndarray, saddles: List[SaddleROI]) -> bool: # if len(saddles) != 4: # print(f"⚠ Reference must have 4 saddles, got {len(saddles)}") # return False # # Limit number of references # if len(self.references) >= self.max_references: # print(f"⚠ Max references ({self.max_references}) reached, removing oldest") # old_ref = self.references.pop(0) # # Cleanup # if ENABLE_GC: # del old_ref # gc.collect() # features_list = [] # for saddle in saddles: # aligned_crop = AlignmentCorrector.correct_saddle(saddle) # features = self.feature_extractor.extract_features(aligned_crop) # features_list.append(features) # # Compress image for storage # if image.shape[0] > 800 or image.shape[1] > 800: # scale = min(800/image.shape[1], 800/image.shape[0]) # new_w, new_h = int(image.shape[1]*scale), int(image.shape[0]*scale) # image_stored = cv2.resize(image, (new_w, new_h)) # else: # image_stored = image # self.references.append({ # 'image': image_stored, # 'saddles': saddles, # 'features': features_list # }) # print(f"✓ Reference {len(self.references)} added") # return True # def match_saddle(self, saddle: SaddleROI) -> Tuple[float, float]: # if not self.references: # return 0.0, 999.0 # aligned_crop = AlignmentCorrector.correct_saddle(saddle) # test_features = self.feature_extractor.extract_features(aligned_crop) # similarities = [] # distances = [] # for ref in self.references: # if saddle.id < len(ref['features']): # ref_features = ref['features'][saddle.id] # sim = self.feature_extractor.compute_similarity(test_features, ref_features) # dist = np.linalg.norm(test_features - ref_features) # similarities.append(sim) # distances.append(dist) # if similarities: # best_sim = max(similarities) # avg_dist = np.mean(distances) # return best_sim, avg_dist # return 0.0, 999.0 # def get_reference_count(self) -> int: # return len(self.references) # def get_references(self) -> List[str]: # refs = [] # for ref in self.references: # try: # _, buffer = cv2.imencode('.jpg', ref['image'], [cv2.IMWRITE_JPEG_QUALITY, 50]) # import base64 # img_str = base64.b64encode(buffer).decode('utf-8') # refs.append(f"data:image/jpeg;base64,{img_str}") # except: # continue # return refs # def cleanup(self): # """Release all references""" # self.references.clear() # if ENABLE_GC: # gc.collect() # class AdvancedBlockInspector: # """ # Advanced Engine Block Inspector - CLOUD OPTIMIZED # Memory leak fixes: # - LRU caching with limits # - Automatic cleanup after processing # - Garbage collection triggers # - Reference counting limits # Improved 4-saddle detection: # - Contour-based detection # - Multiple circle detection strategies # - Structure validation integration # - Geometric validator integration # """ # def __init__(self, yolo_model_path: Optional[str] = None, onnx_path: str = "resnet18_features.onnx"): # self.yolo_detector = YOLOOBBDetector(yolo_model_path) # self.geometric_detector = GeometricSaddleDetector() # self.cnn_extractor = CNNFeatureExtractor(onnx_path=onnx_path) # self.reference_manager = MultiReferenceManager(self.cnn_extractor) # # Use optimized validator if available # if OPTIMIZED_VALIDATOR_AVAILABLE: # self.geo_validator = OptimizedGeometricValidator(expected_count=4) # self.use_optimized_validator = True # print("✓ Optimized Validator: RANSAC + Physical Dims") # else: # self.geo_validator = GeometricConstraintValidator(expected_count=4) # self.use_optimized_validator = False # # Structure validator # self.structure_validator = None # if STRUCTURE_VALIDATOR_AVAILABLE: # self.structure_validator = SaddleStructureValidator( # arc_center_tolerance=0.05, # min_arc_length_ratio=0.6, # min_structure_confidence=0.65 # ) # print("✓ Structure Validator: Semicircle + Middle Arc") # self.config = { # 'expected_saddles': 4, # 'similarity_threshold': 0.88, # 'use_yolo': self.yolo_detector.model_loaded, # 'use_geometric_fallback': True, # 'use_alignment_correction': True, # 'infer_missing_saddles': True, # 'yolo_confidence': 0.25, # 'use_geometry_validation': True, # 'use_structure_validation': STRUCTURE_VALIDATOR_AVAILABLE, # 'use_optimized_validator': self.use_optimized_validator, # 'min_brightness': 20.0, # } # # Memory tracking # self.last_saddles = [] # self.last_image = None # print("[OK] Advanced Inspector initialized (Cloud-Optimized)") # print(f" YOLO-OBB: {'Enabled' if self.config['use_yolo'] else 'Disabled'}") # print(f" Max cache size: {MAX_CACHE_SIZE}") # print(f" GC enabled: {ENABLE_GC}") # def add_reference_image(self, image: np.ndarray) -> bool: # """Add a golden template image""" # try: # # Quality check # gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image # brightness = np.mean(gray) # if brightness < 20: # print(f"✗ Reference too dark (brightness: {brightness:.1f})") # return False # # Detect saddles # if self.config['use_yolo']: # saddles = self.yolo_detector.detect_saddles( # image, conf_threshold=self.config['yolo_confidence'] # ) # else: # saddles = [] # if len(saddles) < self.config['expected_saddles'] and self.config['use_geometric_fallback']: # print("→ Falling back to geometric detection") # saddles = self.geometric_detector.detect_saddles(image) # # Validate with structure validator # if self.structure_validator and len(saddles) > 0: # for saddle in saddles: # if saddle.crop is not None and saddle.crop.size > 0: # saddle.structure = self.structure_validator.validate_structure(saddle.crop) # # Use optimized validator to refine # if self.use_optimized_validator and hasattr(self.geo_validator, 'validate_and_refine'): # saddles, _ = self.geo_validator.validate_and_refine(saddles, image) # if len(saddles) != self.config['expected_saddles']: # print(f"✗ Reference failed: {len(saddles)}/{self.config['expected_saddles']} saddles") # return False # success = self.reference_manager.add_reference(image, saddles) # if success and self.reference_manager.get_reference_count() == 1: # self.geo_validator.set_reference_positions(saddles) # # Cleanup # self._cleanup_after_processing() # return success # except Exception as e: # print(f"✗ Reference error: {e}") # import traceback # traceback.print_exc() # return False # def inspect_block(self, image: np.ndarray) -> BlockInspectionResult: # """Inspect engine block with memory optimization""" # start_time = time.time() # try: # # Quality check # gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image # brightness = np.mean(gray) # if brightness < 15: # return BlockInspectionResult( # block_status='WASTE_IMAGE', # total_saddles=self.config['expected_saddles'], # detected_saddles=0, # defective_saddles=0, # saddle_results=[], # processing_time_ms=(time.time() - start_time) * 1000, # alignment_status='REJECTED_DARK', # block_angle=0.0 # ) # # Resize if needed # h, w = image.shape[:2] # if w > 800 or h > 600: # image = cv2.resize(image, (640, 480)) # # Detect saddles # if self.config['use_yolo']: # saddles = self.yolo_detector.detect_saddles( # image, conf_threshold=self.config['yolo_confidence'] # ) # else: # saddles = [] # if len(saddles) < self.config['expected_saddles'] and self.config['use_geometric_fallback']: # print("→ Falling back to geometric detection") # saddles = self.geometric_detector.detect_saddles(image) # detected_count = len(saddles) # # Validate and refine with optimized validator # if self.use_optimized_validator and hasattr(self.geo_validator, 'validate_and_refine'): # saddles, refine_report = self.geo_validator.validate_and_refine(saddles, image) # print(f" Refined: {refine_report}") # elif len(saddles) < self.config['expected_saddles'] and self.config['infer_missing_saddles']: # print(f" Inferring {self.config['expected_saddles'] - len(saddles)} missing saddles") # saddles = self.geo_validator.infer_missing_saddles(saddles, image) # # Structure validation # if self.structure_validator and self.config.get('use_structure_validation', False): # for saddle in saddles: # if saddle.crop is not None and saddle.crop.size > 0: # saddle.structure = self.structure_validator.validate_structure(saddle.crop) # # Validate count # if len(saddles) != self.config['expected_saddles']: # alignment_status = f"ERROR: {len(saddles)}/{self.config['expected_saddles']} saddles" # return self._error_result(alignment_status, 0.0, saddles, detected_count) # # Geometry validation # if self.config.get('use_geometry_validation', True): # is_valid, reason = self.geo_validator.validate_geometry(saddles, 0.05) # if not is_valid: # return self._error_result(f"GEOMETRY: {reason}", 0.0, saddles, detected_count) # alignment_status = "OK" # block_angle = np.mean([s.angle for s in saddles]) # # Batch feature extraction # aligned_crops = [AlignmentCorrector.correct_saddle(s) for s in saddles] # batch_features = self.cnn_extractor.extract_batch(aligned_crops) # # Compare against references # saddle_results = [] # for i, saddle in enumerate(saddles): # test_features = batch_features[i] # best_sim = 0.0 # avg_dist = 999.0 # for ref in self.reference_manager.references: # if saddle.id < len(ref['features']): # ref_features = ref['features'][saddle.id] # sim = self.cnn_extractor.compute_similarity(test_features, ref_features) # dist = np.linalg.norm(test_features - ref_features) # if sim > best_sim: # best_sim = sim # avg_dist = dist # status = 'PERFECT' if best_sim >= self.config['similarity_threshold'] else 'DEFECTIVE' # saddle_results.append(SaddleInspectionResult( # saddle_id=saddle.id, # status=status, # similarity_score=best_sim, # confidence=best_sim, # feature_distance=avg_dist, # detected=saddle.confidence > 0.0 # )) # # Block decision # defective_count = sum(1 for r in saddle_results if r.status == 'DEFECTIVE') # avg_sim = np.mean([r.similarity_score for r in saddle_results]) if saddle_results else 0 # if defective_count == self.config['expected_saddles'] and avg_sim < 0.70: # block_status = 'WASTE_IMAGE' # elif detected_count == 0: # if avg_sim < 0.75: # block_status = 'WASTE_IMAGE' # else: # block_status = 'DEFECTIVE' # else: # block_status = 'DEFECTIVE' if defective_count > 0 else 'PERFECT' # processing_time = (time.time() - start_time) * 1000 # self.last_saddles = saddles # self.last_image = image # result = BlockInspectionResult( # block_status=block_status, # total_saddles=self.config['expected_saddles'], # detected_saddles=detected_count, # defective_saddles=defective_count, # saddle_results=saddle_results, # processing_time_ms=processing_time, # alignment_status=alignment_status, # block_angle=block_angle # ) # # Cleanup # self._cleanup_after_processing() # return result # except Exception as e: # print(f"✗ Inspection error: {e}") # import traceback # traceback.print_exc() # return self._error_result(str(e), 0.0, [], 0) # def get_reference_images(self) -> List[str]: # return self.reference_manager.get_references() # def _error_result(self, message: str, angle: float, saddles: List, detected_count: int) -> BlockInspectionResult: # self.last_saddles = saddles # return BlockInspectionResult( # block_status='ERROR', # total_saddles=self.config['expected_saddles'], # detected_saddles=detected_count, # defective_saddles=0, # saddle_results=[], # processing_time_ms=0.0, # alignment_status=message, # block_angle=angle # ) # def visualize_results( # self, # original_image: np.ndarray, # saddles: List[SaddleROI], # results: List[SaddleInspectionResult] # ) -> np.ndarray: # """Create detailed visualization""" # vis = original_image.copy() # for saddle, result in zip(saddles, results): # if not result.detected: # color = (255, 165, 0) # Orange # elif result.status == 'PERFECT': # color = (0, 255, 0) # Green # else: # color = (0, 0, 255) # Red # x, y, w, h = saddle.bbox # cv2.rectangle(vis, (x, y), (x+w, y+h), color, 3) # cx, cy = saddle.center # cv2.circle(vis, (cx, cy), 10, color, -1) # if abs(saddle.angle) > 1.0: # rad = np.deg2rad(saddle.angle) # end_x = int(cx + 30 * np.cos(rad)) # end_y = int(cy + 30 * np.sin(rad)) # cv2.arrowedLine(vis, (cx, cy), (end_x, end_y), color, 3) # detection_marker = "" if result.detected else "⚙" # label = f"{detection_marker} S{saddle.id}: {result.status}" # sim_text = f"{result.similarity_score:.3f}" # angle_text = f"{saddle.angle:.1f}°" # cv2.rectangle(vis, (cx-70, cy-70), (cx+70, cy+40), color, -1) # cv2.putText(vis, label, (cx-65, cy-45), # cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 255, 255), 2) # cv2.putText(vis, sim_text, (cx-65, cy-20), # cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2) # cv2.putText(vis, f"C:{saddle.confidence:.2f}", (cx-65, cy+28), # cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1) # # Add angle info if it doesn't overlap text # # (Simplified visualization for better performance) # return vis # def _cleanup_after_processing(self): # """Clean up resources after processing""" # # Clear old saddles # for saddle in self.last_saddles: # if hasattr(saddle, 'cleanup'): # saddle.cleanup() # # Trigger garbage collection # if ENABLE_GC: # gc.collect() # def cleanup(self): # """Full cleanup - call when done""" # print("Cleaning up resources...") # # Cleanup components # if hasattr(self.yolo_detector, 'cleanup'): # self.yolo_detector.cleanup() # if hasattr(self.geometric_detector, 'cleanup'): # self.geometric_detector.cleanup() # if hasattr(self.cnn_extractor, 'cleanup'): # self.cnn_extractor.cleanup() # if hasattr(self.reference_manager, 'cleanup'): # self.reference_manager.cleanup() # # Clear state # self.last_saddles.clear() # self.last_image = None # # Force garbage collection # if ENABLE_GC: # gc.collect() # print("✓ Cleanup complete") import cv2 import numpy as np from PIL import Image import io import json from datetime import datetime from typing import List, Tuple, Dict, Optional from dataclasses import dataclass import time import os import warnings import gc from collections import OrderedDict # ============================================================================ # CLOUD DEPLOYMENT OPTIMIZATIONS # ============================================================================ IS_HF_SPACE = os.environ.get('SPACE_ID') is not None or os.environ.get('HF_SPACE') is not None IS_RENDER = os.environ.get('RENDER') is not None IS_CLOUD = IS_HF_SPACE or IS_RENDER # Memory limits for cloud MAX_IMAGE_SIZE = (1280, 960) if IS_CLOUD else (1920, 1440) MAX_CACHE_SIZE = 50 if IS_CLOUD else 100 ENABLE_GC = IS_CLOUD # Lazy imports for faster cold start _torch = None _torchvision = None _YOLO = None def _get_torch(): global _torch if _torch is None: import torch _torch = torch if IS_CLOUD: _torch.set_num_threads(2) return _torch def _get_torchvision(): global _torchvision if _torchvision is None: import torchvision _torchvision = torchvision return _torchvision def _get_yolo(): global _YOLO if _YOLO is None: try: from ultralytics import YOLO _YOLO = YOLO except ImportError: _YOLO = False return _YOLO def _yolo_available(): yolo = _get_yolo() return yolo is not False and yolo is not None warnings.filterwarnings('ignore') # Import optimized validators try: from saddle_structure_validator import SaddleStructureValidator, SaddleStructure STRUCTURE_VALIDATOR_AVAILABLE = True except ImportError: STRUCTURE_VALIDATOR_AVAILABLE = False SaddleStructure = None print("[WARN] saddle_structure_validator not available") try: from geometric_validator import OptimizedGeometricValidator, PhysicalDimensions, PixelToMMCalibrator OPTIMIZED_VALIDATOR_AVAILABLE = True except ImportError: OPTIMIZED_VALIDATOR_AVAILABLE = False print("[WARN] geometric_validator not available") if IS_HF_SPACE: print("[HF Space] Memory-optimized mode enabled") elif IS_RENDER: print("[Render] Cloud-optimized mode enabled") @dataclass class SaddleROI: """Saddle region of interest with rotation and structure info""" id: int bbox: Tuple[int, int, int, int] crop: np.ndarray center: Tuple[int, int] area: int angle: float confidence: float structure: Optional[any] = None def to_dict(self): result = { 'id': int(self.id), 'bbox': [int(x) for x in self.bbox], 'center': [int(self.center[0]), int(self.center[1])], 'area': int(self.area), 'angle': float(self.angle), 'confidence': float(self.confidence) } if self.structure is not None: result['has_semicircle'] = bool(getattr(self.structure, 'has_semicircle', False)) result['has_middle_arc'] = bool(getattr(self.structure, 'has_middle_arc', False)) result['structure_confidence'] = float(getattr(self.structure, 'structure_confidence', 0.0)) return result def cleanup(self): """Free memory""" if hasattr(self, 'crop'): del self.crop self.crop = None self.structure = None @dataclass class SaddleInspectionResult: """Inspection result for one saddle""" saddle_id: int status: str similarity_score: float confidence: float feature_distance: float detected: bool def to_dict(self): return { 'saddle_id': int(self.saddle_id), 'status': str(self.status), 'similarity_score': float(self.similarity_score), 'confidence': float(self.confidence), 'feature_distance': float(self.feature_distance), 'detected': bool(self.detected) } @dataclass class BlockInspectionResult: """Complete block inspection result""" block_status: str total_saddles: int detected_saddles: int defective_saddles: int saddle_results: List[SaddleInspectionResult] processing_time_ms: float alignment_status: str block_angle: float def to_dict(self): return { 'block_status': str(self.block_status), 'total_saddles': int(self.total_saddles), 'detected_saddles': int(self.detected_saddles), 'defective_saddles': int(self.defective_saddles), 'saddle_results': [r.to_dict() for r in self.saddle_results], 'processing_time_ms': float(self.processing_time_ms), 'alignment_status': str(self.alignment_status), 'block_angle': float(self.block_angle) } class LRUCache: """Thread-safe LRU cache with size limit""" def __init__(self, max_size=100): self.cache = OrderedDict() self.max_size = max_size def get(self, key): if key in self.cache: self.cache.move_to_end(key) return self.cache[key] return None def put(self, key, value): if key in self.cache: self.cache.move_to_end(key) self.cache[key] = value if len(self.cache) > self.max_size: self.cache.popitem(last=False) def clear(self): self.cache.clear() if ENABLE_GC: gc.collect() class YOLOOBBDetector: """YOLO-OBB Detector with lazy loading and memory management""" def __init__(self, model_path: Optional[str] = None): self.model = None self.model_loaded = False self._model_path = model_path if model_path else 'yolo11n-obb.pt' self._initialized = False def _lazy_init(self): if self._initialized: return self._initialized = True if not _yolo_available(): print("⚠ YOLO not available - will use geometric fallback") return YOLO = _get_yolo() try: self.model = YOLO(self._model_path) self.model_loaded = True print(f"✓ YOLO-OBB loaded: {self._model_path}") except Exception as e: print(f"⚠ YOLO load failed: {e} - will use geometric fallback") self.model_loaded = False @property def available(self): if not self._initialized: self._lazy_init() return self.model_loaded def detect_saddles(self, image: np.ndarray, conf_threshold: float = 0.20) -> List[SaddleROI]: """Detect saddles using YOLO-OBB with memory optimization""" if not self.available or self.model is None: return [] # Resize if too large h, w = image.shape[:2] if w > MAX_IMAGE_SIZE[0] or h > MAX_IMAGE_SIZE[1]: scale = min(MAX_IMAGE_SIZE[0]/w, MAX_IMAGE_SIZE[1]/h) new_w, new_h = int(w*scale), int(h*scale) image_resized = cv2.resize(image, (new_w, new_h)) else: image_resized = image scale = 1.0 # Convert to grayscale for faster inference if len(image_resized.shape) == 3: gray = cv2.cvtColor(image_resized, cv2.COLOR_BGR2GRAY) gray_3ch = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR) else: gray_3ch = cv2.cvtColor(image_resized, cv2.COLOR_GRAY2BGR) results = self.model(gray_3ch, conf=conf_threshold, verbose=False) saddles = [] for result in results: if hasattr(result, 'obb') and result.obb is not None: for i, obb in enumerate(result.obb): xyxyxyxy = obb.xyxyxyxy[0].cpu().numpy() conf = float(obb.conf[0]) # Scale back to original coordinates xyxyxyxy = xyxyxyxy / scale angle = self._calculate_angle(xyxyxyxy) x_coords = xyxyxyxy[:, 0] y_coords = xyxyxyxy[:, 1] x_min, x_max = int(x_coords.min()), int(x_coords.max()) y_min, y_max = int(y_coords.min()), int(y_coords.max()) w = x_max - x_min h = y_max - y_min cx = int(np.mean(x_coords)) cy = int(np.mean(y_coords)) # Crop from original image crop = image[y_min:y_max, x_min:x_max].copy() area = w * h saddles.append(SaddleROI( id=i, bbox=(x_min, y_min, w, h), crop=crop, center=(cx, cy), area=area, angle=angle, confidence=conf )) elif hasattr(result, 'boxes') and result.boxes is not None: for i, box in enumerate(result.boxes): xyxy = box.xyxy[0].cpu().numpy() conf = float(box.conf[0]) # Scale back xyxy = xyxy / scale x_min, y_min, x_max, y_max = map(int, xyxy) w = x_max - x_min h = y_max - y_min cx = (x_min + x_max) // 2 cy = (y_min + y_max) // 2 crop = image[y_min:y_max, x_min:x_max].copy() area = w * h saddles.append(SaddleROI( id=i, bbox=(x_min, y_min, w, h), crop=crop, center=(cx, cy), area=area, angle=0.0, confidence=conf )) # Sort by position saddles.sort(key=lambda s: (s.center[1], s.center[0])) for i, saddle in enumerate(saddles): saddle.id = i print(f"✓ YOLO detected: {len(saddles)} saddles") # Memory cleanup if ENABLE_GC: del gray_3ch, results gc.collect() return saddles def _calculate_angle(self, xyxyxyxy: np.ndarray) -> float: p1 = xyxyxyxy[0] p2 = xyxyxyxy[1] dx = p2[0] - p1[0] dy = p2[1] - p1[1] angle = np.degrees(np.arctan2(dy, dx)) if angle > 180: angle -= 360 elif angle < -180: angle += 360 return float(angle) def cleanup(self): """Release model from memory""" if self.model is not None: del self.model self.model = None self.model_loaded = False if ENABLE_GC: gc.collect() class GeometricSaddleDetector: """Optimized geometric detector with better 4-saddle detection""" def __init__(self, max_cache_size=None): if max_cache_size is None: max_cache_size = MAX_CACHE_SIZE self._mask_cache = LRUCache(max_cache_size) def detect_saddles(self, image: np.ndarray) -> List[SaddleROI]: """Detect using improved geometric segmentation""" print(f"[GeometricDetector] Input shape: {image.shape}") h, w = image.shape[:2] # Resize if too large if w > MAX_IMAGE_SIZE[0] or h > MAX_IMAGE_SIZE[1]: scale = min(MAX_IMAGE_SIZE[0]/w, MAX_IMAGE_SIZE[1]/h) new_w, new_h = int(w*scale), int(h*scale) image_work = cv2.resize(image, (new_w, new_h)) print(f"[GeometricDetector] Resized to {new_w}x{new_h}") else: image_work = image scale = 1.0 # Detect circles with improved method outer_circle, inner_circle = self._detect_circles_improved(image_work) if outer_circle is None: print("[GeometricDetector] ✗ Circle detection failed") return [] ox, oy, or_ = outer_circle ix, iy, ir = inner_circle # Scale back to original if scale != 1.0: ox, oy = int(ox/scale), int(oy/scale) or_, ir = int(or_/scale), int(ir/scale) h, w = image.shape[:2] print(f"[GeometricDetector] Outer: ({ox},{oy}) r={or_}, Inner: ({ix},{iy}) r={ir}") # Validate if or_ < 50 or or_ > min(h, w): print(f"[GeometricDetector] ✗ Invalid outer radius: {or_}") return [] if ir >= or_: ir = int(or_ * 0.4) print(f"[GeometricDetector] Adjusted inner radius to {ir}") # Create annular mask annular_mask = np.zeros((h, w), dtype=np.uint8) cv2.circle(annular_mask, (ox, oy), or_, 255, -1) cv2.circle(annular_mask, (ix, iy), ir, 0, -1) mask_pixels = np.sum(annular_mask > 0) print(f"[GeometricDetector] Annular mask pixels: {mask_pixels}") if mask_pixels < 1000: print(f"[GeometricDetector] ✗ Mask too small") return [] # Find saddles using quadrant-based detection (more reliable for 4 saddles) saddles = self._detect_quadrant_based(image, annular_mask, (ox, oy), or_, ir) print(f"[GeometricDetector] Total saddles detected: {len(saddles)}") # Memory cleanup if ENABLE_GC: del annular_mask, image_work gc.collect() return saddles def _detect_quadrant_based( self, image: np.ndarray, annular_mask: np.ndarray, center: Tuple[int, int], outer_r: int, inner_r: int ) -> List[SaddleROI]: """Quadrant-based detection for 4 saddles""" h, w = annular_mask.shape ox, oy = center quadrants = [ (0, 90, "Top-Right"), (90, 180, "Top-Left"), (180, 270, "Bottom-Left"), (270, 360, "Bottom-Right") ] saddles = [] for idx, (start_angle, end_angle, name) in enumerate(quadrants): mask_key = (h, w, ox, oy, outer_r, inner_r, start_angle, end_angle) sector_mask = self._mask_cache.get(mask_key) if sector_mask is None: sector_mask = self._create_sector_mask( (h, w), (ox, oy), outer_r, inner_r, start_angle, end_angle ) self._mask_cache.put(mask_key, sector_mask) saddle_mask = cv2.bitwise_and(annular_mask, sector_mask) ys, xs = np.where(saddle_mask > 0) if len(xs) == 0 or len(ys) == 0: print(f"[GeometricDetector] ⚠ Quadrant {idx} ({name}): No pixels") continue x_min, x_max = xs.min(), xs.max() y_min, y_max = ys.min(), ys.max() if (x_max - x_min) < 10 or (y_max - y_min) < 10: print(f"[GeometricDetector] ⚠ Quadrant {idx} ({name}): Box too small") continue crop = image[y_min:y_max+1, x_min:x_max+1].copy() mask_crop = saddle_mask[y_min:y_max+1, x_min:x_max+1].copy() crop_masked = cv2.bitwise_and(crop, crop, mask=mask_crop) cx = int(np.mean(xs)) cy = int(np.mean(ys)) area = np.sum(saddle_mask > 0) saddles.append(SaddleROI( id=idx, bbox=(x_min, y_min, x_max - x_min, y_max - y_min), crop=crop_masked, center=(cx, cy), area=area, angle=0.0, confidence=0.9 # Higher confidence for geometric detection )) return saddles def _detect_circles_improved(self, image: np.ndarray) -> Tuple[Optional[Tuple], Optional[Tuple]]: """Improved circle detection with multiple strategies""" gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image h, w = gray.shape # Strategy 1: Standard HoughCircles with optimized params gray_blur = cv2.GaussianBlur(gray, (9, 9), 2) minRadius = int(min(h, w) * 0.2) maxRadius = int(min(h, w) * 0.7) # Try multiple parameter sets param_sets = [ (80, 25), # Standard (60, 30), # More lenient (100, 20), # More strict ] for param1, param2 in param_sets: circles = cv2.HoughCircles( gray_blur, cv2.HOUGH_GRADIENT, dp=1.0, minDist=80, param1=param1, param2=param2, minRadius=minRadius, maxRadius=maxRadius ) if circles is not None and len(circles[0]) >= 1: circles = np.round(circles[0, :]).astype(int) circles = sorted(circles, key=lambda c: c[2], reverse=True) outer = tuple(circles[0]) inner_radius = int(outer[2] * 0.4) inner = (outer[0], outer[1], inner_radius) print(f"✓ HoughCircles success (param1={param1}, param2={param2})") return outer, inner # Strategy 2: Contour-based detection print("→ Using contour fallback") edges = cv2.Canny(gray_blur, 30, 100) # Dilate to connect edges kernel = np.ones((3, 3), np.uint8) edges = cv2.dilate(edges, kernel, iterations=2) contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if contours: # Find largest contour largest = max(contours, key=cv2.contourArea) (cx, cy), radius = cv2.minEnclosingCircle(largest) center_x, center_y = int(cx), int(cy) outer_radius = int(radius * 0.85) inner_radius = int(outer_radius * 0.4) print(f"✓ Contour-based: center=({center_x},{center_y}), radius={outer_radius}") return ((center_x, center_y, outer_radius), (center_x, center_y, inner_radius)) # Strategy 3: Dimension-based estimate center_x, center_y = w // 2, h // 2 outer_radius = min(w, h) // 2 - 50 inner_radius = int(outer_radius * 0.4) print(f"→ Using dimension estimate: center=({center_x},{center_y}), radius={outer_radius}") return ((center_x, center_y, outer_radius), (center_x, center_y, inner_radius)) def _create_sector_mask(self, shape, center, outer_r, inner_r, start_angle, end_angle): """Create sector mask with caching""" h, w = shape mask = np.zeros((h, w), dtype=np.uint8) y, x = np.ogrid[:h, :w] dx = x - center[0] dy = y - center[1] angles = np.rad2deg(np.arctan2(dy, dx)) % 360 radius = np.sqrt(dx**2 + dy**2) if start_angle < end_angle: angle_mask = (angles >= start_angle) & (angles < end_angle) else: angle_mask = (angles >= start_angle) | (angles < end_angle) radius_mask = (radius >= inner_r) & (radius <= outer_r) sector = angle_mask & radius_mask mask[sector] = 255 return mask def cleanup(self): """Release cache""" self._mask_cache.clear() class AlignmentCorrector: """Affine transformation for rotation correction""" @staticmethod def correct_rotation(image: np.ndarray, angle: float) -> np.ndarray: if image is None or image.size == 0: return image h, w = image.shape[:2] center = (w // 2, h // 2) M = cv2.getRotationMatrix2D(center, -angle, 1.0) cos = np.abs(M[0, 0]) sin = np.abs(M[0, 1]) new_w = int((h * sin) + (w * cos)) new_h = int((h * cos) + (w * sin)) M[0, 2] += (new_w / 2) - center[0] M[1, 2] += (new_h / 2) - center[1] rotated = cv2.warpAffine(image, M, (new_w, new_h), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT, borderValue=(0, 0, 0)) return rotated @staticmethod def correct_saddle(saddle: SaddleROI) -> np.ndarray: if saddle.crop is None or saddle.crop.size == 0: return np.zeros((224, 224, 3), dtype=np.uint8) if abs(saddle.angle) < 1.0: return saddle.crop return AlignmentCorrector.correct_rotation(saddle.crop, saddle.angle) class CNNFeatureExtractor: """ResNet-18 feature extraction with memory optimization""" def __init__(self, onnx_path: str = "resnet18_features.onnx"): self.onnx_path = onnx_path self.session = None self.model = None self.device = None self.feature_dim = 1024 self._initialized = False def _lazy_init(self): if self._initialized: return self._initialized = True # Use PyTorch with fused features self._init_pytorch_model() def _init_pytorch_model(self): torch = _get_torch() torchvision = _get_torchvision() from torchvision.models import resnet18, ResNet18_Weights import torch.nn as nn self.device = torch.device('cpu') weights = ResNet18_Weights.IMAGENET1K_V1 base_model = resnet18(weights=weights) class FeatureFusionModel(nn.Module): def __init__(self, base): super().__init__() self.conv1 = base.conv1 self.bn1 = base.bn1 self.relu = base.relu self.maxpool = base.maxpool self.layer1 = base.layer1 self.layer2 = base.layer2 self.layer3 = base.layer3 self.layer4 = base.layer4 self.avgpool = nn.AdaptiveAvgPool2d((1, 1)) self.maxpool_global = nn.AdaptiveMaxPool2d((1, 1)) def forward(self, x): x = self.conv1(x) x = self.bn1(x) x = self.relu(x) x = self.maxpool(x) x = self.layer1(x) x = self.layer2(x) l3 = self.layer3(x) l4 = self.layer4(l3) f3_avg = self.avgpool(l3).view(l3.size(0), -1) f3_max = self.maxpool_global(l3).view(l3.size(0), -1) f4_avg = self.avgpool(l4).view(l4.size(0), -1) return torch.cat([f3_avg, f3_max, f4_avg], dim=1) self.model = FeatureFusionModel(base_model) self.model.to(self.device) self.model.eval() print(f"✓ PyTorch FeatureFusion ResNet-18 (dim={self.feature_dim})") def _preprocess_image(self, image: np.ndarray) -> np.ndarray: if image is None or image.size == 0: return np.zeros((3, 224, 224), dtype=np.float32) if len(image.shape) == 2: image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB) elif len(image.shape) == 3 and image.shape[2] == 4: image = cv2.cvtColor(image, cv2.COLOR_BGRA2RGB) elif len(image.shape) == 3 and image.shape[2] == 3: image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) image = cv2.resize(image, (224, 224)) image = image.astype(np.float32) / 255.0 mean = np.array([0.485, 0.456, 0.406], dtype=np.float32) std = np.array([0.229, 0.224, 0.225], dtype=np.float32) image = (image - mean) / std image = image.transpose(2, 0, 1) return image def extract_features(self, image: np.ndarray) -> np.ndarray: self._lazy_init() batch = self._preprocess_image(image)[np.newaxis, ...] features = self.extract_batch(batch) return features[0] def extract_batch(self, images: List[np.ndarray]) -> np.ndarray: self._lazy_init() if isinstance(images, np.ndarray) and len(images.shape) == 4: batch = images.astype(np.float32) else: preprocessed = [self._preprocess_image(img) for img in images] batch = np.stack(preprocessed, axis=0).astype(np.float32) torch = _get_torch() batch_tensor = torch.from_numpy(batch).to(self.device) with torch.no_grad(): features = self.model(batch_tensor) features = features.squeeze(-1).squeeze(-1).cpu().numpy() norms = np.linalg.norm(features, axis=1, keepdims=True) + 1e-8 features = features / norms # Memory cleanup if ENABLE_GC: del batch_tensor gc.collect() return features def compute_similarity(self, features1: np.ndarray, features2: np.ndarray) -> float: similarity = np.dot(features1, features2) / ( np.linalg.norm(features1) * np.linalg.norm(features2) + 1e-8 ) return float(np.clip(similarity, 0, 1)) def cleanup(self): """Release model from memory""" if self.model is not None: del self.model self.model = None if ENABLE_GC: gc.collect() class GeometricConstraintValidator: """Validate saddle positions using geometric constraints - RELAXED""" def __init__(self, expected_count: int = 4): self.expected_count = expected_count self.reference_positions = None self.reference_distances = {} def set_reference_positions(self, saddles: List[SaddleROI]): if len(saddles) != self.expected_count: print(f"⚠ Reference has {len(saddles)} saddles, expected {self.expected_count}") return False centers = [s.center for s in saddles] centroid = ( int(np.mean([c[0] for c in centers])), int(np.mean([c[1] for c in centers])) ) self.reference_positions = [ (c[0] - centroid[0], c[1] - centroid[1]) for c in centers ] self.reference_distances = {} for i in range(len(saddles)): for j in range(i+1, len(saddles)): dist = np.linalg.norm( np.array(saddles[i].center) - np.array(saddles[j].center) ) self.reference_distances[(i, j)] = dist print(f"✓ Reference geometry learned: {len(saddles)} saddles") return True def infer_missing_saddles( self, detected_saddles: List[SaddleROI], image: np.ndarray ) -> List[SaddleROI]: """Infer missing saddles based on reference geometry""" if len(detected_saddles) >= self.expected_count: return detected_saddles if self.reference_positions is None: print("⚠ No reference geometry - cannot infer") return detected_saddles if len(detected_saddles) == 0: print("⚠ No saddles detected - cannot infer") return detected_saddles centers = [s.center for s in detected_saddles] centroid = ( int(np.mean([c[0] for c in centers])), int(np.mean([c[1] for c in centers])) ) scale = self._estimate_scale(detected_saddles) detected_ids = {s.id for s in detected_saddles} all_saddles = list(detected_saddles) for expected_id in range(self.expected_count): if expected_id not in detected_ids: rel_x, rel_y = self.reference_positions[expected_id] expected_x = int(centroid[0] + rel_x * scale) expected_y = int(centroid[1] + rel_y * scale) crop_size = 150 # Increased crop size x_min = max(0, expected_x - crop_size // 2) y_min = max(0, expected_y - crop_size // 2) x_max = min(image.shape[1], expected_x + crop_size // 2) y_max = min(image.shape[0], expected_y + crop_size // 2) crop = image[y_min:y_max, x_min:x_max].copy() inferred = SaddleROI( id=expected_id, bbox=(x_min, y_min, x_max - x_min, y_max - y_min), crop=crop, center=(expected_x, expected_y), area=(x_max - x_min) * (y_max - y_min), angle=0.0, confidence=0.5 # Mark as inferred ) all_saddles.append(inferred) print(f" ⚙ Inferred Saddle {expected_id} at ({expected_x}, {expected_y})") all_saddles.sort(key=lambda s: s.id) return all_saddles def _estimate_scale(self, saddles: List[SaddleROI]) -> float: if len(saddles) < 2 or not self.reference_distances: return 1.0 if len(saddles) >= 2: s0, s1 = saddles[0], saddles[1] current_dist = np.linalg.norm( np.array(s0.center) - np.array(s1.center) ) key = (min(s0.id, s1.id), max(s0.id, s1.id)) if key in self.reference_distances: ref_dist = self.reference_distances[key] if ref_dist > 0: scale = current_dist / ref_dist return scale return 1.0 def validate_geometry(self, saddles: List[SaddleROI], tolerance: float = 0.30) -> Tuple[bool, str]: """RELAXED geometry validation""" if len(saddles) != self.expected_count: return False, f"Expected {self.expected_count} saddles, got {len(saddles)}" if self.reference_distances is None or not self.reference_distances: # No reference - accept geometry return True, "No reference geometry (accepting)" sorted_saddles = sorted(saddles, key=lambda s: s.id) centers = [np.array(s.center) for s in sorted_saddles] # Skip collinearity check - too strict # Distance validation with relaxed tolerance max_deviation = 0.0 for i in range(len(sorted_saddles) - 1): s0, s1 = sorted_saddles[i], sorted_saddles[i + 1] current_dist = np.linalg.norm(centers[i + 1] - centers[i]) key = (min(s0.id, s1.id), max(s0.id, s1.id)) if key in self.reference_distances: ref_dist = self.reference_distances[key] if ref_dist > 0: deviation = abs(current_dist - ref_dist) / ref_dist max_deviation = max(max_deviation, deviation) if deviation > tolerance: print(f" Distance S{s0.id}-S{s1.id} off by {deviation*100:.1f}%") if max_deviation > tolerance: return False, f"Max distance deviation: {max_deviation*100:.1f}%" return True, "Geometry valid" class MultiReferenceManager: """Manage multiple golden template images with memory optimization""" def __init__(self, feature_extractor: CNNFeatureExtractor): self.feature_extractor = feature_extractor self.references = [] self.max_references = 20 if not IS_CLOUD else 10 def add_reference(self, image: np.ndarray, saddles: List[SaddleROI]) -> bool: if len(saddles) != 4: print(f"⚠ Reference must have 4 saddles, got {len(saddles)}") return False # Limit number of references if len(self.references) >= self.max_references: print(f"⚠ Max references ({self.max_references}) reached, removing oldest") old_ref = self.references.pop(0) # Cleanup if ENABLE_GC: del old_ref gc.collect() features_list = [] for saddle in saddles: aligned_crop = AlignmentCorrector.correct_saddle(saddle) features = self.feature_extractor.extract_features(aligned_crop) features_list.append(features) # Compress image for storage if image.shape[0] > 800 or image.shape[1] > 800: scale = min(800/image.shape[1], 800/image.shape[0]) new_w, new_h = int(image.shape[1]*scale), int(image.shape[0]*scale) image_stored = cv2.resize(image, (new_w, new_h)) else: image_stored = image self.references.append({ 'image': image_stored, 'saddles': saddles, 'features': features_list }) print(f"✓ Reference {len(self.references)} added") return True def match_saddle(self, saddle: SaddleROI) -> Tuple[float, float]: if not self.references: # No references - return neutral score return 0.90, 0.5 # High similarity when no reference aligned_crop = AlignmentCorrector.correct_saddle(saddle) test_features = self.feature_extractor.extract_features(aligned_crop) similarities = [] distances = [] for ref in self.references: if saddle.id < len(ref['features']): ref_features = ref['features'][saddle.id] sim = self.feature_extractor.compute_similarity(test_features, ref_features) dist = np.linalg.norm(test_features - ref_features) similarities.append(sim) distances.append(dist) if similarities: best_sim = max(similarities) avg_dist = np.mean(distances) return best_sim, avg_dist return 0.90, 0.5 # Default neutral score def get_reference_count(self) -> int: return len(self.references) def get_references(self) -> List[str]: refs = [] for ref in self.references: try: _, buffer = cv2.imencode('.jpg', ref['image'], [cv2.IMWRITE_JPEG_QUALITY, 50]) import base64 img_str = base64.b64encode(buffer).decode('utf-8') refs.append(f"data:image/jpeg;base64,{img_str}") except: continue return refs def cleanup(self): """Release all references""" self.references.clear() if ENABLE_GC: gc.collect() class AdvancedBlockInspector: """ Advanced Engine Block Inspector - CLOUD OPTIMIZED & RELAXED Key improvements: - Works WITHOUT reference images - Relaxed thresholds - Better error handling - Improved saddle detection fallback """ def __init__(self, yolo_model_path: Optional[str] = None, onnx_path: str = "resnet18_features.onnx"): self.yolo_detector = YOLOOBBDetector(yolo_model_path) self.geometric_detector = GeometricSaddleDetector() self.cnn_extractor = CNNFeatureExtractor(onnx_path=onnx_path) self.reference_manager = MultiReferenceManager(self.cnn_extractor) # Use base validator (more flexible) self.geo_validator = GeometricConstraintValidator(expected_count=4) self.use_optimized_validator = False # Structure validator self.structure_validator = None if STRUCTURE_VALIDATOR_AVAILABLE: self.structure_validator = SaddleStructureValidator( arc_center_tolerance=0.05, min_arc_length_ratio=0.6, min_structure_confidence=0.65 ) print("✓ Structure Validator: Semicircle + Middle Arc") self.config = { 'expected_saddles': 4, 'similarity_threshold': 0.75, # RELAXED from 0.88 'use_yolo': self.yolo_detector.model_loaded, 'use_geometric_fallback': True, 'use_alignment_correction': True, 'infer_missing_saddles': True, 'yolo_confidence': 0.20, # RELAXED from 0.25 'use_geometry_validation': False, # DISABLED initially 'use_structure_validation': False, # DISABLED initially 'use_optimized_validator': False, 'min_brightness': 15.0, # RELAXED from 20.0 'require_references': False, # NEW: work without references } # Memory tracking self.last_saddles = [] self.last_image = None print("[OK] Advanced Inspector initialized (Relaxed Mode)") print(f" YOLO-OBB: {'Enabled' if self.config['use_yolo'] else 'Disabled (using geometric)'}") print(f" Similarity threshold: {self.config['similarity_threshold']}") print(f" Requires references: {self.config['require_references']}") print(f" Max cache size: {MAX_CACHE_SIZE}") print(f" GC enabled: {ENABLE_GC}") def add_reference_image(self, image: np.ndarray) -> bool: """Add a golden template image""" try: # Quality check gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image brightness = np.mean(gray) if brightness < 15: print(f"✗ Reference too dark (brightness: {brightness:.1f})") return False # Detect saddles if self.config['use_yolo']: saddles = self.yolo_detector.detect_saddles( image, conf_threshold=self.config['yolo_confidence'] ) else: saddles = [] if len(saddles) < self.config['expected_saddles'] and self.config['use_geometric_fallback']: print("→ Falling back to geometric detection") saddles = self.geometric_detector.detect_saddles(image) if len(saddles) != self.config['expected_saddles']: print(f"✗ Reference failed: {len(saddles)}/{self.config['expected_saddles']} saddles") return False success = self.reference_manager.add_reference(image, saddles) if success and self.reference_manager.get_reference_count() == 1: self.geo_validator.set_reference_positions(saddles) # Enable validation after first reference self.config['use_geometry_validation'] = True # Cleanup self._cleanup_after_processing() return success except Exception as e: print(f"✗ Reference error: {e}") import traceback traceback.print_exc() return False def inspect_block(self, image: np.ndarray) -> BlockInspectionResult: """Inspect engine block with memory optimization - RELAXED MODE""" start_time = time.time() try: # Quality check gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image brightness = np.mean(gray) if brightness < 10: # Very relaxed return BlockInspectionResult( block_status='WASTE_IMAGE', total_saddles=self.config['expected_saddles'], detected_saddles=0, defective_saddles=0, saddle_results=[], processing_time_ms=(time.time() - start_time) * 1000, alignment_status='REJECTED_TOO_DARK', block_angle=0.0 ) # Detect saddles if self.config['use_yolo']: saddles = self.yolo_detector.detect_saddles( image, conf_threshold=self.config['yolo_confidence'] ) else: saddles = [] if len(saddles) < self.config['expected_saddles'] and self.config['use_geometric_fallback']: print("→ Falling back to geometric detection") saddles = self.geometric_detector.detect_saddles(image) detected_count = len(saddles) print(f" Initially detected: {detected_count} saddles") # Infer missing saddles if we have reference if len(saddles) < self.config['expected_saddles'] and self.config['infer_missing_saddles']: if self.reference_manager.get_reference_count() > 0: print(f" Inferring {self.config['expected_saddles'] - len(saddles)} missing saddles") saddles = self.geo_validator.infer_missing_saddles(saddles, image) else: print(" ⚠ No references - cannot infer missing saddles") # Validate count if len(saddles) != self.config['expected_saddles']: alignment_status = f"SADDLE_COUNT: {len(saddles)}/{self.config['expected_saddles']}" # If no references, this might still be OK if not self.config['require_references'] and len(saddles) > 0: print(f" Warning: {alignment_status} but continuing without references") else: return self._error_result(alignment_status, 0.0, saddles, detected_count) # Geometry validation (only if enabled and references exist) if self.config.get('use_geometry_validation', False) and self.reference_manager.get_reference_count() > 0: is_valid, reason = self.geo_validator.validate_geometry(saddles, tolerance=0.30) if not is_valid: print(f" Geometry validation failed: {reason}") # Don't reject - just warn alignment_status = "OK" block_angle = np.mean([s.angle for s in saddles]) if saddles else 0.0 # Feature extraction and comparison if len(saddles) == self.config['expected_saddles']: aligned_crops = [AlignmentCorrector.correct_saddle(s) for s in saddles] batch_features = self.cnn_extractor.extract_batch(aligned_crops) # Compare against references saddle_results = [] for i, saddle in enumerate(saddles): test_features = batch_features[i] best_sim, avg_dist = self.reference_manager.match_saddle(saddle) # Relaxed status determination status = 'PERFECT' if best_sim >= self.config['similarity_threshold'] else 'DEFECTIVE' saddle_results.append(SaddleInspectionResult( saddle_id=saddle.id, status=status, similarity_score=best_sim, confidence=saddle.confidence, feature_distance=avg_dist, detected=saddle.confidence > 0.0 )) else: # Partial detection saddle_results = [] for saddle in saddles: best_sim, avg_dist = self.reference_manager.match_saddle(saddle) saddle_results.append(SaddleInspectionResult( saddle_id=saddle.id, status='UNKNOWN', similarity_score=best_sim, confidence=saddle.confidence, feature_distance=avg_dist, detected=saddle.confidence > 0.0 )) # Block decision - RELAXED defective_count = sum(1 for r in saddle_results if r.status == 'DEFECTIVE') avg_sim = np.mean([r.similarity_score for r in saddle_results]) if saddle_results else 0 # Determine block status if len(saddles) == 0: block_status = 'WASTE_IMAGE' elif len(saddles) < self.config['expected_saddles']: if self.config['require_references']: block_status = 'DEFECTIVE' else: # No references required - accept partial detection block_status = 'UNKNOWN' elif defective_count == 0: block_status = 'PERFECT' elif defective_count <= 2: # Allow up to 2 defective block_status = 'ACCEPTABLE' else: block_status = 'DEFECTIVE' processing_time = (time.time() - start_time) * 1000 self.last_saddles = saddles self.last_image = image result = BlockInspectionResult( block_status=block_status, total_saddles=self.config['expected_saddles'], detected_saddles=detected_count, defective_saddles=defective_count, saddle_results=saddle_results, processing_time_ms=processing_time, alignment_status=alignment_status, block_angle=block_angle ) print(f" Result: {block_status} ({detected_count}/{self.config['expected_saddles']} detected, {defective_count} defective)") # Cleanup self._cleanup_after_processing() return result except Exception as e: print(f"✗ Inspection error: {e}") import traceback traceback.print_exc() return self._error_result(f"EXCEPTION: {str(e)}", 0.0, [], 0) def get_reference_images(self) -> List[str]: return self.reference_manager.get_references() def _error_result(self, message: str, angle: float, saddles: List, detected_count: int) -> BlockInspectionResult: self.last_saddles = saddles return BlockInspectionResult( block_status='ERROR', total_saddles=self.config['expected_saddles'], detected_saddles=detected_count, defective_saddles=0, saddle_results=[], processing_time_ms=0.0, alignment_status=message, block_angle=angle ) def visualize_results( self, original_image: np.ndarray, saddles: List[SaddleROI], results: List[SaddleInspectionResult] ) -> np.ndarray: """Create detailed visualization""" vis = original_image.copy() for saddle, result in zip(saddles, results): # Color coding if result.status == 'UNKNOWN': color = (255, 255, 0) # Yellow elif not result.detected: color = (255, 165, 0) # Orange elif result.status == 'PERFECT' or result.status == 'ACCEPTABLE': color = (0, 255, 0) # Green else: color = (0, 0, 255) # Red x, y, w, h = saddle.bbox cv2.rectangle(vis, (x, y), (x+w, y+h), color, 3) cx, cy = saddle.center cv2.circle(vis, (cx, cy), 10, color, -1) if abs(saddle.angle) > 1.0: rad = np.deg2rad(saddle.angle) end_x = int(cx + 30 * np.cos(rad)) end_y = int(cy + 30 * np.sin(rad)) cv2.arrowedLine(vis, (cx, cy), (end_x, end_y), color, 3) detection_marker = "" if result.detected else "⚙" label = f"{detection_marker} S{saddle.id}: {result.status}" sim_text = f"{result.similarity_score:.3f}" angle_text = f"{saddle.angle:.1f}°" cv2.rectangle(vis, (cx-70, cy-70), (cx+70, cy+40), color, -1) cv2.putText(vis, label, (cx-65, cy-45), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 255, 255), 2) cv2.putText(vis, sim_text, (cx-65, cy-20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2) cv2.putText(vis, angle_text, (cx-65, cy+5), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 255, 255), 2) cv2.putText(vis, f"C:{saddle.confidence:.2f}", (cx-65, cy+28), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1) return vis def _cleanup_after_processing(self): """Clean up resources after processing""" # Clear old saddles for saddle in self.last_saddles: if hasattr(saddle, 'cleanup'): saddle.cleanup() # Trigger garbage collection if ENABLE_GC: gc.collect() def cleanup(self): """Full cleanup - call when done""" print("Cleaning up resources...") # Cleanup components if hasattr(self.yolo_detector, 'cleanup'): self.yolo_detector.cleanup() if hasattr(self.geometric_detector, 'cleanup'): self.geometric_detector.cleanup() if hasattr(self.cnn_extractor, 'cleanup'): self.cnn_extractor.cleanup() if hasattr(self.reference_manager, 'cleanup'): self.reference_manager.cleanup() # Clear state self.last_saddles.clear() self.last_image = None # Force garbage collection if ENABLE_GC: gc.collect() print("✓ Cleanup complete")