Spaces:
Sleeping
Sleeping
| # import cv2 | |
| # import numpy as np | |
| # from PIL import Image | |
| # import io | |
| # import json | |
| # from datetime import datetime | |
| # from typing import List, Tuple, Dict, Optional | |
| # from dataclasses import dataclass | |
| # import time | |
| # import os | |
| # import warnings | |
| # import gc | |
| # from collections import OrderedDict | |
| # # ============================================================================ | |
| # # CLOUD DEPLOYMENT OPTIMIZATIONS | |
| # # ============================================================================ | |
| # IS_HF_SPACE = os.environ.get('SPACE_ID') is not None or os.environ.get('HF_SPACE') is not None | |
| # IS_RENDER = os.environ.get('RENDER') is not None | |
| # IS_CLOUD = IS_HF_SPACE or IS_RENDER | |
| # # Memory limits for cloud | |
| # MAX_IMAGE_SIZE = (1280, 960) if IS_CLOUD else (1920, 1440) | |
| # MAX_CACHE_SIZE = 50 if IS_CLOUD else 100 | |
| # ENABLE_GC = IS_CLOUD # Force garbage collection on cloud | |
| # # Lazy imports for faster cold start | |
| # _torch = None | |
| # _torchvision = None | |
| # _YOLO = None | |
| # def _get_torch(): | |
| # global _torch | |
| # if _torch is None: | |
| # import torch | |
| # _torch = torch | |
| # if IS_CLOUD: | |
| # _torch.set_num_threads(2) | |
| # return _torch | |
| # def _get_torchvision(): | |
| # global _torchvision | |
| # if _torchvision is None: | |
| # import torchvision | |
| # _torchvision = torchvision | |
| # return _torchvision | |
| # def _get_yolo(): | |
| # global _YOLO | |
| # if _YOLO is None: | |
| # try: | |
| # from ultralytics import YOLO | |
| # _YOLO = YOLO | |
| # except ImportError: | |
| # _YOLO = False | |
| # return _YOLO | |
| # def _yolo_available(): | |
| # yolo = _get_yolo() | |
| # return yolo is not False and yolo is not None | |
| # warnings.filterwarnings('ignore') | |
| # # Import optimized validators | |
| # try: | |
| # from saddle_structure_validator import SaddleStructureValidator, SaddleStructure | |
| # STRUCTURE_VALIDATOR_AVAILABLE = True | |
| # except ImportError: | |
| # STRUCTURE_VALIDATOR_AVAILABLE = False | |
| # SaddleStructure = None | |
| # print("[WARN] saddle_structure_validator not available") | |
| # try: | |
| # from geometric_validator import OptimizedGeometricValidator, PhysicalDimensions, PixelToMMCalibrator | |
| # OPTIMIZED_VALIDATOR_AVAILABLE = True | |
| # except ImportError: | |
| # OPTIMIZED_VALIDATOR_AVAILABLE = False | |
| # print("[WARN] geometric_validator not available") | |
| # if IS_HF_SPACE: | |
| # print("[HF Space] Memory-optimized mode enabled") | |
| # elif IS_RENDER: | |
| # print("[Render] Cloud-optimized mode enabled") | |
| # @dataclass | |
| # class SaddleROI: | |
| # """Saddle region of interest with rotation and structure info""" | |
| # id: int | |
| # bbox: Tuple[int, int, int, int] | |
| # crop: np.ndarray | |
| # center: Tuple[int, int] | |
| # area: int | |
| # angle: float | |
| # confidence: float | |
| # structure: Optional[any] = None | |
| # def to_dict(self): | |
| # result = { | |
| # 'id': int(self.id), | |
| # 'bbox': [int(x) for x in self.bbox], | |
| # 'center': [int(self.center[0]), int(self.center[1])], | |
| # 'area': int(self.area), | |
| # 'angle': float(self.angle), | |
| # 'confidence': float(self.confidence) | |
| # } | |
| # if self.structure is not None: | |
| # result['has_semicircle'] = bool(getattr(self.structure, 'has_semicircle', False)) | |
| # result['has_middle_arc'] = bool(getattr(self.structure, 'has_middle_arc', False)) | |
| # result['structure_confidence'] = float(getattr(self.structure, 'structure_confidence', 0.0)) | |
| # return result | |
| # def cleanup(self): | |
| # """Free memory""" | |
| # if hasattr(self, 'crop'): | |
| # del self.crop | |
| # self.crop = None | |
| # self.structure = None | |
| # @dataclass | |
| # class SaddleInspectionResult: | |
| # """Inspection result for one saddle""" | |
| # saddle_id: int | |
| # status: str | |
| # similarity_score: float | |
| # confidence: float | |
| # feature_distance: float | |
| # detected: bool | |
| # def to_dict(self): | |
| # return { | |
| # 'saddle_id': int(self.saddle_id), | |
| # 'status': str(self.status), | |
| # 'similarity_score': float(self.similarity_score), | |
| # 'confidence': float(self.confidence), | |
| # 'feature_distance': float(self.feature_distance), | |
| # 'detected': bool(self.detected) | |
| # } | |
| # @dataclass | |
| # class BlockInspectionResult: | |
| # """Complete block inspection result""" | |
| # block_status: str | |
| # total_saddles: int | |
| # detected_saddles: int | |
| # defective_saddles: int | |
| # saddle_results: List[SaddleInspectionResult] | |
| # processing_time_ms: float | |
| # alignment_status: str | |
| # block_angle: float | |
| # def to_dict(self): | |
| # return { | |
| # 'block_status': str(self.block_status), | |
| # 'total_saddles': int(self.total_saddles), | |
| # 'detected_saddles': int(self.detected_saddles), | |
| # 'defective_saddles': int(self.defective_saddles), | |
| # 'saddle_results': [r.to_dict() for r in self.saddle_results], | |
| # 'processing_time_ms': float(self.processing_time_ms), | |
| # 'alignment_status': str(self.alignment_status), | |
| # 'block_angle': float(self.block_angle) | |
| # } | |
| # class LRUCache: | |
| # """Thread-safe LRU cache with size limit""" | |
| # def __init__(self, max_size=100): | |
| # self.cache = OrderedDict() | |
| # self.max_size = max_size | |
| # def get(self, key): | |
| # if key in self.cache: | |
| # self.cache.move_to_end(key) | |
| # return self.cache[key] | |
| # return None | |
| # def put(self, key, value): | |
| # if key in self.cache: | |
| # self.cache.move_to_end(key) | |
| # self.cache[key] = value | |
| # if len(self.cache) > self.max_size: | |
| # self.cache.popitem(last=False) | |
| # def clear(self): | |
| # self.cache.clear() | |
| # if ENABLE_GC: | |
| # gc.collect() | |
| # class YOLOOBBDetector: | |
| # """YOLO-OBB Detector with lazy loading and memory management""" | |
| # def __init__(self, model_path: Optional[str] = None): | |
| # self.model = None | |
| # self.model_loaded = False | |
| # self._model_path = model_path if model_path else 'yolo26n-obb.pt' | |
| # self._initialized = False | |
| # def _lazy_init(self): | |
| # if self._initialized: | |
| # return | |
| # self._initialized = True | |
| # if not _yolo_available(): | |
| # print("⚠ YOLO not available") | |
| # return | |
| # YOLO = _get_yolo() | |
| # try: | |
| # self.model = YOLO(self._model_path) | |
| # self.model_loaded = True | |
| # print(f"✓ YOLO-OBB loaded: {self._model_path}") | |
| # except Exception as e: | |
| # try: | |
| # fallback = 'yolo11n-obb.pt' if self._model_path == 'yolo26n-obb.pt' else 'yolo26n-obb.pt' | |
| # self.model = YOLO(fallback) | |
| # self.model_loaded = True | |
| # print(f"✓ YOLO-OBB fallback: {fallback}") | |
| # except: | |
| # print(f"✗ YOLO load failed: {e}") | |
| # self.model_loaded = False | |
| # @property | |
| # def available(self): | |
| # if not self._initialized: | |
| # self._lazy_init() | |
| # return self.model_loaded | |
| # def detect_saddles(self, image: np.ndarray, conf_threshold: float = 0.25) -> List[SaddleROI]: | |
| # """Detect saddles using YOLO-OBB with memory optimization""" | |
| # if not self.available or self.model is None: | |
| # return [] | |
| # # Resize if too large | |
| # h, w = image.shape[:2] | |
| # if w > MAX_IMAGE_SIZE[0] or h > MAX_IMAGE_SIZE[1]: | |
| # scale = min(MAX_IMAGE_SIZE[0]/w, MAX_IMAGE_SIZE[1]/h) | |
| # new_w, new_h = int(w*scale), int(h*scale) | |
| # image_resized = cv2.resize(image, (new_w, new_h)) | |
| # else: | |
| # image_resized = image | |
| # scale = 1.0 | |
| # # Convert to grayscale for faster inference | |
| # if len(image_resized.shape) == 3: | |
| # gray = cv2.cvtColor(image_resized, cv2.COLOR_BGR2GRAY) | |
| # gray_3ch = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR) | |
| # else: | |
| # gray_3ch = cv2.cvtColor(image_resized, cv2.COLOR_GRAY2BGR) | |
| # results = self.model(gray_3ch, conf=conf_threshold, verbose=False) | |
| # saddles = [] | |
| # for result in results: | |
| # if hasattr(result, 'obb') and result.obb is not None: | |
| # for i, obb in enumerate(result.obb): | |
| # xyxyxyxy = obb.xyxyxyxy[0].cpu().numpy() | |
| # conf = float(obb.conf[0]) | |
| # # Scale back to original coordinates | |
| # xyxyxyxy = xyxyxyxy / scale | |
| # angle = self._calculate_angle(xyxyxyxy) | |
| # x_coords = xyxyxyxy[:, 0] | |
| # y_coords = xyxyxyxy[:, 1] | |
| # x_min, x_max = int(x_coords.min()), int(x_coords.max()) | |
| # y_min, y_max = int(y_coords.min()), int(y_coords.max()) | |
| # w = x_max - x_min | |
| # h = y_max - y_min | |
| # cx = int(np.mean(x_coords)) | |
| # cy = int(np.mean(y_coords)) | |
| # # Crop from original image | |
| # crop = image[y_min:y_max, x_min:x_max].copy() | |
| # area = w * h | |
| # saddles.append(SaddleROI( | |
| # id=i, | |
| # bbox=(x_min, y_min, w, h), | |
| # crop=crop, | |
| # center=(cx, cy), | |
| # area=area, | |
| # angle=angle, | |
| # confidence=conf | |
| # )) | |
| # elif hasattr(result, 'boxes') and result.boxes is not None: | |
| # for i, box in enumerate(result.boxes): | |
| # xyxy = box.xyxy[0].cpu().numpy() | |
| # conf = float(box.conf[0]) | |
| # # Scale back | |
| # xyxy = xyxy / scale | |
| # x_min, y_min, x_max, y_max = map(int, xyxy) | |
| # w = x_max - x_min | |
| # h = y_max - y_min | |
| # cx = (x_min + x_max) // 2 | |
| # cy = (y_min + y_max) // 2 | |
| # crop = image[y_min:y_max, x_min:x_max].copy() | |
| # area = w * h | |
| # saddles.append(SaddleROI( | |
| # id=i, | |
| # bbox=(x_min, y_min, w, h), | |
| # crop=crop, | |
| # center=(cx, cy), | |
| # area=area, | |
| # angle=0.0, | |
| # confidence=conf | |
| # )) | |
| # # Sort by position | |
| # saddles.sort(key=lambda s: (s.center[1], s.center[0])) | |
| # for i, saddle in enumerate(saddles): | |
| # saddle.id = i | |
| # print(f"✓ YOLO detected: {len(saddles)} saddles") | |
| # # Memory cleanup | |
| # if ENABLE_GC: | |
| # del gray_3ch, results | |
| # gc.collect() | |
| # return saddles | |
| # def _calculate_angle(self, xyxyxyxy: np.ndarray) -> float: | |
| # p1 = xyxyxyxy[0] | |
| # p2 = xyxyxyxy[1] | |
| # dx = p2[0] - p1[0] | |
| # dy = p2[1] - p1[1] | |
| # angle = np.degrees(np.arctan2(dy, dx)) | |
| # if angle > 180: | |
| # angle -= 360 | |
| # elif angle < -180: | |
| # angle += 360 | |
| # return float(angle) | |
| # def cleanup(self): | |
| # """Release model from memory""" | |
| # if self.model is not None: | |
| # del self.model | |
| # self.model = None | |
| # self.model_loaded = False | |
| # if ENABLE_GC: | |
| # gc.collect() | |
| # class GeometricSaddleDetector: | |
| # """Optimized geometric detector with better 4-saddle detection""" | |
| # def __init__(self, max_cache_size=None): | |
| # if max_cache_size is None: | |
| # max_cache_size = MAX_CACHE_SIZE | |
| # self._mask_cache = LRUCache(max_cache_size) | |
| # def detect_saddles(self, image: np.ndarray) -> List[SaddleROI]: | |
| # """Detect using improved geometric segmentation""" | |
| # print(f"[GeometricDetector] Input shape: {image.shape}") | |
| # h, w = image.shape[:2] | |
| # # Resize if too large | |
| # if w > MAX_IMAGE_SIZE[0] or h > MAX_IMAGE_SIZE[1]: | |
| # scale = min(MAX_IMAGE_SIZE[0]/w, MAX_IMAGE_SIZE[1]/h) | |
| # new_w, new_h = int(w*scale), int(h*scale) | |
| # image_work = cv2.resize(image, (new_w, new_h)) | |
| # print(f"[GeometricDetector] Resized to {new_w}x{new_h}") | |
| # else: | |
| # image_work = image | |
| # scale = 1.0 | |
| # # Detect circles with improved method | |
| # outer_circle, inner_circle = self._detect_circles_improved(image_work) | |
| # if outer_circle is None: | |
| # print("[GeometricDetector] ✗ Circle detection failed") | |
| # return [] | |
| # ox, oy, or_ = outer_circle | |
| # ix, iy, ir = inner_circle | |
| # # Scale back to original | |
| # if scale != 1.0: | |
| # ox, oy = int(ox/scale), int(oy/scale) | |
| # or_, ir = int(or_/scale), int(ir/scale) | |
| # h, w = image.shape[:2] | |
| # print(f"[GeometricDetector] Outer: ({ox},{oy}) r={or_}, Inner: ({ix},{iy}) r={ir}") | |
| # # Validate | |
| # if or_ < 50 or or_ > min(h, w): | |
| # print(f"[GeometricDetector] ✗ Invalid outer radius: {or_}") | |
| # return [] | |
| # if ir >= or_: | |
| # ir = int(or_ * 0.4) | |
| # print(f"[GeometricDetector] Adjusted inner radius to {ir}") | |
| # # Create annular mask | |
| # annular_mask = np.zeros((h, w), dtype=np.uint8) | |
| # cv2.circle(annular_mask, (ox, oy), or_, 255, -1) | |
| # cv2.circle(annular_mask, (ix, iy), ir, 0, -1) | |
| # mask_pixels = np.sum(annular_mask > 0) | |
| # print(f"[GeometricDetector] Annular mask pixels: {mask_pixels}") | |
| # if mask_pixels < 1000: | |
| # print(f"[GeometricDetector] ✗ Mask too small") | |
| # return [] | |
| # # Find saddles using contour detection instead of fixed quadrants | |
| # saddles = self._detect_saddles_from_annular_region(image, annular_mask, (ox, oy), or_, ir) | |
| # print(f"[GeometricDetector] Total saddles detected: {len(saddles)}") | |
| # # Memory cleanup | |
| # if ENABLE_GC: | |
| # del annular_mask, image_work | |
| # gc.collect() | |
| # return saddles | |
| # def _detect_saddles_from_annular_region( | |
| # self, | |
| # image: np.ndarray, | |
| # annular_mask: np.ndarray, | |
| # center: Tuple[int, int], | |
| # outer_r: int, | |
| # inner_r: int | |
| # ) -> List[SaddleROI]: | |
| # """ | |
| # Improved saddle detection using contour-based approach | |
| # """ | |
| # h, w = annular_mask.shape | |
| # # Apply mask to image | |
| # if len(image.shape) == 3: | |
| # gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) | |
| # else: | |
| # gray = image | |
| # masked = cv2.bitwise_and(gray, gray, mask=annular_mask) | |
| # # Enhance contrast | |
| # masked = cv2.equalizeHist(masked) | |
| # # Edge detection on masked region | |
| # edges = cv2.Canny(masked, 50, 150) | |
| # # Find contours | |
| # contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| # # Filter and group contours | |
| # valid_contours = [] | |
| # for cnt in contours: | |
| # area = cv2.contourArea(cnt) | |
| # if area > 500: # Minimum area threshold | |
| # valid_contours.append(cnt) | |
| # if len(valid_contours) < 4: | |
| # # Fallback to quadrant-based detection | |
| # print("[GeometricDetector] Using quadrant fallback") | |
| # return self._detect_quadrant_based(image, annular_mask, center, outer_r, inner_r) | |
| # # Sort contours by position (left to right, top to bottom) | |
| # valid_contours = sorted(valid_contours, key=lambda c: (cv2.boundingRect(c)[1], cv2.boundingRect(c)[0])) | |
| # # Take top 4 contours | |
| # saddles = [] | |
| # for idx, cnt in enumerate(valid_contours[:4]): | |
| # x, y, w_box, h_box = cv2.boundingRect(cnt) | |
| # # Validate size | |
| # if w_box < 10 or h_box < 10: | |
| # continue | |
| # # Extract crop | |
| # crop = image[y:y+h_box, x:x+w_box].copy() | |
| # mask_crop = annular_mask[y:y+h_box, x:x+w_box].copy() | |
| # crop_masked = cv2.bitwise_and(crop, crop, mask=mask_crop) | |
| # # Calculate center | |
| # M = cv2.moments(cnt) | |
| # if M['m00'] != 0: | |
| # cx = int(M['m10'] / M['m00']) | |
| # cy = int(M['m01'] / M['m00']) | |
| # else: | |
| # cx = x + w_box // 2 | |
| # cy = y + h_box // 2 | |
| # area = int(M['m00']) | |
| # saddles.append(SaddleROI( | |
| # id=idx, | |
| # bbox=(x, y, w_box, h_box), | |
| # crop=crop_masked, | |
| # center=(cx, cy), | |
| # area=area, | |
| # angle=0.0, | |
| # confidence=0.8 # Geometric detection confidence | |
| # )) | |
| # return saddles | |
| # def _detect_quadrant_based( | |
| # self, | |
| # image: np.ndarray, | |
| # annular_mask: np.ndarray, | |
| # center: Tuple[int, int], | |
| # outer_r: int, | |
| # inner_r: int | |
| # ) -> List[SaddleROI]: | |
| # """Fallback quadrant-based detection""" | |
| # h, w = annular_mask.shape | |
| # ox, oy = center | |
| # quadrants = [ | |
| # (0, 90, "Top-Right"), | |
| # (90, 180, "Top-Left"), | |
| # (180, 270, "Bottom-Left"), | |
| # (270, 360, "Bottom-Right") | |
| # ] | |
| # saddles = [] | |
| # for idx, (start_angle, end_angle, name) in enumerate(quadrants): | |
| # mask_key = (h, w, ox, oy, outer_r, inner_r, start_angle, end_angle) | |
| # sector_mask = self._mask_cache.get(mask_key) | |
| # if sector_mask is None: | |
| # sector_mask = self._create_sector_mask( | |
| # (h, w), (ox, oy), outer_r, inner_r, start_angle, end_angle | |
| # ) | |
| # self._mask_cache.put(mask_key, sector_mask) | |
| # saddle_mask = cv2.bitwise_and(annular_mask, sector_mask) | |
| # ys, xs = np.where(saddle_mask > 0) | |
| # if len(xs) == 0 or len(ys) == 0: | |
| # print(f"[GeometricDetector] ⚠ Quadrant {idx} ({name}): No pixels") | |
| # continue | |
| # x_min, x_max = xs.min(), xs.max() | |
| # y_min, y_max = ys.min(), ys.max() | |
| # if (x_max - x_min) < 10 or (y_max - y_min) < 10: | |
| # print(f"[GeometricDetector] ⚠ Quadrant {idx} ({name}): Box too small") | |
| # continue | |
| # crop = image[y_min:y_max+1, x_min:x_max+1].copy() | |
| # mask_crop = saddle_mask[y_min:y_max+1, x_min:x_max+1].copy() | |
| # crop_masked = cv2.bitwise_and(crop, crop, mask=mask_crop) | |
| # cx = int(np.mean(xs)) | |
| # cy = int(np.mean(ys)) | |
| # area = np.sum(saddle_mask > 0) | |
| # saddles.append(SaddleROI( | |
| # id=idx, | |
| # bbox=(x_min, y_min, x_max - x_min, y_max - y_min), | |
| # crop=crop_masked, | |
| # center=(cx, cy), | |
| # area=area, | |
| # angle=0.0, | |
| # confidence=1.0 | |
| # )) | |
| # return saddles | |
| # def _detect_circles_improved(self, image: np.ndarray) -> Tuple[Optional[Tuple], Optional[Tuple]]: | |
| # """Improved circle detection with multiple strategies""" | |
| # gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image | |
| # h, w = gray.shape | |
| # # Strategy 1: Standard HoughCircles with optimized params | |
| # gray_blur = cv2.GaussianBlur(gray, (9, 9), 2) | |
| # minRadius = int(min(h, w) * 0.2) | |
| # maxRadius = int(min(h, w) * 0.7) | |
| # # Try multiple parameter sets | |
| # param_sets = [ | |
| # (80, 25), # Standard | |
| # (60, 30), # More lenient | |
| # (100, 20), # More strict | |
| # ] | |
| # for param1, param2 in param_sets: | |
| # circles = cv2.HoughCircles( | |
| # gray_blur, | |
| # cv2.HOUGH_GRADIENT, | |
| # dp=1.0, | |
| # minDist=80, | |
| # param1=param1, | |
| # param2=param2, | |
| # minRadius=minRadius, | |
| # maxRadius=maxRadius | |
| # ) | |
| # if circles is not None and len(circles[0]) >= 1: | |
| # circles = np.round(circles[0, :]).astype(int) | |
| # circles = sorted(circles, key=lambda c: c[2], reverse=True) | |
| # outer = tuple(circles[0]) | |
| # inner_radius = int(outer[2] * 0.4) | |
| # inner = (outer[0], outer[1], inner_radius) | |
| # print(f"✓ HoughCircles success (param1={param1}, param2={param2})") | |
| # return outer, inner | |
| # # Strategy 2: Contour-based detection | |
| # print("→ Using contour fallback") | |
| # edges = cv2.Canny(gray_blur, 30, 100) | |
| # # Dilate to connect edges | |
| # kernel = np.ones((3, 3), np.uint8) | |
| # edges = cv2.dilate(edges, kernel, iterations=2) | |
| # contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| # if contours: | |
| # # Find largest contour | |
| # largest = max(contours, key=cv2.contourArea) | |
| # (cx, cy), radius = cv2.minEnclosingCircle(largest) | |
| # center_x, center_y = int(cx), int(cy) | |
| # outer_radius = int(radius * 0.85) | |
| # inner_radius = int(outer_radius * 0.4) | |
| # print(f"✓ Contour-based: center=({center_x},{center_y}), radius={outer_radius}") | |
| # return ((center_x, center_y, outer_radius), (center_x, center_y, inner_radius)) | |
| # # Strategy 3: Dimension-based estimate | |
| # center_x, center_y = w // 2, h // 2 | |
| # outer_radius = min(w, h) // 2 - 50 | |
| # inner_radius = int(outer_radius * 0.4) | |
| # print(f"→ Using dimension estimate: center=({center_x},{center_y}), radius={outer_radius}") | |
| # return ((center_x, center_y, outer_radius), (center_x, center_y, inner_radius)) | |
| # def _create_sector_mask(self, shape, center, outer_r, inner_r, start_angle, end_angle): | |
| # """Create sector mask with caching""" | |
| # h, w = shape | |
| # mask = np.zeros((h, w), dtype=np.uint8) | |
| # y, x = np.ogrid[:h, :w] | |
| # dx = x - center[0] | |
| # dy = y - center[1] | |
| # angles = np.rad2deg(np.arctan2(dy, dx)) % 360 | |
| # radius = np.sqrt(dx**2 + dy**2) | |
| # if start_angle < end_angle: | |
| # angle_mask = (angles >= start_angle) & (angles < end_angle) | |
| # else: | |
| # angle_mask = (angles >= start_angle) | (angles < end_angle) | |
| # radius_mask = (radius >= inner_r) & (radius <= outer_r) | |
| # sector = angle_mask & radius_mask | |
| # mask[sector] = 255 | |
| # return mask | |
| # def cleanup(self): | |
| # """Release cache""" | |
| # self._mask_cache.clear() | |
| # class AlignmentCorrector: | |
| # """Affine transformation for rotation correction""" | |
| # @staticmethod | |
| # def correct_rotation(image: np.ndarray, angle: float) -> np.ndarray: | |
| # h, w = image.shape[:2] | |
| # center = (w // 2, h // 2) | |
| # M = cv2.getRotationMatrix2D(center, -angle, 1.0) | |
| # cos = np.abs(M[0, 0]) | |
| # sin = np.abs(M[0, 1]) | |
| # new_w = int((h * sin) + (w * cos)) | |
| # new_h = int((h * cos) + (w * sin)) | |
| # M[0, 2] += (new_w / 2) - center[0] | |
| # M[1, 2] += (new_h / 2) - center[1] | |
| # rotated = cv2.warpAffine(image, M, (new_w, new_h), | |
| # flags=cv2.INTER_LINEAR, | |
| # borderMode=cv2.BORDER_CONSTANT, | |
| # borderValue=(0, 0, 0)) | |
| # return rotated | |
| # @staticmethod | |
| # def correct_saddle(saddle: SaddleROI) -> np.ndarray: | |
| # if abs(saddle.angle) < 1.0: | |
| # return saddle.crop | |
| # return AlignmentCorrector.correct_rotation(saddle.crop, saddle.angle) | |
| # class CNNFeatureExtractor: | |
| # """ResNet-18 feature extraction with memory optimization""" | |
| # def __init__(self, onnx_path: str = "resnet18_features.onnx"): | |
| # self.onnx_path = onnx_path | |
| # self.session = None | |
| # self.model = None | |
| # self.device = None | |
| # self.feature_dim = 1024 | |
| # self._initialized = False | |
| # def _lazy_init(self): | |
| # if self._initialized: | |
| # return | |
| # self._initialized = True | |
| # # Use PyTorch with fused features | |
| # self._init_pytorch_model() | |
| # def _init_pytorch_model(self): | |
| # torch = _get_torch() | |
| # torchvision = _get_torchvision() | |
| # from torchvision.models import resnet18, ResNet18_Weights | |
| # import torch.nn as nn | |
| # self.device = torch.device('cpu') | |
| # weights = ResNet18_Weights.IMAGENET1K_V1 | |
| # base_model = resnet18(weights=weights) | |
| # class FeatureFusionModel(nn.Module): | |
| # def __init__(self, base): | |
| # super().__init__() | |
| # self.conv1 = base.conv1 | |
| # self.bn1 = base.bn1 | |
| # self.relu = base.relu | |
| # self.maxpool = base.maxpool | |
| # self.layer1 = base.layer1 | |
| # self.layer2 = base.layer2 | |
| # self.layer3 = base.layer3 | |
| # self.layer4 = base.layer4 | |
| # self.avgpool = nn.AdaptiveAvgPool2d((1, 1)) | |
| # self.maxpool_global = nn.AdaptiveMaxPool2d((1, 1)) | |
| # def forward(self, x): | |
| # x = self.conv1(x) | |
| # x = self.bn1(x) | |
| # x = self.relu(x) | |
| # x = self.maxpool(x) | |
| # x = self.layer1(x) | |
| # x = self.layer2(x) | |
| # l3 = self.layer3(x) | |
| # l4 = self.layer4(l3) | |
| # f3_avg = self.avgpool(l3).view(l3.size(0), -1) | |
| # f3_max = self.maxpool_global(l3).view(l3.size(0), -1) | |
| # f4_avg = self.avgpool(l4).view(l4.size(0), -1) | |
| # return torch.cat([f3_avg, f3_max, f4_avg], dim=1) | |
| # self.model = FeatureFusionModel(base_model) | |
| # self.model.to(self.device) | |
| # self.model.eval() | |
| # print(f"✓ PyTorch FeatureFusion ResNet-18 (dim={self.feature_dim})") | |
| # def _preprocess_image(self, image: np.ndarray) -> np.ndarray: | |
| # if image is None or image.size == 0: | |
| # return np.zeros((3, 224, 224), dtype=np.float32) | |
| # if len(image.shape) == 2: | |
| # image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB) | |
| # elif len(image.shape) == 3 and image.shape[2] == 4: | |
| # image = cv2.cvtColor(image, cv2.COLOR_BGRA2RGB) | |
| # elif len(image.shape) == 3 and image.shape[2] == 3: | |
| # image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) | |
| # image = cv2.resize(image, (224, 224)) | |
| # image = image.astype(np.float32) / 255.0 | |
| # mean = np.array([0.485, 0.456, 0.406], dtype=np.float32) | |
| # std = np.array([0.229, 0.224, 0.225], dtype=np.float32) | |
| # image = (image - mean) / std | |
| # image = image.transpose(2, 0, 1) | |
| # return image | |
| # def extract_features(self, image: np.ndarray) -> np.ndarray: | |
| # self._lazy_init() | |
| # batch = self._preprocess_image(image)[np.newaxis, ...] | |
| # features = self.extract_batch(batch) | |
| # return features[0] | |
| # def extract_batch(self, images: List[np.ndarray]) -> np.ndarray: | |
| # self._lazy_init() | |
| # if isinstance(images, np.ndarray) and len(images.shape) == 4: | |
| # batch = images.astype(np.float32) | |
| # else: | |
| # preprocessed = [self._preprocess_image(img) for img in images] | |
| # batch = np.stack(preprocessed, axis=0).astype(np.float32) | |
| # torch = _get_torch() | |
| # batch_tensor = torch.from_numpy(batch).to(self.device) | |
| # with torch.no_grad(): | |
| # features = self.model(batch_tensor) | |
| # features = features.squeeze(-1).squeeze(-1).cpu().numpy() | |
| # norms = np.linalg.norm(features, axis=1, keepdims=True) + 1e-8 | |
| # features = features / norms | |
| # # Memory cleanup | |
| # if ENABLE_GC: | |
| # del batch_tensor | |
| # gc.collect() | |
| # return features | |
| # def compute_similarity(self, features1: np.ndarray, features2: np.ndarray) -> float: | |
| # similarity = np.dot(features1, features2) / ( | |
| # np.linalg.norm(features1) * np.linalg.norm(features2) + 1e-8 | |
| # ) | |
| # return float(np.clip(similarity, 0, 1)) | |
| # def cleanup(self): | |
| # """Release model from memory""" | |
| # if self.model is not None: | |
| # del self.model | |
| # self.model = None | |
| # if ENABLE_GC: | |
| # gc.collect() | |
| # class GeometricConstraintValidator: | |
| # """Validate saddle positions using geometric constraints""" | |
| # def __init__(self, expected_count: int = 4): | |
| # self.expected_count = expected_count | |
| # self.reference_positions = None | |
| # self.reference_distances = {} | |
| # def set_reference_positions(self, saddles: List[SaddleROI]): | |
| # if len(saddles) != self.expected_count: | |
| # print(f"⚠ Reference has {len(saddles)} saddles, expected {self.expected_count}") | |
| # return False | |
| # centers = [s.center for s in saddles] | |
| # centroid = ( | |
| # int(np.mean([c[0] for c in centers])), | |
| # int(np.mean([c[1] for c in centers])) | |
| # ) | |
| # self.reference_positions = [ | |
| # (c[0] - centroid[0], c[1] - centroid[1]) for c in centers | |
| # ] | |
| # self.reference_distances = {} | |
| # for i in range(len(saddles)): | |
| # for j in range(i+1, len(saddles)): | |
| # dist = np.linalg.norm( | |
| # np.array(saddles[i].center) - np.array(saddles[j].center) | |
| # ) | |
| # self.reference_distances[(i, j)] = dist | |
| # print(f"✓ Reference geometry learned: {len(saddles)} saddles") | |
| # return True | |
| # def infer_missing_saddles( | |
| # self, | |
| # detected_saddles: List[SaddleROI], | |
| # image: np.ndarray | |
| # ) -> List[SaddleROI]: | |
| # if len(detected_saddles) >= self.expected_count: | |
| # return detected_saddles | |
| # if self.reference_positions is None: | |
| # print("⚠ No reference geometry") | |
| # return detected_saddles | |
| # if len(detected_saddles) < 2: | |
| # print("⚠ Need at least 2 saddles to infer") | |
| # return detected_saddles | |
| # centers = [s.center for s in detected_saddles] | |
| # centroid = ( | |
| # int(np.mean([c[0] for c in centers])), | |
| # int(np.mean([c[1] for c in centers])) | |
| # ) | |
| # scale = self._estimate_scale(detected_saddles) | |
| # detected_ids = {s.id for s in detected_saddles} | |
| # all_saddles = list(detected_saddles) | |
| # for expected_id in range(self.expected_count): | |
| # if expected_id not in detected_ids: | |
| # rel_x, rel_y = self.reference_positions[expected_id] | |
| # expected_x = int(centroid[0] + rel_x * scale) | |
| # expected_y = int(centroid[1] + rel_y * scale) | |
| # crop_size = 100 | |
| # x_min = max(0, expected_x - crop_size // 2) | |
| # y_min = max(0, expected_y - crop_size // 2) | |
| # x_max = min(image.shape[1], expected_x + crop_size // 2) | |
| # y_max = min(image.shape[0], expected_y + crop_size // 2) | |
| # crop = image[y_min:y_max, x_min:x_max].copy() | |
| # inferred = SaddleROI( | |
| # id=expected_id, | |
| # bbox=(x_min, y_min, x_max - x_min, y_max - y_min), | |
| # crop=crop, | |
| # center=(expected_x, expected_y), | |
| # area=(x_max - x_min) * (y_max - y_min), | |
| # angle=0.0, | |
| # confidence=0.0 | |
| # ) | |
| # all_saddles.append(inferred) | |
| # print(f" ⚙ Inferred Saddle {expected_id} at ({expected_x}, {expected_y})") | |
| # all_saddles.sort(key=lambda s: s.id) | |
| # return all_saddles | |
| # def _estimate_scale(self, saddles: List[SaddleROI]) -> float: | |
| # if len(saddles) < 2 or not self.reference_distances: | |
| # return 1.0 | |
| # if len(saddles) >= 2: | |
| # s0, s1 = saddles[0], saddles[1] | |
| # current_dist = np.linalg.norm( | |
| # np.array(s0.center) - np.array(s1.center) | |
| # ) | |
| # key = (min(s0.id, s1.id), max(s0.id, s1.id)) | |
| # if key in self.reference_distances: | |
| # ref_dist = self.reference_distances[key] | |
| # scale = current_dist / ref_dist | |
| # return scale | |
| # return 1.0 | |
| # def validate_geometry(self, saddles: List[SaddleROI], tolerance: float = 0.05) -> Tuple[bool, str]: | |
| # if len(saddles) != self.expected_count: | |
| # return False, f"Expected {self.expected_count} saddles, got {len(saddles)}" | |
| # if self.reference_distances is None or not self.reference_distances: | |
| # return True, "No reference geometry to validate" | |
| # sorted_saddles = sorted(saddles, key=lambda s: s.id) | |
| # centers = [np.array(s.center) for s in sorted_saddles] | |
| # # Collinearity check | |
| # if len(centers) >= 3: | |
| # v1 = centers[1] - centers[0] | |
| # for i in range(2, len(centers)): | |
| # v2 = centers[i] - centers[0] | |
| # cross = abs(v1[0] * v2[1] - v1[1] * v2[0]) | |
| # line_length = np.linalg.norm(v1) * np.linalg.norm(v2) | |
| # if line_length > 0: | |
| # deviation = cross / line_length | |
| # if deviation > 0.1: | |
| # return False, f"Saddles not collinear (deviation: {deviation:.2f})" | |
| # # Distance validation | |
| # for i in range(len(sorted_saddles) - 1): | |
| # s0, s1 = sorted_saddles[i], sorted_saddles[i + 1] | |
| # current_dist = np.linalg.norm(centers[i + 1] - centers[i]) | |
| # key = (min(s0.id, s1.id), max(s0.id, s1.id)) | |
| # if key in self.reference_distances: | |
| # ref_dist = self.reference_distances[key] | |
| # if ref_dist > 0: | |
| # deviation = abs(current_dist - ref_dist) / ref_dist | |
| # if deviation > tolerance: | |
| # return False, f"Distance S{s0.id}-S{s1.id} off by {deviation*100:.1f}%" | |
| # return True, "Geometry valid" | |
| # class MultiReferenceManager: | |
| # """Manage multiple golden template images with memory optimization""" | |
| # def __init__(self, feature_extractor: CNNFeatureExtractor): | |
| # self.feature_extractor = feature_extractor | |
| # self.references = [] | |
| # self.max_references = 20 if not IS_CLOUD else 10 | |
| # def add_reference(self, image: np.ndarray, saddles: List[SaddleROI]) -> bool: | |
| # if len(saddles) != 4: | |
| # print(f"⚠ Reference must have 4 saddles, got {len(saddles)}") | |
| # return False | |
| # # Limit number of references | |
| # if len(self.references) >= self.max_references: | |
| # print(f"⚠ Max references ({self.max_references}) reached, removing oldest") | |
| # old_ref = self.references.pop(0) | |
| # # Cleanup | |
| # if ENABLE_GC: | |
| # del old_ref | |
| # gc.collect() | |
| # features_list = [] | |
| # for saddle in saddles: | |
| # aligned_crop = AlignmentCorrector.correct_saddle(saddle) | |
| # features = self.feature_extractor.extract_features(aligned_crop) | |
| # features_list.append(features) | |
| # # Compress image for storage | |
| # if image.shape[0] > 800 or image.shape[1] > 800: | |
| # scale = min(800/image.shape[1], 800/image.shape[0]) | |
| # new_w, new_h = int(image.shape[1]*scale), int(image.shape[0]*scale) | |
| # image_stored = cv2.resize(image, (new_w, new_h)) | |
| # else: | |
| # image_stored = image | |
| # self.references.append({ | |
| # 'image': image_stored, | |
| # 'saddles': saddles, | |
| # 'features': features_list | |
| # }) | |
| # print(f"✓ Reference {len(self.references)} added") | |
| # return True | |
| # def match_saddle(self, saddle: SaddleROI) -> Tuple[float, float]: | |
| # if not self.references: | |
| # return 0.0, 999.0 | |
| # aligned_crop = AlignmentCorrector.correct_saddle(saddle) | |
| # test_features = self.feature_extractor.extract_features(aligned_crop) | |
| # similarities = [] | |
| # distances = [] | |
| # for ref in self.references: | |
| # if saddle.id < len(ref['features']): | |
| # ref_features = ref['features'][saddle.id] | |
| # sim = self.feature_extractor.compute_similarity(test_features, ref_features) | |
| # dist = np.linalg.norm(test_features - ref_features) | |
| # similarities.append(sim) | |
| # distances.append(dist) | |
| # if similarities: | |
| # best_sim = max(similarities) | |
| # avg_dist = np.mean(distances) | |
| # return best_sim, avg_dist | |
| # return 0.0, 999.0 | |
| # def get_reference_count(self) -> int: | |
| # return len(self.references) | |
| # def get_references(self) -> List[str]: | |
| # refs = [] | |
| # for ref in self.references: | |
| # try: | |
| # _, buffer = cv2.imencode('.jpg', ref['image'], [cv2.IMWRITE_JPEG_QUALITY, 50]) | |
| # import base64 | |
| # img_str = base64.b64encode(buffer).decode('utf-8') | |
| # refs.append(f"data:image/jpeg;base64,{img_str}") | |
| # except: | |
| # continue | |
| # return refs | |
| # def cleanup(self): | |
| # """Release all references""" | |
| # self.references.clear() | |
| # if ENABLE_GC: | |
| # gc.collect() | |
| # class AdvancedBlockInspector: | |
| # """ | |
| # Advanced Engine Block Inspector - CLOUD OPTIMIZED | |
| # Memory leak fixes: | |
| # - LRU caching with limits | |
| # - Automatic cleanup after processing | |
| # - Garbage collection triggers | |
| # - Reference counting limits | |
| # Improved 4-saddle detection: | |
| # - Contour-based detection | |
| # - Multiple circle detection strategies | |
| # - Structure validation integration | |
| # - Geometric validator integration | |
| # """ | |
| # def __init__(self, yolo_model_path: Optional[str] = None, onnx_path: str = "resnet18_features.onnx"): | |
| # self.yolo_detector = YOLOOBBDetector(yolo_model_path) | |
| # self.geometric_detector = GeometricSaddleDetector() | |
| # self.cnn_extractor = CNNFeatureExtractor(onnx_path=onnx_path) | |
| # self.reference_manager = MultiReferenceManager(self.cnn_extractor) | |
| # # Use optimized validator if available | |
| # if OPTIMIZED_VALIDATOR_AVAILABLE: | |
| # self.geo_validator = OptimizedGeometricValidator(expected_count=4) | |
| # self.use_optimized_validator = True | |
| # print("✓ Optimized Validator: RANSAC + Physical Dims") | |
| # else: | |
| # self.geo_validator = GeometricConstraintValidator(expected_count=4) | |
| # self.use_optimized_validator = False | |
| # # Structure validator | |
| # self.structure_validator = None | |
| # if STRUCTURE_VALIDATOR_AVAILABLE: | |
| # self.structure_validator = SaddleStructureValidator( | |
| # arc_center_tolerance=0.05, | |
| # min_arc_length_ratio=0.6, | |
| # min_structure_confidence=0.65 | |
| # ) | |
| # print("✓ Structure Validator: Semicircle + Middle Arc") | |
| # self.config = { | |
| # 'expected_saddles': 4, | |
| # 'similarity_threshold': 0.88, | |
| # 'use_yolo': self.yolo_detector.model_loaded, | |
| # 'use_geometric_fallback': True, | |
| # 'use_alignment_correction': True, | |
| # 'infer_missing_saddles': True, | |
| # 'yolo_confidence': 0.25, | |
| # 'use_geometry_validation': True, | |
| # 'use_structure_validation': STRUCTURE_VALIDATOR_AVAILABLE, | |
| # 'use_optimized_validator': self.use_optimized_validator, | |
| # 'min_brightness': 20.0, | |
| # } | |
| # # Memory tracking | |
| # self.last_saddles = [] | |
| # self.last_image = None | |
| # print("[OK] Advanced Inspector initialized (Cloud-Optimized)") | |
| # print(f" YOLO-OBB: {'Enabled' if self.config['use_yolo'] else 'Disabled'}") | |
| # print(f" Max cache size: {MAX_CACHE_SIZE}") | |
| # print(f" GC enabled: {ENABLE_GC}") | |
| # def add_reference_image(self, image: np.ndarray) -> bool: | |
| # """Add a golden template image""" | |
| # try: | |
| # # Quality check | |
| # gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image | |
| # brightness = np.mean(gray) | |
| # if brightness < 20: | |
| # print(f"✗ Reference too dark (brightness: {brightness:.1f})") | |
| # return False | |
| # # Detect saddles | |
| # if self.config['use_yolo']: | |
| # saddles = self.yolo_detector.detect_saddles( | |
| # image, conf_threshold=self.config['yolo_confidence'] | |
| # ) | |
| # else: | |
| # saddles = [] | |
| # if len(saddles) < self.config['expected_saddles'] and self.config['use_geometric_fallback']: | |
| # print("→ Falling back to geometric detection") | |
| # saddles = self.geometric_detector.detect_saddles(image) | |
| # # Validate with structure validator | |
| # if self.structure_validator and len(saddles) > 0: | |
| # for saddle in saddles: | |
| # if saddle.crop is not None and saddle.crop.size > 0: | |
| # saddle.structure = self.structure_validator.validate_structure(saddle.crop) | |
| # # Use optimized validator to refine | |
| # if self.use_optimized_validator and hasattr(self.geo_validator, 'validate_and_refine'): | |
| # saddles, _ = self.geo_validator.validate_and_refine(saddles, image) | |
| # if len(saddles) != self.config['expected_saddles']: | |
| # print(f"✗ Reference failed: {len(saddles)}/{self.config['expected_saddles']} saddles") | |
| # return False | |
| # success = self.reference_manager.add_reference(image, saddles) | |
| # if success and self.reference_manager.get_reference_count() == 1: | |
| # self.geo_validator.set_reference_positions(saddles) | |
| # # Cleanup | |
| # self._cleanup_after_processing() | |
| # return success | |
| # except Exception as e: | |
| # print(f"✗ Reference error: {e}") | |
| # import traceback | |
| # traceback.print_exc() | |
| # return False | |
| # def inspect_block(self, image: np.ndarray) -> BlockInspectionResult: | |
| # """Inspect engine block with memory optimization""" | |
| # start_time = time.time() | |
| # try: | |
| # # Quality check | |
| # gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image | |
| # brightness = np.mean(gray) | |
| # if brightness < 15: | |
| # return BlockInspectionResult( | |
| # block_status='WASTE_IMAGE', | |
| # total_saddles=self.config['expected_saddles'], | |
| # detected_saddles=0, | |
| # defective_saddles=0, | |
| # saddle_results=[], | |
| # processing_time_ms=(time.time() - start_time) * 1000, | |
| # alignment_status='REJECTED_DARK', | |
| # block_angle=0.0 | |
| # ) | |
| # # Resize if needed | |
| # h, w = image.shape[:2] | |
| # if w > 800 or h > 600: | |
| # image = cv2.resize(image, (640, 480)) | |
| # # Detect saddles | |
| # if self.config['use_yolo']: | |
| # saddles = self.yolo_detector.detect_saddles( | |
| # image, conf_threshold=self.config['yolo_confidence'] | |
| # ) | |
| # else: | |
| # saddles = [] | |
| # if len(saddles) < self.config['expected_saddles'] and self.config['use_geometric_fallback']: | |
| # print("→ Falling back to geometric detection") | |
| # saddles = self.geometric_detector.detect_saddles(image) | |
| # detected_count = len(saddles) | |
| # # Validate and refine with optimized validator | |
| # if self.use_optimized_validator and hasattr(self.geo_validator, 'validate_and_refine'): | |
| # saddles, refine_report = self.geo_validator.validate_and_refine(saddles, image) | |
| # print(f" Refined: {refine_report}") | |
| # elif len(saddles) < self.config['expected_saddles'] and self.config['infer_missing_saddles']: | |
| # print(f" Inferring {self.config['expected_saddles'] - len(saddles)} missing saddles") | |
| # saddles = self.geo_validator.infer_missing_saddles(saddles, image) | |
| # # Structure validation | |
| # if self.structure_validator and self.config.get('use_structure_validation', False): | |
| # for saddle in saddles: | |
| # if saddle.crop is not None and saddle.crop.size > 0: | |
| # saddle.structure = self.structure_validator.validate_structure(saddle.crop) | |
| # # Validate count | |
| # if len(saddles) != self.config['expected_saddles']: | |
| # alignment_status = f"ERROR: {len(saddles)}/{self.config['expected_saddles']} saddles" | |
| # return self._error_result(alignment_status, 0.0, saddles, detected_count) | |
| # # Geometry validation | |
| # if self.config.get('use_geometry_validation', True): | |
| # is_valid, reason = self.geo_validator.validate_geometry(saddles, 0.05) | |
| # if not is_valid: | |
| # return self._error_result(f"GEOMETRY: {reason}", 0.0, saddles, detected_count) | |
| # alignment_status = "OK" | |
| # block_angle = np.mean([s.angle for s in saddles]) | |
| # # Batch feature extraction | |
| # aligned_crops = [AlignmentCorrector.correct_saddle(s) for s in saddles] | |
| # batch_features = self.cnn_extractor.extract_batch(aligned_crops) | |
| # # Compare against references | |
| # saddle_results = [] | |
| # for i, saddle in enumerate(saddles): | |
| # test_features = batch_features[i] | |
| # best_sim = 0.0 | |
| # avg_dist = 999.0 | |
| # for ref in self.reference_manager.references: | |
| # if saddle.id < len(ref['features']): | |
| # ref_features = ref['features'][saddle.id] | |
| # sim = self.cnn_extractor.compute_similarity(test_features, ref_features) | |
| # dist = np.linalg.norm(test_features - ref_features) | |
| # if sim > best_sim: | |
| # best_sim = sim | |
| # avg_dist = dist | |
| # status = 'PERFECT' if best_sim >= self.config['similarity_threshold'] else 'DEFECTIVE' | |
| # saddle_results.append(SaddleInspectionResult( | |
| # saddle_id=saddle.id, | |
| # status=status, | |
| # similarity_score=best_sim, | |
| # confidence=best_sim, | |
| # feature_distance=avg_dist, | |
| # detected=saddle.confidence > 0.0 | |
| # )) | |
| # # Block decision | |
| # defective_count = sum(1 for r in saddle_results if r.status == 'DEFECTIVE') | |
| # avg_sim = np.mean([r.similarity_score for r in saddle_results]) if saddle_results else 0 | |
| # if defective_count == self.config['expected_saddles'] and avg_sim < 0.70: | |
| # block_status = 'WASTE_IMAGE' | |
| # elif detected_count == 0: | |
| # if avg_sim < 0.75: | |
| # block_status = 'WASTE_IMAGE' | |
| # else: | |
| # block_status = 'DEFECTIVE' | |
| # else: | |
| # block_status = 'DEFECTIVE' if defective_count > 0 else 'PERFECT' | |
| # processing_time = (time.time() - start_time) * 1000 | |
| # self.last_saddles = saddles | |
| # self.last_image = image | |
| # result = BlockInspectionResult( | |
| # block_status=block_status, | |
| # total_saddles=self.config['expected_saddles'], | |
| # detected_saddles=detected_count, | |
| # defective_saddles=defective_count, | |
| # saddle_results=saddle_results, | |
| # processing_time_ms=processing_time, | |
| # alignment_status=alignment_status, | |
| # block_angle=block_angle | |
| # ) | |
| # # Cleanup | |
| # self._cleanup_after_processing() | |
| # return result | |
| # except Exception as e: | |
| # print(f"✗ Inspection error: {e}") | |
| # import traceback | |
| # traceback.print_exc() | |
| # return self._error_result(str(e), 0.0, [], 0) | |
| # def get_reference_images(self) -> List[str]: | |
| # return self.reference_manager.get_references() | |
| # def _error_result(self, message: str, angle: float, saddles: List, detected_count: int) -> BlockInspectionResult: | |
| # self.last_saddles = saddles | |
| # return BlockInspectionResult( | |
| # block_status='ERROR', | |
| # total_saddles=self.config['expected_saddles'], | |
| # detected_saddles=detected_count, | |
| # defective_saddles=0, | |
| # saddle_results=[], | |
| # processing_time_ms=0.0, | |
| # alignment_status=message, | |
| # block_angle=angle | |
| # ) | |
| # def visualize_results( | |
| # self, | |
| # original_image: np.ndarray, | |
| # saddles: List[SaddleROI], | |
| # results: List[SaddleInspectionResult] | |
| # ) -> np.ndarray: | |
| # """Create detailed visualization""" | |
| # vis = original_image.copy() | |
| # for saddle, result in zip(saddles, results): | |
| # if not result.detected: | |
| # color = (255, 165, 0) # Orange | |
| # elif result.status == 'PERFECT': | |
| # color = (0, 255, 0) # Green | |
| # else: | |
| # color = (0, 0, 255) # Red | |
| # x, y, w, h = saddle.bbox | |
| # cv2.rectangle(vis, (x, y), (x+w, y+h), color, 3) | |
| # cx, cy = saddle.center | |
| # cv2.circle(vis, (cx, cy), 10, color, -1) | |
| # if abs(saddle.angle) > 1.0: | |
| # rad = np.deg2rad(saddle.angle) | |
| # end_x = int(cx + 30 * np.cos(rad)) | |
| # end_y = int(cy + 30 * np.sin(rad)) | |
| # cv2.arrowedLine(vis, (cx, cy), (end_x, end_y), color, 3) | |
| # detection_marker = "" if result.detected else "⚙" | |
| # label = f"{detection_marker} S{saddle.id}: {result.status}" | |
| # sim_text = f"{result.similarity_score:.3f}" | |
| # angle_text = f"{saddle.angle:.1f}°" | |
| # cv2.rectangle(vis, (cx-70, cy-70), (cx+70, cy+40), color, -1) | |
| # cv2.putText(vis, label, (cx-65, cy-45), | |
| # cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 255, 255), 2) | |
| # cv2.putText(vis, sim_text, (cx-65, cy-20), | |
| # cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2) | |
| # cv2.putText(vis, f"C:{saddle.confidence:.2f}", (cx-65, cy+28), | |
| # cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1) | |
| # # Add angle info if it doesn't overlap text | |
| # # (Simplified visualization for better performance) | |
| # return vis | |
| # def _cleanup_after_processing(self): | |
| # """Clean up resources after processing""" | |
| # # Clear old saddles | |
| # for saddle in self.last_saddles: | |
| # if hasattr(saddle, 'cleanup'): | |
| # saddle.cleanup() | |
| # # Trigger garbage collection | |
| # if ENABLE_GC: | |
| # gc.collect() | |
| # def cleanup(self): | |
| # """Full cleanup - call when done""" | |
| # print("Cleaning up resources...") | |
| # # Cleanup components | |
| # if hasattr(self.yolo_detector, 'cleanup'): | |
| # self.yolo_detector.cleanup() | |
| # if hasattr(self.geometric_detector, 'cleanup'): | |
| # self.geometric_detector.cleanup() | |
| # if hasattr(self.cnn_extractor, 'cleanup'): | |
| # self.cnn_extractor.cleanup() | |
| # if hasattr(self.reference_manager, 'cleanup'): | |
| # self.reference_manager.cleanup() | |
| # # Clear state | |
| # self.last_saddles.clear() | |
| # self.last_image = None | |
| # # Force garbage collection | |
| # if ENABLE_GC: | |
| # gc.collect() | |
| # print("✓ Cleanup complete") | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image | |
| import io | |
| import json | |
| from datetime import datetime | |
| from typing import List, Tuple, Dict, Optional | |
| from dataclasses import dataclass | |
| import time | |
| import os | |
| import warnings | |
| import gc | |
| from collections import OrderedDict | |
| # ============================================================================ | |
| # CLOUD DEPLOYMENT OPTIMIZATIONS | |
| # ============================================================================ | |
| IS_HF_SPACE = os.environ.get('SPACE_ID') is not None or os.environ.get('HF_SPACE') is not None | |
| IS_RENDER = os.environ.get('RENDER') is not None | |
| IS_CLOUD = IS_HF_SPACE or IS_RENDER | |
| # Memory limits for cloud | |
| MAX_IMAGE_SIZE = (1280, 960) if IS_CLOUD else (1920, 1440) | |
| MAX_CACHE_SIZE = 50 if IS_CLOUD else 100 | |
| ENABLE_GC = IS_CLOUD | |
| # Lazy imports for faster cold start | |
| _torch = None | |
| _torchvision = None | |
| _YOLO = None | |
| def _get_torch(): | |
| global _torch | |
| if _torch is None: | |
| import torch | |
| _torch = torch | |
| if IS_CLOUD: | |
| _torch.set_num_threads(2) | |
| return _torch | |
| def _get_torchvision(): | |
| global _torchvision | |
| if _torchvision is None: | |
| import torchvision | |
| _torchvision = torchvision | |
| return _torchvision | |
| def _get_yolo(): | |
| global _YOLO | |
| if _YOLO is None: | |
| try: | |
| from ultralytics import YOLO | |
| _YOLO = YOLO | |
| except ImportError: | |
| _YOLO = False | |
| return _YOLO | |
| def _yolo_available(): | |
| yolo = _get_yolo() | |
| return yolo is not False and yolo is not None | |
| warnings.filterwarnings('ignore') | |
| # Import optimized validators | |
| try: | |
| from saddle_structure_validator import SaddleStructureValidator, SaddleStructure | |
| STRUCTURE_VALIDATOR_AVAILABLE = True | |
| except ImportError: | |
| STRUCTURE_VALIDATOR_AVAILABLE = False | |
| SaddleStructure = None | |
| print("[WARN] saddle_structure_validator not available") | |
| try: | |
| from geometric_validator import OptimizedGeometricValidator, PhysicalDimensions, PixelToMMCalibrator | |
| OPTIMIZED_VALIDATOR_AVAILABLE = True | |
| except ImportError: | |
| OPTIMIZED_VALIDATOR_AVAILABLE = False | |
| print("[WARN] geometric_validator not available") | |
| if IS_HF_SPACE: | |
| print("[HF Space] Memory-optimized mode enabled") | |
| elif IS_RENDER: | |
| print("[Render] Cloud-optimized mode enabled") | |
| class SaddleROI: | |
| """Saddle region of interest with rotation and structure info""" | |
| id: int | |
| bbox: Tuple[int, int, int, int] | |
| crop: np.ndarray | |
| center: Tuple[int, int] | |
| area: int | |
| angle: float | |
| confidence: float | |
| structure: Optional[any] = None | |
| def to_dict(self): | |
| result = { | |
| 'id': int(self.id), | |
| 'bbox': [int(x) for x in self.bbox], | |
| 'center': [int(self.center[0]), int(self.center[1])], | |
| 'area': int(self.area), | |
| 'angle': float(self.angle), | |
| 'confidence': float(self.confidence) | |
| } | |
| if self.structure is not None: | |
| result['has_semicircle'] = bool(getattr(self.structure, 'has_semicircle', False)) | |
| result['has_middle_arc'] = bool(getattr(self.structure, 'has_middle_arc', False)) | |
| result['structure_confidence'] = float(getattr(self.structure, 'structure_confidence', 0.0)) | |
| return result | |
| def cleanup(self): | |
| """Free memory""" | |
| if hasattr(self, 'crop'): | |
| del self.crop | |
| self.crop = None | |
| self.structure = None | |
| class SaddleInspectionResult: | |
| """Inspection result for one saddle""" | |
| saddle_id: int | |
| status: str | |
| similarity_score: float | |
| confidence: float | |
| feature_distance: float | |
| detected: bool | |
| def to_dict(self): | |
| return { | |
| 'saddle_id': int(self.saddle_id), | |
| 'status': str(self.status), | |
| 'similarity_score': float(self.similarity_score), | |
| 'confidence': float(self.confidence), | |
| 'feature_distance': float(self.feature_distance), | |
| 'detected': bool(self.detected) | |
| } | |
| class BlockInspectionResult: | |
| """Complete block inspection result""" | |
| block_status: str | |
| total_saddles: int | |
| detected_saddles: int | |
| defective_saddles: int | |
| saddle_results: List[SaddleInspectionResult] | |
| processing_time_ms: float | |
| alignment_status: str | |
| block_angle: float | |
| def to_dict(self): | |
| return { | |
| 'block_status': str(self.block_status), | |
| 'total_saddles': int(self.total_saddles), | |
| 'detected_saddles': int(self.detected_saddles), | |
| 'defective_saddles': int(self.defective_saddles), | |
| 'saddle_results': [r.to_dict() for r in self.saddle_results], | |
| 'processing_time_ms': float(self.processing_time_ms), | |
| 'alignment_status': str(self.alignment_status), | |
| 'block_angle': float(self.block_angle) | |
| } | |
| class LRUCache: | |
| """Thread-safe LRU cache with size limit""" | |
| def __init__(self, max_size=100): | |
| self.cache = OrderedDict() | |
| self.max_size = max_size | |
| def get(self, key): | |
| if key in self.cache: | |
| self.cache.move_to_end(key) | |
| return self.cache[key] | |
| return None | |
| def put(self, key, value): | |
| if key in self.cache: | |
| self.cache.move_to_end(key) | |
| self.cache[key] = value | |
| if len(self.cache) > self.max_size: | |
| self.cache.popitem(last=False) | |
| def clear(self): | |
| self.cache.clear() | |
| if ENABLE_GC: | |
| gc.collect() | |
| class YOLOOBBDetector: | |
| """YOLO-OBB Detector with lazy loading and memory management""" | |
| def __init__(self, model_path: Optional[str] = None): | |
| self.model = None | |
| self.model_loaded = False | |
| self._model_path = model_path if model_path else 'yolo11n-obb.pt' | |
| self._initialized = False | |
| def _lazy_init(self): | |
| if self._initialized: | |
| return | |
| self._initialized = True | |
| if not _yolo_available(): | |
| print("⚠ YOLO not available - will use geometric fallback") | |
| return | |
| YOLO = _get_yolo() | |
| try: | |
| self.model = YOLO(self._model_path) | |
| self.model_loaded = True | |
| print(f"✓ YOLO-OBB loaded: {self._model_path}") | |
| except Exception as e: | |
| print(f"⚠ YOLO load failed: {e} - will use geometric fallback") | |
| self.model_loaded = False | |
| def available(self): | |
| if not self._initialized: | |
| self._lazy_init() | |
| return self.model_loaded | |
| def detect_saddles(self, image: np.ndarray, conf_threshold: float = 0.20) -> List[SaddleROI]: | |
| """Detect saddles using YOLO-OBB with memory optimization""" | |
| if not self.available or self.model is None: | |
| return [] | |
| # Resize if too large | |
| h, w = image.shape[:2] | |
| if w > MAX_IMAGE_SIZE[0] or h > MAX_IMAGE_SIZE[1]: | |
| scale = min(MAX_IMAGE_SIZE[0]/w, MAX_IMAGE_SIZE[1]/h) | |
| new_w, new_h = int(w*scale), int(h*scale) | |
| image_resized = cv2.resize(image, (new_w, new_h)) | |
| else: | |
| image_resized = image | |
| scale = 1.0 | |
| # Convert to grayscale for faster inference | |
| if len(image_resized.shape) == 3: | |
| gray = cv2.cvtColor(image_resized, cv2.COLOR_BGR2GRAY) | |
| gray_3ch = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR) | |
| else: | |
| gray_3ch = cv2.cvtColor(image_resized, cv2.COLOR_GRAY2BGR) | |
| results = self.model(gray_3ch, conf=conf_threshold, verbose=False) | |
| saddles = [] | |
| for result in results: | |
| if hasattr(result, 'obb') and result.obb is not None: | |
| for i, obb in enumerate(result.obb): | |
| xyxyxyxy = obb.xyxyxyxy[0].cpu().numpy() | |
| conf = float(obb.conf[0]) | |
| # Scale back to original coordinates | |
| xyxyxyxy = xyxyxyxy / scale | |
| angle = self._calculate_angle(xyxyxyxy) | |
| x_coords = xyxyxyxy[:, 0] | |
| y_coords = xyxyxyxy[:, 1] | |
| x_min, x_max = int(x_coords.min()), int(x_coords.max()) | |
| y_min, y_max = int(y_coords.min()), int(y_coords.max()) | |
| w = x_max - x_min | |
| h = y_max - y_min | |
| cx = int(np.mean(x_coords)) | |
| cy = int(np.mean(y_coords)) | |
| # Crop from original image | |
| crop = image[y_min:y_max, x_min:x_max].copy() | |
| area = w * h | |
| saddles.append(SaddleROI( | |
| id=i, | |
| bbox=(x_min, y_min, w, h), | |
| crop=crop, | |
| center=(cx, cy), | |
| area=area, | |
| angle=angle, | |
| confidence=conf | |
| )) | |
| elif hasattr(result, 'boxes') and result.boxes is not None: | |
| for i, box in enumerate(result.boxes): | |
| xyxy = box.xyxy[0].cpu().numpy() | |
| conf = float(box.conf[0]) | |
| # Scale back | |
| xyxy = xyxy / scale | |
| x_min, y_min, x_max, y_max = map(int, xyxy) | |
| w = x_max - x_min | |
| h = y_max - y_min | |
| cx = (x_min + x_max) // 2 | |
| cy = (y_min + y_max) // 2 | |
| crop = image[y_min:y_max, x_min:x_max].copy() | |
| area = w * h | |
| saddles.append(SaddleROI( | |
| id=i, | |
| bbox=(x_min, y_min, w, h), | |
| crop=crop, | |
| center=(cx, cy), | |
| area=area, | |
| angle=0.0, | |
| confidence=conf | |
| )) | |
| # Sort by position | |
| saddles.sort(key=lambda s: (s.center[1], s.center[0])) | |
| for i, saddle in enumerate(saddles): | |
| saddle.id = i | |
| print(f"✓ YOLO detected: {len(saddles)} saddles") | |
| # Memory cleanup | |
| if ENABLE_GC: | |
| del gray_3ch, results | |
| gc.collect() | |
| return saddles | |
| def _calculate_angle(self, xyxyxyxy: np.ndarray) -> float: | |
| p1 = xyxyxyxy[0] | |
| p2 = xyxyxyxy[1] | |
| dx = p2[0] - p1[0] | |
| dy = p2[1] - p1[1] | |
| angle = np.degrees(np.arctan2(dy, dx)) | |
| if angle > 180: | |
| angle -= 360 | |
| elif angle < -180: | |
| angle += 360 | |
| return float(angle) | |
| def cleanup(self): | |
| """Release model from memory""" | |
| if self.model is not None: | |
| del self.model | |
| self.model = None | |
| self.model_loaded = False | |
| if ENABLE_GC: | |
| gc.collect() | |
| class GeometricSaddleDetector: | |
| """Optimized geometric detector with better 4-saddle detection""" | |
| def __init__(self, max_cache_size=None): | |
| if max_cache_size is None: | |
| max_cache_size = MAX_CACHE_SIZE | |
| self._mask_cache = LRUCache(max_cache_size) | |
| def detect_saddles(self, image: np.ndarray) -> List[SaddleROI]: | |
| """Detect using improved geometric segmentation""" | |
| print(f"[GeometricDetector] Input shape: {image.shape}") | |
| h, w = image.shape[:2] | |
| # Resize if too large | |
| if w > MAX_IMAGE_SIZE[0] or h > MAX_IMAGE_SIZE[1]: | |
| scale = min(MAX_IMAGE_SIZE[0]/w, MAX_IMAGE_SIZE[1]/h) | |
| new_w, new_h = int(w*scale), int(h*scale) | |
| image_work = cv2.resize(image, (new_w, new_h)) | |
| print(f"[GeometricDetector] Resized to {new_w}x{new_h}") | |
| else: | |
| image_work = image | |
| scale = 1.0 | |
| # Detect circles with improved method | |
| outer_circle, inner_circle = self._detect_circles_improved(image_work) | |
| if outer_circle is None: | |
| print("[GeometricDetector] ✗ Circle detection failed") | |
| return [] | |
| ox, oy, or_ = outer_circle | |
| ix, iy, ir = inner_circle | |
| # Scale back to original | |
| if scale != 1.0: | |
| ox, oy = int(ox/scale), int(oy/scale) | |
| or_, ir = int(or_/scale), int(ir/scale) | |
| h, w = image.shape[:2] | |
| print(f"[GeometricDetector] Outer: ({ox},{oy}) r={or_}, Inner: ({ix},{iy}) r={ir}") | |
| # Validate | |
| if or_ < 50 or or_ > min(h, w): | |
| print(f"[GeometricDetector] ✗ Invalid outer radius: {or_}") | |
| return [] | |
| if ir >= or_: | |
| ir = int(or_ * 0.4) | |
| print(f"[GeometricDetector] Adjusted inner radius to {ir}") | |
| # Create annular mask | |
| annular_mask = np.zeros((h, w), dtype=np.uint8) | |
| cv2.circle(annular_mask, (ox, oy), or_, 255, -1) | |
| cv2.circle(annular_mask, (ix, iy), ir, 0, -1) | |
| mask_pixels = np.sum(annular_mask > 0) | |
| print(f"[GeometricDetector] Annular mask pixels: {mask_pixels}") | |
| if mask_pixels < 1000: | |
| print(f"[GeometricDetector] ✗ Mask too small") | |
| return [] | |
| # Find saddles using quadrant-based detection (more reliable for 4 saddles) | |
| saddles = self._detect_quadrant_based(image, annular_mask, (ox, oy), or_, ir) | |
| print(f"[GeometricDetector] Total saddles detected: {len(saddles)}") | |
| # Memory cleanup | |
| if ENABLE_GC: | |
| del annular_mask, image_work | |
| gc.collect() | |
| return saddles | |
| def _detect_quadrant_based( | |
| self, | |
| image: np.ndarray, | |
| annular_mask: np.ndarray, | |
| center: Tuple[int, int], | |
| outer_r: int, | |
| inner_r: int | |
| ) -> List[SaddleROI]: | |
| """Quadrant-based detection for 4 saddles""" | |
| h, w = annular_mask.shape | |
| ox, oy = center | |
| quadrants = [ | |
| (0, 90, "Top-Right"), | |
| (90, 180, "Top-Left"), | |
| (180, 270, "Bottom-Left"), | |
| (270, 360, "Bottom-Right") | |
| ] | |
| saddles = [] | |
| for idx, (start_angle, end_angle, name) in enumerate(quadrants): | |
| mask_key = (h, w, ox, oy, outer_r, inner_r, start_angle, end_angle) | |
| sector_mask = self._mask_cache.get(mask_key) | |
| if sector_mask is None: | |
| sector_mask = self._create_sector_mask( | |
| (h, w), (ox, oy), outer_r, inner_r, start_angle, end_angle | |
| ) | |
| self._mask_cache.put(mask_key, sector_mask) | |
| saddle_mask = cv2.bitwise_and(annular_mask, sector_mask) | |
| ys, xs = np.where(saddle_mask > 0) | |
| if len(xs) == 0 or len(ys) == 0: | |
| print(f"[GeometricDetector] ⚠ Quadrant {idx} ({name}): No pixels") | |
| continue | |
| x_min, x_max = xs.min(), xs.max() | |
| y_min, y_max = ys.min(), ys.max() | |
| if (x_max - x_min) < 10 or (y_max - y_min) < 10: | |
| print(f"[GeometricDetector] ⚠ Quadrant {idx} ({name}): Box too small") | |
| continue | |
| crop = image[y_min:y_max+1, x_min:x_max+1].copy() | |
| mask_crop = saddle_mask[y_min:y_max+1, x_min:x_max+1].copy() | |
| crop_masked = cv2.bitwise_and(crop, crop, mask=mask_crop) | |
| cx = int(np.mean(xs)) | |
| cy = int(np.mean(ys)) | |
| area = np.sum(saddle_mask > 0) | |
| saddles.append(SaddleROI( | |
| id=idx, | |
| bbox=(x_min, y_min, x_max - x_min, y_max - y_min), | |
| crop=crop_masked, | |
| center=(cx, cy), | |
| area=area, | |
| angle=0.0, | |
| confidence=0.9 # Higher confidence for geometric detection | |
| )) | |
| return saddles | |
| def _detect_circles_improved(self, image: np.ndarray) -> Tuple[Optional[Tuple], Optional[Tuple]]: | |
| """Improved circle detection with multiple strategies""" | |
| gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image | |
| h, w = gray.shape | |
| # Strategy 1: Standard HoughCircles with optimized params | |
| gray_blur = cv2.GaussianBlur(gray, (9, 9), 2) | |
| minRadius = int(min(h, w) * 0.2) | |
| maxRadius = int(min(h, w) * 0.7) | |
| # Try multiple parameter sets | |
| param_sets = [ | |
| (80, 25), # Standard | |
| (60, 30), # More lenient | |
| (100, 20), # More strict | |
| ] | |
| for param1, param2 in param_sets: | |
| circles = cv2.HoughCircles( | |
| gray_blur, | |
| cv2.HOUGH_GRADIENT, | |
| dp=1.0, | |
| minDist=80, | |
| param1=param1, | |
| param2=param2, | |
| minRadius=minRadius, | |
| maxRadius=maxRadius | |
| ) | |
| if circles is not None and len(circles[0]) >= 1: | |
| circles = np.round(circles[0, :]).astype(int) | |
| circles = sorted(circles, key=lambda c: c[2], reverse=True) | |
| outer = tuple(circles[0]) | |
| inner_radius = int(outer[2] * 0.4) | |
| inner = (outer[0], outer[1], inner_radius) | |
| print(f"✓ HoughCircles success (param1={param1}, param2={param2})") | |
| return outer, inner | |
| # Strategy 2: Contour-based detection | |
| print("→ Using contour fallback") | |
| edges = cv2.Canny(gray_blur, 30, 100) | |
| # Dilate to connect edges | |
| kernel = np.ones((3, 3), np.uint8) | |
| edges = cv2.dilate(edges, kernel, iterations=2) | |
| contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| if contours: | |
| # Find largest contour | |
| largest = max(contours, key=cv2.contourArea) | |
| (cx, cy), radius = cv2.minEnclosingCircle(largest) | |
| center_x, center_y = int(cx), int(cy) | |
| outer_radius = int(radius * 0.85) | |
| inner_radius = int(outer_radius * 0.4) | |
| print(f"✓ Contour-based: center=({center_x},{center_y}), radius={outer_radius}") | |
| return ((center_x, center_y, outer_radius), (center_x, center_y, inner_radius)) | |
| # Strategy 3: Dimension-based estimate | |
| center_x, center_y = w // 2, h // 2 | |
| outer_radius = min(w, h) // 2 - 50 | |
| inner_radius = int(outer_radius * 0.4) | |
| print(f"→ Using dimension estimate: center=({center_x},{center_y}), radius={outer_radius}") | |
| return ((center_x, center_y, outer_radius), (center_x, center_y, inner_radius)) | |
| def _create_sector_mask(self, shape, center, outer_r, inner_r, start_angle, end_angle): | |
| """Create sector mask with caching""" | |
| h, w = shape | |
| mask = np.zeros((h, w), dtype=np.uint8) | |
| y, x = np.ogrid[:h, :w] | |
| dx = x - center[0] | |
| dy = y - center[1] | |
| angles = np.rad2deg(np.arctan2(dy, dx)) % 360 | |
| radius = np.sqrt(dx**2 + dy**2) | |
| if start_angle < end_angle: | |
| angle_mask = (angles >= start_angle) & (angles < end_angle) | |
| else: | |
| angle_mask = (angles >= start_angle) | (angles < end_angle) | |
| radius_mask = (radius >= inner_r) & (radius <= outer_r) | |
| sector = angle_mask & radius_mask | |
| mask[sector] = 255 | |
| return mask | |
| def cleanup(self): | |
| """Release cache""" | |
| self._mask_cache.clear() | |
| class AlignmentCorrector: | |
| """Affine transformation for rotation correction""" | |
| def correct_rotation(image: np.ndarray, angle: float) -> np.ndarray: | |
| if image is None or image.size == 0: | |
| return image | |
| h, w = image.shape[:2] | |
| center = (w // 2, h // 2) | |
| M = cv2.getRotationMatrix2D(center, -angle, 1.0) | |
| cos = np.abs(M[0, 0]) | |
| sin = np.abs(M[0, 1]) | |
| new_w = int((h * sin) + (w * cos)) | |
| new_h = int((h * cos) + (w * sin)) | |
| M[0, 2] += (new_w / 2) - center[0] | |
| M[1, 2] += (new_h / 2) - center[1] | |
| rotated = cv2.warpAffine(image, M, (new_w, new_h), | |
| flags=cv2.INTER_LINEAR, | |
| borderMode=cv2.BORDER_CONSTANT, | |
| borderValue=(0, 0, 0)) | |
| return rotated | |
| def correct_saddle(saddle: SaddleROI) -> np.ndarray: | |
| if saddle.crop is None or saddle.crop.size == 0: | |
| return np.zeros((224, 224, 3), dtype=np.uint8) | |
| if abs(saddle.angle) < 1.0: | |
| return saddle.crop | |
| return AlignmentCorrector.correct_rotation(saddle.crop, saddle.angle) | |
| class CNNFeatureExtractor: | |
| """ResNet-18 feature extraction with memory optimization""" | |
| def __init__(self, onnx_path: str = "resnet18_features.onnx"): | |
| self.onnx_path = onnx_path | |
| self.session = None | |
| self.model = None | |
| self.device = None | |
| self.feature_dim = 1024 | |
| self._initialized = False | |
| def _lazy_init(self): | |
| if self._initialized: | |
| return | |
| self._initialized = True | |
| # Use PyTorch with fused features | |
| self._init_pytorch_model() | |
| def _init_pytorch_model(self): | |
| torch = _get_torch() | |
| torchvision = _get_torchvision() | |
| from torchvision.models import resnet18, ResNet18_Weights | |
| import torch.nn as nn | |
| self.device = torch.device('cpu') | |
| weights = ResNet18_Weights.IMAGENET1K_V1 | |
| base_model = resnet18(weights=weights) | |
| class FeatureFusionModel(nn.Module): | |
| def __init__(self, base): | |
| super().__init__() | |
| self.conv1 = base.conv1 | |
| self.bn1 = base.bn1 | |
| self.relu = base.relu | |
| self.maxpool = base.maxpool | |
| self.layer1 = base.layer1 | |
| self.layer2 = base.layer2 | |
| self.layer3 = base.layer3 | |
| self.layer4 = base.layer4 | |
| self.avgpool = nn.AdaptiveAvgPool2d((1, 1)) | |
| self.maxpool_global = nn.AdaptiveMaxPool2d((1, 1)) | |
| def forward(self, x): | |
| x = self.conv1(x) | |
| x = self.bn1(x) | |
| x = self.relu(x) | |
| x = self.maxpool(x) | |
| x = self.layer1(x) | |
| x = self.layer2(x) | |
| l3 = self.layer3(x) | |
| l4 = self.layer4(l3) | |
| f3_avg = self.avgpool(l3).view(l3.size(0), -1) | |
| f3_max = self.maxpool_global(l3).view(l3.size(0), -1) | |
| f4_avg = self.avgpool(l4).view(l4.size(0), -1) | |
| return torch.cat([f3_avg, f3_max, f4_avg], dim=1) | |
| self.model = FeatureFusionModel(base_model) | |
| self.model.to(self.device) | |
| self.model.eval() | |
| print(f"✓ PyTorch FeatureFusion ResNet-18 (dim={self.feature_dim})") | |
| def _preprocess_image(self, image: np.ndarray) -> np.ndarray: | |
| if image is None or image.size == 0: | |
| return np.zeros((3, 224, 224), dtype=np.float32) | |
| if len(image.shape) == 2: | |
| image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB) | |
| elif len(image.shape) == 3 and image.shape[2] == 4: | |
| image = cv2.cvtColor(image, cv2.COLOR_BGRA2RGB) | |
| elif len(image.shape) == 3 and image.shape[2] == 3: | |
| image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) | |
| image = cv2.resize(image, (224, 224)) | |
| image = image.astype(np.float32) / 255.0 | |
| mean = np.array([0.485, 0.456, 0.406], dtype=np.float32) | |
| std = np.array([0.229, 0.224, 0.225], dtype=np.float32) | |
| image = (image - mean) / std | |
| image = image.transpose(2, 0, 1) | |
| return image | |
| def extract_features(self, image: np.ndarray) -> np.ndarray: | |
| self._lazy_init() | |
| batch = self._preprocess_image(image)[np.newaxis, ...] | |
| features = self.extract_batch(batch) | |
| return features[0] | |
| def extract_batch(self, images: List[np.ndarray]) -> np.ndarray: | |
| self._lazy_init() | |
| if isinstance(images, np.ndarray) and len(images.shape) == 4: | |
| batch = images.astype(np.float32) | |
| else: | |
| preprocessed = [self._preprocess_image(img) for img in images] | |
| batch = np.stack(preprocessed, axis=0).astype(np.float32) | |
| torch = _get_torch() | |
| batch_tensor = torch.from_numpy(batch).to(self.device) | |
| with torch.no_grad(): | |
| features = self.model(batch_tensor) | |
| features = features.squeeze(-1).squeeze(-1).cpu().numpy() | |
| norms = np.linalg.norm(features, axis=1, keepdims=True) + 1e-8 | |
| features = features / norms | |
| # Memory cleanup | |
| if ENABLE_GC: | |
| del batch_tensor | |
| gc.collect() | |
| return features | |
| def compute_similarity(self, features1: np.ndarray, features2: np.ndarray) -> float: | |
| similarity = np.dot(features1, features2) / ( | |
| np.linalg.norm(features1) * np.linalg.norm(features2) + 1e-8 | |
| ) | |
| return float(np.clip(similarity, 0, 1)) | |
| def cleanup(self): | |
| """Release model from memory""" | |
| if self.model is not None: | |
| del self.model | |
| self.model = None | |
| if ENABLE_GC: | |
| gc.collect() | |
| class GeometricConstraintValidator: | |
| """Validate saddle positions using geometric constraints - RELAXED""" | |
| def __init__(self, expected_count: int = 4): | |
| self.expected_count = expected_count | |
| self.reference_positions = None | |
| self.reference_distances = {} | |
| def set_reference_positions(self, saddles: List[SaddleROI]): | |
| if len(saddles) != self.expected_count: | |
| print(f"⚠ Reference has {len(saddles)} saddles, expected {self.expected_count}") | |
| return False | |
| centers = [s.center for s in saddles] | |
| centroid = ( | |
| int(np.mean([c[0] for c in centers])), | |
| int(np.mean([c[1] for c in centers])) | |
| ) | |
| self.reference_positions = [ | |
| (c[0] - centroid[0], c[1] - centroid[1]) for c in centers | |
| ] | |
| self.reference_distances = {} | |
| for i in range(len(saddles)): | |
| for j in range(i+1, len(saddles)): | |
| dist = np.linalg.norm( | |
| np.array(saddles[i].center) - np.array(saddles[j].center) | |
| ) | |
| self.reference_distances[(i, j)] = dist | |
| print(f"✓ Reference geometry learned: {len(saddles)} saddles") | |
| return True | |
| def infer_missing_saddles( | |
| self, | |
| detected_saddles: List[SaddleROI], | |
| image: np.ndarray | |
| ) -> List[SaddleROI]: | |
| """Infer missing saddles based on reference geometry""" | |
| if len(detected_saddles) >= self.expected_count: | |
| return detected_saddles | |
| if self.reference_positions is None: | |
| print("⚠ No reference geometry - cannot infer") | |
| return detected_saddles | |
| if len(detected_saddles) == 0: | |
| print("⚠ No saddles detected - cannot infer") | |
| return detected_saddles | |
| centers = [s.center for s in detected_saddles] | |
| centroid = ( | |
| int(np.mean([c[0] for c in centers])), | |
| int(np.mean([c[1] for c in centers])) | |
| ) | |
| scale = self._estimate_scale(detected_saddles) | |
| detected_ids = {s.id for s in detected_saddles} | |
| all_saddles = list(detected_saddles) | |
| for expected_id in range(self.expected_count): | |
| if expected_id not in detected_ids: | |
| rel_x, rel_y = self.reference_positions[expected_id] | |
| expected_x = int(centroid[0] + rel_x * scale) | |
| expected_y = int(centroid[1] + rel_y * scale) | |
| crop_size = 150 # Increased crop size | |
| x_min = max(0, expected_x - crop_size // 2) | |
| y_min = max(0, expected_y - crop_size // 2) | |
| x_max = min(image.shape[1], expected_x + crop_size // 2) | |
| y_max = min(image.shape[0], expected_y + crop_size // 2) | |
| crop = image[y_min:y_max, x_min:x_max].copy() | |
| inferred = SaddleROI( | |
| id=expected_id, | |
| bbox=(x_min, y_min, x_max - x_min, y_max - y_min), | |
| crop=crop, | |
| center=(expected_x, expected_y), | |
| area=(x_max - x_min) * (y_max - y_min), | |
| angle=0.0, | |
| confidence=0.5 # Mark as inferred | |
| ) | |
| all_saddles.append(inferred) | |
| print(f" ⚙ Inferred Saddle {expected_id} at ({expected_x}, {expected_y})") | |
| all_saddles.sort(key=lambda s: s.id) | |
| return all_saddles | |
| def _estimate_scale(self, saddles: List[SaddleROI]) -> float: | |
| if len(saddles) < 2 or not self.reference_distances: | |
| return 1.0 | |
| if len(saddles) >= 2: | |
| s0, s1 = saddles[0], saddles[1] | |
| current_dist = np.linalg.norm( | |
| np.array(s0.center) - np.array(s1.center) | |
| ) | |
| key = (min(s0.id, s1.id), max(s0.id, s1.id)) | |
| if key in self.reference_distances: | |
| ref_dist = self.reference_distances[key] | |
| if ref_dist > 0: | |
| scale = current_dist / ref_dist | |
| return scale | |
| return 1.0 | |
| def validate_geometry(self, saddles: List[SaddleROI], tolerance: float = 0.30) -> Tuple[bool, str]: | |
| """RELAXED geometry validation""" | |
| if len(saddles) != self.expected_count: | |
| return False, f"Expected {self.expected_count} saddles, got {len(saddles)}" | |
| if self.reference_distances is None or not self.reference_distances: | |
| # No reference - accept geometry | |
| return True, "No reference geometry (accepting)" | |
| sorted_saddles = sorted(saddles, key=lambda s: s.id) | |
| centers = [np.array(s.center) for s in sorted_saddles] | |
| # Skip collinearity check - too strict | |
| # Distance validation with relaxed tolerance | |
| max_deviation = 0.0 | |
| for i in range(len(sorted_saddles) - 1): | |
| s0, s1 = sorted_saddles[i], sorted_saddles[i + 1] | |
| current_dist = np.linalg.norm(centers[i + 1] - centers[i]) | |
| key = (min(s0.id, s1.id), max(s0.id, s1.id)) | |
| if key in self.reference_distances: | |
| ref_dist = self.reference_distances[key] | |
| if ref_dist > 0: | |
| deviation = abs(current_dist - ref_dist) / ref_dist | |
| max_deviation = max(max_deviation, deviation) | |
| if deviation > tolerance: | |
| print(f" Distance S{s0.id}-S{s1.id} off by {deviation*100:.1f}%") | |
| if max_deviation > tolerance: | |
| return False, f"Max distance deviation: {max_deviation*100:.1f}%" | |
| return True, "Geometry valid" | |
| class MultiReferenceManager: | |
| """Manage multiple golden template images with memory optimization""" | |
| def __init__(self, feature_extractor: CNNFeatureExtractor): | |
| self.feature_extractor = feature_extractor | |
| self.references = [] | |
| self.max_references = 20 if not IS_CLOUD else 10 | |
| def add_reference(self, image: np.ndarray, saddles: List[SaddleROI]) -> bool: | |
| if len(saddles) != 4: | |
| print(f"⚠ Reference must have 4 saddles, got {len(saddles)}") | |
| return False | |
| # Limit number of references | |
| if len(self.references) >= self.max_references: | |
| print(f"⚠ Max references ({self.max_references}) reached, removing oldest") | |
| old_ref = self.references.pop(0) | |
| # Cleanup | |
| if ENABLE_GC: | |
| del old_ref | |
| gc.collect() | |
| features_list = [] | |
| for saddle in saddles: | |
| aligned_crop = AlignmentCorrector.correct_saddle(saddle) | |
| features = self.feature_extractor.extract_features(aligned_crop) | |
| features_list.append(features) | |
| # Compress image for storage | |
| if image.shape[0] > 800 or image.shape[1] > 800: | |
| scale = min(800/image.shape[1], 800/image.shape[0]) | |
| new_w, new_h = int(image.shape[1]*scale), int(image.shape[0]*scale) | |
| image_stored = cv2.resize(image, (new_w, new_h)) | |
| else: | |
| image_stored = image | |
| self.references.append({ | |
| 'image': image_stored, | |
| 'saddles': saddles, | |
| 'features': features_list | |
| }) | |
| print(f"✓ Reference {len(self.references)} added") | |
| return True | |
| def match_saddle(self, saddle: SaddleROI) -> Tuple[float, float]: | |
| if not self.references: | |
| # No references - return neutral score | |
| return 0.90, 0.5 # High similarity when no reference | |
| aligned_crop = AlignmentCorrector.correct_saddle(saddle) | |
| test_features = self.feature_extractor.extract_features(aligned_crop) | |
| similarities = [] | |
| distances = [] | |
| for ref in self.references: | |
| if saddle.id < len(ref['features']): | |
| ref_features = ref['features'][saddle.id] | |
| sim = self.feature_extractor.compute_similarity(test_features, ref_features) | |
| dist = np.linalg.norm(test_features - ref_features) | |
| similarities.append(sim) | |
| distances.append(dist) | |
| if similarities: | |
| best_sim = max(similarities) | |
| avg_dist = np.mean(distances) | |
| return best_sim, avg_dist | |
| return 0.90, 0.5 # Default neutral score | |
| def get_reference_count(self) -> int: | |
| return len(self.references) | |
| def get_references(self) -> List[str]: | |
| refs = [] | |
| for ref in self.references: | |
| try: | |
| _, buffer = cv2.imencode('.jpg', ref['image'], [cv2.IMWRITE_JPEG_QUALITY, 50]) | |
| import base64 | |
| img_str = base64.b64encode(buffer).decode('utf-8') | |
| refs.append(f"data:image/jpeg;base64,{img_str}") | |
| except: | |
| continue | |
| return refs | |
| def cleanup(self): | |
| """Release all references""" | |
| self.references.clear() | |
| if ENABLE_GC: | |
| gc.collect() | |
| class AdvancedBlockInspector: | |
| """ | |
| Advanced Engine Block Inspector - CLOUD OPTIMIZED & RELAXED | |
| Key improvements: | |
| - Works WITHOUT reference images | |
| - Relaxed thresholds | |
| - Better error handling | |
| - Improved saddle detection fallback | |
| """ | |
| def __init__(self, yolo_model_path: Optional[str] = None, onnx_path: str = "resnet18_features.onnx"): | |
| self.yolo_detector = YOLOOBBDetector(yolo_model_path) | |
| self.geometric_detector = GeometricSaddleDetector() | |
| self.cnn_extractor = CNNFeatureExtractor(onnx_path=onnx_path) | |
| self.reference_manager = MultiReferenceManager(self.cnn_extractor) | |
| # Use base validator (more flexible) | |
| self.geo_validator = GeometricConstraintValidator(expected_count=4) | |
| self.use_optimized_validator = False | |
| # Structure validator | |
| self.structure_validator = None | |
| if STRUCTURE_VALIDATOR_AVAILABLE: | |
| self.structure_validator = SaddleStructureValidator( | |
| arc_center_tolerance=0.05, | |
| min_arc_length_ratio=0.6, | |
| min_structure_confidence=0.65 | |
| ) | |
| print("✓ Structure Validator: Semicircle + Middle Arc") | |
| self.config = { | |
| 'expected_saddles': 4, | |
| 'similarity_threshold': 0.75, # RELAXED from 0.88 | |
| 'use_yolo': self.yolo_detector.model_loaded, | |
| 'use_geometric_fallback': True, | |
| 'use_alignment_correction': True, | |
| 'infer_missing_saddles': True, | |
| 'yolo_confidence': 0.20, # RELAXED from 0.25 | |
| 'use_geometry_validation': False, # DISABLED initially | |
| 'use_structure_validation': False, # DISABLED initially | |
| 'use_optimized_validator': False, | |
| 'min_brightness': 15.0, # RELAXED from 20.0 | |
| 'require_references': False, # NEW: work without references | |
| } | |
| # Memory tracking | |
| self.last_saddles = [] | |
| self.last_image = None | |
| print("[OK] Advanced Inspector initialized (Relaxed Mode)") | |
| print(f" YOLO-OBB: {'Enabled' if self.config['use_yolo'] else 'Disabled (using geometric)'}") | |
| print(f" Similarity threshold: {self.config['similarity_threshold']}") | |
| print(f" Requires references: {self.config['require_references']}") | |
| print(f" Max cache size: {MAX_CACHE_SIZE}") | |
| print(f" GC enabled: {ENABLE_GC}") | |
| def add_reference_image(self, image: np.ndarray) -> bool: | |
| """Add a golden template image""" | |
| try: | |
| # Quality check | |
| gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image | |
| brightness = np.mean(gray) | |
| if brightness < 15: | |
| print(f"✗ Reference too dark (brightness: {brightness:.1f})") | |
| return False | |
| # Detect saddles | |
| if self.config['use_yolo']: | |
| saddles = self.yolo_detector.detect_saddles( | |
| image, conf_threshold=self.config['yolo_confidence'] | |
| ) | |
| else: | |
| saddles = [] | |
| if len(saddles) < self.config['expected_saddles'] and self.config['use_geometric_fallback']: | |
| print("→ Falling back to geometric detection") | |
| saddles = self.geometric_detector.detect_saddles(image) | |
| if len(saddles) != self.config['expected_saddles']: | |
| print(f"✗ Reference failed: {len(saddles)}/{self.config['expected_saddles']} saddles") | |
| return False | |
| success = self.reference_manager.add_reference(image, saddles) | |
| if success and self.reference_manager.get_reference_count() == 1: | |
| self.geo_validator.set_reference_positions(saddles) | |
| # Enable validation after first reference | |
| self.config['use_geometry_validation'] = True | |
| # Cleanup | |
| self._cleanup_after_processing() | |
| return success | |
| except Exception as e: | |
| print(f"✗ Reference error: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| return False | |
| def inspect_block(self, image: np.ndarray) -> BlockInspectionResult: | |
| """Inspect engine block with memory optimization - RELAXED MODE""" | |
| start_time = time.time() | |
| try: | |
| # Quality check | |
| gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image | |
| brightness = np.mean(gray) | |
| if brightness < 10: # Very relaxed | |
| return BlockInspectionResult( | |
| block_status='WASTE_IMAGE', | |
| total_saddles=self.config['expected_saddles'], | |
| detected_saddles=0, | |
| defective_saddles=0, | |
| saddle_results=[], | |
| processing_time_ms=(time.time() - start_time) * 1000, | |
| alignment_status='REJECTED_TOO_DARK', | |
| block_angle=0.0 | |
| ) | |
| # Detect saddles | |
| if self.config['use_yolo']: | |
| saddles = self.yolo_detector.detect_saddles( | |
| image, conf_threshold=self.config['yolo_confidence'] | |
| ) | |
| else: | |
| saddles = [] | |
| if len(saddles) < self.config['expected_saddles'] and self.config['use_geometric_fallback']: | |
| print("→ Falling back to geometric detection") | |
| saddles = self.geometric_detector.detect_saddles(image) | |
| detected_count = len(saddles) | |
| print(f" Initially detected: {detected_count} saddles") | |
| # Infer missing saddles if we have reference | |
| if len(saddles) < self.config['expected_saddles'] and self.config['infer_missing_saddles']: | |
| if self.reference_manager.get_reference_count() > 0: | |
| print(f" Inferring {self.config['expected_saddles'] - len(saddles)} missing saddles") | |
| saddles = self.geo_validator.infer_missing_saddles(saddles, image) | |
| else: | |
| print(" ⚠ No references - cannot infer missing saddles") | |
| # Validate count | |
| if len(saddles) != self.config['expected_saddles']: | |
| alignment_status = f"SADDLE_COUNT: {len(saddles)}/{self.config['expected_saddles']}" | |
| # If no references, this might still be OK | |
| if not self.config['require_references'] and len(saddles) > 0: | |
| print(f" Warning: {alignment_status} but continuing without references") | |
| else: | |
| return self._error_result(alignment_status, 0.0, saddles, detected_count) | |
| # Geometry validation (only if enabled and references exist) | |
| if self.config.get('use_geometry_validation', False) and self.reference_manager.get_reference_count() > 0: | |
| is_valid, reason = self.geo_validator.validate_geometry(saddles, tolerance=0.30) | |
| if not is_valid: | |
| print(f" Geometry validation failed: {reason}") | |
| # Don't reject - just warn | |
| alignment_status = "OK" | |
| block_angle = np.mean([s.angle for s in saddles]) if saddles else 0.0 | |
| # Feature extraction and comparison | |
| if len(saddles) == self.config['expected_saddles']: | |
| aligned_crops = [AlignmentCorrector.correct_saddle(s) for s in saddles] | |
| batch_features = self.cnn_extractor.extract_batch(aligned_crops) | |
| # Compare against references | |
| saddle_results = [] | |
| for i, saddle in enumerate(saddles): | |
| test_features = batch_features[i] | |
| best_sim, avg_dist = self.reference_manager.match_saddle(saddle) | |
| # Relaxed status determination | |
| status = 'PERFECT' if best_sim >= self.config['similarity_threshold'] else 'DEFECTIVE' | |
| saddle_results.append(SaddleInspectionResult( | |
| saddle_id=saddle.id, | |
| status=status, | |
| similarity_score=best_sim, | |
| confidence=saddle.confidence, | |
| feature_distance=avg_dist, | |
| detected=saddle.confidence > 0.0 | |
| )) | |
| else: | |
| # Partial detection | |
| saddle_results = [] | |
| for saddle in saddles: | |
| best_sim, avg_dist = self.reference_manager.match_saddle(saddle) | |
| saddle_results.append(SaddleInspectionResult( | |
| saddle_id=saddle.id, | |
| status='UNKNOWN', | |
| similarity_score=best_sim, | |
| confidence=saddle.confidence, | |
| feature_distance=avg_dist, | |
| detected=saddle.confidence > 0.0 | |
| )) | |
| # Block decision - RELAXED | |
| defective_count = sum(1 for r in saddle_results if r.status == 'DEFECTIVE') | |
| avg_sim = np.mean([r.similarity_score for r in saddle_results]) if saddle_results else 0 | |
| # Determine block status | |
| if len(saddles) == 0: | |
| block_status = 'WASTE_IMAGE' | |
| elif len(saddles) < self.config['expected_saddles']: | |
| if self.config['require_references']: | |
| block_status = 'DEFECTIVE' | |
| else: | |
| # No references required - accept partial detection | |
| block_status = 'UNKNOWN' | |
| elif defective_count == 0: | |
| block_status = 'PERFECT' | |
| elif defective_count <= 2: # Allow up to 2 defective | |
| block_status = 'ACCEPTABLE' | |
| else: | |
| block_status = 'DEFECTIVE' | |
| processing_time = (time.time() - start_time) * 1000 | |
| self.last_saddles = saddles | |
| self.last_image = image | |
| result = BlockInspectionResult( | |
| block_status=block_status, | |
| total_saddles=self.config['expected_saddles'], | |
| detected_saddles=detected_count, | |
| defective_saddles=defective_count, | |
| saddle_results=saddle_results, | |
| processing_time_ms=processing_time, | |
| alignment_status=alignment_status, | |
| block_angle=block_angle | |
| ) | |
| print(f" Result: {block_status} ({detected_count}/{self.config['expected_saddles']} detected, {defective_count} defective)") | |
| # Cleanup | |
| self._cleanup_after_processing() | |
| return result | |
| except Exception as e: | |
| print(f"✗ Inspection error: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| return self._error_result(f"EXCEPTION: {str(e)}", 0.0, [], 0) | |
| def get_reference_images(self) -> List[str]: | |
| return self.reference_manager.get_references() | |
| def _error_result(self, message: str, angle: float, saddles: List, detected_count: int) -> BlockInspectionResult: | |
| self.last_saddles = saddles | |
| return BlockInspectionResult( | |
| block_status='ERROR', | |
| total_saddles=self.config['expected_saddles'], | |
| detected_saddles=detected_count, | |
| defective_saddles=0, | |
| saddle_results=[], | |
| processing_time_ms=0.0, | |
| alignment_status=message, | |
| block_angle=angle | |
| ) | |
| def visualize_results( | |
| self, | |
| original_image: np.ndarray, | |
| saddles: List[SaddleROI], | |
| results: List[SaddleInspectionResult] | |
| ) -> np.ndarray: | |
| """Create detailed visualization""" | |
| vis = original_image.copy() | |
| for saddle, result in zip(saddles, results): | |
| # Color coding | |
| if result.status == 'UNKNOWN': | |
| color = (255, 255, 0) # Yellow | |
| elif not result.detected: | |
| color = (255, 165, 0) # Orange | |
| elif result.status == 'PERFECT' or result.status == 'ACCEPTABLE': | |
| color = (0, 255, 0) # Green | |
| else: | |
| color = (0, 0, 255) # Red | |
| x, y, w, h = saddle.bbox | |
| cv2.rectangle(vis, (x, y), (x+w, y+h), color, 3) | |
| cx, cy = saddle.center | |
| cv2.circle(vis, (cx, cy), 10, color, -1) | |
| if abs(saddle.angle) > 1.0: | |
| rad = np.deg2rad(saddle.angle) | |
| end_x = int(cx + 30 * np.cos(rad)) | |
| end_y = int(cy + 30 * np.sin(rad)) | |
| cv2.arrowedLine(vis, (cx, cy), (end_x, end_y), color, 3) | |
| detection_marker = "" if result.detected else "⚙" | |
| label = f"{detection_marker} S{saddle.id}: {result.status}" | |
| sim_text = f"{result.similarity_score:.3f}" | |
| angle_text = f"{saddle.angle:.1f}°" | |
| cv2.rectangle(vis, (cx-70, cy-70), (cx+70, cy+40), color, -1) | |
| cv2.putText(vis, label, (cx-65, cy-45), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 255, 255), 2) | |
| cv2.putText(vis, sim_text, (cx-65, cy-20), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2) | |
| cv2.putText(vis, angle_text, (cx-65, cy+5), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 255, 255), 2) | |
| cv2.putText(vis, f"C:{saddle.confidence:.2f}", (cx-65, cy+28), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1) | |
| return vis | |
| def _cleanup_after_processing(self): | |
| """Clean up resources after processing""" | |
| # Clear old saddles | |
| for saddle in self.last_saddles: | |
| if hasattr(saddle, 'cleanup'): | |
| saddle.cleanup() | |
| # Trigger garbage collection | |
| if ENABLE_GC: | |
| gc.collect() | |
| def cleanup(self): | |
| """Full cleanup - call when done""" | |
| print("Cleaning up resources...") | |
| # Cleanup components | |
| if hasattr(self.yolo_detector, 'cleanup'): | |
| self.yolo_detector.cleanup() | |
| if hasattr(self.geometric_detector, 'cleanup'): | |
| self.geometric_detector.cleanup() | |
| if hasattr(self.cnn_extractor, 'cleanup'): | |
| self.cnn_extractor.cleanup() | |
| if hasattr(self.reference_manager, 'cleanup'): | |
| self.reference_manager.cleanup() | |
| # Clear state | |
| self.last_saddles.clear() | |
| self.last_image = None | |
| # Force garbage collection | |
| if ENABLE_GC: | |
| gc.collect() | |
| print("✓ Cleanup complete") |