| | """ |
| | ShortSmith v2 - Face Recognizer Module |
| | |
| | Face detection and recognition using InsightFace: |
| | - SCRFD for fast face detection |
| | - ArcFace for face embeddings and matching |
| | |
| | Used for person-specific filtering in highlight extraction. |
| | """ |
| |
|
| | from pathlib import Path |
| | from typing import List, Optional, Tuple, Union |
| | from dataclasses import dataclass |
| | import numpy as np |
| |
|
| | from utils.logger import get_logger, LogTimer |
| | from utils.helpers import ModelLoadError, InferenceError |
| | from config import get_config, ModelConfig |
| |
|
# Module-scoped logger for this model component.
logger = get_logger("models.face_recognizer")
| |
|
| |
|
@dataclass
class FaceDetection:
    """A single face found in an image, with optional model attributes."""

    # Pixel coordinates (x1, y1, x2, y2) of the face box.
    bbox: Tuple[int, int, int, int]
    # Detector confidence score for this face.
    confidence: float
    # Face embedding vector, when the model produced one.
    embedding: Optional[np.ndarray]
    # Facial keypoints, when available.
    landmarks: Optional[np.ndarray]
    # Estimated age in years, when predicted.
    age: Optional[int] = None
    # 'M' or 'F' when predicted, otherwise None.
    gender: Optional[str] = None

    @property
    def width(self) -> int:
        """Width of the bounding box in pixels."""
        return self.bbox[2] - self.bbox[0]

    @property
    def height(self) -> int:
        """Height of the bounding box in pixels."""
        return self.bbox[3] - self.bbox[1]

    @property
    def center(self) -> Tuple[int, int]:
        """Integer midpoint of the bounding box."""
        left, top, right, bottom = self.bbox
        return ((left + right) // 2, (top + bottom) // 2)

    @property
    def area(self) -> int:
        """Bounding-box area in square pixels."""
        return self.width * self.height
|
| |
|
@dataclass
class FaceMatch:
    """Outcome of comparing one detected face against a reference face."""

    # The face detection that was scored.
    detection: FaceDetection
    # Cosine similarity to the reference embedding.
    similarity: float
    # True when the similarity cleared the match threshold.
    is_match: bool
    # Identifier of the reference the face was compared against.
    reference_id: Optional[str] = None
| |
|
| |
|
class FaceRecognizer:
    """
    Face detection and recognition using InsightFace.

    Supports:
    - Multi-face detection per frame
    - Face embedding extraction
    - Similarity-based face matching
    - Reference image registration
    """

    def __init__(
        self,
        config: Optional[ModelConfig] = None,
        load_model: bool = True,
    ):
        """
        Initialize face recognizer.

        Args:
            config: Model configuration; falls back to the global config.
            load_model: Whether to load the model immediately.

        Raises:
            ImportError: If insightface is not installed.
            ModelLoadError: If the model fails to initialize.
        """
        self.config = config or get_config().model
        self.model = None
        # reference_id -> embedding vector for registered reference faces.
        self._reference_embeddings: dict = {}

        if load_model:
            self._load_model()

        logger.info(f"FaceRecognizer initialized (threshold={self.config.face_similarity_threshold})")

    def _load_model(self) -> None:
        """Load the InsightFace analysis pipeline.

        Raises:
            ImportError: If insightface/onnxruntime are not installed.
            ModelLoadError: If model initialization fails for any other reason.
        """
        with LogTimer(logger, "Loading InsightFace model"):
            try:
                import insightface
                from insightface.app import FaceAnalysis

                # Request CUDA first when configured; onnxruntime will fall
                # back to the CPU provider if CUDA is unavailable.
                self.model = FaceAnalysis(
                    name=self.config.face_detection_model,
                    providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
                    if self.config.device == "cuda" else ['CPUExecutionProvider'],
                )

                # ctx_id 0 selects GPU 0; -1 forces CPU inference.
                self.model.prepare(ctx_id=0 if self.config.device == "cuda" else -1)

                logger.info("InsightFace model loaded successfully")

            except ImportError as e:
                raise ImportError(
                    "InsightFace is required for face recognition. "
                    "Install with: pip install insightface onnxruntime-gpu"
                ) from e

            except Exception as e:
                logger.error(f"Failed to load InsightFace model: {e}")
                raise ModelLoadError(f"Could not load face recognition model: {e}") from e

    def detect_faces(
        self,
        image: Union[str, Path, np.ndarray],
        max_faces: int = 10,
        min_confidence: float = 0.5,
    ) -> List[FaceDetection]:
        """
        Detect faces in an image.

        Args:
            image: Image path or numpy array (BGR format)
            max_faces: Maximum faces to detect
            min_confidence: Minimum detection confidence

        Returns:
            List of FaceDetection objects

        Raises:
            ModelLoadError: If the model has not been loaded.
            InferenceError: If the image cannot be read or detection fails.
        """
        if self.model is None:
            raise ModelLoadError("Model not loaded")

        try:
            import cv2

            if isinstance(image, (str, Path)):
                img = cv2.imread(str(image))
                if img is None:
                    raise InferenceError(f"Could not load image: {image}")
            else:
                img = image

            faces = self.model.get(img, max_num=max_faces)

            detections = []
            for face in faces:
                if face.det_score < min_confidence:
                    continue

                # InsightFace encodes gender as 1 for male, otherwise female.
                if hasattr(face, 'gender'):
                    gender = 'M' if face.gender == 1 else 'F'
                else:
                    gender = None

                bbox = tuple(map(int, face.bbox))
                detection = FaceDetection(
                    bbox=bbox,
                    confidence=float(face.det_score),
                    embedding=face.embedding if hasattr(face, 'embedding') else None,
                    landmarks=face.kps if hasattr(face, 'kps') else None,
                    age=int(face.age) if hasattr(face, 'age') else None,
                    gender=gender,
                )
                detections.append(detection)

            logger.debug(f"Detected {len(detections)} faces")
            return detections

        except InferenceError:
            # Already a meaningful domain error (e.g. unreadable image);
            # re-raise as-is instead of wrapping and mangling the message.
            raise

        except Exception as e:
            logger.error(f"Face detection failed: {e}")
            raise InferenceError(f"Face detection failed: {e}") from e

    def register_reference(
        self,
        reference_image: Union[str, Path, np.ndarray],
        reference_id: str = "target",
    ) -> bool:
        """
        Register a reference face for matching.

        Args:
            reference_image: Image containing the reference face
            reference_id: Identifier for this reference

        Returns:
            True if registration successful

        Raises:
            InferenceError: If no face or no embedding is found in the reference.
        """
        with LogTimer(logger, f"Registering reference face '{reference_id}'"):
            # Only the single most prominent face is used as the reference.
            detections = self.detect_faces(reference_image, max_faces=1)

            if not detections:
                raise InferenceError("No face detected in reference image")

            if detections[0].embedding is None:
                raise InferenceError("Could not extract embedding from reference face")

            self._reference_embeddings[reference_id] = detections[0].embedding
            logger.info(f"Registered reference face: {reference_id}")
            return True

    def match_faces(
        self,
        image: Union[str, Path, np.ndarray],
        reference_id: str = "target",
        threshold: Optional[float] = None,
    ) -> List[FaceMatch]:
        """
        Find faces matching a registered reference.

        Args:
            image: Image to search for matches
            reference_id: ID of reference to match against
            threshold: Similarity threshold (uses config if None)

        Returns:
            List of FaceMatch objects for all detected faces, sorted by
            descending similarity. Empty if the reference is not registered.
        """
        # Compare against None: `threshold or ...` would wrongly discard an
        # explicit threshold of 0.0 and fall back to the config value.
        if threshold is None:
            threshold = self.config.face_similarity_threshold

        if reference_id not in self._reference_embeddings:
            logger.warning(f"Reference '{reference_id}' not registered")
            return []

        reference_embedding = self._reference_embeddings[reference_id]
        detections = self.detect_faces(image)

        matches = []
        for detection in detections:
            # Faces without an embedding cannot be scored.
            if detection.embedding is None:
                continue

            similarity = self._cosine_similarity(
                reference_embedding, detection.embedding
            )

            matches.append(FaceMatch(
                detection=detection,
                similarity=similarity,
                is_match=similarity >= threshold,
                reference_id=reference_id,
            ))

        matches.sort(key=lambda m: m.similarity, reverse=True)
        return matches

    def find_target_in_frame(
        self,
        image: Union[str, Path, np.ndarray],
        reference_id: str = "target",
        threshold: Optional[float] = None,
    ) -> Optional[FaceMatch]:
        """
        Find the best matching face in a frame.

        Args:
            image: Frame to search
            reference_id: Reference to match against
            threshold: Similarity threshold

        Returns:
            Best FaceMatch if found, None otherwise
        """
        matches = self.match_faces(image, reference_id, threshold)
        matching = [m for m in matches if m.is_match]

        # match_faces returns results sorted by similarity, so the first
        # passing match is the best one.
        if matching:
            return matching[0]
        return None

    def compute_screen_time(
        self,
        frames: List[Union[str, Path, np.ndarray]],
        reference_id: str = "target",
        threshold: Optional[float] = None,
    ) -> float:
        """
        Compute percentage of frames where target person appears.

        Args:
            frames: List of frames to analyze
            reference_id: Reference person to look for
            threshold: Match threshold

        Returns:
            Fraction of frames containing the target person (0-1).
        """
        if not frames:
            return 0.0

        matches = 0
        for frame in frames:
            try:
                match = self.find_target_in_frame(frame, reference_id, threshold)
                if match is not None:
                    matches += 1
            except Exception as e:
                # Best-effort: a single unreadable frame should not abort
                # the whole screen-time computation.
                logger.debug(f"Frame analysis failed: {e}")

        screen_time = matches / len(frames)
        logger.info(f"Target screen time: {screen_time*100:.1f}% ({matches}/{len(frames)} frames)")
        return screen_time

    def get_face_crop(
        self,
        image: Union[str, Path, np.ndarray],
        detection: FaceDetection,
        margin: float = 0.2,
    ) -> np.ndarray:
        """
        Extract face crop from image.

        Args:
            image: Source image
            detection: Face detection with bounding box
            margin: Margin around face (0.2 = 20%)

        Returns:
            Cropped face image as numpy array

        Raises:
            InferenceError: If an image path cannot be read.
        """
        import cv2

        if isinstance(image, (str, Path)):
            img = cv2.imread(str(image))
            # cv2.imread returns None on failure; fail loudly rather than
            # crashing below with an AttributeError on img.shape.
            if img is None:
                raise InferenceError(f"Could not load image: {image}")
        else:
            img = image

        h, w = img.shape[:2]
        x1, y1, x2, y2 = detection.bbox

        # Expand the box by the requested margin, clamped to image bounds.
        margin_x = int((x2 - x1) * margin)
        margin_y = int((y2 - y1) * margin)

        x1 = max(0, x1 - margin_x)
        y1 = max(0, y1 - margin_y)
        x2 = min(w, x2 + margin_x)
        y2 = min(h, y2 + margin_y)

        return img[y1:y2, x1:x2]

    def _cosine_similarity(
        self,
        embedding1: np.ndarray,
        embedding2: np.ndarray,
    ) -> float:
        """Compute cosine similarity between embeddings.

        Returns 0.0 when either embedding has zero norm.
        """
        norm1 = np.linalg.norm(embedding1)
        norm2 = np.linalg.norm(embedding2)

        if norm1 == 0 or norm2 == 0:
            return 0.0

        return float(np.dot(embedding1, embedding2) / (norm1 * norm2))

    def clear_references(self) -> None:
        """Clear all registered reference faces."""
        self._reference_embeddings.clear()
        logger.info("Cleared all reference faces")

    def get_registered_references(self) -> List[str]:
        """Get list of registered reference IDs."""
        return list(self._reference_embeddings.keys())
| |
|
| |
|
| | |
| | __all__ = ["FaceRecognizer", "FaceDetection", "FaceMatch"] |
| |
|