File size: 12,127 Bytes
ad4e58a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
"""
ShortSmith v2 - Face Recognizer Module

Face detection and recognition using InsightFace:
- SCRFD for fast face detection
- ArcFace for face embeddings and matching

Used for person-specific filtering in highlight extraction.
"""

from pathlib import Path
from typing import List, Optional, Tuple, Union
from dataclasses import dataclass
import numpy as np

from utils.logger import get_logger, LogTimer
from utils.helpers import ModelLoadError, InferenceError, validate_image_file
from config import get_config, ModelConfig

# Module-level logger shared by everything in this file, obtained from the
# project's get_logger helper under the name "models.face_recognizer".
logger = get_logger("models.face_recognizer")


@dataclass
class FaceDetection:
    """
    A single face found in an image.

    Bundles the bounding box and detection score with the optional
    per-face attributes (embedding, landmarks, age, gender). The
    geometry helpers below are all derived from ``bbox``.
    """
    # Bounding box corners, ordered (x1, y1, x2, y2)
    bbox: Tuple[int, int, int, int]
    # Detection confidence
    confidence: float
    # Face embedding (512-dim for ArcFace), or None when unavailable
    embedding: Optional[np.ndarray]
    # Facial landmarks (5 points), or None when unavailable
    landmarks: Optional[np.ndarray]
    # Estimated age, if any
    age: Optional[int] = None
    # Estimated gender, if any
    gender: Optional[str] = None

    @property
    def center(self) -> Tuple[int, int]:
        """Midpoint of the bounding box, computed with floor division."""
        left, top, right, bottom = self.bbox
        mid_x = (left + right) // 2
        mid_y = (top + bottom) // 2
        return (mid_x, mid_y)

    @property
    def area(self) -> int:
        """Bounding-box area: horizontal extent times vertical extent."""
        left, top, right, bottom = self.bbox
        span_x = right - left
        span_y = bottom - top
        return span_x * span_y

    @property
    def width(self) -> int:
        """Horizontal extent of the bounding box (x2 - x1)."""
        x_start, x_end = self.bbox[0], self.bbox[2]
        return x_end - x_start

    @property
    def height(self) -> int:
        """Vertical extent of the bounding box (y2 - y1)."""
        y_start, y_end = self.bbox[1], self.bbox[3]
        return y_end - y_start


@dataclass
class FaceMatch:
    """
    Outcome of comparing one detected face against a reference.

    Pairs the detection with its similarity score and the resulting
    match decision.
    """
    # The detected face that was compared
    detection: FaceDetection
    # Cosine similarity between the face and the reference embedding
    # (higher means more alike; the value can be negative)
    similarity: float
    # True when the similarity reached the matching threshold
    is_match: bool
    # ID of the reference that was compared against, if any
    reference_id: Optional[str] = None


