# dev_caio/models/face_recognizer.py (commit ad4e58a)
"""
ShortSmith v2 - Face Recognizer Module
Face detection and recognition using InsightFace:
- SCRFD for fast face detection
- ArcFace for face embeddings and matching
Used for person-specific filtering in highlight extraction.
"""
from pathlib import Path
from typing import List, Optional, Tuple, Union
from dataclasses import dataclass
import numpy as np
from utils.logger import get_logger, LogTimer
from utils.helpers import ModelLoadError, InferenceError, validate_image_file
from config import get_config, ModelConfig
logger = get_logger("models.face_recognizer")
@dataclass
class FaceDetection:
    """A single face found in an image, plus optional per-face attributes."""
    bbox: Tuple[int, int, int, int]  # Bounding box as (x1, y1, x2, y2) pixel coordinates
    confidence: float  # Detector score for this face
    embedding: Optional[np.ndarray]  # 512-dim ArcFace identity vector, when extracted
    landmarks: Optional[np.ndarray]  # Five facial keypoints, when available
    age: Optional[int] = None  # Estimated age, when the model predicts it
    gender: Optional[str] = None  # Estimated gender ('M'/'F'), when the model predicts it

    @property
    def width(self) -> int:
        """Horizontal extent of the bounding box in pixels."""
        return self.bbox[2] - self.bbox[0]

    @property
    def height(self) -> int:
        """Vertical extent of the bounding box in pixels."""
        return self.bbox[3] - self.bbox[1]

    @property
    def area(self) -> int:
        """Pixel area covered by the bounding box."""
        return self.width * self.height

    @property
    def center(self) -> Tuple[int, int]:
        """Integer midpoint of the bounding box."""
        left, top, right, bottom = self.bbox
        return ((left + right) // 2, (top + bottom) // 2)
@dataclass
class FaceMatch:
    """Outcome of comparing one detected face against a registered reference."""
    detection: FaceDetection  # The face that was compared
    similarity: float  # Cosine similarity to the reference embedding (0-1)
    is_match: bool  # True when similarity cleared the match threshold
    reference_id: Optional[str] = None  # ID of the reference that was matched against
class FaceRecognizer:
    """
    Face detection and recognition using InsightFace.

    Supports:
    - Multi-face detection per frame
    - Face embedding extraction
    - Similarity-based face matching
    - Reference image registration
    """

    def __init__(
        self,
        config: Optional[ModelConfig] = None,
        load_model: bool = True,
    ):
        """
        Initialize face recognizer.

        Args:
            config: Model configuration (falls back to the global config).
            load_model: Whether to load the model immediately.

        Raises:
            ImportError: If insightface is not installed.
        """
        self.config = config or get_config().model
        self.model = None
        # Maps reference_id -> face embedding for registered reference faces.
        self._reference_embeddings: dict = {}
        if load_model:
            self._load_model()
        logger.info(f"FaceRecognizer initialized (threshold={self.config.face_similarity_threshold})")

    def _load_model(self) -> None:
        """Load the InsightFace detection/recognition pipeline.

        Raises:
            ImportError: If insightface is not installed.
            ModelLoadError: If the model fails to initialize for any other reason.
        """
        with LogTimer(logger, "Loading InsightFace model"):
            try:
                import insightface  # noqa: F401 - import checks the package is present
                from insightface.app import FaceAnalysis

                # Prefer CUDA when configured; onnxruntime falls back through
                # the provider list in order.
                providers = (
                    ['CUDAExecutionProvider', 'CPUExecutionProvider']
                    if self.config.device == "cuda"
                    else ['CPUExecutionProvider']
                )
                self.model = FaceAnalysis(
                    name=self.config.face_detection_model,
                    providers=providers,
                )
                # ctx_id 0 selects the first GPU; -1 forces CPU execution.
                self.model.prepare(ctx_id=0 if self.config.device == "cuda" else -1)
                logger.info("InsightFace model loaded successfully")
            except ImportError as e:
                raise ImportError(
                    "InsightFace is required for face recognition. "
                    "Install with: pip install insightface onnxruntime-gpu"
                ) from e
            except Exception as e:
                logger.error(f"Failed to load InsightFace model: {e}")
                raise ModelLoadError(f"Could not load face recognition model: {e}") from e

    def detect_faces(
        self,
        image: Union[str, Path, np.ndarray],
        max_faces: int = 10,
        min_confidence: float = 0.5,
    ) -> List[FaceDetection]:
        """
        Detect faces in an image.

        Args:
            image: Image path or numpy array (BGR format).
            max_faces: Maximum faces to detect.
            min_confidence: Minimum detection confidence.

        Returns:
            List of FaceDetection objects.

        Raises:
            ModelLoadError: If the model was never loaded.
            InferenceError: If the image cannot be read or detection fails.
        """
        if self.model is None:
            raise ModelLoadError("Model not loaded")
        try:
            import cv2

            # Load image if a path was given.
            if isinstance(image, (str, Path)):
                img = cv2.imread(str(image))
                if img is None:
                    raise InferenceError(f"Could not load image: {image}")
            else:
                img = image
            faces = self.model.get(img, max_num=max_faces)
            detections = []
            for face in faces:
                if face.det_score < min_confidence:
                    continue
                # InsightFace encodes gender as 1 for male, 0 for female
                # (simpler than the previous nested conditional expression).
                if hasattr(face, 'gender'):
                    gender = 'M' if face.gender == 1 else 'F'
                else:
                    gender = None
                detections.append(FaceDetection(
                    bbox=tuple(map(int, face.bbox)),
                    confidence=float(face.det_score),
                    embedding=face.embedding if hasattr(face, 'embedding') else None,
                    landmarks=face.kps if hasattr(face, 'kps') else None,
                    age=int(face.age) if hasattr(face, 'age') else None,
                    gender=gender,
                ))
            logger.debug(f"Detected {len(detections)} faces")
            return detections
        except InferenceError:
            # Already a domain error (e.g. unreadable image) - don't double-wrap it.
            raise
        except Exception as e:
            logger.error(f"Face detection failed: {e}")
            raise InferenceError(f"Face detection failed: {e}") from e

    def register_reference(
        self,
        reference_image: Union[str, Path, np.ndarray],
        reference_id: str = "target",
    ) -> bool:
        """
        Register a reference face for matching.

        Args:
            reference_image: Image containing the reference face.
            reference_id: Identifier for this reference.

        Returns:
            True if registration successful.

        Raises:
            InferenceError: If no face (or no embedding) is found in the reference.
        """
        with LogTimer(logger, f"Registering reference face '{reference_id}'"):
            detections = self.detect_faces(reference_image, max_faces=1)
            if not detections:
                raise InferenceError("No face detected in reference image")
            if detections[0].embedding is None:
                raise InferenceError("Could not extract embedding from reference face")
            self._reference_embeddings[reference_id] = detections[0].embedding
            logger.info(f"Registered reference face: {reference_id}")
            return True

    def match_faces(
        self,
        image: Union[str, Path, np.ndarray],
        reference_id: str = "target",
        threshold: Optional[float] = None,
    ) -> List[FaceMatch]:
        """
        Find faces matching a registered reference.

        Args:
            image: Image to search for matches.
            reference_id: ID of reference to match against.
            threshold: Similarity threshold (uses config if None).

        Returns:
            List of FaceMatch objects for all detected faces,
            sorted by similarity descending.
        """
        # Explicit None check so a caller-supplied 0.0 threshold is honored
        # (`threshold or default` would silently discard falsy values).
        if threshold is None:
            threshold = self.config.face_similarity_threshold
        if reference_id not in self._reference_embeddings:
            logger.warning(f"Reference '{reference_id}' not registered")
            return []
        reference_embedding = self._reference_embeddings[reference_id]
        matches = []
        for detection in self.detect_faces(image):
            # Faces without embeddings cannot be compared.
            if detection.embedding is None:
                continue
            similarity = self._cosine_similarity(
                reference_embedding, detection.embedding
            )
            matches.append(FaceMatch(
                detection=detection,
                similarity=similarity,
                is_match=similarity >= threshold,
                reference_id=reference_id,
            ))
        # Sort by similarity descending so the best candidate comes first.
        matches.sort(key=lambda m: m.similarity, reverse=True)
        return matches

    def find_target_in_frame(
        self,
        image: Union[str, Path, np.ndarray],
        reference_id: str = "target",
        threshold: Optional[float] = None,
    ) -> Optional[FaceMatch]:
        """
        Find the best matching face in a frame.

        Args:
            image: Frame to search.
            reference_id: Reference to match against.
            threshold: Similarity threshold.

        Returns:
            Best FaceMatch if found, None otherwise.
        """
        matches = self.match_faces(image, reference_id, threshold)
        matching = [m for m in matches if m.is_match]
        if matching:
            return matching[0]  # Already sorted, so first is the best match
        return None

    def compute_screen_time(
        self,
        frames: List[Union[str, Path, np.ndarray]],
        reference_id: str = "target",
        threshold: Optional[float] = None,
    ) -> float:
        """
        Compute percentage of frames where target person appears.

        Args:
            frames: List of frames to analyze.
            reference_id: Reference person to look for.
            threshold: Match threshold.

        Returns:
            Fraction of frames with target person (0-1).
        """
        if not frames:
            return 0.0
        matches = 0
        for frame in frames:
            try:
                match = self.find_target_in_frame(frame, reference_id, threshold)
                if match is not None:
                    matches += 1
            except Exception as e:
                # Best-effort per frame: a single bad frame shouldn't abort the scan.
                logger.debug(f"Frame analysis failed: {e}")
        screen_time = matches / len(frames)
        logger.info(f"Target screen time: {screen_time*100:.1f}% ({matches}/{len(frames)} frames)")
        return screen_time

    def get_face_crop(
        self,
        image: Union[str, Path, np.ndarray],
        detection: FaceDetection,
        margin: float = 0.2,
    ) -> np.ndarray:
        """
        Extract face crop from image.

        Args:
            image: Source image (path or BGR array).
            detection: Face detection with bounding box.
            margin: Margin around face (0.2 = 20%).

        Returns:
            Cropped face image as numpy array.

        Raises:
            InferenceError: If the image path cannot be read.
        """
        import cv2
        if isinstance(image, (str, Path)):
            img = cv2.imread(str(image))
            # cv2.imread returns None on failure; fail loudly instead of
            # crashing below with an opaque AttributeError.
            if img is None:
                raise InferenceError(f"Could not load image: {image}")
        else:
            img = image
        h, w = img.shape[:2]
        x1, y1, x2, y2 = detection.bbox
        # Expand the box by the margin, clamped to image bounds.
        margin_x = int((x2 - x1) * margin)
        margin_y = int((y2 - y1) * margin)
        x1 = max(0, x1 - margin_x)
        y1 = max(0, y1 - margin_y)
        x2 = min(w, x2 + margin_x)
        y2 = min(h, y2 + margin_y)
        return img[y1:y2, x1:x2]

    def _cosine_similarity(
        self,
        embedding1: np.ndarray,
        embedding2: np.ndarray,
    ) -> float:
        """Compute cosine similarity between embeddings (0.0 for zero vectors)."""
        norm1 = np.linalg.norm(embedding1)
        norm2 = np.linalg.norm(embedding2)
        if norm1 == 0 or norm2 == 0:
            return 0.0
        return float(np.dot(embedding1, embedding2) / (norm1 * norm2))

    def clear_references(self) -> None:
        """Clear all registered reference faces."""
        self._reference_embeddings.clear()
        logger.info("Cleared all reference faces")

    def get_registered_references(self) -> List[str]:
        """Get list of registered reference IDs."""
        return list(self._reference_embeddings.keys())
# Public API exported by this module.
__all__ = ["FaceRecognizer", "FaceDetection", "FaceMatch"]