import logging
import os
from typing import Any, Dict, List, Optional, Tuple

import cv2
import numpy as np
import onnxruntime as ort

from app.core.config import settings

logger = logging.getLogger("face-engine")


class FaceEngine:
    """Singleton face pipeline: Haar cascade detection + ArcFace ONNX embedding.

    ``FaceEngine()`` always returns the same instance; the first construction
    loads the face/eye Haar cascades and the ONNX inference session. Any
    component that fails to load is left as ``None`` and the pipeline degrades
    instead of raising (no detector -> nothing detected, no eye detector ->
    unaligned crop, no ONNX session -> no embedding).
    """

    _instance = None
    detector = None
    eye_detector = None
    ort_session = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.initialize()
        return cls._instance

    def initialize(self):
        """Load the Haar cascades and the ArcFace ONNX session.

        Does nothing when a face detector is already loaded. Load failures are
        logged and leave the corresponding attribute as ``None``.
        """
        if self.detector is not None:
            return

        logger.info("Initializing Stable FaceEngine (Haar Cascade + ArcFace ONNX)...")

        # 1. Initialize Haar Cascades
        try:
            cascade_path = cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
            eye_path = cv2.data.haarcascades + "haarcascade_eye.xml"
            face_cascade = cv2.CascadeClassifier(cascade_path)
            eye_cascade = cv2.CascadeClassifier(eye_path)

            # An empty classifier raises on every detectMultiScale call, so it
            # is stored as None and the pipeline can skip that stage cleanly.
            self.detector = None if face_cascade.empty() else face_cascade
            self.eye_detector = None if eye_cascade.empty() else eye_cascade

            if self.detector is None or self.eye_detector is None:
                logger.error("Failed to load one or more Haar Cascade files")
            else:
                logger.info("✅ Haar Cascades loaded: Face & Eye")
        except Exception as e:
            logger.error(f"Failed to load Haar Cascades: {e}")

        # 2. Initialize ONNX Runtime for ArcFace
        model_path = settings.MODEL_PATH
        # `not model_path` guards against an unset/None setting, which would
        # otherwise make os.path.exists raise TypeError.
        if not model_path or not os.path.exists(model_path):
            # Fallback check
            model_path = os.path.join(os.getcwd(), "app", "models", "w600k_mbf.onnx")

        if not os.path.exists(model_path):
            logger.critical(f"ArcFace model not found at {model_path}")
            return

        try:
            # CPU optimization for multi-worker environments: limit threads to
            # 1 to prevent 'thread thrashing' when running multiple Gunicorn
            # workers.
            sess_options = ort.SessionOptions()
            sess_options.intra_op_num_threads = 1
            sess_options.inter_op_num_threads = 1
            sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

            self.ort_session = ort.InferenceSession(
                model_path,
                sess_options=sess_options,
                providers=["CPUExecutionProvider"],
            )
            logger.info(f"✅ ArcFace ONNX Session loaded with thread-optimized config from {model_path}")
        except Exception as e:
            logger.error(f"Failed to load ONNX session: {e}")

    def _crop_face(self, image_bgr: np.ndarray, face_box: Tuple[int, int, int, int]) -> np.ndarray:
        """Return the face box padded by 10% of its width, resized to 112x112.

        The padded box is clipped to the image bounds before cropping.
        """
        x, y, w, h = face_box
        margin = int(w * 0.1)
        top, bottom = max(0, y - margin), min(image_bgr.shape[0], y + h + margin)
        left, right = max(0, x - margin), min(image_bgr.shape[1], x + w + margin)
        return cv2.resize(image_bgr[top:bottom, left:right], (112, 112))

    def _align_face(self, image_bgr: np.ndarray, face_box: Tuple[int, int, int, int],
                    eye_centers: List[np.ndarray]) -> np.ndarray:
        """Align a face using eye centers and return a 112x112 BGR image.

        With at least two eye centers, the two left-most (by X) are rotated,
        scaled and translated onto fixed target eye positions. With fewer, the
        padded face box is cropped and resized instead.

        Args:
            image_bgr: Full source image.
            face_box: ``(x, y, w, h)`` of the face in ``image_bgr``.
            eye_centers: Eye centers as ``[x, y]`` in ``image_bgr`` coordinates.
        """
        if not eye_centers or len(eye_centers) < 2:
            return self._crop_face(image_bgr, face_box)

        # Sort eyes by X coordinate to get left and right
        left_eye, right_eye = sorted(eye_centers, key=lambda p: p[0])[:2]

        # Desired eye positions in the 112x112 output
        target_left = (38.29, 51.69)
        target_right = (73.53, 51.5)

        dy = float(right_eye[1] - left_eye[1])
        dx = float(right_eye[0] - left_eye[0])
        dist = float(np.sqrt(dx ** 2 + dy ** 2))
        angle = float(np.degrees(np.arctan2(dy, dx)))

        # Scale so the detected inter-eye distance matches the target one;
        # the floor avoids division by zero when both centers coincide.
        reference_dist = target_right[0] - target_left[0]
        scale = reference_dist / max(1e-6, dist)

        # Plain floats: cv2.getRotationMatrix2D expects a tuple of Python numbers.
        src_mid = (
            float(left_eye[0] + right_eye[0]) / 2.0,
            float(left_eye[1] + right_eye[1]) / 2.0,
        )
        dst_mid = (
            (target_left[0] + target_right[0]) / 2.0,
            (target_left[1] + target_right[1]) / 2.0,
        )

        # Rotate/scale around the eye midpoint, then shift that midpoint onto
        # the target midpoint.
        M = cv2.getRotationMatrix2D(src_mid, angle, scale)
        M[0, 2] += dst_mid[0] - src_mid[0]
        M[1, 2] += dst_mid[1] - src_mid[1]

        return cv2.warpAffine(image_bgr, M, (112, 112), borderMode=cv2.BORDER_REPLICATE)

    def _normalize_brightness(self, image_bgr: np.ndarray) -> np.ndarray:
        """Apply CLAHE to the L channel (LAB space) to normalize lighting.

        Returns the input unchanged (and logs a warning) if any step fails.
        """
        try:
            lab = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2LAB)
            l_chan, a_chan, b_chan = cv2.split(lab)
            clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
            equalized = cv2.merge((clahe.apply(l_chan), a_chan, b_chan))
            return cv2.cvtColor(equalized, cv2.COLOR_LAB2BGR)
        except Exception as e:
            logger.warning(f"Brightness normalization failed: {e}")
            return image_bgr

    def _preprocess_face(self, image_bgr: np.ndarray, x: int, y: int, w: int, h: int,
                         eye_centers: Optional[List[np.ndarray]] = None) -> np.ndarray:
        """Build the model input tensor for one face.

        The face is aligned (or cropped when fewer than two eye centers are
        given), converted to RGB, scaled with ``(pixel - 127.5) / 128.0`` and
        returned as a float32 array of shape ``(1, 3, 112, 112)``.
        """
        face_img = self._align_face(image_bgr, (x, y, w, h), eye_centers or [])
        face_img = cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB).astype(np.float32)
        face_img = (face_img - 127.5) / 128.0
        # HWC -> CHW, then add the batch axis.
        return np.expand_dims(np.transpose(face_img, (2, 0, 1)), axis=0)

    @staticmethod
    def _guidance_for(x: int, y: int, w: int, h: int, iw: int, ih: int) -> str:
        """Return the positioning hint for a face box inside an ``iw`` x ``ih`` frame."""
        if h < ih * 0.25:
            return "move closer"
        if h > ih * 0.85:
            return "move back"
        cx, cy = x + w / 2.0, y + h / 2.0
        centered = (iw * 0.25 <= cx <= iw * 0.75) and (ih * 0.25 <= cy <= ih * 0.75)
        return "perfect" if centered else "look center"

    def _detect_eyes(self, image_bgr: np.ndarray, x: int, y: int, w: int,
                     h: int) -> Tuple[List[np.ndarray], List[list]]:
        """Detect eyes in the upper 60% of the face box.

        Returns:
            ``(centers, boxes)``: ``centers`` are eye centers in full-image
            coordinates; ``boxes`` are ``[[x1, y1], [x2, y2]]`` corner pairs
            relative to the face box origin. Both are empty when no eye
            detector is loaded.
        """
        if self.eye_detector is None:
            return [], []

        roi_h = int(h * 0.6)
        roi_gray = cv2.cvtColor(image_bgr[y:y + roi_h, x:x + w], cv2.COLOR_BGR2GRAY)
        eyes = self.eye_detector.detectMultiScale(roi_gray, 1.05, 4, minSize=(w // 8, w // 8))

        centers: List[np.ndarray] = []
        boxes: List[list] = []
        for eye in eyes:
            ex, ey, ew, eh = (int(v) for v in eye)
            centers.append(np.array([float(x + ex + ew / 2.0), float(y + ey + eh / 2.0)]))
            boxes.append([[ex, ey], [ex + ew, ey + eh]])
        return centers, boxes

    def _embed(self, face_tensor: np.ndarray) -> np.ndarray:
        """Run the ONNX session on ``face_tensor`` and return the L2-normalized embedding."""
        input_name = self.ort_session.get_inputs()[0].name
        outputs = self.ort_session.run(None, {input_name: face_tensor})
        embedding = outputs[0][0]
        norm = np.linalg.norm(embedding)
        # A zero vector is returned as-is rather than dividing by zero.
        return embedding / norm if norm != 0 else embedding

    def process_complete(self, image_bgr: np.ndarray) -> dict:
        """Detect the largest face and compute guidance, eye data and embedding.

        Args:
            image_bgr: Input frame in BGR order.

        Returns:
            A dict with keys ``face_detected`` (bool), ``guidance`` (str),
            ``embedding`` (normalized vector or ``None``), ``eye_data``
            (``left_eye``, ``right_eye``, ``eye_count``) and ``box``
            (``[x, y, w, h]`` or ``None``). Errors are logged, never raised;
            the result then holds whatever was computed before the failure.
        """
        result: Dict[str, Any] = {
            "face_detected": False,
            "guidance": "searching",
            "embedding": None,
            "eye_data": {"left_eye": [], "right_eye": [], "eye_count": 0},
            "box": None,
        }

        # Nothing to do without a detector or with a missing/empty frame;
        # bail out before paying for brightness normalization.
        if self.detector is None:
            return result
        if image_bgr is None or getattr(image_bgr, "size", 0) == 0:
            return result

        # Apply brightness normalization for better low-light handling
        image_bgr = self._normalize_brightness(image_bgr)

        try:
            ih, iw = image_bgr.shape[:2]

            # Speed optimization: detect on a copy downscaled to 640px width.
            scaling_factor = 1.0
            detect_img = image_bgr
            if iw > 640:
                scaling_factor = 640.0 / iw
                detect_img = cv2.resize(image_bgr, (0, 0), fx=scaling_factor, fy=scaling_factor)

            gray = cv2.cvtColor(detect_img, cv2.COLOR_BGR2GRAY)
            faces = self.detector.detectMultiScale(gray, 1.2, 5, minSize=(60, 60))
            if len(faces) == 0:
                return result

            # 1. Keep the largest face, mapped back to full-resolution coordinates.
            best_face = max(faces, key=lambda f: f[2] * f[3])
            x, y, w, h = (int(v / scaling_factor) for v in best_face)

            # Ensure coordinates are within frame
            x, y = max(0, x), max(0, y)
            w, h = min(w, iw - x), min(h, ih - y)

            result["face_detected"] = True
            result["box"] = [x, y, w, h]
            result["guidance"] = self._guidance_for(x, y, w, h, iw, ih)

            # 2. Eye detection (skipped when the eye cascade is unavailable)
            eye_centers, eye_boxes = self._detect_eyes(image_bgr, x, y, w, h)
            if eye_boxes:
                # Sort eyes by X to identify left/right
                eye_boxes.sort(key=lambda box: box[0][0])
                result["eye_data"] = {
                    "left_eye": eye_boxes[0],
                    "right_eye": eye_boxes[1] if len(eye_boxes) > 1 else [],
                    "eye_count": len(eye_boxes),
                }

            # 3. Embedding extraction with alignment
            if self.ort_session is not None:
                face_tensor = self._preprocess_face(image_bgr, x, y, w, h, eye_centers)
                result["embedding"] = self._embed(face_tensor)

            return result

        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            logger.exception("Unified processing failed: %s", e)
            return result

    def extract_embedding(self, image_bgr: np.ndarray) -> Optional[np.ndarray]:
        """Return the face embedding for ``image_bgr``, or ``None`` if unavailable."""
        return self.process_complete(image_bgr)["embedding"]

    def detect_only(self, image_bgr: np.ndarray) -> Tuple[bool, str]:
        """Return ``(face_detected, guidance)`` for ``image_bgr``."""
        data = self.process_complete(image_bgr)
        return data["face_detected"], data["guidance"]

    def get_guidance(self, image_bgr: np.ndarray) -> str:
        """Return only the guidance string for ``image_bgr``."""
        _, guidance = self.detect_only(image_bgr)
        return guidance

    def get_face_data(self, image_bgr: np.ndarray) -> Optional[dict]:
        """Return eye data for the detected face, or ``None`` when no face is found."""
        data = self.process_complete(image_bgr)
        if not data["face_detected"]:
            return None
        eye_data = data["eye_data"]
        return {
            "face_detected": True,
            "left_eye": eye_data["left_eye"],
            "right_eye": eye_data["right_eye"],
            "eye_count": eye_data["eye_count"],
        }


# Singleton instance
face_engine = FaceEngine()