Spaces:
Sleeping
Sleeping
| import os | |
| import urllib.request | |
| import cv2 | |
| import numpy as np | |
| from backend.database import get_all_embeddings | |
# --- Local model cache ------------------------------------------------------
# The ONNX files live in a "models" folder next to this module; the folder is
# created at import time so later downloads have somewhere to write.
_MODULE_DIR = os.path.dirname(os.path.abspath(__file__))
MODELS_DIR = os.path.join(_MODULE_DIR, "models")
os.makedirs(MODELS_DIR, exist_ok=True)

# On-disk locations of the detector (YuNet) and recognizer (SFace) models.
YUNET_PATH = os.path.join(MODELS_DIR, "face_detection_yunet_2023mar.onnx")
SFACE_PATH = os.path.join(MODELS_DIR, "face_recognition_sface_2021dec.onnx")

# --- Remote sources (Hugging Face direct-download links) ---------------------
YUNET_URL = "https://huggingface.co/opencv/face_detection_yunet/resolve/main/face_detection_yunet_2023mar.onnx"
SFACE_URL = "https://huggingface.co/opencv/face_recognition_sface/resolve/main/face_recognition_sface_2021dec.onnx"
def download_model(url, save_path, timeout=60.0, min_size=10000):
    """Make sure a model file exists at ``save_path``, downloading it if needed.

    A file already present at ``save_path`` that is larger than ``min_size``
    bytes is treated as a valid cached copy and no request is made. Otherwise
    the payload is streamed into a temporary ``<save_path>.part`` file,
    size-checked, and only then moved over ``save_path``. An interrupted or
    truncated download therefore never ends up at the final path where a later
    call could mistake it for a usable model.

    Args:
        url: Direct download URL of the model file.
        save_path: Destination path; its parent directory must already exist.
        timeout: Timeout in seconds passed to ``urllib.request.urlopen``.
        min_size: A file of this many bytes or fewer is considered invalid.

    Returns:
        True if a cached file was accepted or a download completed and passed
        the size check; False if the download failed or was undersized.
    """
    import shutil  # local import so the block does not depend on file-level imports

    if os.path.exists(save_path) and os.path.getsize(save_path) > min_size:
        return True

    model_name = os.path.basename(save_path)
    print(f"[*] Downloading AI model {model_name} from Hugging Face...")

    # Same directory as the target so the final os.replace() is a plain rename.
    tmp_path = f"{save_path}.part"
    try:
        req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
        with urllib.request.urlopen(req, timeout=timeout) as response, open(tmp_path, 'wb') as out_file:
            # Stream in chunks instead of holding the whole model in memory.
            shutil.copyfileobj(response, out_file)

        size = os.path.getsize(tmp_path)
        if size <= min_size:
            # Apply the same validity rule as the cache check above, so an
            # error page or truncated body is not reported as success.
            print(f"[-] Error downloading model {model_name}: downloaded file is too small ({size} bytes)")
            return False

        os.replace(tmp_path, save_path)
    except Exception as e:
        # Best-effort by design: report the problem and let the caller decide.
        print(f"[-] Error downloading model {model_name}: {e}")
        return False
    finally:
        # After a successful os.replace() the temp file is gone; on any
        # failure path this removes the partial download.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

    print(f"[+] Download complete: {model_name} ({size} bytes)!")
    return True
