Spaces:
Sleeping
Sleeping
fix: keyframe images, video clips, evidence images, live stream webcam+URL, remove demo mode
fd50325 verified | """ | |
| Facial Recognition Module for DetectifAI | |
| This module handles facial recognition for suspicious activity frames: | |
| - Face detection using MTCNN (primary) or OpenCV Haar cascades (fallback) | |
| - Face embeddings using FaceNet (primary) or histogram-based (fallback) | |
| - FAISS vector similarity search (primary) or cosine similarity (fallback) | |
| - MongoDB metadata storage with local JSON fallback | |
| - Integration with suspicious activity detection pipeline | |
| Workflow (matches activity diagram): | |
| 1. Receive frame from suspicious event (object detection) | |
| 2. Run face detection | |
| 3. If faces detected: crop faces, generate embeddings, store in FAISS/index | |
| 4. Upload face crops to storage, save metadata to MongoDB/JSON | |
| 5. Search for similar embeddings, link with previous incidents | |
| 6. Assign new person ID if no match found | |
| Author: DetectifAI Team | |
| """ | |
| import os | |
| import cv2 | |
| import numpy as np | |
| import logging | |
| import json | |
| import uuid | |
| import time | |
| import warnings | |
| from typing import List, Tuple, Optional, Dict, Any | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| from pathlib import Path | |
| # Advanced imports (with fallbacks) | |
| try: | |
| import torch | |
| from facenet_pytorch import MTCNN, InceptionResnetV1 | |
| import faiss | |
| from pymongo import MongoClient | |
| from dotenv import load_dotenv | |
| import joblib | |
| ADVANCED_AVAILABLE = True | |
| load_dotenv() | |
| except ImportError: | |
| ADVANCED_AVAILABLE = False | |
| warnings.filterwarnings('ignore') | |
| logger = logging.getLogger(__name__) | |
| # ======================================== | |
| # Configuration | |
| # ======================================== | |
| # MongoDB Configuration | |
| MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/") if ADVANCED_AVAILABLE else None | |
| MONGO_DB_NAME = "detectifai" | |
| # FAISS Configuration | |
| FAISS_INDEX_PATH = "model/faiss_face_index.bin" | |
| FAISS_ID_MAP_PATH = "model/faiss_id_map.json" | |
| EMBEDDING_DIM = 512 # InceptionResnetV1 produces 512-dim embeddings | |
| # Trained Models Configuration | |
| TRAINED_MODEL_DIR = "model/trained_models" | |
| CLASSIFIER_PATH = os.path.join(TRAINED_MODEL_DIR, "classifier_svm.pkl") | |
| ENCODER_PATH = os.path.join(TRAINED_MODEL_DIR, "label_encoder.pkl") | |
| # Simple fallback configuration | |
| SIMPLE_INDEX_PATH = "model/simple_face_index.json" | |
| # Face storage | |
| FACES_DIR = "model/faces" | |
| # ======================================== | |
| # Data Models | |
| # ======================================== | |
| class FaceDetectionResult: | |
| """Result of face detection in a frame""" | |
| frame_path: str | |
| timestamp: float | |
| faces_detected: int | |
| face_embeddings: List[np.ndarray] | |
| face_bounding_boxes: List[Tuple[int, int, int, int]] | |
| face_confidence_scores: List[float] | |
| processing_time: float | |
| detected_face_ids: List[str] = None | |
| matched_persons: List[str] = None | |
| class SuspiciousPerson: | |
| """Information about a suspicious person""" | |
| person_id: str | |
| first_detected: float # timestamp | |
| last_seen: float # timestamp | |
| face_embedding: Optional[np.ndarray] | |
| associated_events: List[str] # event IDs where this person appeared | |
| threat_level: str | |
| notes: str | |
| detection_count: int | |
| face_id: str = "" # Primary face_id | |
| # ======================================== | |
| # Advanced Implementation (FAISS + FaceNet) | |
| # ======================================== | |
| class AdvancedFaceDetector: | |
| """Advanced face detector using MTCNN""" | |
| def __init__(self, device='cpu', min_face_size=60): # Increased from 40 to 60 for stricter filtering | |
| self.device = torch.device(device) | |
| self.mtcnn = MTCNN( | |
| image_size=160, | |
| margin=20, | |
| min_face_size=min_face_size, # Larger minimum to reject small circular objects | |
| thresholds=[0.8, 0.9, 0.9], # Very strict thresholds (was [0.7, 0.8, 0.8]) to eliminate false positives | |
| factor=0.709, | |
| keep_all=True, | |
| device=self.device | |
| ) | |
| logger.info(f"[AdvancedFaceDetector] Initialized MTCNN on {device} with min_face_size={min_face_size}, strict thresholds=[0.8, 0.9, 0.9]") | |
| def detect_faces(self, frame: np.ndarray) -> Tuple[List[np.ndarray], List[np.ndarray], List[float]]: | |
| rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| boxes, probs = self.mtcnn.detect(rgb_frame, landmarks=False) | |
| if boxes is None: | |
| return [], [], [] | |
| faces = self.mtcnn.extract(rgb_frame, boxes, save_path=None) | |
| if faces is None: | |
| return [], [], [] | |
| valid_faces, valid_boxes, valid_probs = [], [], [] | |
| for face, prob, box in zip(faces, probs, boxes): | |
| # Very strict probability threshold (increased from 0.85 to 0.90) | |
| if face is not None and prob > 0.90: | |
| # Additional validation to filter false positives (e.g., tires, wheels) | |
| if self._is_valid_face(face, box): | |
| valid_faces.append(face) | |
| valid_boxes.append(box) | |
| valid_probs.append(prob) | |
| else: | |
| logger.debug(f"Rejected detection (prob={prob:.3f}) - failed quality validation") | |
| return valid_faces, valid_boxes, valid_probs | |
| def _is_valid_face(self, face_tensor: torch.Tensor, box: np.ndarray) -> bool: | |
| """Validate detected face to filter out false positives like tires, wheels, circular objects""" | |
| try: | |
| # 1. Check bounding box aspect ratio (faces should be ~1:1.2, not perfectly circular like tires) | |
| x1, y1, x2, y2 = box | |
| width = x2 - x1 | |
| height = y2 - y1 | |
| if width <= 0 or height <= 0: | |
| return False | |
| aspect_ratio = width / height | |
| # Reject if too circular (like tires) or too elongated - tightened range | |
| if aspect_ratio < 0.7 or aspect_ratio > 1.5: | |
| logger.debug(f"Rejected: aspect_ratio={aspect_ratio:.2f} (tires ~1.0, faces 0.75-1.35)") | |
| return False | |
| # 2. Check minimum face size (reject small detections) - increased to 60px | |
| if width < 60 or height < 60: | |
| logger.debug(f"Rejected: too small ({width}x{height}) - minimum is 60x60") | |
| return False | |
| # 3. Check face tensor for quality (reject blurry or low-contrast images like tire treads) | |
| face_np = face_tensor.permute(1, 2, 0).cpu().numpy() | |
| # Check variance (faces should have good contrast, tires are uniform) - increased threshold | |
| variance = np.var(face_np) | |
| if variance < 0.02: # Increased from 0.01 to 0.02 for stricter filtering | |
| logger.debug(f"Rejected: low variance={variance:.4f} (uniform object, likely tire)") | |
| return False | |
| # 4. Check edge density (faces have more complex edges than smooth tire surfaces) | |
| gray = cv2.cvtColor((face_np * 255).astype(np.uint8), cv2.COLOR_RGB2GRAY) | |
| edges = cv2.Canny(gray, 50, 150) | |
| edge_density = np.sum(edges > 0) / (edges.shape[0] * edges.shape[1]) | |
| # Tires have uniform circular edges, faces have complex features - tightened range | |
| if edge_density < 0.08 or edge_density > 0.35: # Narrowed from (0.05, 0.4) to (0.08, 0.35) | |
| logger.debug(f"Rejected: edge_density={edge_density:.3f} (abnormal edge pattern)") | |
| return False | |
| return True | |
| except Exception as e: | |
| logger.warning(f"Face validation error: {e}") | |
| return False # Reject on error to be safe | |
| class AdvancedFaceEmbedder: | |
| """Advanced face embedder using FaceNet""" | |
| def __init__(self, device='cpu', weights='vggface2'): | |
| self.device = torch.device(device) | |
| self.model = InceptionResnetV1(pretrained=weights).eval().to(self.device) | |
| logger.info(f"[AdvancedFaceEmbedder] Loaded InceptionResnetV1 on {device}") | |
| def generate_embedding(self, face_tensor: torch.Tensor) -> np.ndarray: | |
| with torch.no_grad(): | |
| face_tensor = face_tensor.to(self.device).unsqueeze(0) | |
| embedding = self.model(face_tensor).cpu().numpy().flatten() | |
| return embedding | |
| class PersonClassifier: | |
| """Person identification using trained SVM classifier""" | |
| def __init__(self, classifier_path: str = CLASSIFIER_PATH, encoder_path: str = ENCODER_PATH, | |
| confidence_threshold: float = 0.5): | |
| self.confidence_threshold = confidence_threshold | |
| self.enabled = False | |
| if ADVANCED_AVAILABLE and os.path.exists(classifier_path) and os.path.exists(encoder_path): | |
| try: | |
| self.classifier = joblib.load(classifier_path) | |
| self.label_encoder = joblib.load(encoder_path) | |
| self.enabled = True | |
| logger.info(f"[PersonClassifier] ✅ Model loaded, {len(self.label_encoder.classes_)} identities recognized.") | |
| except Exception as e: | |
| logger.warning(f"[PersonClassifier] ⚠️ Failed to load model: {e}") | |
| else: | |
| logger.info("[PersonClassifier] Trained models not available, using generic face tracking") | |
| def identify_person(self, embedding: np.ndarray) -> Tuple[Optional[str], float]: | |
| """Identify person from face embedding using SVM classifier""" | |
| if not self.enabled: | |
| return None, 0.0 | |
| try: | |
| probs = self.classifier.predict_proba(embedding.reshape(1, -1))[0] | |
| best_idx = np.argmax(probs) | |
| conf = probs[best_idx] | |
| if conf >= self.confidence_threshold: | |
| return self.label_encoder.classes_[best_idx], float(conf) | |
| return None, float(conf) | |
| except Exception as e: | |
| logger.error(f"[PersonClassifier] Error: {e}") | |
| return None, 0.0 | |
| class FAISSFaceIndex: | |
| """FAISS index manager for fast similarity search""" | |
| def __init__(self, embedding_dim: int = 512, index_path: str = FAISS_INDEX_PATH, | |
| id_map_path: str = FAISS_ID_MAP_PATH): | |
| self.embedding_dim = embedding_dim | |
| self.index_path = index_path | |
| self.id_map_path = id_map_path | |
| self.index = None | |
| self.id_map = {} | |
| self.reverse_map = {} | |
| os.makedirs(os.path.dirname(index_path), exist_ok=True) | |
| self._load_or_create_index() | |
| def _load_or_create_index(self): | |
| if os.path.exists(self.index_path) and os.path.exists(self.id_map_path): | |
| try: | |
| self.index = faiss.read_index(self.index_path) | |
| with open(self.id_map_path, 'r') as f: | |
| data = json.load(f) | |
| self.id_map = {int(k): v for k, v in data.items()} | |
| self.reverse_map = {v: int(k) for k, v in self.id_map.items()} | |
| logger.info(f"[FAISS] Loaded index with {self.index.ntotal} embeddings") | |
| except Exception as e: | |
| logger.warning(f"[FAISS] Error loading index: {e}") | |
| self._create_new_index() | |
| else: | |
| self._create_new_index() | |
| def _create_new_index(self): | |
| self.index = faiss.IndexFlatIP(self.embedding_dim) | |
| self.id_map = {} | |
| self.reverse_map = {} | |
| logger.info(f"[FAISS] Created new index (dim={self.embedding_dim})") | |
| def add_embedding(self, face_id: str, embedding: np.ndarray) -> int: | |
| if face_id in self.reverse_map: | |
| return self.reverse_map[face_id] | |
| embedding = embedding.astype('float32').reshape(1, -1) | |
| embedding = embedding / np.linalg.norm(embedding) | |
| idx = self.index.ntotal | |
| self.index.add(embedding) | |
| self.id_map[idx] = face_id | |
| self.reverse_map[face_id] = idx | |
| return idx | |
| def search(self, query_embedding: np.ndarray, k: int = 5, threshold: float = 0.6) -> List[Tuple[str, float]]: | |
| if self.index.ntotal == 0: | |
| return [] | |
| query_embedding = query_embedding.astype('float32').reshape(1, -1) | |
| query_embedding = query_embedding / np.linalg.norm(query_embedding) | |
| similarities, indices = self.index.search(query_embedding, min(k, self.index.ntotal)) | |
| results = [] | |
| for sim, idx in zip(similarities[0], indices[0]): | |
| if idx in self.id_map and sim >= threshold: | |
| results.append((self.id_map[idx], float(sim))) | |
| return results | |
| def save(self): | |
| os.makedirs(os.path.dirname(self.index_path), exist_ok=True) | |
| faiss.write_index(self.index, self.index_path) | |
| with open(self.id_map_path, 'w') as f: | |
| json.dump(self.id_map, f) | |
| class MongoDBFaceStorage: | |
| """MongoDB storage for face metadata""" | |
| def __init__(self, mongo_uri: str, db_name: str = MONGO_DB_NAME): | |
| try: | |
| self.client = MongoClient(mongo_uri, serverSelectionTimeoutMS=5000) | |
| self.db = self.client[db_name] | |
| self.faces_collection = self.db['detected_faces'] | |
| self.client.server_info() # Test connection | |
| self.enabled = True | |
| logger.info("[MongoDB] Connected successfully") | |
| except Exception as e: | |
| logger.warning(f"[MongoDB] Connection failed: {e}") | |
| self.enabled = False | |
| def save_face(self, data: Dict) -> str: | |
| if not self.enabled: | |
| return "" | |
| data['detected_at'] = datetime.utcnow() | |
| if 'face_embedding' in data: | |
| del data['face_embedding'] # Don't store embeddings in MongoDB | |
| data['face_embedding'] = [] | |
| try: | |
| result = self.faces_collection.insert_one(data) | |
| return str(result.inserted_id) | |
| except Exception as e: | |
| logger.error(f"[MongoDB] Error saving face: {e}") | |
| return "" | |
| def close(self): | |
| if hasattr(self, 'client'): | |
| self.client.close() | |
| # ======================================== | |
| # Simple Implementation (OpenCV + Histograms) | |
| # ======================================== | |
| class SimpleFaceDetector: | |
| """Simple face detector using OpenCV Haar cascades""" | |
| def __init__(self, device='cpu'): | |
| self.device = device | |
| cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml' | |
| self.face_cascade = cv2.CascadeClassifier(cascade_path) | |
| logger.info(f"[SimpleFaceDetector] Initialized with OpenCV Haar cascades") | |
| def detect_faces(self, frame: np.ndarray) -> Tuple[List[np.ndarray], List[np.ndarray], List[float]]: | |
| gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) | |
| faces = self.face_cascade.detectMultiScale(gray, 1.1, 4, minSize=(30, 30)) | |
| face_crops = [] | |
| boxes = [] | |
| confidences = [] | |
| for (x, y, w, h) in faces: | |
| face_crop = frame[y:y+h, x:x+w] | |
| face_crops.append(face_crop) | |
| boxes.append([x, y, x+w, y+h]) | |
| confidences.append(0.8) | |
| return face_crops, boxes, confidences | |
| class SimpleFaceEmbedder: | |
| """Simple face embedder using histograms""" | |
| def __init__(self, device='cpu'): | |
| self.device = device | |
| logger.info(f"[SimpleFaceEmbedder] Using histogram-based embeddings") | |
| def generate_embedding(self, face_crop: np.ndarray) -> np.ndarray: | |
| if isinstance(face_crop, np.ndarray) and len(face_crop.shape) == 3: | |
| face_resized = cv2.resize(face_crop, (64, 64)) | |
| hsv = cv2.cvtColor(face_resized, cv2.COLOR_BGR2HSV) | |
| hist_h = cv2.calcHist([hsv], [0], None, [16], [0, 180]) | |
| hist_s = cv2.calcHist([hsv], [1], None, [16], [0, 256]) | |
| hist_v = cv2.calcHist([hsv], [2], None, [16], [0, 256]) | |
| embedding = np.concatenate([hist_h.flatten(), hist_s.flatten(), hist_v.flatten()]) | |
| return embedding / np.linalg.norm(embedding) | |
| else: | |
| return np.random.rand(48) / np.linalg.norm(np.random.rand(48)) | |
| class SimpleFaceIndex: | |
| """Simple face index using cosine similarity""" | |
| def __init__(self, index_path: str = SIMPLE_INDEX_PATH): | |
| self.index_path = index_path | |
| self.faces_db = {} | |
| os.makedirs(os.path.dirname(index_path), exist_ok=True) | |
| self._load_index() | |
| def _load_index(self): | |
| if os.path.exists(self.index_path): | |
| try: | |
| with open(self.index_path, 'r') as f: | |
| data = json.load(f) | |
| self.faces_db = {face_id: np.array(embedding) | |
| for face_id, embedding in data.items()} | |
| logger.info(f"[SimpleFaceIndex] Loaded {len(self.faces_db)} faces") | |
| except Exception as e: | |
| logger.warning(f"[SimpleFaceIndex] Error loading: {e}") | |
| self.faces_db = {} | |
| else: | |
| self.faces_db = {} | |
| def add_embedding(self, face_id: str, embedding: np.ndarray) -> int: | |
| if face_id in self.faces_db: | |
| return len(self.faces_db) | |
| self.faces_db[face_id] = embedding | |
| return len(self.faces_db) | |
| def search(self, query_embedding: np.ndarray, k: int = 5, threshold: float = 0.6) -> List[Tuple[str, float]]: | |
| if not self.faces_db: | |
| return [] | |
| similarities = [] | |
| for face_id, stored_embedding in self.faces_db.items(): | |
| similarity = np.dot(query_embedding, stored_embedding) / ( | |
| np.linalg.norm(query_embedding) * np.linalg.norm(stored_embedding)) | |
| if similarity >= threshold: | |
| similarities.append((face_id, float(similarity))) | |
| similarities.sort(key=lambda x: x[1], reverse=True) | |
| return similarities[:k] | |
| def save(self): | |
| try: | |
| data = {face_id: embedding.tolist() | |
| for face_id, embedding in self.faces_db.items()} | |
| with open(self.index_path, 'w') as f: | |
| json.dump(data, f) | |
| logger.debug(f"[SimpleFaceIndex] Saved {len(self.faces_db)} faces") | |
| except Exception as e: | |
| logger.error(f"[SimpleFaceIndex] Error saving: {e}") | |
| # ======================================== | |
| # Main Facial Recognition Class | |
| # ======================================== | |
| class FacialRecognitionIntegrated: | |
| """ | |
| Unified facial recognition system for DetectifAI. | |
| Automatically uses advanced implementation (MTCNN + FaceNet + FAISS + MongoDB) | |
| if available, otherwise falls back to simple implementation (OpenCV + Histograms + JSON). | |
| Applies facial recognition ONLY to suspicious frames detected by object detection. | |
| """ | |
| def __init__(self, config): | |
| self.config = config | |
| self.enabled = getattr(config, 'enable_facial_recognition', False) | |
| self.confidence_threshold = getattr(config, 'face_recognition_confidence', 0.7) | |
| self.similarity_threshold = 0.6 | |
| self.device = 'cuda' if torch.cuda.is_available() and getattr(config, 'use_gpu_acceleration', False) else 'cpu' | |
| # Create faces directory | |
| self.faces_dir = Path(FACES_DIR) | |
| self.faces_dir.mkdir(exist_ok=True, parents=True) | |
| # Determine implementation mode | |
| self.advanced_mode = ADVANCED_AVAILABLE and self.enabled | |
| # Initialize components only if enabled | |
| if self.enabled: | |
| self._initialize_components() | |
| # Detection statistics | |
| self.detection_stats = { | |
| 'implementation_mode': 'advanced' if self.advanced_mode else 'simple', | |
| 'frames_processed': 0, | |
| 'faces_detected': 0, | |
| 'suspicious_persons_tracked': 0, | |
| 'reoccurrences_detected': 0, | |
| 'new_faces_added': 0, | |
| 'face_matches_found': 0 | |
| } | |
| # Suspicious persons database | |
| self.suspicious_persons_db = {} | |
| if not self.enabled: | |
| logger.info("[FacialRecognition] Disabled - skipping initialization") | |
| else: | |
| mode = "Advanced (MTCNN + FaceNet + FAISS)" if self.advanced_mode else "Simple (OpenCV + Histograms)" | |
| logger.info(f"[FacialRecognition] ✅ Initialized in {mode} mode") | |
| def _initialize_components(self): | |
| """Initialize facial recognition components based on available dependencies""" | |
| try: | |
| if self.advanced_mode: | |
| # Advanced implementation | |
| self.detector = AdvancedFaceDetector(self.device) | |
| self.embedder = AdvancedFaceEmbedder(self.device) | |
| self.face_index = FAISSFaceIndex() | |
| self.person_classifier = PersonClassifier() # Add trained SVM classifier | |
| # MongoDB storage (optional) | |
| if MONGO_URI: | |
| self.mongodb_storage = MongoDBFaceStorage(MONGO_URI) | |
| else: | |
| self.mongodb_storage = None | |
| logger.info("[FacialRecognition] MongoDB not configured, using local storage only") | |
| else: | |
| # Simple implementation | |
| self.detector = SimpleFaceDetector() | |
| self.embedder = SimpleFaceEmbedder() | |
| self.face_index = SimpleFaceIndex() | |
| self.person_classifier = None # No classifier in simple mode | |
| self.mongodb_storage = None | |
| except Exception as e: | |
| logger.error(f"[FacialRecognition] ❌ Initialization failed: {e}") | |
| self.enabled = False | |
| raise | |
| def _generate_face_id(self, frame_number: int, face_index: int, person_name: Optional[str] = None, event_id: str = "unknown") -> str: | |
| """Generate unique face ID""" | |
| prefix = f"{person_name.replace(' ', '_')}" if person_name else "unknown" | |
| unique_id = str(uuid.uuid4())[:8] | |
| return f"face_{prefix}_event_{event_id}_{frame_number:06d}_{face_index:02d}_{unique_id}" | |
| def _save_face_image(self, face_data, face_id: str) -> str: | |
| """Save face image to disk""" | |
| try: | |
| path = self.faces_dir / f"{face_id}.jpg" | |
| if self.advanced_mode and isinstance(face_data, torch.Tensor): | |
| # Convert tensor to numpy array (MTCNN returns normalized tensors in range [0, 1]) | |
| face_np = face_data.permute(1, 2, 0).cpu().numpy() | |
| # Convert from [0,1] float to [0,255] uint8 | |
| face_np = (face_np * 128 + 127.5).clip(0, 255).astype(np.uint8) | |
| # MTCNN outputs RGB, convert to BGR for OpenCV | |
| face_bgr = cv2.cvtColor(face_np, cv2.COLOR_RGB2BGR) | |
| # Resize to reasonable display size (e.g., 160x160) | |
| face_bgr = cv2.resize(face_bgr, (160, 160)) | |
| cv2.imwrite(str(path), face_bgr) | |
| logger.debug(f"Saved advanced face image to {path}") | |
| elif isinstance(face_data, np.ndarray): | |
| # Direct numpy array (from simple mode or already processed) | |
| # Ensure it's in proper format | |
| if face_data.dtype != np.uint8: | |
| face_data = (face_data * 255).astype(np.uint8) if face_data.max() <= 1.0 else face_data.astype(np.uint8) | |
| # Resize if too large | |
| if face_data.shape[0] > 300 or face_data.shape[1] > 300: | |
| face_data = cv2.resize(face_data, (160, 160)) | |
| cv2.imwrite(str(path), face_data) | |
| logger.debug(f"Saved simple face image to {path}") | |
| else: | |
| logger.error(f"Unknown face_data type: {type(face_data)}") | |
| return "" | |
| return str(path) | |
| except Exception as e: | |
| logger.error(f"[FacialRecognition] Error saving face image: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| return "" | |
| def detect_faces_in_frame(self, frame_path: str, timestamp: float) -> FaceDetectionResult: | |
| """ | |
| Detect faces in a single frame (for suspicious frames only). | |
| Args: | |
| frame_path: Path to the frame image | |
| timestamp: Timestamp of the frame in video | |
| Returns: | |
| FaceDetectionResult with detected faces and metadata | |
| """ | |
| if not self.enabled: | |
| return FaceDetectionResult( | |
| frame_path=frame_path, | |
| timestamp=timestamp, | |
| faces_detected=0, | |
| face_embeddings=[], | |
| face_bounding_boxes=[], | |
| face_confidence_scores=[], | |
| processing_time=0.0 | |
| ) | |
| start_time = time.time() | |
| try: | |
| # Load frame | |
| frame = cv2.imread(frame_path) | |
| if frame is None: | |
| logger.error(f"Could not load frame: {frame_path}") | |
| return FaceDetectionResult( | |
| frame_path=frame_path, | |
| timestamp=timestamp, | |
| faces_detected=0, | |
| face_embeddings=[], | |
| face_bounding_boxes=[], | |
| face_confidence_scores=[], | |
| processing_time=0.0 | |
| ) | |
| # Detect faces | |
| faces, boxes, probs = self.detector.detect_faces(frame) | |
| # Generate embeddings and process faces | |
| face_embeddings = [] | |
| detected_face_ids = [] | |
| matched_persons = [] | |
| for i, (face, box, prob) in enumerate(zip(faces, boxes, probs)): | |
| # Generate embedding | |
| embedding = self.embedder.generate_embedding(face) | |
| face_embeddings.append(embedding) | |
| # Try person identification using trained classifier | |
| person_name, person_confidence = None, 0.0 | |
| if self.person_classifier and self.person_classifier.enabled: | |
| person_name, person_confidence = self.person_classifier.identify_person(embedding) | |
| # Search for similar faces in FAISS index | |
| matches = self.face_index.search(embedding, k=1, threshold=self.similarity_threshold) | |
| if matches: | |
| # Found matching face | |
| matched_face_id, similarity = matches[0] | |
| detected_face_ids.append(matched_face_id) | |
| if person_name: | |
| matched_persons.append(f"{person_name} (confidence: {person_confidence:.2f})") | |
| logger.info(f"👤 Known person identified: {person_name} (confidence: {person_confidence:.2f}, face similarity: {similarity:.3f})") | |
| else: | |
| matched_persons.append(f"person_{matched_face_id}") | |
| logger.info(f"👤 Face match found: {matched_face_id} (similarity: {similarity:.3f})") | |
| self.detection_stats['face_matches_found'] += 1 | |
| else: | |
| # New face - save to index | |
| frame_number = int(timestamp * 30) # Estimate frame number | |
| new_face_id = self._generate_face_id(frame_number, i, person_name, event_id=f"obj_detection_{int(timestamp)}") | |
| # Add to FAISS index | |
| self.face_index.add_embedding(new_face_id, embedding) | |
| # Save face image | |
| face_path = self._save_face_image(face, new_face_id) | |
| # Save metadata to MongoDB if available | |
| if self.mongodb_storage and self.mongodb_storage.enabled: | |
| face_metadata = { | |
| 'face_id': new_face_id, | |
| 'frame_path': frame_path, | |
| 'timestamp': timestamp, | |
| 'confidence': float(prob), | |
| 'person_name': person_name, | |
| 'person_confidence': float(person_confidence) if person_name else None, | |
| 'bounding_box': [int(x) for x in box], | |
| 'face_image_path': face_path | |
| } | |
| self.mongodb_storage.save_face(face_metadata) | |
| detected_face_ids.append(new_face_id) | |
| if person_name: | |
| matched_persons.append(f"{person_name} (NEW, confidence: {person_confidence:.2f})") | |
| logger.info(f"👤 NEW known person detected: {person_name} (confidence: {person_confidence:.2f})") | |
| else: | |
| matched_persons.append(f"new_unknown_person_{new_face_id}") | |
| logger.info(f"👤 NEW unknown face detected: {new_face_id}") | |
| self.detection_stats['new_faces_added'] += 1 | |
| # Save face index | |
| self.face_index.save() | |
| processing_time = time.time() - start_time | |
| self.detection_stats['frames_processed'] += 1 | |
| self.detection_stats['faces_detected'] += len(faces) | |
| # Convert boxes to expected format | |
| face_bounding_boxes = [(int(box[0]), int(box[1]), int(box[2]), int(box[3])) for box in boxes] | |
| result = FaceDetectionResult( | |
| frame_path=frame_path, | |
| timestamp=timestamp, | |
| faces_detected=len(faces), | |
| face_embeddings=face_embeddings, | |
| face_bounding_boxes=face_bounding_boxes, | |
| face_confidence_scores=probs, | |
| processing_time=processing_time, | |
| detected_face_ids=detected_face_ids, | |
| matched_persons=matched_persons | |
| ) | |
| if faces: | |
| logger.info(f"👤 Processed {len(faces)} faces in suspicious frame at {timestamp:.2f}s") | |
| return result | |
| except Exception as e: | |
| logger.error(f"[FacialRecognition] Error processing frame {frame_path}: {e}") | |
| return FaceDetectionResult( | |
| frame_path=frame_path, | |
| timestamp=timestamp, | |
| faces_detected=0, | |
| face_embeddings=[], | |
| face_bounding_boxes=[], | |
| face_confidence_scores=[], | |
| processing_time=time.time() - start_time | |
| ) | |
| def track_suspicious_persons(self, face_results: List[FaceDetectionResult], | |
| detectifai_events: List[Dict[str, Any]]) -> List[Dict[str, Any]]: | |
| """Track suspicious persons and detect re-occurrences.""" | |
| if not self.enabled or not face_results: | |
| logger.info("👤 Facial recognition disabled or no face results - skipping person tracking") | |
| return [] | |
| logger.info(f"👤 Tracking suspicious persons across {len(face_results)} face detection results") | |
| reoccurrence_events = [] | |
| person_timeline = {} # face_id -> list of timestamps | |
| # Build person timeline from face results | |
| for face_result in face_results: | |
| if face_result.detected_face_ids: | |
| for face_id in face_result.detected_face_ids: | |
| if face_id not in person_timeline: | |
| person_timeline[face_id] = [] | |
| person_timeline[face_id].append(face_result.timestamp) | |
| # Look for re-occurrences (same person appearing multiple times) | |
| for face_id, timestamps in person_timeline.items(): | |
| if len(timestamps) > 1: | |
| # Create re-occurrence event | |
| timestamps.sort() | |
| reoccurrence_event = { | |
| 'event_id': f"reoccurrence_{face_id}_{int(timestamps[-1])}", | |
| 'start_timestamp': timestamps[0], | |
| 'end_timestamp': timestamps[-1], | |
| 'event_type': 'suspicious_person_reoccurrence', | |
| 'confidence': 0.85, | |
| 'max_confidence': 0.85, | |
| 'keyframes': [r.frame_path for r in face_results if face_id in (r.detected_face_ids or [])], | |
| 'importance_score': 4.0, | |
| 'description': f"Suspicious person {face_id} appeared {len(timestamps)} times", | |
| 'detection_details': { | |
| 'person_id': face_id, | |
| 'appearances': len(timestamps), | |
| 'time_span': timestamps[-1] - timestamps[0], | |
| 'timestamps': timestamps | |
| } | |
| } | |
| reoccurrence_events.append(reoccurrence_event) | |
| self.detection_stats['reoccurrences_detected'] += 1 | |
| # Save face index | |
| if self.face_index: | |
| self.face_index.save() | |
| # Update statistics | |
| self.detection_stats['suspicious_persons_tracked'] = len(person_timeline) | |
| logger.info(f"👤 Person tracking complete: {len(person_timeline)} unique persons, {len(reoccurrence_events)} re-occurrences") | |
| return reoccurrence_events | |
| def search_person_by_image(self, image_path: str, k: int = 10, threshold: float = 0.6) -> List[Dict[str, Any]]: | |
| """ | |
| Search for a person by uploading their image. | |
| Args: | |
| image_path: Path to the uploaded image | |
| k: Number of top matches to return | |
| threshold: Similarity threshold for matches | |
| Returns: | |
| List of matched persons with their occurrences | |
| """ | |
| if not self.enabled: | |
| logger.warning("[FacialRecognition] System not enabled") | |
| return [] | |
| try: | |
| # Load the uploaded image | |
| frame = cv2.imread(image_path) | |
| if frame is None: | |
| logger.error(f"Could not load image: {image_path}") | |
| return [] | |
| # Detect faces in the uploaded image | |
| faces, boxes, probs = self.detector.detect_faces(frame) | |
| if not faces: | |
| logger.info("No faces detected in uploaded image") | |
| return [] | |
| # Use the first detected face for search | |
| query_face = faces[0] | |
| query_embedding = self.embedder.generate_embedding(query_face) | |
| # Search for similar faces in the database | |
| matches = self.face_index.search(query_embedding, k=k, threshold=threshold) | |
| if not matches: | |
| logger.info("No similar faces found in database") | |
| return [] | |
| # Group matches by person/event and gather occurrence information | |
| search_results = [] | |
| for face_id, similarity in matches: | |
| # Parse face_id to extract information | |
| # face_id format: face_{person}_{event}_{frame}_{face_index}_{unique_id} | |
| parts = face_id.split('_') | |
| if len(parts) >= 6: | |
| person_part = parts[1] if parts[1] != 'unknown' else 'Unknown Person' | |
| event_part = '_'.join(parts[2:4]) # event_obj_detection or similar | |
| # Check if we have face image saved | |
| face_image_path = str(self.faces_dir / f"{face_id}.jpg") | |
| has_face_image = os.path.exists(face_image_path) | |
| # Try to get person identification from trained classifier | |
| person_name, person_confidence = None, 0.0 | |
| if self.person_classifier and self.person_classifier.enabled: | |
| person_name, person_confidence = self.person_classifier.identify_person(query_embedding) | |
| result = { | |
| 'face_id': face_id, | |
| 'person_name': person_name if person_name else person_part.replace('_', ' ').title(), | |
| 'person_confidence': person_confidence, | |
| 'similarity_score': similarity, | |
| 'event_context': event_part, | |
| 'face_image_path': face_image_path if has_face_image else None, | |
| 'timestamp': self._extract_timestamp_from_face_id(face_id), | |
| 'detection_context': 'Suspicious Activity Detection' | |
| } | |
| search_results.append(result) | |
| else: | |
| # Fallback for differently formatted face_ids | |
| person_name, person_confidence = None, 0.0 | |
| if self.person_classifier and self.person_classifier.enabled: | |
| person_name, person_confidence = self.person_classifier.identify_person(query_embedding) | |
| result = { | |
| 'face_id': face_id, | |
| 'person_name': person_name if person_name else 'Unknown Person', | |
| 'person_confidence': person_confidence, | |
| 'similarity_score': similarity, | |
| 'event_context': 'security_event', | |
| 'face_image_path': str(self.faces_dir / f"{face_id}.jpg") if os.path.exists(self.faces_dir / f"{face_id}.jpg") else None, | |
| 'timestamp': 0.0, | |
| 'detection_context': 'Security Event' | |
| } | |
| search_results.append(result) | |
| # Sort by similarity score (highest first) | |
| search_results.sort(key=lambda x: x['similarity_score'], reverse=True) | |
| logger.info(f"👤 Image search complete: Found {len(search_results)} matches with similarity >= {threshold}") | |
| return search_results | |
| except Exception as e: | |
| logger.error(f"[FacialRecognition] Error in image search: {e}") | |
| return [] | |
| def _extract_timestamp_from_face_id(self, face_id: str) -> float: | |
| """Extract timestamp from face_id format""" | |
| try: | |
| parts = face_id.split('_') | |
| if len(parts) >= 6: | |
| # Try to extract from event part (e.g., event_obj_detection_123) | |
| for part in parts: | |
| if part.isdigit(): | |
| return float(part) | |
| return 0.0 | |
| except: | |
| return 0.0 | |
| def get_detection_stats(self) -> Dict[str, Any]: | |
| """Get facial recognition detection statistics""" | |
| stats = self.detection_stats.copy() | |
| if hasattr(self, 'face_index'): | |
| if self.advanced_mode: | |
| stats['total_faces_in_database'] = self.face_index.index.ntotal if self.face_index.index else 0 | |
| else: | |
| stats['total_faces_in_database'] = len(self.face_index.faces_db) if self.face_index else 0 | |
| return stats | |
| def cleanup(self): | |
| """Cleanup resources""" | |
| if hasattr(self, 'face_index'): | |
| self.face_index.save() | |
| if hasattr(self, 'mongodb_storage') and self.mongodb_storage: | |
| self.mongodb_storage.close() | |
| logger.info("[FacialRecognition] Cleanup completed") | |
| # For backward compatibility | |
| FacialRecognitionPlaceholder = FacialRecognitionIntegrated |