""" ShortSmith v2 - Face Recognizer Module Face detection and recognition using InsightFace: - SCRFD for fast face detection - ArcFace for face embeddings and matching Used for person-specific filtering in highlight extraction. """ from pathlib import Path from typing import List, Optional, Tuple, Union from dataclasses import dataclass import numpy as np from utils.logger import get_logger, LogTimer from utils.helpers import ModelLoadError, InferenceError, validate_image_file from config import get_config, ModelConfig logger = get_logger("models.face_recognizer") @dataclass class FaceDetection: """Represents a detected face in an image.""" bbox: Tuple[int, int, int, int] # (x1, y1, x2, y2) confidence: float # Detection confidence embedding: Optional[np.ndarray] # Face embedding (512-dim for ArcFace) landmarks: Optional[np.ndarray] # Facial landmarks (5 points) age: Optional[int] = None # Estimated age gender: Optional[str] = None # Estimated gender @property def center(self) -> Tuple[int, int]: """Center point of face bounding box.""" x1, y1, x2, y2 = self.bbox return ((x1 + x2) // 2, (y1 + y2) // 2) @property def area(self) -> int: """Area of face bounding box.""" x1, y1, x2, y2 = self.bbox return (x2 - x1) * (y2 - y1) @property def width(self) -> int: return self.bbox[2] - self.bbox[0] @property def height(self) -> int: return self.bbox[3] - self.bbox[1] @dataclass class FaceMatch: """Result of face matching.""" detection: FaceDetection # The detected face similarity: float # Cosine similarity to reference (0-1) is_match: bool # Whether it matches reference reference_id: Optional[str] = None # ID of matched reference class FaceRecognizer: """ Face detection and recognition using InsightFace. Supports: - Multi-face detection per frame - Face embedding extraction - Similarity-based face matching - Reference image registration """ def __init__( self, config: Optional[ModelConfig] = None, load_model: bool = True, ): """ Initialize face recognizer. Args: config: Model configuration load_model: Whether to load model immediately Raises: ImportError: If insightface is not installed """ self.config = config or get_config().model self.model = None self._reference_embeddings: dict = {} if load_model: self._load_model() logger.info(f"FaceRecognizer initialized (threshold={self.config.face_similarity_threshold})") def _load_model(self) -> None: """Load InsightFace model.""" with LogTimer(logger, "Loading InsightFace model"): try: import insightface from insightface.app import FaceAnalysis # Initialize FaceAnalysis app self.model = FaceAnalysis( name=self.config.face_detection_model, providers=['CUDAExecutionProvider', 'CPUExecutionProvider'] if self.config.device == "cuda" else ['CPUExecutionProvider'], ) # Prepare with detection size self.model.prepare(ctx_id=0 if self.config.device == "cuda" else -1) logger.info("InsightFace model loaded successfully") except ImportError as e: raise ImportError( "InsightFace is required for face recognition. " "Install with: pip install insightface onnxruntime-gpu" ) from e except Exception as e: logger.error(f"Failed to load InsightFace model: {e}") raise ModelLoadError(f"Could not load face recognition model: {e}") from e def detect_faces( self, image: Union[str, Path, np.ndarray], max_faces: int = 10, min_confidence: float = 0.5, ) -> List[FaceDetection]: """ Detect faces in an image. Args: image: Image path or numpy array (BGR format) max_faces: Maximum faces to detect min_confidence: Minimum detection confidence Returns: List of FaceDetection objects Raises: InferenceError: If detection fails """ if self.model is None: raise ModelLoadError("Model not loaded") try: import cv2 # Load image if path if isinstance(image, (str, Path)): img = cv2.imread(str(image)) if img is None: raise InferenceError(f"Could not load image: {image}") else: img = image # Detect faces faces = self.model.get(img, max_num=max_faces) # Convert to FaceDetection objects detections = [] for face in faces: if face.det_score < min_confidence: continue bbox = tuple(map(int, face.bbox)) detection = FaceDetection( bbox=bbox, confidence=float(face.det_score), embedding=face.embedding if hasattr(face, 'embedding') else None, landmarks=face.kps if hasattr(face, 'kps') else None, age=int(face.age) if hasattr(face, 'age') else None, gender='M' if hasattr(face, 'gender') and face.gender == 1 else 'F' if hasattr(face, 'gender') else None, ) detections.append(detection) logger.debug(f"Detected {len(detections)} faces") return detections except Exception as e: logger.error(f"Face detection failed: {e}") raise InferenceError(f"Face detection failed: {e}") from e def register_reference( self, reference_image: Union[str, Path, np.ndarray], reference_id: str = "target", ) -> bool: """ Register a reference face for matching. Args: reference_image: Image containing the reference face reference_id: Identifier for this reference Returns: True if registration successful Raises: InferenceError: If no face found in reference """ with LogTimer(logger, f"Registering reference face '{reference_id}'"): detections = self.detect_faces(reference_image, max_faces=1) if not detections: raise InferenceError("No face detected in reference image") if detections[0].embedding is None: raise InferenceError("Could not extract embedding from reference face") self._reference_embeddings[reference_id] = detections[0].embedding logger.info(f"Registered reference face: {reference_id}") return True def match_faces( self, image: Union[str, Path, np.ndarray], reference_id: str = "target", threshold: Optional[float] = None, ) -> List[FaceMatch]: """ Find faces matching a registered reference. Args: image: Image to search for matches reference_id: ID of reference to match against threshold: Similarity threshold (uses config if None) Returns: List of FaceMatch objects for all detected faces """ threshold = threshold or self.config.face_similarity_threshold if reference_id not in self._reference_embeddings: logger.warning(f"Reference '{reference_id}' not registered") return [] reference_embedding = self._reference_embeddings[reference_id] detections = self.detect_faces(image) matches = [] for detection in detections: if detection.embedding is None: continue similarity = self._cosine_similarity( reference_embedding, detection.embedding ) matches.append(FaceMatch( detection=detection, similarity=similarity, is_match=similarity >= threshold, reference_id=reference_id, )) # Sort by similarity descending matches.sort(key=lambda m: m.similarity, reverse=True) return matches def find_target_in_frame( self, image: Union[str, Path, np.ndarray], reference_id: str = "target", threshold: Optional[float] = None, ) -> Optional[FaceMatch]: """ Find the best matching face in a frame. Args: image: Frame to search reference_id: Reference to match against threshold: Similarity threshold Returns: Best FaceMatch if found, None otherwise """ matches = self.match_faces(image, reference_id, threshold) matching = [m for m in matches if m.is_match] if matching: return matching[0] # Return best match return None def compute_screen_time( self, frames: List[Union[str, Path, np.ndarray]], reference_id: str = "target", threshold: Optional[float] = None, ) -> float: """ Compute percentage of frames where target person appears. Args: frames: List of frames to analyze reference_id: Reference person to look for threshold: Match threshold Returns: Percentage of frames with target person (0-1) """ if not frames: return 0.0 matches = 0 for frame in frames: try: match = self.find_target_in_frame(frame, reference_id, threshold) if match is not None: matches += 1 except Exception as e: logger.debug(f"Frame analysis failed: {e}") screen_time = matches / len(frames) logger.info(f"Target screen time: {screen_time*100:.1f}% ({matches}/{len(frames)} frames)") return screen_time def get_face_crop( self, image: Union[str, Path, np.ndarray], detection: FaceDetection, margin: float = 0.2, ) -> np.ndarray: """ Extract face crop from image. Args: image: Source image detection: Face detection with bounding box margin: Margin around face (0.2 = 20%) Returns: Cropped face image as numpy array """ import cv2 if isinstance(image, (str, Path)): img = cv2.imread(str(image)) else: img = image h, w = img.shape[:2] x1, y1, x2, y2 = detection.bbox # Add margin margin_x = int((x2 - x1) * margin) margin_y = int((y2 - y1) * margin) x1 = max(0, x1 - margin_x) y1 = max(0, y1 - margin_y) x2 = min(w, x2 + margin_x) y2 = min(h, y2 + margin_y) return img[y1:y2, x1:x2] def _cosine_similarity( self, embedding1: np.ndarray, embedding2: np.ndarray, ) -> float: """Compute cosine similarity between embeddings.""" norm1 = np.linalg.norm(embedding1) norm2 = np.linalg.norm(embedding2) if norm1 == 0 or norm2 == 0: return 0.0 return float(np.dot(embedding1, embedding2) / (norm1 * norm2)) def clear_references(self) -> None: """Clear all registered reference faces.""" self._reference_embeddings.clear() logger.info("Cleared all reference faces") def get_registered_references(self) -> List[str]: """Get list of registered reference IDs.""" return list(self._reference_embeddings.keys()) # Export public interface __all__ = ["FaceRecognizer", "FaceDetection", "FaceMatch"]