File size: 5,917 Bytes
ab8cebe
 
 
 
 
 
 
 
 
 
bb776bc
ab8cebe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ff31e2d
 
 
 
 
 
 
 
 
ab8cebe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
from deepface import DeepFace
import os
import cv2
import numpy as np
import logging
import time

# Module-level logger, named after this module per the standard logging convention.
logger = logging.getLogger(__name__)

class FaceAnalyzer:
    """Face recognition and analysis engine built on DeepFace.

    Wraps DeepFace's analyze/verify/find APIs, converts their NumPy-typed
    output into plain JSON-serializable Python values, and degrades to a
    detection-only result when full analysis fails (e.g. missing weights).
    """

    def __init__(self, detector_backend='retinaface'):
        """
        Args:
            detector_backend: DeepFace detector backend name. Default is
                'retinaface' (accurate); pass 'opencv' for a lightweight
                deployment (avoids OOM on constrained hosts such as HF Spaces).
        """
        self.detector_backend = detector_backend
        logger.info("Initialized FaceAnalyzer with %s backend.", detector_backend)

    @staticmethod
    def _sanitize_results(obj):
        """
        Recursively convert NumPy scalars/arrays to standard Python types
        so results can be fed to json.dumps.

        Dict *keys* are sanitized too (a NumPy int64 key makes json.dumps
        raise TypeError), and NumPy types are detected with isinstance
        instead of fragile type-name substring matching ('int' in the type
        name would also match unrelated class names).
        """
        if isinstance(obj, dict):
            return {
                FaceAnalyzer._sanitize_results(k): FaceAnalyzer._sanitize_results(v)
                for k, v in obj.items()
            }
        if isinstance(obj, (list, tuple)):
            return [FaceAnalyzer._sanitize_results(i) for i in obj]
        if isinstance(obj, np.ndarray):
            return FaceAnalyzer._sanitize_results(obj.tolist())
        if isinstance(obj, np.generic):
            # Any NumPy scalar (float32, int64, bool_, ...) -> Python scalar.
            return obj.item()
        if hasattr(obj, 'item') and callable(getattr(obj, 'item')):
            # Fallback for other 0-d array-likes exposing .item() (e.g. tensors).
            return obj.item()
        return obj

    def analyze(self, img_path):
        """
        Analyze an image for age, gender, emotion, and race.

        Args:
            img_path: image path (or any img_path form DeepFace accepts).

        Returns:
            dict: {"faces": [...], "count": int, "process_time": float};
            the detection-only fallback additionally sets "partial": True;
            on total failure returns {"error": message}.
        """
        start_time = time.time()  # hoisted so the fallback can report real elapsed time
        try:
            # Try to run all actions first.
            results = DeepFace.analyze(
                img_path=img_path,
                actions=['age', 'gender', 'emotion', 'race'],
                detector_backend=self.detector_backend,
                enforce_detection=False,
                silent=True
            )
            process_time = time.time() - start_time
            logger.info("Analysis completed in %.2fs", process_time)

            # DeepFace returns a list for multi-face input, a single dict otherwise.
            if not isinstance(results, list):
                results = [results]
            final_results = {"faces": results, "count": len(results), "process_time": process_time}
            return FaceAnalyzer._sanitize_results(final_results)

        except Exception as e:
            logger.warning("Full analysis failed (%s), trying detection only...", e)
            try:
                # Fallback to basic detection if models are missing.
                detection_results = DeepFace.extract_faces(
                    img_path=img_path,
                    detector_backend=self.detector_backend,
                    enforce_detection=False
                )
                faces = [{
                    "face_confidence": face.get('confidence', 0),
                    "dominant_gender": "Unknown",
                    "gender": {"Unknown": 100.0},
                    "age": 0,
                    "dominant_emotion": "unknown",
                    "emotion": {"unknown": 100.0},
                    "dominant_race": "unknown",
                    "race": {"unknown": 100.0},
                    "warning": "AI model weights are missing. Face detected but analysis unavailable."
                } for face in detection_results]
                fallback_results = {
                    "faces": faces,
                    "count": len(faces),
                    # Real elapsed time instead of the previous hard-coded 0.5s.
                    "process_time": time.time() - start_time,
                    "partial": True,
                }
                return FaceAnalyzer._sanitize_results(fallback_results)
            except Exception as e2:
                logger.error("Detection fallback failed: %s", e2)
                return {"error": f"ML Engine error: {str(e)}"}

    def verify(self, img1_path, img2_path, model_name='VGG-Face'):
        """
        Verify whether two images contain the same person.

        Args:
            img1_path, img2_path: the two images to compare.
            model_name: recognition model; one of 'VGG-Face', 'Facenet',
                'OpenFace', 'DeepFace', 'DeepID', 'ArcFace', 'Dlib',
                'SFace', 'GhostFaceNet'.

        Returns:
            dict: DeepFace.verify result plus 'process_time', sanitized for
            JSON; {"error": message} on failure.
        """
        try:
            start_time = time.time()
            result = DeepFace.verify(
                img1_path=img1_path,
                img2_path=img2_path,
                model_name=model_name,
                detector_backend=self.detector_backend,
                enforce_detection=False,
                silent=True
            )
            result['process_time'] = time.time() - start_time
            return FaceAnalyzer._sanitize_results(result)
        except Exception as e:
            logger.error("Verification failed: %s", e)
            return {"error": str(e)}

    def find_in_db(self, img_path, db_path, model_name='VGG-Face'):
        """
        Find the closest matches to img_path in a database folder.

        Args:
            img_path: query image.
            db_path: folder of reference images indexed by DeepFace.
            model_name: recognition model (see verify()).

        Returns:
            dict: {"matches": [...], "process_time": float}, sanitized for
            JSON; {"error": message} on failure.
        """
        try:
            start_time = time.time()
            results = DeepFace.find(
                img_path=img_path,
                db_path=db_path,
                model_name=model_name,
                detector_backend=self.detector_backend,
                enforce_detection=False,
                silent=True
            )
            process_time = time.time() - start_time
            logger.info("Search in DB completed in %.2fs", process_time)
            # DeepFace.find returns a list of pandas DataFrames, one per detected face.
            matches = []
            if isinstance(results, list):
                for df in results:
                    if not df.empty:
                        matches.append(df.to_dict('records'))
            final_matches = {"matches": matches, "process_time": process_time}
            return FaceAnalyzer._sanitize_results(final_matches)
        except Exception as e:
            logger.error("Database search failed: %s", e)
            return {"error": str(e)}

# Shared module-level instance using the default detector backend.
analyzer = FaceAnalyzer()