# SecureAttendAI / backend / face_engine.py
# Nishant Katiyar
# Deploy biometric node to HF Spaces
# b561839
# Raw
# History Blame Contribute Delete
# 5.34 kB
import os
import urllib.request
import cv2
import numpy as np
from backend.database import get_all_embeddings
# Model storage: a "models" directory located next to this module. It is
# created at import time so later downloads have somewhere to write to.
_MODULE_DIR = os.path.dirname(os.path.abspath(__file__))
MODELS_DIR = os.path.join(_MODULE_DIR, "models")
os.makedirs(MODELS_DIR, exist_ok=True)

# Local ONNX files for the YuNet face detector and the SFace face recognizer.
YUNET_PATH = os.path.join(MODELS_DIR, "face_detection_yunet_2023mar.onnx")
SFACE_PATH = os.path.join(MODELS_DIR, "face_recognition_sface_2021dec.onnx")

# Hugging Face direct-download URLs for the two model files above.
YUNET_URL = "https://huggingface.co/opencv/face_detection_yunet/resolve/main/face_detection_yunet_2023mar.onnx"
SFACE_URL = "https://huggingface.co/opencv/face_recognition_sface/resolve/main/face_recognition_sface_2021dec.onnx"
def download_model(url, save_path, timeout=60.0):
    """Download a model file to ``save_path`` unless a usable copy already exists.

    A file is considered usable when it is larger than 10000 bytes; the same
    rule is applied to a freshly downloaded payload, so a truncated response
    or a small error page is never reported as a successful download.

    The payload is streamed to ``save_path + ".part"`` and moved into place
    with ``os.replace`` only after it has been validated, so an interrupted
    download cannot leave a partial file at ``save_path``.

    Args:
        url: Location to fetch the model from.
        save_path: Destination file path.
        timeout: Seconds to wait on blocking network operations before the
            download is treated as failed.

    Returns:
        True when a usable file is present at ``save_path`` afterwards,
        False when the download failed (the error is printed, not raised).
    """
    import shutil  # local import: keeps this function self-contained

    min_valid_bytes = 10000
    file_name = os.path.basename(save_path)

    if os.path.exists(save_path) and os.path.getsize(save_path) > min_valid_bytes:
        return True

    print(f"[*] Downloading AI model {file_name} from Hugging Face...")
    tmp_path = save_path + ".part"
    try:
        req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
        with urllib.request.urlopen(req, timeout=timeout) as response, open(tmp_path, 'wb') as out_file:
            # Stream in chunks instead of buffering the whole model in memory.
            shutil.copyfileobj(response, out_file)

        size = os.path.getsize(tmp_path)
        if size <= min_valid_bytes:
            raise ValueError(f"downloaded file is only {size} bytes")

        # Publish the file only once it is complete and validated.
        os.replace(tmp_path, save_path)
        print(f"[+] Download complete: {file_name} ({size} bytes)!")
        return True
    except Exception as e:
        # Best-effort boundary: callers expect a boolean, never an exception.
        print(f"[-] Error downloading model {file_name}: {e}")
        for leftover in (tmp_path, save_path):
            try:
                os.remove(leftover)
            except OSError:
                # Usually the file was never created; nothing to clean up.
                pass
        return False