class FaceRecognizer:
    """
    Face detection and recognition using InsightFace.

    Supports:
    - Multi-face detection per frame
    - Face embedding extraction
    - Similarity-based face matching
    - Reference image registration

    Reference embeddings are held in memory on the instance, keyed by
    reference ID.
    """

    def __init__(
        self,
        config: Optional[ModelConfig] = None,
        load_model: bool = True,
    ):
        """
        Initialize face recognizer.

        Args:
            config: Model configuration (falls back to ``get_config().model``
                when None)
            load_model: Whether to load model immediately

        Raises:
            ImportError: If insightface is not installed
            ModelLoadError: If the model cannot be loaded for another reason
        """
        # Explicit None check so a caller-supplied config is never replaced
        # just because it happens to evaluate as falsy.
        self.config = config if config is not None else get_config().model
        self.model = None
        self._reference_embeddings: dict = {}

        if load_model:
            self._load_model()

        logger.info(f"FaceRecognizer initialized (threshold={self.config.face_similarity_threshold})")

    def _load_model(self) -> None:
        """
        Load InsightFace model.

        Raises:
            ImportError: If insightface (or one of its imports) is missing
            ModelLoadError: If model construction or preparation fails
        """
        with LogTimer(logger, "Loading InsightFace model"):
            try:
                from insightface.app import FaceAnalysis

                use_cuda = self.config.device == "cuda"

                # Prefer CUDA when configured, keeping CPU as a fallback provider
                providers = (
                    ['CUDAExecutionProvider', 'CPUExecutionProvider']
                    if use_cuda else ['CPUExecutionProvider']
                )
                self.model = FaceAnalysis(
                    name=self.config.face_detection_model,
                    providers=providers,
                )

                # ctx_id 0 is used for CUDA and -1 otherwise; no detection
                # size is passed, so the library default applies.
                self.model.prepare(ctx_id=0 if use_cuda else -1)

                logger.info("InsightFace model loaded successfully")

            except ImportError as e:
                raise ImportError(
                    "InsightFace is required for face recognition. "
                    "Install with: pip install insightface onnxruntime-gpu"
                ) from e

            except Exception as e:
                logger.error(f"Failed to load InsightFace model: {e}")
                raise ModelLoadError(f"Could not load face recognition model: {e}") from e

    def _load_image(self, image: Union[str, Path, np.ndarray]) -> np.ndarray:
        """
        Return ``image`` as an array, reading it from disk if it is a path.

        Raises:
            InferenceError: If a path is given and the file cannot be read
        """
        if not isinstance(image, (str, Path)):
            return image

        # Imported lazily: only path inputs need OpenCV here.
        import cv2

        img = cv2.imread(str(image))
        if img is None:
            raise InferenceError(f"Could not load image: {image}")
        return img

    def _resolve_threshold(self, threshold: Optional[float]) -> float:
        """Return ``threshold``, or the configured default when it is None."""
        # `is None` rather than truthiness so an explicit 0.0 is honoured.
        if threshold is None:
            return self.config.face_similarity_threshold
        return threshold

    @staticmethod
    def _to_detection(face) -> FaceDetection:
        """Convert one raw detector result into a FaceDetection."""
        # Optional attributes may be absent or present-but-None on the raw
        # face object (e.g. when no age/gender model ran), so test for None
        # instead of relying on hasattr.
        age = getattr(face, 'age', None)
        gender = getattr(face, 'gender', None)

        if gender is None:
            gender_label = None
        else:
            gender_label = 'M' if gender == 1 else 'F'

        return FaceDetection(
            bbox=tuple(map(int, face.bbox)),
            confidence=float(face.det_score),
            embedding=getattr(face, 'embedding', None),
            landmarks=getattr(face, 'kps', None),
            age=int(age) if age is not None else None,
            gender=gender_label,
        )

    def detect_faces(
        self,
        image: Union[str, Path, np.ndarray],
        max_faces: int = 10,
        min_confidence: float = 0.5,
    ) -> List[FaceDetection]:
        """
        Detect faces in an image.

        Args:
            image: Image path or numpy array (BGR format)
            max_faces: Maximum faces to detect
            min_confidence: Minimum detection confidence

        Returns:
            List of FaceDetection objects

        Raises:
            ModelLoadError: If the model has not been loaded
            InferenceError: If the image cannot be read or detection fails
        """
        if self.model is None:
            raise ModelLoadError("Model not loaded")

        try:
            img = self._load_image(image)

            # Detect faces
            faces = self.model.get(img, max_num=max_faces)

            # Keep confident faces only and convert to FaceDetection objects
            detections = [
                self._to_detection(face)
                for face in faces
                if face.det_score >= min_confidence
            ]

            logger.debug(f"Detected {len(detections)} faces")
            return detections

        except InferenceError as e:
            # Already the right exception type; log it but do not wrap twice.
            logger.error(f"Face detection failed: {e}")
            raise

        except Exception as e:
            logger.error(f"Face detection failed: {e}")
            raise InferenceError(f"Face detection failed: {e}") from e

    def register_reference(
        self,
        reference_image: Union[str, Path, np.ndarray],
        reference_id: str = "target",
    ) -> bool:
        """
        Register a reference face for matching.

        Only one face is requested from the detector; its embedding is
        stored under ``reference_id``, replacing any previous entry.

        Args:
            reference_image: Image containing the reference face
            reference_id: Identifier for this reference

        Returns:
            True if registration successful

        Raises:
            InferenceError: If no face (or no embedding) found in reference
        """
        with LogTimer(logger, f"Registering reference face '{reference_id}'"):
            detections = self.detect_faces(reference_image, max_faces=1)

            if not detections:
                raise InferenceError("No face detected in reference image")

            embedding = detections[0].embedding
            if embedding is None:
                raise InferenceError("Could not extract embedding from reference face")

            self._reference_embeddings[reference_id] = embedding
            logger.info(f"Registered reference face: {reference_id}")
            return True

    def match_faces(
        self,
        image: Union[str, Path, np.ndarray],
        reference_id: str = "target",
        threshold: Optional[float] = None,
    ) -> List[FaceMatch]:
        """
        Find faces matching a registered reference.

        Args:
            image: Image to search for matches
            reference_id: ID of reference to match against
            threshold: Similarity threshold (uses config if None)

        Returns:
            List of FaceMatch objects for all detected faces that have an
            embedding, sorted by similarity (highest first). Empty if the
            reference is not registered.
        """
        threshold = self._resolve_threshold(threshold)

        reference_embedding = self._reference_embeddings.get(reference_id)
        if reference_embedding is None:
            logger.warning(f"Reference '{reference_id}' not registered")
            return []

        matches = []
        for detection in self.detect_faces(image):
            # A face without an embedding cannot be compared
            if detection.embedding is None:
                continue

            similarity = self._cosine_similarity(
                reference_embedding, detection.embedding
            )

            matches.append(FaceMatch(
                detection=detection,
                similarity=similarity,
                is_match=similarity >= threshold,
                reference_id=reference_id,
            ))

        # Sort by similarity descending
        matches.sort(key=lambda m: m.similarity, reverse=True)
        return matches

    def find_target_in_frame(
        self,
        image: Union[str, Path, np.ndarray],
        reference_id: str = "target",
        threshold: Optional[float] = None,
    ) -> Optional[FaceMatch]:
        """
        Find the best matching face in a frame.

        Args:
            image: Frame to search
            reference_id: Reference to match against
            threshold: Similarity threshold

        Returns:
            Best FaceMatch if found, None otherwise
        """
        matches = self.match_faces(image, reference_id, threshold)

        # match_faces returns results best-first, so the first hit is the best
        return next((m for m in matches if m.is_match), None)

    def compute_screen_time(
        self,
        frames: List[Union[str, Path, np.ndarray]],
        reference_id: str = "target",
        threshold: Optional[float] = None,
    ) -> float:
        """
        Compute percentage of frames where target person appears.

        Frames that fail to analyze are counted as "target not present";
        analysis is best-effort and never raises for a single bad frame.

        Args:
            frames: List of frames to analyze
            reference_id: Reference person to look for
            threshold: Match threshold

        Returns:
            Fraction of frames with target person (0-1)
        """
        if not frames:
            return 0.0

        matches = 0
        failures = 0
        for frame in frames:
            try:
                match = self.find_target_in_frame(frame, reference_id, threshold)
                if match is not None:
                    matches += 1
            except Exception as e:
                failures += 1
                logger.debug(f"Frame analysis failed: {e}")

        if failures:
            # Surface the total so a systematically failing run is visible
            # above debug level.
            logger.warning(f"Frame analysis failed for {failures}/{len(frames)} frames")

        screen_time = matches / len(frames)
        logger.info(f"Target screen time: {screen_time*100:.1f}% ({matches}/{len(frames)} frames)")
        return screen_time

    def get_face_crop(
        self,
        image: Union[str, Path, np.ndarray],
        detection: FaceDetection,
        margin: float = 0.2,
    ) -> np.ndarray:
        """
        Extract face crop from image.

        The bounding box is grown by ``margin`` on every side and then
        clamped to the image bounds.

        Args:
            image: Source image (path or array)
            detection: Face detection with bounding box
            margin: Margin around face (0.2 = 20%)

        Returns:
            Cropped face image as numpy array

        Raises:
            InferenceError: If ``image`` is a path that cannot be read
        """
        img = self._load_image(image)

        h, w = img.shape[:2]
        x1, y1, x2, y2 = detection.bbox

        # Margin is proportional to the box size along each axis
        margin_x = int((x2 - x1) * margin)
        margin_y = int((y2 - y1) * margin)

        # Expand, then clamp to the image
        x1 = max(0, x1 - margin_x)
        y1 = max(0, y1 - margin_y)
        x2 = min(w, x2 + margin_x)
        y2 = min(h, y2 + margin_y)

        return img[y1:y2, x1:x2]

    def _cosine_similarity(
        self,
        embedding1: np.ndarray,
        embedding2: np.ndarray,
    ) -> float:
        """
        Compute cosine similarity between embeddings.

        Returns 0.0 when either embedding has zero norm, which avoids a
        division by zero.
        """
        norm1 = np.linalg.norm(embedding1)
        norm2 = np.linalg.norm(embedding2)

        if norm1 == 0 or norm2 == 0:
            return 0.0

        return float(np.dot(embedding1, embedding2) / (norm1 * norm2))

    def clear_references(self) -> None:
        """Clear all registered reference faces."""
        self._reference_embeddings.clear()
        logger.info("Cleared all reference faces")

    def get_registered_references(self) -> List[str]:
        """Get list of registered reference IDs."""
        return list(self._reference_embeddings)


# Public interface: the names exported by `from <module> import *`
__all__ = ["FaceRecognizer", "FaceDetection", "FaceMatch"]