inspector-model / inspector_engine.py
eho69's picture
upgrade
bfc5e2a verified
# import cv2
# import numpy as np
# from PIL import Image
# import io
# import json
# from datetime import datetime
# from typing import List, Tuple, Dict, Optional
# from dataclasses import dataclass
# import time
# import os
# import warnings
# import gc
# from collections import OrderedDict
# # ============================================================================
# # CLOUD DEPLOYMENT OPTIMIZATIONS
# # ============================================================================
# IS_HF_SPACE = os.environ.get('SPACE_ID') is not None or os.environ.get('HF_SPACE') is not None
# IS_RENDER = os.environ.get('RENDER') is not None
# IS_CLOUD = IS_HF_SPACE or IS_RENDER
# # Memory limits for cloud
# MAX_IMAGE_SIZE = (1280, 960) if IS_CLOUD else (1920, 1440)
# MAX_CACHE_SIZE = 50 if IS_CLOUD else 100
# ENABLE_GC = IS_CLOUD # Force garbage collection on cloud
# # Lazy imports for faster cold start
# _torch = None
# _torchvision = None
# _YOLO = None
# def _get_torch():
# global _torch
# if _torch is None:
# import torch
# _torch = torch
# if IS_CLOUD:
# _torch.set_num_threads(2)
# return _torch
# def _get_torchvision():
# global _torchvision
# if _torchvision is None:
# import torchvision
# _torchvision = torchvision
# return _torchvision
# def _get_yolo():
# global _YOLO
# if _YOLO is None:
# try:
# from ultralytics import YOLO
# _YOLO = YOLO
# except ImportError:
# _YOLO = False
# return _YOLO
# def _yolo_available():
# yolo = _get_yolo()
# return yolo is not False and yolo is not None
# warnings.filterwarnings('ignore')
# # Import optimized validators
# try:
# from saddle_structure_validator import SaddleStructureValidator, SaddleStructure
# STRUCTURE_VALIDATOR_AVAILABLE = True
# except ImportError:
# STRUCTURE_VALIDATOR_AVAILABLE = False
# SaddleStructure = None
# print("[WARN] saddle_structure_validator not available")
# try:
# from geometric_validator import OptimizedGeometricValidator, PhysicalDimensions, PixelToMMCalibrator
# OPTIMIZED_VALIDATOR_AVAILABLE = True
# except ImportError:
# OPTIMIZED_VALIDATOR_AVAILABLE = False
# print("[WARN] geometric_validator not available")
# if IS_HF_SPACE:
# print("[HF Space] Memory-optimized mode enabled")
# elif IS_RENDER:
# print("[Render] Cloud-optimized mode enabled")
# @dataclass
# class SaddleROI:
# """Saddle region of interest with rotation and structure info"""
# id: int
# bbox: Tuple[int, int, int, int]
# crop: np.ndarray
# center: Tuple[int, int]
# area: int
# angle: float
# confidence: float
# structure: Optional[any] = None
# def to_dict(self):
# result = {
# 'id': int(self.id),
# 'bbox': [int(x) for x in self.bbox],
# 'center': [int(self.center[0]), int(self.center[1])],
# 'area': int(self.area),
# 'angle': float(self.angle),
# 'confidence': float(self.confidence)
# }
# if self.structure is not None:
# result['has_semicircle'] = bool(getattr(self.structure, 'has_semicircle', False))
# result['has_middle_arc'] = bool(getattr(self.structure, 'has_middle_arc', False))
# result['structure_confidence'] = float(getattr(self.structure, 'structure_confidence', 0.0))
# return result
# def cleanup(self):
# """Free memory"""
# if hasattr(self, 'crop'):
# del self.crop
# self.crop = None
# self.structure = None
# @dataclass
# class SaddleInspectionResult:
# """Inspection result for one saddle"""
# saddle_id: int
# status: str
# similarity_score: float
# confidence: float
# feature_distance: float
# detected: bool
# def to_dict(self):
# return {
# 'saddle_id': int(self.saddle_id),
# 'status': str(self.status),
# 'similarity_score': float(self.similarity_score),
# 'confidence': float(self.confidence),
# 'feature_distance': float(self.feature_distance),
# 'detected': bool(self.detected)
# }
# @dataclass
# class BlockInspectionResult:
# """Complete block inspection result"""
# block_status: str
# total_saddles: int
# detected_saddles: int
# defective_saddles: int
# saddle_results: List[SaddleInspectionResult]
# processing_time_ms: float
# alignment_status: str
# block_angle: float
# def to_dict(self):
# return {
# 'block_status': str(self.block_status),
# 'total_saddles': int(self.total_saddles),
# 'detected_saddles': int(self.detected_saddles),
# 'defective_saddles': int(self.defective_saddles),
# 'saddle_results': [r.to_dict() for r in self.saddle_results],
# 'processing_time_ms': float(self.processing_time_ms),
# 'alignment_status': str(self.alignment_status),
# 'block_angle': float(self.block_angle)
# }
# class LRUCache:
# """Thread-safe LRU cache with size limit"""
# def __init__(self, max_size=100):
# self.cache = OrderedDict()
# self.max_size = max_size
# def get(self, key):
# if key in self.cache:
# self.cache.move_to_end(key)
# return self.cache[key]
# return None
# def put(self, key, value):
# if key in self.cache:
# self.cache.move_to_end(key)
# self.cache[key] = value
# if len(self.cache) > self.max_size:
# self.cache.popitem(last=False)
# def clear(self):
# self.cache.clear()
# if ENABLE_GC:
# gc.collect()
# class YOLOOBBDetector:
# """YOLO-OBB Detector with lazy loading and memory management"""
# def __init__(self, model_path: Optional[str] = None):
# self.model = None
# self.model_loaded = False
# self._model_path = model_path if model_path else 'yolo26n-obb.pt'
# self._initialized = False
# def _lazy_init(self):
# if self._initialized:
# return
# self._initialized = True
# if not _yolo_available():
# print("⚠ YOLO not available")
# return
# YOLO = _get_yolo()
# try:
# self.model = YOLO(self._model_path)
# self.model_loaded = True
# print(f"✓ YOLO-OBB loaded: {self._model_path}")
# except Exception as e:
# try:
# fallback = 'yolo11n-obb.pt' if self._model_path == 'yolo26n-obb.pt' else 'yolo26n-obb.pt'
# self.model = YOLO(fallback)
# self.model_loaded = True
# print(f"✓ YOLO-OBB fallback: {fallback}")
# except:
# print(f"✗ YOLO load failed: {e}")
# self.model_loaded = False
# @property
# def available(self):
# if not self._initialized:
# self._lazy_init()
# return self.model_loaded
# def detect_saddles(self, image: np.ndarray, conf_threshold: float = 0.25) -> List[SaddleROI]:
# """Detect saddles using YOLO-OBB with memory optimization"""
# if not self.available or self.model is None:
# return []
# # Resize if too large
# h, w = image.shape[:2]
# if w > MAX_IMAGE_SIZE[0] or h > MAX_IMAGE_SIZE[1]:
# scale = min(MAX_IMAGE_SIZE[0]/w, MAX_IMAGE_SIZE[1]/h)
# new_w, new_h = int(w*scale), int(h*scale)
# image_resized = cv2.resize(image, (new_w, new_h))
# else:
# image_resized = image
# scale = 1.0
# # Convert to grayscale for faster inference
# if len(image_resized.shape) == 3:
# gray = cv2.cvtColor(image_resized, cv2.COLOR_BGR2GRAY)
# gray_3ch = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)
# else:
# gray_3ch = cv2.cvtColor(image_resized, cv2.COLOR_GRAY2BGR)
# results = self.model(gray_3ch, conf=conf_threshold, verbose=False)
# saddles = []
# for result in results:
# if hasattr(result, 'obb') and result.obb is not None:
# for i, obb in enumerate(result.obb):
# xyxyxyxy = obb.xyxyxyxy[0].cpu().numpy()
# conf = float(obb.conf[0])
# # Scale back to original coordinates
# xyxyxyxy = xyxyxyxy / scale
# angle = self._calculate_angle(xyxyxyxy)
# x_coords = xyxyxyxy[:, 0]
# y_coords = xyxyxyxy[:, 1]
# x_min, x_max = int(x_coords.min()), int(x_coords.max())
# y_min, y_max = int(y_coords.min()), int(y_coords.max())
# w = x_max - x_min
# h = y_max - y_min
# cx = int(np.mean(x_coords))
# cy = int(np.mean(y_coords))
# # Crop from original image
# crop = image[y_min:y_max, x_min:x_max].copy()
# area = w * h
# saddles.append(SaddleROI(
# id=i,
# bbox=(x_min, y_min, w, h),
# crop=crop,
# center=(cx, cy),
# area=area,
# angle=angle,
# confidence=conf
# ))
# elif hasattr(result, 'boxes') and result.boxes is not None:
# for i, box in enumerate(result.boxes):
# xyxy = box.xyxy[0].cpu().numpy()
# conf = float(box.conf[0])
# # Scale back
# xyxy = xyxy / scale
# x_min, y_min, x_max, y_max = map(int, xyxy)
# w = x_max - x_min
# h = y_max - y_min
# cx = (x_min + x_max) // 2
# cy = (y_min + y_max) // 2
# crop = image[y_min:y_max, x_min:x_max].copy()
# area = w * h
# saddles.append(SaddleROI(
# id=i,
# bbox=(x_min, y_min, w, h),
# crop=crop,
# center=(cx, cy),
# area=area,
# angle=0.0,
# confidence=conf
# ))
# # Sort by position
# saddles.sort(key=lambda s: (s.center[1], s.center[0]))
# for i, saddle in enumerate(saddles):
# saddle.id = i
# print(f"✓ YOLO detected: {len(saddles)} saddles")
# # Memory cleanup
# if ENABLE_GC:
# del gray_3ch, results
# gc.collect()
# return saddles
# def _calculate_angle(self, xyxyxyxy: np.ndarray) -> float:
# p1 = xyxyxyxy[0]
# p2 = xyxyxyxy[1]
# dx = p2[0] - p1[0]
# dy = p2[1] - p1[1]
# angle = np.degrees(np.arctan2(dy, dx))
# if angle > 180:
# angle -= 360
# elif angle < -180:
# angle += 360
# return float(angle)
# def cleanup(self):
# """Release model from memory"""
# if self.model is not None:
# del self.model
# self.model = None
# self.model_loaded = False
# if ENABLE_GC:
# gc.collect()
# class GeometricSaddleDetector:
# """Optimized geometric detector with better 4-saddle detection"""
# def __init__(self, max_cache_size=None):
# if max_cache_size is None:
# max_cache_size = MAX_CACHE_SIZE
# self._mask_cache = LRUCache(max_cache_size)
# def detect_saddles(self, image: np.ndarray) -> List[SaddleROI]:
# """Detect using improved geometric segmentation"""
# print(f"[GeometricDetector] Input shape: {image.shape}")
# h, w = image.shape[:2]
# # Resize if too large
# if w > MAX_IMAGE_SIZE[0] or h > MAX_IMAGE_SIZE[1]:
# scale = min(MAX_IMAGE_SIZE[0]/w, MAX_IMAGE_SIZE[1]/h)
# new_w, new_h = int(w*scale), int(h*scale)
# image_work = cv2.resize(image, (new_w, new_h))
# print(f"[GeometricDetector] Resized to {new_w}x{new_h}")
# else:
# image_work = image
# scale = 1.0
# # Detect circles with improved method
# outer_circle, inner_circle = self._detect_circles_improved(image_work)
# if outer_circle is None:
# print("[GeometricDetector] ✗ Circle detection failed")
# return []
# ox, oy, or_ = outer_circle
# ix, iy, ir = inner_circle
# # Scale back to original
# if scale != 1.0:
# ox, oy = int(ox/scale), int(oy/scale)
# or_, ir = int(or_/scale), int(ir/scale)
# h, w = image.shape[:2]
# print(f"[GeometricDetector] Outer: ({ox},{oy}) r={or_}, Inner: ({ix},{iy}) r={ir}")
# # Validate
# if or_ < 50 or or_ > min(h, w):
# print(f"[GeometricDetector] ✗ Invalid outer radius: {or_}")
# return []
# if ir >= or_:
# ir = int(or_ * 0.4)
# print(f"[GeometricDetector] Adjusted inner radius to {ir}")
# # Create annular mask
# annular_mask = np.zeros((h, w), dtype=np.uint8)
# cv2.circle(annular_mask, (ox, oy), or_, 255, -1)
# cv2.circle(annular_mask, (ix, iy), ir, 0, -1)
# mask_pixels = np.sum(annular_mask > 0)
# print(f"[GeometricDetector] Annular mask pixels: {mask_pixels}")
# if mask_pixels < 1000:
# print(f"[GeometricDetector] ✗ Mask too small")
# return []
# # Find saddles using contour detection instead of fixed quadrants
# saddles = self._detect_saddles_from_annular_region(image, annular_mask, (ox, oy), or_, ir)
# print(f"[GeometricDetector] Total saddles detected: {len(saddles)}")
# # Memory cleanup
# if ENABLE_GC:
# del annular_mask, image_work
# gc.collect()
# return saddles
# def _detect_saddles_from_annular_region(
# self,
# image: np.ndarray,
# annular_mask: np.ndarray,
# center: Tuple[int, int],
# outer_r: int,
# inner_r: int
# ) -> List[SaddleROI]:
# """
# Improved saddle detection using contour-based approach
# """
# h, w = annular_mask.shape
# # Apply mask to image
# if len(image.shape) == 3:
# gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# else:
# gray = image
# masked = cv2.bitwise_and(gray, gray, mask=annular_mask)
# # Enhance contrast
# masked = cv2.equalizeHist(masked)
# # Edge detection on masked region
# edges = cv2.Canny(masked, 50, 150)
# # Find contours
# contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# # Filter and group contours
# valid_contours = []
# for cnt in contours:
# area = cv2.contourArea(cnt)
# if area > 500: # Minimum area threshold
# valid_contours.append(cnt)
# if len(valid_contours) < 4:
# # Fallback to quadrant-based detection
# print("[GeometricDetector] Using quadrant fallback")
# return self._detect_quadrant_based(image, annular_mask, center, outer_r, inner_r)
# # Sort contours by position (left to right, top to bottom)
# valid_contours = sorted(valid_contours, key=lambda c: (cv2.boundingRect(c)[1], cv2.boundingRect(c)[0]))
# # Take top 4 contours
# saddles = []
# for idx, cnt in enumerate(valid_contours[:4]):
# x, y, w_box, h_box = cv2.boundingRect(cnt)
# # Validate size
# if w_box < 10 or h_box < 10:
# continue
# # Extract crop
# crop = image[y:y+h_box, x:x+w_box].copy()
# mask_crop = annular_mask[y:y+h_box, x:x+w_box].copy()
# crop_masked = cv2.bitwise_and(crop, crop, mask=mask_crop)
# # Calculate center
# M = cv2.moments(cnt)
# if M['m00'] != 0:
# cx = int(M['m10'] / M['m00'])
# cy = int(M['m01'] / M['m00'])
# else:
# cx = x + w_box // 2
# cy = y + h_box // 2
# area = int(M['m00'])
# saddles.append(SaddleROI(
# id=idx,
# bbox=(x, y, w_box, h_box),
# crop=crop_masked,
# center=(cx, cy),
# area=area,
# angle=0.0,
# confidence=0.8 # Geometric detection confidence
# ))
# return saddles
# def _detect_quadrant_based(
# self,
# image: np.ndarray,
# annular_mask: np.ndarray,
# center: Tuple[int, int],
# outer_r: int,
# inner_r: int
# ) -> List[SaddleROI]:
# """Fallback quadrant-based detection"""
# h, w = annular_mask.shape
# ox, oy = center
# quadrants = [
# (0, 90, "Top-Right"),
# (90, 180, "Top-Left"),
# (180, 270, "Bottom-Left"),
# (270, 360, "Bottom-Right")
# ]
# saddles = []
# for idx, (start_angle, end_angle, name) in enumerate(quadrants):
# mask_key = (h, w, ox, oy, outer_r, inner_r, start_angle, end_angle)
# sector_mask = self._mask_cache.get(mask_key)
# if sector_mask is None:
# sector_mask = self._create_sector_mask(
# (h, w), (ox, oy), outer_r, inner_r, start_angle, end_angle
# )
# self._mask_cache.put(mask_key, sector_mask)
# saddle_mask = cv2.bitwise_and(annular_mask, sector_mask)
# ys, xs = np.where(saddle_mask > 0)
# if len(xs) == 0 or len(ys) == 0:
# print(f"[GeometricDetector] ⚠ Quadrant {idx} ({name}): No pixels")
# continue
# x_min, x_max = xs.min(), xs.max()
# y_min, y_max = ys.min(), ys.max()
# if (x_max - x_min) < 10 or (y_max - y_min) < 10:
# print(f"[GeometricDetector] ⚠ Quadrant {idx} ({name}): Box too small")
# continue
# crop = image[y_min:y_max+1, x_min:x_max+1].copy()
# mask_crop = saddle_mask[y_min:y_max+1, x_min:x_max+1].copy()
# crop_masked = cv2.bitwise_and(crop, crop, mask=mask_crop)
# cx = int(np.mean(xs))
# cy = int(np.mean(ys))
# area = np.sum(saddle_mask > 0)
# saddles.append(SaddleROI(
# id=idx,
# bbox=(x_min, y_min, x_max - x_min, y_max - y_min),
# crop=crop_masked,
# center=(cx, cy),
# area=area,
# angle=0.0,
# confidence=1.0
# ))
# return saddles
# def _detect_circles_improved(self, image: np.ndarray) -> Tuple[Optional[Tuple], Optional[Tuple]]:
# """Improved circle detection with multiple strategies"""
# gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image
# h, w = gray.shape
# # Strategy 1: Standard HoughCircles with optimized params
# gray_blur = cv2.GaussianBlur(gray, (9, 9), 2)
# minRadius = int(min(h, w) * 0.2)
# maxRadius = int(min(h, w) * 0.7)
# # Try multiple parameter sets
# param_sets = [
# (80, 25), # Standard
# (60, 30), # More lenient
# (100, 20), # More strict
# ]
# for param1, param2 in param_sets:
# circles = cv2.HoughCircles(
# gray_blur,
# cv2.HOUGH_GRADIENT,
# dp=1.0,
# minDist=80,
# param1=param1,
# param2=param2,
# minRadius=minRadius,
# maxRadius=maxRadius
# )
# if circles is not None and len(circles[0]) >= 1:
# circles = np.round(circles[0, :]).astype(int)
# circles = sorted(circles, key=lambda c: c[2], reverse=True)
# outer = tuple(circles[0])
# inner_radius = int(outer[2] * 0.4)
# inner = (outer[0], outer[1], inner_radius)
# print(f"✓ HoughCircles success (param1={param1}, param2={param2})")
# return outer, inner
# # Strategy 2: Contour-based detection
# print("→ Using contour fallback")
# edges = cv2.Canny(gray_blur, 30, 100)
# # Dilate to connect edges
# kernel = np.ones((3, 3), np.uint8)
# edges = cv2.dilate(edges, kernel, iterations=2)
# contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# if contours:
# # Find largest contour
# largest = max(contours, key=cv2.contourArea)
# (cx, cy), radius = cv2.minEnclosingCircle(largest)
# center_x, center_y = int(cx), int(cy)
# outer_radius = int(radius * 0.85)
# inner_radius = int(outer_radius * 0.4)
# print(f"✓ Contour-based: center=({center_x},{center_y}), radius={outer_radius}")
# return ((center_x, center_y, outer_radius), (center_x, center_y, inner_radius))
# # Strategy 3: Dimension-based estimate
# center_x, center_y = w // 2, h // 2
# outer_radius = min(w, h) // 2 - 50
# inner_radius = int(outer_radius * 0.4)
# print(f"→ Using dimension estimate: center=({center_x},{center_y}), radius={outer_radius}")
# return ((center_x, center_y, outer_radius), (center_x, center_y, inner_radius))
# def _create_sector_mask(self, shape, center, outer_r, inner_r, start_angle, end_angle):
# """Create sector mask with caching"""
# h, w = shape
# mask = np.zeros((h, w), dtype=np.uint8)
# y, x = np.ogrid[:h, :w]
# dx = x - center[0]
# dy = y - center[1]
# angles = np.rad2deg(np.arctan2(dy, dx)) % 360
# radius = np.sqrt(dx**2 + dy**2)
# if start_angle < end_angle:
# angle_mask = (angles >= start_angle) & (angles < end_angle)
# else:
# angle_mask = (angles >= start_angle) | (angles < end_angle)
# radius_mask = (radius >= inner_r) & (radius <= outer_r)
# sector = angle_mask & radius_mask
# mask[sector] = 255
# return mask
# def cleanup(self):
# """Release cache"""
# self._mask_cache.clear()
# class AlignmentCorrector:
# """Affine transformation for rotation correction"""
# @staticmethod
# def correct_rotation(image: np.ndarray, angle: float) -> np.ndarray:
# h, w = image.shape[:2]
# center = (w // 2, h // 2)
# M = cv2.getRotationMatrix2D(center, -angle, 1.0)
# cos = np.abs(M[0, 0])
# sin = np.abs(M[0, 1])
# new_w = int((h * sin) + (w * cos))
# new_h = int((h * cos) + (w * sin))
# M[0, 2] += (new_w / 2) - center[0]
# M[1, 2] += (new_h / 2) - center[1]
# rotated = cv2.warpAffine(image, M, (new_w, new_h),
# flags=cv2.INTER_LINEAR,
# borderMode=cv2.BORDER_CONSTANT,
# borderValue=(0, 0, 0))
# return rotated
# @staticmethod
# def correct_saddle(saddle: SaddleROI) -> np.ndarray:
# if abs(saddle.angle) < 1.0:
# return saddle.crop
# return AlignmentCorrector.correct_rotation(saddle.crop, saddle.angle)
# class CNNFeatureExtractor:
# """ResNet-18 feature extraction with memory optimization"""
# def __init__(self, onnx_path: str = "resnet18_features.onnx"):
# self.onnx_path = onnx_path
# self.session = None
# self.model = None
# self.device = None
# self.feature_dim = 1024
# self._initialized = False
# def _lazy_init(self):
# if self._initialized:
# return
# self._initialized = True
# # Use PyTorch with fused features
# self._init_pytorch_model()
# def _init_pytorch_model(self):
# torch = _get_torch()
# torchvision = _get_torchvision()
# from torchvision.models import resnet18, ResNet18_Weights
# import torch.nn as nn
# self.device = torch.device('cpu')
# weights = ResNet18_Weights.IMAGENET1K_V1
# base_model = resnet18(weights=weights)
# class FeatureFusionModel(nn.Module):
# def __init__(self, base):
# super().__init__()
# self.conv1 = base.conv1
# self.bn1 = base.bn1
# self.relu = base.relu
# self.maxpool = base.maxpool
# self.layer1 = base.layer1
# self.layer2 = base.layer2
# self.layer3 = base.layer3
# self.layer4 = base.layer4
# self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
# self.maxpool_global = nn.AdaptiveMaxPool2d((1, 1))
# def forward(self, x):
# x = self.conv1(x)
# x = self.bn1(x)
# x = self.relu(x)
# x = self.maxpool(x)
# x = self.layer1(x)
# x = self.layer2(x)
# l3 = self.layer3(x)
# l4 = self.layer4(l3)
# f3_avg = self.avgpool(l3).view(l3.size(0), -1)
# f3_max = self.maxpool_global(l3).view(l3.size(0), -1)
# f4_avg = self.avgpool(l4).view(l4.size(0), -1)
# return torch.cat([f3_avg, f3_max, f4_avg], dim=1)
# self.model = FeatureFusionModel(base_model)
# self.model.to(self.device)
# self.model.eval()
# print(f"✓ PyTorch FeatureFusion ResNet-18 (dim={self.feature_dim})")
# def _preprocess_image(self, image: np.ndarray) -> np.ndarray:
# if image is None or image.size == 0:
# return np.zeros((3, 224, 224), dtype=np.float32)
# if len(image.shape) == 2:
# image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
# elif len(image.shape) == 3 and image.shape[2] == 4:
# image = cv2.cvtColor(image, cv2.COLOR_BGRA2RGB)
# elif len(image.shape) == 3 and image.shape[2] == 3:
# image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
# image = cv2.resize(image, (224, 224))
# image = image.astype(np.float32) / 255.0
# mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
# std = np.array([0.229, 0.224, 0.225], dtype=np.float32)
# image = (image - mean) / std
# image = image.transpose(2, 0, 1)
# return image
# def extract_features(self, image: np.ndarray) -> np.ndarray:
# self._lazy_init()
# batch = self._preprocess_image(image)[np.newaxis, ...]
# features = self.extract_batch(batch)
# return features[0]
# def extract_batch(self, images: List[np.ndarray]) -> np.ndarray:
# self._lazy_init()
# if isinstance(images, np.ndarray) and len(images.shape) == 4:
# batch = images.astype(np.float32)
# else:
# preprocessed = [self._preprocess_image(img) for img in images]
# batch = np.stack(preprocessed, axis=0).astype(np.float32)
# torch = _get_torch()
# batch_tensor = torch.from_numpy(batch).to(self.device)
# with torch.no_grad():
# features = self.model(batch_tensor)
# features = features.squeeze(-1).squeeze(-1).cpu().numpy()
# norms = np.linalg.norm(features, axis=1, keepdims=True) + 1e-8
# features = features / norms
# # Memory cleanup
# if ENABLE_GC:
# del batch_tensor
# gc.collect()
# return features
# def compute_similarity(self, features1: np.ndarray, features2: np.ndarray) -> float:
# similarity = np.dot(features1, features2) / (
# np.linalg.norm(features1) * np.linalg.norm(features2) + 1e-8
# )
# return float(np.clip(similarity, 0, 1))
# def cleanup(self):
# """Release model from memory"""
# if self.model is not None:
# del self.model
# self.model = None
# if ENABLE_GC:
# gc.collect()
# class GeometricConstraintValidator:
# """Validate saddle positions using geometric constraints"""
# def __init__(self, expected_count: int = 4):
# self.expected_count = expected_count
# self.reference_positions = None
# self.reference_distances = {}
# def set_reference_positions(self, saddles: List[SaddleROI]):
# if len(saddles) != self.expected_count:
# print(f"⚠ Reference has {len(saddles)} saddles, expected {self.expected_count}")
# return False
# centers = [s.center for s in saddles]
# centroid = (
# int(np.mean([c[0] for c in centers])),
# int(np.mean([c[1] for c in centers]))
# )
# self.reference_positions = [
# (c[0] - centroid[0], c[1] - centroid[1]) for c in centers
# ]
# self.reference_distances = {}
# for i in range(len(saddles)):
# for j in range(i+1, len(saddles)):
# dist = np.linalg.norm(
# np.array(saddles[i].center) - np.array(saddles[j].center)
# )
# self.reference_distances[(i, j)] = dist
# print(f"✓ Reference geometry learned: {len(saddles)} saddles")
# return True
# def infer_missing_saddles(
# self,
# detected_saddles: List[SaddleROI],
# image: np.ndarray
# ) -> List[SaddleROI]:
# if len(detected_saddles) >= self.expected_count:
# return detected_saddles
# if self.reference_positions is None:
# print("⚠ No reference geometry")
# return detected_saddles
# if len(detected_saddles) < 2:
# print("⚠ Need at least 2 saddles to infer")
# return detected_saddles
# centers = [s.center for s in detected_saddles]
# centroid = (
# int(np.mean([c[0] for c in centers])),
# int(np.mean([c[1] for c in centers]))
# )
# scale = self._estimate_scale(detected_saddles)
# detected_ids = {s.id for s in detected_saddles}
# all_saddles = list(detected_saddles)
# for expected_id in range(self.expected_count):
# if expected_id not in detected_ids:
# rel_x, rel_y = self.reference_positions[expected_id]
# expected_x = int(centroid[0] + rel_x * scale)
# expected_y = int(centroid[1] + rel_y * scale)
# crop_size = 100
# x_min = max(0, expected_x - crop_size // 2)
# y_min = max(0, expected_y - crop_size // 2)
# x_max = min(image.shape[1], expected_x + crop_size // 2)
# y_max = min(image.shape[0], expected_y + crop_size // 2)
# crop = image[y_min:y_max, x_min:x_max].copy()
# inferred = SaddleROI(
# id=expected_id,
# bbox=(x_min, y_min, x_max - x_min, y_max - y_min),
# crop=crop,
# center=(expected_x, expected_y),
# area=(x_max - x_min) * (y_max - y_min),
# angle=0.0,
# confidence=0.0
# )
# all_saddles.append(inferred)
# print(f" ⚙ Inferred Saddle {expected_id} at ({expected_x}, {expected_y})")
# all_saddles.sort(key=lambda s: s.id)
# return all_saddles
# def _estimate_scale(self, saddles: List[SaddleROI]) -> float:
# if len(saddles) < 2 or not self.reference_distances:
# return 1.0
# if len(saddles) >= 2:
# s0, s1 = saddles[0], saddles[1]
# current_dist = np.linalg.norm(
# np.array(s0.center) - np.array(s1.center)
# )
# key = (min(s0.id, s1.id), max(s0.id, s1.id))
# if key in self.reference_distances:
# ref_dist = self.reference_distances[key]
# scale = current_dist / ref_dist
# return scale
# return 1.0
# def validate_geometry(self, saddles: List[SaddleROI], tolerance: float = 0.05) -> Tuple[bool, str]:
# if len(saddles) != self.expected_count:
# return False, f"Expected {self.expected_count} saddles, got {len(saddles)}"
# if self.reference_distances is None or not self.reference_distances:
# return True, "No reference geometry to validate"
# sorted_saddles = sorted(saddles, key=lambda s: s.id)
# centers = [np.array(s.center) for s in sorted_saddles]
# # Collinearity check
# if len(centers) >= 3:
# v1 = centers[1] - centers[0]
# for i in range(2, len(centers)):
# v2 = centers[i] - centers[0]
# cross = abs(v1[0] * v2[1] - v1[1] * v2[0])
# line_length = np.linalg.norm(v1) * np.linalg.norm(v2)
# if line_length > 0:
# deviation = cross / line_length
# if deviation > 0.1:
# return False, f"Saddles not collinear (deviation: {deviation:.2f})"
# # Distance validation
# for i in range(len(sorted_saddles) - 1):
# s0, s1 = sorted_saddles[i], sorted_saddles[i + 1]
# current_dist = np.linalg.norm(centers[i + 1] - centers[i])
# key = (min(s0.id, s1.id), max(s0.id, s1.id))
# if key in self.reference_distances:
# ref_dist = self.reference_distances[key]
# if ref_dist > 0:
# deviation = abs(current_dist - ref_dist) / ref_dist
# if deviation > tolerance:
# return False, f"Distance S{s0.id}-S{s1.id} off by {deviation*100:.1f}%"
# return True, "Geometry valid"
# class MultiReferenceManager:
# """Manage multiple golden template images with memory optimization"""
# def __init__(self, feature_extractor: CNNFeatureExtractor):
# self.feature_extractor = feature_extractor
# self.references = []
# self.max_references = 20 if not IS_CLOUD else 10
# def add_reference(self, image: np.ndarray, saddles: List[SaddleROI]) -> bool:
# if len(saddles) != 4:
# print(f"⚠ Reference must have 4 saddles, got {len(saddles)}")
# return False
# # Limit number of references
# if len(self.references) >= self.max_references:
# print(f"⚠ Max references ({self.max_references}) reached, removing oldest")
# old_ref = self.references.pop(0)
# # Cleanup
# if ENABLE_GC:
# del old_ref
# gc.collect()
# features_list = []
# for saddle in saddles:
# aligned_crop = AlignmentCorrector.correct_saddle(saddle)
# features = self.feature_extractor.extract_features(aligned_crop)
# features_list.append(features)
# # Compress image for storage
# if image.shape[0] > 800 or image.shape[1] > 800:
# scale = min(800/image.shape[1], 800/image.shape[0])
# new_w, new_h = int(image.shape[1]*scale), int(image.shape[0]*scale)
# image_stored = cv2.resize(image, (new_w, new_h))
# else:
# image_stored = image
# self.references.append({
# 'image': image_stored,
# 'saddles': saddles,
# 'features': features_list
# })
# print(f"✓ Reference {len(self.references)} added")
# return True
# def match_saddle(self, saddle: SaddleROI) -> Tuple[float, float]:
# if not self.references:
# return 0.0, 999.0
# aligned_crop = AlignmentCorrector.correct_saddle(saddle)
# test_features = self.feature_extractor.extract_features(aligned_crop)
# similarities = []
# distances = []
# for ref in self.references:
# if saddle.id < len(ref['features']):
# ref_features = ref['features'][saddle.id]
# sim = self.feature_extractor.compute_similarity(test_features, ref_features)
# dist = np.linalg.norm(test_features - ref_features)
# similarities.append(sim)
# distances.append(dist)
# if similarities:
# best_sim = max(similarities)
# avg_dist = np.mean(distances)
# return best_sim, avg_dist
# return 0.0, 999.0
# def get_reference_count(self) -> int:
# return len(self.references)
# def get_references(self) -> List[str]:
# refs = []
# for ref in self.references:
# try:
# _, buffer = cv2.imencode('.jpg', ref['image'], [cv2.IMWRITE_JPEG_QUALITY, 50])
# import base64
# img_str = base64.b64encode(buffer).decode('utf-8')
# refs.append(f"data:image/jpeg;base64,{img_str}")
# except:
# continue
# return refs
# def cleanup(self):
# """Release all references"""
# self.references.clear()
# if ENABLE_GC:
# gc.collect()
# class AdvancedBlockInspector:
# """
# Advanced Engine Block Inspector - CLOUD OPTIMIZED
# Memory leak fixes:
# - LRU caching with limits
# - Automatic cleanup after processing
# - Garbage collection triggers
# - Reference counting limits
# Improved 4-saddle detection:
# - Contour-based detection
# - Multiple circle detection strategies
# - Structure validation integration
# - Geometric validator integration
# """
# def __init__(self, yolo_model_path: Optional[str] = None, onnx_path: str = "resnet18_features.onnx"):
# self.yolo_detector = YOLOOBBDetector(yolo_model_path)
# self.geometric_detector = GeometricSaddleDetector()
# self.cnn_extractor = CNNFeatureExtractor(onnx_path=onnx_path)
# self.reference_manager = MultiReferenceManager(self.cnn_extractor)
# # Use optimized validator if available
# if OPTIMIZED_VALIDATOR_AVAILABLE:
# self.geo_validator = OptimizedGeometricValidator(expected_count=4)
# self.use_optimized_validator = True
# print("✓ Optimized Validator: RANSAC + Physical Dims")
# else:
# self.geo_validator = GeometricConstraintValidator(expected_count=4)
# self.use_optimized_validator = False
# # Structure validator
# self.structure_validator = None
# if STRUCTURE_VALIDATOR_AVAILABLE:
# self.structure_validator = SaddleStructureValidator(
# arc_center_tolerance=0.05,
# min_arc_length_ratio=0.6,
# min_structure_confidence=0.65
# )
# print("✓ Structure Validator: Semicircle + Middle Arc")
# self.config = {
# 'expected_saddles': 4,
# 'similarity_threshold': 0.88,
# 'use_yolo': self.yolo_detector.model_loaded,
# 'use_geometric_fallback': True,
# 'use_alignment_correction': True,
# 'infer_missing_saddles': True,
# 'yolo_confidence': 0.25,
# 'use_geometry_validation': True,
# 'use_structure_validation': STRUCTURE_VALIDATOR_AVAILABLE,
# 'use_optimized_validator': self.use_optimized_validator,
# 'min_brightness': 20.0,
# }
# # Memory tracking
# self.last_saddles = []
# self.last_image = None
# print("[OK] Advanced Inspector initialized (Cloud-Optimized)")
# print(f" YOLO-OBB: {'Enabled' if self.config['use_yolo'] else 'Disabled'}")
# print(f" Max cache size: {MAX_CACHE_SIZE}")
# print(f" GC enabled: {ENABLE_GC}")
# def add_reference_image(self, image: np.ndarray) -> bool:
# """Add a golden template image"""
# try:
# # Quality check
# gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image
# brightness = np.mean(gray)
# if brightness < 20:
# print(f"✗ Reference too dark (brightness: {brightness:.1f})")
# return False
# # Detect saddles
# if self.config['use_yolo']:
# saddles = self.yolo_detector.detect_saddles(
# image, conf_threshold=self.config['yolo_confidence']
# )
# else:
# saddles = []
# if len(saddles) < self.config['expected_saddles'] and self.config['use_geometric_fallback']:
# print("→ Falling back to geometric detection")
# saddles = self.geometric_detector.detect_saddles(image)
# # Validate with structure validator
# if self.structure_validator and len(saddles) > 0:
# for saddle in saddles:
# if saddle.crop is not None and saddle.crop.size > 0:
# saddle.structure = self.structure_validator.validate_structure(saddle.crop)
# # Use optimized validator to refine
# if self.use_optimized_validator and hasattr(self.geo_validator, 'validate_and_refine'):
# saddles, _ = self.geo_validator.validate_and_refine(saddles, image)
# if len(saddles) != self.config['expected_saddles']:
# print(f"✗ Reference failed: {len(saddles)}/{self.config['expected_saddles']} saddles")
# return False
# success = self.reference_manager.add_reference(image, saddles)
# if success and self.reference_manager.get_reference_count() == 1:
# self.geo_validator.set_reference_positions(saddles)
# # Cleanup
# self._cleanup_after_processing()
# return success
# except Exception as e:
# print(f"✗ Reference error: {e}")
# import traceback
# traceback.print_exc()
# return False
# def inspect_block(self, image: np.ndarray) -> BlockInspectionResult:
# """Inspect engine block with memory optimization"""
# start_time = time.time()
# try:
# # Quality check
# gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image
# brightness = np.mean(gray)
# if brightness < 15:
# return BlockInspectionResult(
# block_status='WASTE_IMAGE',
# total_saddles=self.config['expected_saddles'],
# detected_saddles=0,
# defective_saddles=0,
# saddle_results=[],
# processing_time_ms=(time.time() - start_time) * 1000,
# alignment_status='REJECTED_DARK',
# block_angle=0.0
# )
# # Resize if needed
# h, w = image.shape[:2]
# if w > 800 or h > 600:
# image = cv2.resize(image, (640, 480))
# # Detect saddles
# if self.config['use_yolo']:
# saddles = self.yolo_detector.detect_saddles(
# image, conf_threshold=self.config['yolo_confidence']
# )
# else:
# saddles = []
# if len(saddles) < self.config['expected_saddles'] and self.config['use_geometric_fallback']:
# print("→ Falling back to geometric detection")
# saddles = self.geometric_detector.detect_saddles(image)
# detected_count = len(saddles)
# # Validate and refine with optimized validator
# if self.use_optimized_validator and hasattr(self.geo_validator, 'validate_and_refine'):
# saddles, refine_report = self.geo_validator.validate_and_refine(saddles, image)
# print(f" Refined: {refine_report}")
# elif len(saddles) < self.config['expected_saddles'] and self.config['infer_missing_saddles']:
# print(f" Inferring {self.config['expected_saddles'] - len(saddles)} missing saddles")
# saddles = self.geo_validator.infer_missing_saddles(saddles, image)
# # Structure validation
# if self.structure_validator and self.config.get('use_structure_validation', False):
# for saddle in saddles:
# if saddle.crop is not None and saddle.crop.size > 0:
# saddle.structure = self.structure_validator.validate_structure(saddle.crop)
# # Validate count
# if len(saddles) != self.config['expected_saddles']:
# alignment_status = f"ERROR: {len(saddles)}/{self.config['expected_saddles']} saddles"
# return self._error_result(alignment_status, 0.0, saddles, detected_count)
# # Geometry validation
# if self.config.get('use_geometry_validation', True):
# is_valid, reason = self.geo_validator.validate_geometry(saddles, 0.05)
# if not is_valid:
# return self._error_result(f"GEOMETRY: {reason}", 0.0, saddles, detected_count)
# alignment_status = "OK"
# block_angle = np.mean([s.angle for s in saddles])
# # Batch feature extraction
# aligned_crops = [AlignmentCorrector.correct_saddle(s) for s in saddles]
# batch_features = self.cnn_extractor.extract_batch(aligned_crops)
# # Compare against references
# saddle_results = []
# for i, saddle in enumerate(saddles):
# test_features = batch_features[i]
# best_sim = 0.0
# avg_dist = 999.0
# for ref in self.reference_manager.references:
# if saddle.id < len(ref['features']):
# ref_features = ref['features'][saddle.id]
# sim = self.cnn_extractor.compute_similarity(test_features, ref_features)
# dist = np.linalg.norm(test_features - ref_features)
# if sim > best_sim:
# best_sim = sim
# avg_dist = dist
# status = 'PERFECT' if best_sim >= self.config['similarity_threshold'] else 'DEFECTIVE'
# saddle_results.append(SaddleInspectionResult(
# saddle_id=saddle.id,
# status=status,
# similarity_score=best_sim,
# confidence=best_sim,
# feature_distance=avg_dist,
# detected=saddle.confidence > 0.0
# ))
# # Block decision
# defective_count = sum(1 for r in saddle_results if r.status == 'DEFECTIVE')
# avg_sim = np.mean([r.similarity_score for r in saddle_results]) if saddle_results else 0
# if defective_count == self.config['expected_saddles'] and avg_sim < 0.70:
# block_status = 'WASTE_IMAGE'
# elif detected_count == 0:
# if avg_sim < 0.75:
# block_status = 'WASTE_IMAGE'
# else:
# block_status = 'DEFECTIVE'
# else:
# block_status = 'DEFECTIVE' if defective_count > 0 else 'PERFECT'
# processing_time = (time.time() - start_time) * 1000
# self.last_saddles = saddles
# self.last_image = image
# result = BlockInspectionResult(
# block_status=block_status,
# total_saddles=self.config['expected_saddles'],
# detected_saddles=detected_count,
# defective_saddles=defective_count,
# saddle_results=saddle_results,
# processing_time_ms=processing_time,
# alignment_status=alignment_status,
# block_angle=block_angle
# )
# # Cleanup
# self._cleanup_after_processing()
# return result
# except Exception as e:
# print(f"✗ Inspection error: {e}")
# import traceback
# traceback.print_exc()
# return self._error_result(str(e), 0.0, [], 0)
# def get_reference_images(self) -> List[str]:
# return self.reference_manager.get_references()
# def _error_result(self, message: str, angle: float, saddles: List, detected_count: int) -> BlockInspectionResult:
# self.last_saddles = saddles
# return BlockInspectionResult(
# block_status='ERROR',
# total_saddles=self.config['expected_saddles'],
# detected_saddles=detected_count,
# defective_saddles=0,
# saddle_results=[],
# processing_time_ms=0.0,
# alignment_status=message,
# block_angle=angle
# )
# def visualize_results(
# self,
# original_image: np.ndarray,
# saddles: List[SaddleROI],
# results: List[SaddleInspectionResult]
# ) -> np.ndarray:
# """Create detailed visualization"""
# vis = original_image.copy()
# for saddle, result in zip(saddles, results):
# if not result.detected:
# color = (255, 165, 0) # Orange
# elif result.status == 'PERFECT':
# color = (0, 255, 0) # Green
# else:
# color = (0, 0, 255) # Red
# x, y, w, h = saddle.bbox
# cv2.rectangle(vis, (x, y), (x+w, y+h), color, 3)
# cx, cy = saddle.center
# cv2.circle(vis, (cx, cy), 10, color, -1)
# if abs(saddle.angle) > 1.0:
# rad = np.deg2rad(saddle.angle)
# end_x = int(cx + 30 * np.cos(rad))
# end_y = int(cy + 30 * np.sin(rad))
# cv2.arrowedLine(vis, (cx, cy), (end_x, end_y), color, 3)
# detection_marker = "" if result.detected else "⚙"
# label = f"{detection_marker} S{saddle.id}: {result.status}"
# sim_text = f"{result.similarity_score:.3f}"
# angle_text = f"{saddle.angle:.1f}°"
# cv2.rectangle(vis, (cx-70, cy-70), (cx+70, cy+40), color, -1)
# cv2.putText(vis, label, (cx-65, cy-45),
# cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 255, 255), 2)
# cv2.putText(vis, sim_text, (cx-65, cy-20),
# cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
# cv2.putText(vis, f"C:{saddle.confidence:.2f}", (cx-65, cy+28),
# cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1)
# # Add angle info if it doesn't overlap text
# # (Simplified visualization for better performance)
# return vis
# def _cleanup_after_processing(self):
# """Clean up resources after processing"""
# # Clear old saddles
# for saddle in self.last_saddles:
# if hasattr(saddle, 'cleanup'):
# saddle.cleanup()
# # Trigger garbage collection
# if ENABLE_GC:
# gc.collect()
# def cleanup(self):
# """Full cleanup - call when done"""
# print("Cleaning up resources...")
# # Cleanup components
# if hasattr(self.yolo_detector, 'cleanup'):
# self.yolo_detector.cleanup()
# if hasattr(self.geometric_detector, 'cleanup'):
# self.geometric_detector.cleanup()
# if hasattr(self.cnn_extractor, 'cleanup'):
# self.cnn_extractor.cleanup()
# if hasattr(self.reference_manager, 'cleanup'):
# self.reference_manager.cleanup()
# # Clear state
# self.last_saddles.clear()
# self.last_image = None
# # Force garbage collection
# if ENABLE_GC:
# gc.collect()
# print("✓ Cleanup complete")
import cv2
import numpy as np
from PIL import Image
import io
import json
from datetime import datetime
from typing import List, Tuple, Dict, Optional
from dataclasses import dataclass
import time
import os
import warnings
import gc
from collections import OrderedDict
# ============================================================================
# CLOUD DEPLOYMENT OPTIMIZATIONS
# ============================================================================
IS_HF_SPACE = os.environ.get('SPACE_ID') is not None or os.environ.get('HF_SPACE') is not None
IS_RENDER = os.environ.get('RENDER') is not None
IS_CLOUD = IS_HF_SPACE or IS_RENDER
# Memory limits for cloud
MAX_IMAGE_SIZE = (1280, 960) if IS_CLOUD else (1920, 1440)
MAX_CACHE_SIZE = 50 if IS_CLOUD else 100
ENABLE_GC = IS_CLOUD
# Lazy imports for faster cold start
_torch = None
_torchvision = None
_YOLO = None
def _get_torch():
global _torch
if _torch is None:
import torch
_torch = torch
if IS_CLOUD:
_torch.set_num_threads(2)
return _torch
def _get_torchvision():
global _torchvision
if _torchvision is None:
import torchvision
_torchvision = torchvision
return _torchvision
def _get_yolo():
global _YOLO
if _YOLO is None:
try:
from ultralytics import YOLO
_YOLO = YOLO
except ImportError:
_YOLO = False
return _YOLO
def _yolo_available():
yolo = _get_yolo()
return yolo is not False and yolo is not None
warnings.filterwarnings('ignore')
# Import optimized validators
try:
from saddle_structure_validator import SaddleStructureValidator, SaddleStructure
STRUCTURE_VALIDATOR_AVAILABLE = True
except ImportError:
STRUCTURE_VALIDATOR_AVAILABLE = False
SaddleStructure = None
print("[WARN] saddle_structure_validator not available")
try:
from geometric_validator import OptimizedGeometricValidator, PhysicalDimensions, PixelToMMCalibrator
OPTIMIZED_VALIDATOR_AVAILABLE = True
except ImportError:
OPTIMIZED_VALIDATOR_AVAILABLE = False
print("[WARN] geometric_validator not available")
if IS_HF_SPACE:
print("[HF Space] Memory-optimized mode enabled")
elif IS_RENDER:
print("[Render] Cloud-optimized mode enabled")
@dataclass
class SaddleROI:
"""Saddle region of interest with rotation and structure info"""
id: int
bbox: Tuple[int, int, int, int]
crop: np.ndarray
center: Tuple[int, int]
area: int
angle: float
confidence: float
structure: Optional[any] = None
def to_dict(self):
result = {
'id': int(self.id),
'bbox': [int(x) for x in self.bbox],
'center': [int(self.center[0]), int(self.center[1])],
'area': int(self.area),
'angle': float(self.angle),
'confidence': float(self.confidence)
}
if self.structure is not None:
result['has_semicircle'] = bool(getattr(self.structure, 'has_semicircle', False))
result['has_middle_arc'] = bool(getattr(self.structure, 'has_middle_arc', False))
result['structure_confidence'] = float(getattr(self.structure, 'structure_confidence', 0.0))
return result
def cleanup(self):
"""Free memory"""
if hasattr(self, 'crop'):
del self.crop
self.crop = None
self.structure = None
@dataclass
class SaddleInspectionResult:
"""Inspection result for one saddle"""
saddle_id: int
status: str
similarity_score: float
confidence: float
feature_distance: float
detected: bool
def to_dict(self):
return {
'saddle_id': int(self.saddle_id),
'status': str(self.status),
'similarity_score': float(self.similarity_score),
'confidence': float(self.confidence),
'feature_distance': float(self.feature_distance),
'detected': bool(self.detected)
}
@dataclass
class BlockInspectionResult:
"""Complete block inspection result"""
block_status: str
total_saddles: int
detected_saddles: int
defective_saddles: int
saddle_results: List[SaddleInspectionResult]
processing_time_ms: float
alignment_status: str
block_angle: float
def to_dict(self):
return {
'block_status': str(self.block_status),
'total_saddles': int(self.total_saddles),
'detected_saddles': int(self.detected_saddles),
'defective_saddles': int(self.defective_saddles),
'saddle_results': [r.to_dict() for r in self.saddle_results],
'processing_time_ms': float(self.processing_time_ms),
'alignment_status': str(self.alignment_status),
'block_angle': float(self.block_angle)
}
class LRUCache:
"""Thread-safe LRU cache with size limit"""
def __init__(self, max_size=100):
self.cache = OrderedDict()
self.max_size = max_size
def get(self, key):
if key in self.cache:
self.cache.move_to_end(key)
return self.cache[key]
return None
def put(self, key, value):
if key in self.cache:
self.cache.move_to_end(key)
self.cache[key] = value
if len(self.cache) > self.max_size:
self.cache.popitem(last=False)
def clear(self):
self.cache.clear()
if ENABLE_GC:
gc.collect()
class YOLOOBBDetector:
"""YOLO-OBB Detector with lazy loading and memory management"""
def __init__(self, model_path: Optional[str] = None):
self.model = None
self.model_loaded = False
self._model_path = model_path if model_path else 'yolo11n-obb.pt'
self._initialized = False
def _lazy_init(self):
if self._initialized:
return
self._initialized = True
if not _yolo_available():
print("⚠ YOLO not available - will use geometric fallback")
return
YOLO = _get_yolo()
try:
self.model = YOLO(self._model_path)
self.model_loaded = True
print(f"✓ YOLO-OBB loaded: {self._model_path}")
except Exception as e:
print(f"⚠ YOLO load failed: {e} - will use geometric fallback")
self.model_loaded = False
@property
def available(self):
if not self._initialized:
self._lazy_init()
return self.model_loaded
def detect_saddles(self, image: np.ndarray, conf_threshold: float = 0.20) -> List[SaddleROI]:
"""Detect saddles using YOLO-OBB with memory optimization"""
if not self.available or self.model is None:
return []
# Resize if too large
h, w = image.shape[:2]
if w > MAX_IMAGE_SIZE[0] or h > MAX_IMAGE_SIZE[1]:
scale = min(MAX_IMAGE_SIZE[0]/w, MAX_IMAGE_SIZE[1]/h)
new_w, new_h = int(w*scale), int(h*scale)
image_resized = cv2.resize(image, (new_w, new_h))
else:
image_resized = image
scale = 1.0
# Convert to grayscale for faster inference
if len(image_resized.shape) == 3:
gray = cv2.cvtColor(image_resized, cv2.COLOR_BGR2GRAY)
gray_3ch = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)
else:
gray_3ch = cv2.cvtColor(image_resized, cv2.COLOR_GRAY2BGR)
results = self.model(gray_3ch, conf=conf_threshold, verbose=False)
saddles = []
for result in results:
if hasattr(result, 'obb') and result.obb is not None:
for i, obb in enumerate(result.obb):
xyxyxyxy = obb.xyxyxyxy[0].cpu().numpy()
conf = float(obb.conf[0])
# Scale back to original coordinates
xyxyxyxy = xyxyxyxy / scale
angle = self._calculate_angle(xyxyxyxy)
x_coords = xyxyxyxy[:, 0]
y_coords = xyxyxyxy[:, 1]
x_min, x_max = int(x_coords.min()), int(x_coords.max())
y_min, y_max = int(y_coords.min()), int(y_coords.max())
w = x_max - x_min
h = y_max - y_min
cx = int(np.mean(x_coords))
cy = int(np.mean(y_coords))
# Crop from original image
crop = image[y_min:y_max, x_min:x_max].copy()
area = w * h
saddles.append(SaddleROI(
id=i,
bbox=(x_min, y_min, w, h),
crop=crop,
center=(cx, cy),
area=area,
angle=angle,
confidence=conf
))
elif hasattr(result, 'boxes') and result.boxes is not None:
for i, box in enumerate(result.boxes):
xyxy = box.xyxy[0].cpu().numpy()
conf = float(box.conf[0])
# Scale back
xyxy = xyxy / scale
x_min, y_min, x_max, y_max = map(int, xyxy)
w = x_max - x_min
h = y_max - y_min
cx = (x_min + x_max) // 2
cy = (y_min + y_max) // 2
crop = image[y_min:y_max, x_min:x_max].copy()
area = w * h
saddles.append(SaddleROI(
id=i,
bbox=(x_min, y_min, w, h),
crop=crop,
center=(cx, cy),
area=area,
angle=0.0,
confidence=conf
))
# Sort by position
saddles.sort(key=lambda s: (s.center[1], s.center[0]))
for i, saddle in enumerate(saddles):
saddle.id = i
print(f"✓ YOLO detected: {len(saddles)} saddles")
# Memory cleanup
if ENABLE_GC:
del gray_3ch, results
gc.collect()
return saddles
def _calculate_angle(self, xyxyxyxy: np.ndarray) -> float:
p1 = xyxyxyxy[0]
p2 = xyxyxyxy[1]
dx = p2[0] - p1[0]
dy = p2[1] - p1[1]
angle = np.degrees(np.arctan2(dy, dx))
if angle > 180:
angle -= 360
elif angle < -180:
angle += 360
return float(angle)
def cleanup(self):
"""Release model from memory"""
if self.model is not None:
del self.model
self.model = None
self.model_loaded = False
if ENABLE_GC:
gc.collect()
class GeometricSaddleDetector:
"""Optimized geometric detector with better 4-saddle detection"""
def __init__(self, max_cache_size=None):
if max_cache_size is None:
max_cache_size = MAX_CACHE_SIZE
self._mask_cache = LRUCache(max_cache_size)
def detect_saddles(self, image: np.ndarray) -> List[SaddleROI]:
"""Detect using improved geometric segmentation"""
print(f"[GeometricDetector] Input shape: {image.shape}")
h, w = image.shape[:2]
# Resize if too large
if w > MAX_IMAGE_SIZE[0] or h > MAX_IMAGE_SIZE[1]:
scale = min(MAX_IMAGE_SIZE[0]/w, MAX_IMAGE_SIZE[1]/h)
new_w, new_h = int(w*scale), int(h*scale)
image_work = cv2.resize(image, (new_w, new_h))
print(f"[GeometricDetector] Resized to {new_w}x{new_h}")
else:
image_work = image
scale = 1.0
# Detect circles with improved method
outer_circle, inner_circle = self._detect_circles_improved(image_work)
if outer_circle is None:
print("[GeometricDetector] ✗ Circle detection failed")
return []
ox, oy, or_ = outer_circle
ix, iy, ir = inner_circle
# Scale back to original
if scale != 1.0:
ox, oy = int(ox/scale), int(oy/scale)
or_, ir = int(or_/scale), int(ir/scale)
h, w = image.shape[:2]
print(f"[GeometricDetector] Outer: ({ox},{oy}) r={or_}, Inner: ({ix},{iy}) r={ir}")
# Validate
if or_ < 50 or or_ > min(h, w):
print(f"[GeometricDetector] ✗ Invalid outer radius: {or_}")
return []
if ir >= or_:
ir = int(or_ * 0.4)
print(f"[GeometricDetector] Adjusted inner radius to {ir}")
# Create annular mask
annular_mask = np.zeros((h, w), dtype=np.uint8)
cv2.circle(annular_mask, (ox, oy), or_, 255, -1)
cv2.circle(annular_mask, (ix, iy), ir, 0, -1)
mask_pixels = np.sum(annular_mask > 0)
print(f"[GeometricDetector] Annular mask pixels: {mask_pixels}")
if mask_pixels < 1000:
print(f"[GeometricDetector] ✗ Mask too small")
return []
# Find saddles using quadrant-based detection (more reliable for 4 saddles)
saddles = self._detect_quadrant_based(image, annular_mask, (ox, oy), or_, ir)
print(f"[GeometricDetector] Total saddles detected: {len(saddles)}")
# Memory cleanup
if ENABLE_GC:
del annular_mask, image_work
gc.collect()
return saddles
def _detect_quadrant_based(
self,
image: np.ndarray,
annular_mask: np.ndarray,
center: Tuple[int, int],
outer_r: int,
inner_r: int
) -> List[SaddleROI]:
"""Quadrant-based detection for 4 saddles"""
h, w = annular_mask.shape
ox, oy = center
quadrants = [
(0, 90, "Top-Right"),
(90, 180, "Top-Left"),
(180, 270, "Bottom-Left"),
(270, 360, "Bottom-Right")
]
saddles = []
for idx, (start_angle, end_angle, name) in enumerate(quadrants):
mask_key = (h, w, ox, oy, outer_r, inner_r, start_angle, end_angle)
sector_mask = self._mask_cache.get(mask_key)
if sector_mask is None:
sector_mask = self._create_sector_mask(
(h, w), (ox, oy), outer_r, inner_r, start_angle, end_angle
)
self._mask_cache.put(mask_key, sector_mask)
saddle_mask = cv2.bitwise_and(annular_mask, sector_mask)
ys, xs = np.where(saddle_mask > 0)
if len(xs) == 0 or len(ys) == 0:
print(f"[GeometricDetector] ⚠ Quadrant {idx} ({name}): No pixels")
continue
x_min, x_max = xs.min(), xs.max()
y_min, y_max = ys.min(), ys.max()
if (x_max - x_min) < 10 or (y_max - y_min) < 10:
print(f"[GeometricDetector] ⚠ Quadrant {idx} ({name}): Box too small")
continue
crop = image[y_min:y_max+1, x_min:x_max+1].copy()
mask_crop = saddle_mask[y_min:y_max+1, x_min:x_max+1].copy()
crop_masked = cv2.bitwise_and(crop, crop, mask=mask_crop)
cx = int(np.mean(xs))
cy = int(np.mean(ys))
area = np.sum(saddle_mask > 0)
saddles.append(SaddleROI(
id=idx,
bbox=(x_min, y_min, x_max - x_min, y_max - y_min),
crop=crop_masked,
center=(cx, cy),
area=area,
angle=0.0,
confidence=0.9 # Higher confidence for geometric detection
))
return saddles
def _detect_circles_improved(self, image: np.ndarray) -> Tuple[Optional[Tuple], Optional[Tuple]]:
"""Improved circle detection with multiple strategies"""
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image
h, w = gray.shape
# Strategy 1: Standard HoughCircles with optimized params
gray_blur = cv2.GaussianBlur(gray, (9, 9), 2)
minRadius = int(min(h, w) * 0.2)
maxRadius = int(min(h, w) * 0.7)
# Try multiple parameter sets
param_sets = [
(80, 25), # Standard
(60, 30), # More lenient
(100, 20), # More strict
]
for param1, param2 in param_sets:
circles = cv2.HoughCircles(
gray_blur,
cv2.HOUGH_GRADIENT,
dp=1.0,
minDist=80,
param1=param1,
param2=param2,
minRadius=minRadius,
maxRadius=maxRadius
)
if circles is not None and len(circles[0]) >= 1:
circles = np.round(circles[0, :]).astype(int)
circles = sorted(circles, key=lambda c: c[2], reverse=True)
outer = tuple(circles[0])
inner_radius = int(outer[2] * 0.4)
inner = (outer[0], outer[1], inner_radius)
print(f"✓ HoughCircles success (param1={param1}, param2={param2})")
return outer, inner
# Strategy 2: Contour-based detection
print("→ Using contour fallback")
edges = cv2.Canny(gray_blur, 30, 100)
# Dilate to connect edges
kernel = np.ones((3, 3), np.uint8)
edges = cv2.dilate(edges, kernel, iterations=2)
contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if contours:
# Find largest contour
largest = max(contours, key=cv2.contourArea)
(cx, cy), radius = cv2.minEnclosingCircle(largest)
center_x, center_y = int(cx), int(cy)
outer_radius = int(radius * 0.85)
inner_radius = int(outer_radius * 0.4)
print(f"✓ Contour-based: center=({center_x},{center_y}), radius={outer_radius}")
return ((center_x, center_y, outer_radius), (center_x, center_y, inner_radius))
# Strategy 3: Dimension-based estimate
center_x, center_y = w // 2, h // 2
outer_radius = min(w, h) // 2 - 50
inner_radius = int(outer_radius * 0.4)
print(f"→ Using dimension estimate: center=({center_x},{center_y}), radius={outer_radius}")
return ((center_x, center_y, outer_radius), (center_x, center_y, inner_radius))
def _create_sector_mask(self, shape, center, outer_r, inner_r, start_angle, end_angle):
"""Create sector mask with caching"""
h, w = shape
mask = np.zeros((h, w), dtype=np.uint8)
y, x = np.ogrid[:h, :w]
dx = x - center[0]
dy = y - center[1]
angles = np.rad2deg(np.arctan2(dy, dx)) % 360
radius = np.sqrt(dx**2 + dy**2)
if start_angle < end_angle:
angle_mask = (angles >= start_angle) & (angles < end_angle)
else:
angle_mask = (angles >= start_angle) | (angles < end_angle)
radius_mask = (radius >= inner_r) & (radius <= outer_r)
sector = angle_mask & radius_mask
mask[sector] = 255
return mask
def cleanup(self):
"""Release cache"""
self._mask_cache.clear()
class AlignmentCorrector:
"""Affine transformation for rotation correction"""
@staticmethod
def correct_rotation(image: np.ndarray, angle: float) -> np.ndarray:
if image is None or image.size == 0:
return image
h, w = image.shape[:2]
center = (w // 2, h // 2)
M = cv2.getRotationMatrix2D(center, -angle, 1.0)
cos = np.abs(M[0, 0])
sin = np.abs(M[0, 1])
new_w = int((h * sin) + (w * cos))
new_h = int((h * cos) + (w * sin))
M[0, 2] += (new_w / 2) - center[0]
M[1, 2] += (new_h / 2) - center[1]
rotated = cv2.warpAffine(image, M, (new_w, new_h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0))
return rotated
@staticmethod
def correct_saddle(saddle: SaddleROI) -> np.ndarray:
if saddle.crop is None or saddle.crop.size == 0:
return np.zeros((224, 224, 3), dtype=np.uint8)
if abs(saddle.angle) < 1.0:
return saddle.crop
return AlignmentCorrector.correct_rotation(saddle.crop, saddle.angle)
class CNNFeatureExtractor:
"""ResNet-18 feature extraction with memory optimization"""
def __init__(self, onnx_path: str = "resnet18_features.onnx"):
self.onnx_path = onnx_path
self.session = None
self.model = None
self.device = None
self.feature_dim = 1024
self._initialized = False
def _lazy_init(self):
if self._initialized:
return
self._initialized = True
# Use PyTorch with fused features
self._init_pytorch_model()
def _init_pytorch_model(self):
torch = _get_torch()
torchvision = _get_torchvision()
from torchvision.models import resnet18, ResNet18_Weights
import torch.nn as nn
self.device = torch.device('cpu')
weights = ResNet18_Weights.IMAGENET1K_V1
base_model = resnet18(weights=weights)
class FeatureFusionModel(nn.Module):
def __init__(self, base):
super().__init__()
self.conv1 = base.conv1
self.bn1 = base.bn1
self.relu = base.relu
self.maxpool = base.maxpool
self.layer1 = base.layer1
self.layer2 = base.layer2
self.layer3 = base.layer3
self.layer4 = base.layer4
self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
self.maxpool_global = nn.AdaptiveMaxPool2d((1, 1))
def forward(self, x):
x = self.conv1(x)
x = self.bn1(x)
x = self.relu(x)
x = self.maxpool(x)
x = self.layer1(x)
x = self.layer2(x)
l3 = self.layer3(x)
l4 = self.layer4(l3)
f3_avg = self.avgpool(l3).view(l3.size(0), -1)
f3_max = self.maxpool_global(l3).view(l3.size(0), -1)
f4_avg = self.avgpool(l4).view(l4.size(0), -1)
return torch.cat([f3_avg, f3_max, f4_avg], dim=1)
self.model = FeatureFusionModel(base_model)
self.model.to(self.device)
self.model.eval()
print(f"✓ PyTorch FeatureFusion ResNet-18 (dim={self.feature_dim})")
def _preprocess_image(self, image: np.ndarray) -> np.ndarray:
if image is None or image.size == 0:
return np.zeros((3, 224, 224), dtype=np.float32)
if len(image.shape) == 2:
image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
elif len(image.shape) == 3 and image.shape[2] == 4:
image = cv2.cvtColor(image, cv2.COLOR_BGRA2RGB)
elif len(image.shape) == 3 and image.shape[2] == 3:
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
image = cv2.resize(image, (224, 224))
image = image.astype(np.float32) / 255.0
mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
std = np.array([0.229, 0.224, 0.225], dtype=np.float32)
image = (image - mean) / std
image = image.transpose(2, 0, 1)
return image
def extract_features(self, image: np.ndarray) -> np.ndarray:
self._lazy_init()
batch = self._preprocess_image(image)[np.newaxis, ...]
features = self.extract_batch(batch)
return features[0]
def extract_batch(self, images: List[np.ndarray]) -> np.ndarray:
self._lazy_init()
if isinstance(images, np.ndarray) and len(images.shape) == 4:
batch = images.astype(np.float32)
else:
preprocessed = [self._preprocess_image(img) for img in images]
batch = np.stack(preprocessed, axis=0).astype(np.float32)
torch = _get_torch()
batch_tensor = torch.from_numpy(batch).to(self.device)
with torch.no_grad():
features = self.model(batch_tensor)
features = features.squeeze(-1).squeeze(-1).cpu().numpy()
norms = np.linalg.norm(features, axis=1, keepdims=True) + 1e-8
features = features / norms
# Memory cleanup
if ENABLE_GC:
del batch_tensor
gc.collect()
return features
def compute_similarity(self, features1: np.ndarray, features2: np.ndarray) -> float:
similarity = np.dot(features1, features2) / (
np.linalg.norm(features1) * np.linalg.norm(features2) + 1e-8
)
return float(np.clip(similarity, 0, 1))
def cleanup(self):
"""Release model from memory"""
if self.model is not None:
del self.model
self.model = None
if ENABLE_GC:
gc.collect()
class GeometricConstraintValidator:
"""Validate saddle positions using geometric constraints - RELAXED"""
def __init__(self, expected_count: int = 4):
self.expected_count = expected_count
self.reference_positions = None
self.reference_distances = {}
def set_reference_positions(self, saddles: List[SaddleROI]):
if len(saddles) != self.expected_count:
print(f"⚠ Reference has {len(saddles)} saddles, expected {self.expected_count}")
return False
centers = [s.center for s in saddles]
centroid = (
int(np.mean([c[0] for c in centers])),
int(np.mean([c[1] for c in centers]))
)
self.reference_positions = [
(c[0] - centroid[0], c[1] - centroid[1]) for c in centers
]
self.reference_distances = {}
for i in range(len(saddles)):
for j in range(i+1, len(saddles)):
dist = np.linalg.norm(
np.array(saddles[i].center) - np.array(saddles[j].center)
)
self.reference_distances[(i, j)] = dist
print(f"✓ Reference geometry learned: {len(saddles)} saddles")
return True
def infer_missing_saddles(
self,
detected_saddles: List[SaddleROI],
image: np.ndarray
) -> List[SaddleROI]:
"""Infer missing saddles based on reference geometry"""
if len(detected_saddles) >= self.expected_count:
return detected_saddles
if self.reference_positions is None:
print("⚠ No reference geometry - cannot infer")
return detected_saddles
if len(detected_saddles) == 0:
print("⚠ No saddles detected - cannot infer")
return detected_saddles
centers = [s.center for s in detected_saddles]
centroid = (
int(np.mean([c[0] for c in centers])),
int(np.mean([c[1] for c in centers]))
)
scale = self._estimate_scale(detected_saddles)
detected_ids = {s.id for s in detected_saddles}
all_saddles = list(detected_saddles)
for expected_id in range(self.expected_count):
if expected_id not in detected_ids:
rel_x, rel_y = self.reference_positions[expected_id]
expected_x = int(centroid[0] + rel_x * scale)
expected_y = int(centroid[1] + rel_y * scale)
crop_size = 150 # Increased crop size
x_min = max(0, expected_x - crop_size // 2)
y_min = max(0, expected_y - crop_size // 2)
x_max = min(image.shape[1], expected_x + crop_size // 2)
y_max = min(image.shape[0], expected_y + crop_size // 2)
crop = image[y_min:y_max, x_min:x_max].copy()
inferred = SaddleROI(
id=expected_id,
bbox=(x_min, y_min, x_max - x_min, y_max - y_min),
crop=crop,
center=(expected_x, expected_y),
area=(x_max - x_min) * (y_max - y_min),
angle=0.0,
confidence=0.5 # Mark as inferred
)
all_saddles.append(inferred)
print(f" ⚙ Inferred Saddle {expected_id} at ({expected_x}, {expected_y})")
all_saddles.sort(key=lambda s: s.id)
return all_saddles
def _estimate_scale(self, saddles: List[SaddleROI]) -> float:
if len(saddles) < 2 or not self.reference_distances:
return 1.0
if len(saddles) >= 2:
s0, s1 = saddles[0], saddles[1]
current_dist = np.linalg.norm(
np.array(s0.center) - np.array(s1.center)
)
key = (min(s0.id, s1.id), max(s0.id, s1.id))
if key in self.reference_distances:
ref_dist = self.reference_distances[key]
if ref_dist > 0:
scale = current_dist / ref_dist
return scale
return 1.0
def validate_geometry(self, saddles: List[SaddleROI], tolerance: float = 0.30) -> Tuple[bool, str]:
"""RELAXED geometry validation"""
if len(saddles) != self.expected_count:
return False, f"Expected {self.expected_count} saddles, got {len(saddles)}"
if self.reference_distances is None or not self.reference_distances:
# No reference - accept geometry
return True, "No reference geometry (accepting)"
sorted_saddles = sorted(saddles, key=lambda s: s.id)
centers = [np.array(s.center) for s in sorted_saddles]
# Skip collinearity check - too strict
# Distance validation with relaxed tolerance
max_deviation = 0.0
for i in range(len(sorted_saddles) - 1):
s0, s1 = sorted_saddles[i], sorted_saddles[i + 1]
current_dist = np.linalg.norm(centers[i + 1] - centers[i])
key = (min(s0.id, s1.id), max(s0.id, s1.id))
if key in self.reference_distances:
ref_dist = self.reference_distances[key]
if ref_dist > 0:
deviation = abs(current_dist - ref_dist) / ref_dist
max_deviation = max(max_deviation, deviation)
if deviation > tolerance:
print(f" Distance S{s0.id}-S{s1.id} off by {deviation*100:.1f}%")
if max_deviation > tolerance:
return False, f"Max distance deviation: {max_deviation*100:.1f}%"
return True, "Geometry valid"
class MultiReferenceManager:
"""Manage multiple golden template images with memory optimization"""
def __init__(self, feature_extractor: CNNFeatureExtractor):
self.feature_extractor = feature_extractor
self.references = []
self.max_references = 20 if not IS_CLOUD else 10
def add_reference(self, image: np.ndarray, saddles: List[SaddleROI]) -> bool:
if len(saddles) != 4:
print(f"⚠ Reference must have 4 saddles, got {len(saddles)}")
return False
# Limit number of references
if len(self.references) >= self.max_references:
print(f"⚠ Max references ({self.max_references}) reached, removing oldest")
old_ref = self.references.pop(0)
# Cleanup
if ENABLE_GC:
del old_ref
gc.collect()
features_list = []
for saddle in saddles:
aligned_crop = AlignmentCorrector.correct_saddle(saddle)
features = self.feature_extractor.extract_features(aligned_crop)
features_list.append(features)
# Compress image for storage
if image.shape[0] > 800 or image.shape[1] > 800:
scale = min(800/image.shape[1], 800/image.shape[0])
new_w, new_h = int(image.shape[1]*scale), int(image.shape[0]*scale)
image_stored = cv2.resize(image, (new_w, new_h))
else:
image_stored = image
self.references.append({
'image': image_stored,
'saddles': saddles,
'features': features_list
})
print(f"✓ Reference {len(self.references)} added")
return True
def match_saddle(self, saddle: SaddleROI) -> Tuple[float, float]:
if not self.references:
# No references - return neutral score
return 0.90, 0.5 # High similarity when no reference
aligned_crop = AlignmentCorrector.correct_saddle(saddle)
test_features = self.feature_extractor.extract_features(aligned_crop)
similarities = []
distances = []
for ref in self.references:
if saddle.id < len(ref['features']):
ref_features = ref['features'][saddle.id]
sim = self.feature_extractor.compute_similarity(test_features, ref_features)
dist = np.linalg.norm(test_features - ref_features)
similarities.append(sim)
distances.append(dist)
if similarities:
best_sim = max(similarities)
avg_dist = np.mean(distances)
return best_sim, avg_dist
return 0.90, 0.5 # Default neutral score
def get_reference_count(self) -> int:
return len(self.references)
def get_references(self) -> List[str]:
refs = []
for ref in self.references:
try:
_, buffer = cv2.imencode('.jpg', ref['image'], [cv2.IMWRITE_JPEG_QUALITY, 50])
import base64
img_str = base64.b64encode(buffer).decode('utf-8')
refs.append(f"data:image/jpeg;base64,{img_str}")
except:
continue
return refs
def cleanup(self):
"""Release all references"""
self.references.clear()
if ENABLE_GC:
gc.collect()
class AdvancedBlockInspector:
"""
Advanced Engine Block Inspector - CLOUD OPTIMIZED & RELAXED
Key improvements:
- Works WITHOUT reference images
- Relaxed thresholds
- Better error handling
- Improved saddle detection fallback
"""
def __init__(self, yolo_model_path: Optional[str] = None, onnx_path: str = "resnet18_features.onnx"):
self.yolo_detector = YOLOOBBDetector(yolo_model_path)
self.geometric_detector = GeometricSaddleDetector()
self.cnn_extractor = CNNFeatureExtractor(onnx_path=onnx_path)
self.reference_manager = MultiReferenceManager(self.cnn_extractor)
# Use base validator (more flexible)
self.geo_validator = GeometricConstraintValidator(expected_count=4)
self.use_optimized_validator = False
# Structure validator
self.structure_validator = None
if STRUCTURE_VALIDATOR_AVAILABLE:
self.structure_validator = SaddleStructureValidator(
arc_center_tolerance=0.05,
min_arc_length_ratio=0.6,
min_structure_confidence=0.65
)
print("✓ Structure Validator: Semicircle + Middle Arc")
self.config = {
'expected_saddles': 4,
'similarity_threshold': 0.75, # RELAXED from 0.88
'use_yolo': self.yolo_detector.model_loaded,
'use_geometric_fallback': True,
'use_alignment_correction': True,
'infer_missing_saddles': True,
'yolo_confidence': 0.20, # RELAXED from 0.25
'use_geometry_validation': False, # DISABLED initially
'use_structure_validation': False, # DISABLED initially
'use_optimized_validator': False,
'min_brightness': 15.0, # RELAXED from 20.0
'require_references': False, # NEW: work without references
}
# Memory tracking
self.last_saddles = []
self.last_image = None
print("[OK] Advanced Inspector initialized (Relaxed Mode)")
print(f" YOLO-OBB: {'Enabled' if self.config['use_yolo'] else 'Disabled (using geometric)'}")
print(f" Similarity threshold: {self.config['similarity_threshold']}")
print(f" Requires references: {self.config['require_references']}")
print(f" Max cache size: {MAX_CACHE_SIZE}")
print(f" GC enabled: {ENABLE_GC}")
def add_reference_image(self, image: np.ndarray) -> bool:
"""Add a golden template image"""
try:
# Quality check
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image
brightness = np.mean(gray)
if brightness < 15:
print(f"✗ Reference too dark (brightness: {brightness:.1f})")
return False
# Detect saddles
if self.config['use_yolo']:
saddles = self.yolo_detector.detect_saddles(
image, conf_threshold=self.config['yolo_confidence']
)
else:
saddles = []
if len(saddles) < self.config['expected_saddles'] and self.config['use_geometric_fallback']:
print("→ Falling back to geometric detection")
saddles = self.geometric_detector.detect_saddles(image)
if len(saddles) != self.config['expected_saddles']:
print(f"✗ Reference failed: {len(saddles)}/{self.config['expected_saddles']} saddles")
return False
success = self.reference_manager.add_reference(image, saddles)
if success and self.reference_manager.get_reference_count() == 1:
self.geo_validator.set_reference_positions(saddles)
# Enable validation after first reference
self.config['use_geometry_validation'] = True
# Cleanup
self._cleanup_after_processing()
return success
except Exception as e:
print(f"✗ Reference error: {e}")
import traceback
traceback.print_exc()
return False
def inspect_block(self, image: np.ndarray) -> BlockInspectionResult:
"""Inspect engine block with memory optimization - RELAXED MODE"""
start_time = time.time()
try:
# Quality check
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image
brightness = np.mean(gray)
if brightness < 10: # Very relaxed
return BlockInspectionResult(
block_status='WASTE_IMAGE',
total_saddles=self.config['expected_saddles'],
detected_saddles=0,
defective_saddles=0,
saddle_results=[],
processing_time_ms=(time.time() - start_time) * 1000,
alignment_status='REJECTED_TOO_DARK',
block_angle=0.0
)
# Detect saddles
if self.config['use_yolo']:
saddles = self.yolo_detector.detect_saddles(
image, conf_threshold=self.config['yolo_confidence']
)
else:
saddles = []
if len(saddles) < self.config['expected_saddles'] and self.config['use_geometric_fallback']:
print("→ Falling back to geometric detection")
saddles = self.geometric_detector.detect_saddles(image)
detected_count = len(saddles)
print(f" Initially detected: {detected_count} saddles")
# Infer missing saddles if we have reference
if len(saddles) < self.config['expected_saddles'] and self.config['infer_missing_saddles']:
if self.reference_manager.get_reference_count() > 0:
print(f" Inferring {self.config['expected_saddles'] - len(saddles)} missing saddles")
saddles = self.geo_validator.infer_missing_saddles(saddles, image)
else:
print(" ⚠ No references - cannot infer missing saddles")
# Validate count
if len(saddles) != self.config['expected_saddles']:
alignment_status = f"SADDLE_COUNT: {len(saddles)}/{self.config['expected_saddles']}"
# If no references, this might still be OK
if not self.config['require_references'] and len(saddles) > 0:
print(f" Warning: {alignment_status} but continuing without references")
else:
return self._error_result(alignment_status, 0.0, saddles, detected_count)
# Geometry validation (only if enabled and references exist)
if self.config.get('use_geometry_validation', False) and self.reference_manager.get_reference_count() > 0:
is_valid, reason = self.geo_validator.validate_geometry(saddles, tolerance=0.30)
if not is_valid:
print(f" Geometry validation failed: {reason}")
# Don't reject - just warn
alignment_status = "OK"
block_angle = np.mean([s.angle for s in saddles]) if saddles else 0.0
# Feature extraction and comparison
if len(saddles) == self.config['expected_saddles']:
aligned_crops = [AlignmentCorrector.correct_saddle(s) for s in saddles]
batch_features = self.cnn_extractor.extract_batch(aligned_crops)
# Compare against references
saddle_results = []
for i, saddle in enumerate(saddles):
test_features = batch_features[i]
best_sim, avg_dist = self.reference_manager.match_saddle(saddle)
# Relaxed status determination
status = 'PERFECT' if best_sim >= self.config['similarity_threshold'] else 'DEFECTIVE'
saddle_results.append(SaddleInspectionResult(
saddle_id=saddle.id,
status=status,
similarity_score=best_sim,
confidence=saddle.confidence,
feature_distance=avg_dist,
detected=saddle.confidence > 0.0
))
else:
# Partial detection
saddle_results = []
for saddle in saddles:
best_sim, avg_dist = self.reference_manager.match_saddle(saddle)
saddle_results.append(SaddleInspectionResult(
saddle_id=saddle.id,
status='UNKNOWN',
similarity_score=best_sim,
confidence=saddle.confidence,
feature_distance=avg_dist,
detected=saddle.confidence > 0.0
))
# Block decision - RELAXED
defective_count = sum(1 for r in saddle_results if r.status == 'DEFECTIVE')
avg_sim = np.mean([r.similarity_score for r in saddle_results]) if saddle_results else 0
# Determine block status
if len(saddles) == 0:
block_status = 'WASTE_IMAGE'
elif len(saddles) < self.config['expected_saddles']:
if self.config['require_references']:
block_status = 'DEFECTIVE'
else:
# No references required - accept partial detection
block_status = 'UNKNOWN'
elif defective_count == 0:
block_status = 'PERFECT'
elif defective_count <= 2: # Allow up to 2 defective
block_status = 'ACCEPTABLE'
else:
block_status = 'DEFECTIVE'
processing_time = (time.time() - start_time) * 1000
self.last_saddles = saddles
self.last_image = image
result = BlockInspectionResult(
block_status=block_status,
total_saddles=self.config['expected_saddles'],
detected_saddles=detected_count,
defective_saddles=defective_count,
saddle_results=saddle_results,
processing_time_ms=processing_time,
alignment_status=alignment_status,
block_angle=block_angle
)
print(f" Result: {block_status} ({detected_count}/{self.config['expected_saddles']} detected, {defective_count} defective)")
# Cleanup
self._cleanup_after_processing()
return result
except Exception as e:
print(f"✗ Inspection error: {e}")
import traceback
traceback.print_exc()
return self._error_result(f"EXCEPTION: {str(e)}", 0.0, [], 0)
def get_reference_images(self) -> List[str]:
return self.reference_manager.get_references()
def _error_result(self, message: str, angle: float, saddles: List, detected_count: int) -> BlockInspectionResult:
self.last_saddles = saddles
return BlockInspectionResult(
block_status='ERROR',
total_saddles=self.config['expected_saddles'],
detected_saddles=detected_count,
defective_saddles=0,
saddle_results=[],
processing_time_ms=0.0,
alignment_status=message,
block_angle=angle
)
def visualize_results(
self,
original_image: np.ndarray,
saddles: List[SaddleROI],
results: List[SaddleInspectionResult]
) -> np.ndarray:
"""Create detailed visualization"""
vis = original_image.copy()
for saddle, result in zip(saddles, results):
# Color coding
if result.status == 'UNKNOWN':
color = (255, 255, 0) # Yellow
elif not result.detected:
color = (255, 165, 0) # Orange
elif result.status == 'PERFECT' or result.status == 'ACCEPTABLE':
color = (0, 255, 0) # Green
else:
color = (0, 0, 255) # Red
x, y, w, h = saddle.bbox
cv2.rectangle(vis, (x, y), (x+w, y+h), color, 3)
cx, cy = saddle.center
cv2.circle(vis, (cx, cy), 10, color, -1)
if abs(saddle.angle) > 1.0:
rad = np.deg2rad(saddle.angle)
end_x = int(cx + 30 * np.cos(rad))
end_y = int(cy + 30 * np.sin(rad))
cv2.arrowedLine(vis, (cx, cy), (end_x, end_y), color, 3)
detection_marker = "" if result.detected else "⚙"
label = f"{detection_marker} S{saddle.id}: {result.status}"
sim_text = f"{result.similarity_score:.3f}"
angle_text = f"{saddle.angle:.1f}°"
cv2.rectangle(vis, (cx-70, cy-70), (cx+70, cy+40), color, -1)
cv2.putText(vis, label, (cx-65, cy-45),
cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 255, 255), 2)
cv2.putText(vis, sim_text, (cx-65, cy-20),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
cv2.putText(vis, angle_text, (cx-65, cy+5),
cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 255, 255), 2)
cv2.putText(vis, f"C:{saddle.confidence:.2f}", (cx-65, cy+28),
cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1)
return vis
def _cleanup_after_processing(self):
"""Clean up resources after processing"""
# Clear old saddles
for saddle in self.last_saddles:
if hasattr(saddle, 'cleanup'):
saddle.cleanup()
# Trigger garbage collection
if ENABLE_GC:
gc.collect()
def cleanup(self):
"""Full cleanup - call when done"""
print("Cleaning up resources...")
# Cleanup components
if hasattr(self.yolo_detector, 'cleanup'):
self.yolo_detector.cleanup()
if hasattr(self.geometric_detector, 'cleanup'):
self.geometric_detector.cleanup()
if hasattr(self.cnn_extractor, 'cleanup'):
self.cnn_extractor.cleanup()
if hasattr(self.reference_manager, 'cleanup'):
self.reference_manager.cleanup()
# Clear state
self.last_saddles.clear()
self.last_image = None
# Force garbage collection
if ENABLE_GC:
gc.collect()
print("✓ Cleanup complete")