class FaceEngine:
    """Face detection, embedding extraction and identity matching via OpenCV.

    Wraps a YuNet detector (``cv2.FaceDetectorYN``), an SFace recognizer
    (``cv2.FaceRecognizerSF``) and an in-memory cache of the face templates
    returned by ``get_all_embeddings()``.
    """

    def __init__(self, cosine_threshold: float = 0.363):
        """Fetch the models if needed, load them and warm the template cache.

        Args:
            cosine_threshold: Minimum cosine similarity for ``match_face`` to
                report a match.
        """
        # Step 1: make sure both ONNX files are available locally.
        for model_url, model_path in ((YUNET_URL, YUNET_PATH), (SFACE_URL, SFACE_PATH)):
            download_model(model_url, model_path)

        # Step 2: build the detector and the recognizer.
        print("[*] Loading YuNet Face Detector...")
        # (320, 240) is only a placeholder input size; detect_faces() resets
        # it from the dimensions of every frame it receives.
        self.detector = cv2.FaceDetectorYN.create(YUNET_PATH, "", (320, 240))
        print("[*] Loading SFace Face Recognizer...")
        self.recognizer = cv2.FaceRecognizerSF.create(SFACE_PATH, "")

        self.cosine_threshold = cosine_threshold
        self.known_faces = []

        # Step 3: pull the stored templates into memory.
        self.reload_embeddings()
        print(f"[+] Face Engine ready! {len(self.known_faces)} face templates loaded in memory cache.")

    def reload_embeddings(self):
        """Refresh the in-memory template cache from ``get_all_embeddings()``.

        Any failure is printed and leaves the cache empty.
        """
        try:
            fetched = get_all_embeddings()
            self.known_faces = fetched
            print(f"[+] Reloaded face embeddings in cache. Total templates: {len(self.known_faces)}")
        except Exception as e:
            print(f"[-] Failed to reload face embeddings: {e}")
            self.known_faces = []

    def detect_faces(self, frame):
        """Run the detector on ``frame`` and return its ``(retval, faces)`` pair.

        The detector input size is set to the frame's width and height before
        detection. Per the original author's note, the detections carry
        bounding boxes and 5-point landmarks.
        """
        height, width = frame.shape[:2]
        self.detector.setInputSize((width, height))
        status, detections = self.detector.detect(frame)
        return status, detections

    def extract_embedding(self, frame, face_landmarks):
        """Align and crop the face, then return the recognizer's feature vector.

        The original author describes the vector as 128-dimensional. Returns
        None (after printing the error) if alignment or extraction fails.
        """
        try:
            recognizer = self.recognizer
            cropped = recognizer.alignCrop(frame, face_landmarks)
            return recognizer.feature(cropped)
        except Exception as e:
            print(f"[-] Embedding extraction failed: {e}")
            return None

    def match_face(self, query_embedding):
        """Find the cached employee whose templates best match the query.

        Every cached template is scored against ``query_embedding`` with the
        recognizer's cosine metric (higher is better). The best score per
        employee is kept, then the best employee overall is chosen.

        Returns:
            ``(employee_id, name, score)`` when the top score reaches
            ``self.cosine_threshold``; otherwise ``(None, "Unknown", score)``,
            where ``score`` is 0.0 if the cache is empty or no template could
            be scored.
        """
        if not self.known_faces:
            return None, "Unknown", 0.0

        # employee_id -> (name, highest score seen); folds multi-angle samples
        # of the same employee into a single candidate.
        per_employee = {}
        for template in self.known_faces:
            emp_id = template["employee_id"]
            emp_name = template["name"]
            known_emb = template["embedding"]
            try:
                score = self.recognizer.match(
                    query_embedding, known_emb, cv2.FaceRecognizerSF_FR_COSINE
                )
                current = per_employee.get(emp_id)
                if current is None or score > current[1]:
                    per_employee[emp_id] = (emp_name, score)
            except Exception as e:
                # A template that cannot be scored is skipped, not fatal.
                print(f"[-] Match calculation failed: {e}")

        # Pick the strongest candidate; -1.0 marks "nothing scored yet".
        winner_id = None
        winner_name = None
        winner_score = -1.0
        for emp_id, (emp_name, score) in per_employee.items():
            if score > winner_score:
                winner_id, winner_name, winner_score = emp_id, emp_name, score

        if winner_score >= self.cosine_threshold:
            return winner_id, winner_name, float(winner_score)

        reported = 0.0 if winner_score == -1.0 else float(winner_score)
        return None, "Unknown", reported