# edusync/app/services/face_recognition.py
# Commit 1cf37bf (sakshusat):
# feat: implement FaceEngine service with Haar Cascade detection, eye-based alignment, and ArcFace embedding extraction
import cv2
import numpy as np
import onnxruntime as ort
from typing import List, Tuple, Optional, Any
from app.core.config import settings
import os
import logging
# Module-level logger shared by everything in this file; all FaceEngine
# diagnostics are emitted under the "face-engine" logger name.
logger = logging.getLogger("face-engine")
class FaceEngine:
    """Singleton service for face detection, alignment and embedding extraction.

    Faces and eyes are located with OpenCV Haar cascades; the best face is
    aligned on its eye centers (or simply cropped when no usable eye pair is
    found), then fed to an ArcFace ONNX model through ONNX Runtime on the CPU.

    Attributes:
        detector: Frontal-face ``cv2.CascadeClassifier``, or ``None`` when it
            could not be loaded (``process_complete`` then reports no face).
        eye_detector: Eye ``cv2.CascadeClassifier``, or ``None`` when it could
            not be loaded (faces are then cropped without eye alignment).
        ort_session: ``onnxruntime.InferenceSession`` for the ArcFace model,
            or ``None`` when the model is unavailable (no embedding produced).
    """

    # Side length, in pixels, of the square face image given to the model.
    _FACE_SIZE = 112
    # Desired (x, y) eye positions inside the 112x112 ArcFace input.
    _TARGET_LEFT_EYE = (38.29, 51.69)
    _TARGET_RIGHT_EYE = (73.53, 51.5)
    # An eye pair closer together than this fraction of the face width is
    # treated as a false detection (e.g. the same eye found twice).
    _MIN_EYE_DIST_RATIO = 0.2
    # Frames wider than this are downscaled before face detection for speed.
    _MAX_DETECT_WIDTH = 640

    _instance = None
    detector = None
    eye_detector = None
    ort_session = None

    def __new__(cls):
        """Return the shared instance, creating and initializing it on first use."""
        if cls._instance is None:
            instance = super().__new__(cls)
            instance.initialize()
            # Publish only after initialize() returned, so an exception during
            # initialization does not leave a half-built singleton registered.
            cls._instance = instance
        return cls._instance

    def initialize(self):
        """Load the Haar cascades and the ArcFace ONNX session.

        Does nothing when a face detector is already loaded. Loading problems
        are logged rather than raised; the affected attribute stays ``None``.
        """
        if self.detector is not None:
            return
        logger.info("Initializing Stable FaceEngine (Haar Cascade + ArcFace ONNX)...")
        self._load_cascades()
        self._load_arcface_session()

    def _load_cascades(self):
        """Load the face and eye Haar cascades shipped with OpenCV."""
        try:
            cascade_dir = cv2.data.haarcascades
            face_cascade = cv2.CascadeClassifier(cascade_dir + 'haarcascade_frontalface_default.xml')
            eye_cascade = cv2.CascadeClassifier(cascade_dir + 'haarcascade_eye.xml')
        except Exception as e:
            logger.error(f"Failed to load Haar Cascades: {e}")
            return
        # CascadeClassifier does not raise for a missing/corrupt file; it
        # returns an empty classifier that fails on every detectMultiScale
        # call. Store None instead so callers can test availability up front.
        self.detector = None if face_cascade.empty() else face_cascade
        self.eye_detector = None if eye_cascade.empty() else eye_cascade
        if self.detector is None or self.eye_detector is None:
            logger.error("Failed to load one or more Haar Cascade files")
        else:
            logger.info("✅ Haar Cascades loaded: Face & Eye")

    def _load_arcface_session(self):
        """Create the ONNX Runtime session for the ArcFace model."""
        model_path = settings.MODEL_PATH
        if not os.path.exists(model_path):
            # Fallback check: model bundled under the current working directory.
            model_path = os.path.join(os.getcwd(), "app", "models", "w600k_mbf.onnx")
        if not os.path.exists(model_path):
            logger.critical(f"ArcFace model not found at {model_path}")
            return
        try:
            # CPU optimization for multi-worker environments: limit ONNX
            # Runtime to one thread to prevent 'thread thrashing' when
            # running multiple Gunicorn workers.
            sess_options = ort.SessionOptions()
            sess_options.intra_op_num_threads = 1
            sess_options.inter_op_num_threads = 1
            sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
            providers = ['CPUExecutionProvider']
            self.ort_session = ort.InferenceSession(model_path, sess_options=sess_options, providers=providers)
            logger.info(f"✅ ArcFace ONNX Session loaded with thread-optimized config from {model_path}")
        except Exception as e:
            logger.error(f"Failed to load ONNX session: {e}")

    def _crop_face(self, image_bgr: np.ndarray, face_box: Tuple[int, int, int, int]) -> np.ndarray:
        """Crop the face box plus a 10%-of-width margin and resize to 112x112."""
        x, y, w, h = face_box
        margin = int(w * 0.1)
        top, bottom = max(0, y - margin), min(image_bgr.shape[0], y + h + margin)
        left, right = max(0, x - margin), min(image_bgr.shape[1], x + w + margin)
        return cv2.resize(image_bgr[top:bottom, left:right], (self._FACE_SIZE, self._FACE_SIZE))

    def _align_face(self, image_bgr: np.ndarray, face_box: Tuple[int, int, int, int], eye_centers: List[np.ndarray]) -> np.ndarray:
        """
        Align face using eye centers for better ArcFace accuracy.

        Args:
            image_bgr: Full BGR frame.
            face_box: Face rectangle ``(x, y, w, h)`` in frame coordinates.
            eye_centers: Eye centers ``[x, y]`` in frame coordinates; the two
                with the smallest X are used as left and right eye.

        Returns:
            A 112x112 BGR face image. It is rotated/scaled so the eyes land on
            the ArcFace reference positions when a plausible eye pair is
            given; otherwise it is a plain margin crop of ``face_box``.
        """
        if eye_centers is None or len(eye_centers) < 2:
            return self._crop_face(image_bgr, face_box)

        # Sort eyes by X coordinate to get left and right.
        left_eye, right_eye = sorted(eye_centers, key=lambda p: p[0])[:2]
        dx = float(right_eye[0] - left_eye[0])
        dy = float(right_eye[1] - left_eye[1])
        dist = float(np.hypot(dx, dy))

        # Two "eyes" that nearly coincide would yield an enormous scale factor
        # and a meaningless crop, so fall back to the unaligned crop instead.
        min_dist = max(1e-6, self._MIN_EYE_DIST_RATIO * face_box[2])
        if dist < min_dist:
            return self._crop_face(image_bgr, face_box)

        target_left, target_right = self._TARGET_LEFT_EYE, self._TARGET_RIGHT_EYE
        angle = float(np.degrees(np.arctan2(dy, dx)))
        # Reference distance between the eyes in the 112x112 input (~35.2).
        scale = (target_right[0] - target_left[0]) / dist

        # Midpoint between the eyes in the source frame and in the target.
        src_mid = (float(left_eye[0] + right_eye[0]) / 2.0, float(left_eye[1] + right_eye[1]) / 2.0)
        dst_mid = ((target_left[0] + target_right[0]) / 2.0, (target_left[1] + target_right[1]) / 2.0)

        # Rotate/scale around the source midpoint, then shift that midpoint
        # onto the target midpoint.
        M = cv2.getRotationMatrix2D(src_mid, angle, scale)
        M[0, 2] += dst_mid[0] - src_mid[0]
        M[1, 2] += dst_mid[1] - src_mid[1]
        return cv2.warpAffine(
            image_bgr, M, (self._FACE_SIZE, self._FACE_SIZE), borderMode=cv2.BORDER_REPLICATE
        )

    def _normalize_brightness(self, image_bgr: np.ndarray) -> np.ndarray:
        """Apply CLAHE to the LAB lightness channel to reduce lighting sensitivity.

        Returns the input unchanged (and logs a warning) if the conversion fails.
        """
        try:
            lab = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2LAB)
            lightness, chan_a, chan_b = cv2.split(lab)
            clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
            equalized = cv2.merge((clahe.apply(lightness), chan_a, chan_b))
            return cv2.cvtColor(equalized, cv2.COLOR_LAB2BGR)
        except Exception as e:
            logger.warning(f"Brightness normalization failed: {e}")
            return image_bgr

    def _preprocess_face(self, image_bgr: np.ndarray, x: int, y: int, w: int, h: int, eye_centers: Optional[List[np.ndarray]] = None) -> np.ndarray:
        """Preprocess face for ArcFace: Align, Crop, Resize, Normalize.

        Returns:
            A float32 array of shape (1, 3, 112, 112) in RGB channel order,
            scaled as ``(pixel - 127.5) / 128.0``.
        """
        # _align_face already falls back to the plain crop without an eye pair.
        face_img = self._align_face(image_bgr, (x, y, w, h), eye_centers)
        face_img = cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB).astype(np.float32)
        face_img = (face_img - 127.5) / 128.0
        face_img = np.transpose(face_img, (2, 0, 1))  # HWC -> CHW
        return np.expand_dims(face_img, axis=0)  # add batch dimension

    @staticmethod
    def _guidance_for(face_box: Tuple[int, int, int, int], frame_w: int, frame_h: int) -> str:
        """Return the positioning hint for a face box within a frame."""
        x, y, w, h = face_box
        if h < frame_h * 0.25:
            return "move closer"
        if h > frame_h * 0.85:
            return "move back"
        cx, cy = x + w / 2.0, y + h / 2.0
        centered = (
            frame_w * 0.25 <= cx <= frame_w * 0.75
            and frame_h * 0.25 <= cy <= frame_h * 0.75
        )
        return "perfect" if centered else "look center"

    @staticmethod
    def _select_eye_pair(eye_boxes: List[Tuple[int, int, int, int]]) -> List[Tuple[int, int, int, int]]:
        """Keep at most the two largest eye candidates, ordered left to right.

        Args:
            eye_boxes: Candidate rectangles ``(ex, ey, ew, eh)``.

        Returns:
            Zero, one or two rectangles sorted by their X coordinate.
        """
        largest = sorted(eye_boxes, key=lambda b: b[2] * b[3], reverse=True)[:2]
        return sorted(largest, key=lambda b: b[0])

    def _detect_eyes(self, image_bgr: np.ndarray, face_box: Tuple[int, int, int, int]) -> List[Tuple[int, int, int, int]]:
        """Detect eye candidates in the upper 60% of the face region.

        Returns rectangles ``(ex, ey, ew, eh)`` relative to the face box
        origin. Returns an empty list when the eye cascade is unavailable,
        the region is empty, or OpenCV fails, so that a missing eye detector
        never prevents embedding extraction.
        """
        if self.eye_detector is None:
            return []
        x, y, w, h = face_box
        roi = image_bgr[y:y + int(h * 0.6), x:x + w]
        if roi.size == 0:
            return []
        try:
            roi_gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
            eyes = self.eye_detector.detectMultiScale(roi_gray, 1.05, 4, minSize=(w // 8, w // 8))
        except cv2.error as e:
            logger.warning(f"Eye detection failed, using unaligned crop: {e}")
            return []
        return [tuple(int(v) for v in eye) for eye in eyes]

    def process_complete(self, image_bgr: np.ndarray) -> dict:
        """Detect the largest face in a frame and extract its embedding.

        Args:
            image_bgr: BGR frame.

        Returns:
            A dict with keys:
              * ``face_detected`` (bool)
              * ``guidance``: "searching", "move closer", "move back",
                "look center" or "perfect"
              * ``embedding``: L2-normalized model output, or ``None`` when
                no face was found or no ONNX session is loaded
              * ``eye_data``: ``left_eye`` / ``right_eye`` as
                ``[[x1, y1], [x2, y2]]`` corners relative to the face box
                (``[]`` when missing) and ``eye_count`` (all candidates found)
              * ``box``: ``[x, y, w, h]`` of the face in frame coordinates,
                or ``None``
            Unexpected errors are logged and the partially filled dict is
            returned; nothing is raised.
        """
        result: dict = {
            "face_detected": False,
            "guidance": "searching",
            "embedding": None,
            "eye_data": {"left_eye": [], "right_eye": [], "eye_count": 0},
            "box": None
        }
        # Nothing to do without a detector or pixels; bail out before paying
        # for brightness normalization.
        if self.detector is None or image_bgr is None or getattr(image_bgr, "size", 0) == 0:
            return result

        # Apply brightness normalization for better low-light handling.
        image_bgr = self._normalize_brightness(image_bgr)
        try:
            ih, iw = image_bgr.shape[:2]

            # Speed optimization: downscale large frames for detection only.
            scaling_factor = 1.0
            detect_img = image_bgr
            if iw > self._MAX_DETECT_WIDTH:
                scaling_factor = float(self._MAX_DETECT_WIDTH) / iw
                detect_img = cv2.resize(image_bgr, (0, 0), fx=scaling_factor, fy=scaling_factor)
            gray = cv2.cvtColor(detect_img, cv2.COLOR_BGR2GRAY)
            faces = self.detector.detectMultiScale(gray, 1.2, 5, minSize=(60, 60))
            # len() rather than truthiness: the result may be a NumPy array.
            if len(faces) == 0:
                return result

            # 1. Best face selection: largest area, mapped back to full-size
            #    frame coordinates and clamped to the frame.
            best_face = max(faces, key=lambda f: f[2] * f[3])
            x, y, w, h = [int(v / scaling_factor) for v in best_face]
            x, y = max(0, x), max(0, y)
            w, h = min(w, iw - x), min(h, ih - y)
            result["face_detected"] = True
            result["box"] = [x, y, w, h]
            result["guidance"] = self._guidance_for((x, y, w, h), iw, ih)

            # 2. Eye detection; keep the two largest candidates as the pair.
            eye_boxes = self._detect_eyes(image_bgr, (x, y, w, h))
            eye_pair = self._select_eye_pair(eye_boxes)
            # Global centers for alignment.
            eye_centers = [
                np.array([float(x + ex + ew / 2.0), float(y + ey + eh / 2.0)])
                for ex, ey, ew, eh in eye_pair
            ]
            if eye_pair:
                # ROI-relative boxes for EAR.
                corners = [[[ex, ey], [ex + ew, ey + eh]] for ex, ey, ew, eh in eye_pair]
                result["eye_data"] = {
                    "left_eye": corners[0],
                    "right_eye": corners[1] if len(corners) > 1 else [],
                    "eye_count": len(eye_boxes)
                }

            # 3. Embedding extraction with alignment.
            if self.ort_session is not None:
                face_tensor = self._preprocess_face(image_bgr, x, y, w, h, eye_centers)
                input_name = self.ort_session.get_inputs()[0].name
                outputs = self.ort_session.run(None, {input_name: face_tensor})
                embedding = outputs[0][0]
                # Normalize embedding to unit length.
                norm = np.linalg.norm(embedding)
                if norm != 0:
                    embedding = embedding / norm
                result["embedding"] = embedding
        except Exception as e:
            # Top-level boundary: keep serving, but record the traceback.
            logger.exception(f"Unified processing failed: {e}")
        return result

    def extract_embedding(self, image_bgr: np.ndarray) -> Optional[np.ndarray]:
        """Return the embedding of the largest face, or ``None`` if unavailable."""
        return self.process_complete(image_bgr)["embedding"]

    def detect_only(self, image_bgr: np.ndarray) -> Tuple[bool, str]:
        """Return ``(face_detected, guidance)`` for the frame."""
        data = self.process_complete(image_bgr)
        return data["face_detected"], data["guidance"]

    def get_guidance(self, image_bgr: np.ndarray) -> str:
        """Return only the positioning hint for the frame."""
        _, guidance = self.detect_only(image_bgr)
        return guidance

    def get_face_data(self, image_bgr: np.ndarray):
        """Return eye data for the detected face, or ``None`` when no face is found."""
        data = self.process_complete(image_bgr)
        if not data["face_detected"]:
            return None
        eye_data = data["eye_data"]
        return {
            "face_detected": True,
            "left_eye": eye_data["left_eye"],
            "right_eye": eye_data["right_eye"],
            "eye_count": eye_data["eye_count"]
        }
# Singleton instance, created at import time. The first FaceEngine() call runs
# initialize() (Haar cascade + ONNX model loading); every later FaceEngine()
# call returns this same object.
face_engine = FaceEngine()