"""Face detection and recognition engine built on OpenCV's YuNet and SFace models."""

import contextlib
import os
import shutil
import urllib.request

import cv2
import numpy as np

from backend.database import get_all_embeddings

# Model configuration paths
MODELS_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "models")
os.makedirs(MODELS_DIR, exist_ok=True)

YUNET_PATH = os.path.join(MODELS_DIR, "face_detection_yunet_2023mar.onnx")
SFACE_PATH = os.path.join(MODELS_DIR, "face_recognition_sface_2021dec.onnx")

# Hugging Face Direct Download URLs
YUNET_URL = "https://huggingface.co/opencv/face_detection_yunet/resolve/main/face_detection_yunet_2023mar.onnx"
SFACE_URL = "https://huggingface.co/opencv/face_recognition_sface/resolve/main/face_recognition_sface_2021dec.onnx"

# A file on disk must be larger than this to be accepted as a usable model.
_MIN_MODEL_BYTES = 10000


def download_model(url, save_path, timeout: float = 60.0):
    """Download a model file to ``save_path`` unless a usable copy already exists.

    The payload is streamed into a temporary ``<save_path>.part`` file and only
    moved into place once it is complete, so an interrupted download can never
    leave a truncated file that a later call would mistake for a valid model.

    Args:
        url: Direct download URL of the model.
        save_path: Destination path of the model file.
        timeout: Timeout in seconds passed to ``urllib.request.urlopen``.

    Returns:
        True if the model is present (cached or freshly downloaded), False if
        the download failed. Errors are printed, never raised.
    """
    if os.path.exists(save_path) and os.path.getsize(save_path) > _MIN_MODEL_BYTES:
        return True

    model_name = os.path.basename(save_path)
    tmp_path = f"{save_path}.part"
    print(f"[*] Downloading AI model {model_name} from Hugging Face...")
    try:
        req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
        with urllib.request.urlopen(req, timeout=timeout) as response, \
                open(tmp_path, 'wb') as out_file:
            # Stream in chunks instead of holding the whole model in memory.
            shutil.copyfileobj(response, out_file)

        size = os.path.getsize(tmp_path)
        if size <= _MIN_MODEL_BYTES:
            # Same criterion as the cache check above: such a file would be
            # rejected on the next call, so do not report it as a success.
            raise ValueError(f"downloaded file is only {size} bytes")

        # Atomic publish: save_path is either absent/old or fully written.
        os.replace(tmp_path, save_path)
        print(f"[+] Download complete: {model_name} ({size} bytes)!")
        return True
    except Exception as e:
        # Best-effort boundary: report the failure and let the caller decide.
        print(f"[-] Error downloading model {model_name}: {e}")
        return False
    finally:
        # After a successful os.replace the temp file no longer exists.
        with contextlib.suppress(OSError):
            os.remove(tmp_path)


class FaceEngine:
    """Detects faces with YuNet and identifies them with SFace embeddings.

    Known face templates are fetched through ``get_all_embeddings`` and kept
    in ``self.known_faces``; call ``reload_embeddings`` to refresh that cache.
    """

    def __init__(self, cosine_threshold: float = 0.363):
        """Download the models if missing, load them, and cache known embeddings.

        Args:
            cosine_threshold: Minimum cosine similarity score for ``match_face``
                to report a match.
        """
        # 1. Download models if missing
        # NOTE(review): the download results are not checked here; a failed
        # download presumably surfaces as an error from the cv2 loaders below.
        download_model(YUNET_URL, YUNET_PATH)
        download_model(SFACE_URL, SFACE_PATH)

        # 2. Load models
        print("[*] Loading YuNet Face Detector...")
        # Set default input size (320x240), it will be updated dynamically based on frame dimensions
        self.detector = cv2.FaceDetectorYN.create(YUNET_PATH, "", (320, 240))

        print("[*] Loading SFace Face Recognizer...")
        self.recognizer = cv2.FaceRecognizerSF.create(SFACE_PATH, "")

        self.cosine_threshold = cosine_threshold
        self.known_faces = []

        # 3. Load known embeddings into memory
        self.reload_embeddings()
        print(f"[+] Face Engine ready! {len(self.known_faces)} face templates loaded in memory cache.")

    def reload_embeddings(self):
        """Fetches all employee face templates from the DB and caches them in memory.

        On any failure the error is printed and the cache is reset to an empty list.
        """
        try:
            self.known_faces = get_all_embeddings()
            print(f"[+] Reloaded face embeddings in cache. Total templates: {len(self.known_faces)}")
        except Exception as e:
            print(f"[-] Failed to reload face embeddings: {e}")
            self.known_faces = []

    def detect_faces(self, frame):
        """Detects faces in a frame and returns bounding boxes and 5-point landmarks.

        Returns:
            The ``(retval, faces)`` pair produced by the detector's ``detect`` call.
        """
        h, w = frame.shape[:2]
        # The detector's input size must match the frame being processed.
        self.detector.setInputSize((w, h))
        retval, faces = self.detector.detect(frame)
        return retval, faces

    def extract_embedding(self, frame, face_landmarks):
        """Aligns, crops a face from landmarks, and extracts the 128-dimensional embedding vector.

        Returns:
            The feature produced by the recognizer, or None if alignment or
            feature extraction raised (the error is printed).
        """
        try:
            aligned_face = self.recognizer.alignCrop(frame, face_landmarks)
            feature = self.recognizer.feature(aligned_face)
            return feature
        except Exception as e:
            print(f"[-] Embedding extraction failed: {e}")
            return None

    def match_face(self, query_embedding):
        """
        Compares query embedding with cached templates using Cosine Similarity.
        Returns the best matching employee or None.

        Returns:
            A ``(employee_id, name, score)`` tuple. When the best score reaches
            ``self.cosine_threshold`` the matching employee is returned;
            otherwise ``(None, "Unknown", score)``, where ``score`` is 0.0 if
            the cache is empty or no comparison succeeded.
        """
        if not self.known_faces:
            return None, "Unknown", 0.0

        best_score = -1.0
        best_emp_id = None
        best_emp_name = None

        # Store individual best scores per employee to aggregate multi-angle samples
        employee_best_matches = {}

        for face_template in self.known_faces:
            emp_id = face_template["employee_id"]
            emp_name = face_template["name"]
            known_emb = face_template["embedding"]

            try:
                # SFace cosine matching yields similarity score (higher is better, threshold is 0.363)
                score = self.recognizer.match(query_embedding, known_emb, cv2.FaceRecognizerSF_FR_COSINE)

                if emp_id not in employee_best_matches or score > employee_best_matches[emp_id]["score"]:
                    employee_best_matches[emp_id] = {
                        "name": emp_name,
                        "score": score
                    }
            except Exception as e:
                # One bad template must not abort matching against the rest.
                print(f"[-] Match calculation failed: {e}")
                continue

        # Find the absolute best match among all employees
        for emp_id, data in employee_best_matches.items():
            if data["score"] > best_score:
                best_score = data["score"]
                best_emp_id = emp_id
                best_emp_name = data["name"]

        # Evaluate threshold
        if best_score >= self.cosine_threshold:
            return best_emp_id, best_emp_name, float(best_score)
        else:
            # best_score still at its -1.0 sentinel means nothing was compared.
            return None, "Unknown", float(best_score) if best_score != -1.0 else 0.0