import cv2 import os import time import base64 import numpy as np from deepface import DeepFace from core.config import DETECTOR, MODELS, THRESHOLDS, MIN_FACE_AREA, REQUIRED_AVERAGE_CONFIDENCE from core.state import KNOWN_VECTORS def process_frame_synchronous(frame): """ ULTRA-FAST VECTORIZED PIPELINE: 1. Optimized Detection. 2. Parallel-Ready Feature Extraction. 3. Matrix-Multiplication Search (1ms response time). """ report = [] pipeline_status = {"detected": 0, "identified": 0} try: # Optimized Detection pass raw_faces = DeepFace.extract_faces(img_path=frame, detector_backend=DETECTOR, enforce_detection=True, align=True) pipeline_status["detected"] = len(raw_faces) for face_obj in raw_faces: region = face_obj.get('facial_area', {}) w, h, x, y = region.get('w', 0), region.get('h', 0), region.get('x', 0), region.get('y', 0) face_info = { "status": "unknown", "name": "Unknown", "score": 0, "box": {"x": x, "y": y, "w": w, "h": h}, "crop_b64": "" } if w > 0 and h > 0: cropped_face = frame[y:y+h, x:x+w] if cropped_face.size != 0: # Quick JPEG encoding for UI debug _, buffer = cv2.imencode('.jpg', cropped_face, [int(cv2.IMWRITE_JPEG_QUALITY), 70]) face_info["crop_b64"] = "data:image/jpeg;base64," + base64.b64encode(buffer).decode('utf-8') if (w * h) >= MIN_FACE_AREA: model_predictions = [] for model in MODELS: try: # Get current embedding emb_objs = DeepFace.represent(img_path=cropped_face, model_name=model, enforce_detection=False, detector_backend="skip") if not emb_objs: continue v_curr = np.array(emb_objs[0]["embedding"]) v_curr = v_curr / np.linalg.norm(v_curr) # Normalize # --- HIGH SPEED MATRIX SEARCH --- # Extract all known names and their vectors for this specific model names = list(KNOWN_VECTORS.keys()) vectors = np.array([KNOWN_VECTORS[n][model] for n in names if model in KNOWN_VECTORS[n]]) if len(vectors) > 0: # Calculate all Cosine Similarities in ONE math operation (Dot Product) # Similarity = dot(v_curr, v_matrix.T) similarities = np.dot(vectors, v_curr) best_idx = np.argmax(similarities) best_sim = similarities[best_idx] dist = 1 - best_sim if dist < THRESHOLDS[model]: model_predictions.append({ "name": names[best_idx], "conf": round(best_sim * 100, 1) }) except Exception as e: print(f"Model Error ({model}): {e}") # --- SMART CONSENSUS --- if model_predictions: # If we have multiple models, they must agree unique_names = set(p["name"] for p in model_predictions) avg_conf = sum(p["conf"] for p in model_predictions) / len(MODELS) if len(unique_names) == 1 and avg_conf >= REQUIRED_AVERAGE_CONFIDENCE: face_info["status"] = "match" face_info["name"] = list(unique_names)[0] face_info["score"] = avg_conf pipeline_status["identified"] += 1 else: face_info["status"] = "fail" face_info["name"] = model_predictions[0]["name"] face_info["score"] = avg_conf report.append(face_info) return report, pipeline_status except Exception as e: return [], pipeline_status