Spaces:
Sleeping
Sleeping
| import cv2 | |
| import os | |
| import time | |
| import base64 | |
| import numpy as np | |
| from deepface import DeepFace | |
| from core.config import DETECTOR, MODELS, THRESHOLDS, MIN_FACE_AREA, REQUIRED_AVERAGE_CONFIDENCE | |
| from core.state import KNOWN_VECTORS | |
| def process_frame_synchronous(frame): | |
| """ | |
| ULTRA-FAST VECTORIZED PIPELINE: | |
| 1. Optimized Detection. | |
| 2. Parallel-Ready Feature Extraction. | |
| 3. Matrix-Multiplication Search (1ms response time). | |
| """ | |
| report = [] | |
| pipeline_status = {"detected": 0, "identified": 0} | |
| try: | |
| # Optimized Detection pass | |
| raw_faces = DeepFace.extract_faces(img_path=frame, detector_backend=DETECTOR, enforce_detection=True, align=True) | |
| pipeline_status["detected"] = len(raw_faces) | |
| for face_obj in raw_faces: | |
| region = face_obj.get('facial_area', {}) | |
| w, h, x, y = region.get('w', 0), region.get('h', 0), region.get('x', 0), region.get('y', 0) | |
| face_info = { | |
| "status": "unknown", "name": "Unknown", "score": 0, | |
| "box": {"x": x, "y": y, "w": w, "h": h}, | |
| "crop_b64": "" | |
| } | |
| if w > 0 and h > 0: | |
| cropped_face = frame[y:y+h, x:x+w] | |
| if cropped_face.size != 0: | |
| # Quick JPEG encoding for UI debug | |
| _, buffer = cv2.imencode('.jpg', cropped_face, [int(cv2.IMWRITE_JPEG_QUALITY), 70]) | |
| face_info["crop_b64"] = "data:image/jpeg;base64," + base64.b64encode(buffer).decode('utf-8') | |
| if (w * h) >= MIN_FACE_AREA: | |
| model_predictions = [] | |
| for model in MODELS: | |
| try: | |
| # Get current embedding | |
| emb_objs = DeepFace.represent(img_path=cropped_face, model_name=model, | |
| enforce_detection=False, detector_backend="skip") | |
| if not emb_objs: continue | |
| v_curr = np.array(emb_objs[0]["embedding"]) | |
| v_curr = v_curr / np.linalg.norm(v_curr) # Normalize | |
| # --- HIGH SPEED MATRIX SEARCH --- | |
| # Extract all known names and their vectors for this specific model | |
| names = list(KNOWN_VECTORS.keys()) | |
| vectors = np.array([KNOWN_VECTORS[n][model] for n in names if model in KNOWN_VECTORS[n]]) | |
| if len(vectors) > 0: | |
| # Calculate all Cosine Similarities in ONE math operation (Dot Product) | |
| # Similarity = dot(v_curr, v_matrix.T) | |
| similarities = np.dot(vectors, v_curr) | |
| best_idx = np.argmax(similarities) | |
| best_sim = similarities[best_idx] | |
| dist = 1 - best_sim | |
| if dist < THRESHOLDS[model]: | |
| model_predictions.append({ | |
| "name": names[best_idx], | |
| "conf": round(best_sim * 100, 1) | |
| }) | |
| except Exception as e: | |
| print(f"Model Error ({model}): {e}") | |
| # --- SMART CONSENSUS --- | |
| if model_predictions: | |
| # If we have multiple models, they must agree | |
| unique_names = set(p["name"] for p in model_predictions) | |
| avg_conf = sum(p["conf"] for p in model_predictions) / len(MODELS) | |
| if len(unique_names) == 1 and avg_conf >= REQUIRED_AVERAGE_CONFIDENCE: | |
| face_info["status"] = "match" | |
| face_info["name"] = list(unique_names)[0] | |
| face_info["score"] = avg_conf | |
| pipeline_status["identified"] += 1 | |
| else: | |
| face_info["status"] = "fail" | |
| face_info["name"] = model_predictions[0]["name"] | |
| face_info["score"] = avg_conf | |
| report.append(face_info) | |
| return report, pipeline_status | |
| except Exception as e: | |
| return [], pipeline_status | |