Spaces:
Sleeping
Sleeping
| import cv2 | |
| from ultralytics import YOLO | |
| from deepface import DeepFace | |
| import numpy as np | |
| import pickle | |
| import os | |
| import logging | |
| from pathlib import Path | |
| logger = logging.getLogger(__name__) | |
class FaceRecognitionService:
    """Build, cache and load face embeddings for a single enrolled subject.

    Embeddings are produced with DeepFace (ArcFace). For a 360-degree scan
    video, frames are sampled, scored by a weighted blend of sharpness
    (Laplacian variance) and an approximate "frontalness" (detected face
    area), and the best temporally-spaced frames are averaged into one
    master embedding.
    """

    def __init__(self, model_dir: str):
        self.model_dir = Path(model_dir)
        self.model_name = "ArcFace"
        # NOTE(review): detector model path is stored but never used inside
        # this class — presumably consumed by an external YOLO face-detection
        # step; confirm before removing.
        self.detector_model = "yolov8n-face.pt"
        self.cache_file = self.model_dir / "embeddings_cache.pkl"
        self.num_best_frames = 10     # frames averaged into the master embedding
        self.min_blur_threshold = 10  # Laplacian variance below this = too blurry
        self.blur_weight = 0.6        # sharpness weight in the quality score
        self.frontal_weight = 0.4     # frontalness weight in the quality score

    def calculate_blur_score(self, image) -> float:
        """Return sharpness as the variance of the Laplacian (higher = sharper)."""
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        return cv2.Laplacian(gray, cv2.CV_64F).var()

    def calculate_frontal_score(self, face_data) -> float:
        """Score (0-100) how usable a detected face is, from its pixel area.

        Larger detection boxes are assumed more frontal/closer; the area is
        normalized against 50,000 px^2 and clamped. Returns a neutral 50.0
        when ``face_data`` is malformed (e.g. not a dict).
        """
        try:
            facial_area = face_data.get('facial_area', {})
            area = facial_area.get('w', 0) * facial_area.get('h', 0)
            return min(area / 50000.0, 1.0) * 100
        except (AttributeError, TypeError):
            # Fixed: was a bare `except:` that also swallowed
            # KeyboardInterrupt/SystemExit.
            return 50.0

    def _score_candidate_frames(self, video_path: str):
        """Sample every 15th frame and score sharp frames containing a face.

        Returns (candidates, total_frames) where each candidate records its
        frame index, component scores, combined quality score and embedding.
        """
        logger.info("Phase 1: Analyzing frame quality...")
        cap_ref = cv2.VideoCapture(video_path)
        total_frames = int(cap_ref.get(cv2.CAP_PROP_FRAME_COUNT))
        candidates = []
        frame_idx = 0
        try:
            while cap_ref.isOpened():
                ret, frame = cap_ref.read()
                if not ret:
                    break
                if frame_idx % 15 == 0:
                    blur_score = self.calculate_blur_score(frame)
                    # Fixed: the threshold was tested twice in sequence;
                    # a single if/else expresses the same branch.
                    if blur_score <= self.min_blur_threshold:
                        logger.debug(f"Frame {frame_idx}: Blur={blur_score:.1f} (too blurry, skipped)")
                    else:
                        try:
                            face_data = DeepFace.represent(frame, model_name=self.model_name, enforce_detection=True)[0]
                            frontal_score = self.calculate_frontal_score(face_data)
                            quality_score = (blur_score * self.blur_weight) + (frontal_score * self.frontal_weight)
                            # Fixed: no longer keep a full frame.copy() per
                            # candidate — the pixels were never read again,
                            # only the embedding, so copies just burned RAM.
                            candidates.append({
                                'frame_idx': frame_idx,
                                'blur_score': blur_score,
                                'frontal_score': frontal_score,
                                'quality_score': quality_score,
                                'embedding': face_data['embedding'],
                            })
                            logger.debug(f"Frame {frame_idx}: Quality={quality_score:.1f}")
                        except Exception as e:
                            # DeepFace raises when enforce_detection finds no face.
                            logger.debug(f"Frame {frame_idx}: No face detected - {e}")
                frame_idx += 1
        finally:
            # Fixed: release the capture even if scoring raises mid-loop.
            cap_ref.release()
        return candidates, total_frames

    def _select_spaced_frames(self, candidates, total_frames: int):
        """Pick the best candidate per temporal segment, topping up by quality.

        Dividing the video into ``num_best_frames`` segments spreads the
        selected frames across the whole 360-degree scan instead of
        clustering them where quality happens to peak.
        """
        logger.info(f"Phase 2: Selecting top {self.num_best_frames} frames with temporal spacing...")
        candidates.sort(key=lambda c: c['quality_score'], reverse=True)
        # Fixed: guard against zero-width segments when the video has fewer
        # frames than num_best_frames (or frame count is unreported).
        segment_size = max(1, total_frames // self.num_best_frames)
        selected = []
        chosen_idx = set()  # O(1) membership, vs O(n) dict comparison before
        for segment_idx in range(self.num_best_frames):
            segment_start = segment_idx * segment_size
            segment_end = segment_start + segment_size
            for candidate in candidates:
                if segment_start <= candidate['frame_idx'] < segment_end:
                    # Candidates are quality-sorted descending, so the first
                    # hit in the segment is its best — no inner max-scan.
                    selected.append(candidate)
                    chosen_idx.add(candidate['frame_idx'])
                    logger.debug(f"Segment {segment_idx+1}: Frame {candidate['frame_idx']}")
                    break
        # Top up with the best remaining frames regardless of position.
        if len(selected) < self.num_best_frames:
            for candidate in candidates:
                if candidate['frame_idx'] not in chosen_idx:
                    selected.append(candidate)
                    chosen_idx.add(candidate['frame_idx'])
                    if len(selected) >= self.num_best_frames:
                        break
        return selected

    def extract_embeddings_from_video(self, video_path: str):
        """Extract one averaged "master" embedding from a 360-degree scan video.

        Returns:
            ([master_embedding], num_frames_used) — a one-element list of the
            averaged embedding and how many frames contributed to it.
        Raises:
            ValueError: if no frame yields a detectable, sharp-enough face.
        """
        logger.info(f"Processing reference video scan: {video_path}")
        candidates, total_frames = self._score_candidate_frames(video_path)
        if not candidates:
            raise ValueError("No valid frames found in video")
        selected = self._select_spaced_frames(candidates, total_frames)
        logger.info(f"Phase 3: Averaging {len(selected)} embeddings...")
        embeddings_to_average = [frame['embedding'] for frame in selected]
        master_embedding = np.mean(embeddings_to_average, axis=0).tolist()
        return [master_embedding], len(selected)

    def extract_embeddings_from_image(self, image_path: str):
        """Extract a face embedding from a single image.

        Returns a one-element list so the result shape matches the video path.
        Re-raises whatever DeepFace raises (e.g. no face found) after logging.
        """
        try:
            embedding = DeepFace.represent(img_path=image_path, model_name=self.model_name)[0]["embedding"]
            return [embedding]
        except Exception as e:
            logger.error(f"Could not extract embedding from {image_path}: {e}")
            raise

    def save_embeddings_cache(self, embeddings, video_path: str, num_frames_used: int):
        """Persist embeddings plus provenance metadata to the pickle cache.

        The source video's mtime is stored so callers can detect a stale
        cache when the reference video changes on disk.
        """
        cache_data = {
            'video_path': video_path,
            'video_mtime': os.path.getmtime(video_path) if os.path.exists(video_path) else None,
            'model_name': self.model_name,
            'embeddings': embeddings,
            'version': 2,
            'num_frames_used': num_frames_used,
        }
        with open(self.cache_file, 'wb') as f:
            pickle.dump(cache_data, f)
        logger.info(f"Saved embeddings cache to {self.cache_file}")

    def load_embeddings_cache(self):
        """Load the cached embeddings dict, or None if missing/unreadable.

        SECURITY NOTE(review): pickle.load executes arbitrary code from the
        file — only ever load caches this service itself wrote; never a
        user-supplied file.
        """
        if not self.cache_file.exists():
            return None
        try:
            with open(self.cache_file, 'rb') as f:
                return pickle.load(f)
        except Exception as e:
            logger.error(f"Could not load cache: {e}")
            return None