class FaceEngine:
    """Face detection (YuNet) and recognition (SFace) with an in-memory template cache.

    Instance attributes set by ``__init__``:
        detector: the ``cv2.FaceDetectorYN`` instance used by ``detect_faces``.
        recognizer: the ``cv2.FaceRecognizerSF`` instance used for alignment,
            feature extraction and matching.
        cosine_threshold: minimum cosine similarity accepted as a match.
        known_faces: templates returned by ``get_all_embeddings()``; each entry
            is read via the keys ``"employee_id"``, ``"name"`` and ``"embedding"``.
    """

    def __init__(self, cosine_threshold: float = 0.363):
        """Download the models if needed, load them, and fill the template cache.

        Args:
            cosine_threshold: minimum cosine similarity for ``match_face`` to
                report an employee instead of "Unknown".

        Raises:
            RuntimeError: if a model file could not be made available.
        """
        # 1. Download models if missing. Fail here with a clear message rather
        #    than letting the model loader fail on a missing file further down.
        yunet_ok = download_model(YUNET_URL, YUNET_PATH)
        sface_ok = download_model(SFACE_URL, SFACE_PATH)
        for ok, path in ((yunet_ok, YUNET_PATH), (sface_ok, SFACE_PATH)):
            if not ok:
                raise RuntimeError(f"Face model unavailable (download failed): {path}")

        # 2. Load models
        print("[*] Loading YuNet Face Detector...")
        # Set default input size (320x240), it will be updated dynamically based on frame dimensions
        self.detector = cv2.FaceDetectorYN.create(YUNET_PATH, "", (320, 240))
        print("[*] Loading SFace Face Recognizer...")
        self.recognizer = cv2.FaceRecognizerSF.create(SFACE_PATH, "")
        self.cosine_threshold = cosine_threshold
        # Resolved once here so match_face does not repeat the lookup per template.
        self._match_metric = cv2.FaceRecognizerSF_FR_COSINE
        self.known_faces = []

        # 3. Load known embeddings into memory
        self.reload_embeddings()
        print(f"[+] Face Engine ready! {len(self.known_faces)} face templates loaded in memory cache.")

    def reload_embeddings(self):
        """Fetch all employee face templates from the DB and cache them in memory.

        On any failure the error is printed and the cache is reset to an empty
        list, so ``match_face`` reports "Unknown" until a later reload succeeds.
        """
        try:
            self.known_faces = get_all_embeddings()
            print(f"[+] Reloaded face embeddings in cache. Total templates: {len(self.known_faces)}")
        except Exception as e:
            print(f"[-] Failed to reload face embeddings: {e}")
            self.known_faces = []

    def detect_faces(self, frame):
        """Detect faces in a frame; returns bounding boxes and 5-point landmarks.

        Args:
            frame: image array; only ``frame.shape[:2]`` (height, width) is read
                here before the frame is handed to the detector.

        Returns:
            The ``(retval, faces)`` pair exactly as returned by
            ``self.detector.detect(frame)``.
            NOTE(review): ``faces`` is presumably ``None`` when nothing is
            detected - confirm against callers.
        """
        h, w = frame.shape[:2]
        # The detector is created with a 320x240 default; resize it to this frame.
        self.detector.setInputSize((w, h))
        retval, faces = self.detector.detect(frame)
        return retval, faces

    def extract_embedding(self, frame, face_landmarks):
        """Align and crop a face from its landmarks, then extract the 128-dimensional embedding.

        Args:
            frame: the image the face was detected in.
            face_landmarks: one detection row, passed to ``recognizer.alignCrop``.

        Returns:
            The feature produced by ``recognizer.feature``, or ``None`` if
            alignment or extraction raised (the error is printed).
        """
        try:
            aligned_face = self.recognizer.alignCrop(frame, face_landmarks)
            return self.recognizer.feature(aligned_face)
        except Exception as e:
            print(f"[-] Embedding extraction failed: {e}")
            return None

    def match_face(self, query_embedding):
        """Compare a query embedding with the cached templates using cosine similarity.

        Each employee may have several templates (multi-angle samples); the
        best score per employee is kept and the best employee overall is
        compared against ``self.cosine_threshold``.

        Args:
            query_embedding: embedding to identify, as produced by
                ``extract_embedding``.

        Returns:
            ``(employee_id, name, score)`` when the best score reaches the
            threshold; otherwise ``(None, "Unknown", score)``, where ``score``
            is the best score seen, or ``0.0`` if the cache is empty or every
            comparison failed.
        """
        if not self.known_faces:
            return None, "Unknown", 0.0

        # Store individual best scores per employee to aggregate multi-angle samples
        employee_best_matches = {}
        for face_template in self.known_faces:
            emp_id = face_template["employee_id"]
            emp_name = face_template["name"]
            known_emb = face_template["embedding"]
            try:
                # SFace cosine matching yields similarity score (higher is better, threshold is 0.363)
                score = self.recognizer.match(query_embedding, known_emb, self._match_metric)
            except Exception as e:
                print(f"[-] Match calculation failed: {e}")
                continue

            current = employee_best_matches.get(emp_id)
            if current is None or score > current["score"]:
                employee_best_matches[emp_id] = {"name": emp_name, "score": score}

        # No comparison succeeded: there is no score to report. Tracking this
        # explicitly (instead of a -1.0 sentinel) keeps a genuine -1.0 cosine
        # score distinguishable from "nothing was computed".
        if not employee_best_matches:
            return None, "Unknown", 0.0

        # Find the absolute best match among all employees; on ties max()
        # keeps the first employee in insertion order.
        best_emp_id, best = max(
            employee_best_matches.items(), key=lambda item: item[1]["score"]
        )
        best_score = float(best["score"])

        # Evaluate threshold
        if best_score >= self.cosine_threshold:
            return best_emp_id, best["name"], best_score
        return None, "Unknown", best_score