Spaces:
Paused
Paused
| """ | |
| ShortSmith v2 - Face Recognizer Module | |
| Face detection and recognition using InsightFace: | |
| - SCRFD for fast face detection | |
| - ArcFace for face embeddings and matching | |
| Used for person-specific filtering in highlight extraction. | |
| """ | |
| from pathlib import Path | |
| from typing import List, Optional, Tuple, Union | |
| from dataclasses import dataclass | |
| import numpy as np | |
| from utils.logger import get_logger, LogTimer | |
| from utils.helpers import ModelLoadError, InferenceError, validate_image_file | |
| from config import get_config, ModelConfig | |
# Module-level logger; child of the project logger hierarchy.
logger = get_logger("models.face_recognizer")
@dataclass
class FaceDetection:
    """Represents a detected face in an image.

    Carries the detection bounding box and confidence plus optional
    recognition metadata (embedding, landmarks, age, gender) as produced
    by InsightFace.
    """

    bbox: Tuple[int, int, int, int]    # (x1, y1, x2, y2) pixel coordinates
    confidence: float                  # Detection confidence score
    embedding: Optional[np.ndarray]    # Face embedding (512-dim for ArcFace)
    landmarks: Optional[np.ndarray]    # Facial landmarks (5 points)
    age: Optional[int] = None          # Estimated age, when available
    gender: Optional[str] = None       # Estimated gender ('M'/'F'), when available

    def center(self) -> Tuple[int, int]:
        """Return the (x, y) center point of the face bounding box."""
        x1, y1, x2, y2 = self.bbox
        return ((x1 + x2) // 2, (y1 + y2) // 2)

    def area(self) -> int:
        """Return the area of the face bounding box in pixels."""
        x1, y1, x2, y2 = self.bbox
        return (x2 - x1) * (y2 - y1)

    def width(self) -> int:
        """Return the width of the face bounding box in pixels."""
        return self.bbox[2] - self.bbox[0]

    def height(self) -> int:
        """Return the height of the face bounding box in pixels."""
        return self.bbox[3] - self.bbox[1]
@dataclass
class FaceMatch:
    """Result of matching one detected face against a registered reference."""

    detection: FaceDetection            # The detected face
    similarity: float                   # Cosine similarity to reference (0-1)
    is_match: bool                      # Whether similarity passed the threshold
    reference_id: Optional[str] = None  # ID of the matched reference
class FaceRecognizer:
    """
    Face detection and recognition using InsightFace.

    Supports:
    - Multi-face detection per frame
    - Face embedding extraction
    - Similarity-based face matching
    - Reference image registration
    """

    def __init__(
        self,
        config: Optional[ModelConfig] = None,
        load_model: bool = True,
    ):
        """
        Initialize face recognizer.

        Args:
            config: Model configuration (global config's model section when None)
            load_model: Whether to load model immediately

        Raises:
            ImportError: If insightface is not installed
        """
        self.config = config or get_config().model
        self.model = None
        # Maps reference_id -> embedding vector for registered reference faces.
        self._reference_embeddings: dict = {}
        if load_model:
            self._load_model()
        logger.info(f"FaceRecognizer initialized (threshold={self.config.face_similarity_threshold})")

    def _load_model(self) -> None:
        """Load the InsightFace model.

        Raises:
            ImportError: If the insightface package is not installed.
            ModelLoadError: If the model fails to initialize for any other reason.
        """
        with LogTimer(logger, "Loading InsightFace model"):
            try:
                import insightface
                from insightface.app import FaceAnalysis

                # Initialize FaceAnalysis app; prefer CUDA, falling back to CPU.
                self.model = FaceAnalysis(
                    name=self.config.face_detection_model,
                    providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
                    if self.config.device == "cuda" else ['CPUExecutionProvider'],
                )
                # Prepare with detection size; ctx_id of -1 selects CPU.
                self.model.prepare(ctx_id=0 if self.config.device == "cuda" else -1)
                logger.info("InsightFace model loaded successfully")
            except ImportError as e:
                raise ImportError(
                    "InsightFace is required for face recognition. "
                    "Install with: pip install insightface onnxruntime-gpu"
                ) from e
            except Exception as e:
                logger.error(f"Failed to load InsightFace model: {e}")
                raise ModelLoadError(f"Could not load face recognition model: {e}") from e

    def detect_faces(
        self,
        image: Union[str, Path, np.ndarray],
        max_faces: int = 10,
        min_confidence: float = 0.5,
    ) -> List[FaceDetection]:
        """
        Detect faces in an image.

        Args:
            image: Image path or numpy array (BGR format)
            max_faces: Maximum faces to detect
            min_confidence: Minimum detection confidence

        Returns:
            List of FaceDetection objects

        Raises:
            InferenceError: If detection fails
        """
        if self.model is None:
            raise ModelLoadError("Model not loaded")
        try:
            import cv2

            # Load the image when given a path; pass arrays through unchanged.
            if isinstance(image, (str, Path)):
                img = cv2.imread(str(image))
                if img is None:
                    raise InferenceError(f"Could not load image: {image}")
            else:
                img = image

            # Detect faces
            faces = self.model.get(img, max_num=max_faces)

            # Convert to FaceDetection objects, dropping low-confidence hits.
            detections = []
            for face in faces:
                if face.det_score < min_confidence:
                    continue
                bbox = tuple(map(int, face.bbox))
                # InsightFace encodes gender as 1 = male, otherwise female.
                if hasattr(face, 'gender'):
                    gender = 'M' if face.gender == 1 else 'F'
                else:
                    gender = None
                detection = FaceDetection(
                    bbox=bbox,
                    confidence=float(face.det_score),
                    embedding=face.embedding if hasattr(face, 'embedding') else None,
                    landmarks=face.kps if hasattr(face, 'kps') else None,
                    age=int(face.age) if hasattr(face, 'age') else None,
                    gender=gender,
                )
                detections.append(detection)
            logger.debug(f"Detected {len(detections)} faces")
            return detections
        except Exception as e:
            logger.error(f"Face detection failed: {e}")
            raise InferenceError(f"Face detection failed: {e}") from e

    def register_reference(
        self,
        reference_image: Union[str, Path, np.ndarray],
        reference_id: str = "target",
    ) -> bool:
        """
        Register a reference face for matching.

        Args:
            reference_image: Image containing the reference face
            reference_id: Identifier for this reference

        Returns:
            True if registration successful

        Raises:
            InferenceError: If no face found in reference
        """
        with LogTimer(logger, f"Registering reference face '{reference_id}'"):
            detections = self.detect_faces(reference_image, max_faces=1)
            if not detections:
                raise InferenceError("No face detected in reference image")
            if detections[0].embedding is None:
                raise InferenceError("Could not extract embedding from reference face")
            self._reference_embeddings[reference_id] = detections[0].embedding
            logger.info(f"Registered reference face: {reference_id}")
            return True

    def match_faces(
        self,
        image: Union[str, Path, np.ndarray],
        reference_id: str = "target",
        threshold: Optional[float] = None,
    ) -> List[FaceMatch]:
        """
        Find faces matching a registered reference.

        Args:
            image: Image to search for matches
            reference_id: ID of reference to match against
            threshold: Similarity threshold (uses config if None)

        Returns:
            List of FaceMatch objects for all detected faces,
            sorted by similarity (best first)
        """
        # Test for None explicitly: `threshold or ...` would silently discard
        # a deliberate threshold of 0.0.
        if threshold is None:
            threshold = self.config.face_similarity_threshold
        if reference_id not in self._reference_embeddings:
            logger.warning(f"Reference '{reference_id}' not registered")
            return []
        reference_embedding = self._reference_embeddings[reference_id]
        detections = self.detect_faces(image)
        matches = []
        for detection in detections:
            # Faces without embeddings cannot be compared; skip them.
            if detection.embedding is None:
                continue
            similarity = self._cosine_similarity(
                reference_embedding, detection.embedding
            )
            matches.append(FaceMatch(
                detection=detection,
                similarity=similarity,
                is_match=similarity >= threshold,
                reference_id=reference_id,
            ))
        # Sort by similarity descending
        matches.sort(key=lambda m: m.similarity, reverse=True)
        return matches

    def find_target_in_frame(
        self,
        image: Union[str, Path, np.ndarray],
        reference_id: str = "target",
        threshold: Optional[float] = None,
    ) -> Optional[FaceMatch]:
        """
        Find the best matching face in a frame.

        Args:
            image: Frame to search
            reference_id: Reference to match against
            threshold: Similarity threshold

        Returns:
            Best FaceMatch if found, None otherwise
        """
        matches = self.match_faces(image, reference_id, threshold)
        matching = [m for m in matches if m.is_match]
        if matching:
            return matching[0]  # Already sorted: first is the best match
        return None

    def compute_screen_time(
        self,
        frames: List[Union[str, Path, np.ndarray]],
        reference_id: str = "target",
        threshold: Optional[float] = None,
    ) -> float:
        """
        Compute percentage of frames where target person appears.

        Args:
            frames: List of frames to analyze
            reference_id: Reference person to look for
            threshold: Match threshold

        Returns:
            Percentage of frames with target person (0-1)
        """
        if not frames:
            return 0.0
        matches = 0
        for frame in frames:
            # Best-effort per frame: a failed frame counts as "not present"
            # rather than aborting the whole computation.
            try:
                match = self.find_target_in_frame(frame, reference_id, threshold)
                if match is not None:
                    matches += 1
            except Exception as e:
                logger.debug(f"Frame analysis failed: {e}")
        screen_time = matches / len(frames)
        logger.info(f"Target screen time: {screen_time*100:.1f}% ({matches}/{len(frames)} frames)")
        return screen_time

    def get_face_crop(
        self,
        image: Union[str, Path, np.ndarray],
        detection: FaceDetection,
        margin: float = 0.2,
    ) -> np.ndarray:
        """
        Extract face crop from image.

        Args:
            image: Source image
            detection: Face detection with bounding box
            margin: Margin around face (0.2 = 20%)

        Returns:
            Cropped face image as numpy array

        Raises:
            InferenceError: If an image path cannot be read
        """
        import cv2

        if isinstance(image, (str, Path)):
            img = cv2.imread(str(image))
            # Fail with a clear error instead of an AttributeError below.
            if img is None:
                raise InferenceError(f"Could not load image: {image}")
        else:
            img = image
        h, w = img.shape[:2]
        x1, y1, x2, y2 = detection.bbox
        # Expand the box by the margin, clamped to the image bounds.
        margin_x = int((x2 - x1) * margin)
        margin_y = int((y2 - y1) * margin)
        x1 = max(0, x1 - margin_x)
        y1 = max(0, y1 - margin_y)
        x2 = min(w, x2 + margin_x)
        y2 = min(h, y2 + margin_y)
        return img[y1:y2, x1:x2]

    def _cosine_similarity(
        self,
        embedding1: np.ndarray,
        embedding2: np.ndarray,
    ) -> float:
        """Compute cosine similarity between embeddings (0.0 for zero vectors)."""
        norm1 = np.linalg.norm(embedding1)
        norm2 = np.linalg.norm(embedding2)
        if norm1 == 0 or norm2 == 0:
            return 0.0
        return float(np.dot(embedding1, embedding2) / (norm1 * norm2))

    def clear_references(self) -> None:
        """Clear all registered reference faces."""
        self._reference_embeddings.clear()
        logger.info("Cleared all reference faces")

    def get_registered_references(self) -> List[str]:
        """Get list of registered reference IDs."""
        return list(self._reference_embeddings.keys())
# Export public interface (controls `from ... import *` and documents the API)
__all__ = ["FaceRecognizer", "FaceDetection", "FaceMatch"]