Spaces:
Runtime error
Runtime error
| # facial_detection.py | |
| import cv2 | |
| import numpy as np | |
| from scipy.spatial import distance as dist | |
| from collections import deque | |
| import time | |
| from datetime import datetime | |
class OpenCVFaceDetector:
    """Detect a face and a small set of facial landmarks in a video frame.

    MediaPipe FaceMesh is used when it can be imported and initialised;
    otherwise (or when FaceMesh finds no face in a frame) detection falls
    back to OpenCV Haar cascades, which only yield coarse bounding-box based
    landmark estimates.

    Both paths return landmark dictionaries with the same keys where
    available: ``left_eye_corners``, ``right_eye_corners``, ``mouth_corners``,
    ``nose_tip`` and ``chin``. The Haar path additionally provides
    ``left_eye``, ``right_eye`` and ``mouth_center``. "Left" and "right"
    refer to the position in the image, not to the subject's anatomy.
    """

    # Full FaceMesh contours for each region, kept for callers that read them.
    LEFT_EYE_INDICES = [33, 7, 163, 144, 145, 153, 154, 155, 133, 173, 157, 158, 159, 160, 161, 246]
    RIGHT_EYE_INDICES = [362, 382, 381, 380, 374, 373, 390, 249, 263, 466, 388, 387, 386, 385, 384, 398]
    MOUTH_INDICES = [78, 95, 88, 178, 87, 14, 317, 402, 318, 324, 308, 415, 310, 311, 312, 13, 82, 81, 80, 62]

    # Ordered subsets for aspect-ratio formulas. A 6-point EAR expects
    # [corner, upper, upper, corner, lower, lower] so that points 1/5 and 2/4
    # are vertical pairs and 0/3 is the horizontal span. Slicing the first six
    # contour points instead yields one corner plus lower-lid points only.
    LEFT_EYE_EAR_INDICES = [33, 160, 158, 133, 153, 144]
    RIGHT_EYE_EAR_INDICES = [362, 385, 387, 263, 373, 380]
    # 8-point MAR layout: [corner, upper x3, corner, lower x3] on the inner
    # lips, so that 1/7, 2/6 and 3/5 are vertical pairs and 0/4 is the width.
    MOUTH_MAR_INDICES = [78, 81, 13, 311, 308, 402, 14, 178]

    # FaceMesh landmarks used as the nose tip and chin reference points.
    NOSE_TIP_INDEX = 1
    CHIN_INDEX = 175

    # Fractions of the face box searched by the Haar eye / mouth cascades.
    EYE_REGION_FRACTION = 0.6
    MOUTH_REGION_START_FRACTION = 0.5

    def __init__(self):
        """Load the Haar cascades and, if possible, a MediaPipe FaceMesh."""
        cascade_dir = cv2.data.haarcascades
        self.face_cascade = cv2.CascadeClassifier(cascade_dir + 'haarcascade_frontalface_default.xml')
        self.eye_cascade = cv2.CascadeClassifier(cascade_dir + 'haarcascade_eye.xml')
        self.mouth_cascade = cv2.CascadeClassifier(cascade_dir + 'haarcascade_smile.xml')

        # MediaPipe is optional; any failure to import or to reach its legacy
        # ``solutions`` API leaves the detector in Haar-only mode.
        self.use_mediapipe = False
        self.face_mesh = None
        try:
            import mediapipe as mp
            self.mp_face_mesh = mp.solutions.face_mesh
            self.mp_drawing = mp.solutions.drawing_utils
            self.face_mesh = self.mp_face_mesh.FaceMesh(
                static_image_mode=False,
                max_num_faces=1,
                refine_landmarks=True,
                min_detection_confidence=0.5,
                min_tracking_confidence=0.5
            )
        except (ImportError, AttributeError):
            self.face_mesh = None
            print("⚠️ MediaPipe not available, using OpenCV cascade classifiers")
        else:
            self.use_mediapipe = True
            print("✅ Using MediaPipe for enhanced landmark detection")

    def close(self):
        """Release the FaceMesh instance, if one was created, and disable it."""
        mesh = getattr(self, 'face_mesh', None)
        if mesh is not None:
            mesh.close()
        self.face_mesh = None
        self.use_mediapipe = False

    def detect_faces_opencv(self, frame):
        """Detect faces with the frontal-face Haar cascade.

        Args:
            frame: BGR image.

        Returns:
            ``(faces, gray)`` where ``faces`` is the ``detectMultiScale``
            result (``(x, y, w, h)`` rectangles) and ``gray`` is the grayscale
            conversion of ``frame`` that was searched.
        """
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = self.face_cascade.detectMultiScale(gray, 1.3, 5)
        return faces, gray

    @staticmethod
    def _box_landmarks(box, offset_x, offset_y):
        """Return the centre and [left, right, top, bottom] mid-edge points of a box.

        ``box`` is ``(x, y, w, h)`` relative to ``(offset_x, offset_y)``; the
        returned points are absolute and use plain Python ints.
        """
        bx, by, bw, bh = (int(v) for v in box)
        left = offset_x + bx
        top = offset_y + by
        mid_x = left + bw // 2
        mid_y = top + bh // 2
        corners = [
            (left, mid_y),       # left
            (left + bw, mid_y),  # right
            (mid_x, top),        # top
            (mid_x, top + bh),   # bottom
        ]
        return (mid_x, mid_y), corners

    def estimate_landmarks_opencv(self, frame, face_rect):
        """Estimate coarse landmarks inside a face rectangle using Haar cascades.

        Eyes are searched in the upper part of the face box and the mouth in
        the lower half, which keeps the smile cascade from latching onto
        eyes or eyebrows. The nose tip and chin are fixed fractions of the box.

        Args:
            frame: Image to search (``detect_landmarks`` passes the grayscale frame).
            face_rect: ``(x, y, w, h)`` of the face in ``frame`` coordinates.

        Returns:
            dict of landmark name to point or list of points, in frame
            coordinates. Empty if the face rectangle has no area inside the frame.
        """
        x, y, w, h = (int(v) for v in face_rect)
        face_roi = frame[y:y + h, x:x + w]
        if face_roi.size == 0:
            return {}

        landmarks = {}

        # Eyes: keep the two largest hits, then order them left to right.
        eye_limit = max(1, int(h * self.EYE_REGION_FRACTION))
        eyes = self.eye_cascade.detectMultiScale(face_roi[:eye_limit], 1.1, 3)
        if len(eyes) >= 2:
            largest = sorted(eyes, key=lambda e: int(e[2]) * int(e[3]), reverse=True)[:2]
            first, second = sorted(largest, key=lambda e: e[0])
            landmarks['left_eye'], landmarks['left_eye_corners'] = self._box_landmarks(first, x, y)
            landmarks['right_eye'], landmarks['right_eye_corners'] = self._box_landmarks(second, x, y)

        # Mouth: search only the lower part of the face and take the largest hit.
        mouth_top = int(h * self.MOUTH_REGION_START_FRACTION)
        mouths = self.mouth_cascade.detectMultiScale(face_roi[mouth_top:], 1.1, 3)
        if len(mouths) > 0:
            mouth = max(mouths, key=lambda m: int(m[2]) * int(m[3]))
            # Detections are relative to the cropped region; shift back down.
            center, corners = self._box_landmarks(mouth, x, y + mouth_top)
            landmarks['mouth_center'] = center
            landmarks['mouth_corners'] = corners

        # Nose tip: horizontal centre, 60% of the way down the face box.
        landmarks['nose_tip'] = (x + w // 2, y + int(h * 0.6))
        # Chin: bottom centre of the face box.
        landmarks['chin'] = (x + w // 2, y + h)
        return landmarks

    def _landmarks_from_mesh(self, face_landmarks, w, h):
        """Convert one FaceMesh result into ``(face_rect, landmarks_dict)``.

        Args:
            face_landmarks: Object whose ``landmark`` sequence holds points
                with ``x`` / ``y`` attributes (scaled by ``w`` / ``h`` here).
            w: Frame width in pixels.
            h: Frame height in pixels.
        """
        mesh = face_landmarks.landmark

        def to_pixel(index):
            point = mesh[index]
            return (int(point.x * w), int(point.y * h))

        landmarks_dict = {
            'left_eye_corners': [to_pixel(i) for i in self.LEFT_EYE_EAR_INDICES],
            'right_eye_corners': [to_pixel(i) for i in self.RIGHT_EYE_EAR_INDICES],
            'mouth_corners': [to_pixel(i) for i in self.MOUTH_MAR_INDICES],
            'nose_tip': to_pixel(self.NOSE_TIP_INDEX),
            'chin': to_pixel(self.CHIN_INDEX),
        }

        # Bounding box over every mesh point.
        x_coords = [int(lm.x * w) for lm in mesh]
        y_coords = [int(lm.y * h) for lm in mesh]
        min_x, min_y = min(x_coords), min(y_coords)
        face_rect = (min_x, min_y, max(x_coords) - min_x, max(y_coords) - min_y)
        return face_rect, landmarks_dict

    def detect_landmarks_mediapipe(self, frame):
        """Detect landmarks for the first face found by MediaPipe FaceMesh.

        Args:
            frame: BGR image.

        Returns:
            ``(face_rect, landmarks_dict)``, or ``(None, {})`` when no face
            was found.
        """
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.face_mesh.process(rgb_frame)
        if not results.multi_face_landmarks:
            return None, {}
        h, w = frame.shape[:2]
        return self._landmarks_from_mesh(results.multi_face_landmarks[0], w, h)

    def detect_landmarks(self, frame):
        """Detect faces and landmarks, preferring MediaPipe over Haar cascades.

        Args:
            frame: BGR image.

        Returns:
            ``(face_rects, landmarks_list)``: parallel lists with one entry
            per detected face (at most one on the MediaPipe path).
        """
        if self.use_mediapipe:
            face_rect, landmarks = self.detect_landmarks_mediapipe(frame)
            if face_rect is not None:
                return [face_rect], [landmarks]

        # Fallback to OpenCV when MediaPipe is disabled or saw no face.
        faces, gray = self.detect_faces_opencv(frame)
        landmarks_list = []
        face_rects = []
        for face in faces:
            landmarks = self.estimate_landmarks_opencv(gray, face)
            if landmarks:
                landmarks_list.append(landmarks)
                face_rects.append(face)
        return face_rects, landmarks_list
class MetricsCalculator:
    """Calculate drowsiness metrics from facial landmarks.

    All methods are static: they may be called on the class or on an
    instance and keep no state.
    """

    # Neutral ratio returned when the points cannot produce a meaningful value.
    DEFAULT_RATIO = 0.3

    @staticmethod
    def calculate_ear_from_points(eye_points):
        """Calculate the Eye Aspect Ratio (EAR) from eye points.

        Args:
            eye_points: Either 4 points ordered ``[left, right, top, bottom]``
                or 6+ points where 0/3 span the eye horizontally and 1/5 and
                2/4 are vertical pairs (only the first six are used).

        Returns:
            Vertical opening divided by horizontal width. ``0.3`` is returned
            for fewer than 4 points, exactly 5 points, or zero width.
        """
        fallback = MetricsCalculator.DEFAULT_RATIO
        count = len(eye_points)

        if count == 4:
            left, right, top, bottom = eye_points
            width = dist.euclidean(left, right)
            if width == 0:
                return fallback
            return dist.euclidean(top, bottom) / width

        if count >= 6:
            width = dist.euclidean(eye_points[0], eye_points[3])
            if width == 0:
                return fallback
            v1 = dist.euclidean(eye_points[1], eye_points[5])
            v2 = dist.euclidean(eye_points[2], eye_points[4])
            return (v1 + v2) / (2.0 * width)

        # Fewer than 4 points, or exactly 5: no defined layout.
        return fallback

    @staticmethod
    def calculate_mar_from_points(mouth_points):
        """Calculate the Mouth Aspect Ratio (MAR) from mouth points.

        Args:
            mouth_points: Either 4 points ordered ``[left, right, top, bottom]``
                or 8+ points where 0/4 span the mouth horizontally and 1/7,
                2/6 and 3/5 are vertical pairs (only the first eight are used).

        Returns:
            Vertical opening divided by horizontal width. ``0.3`` is returned
            for fewer than 4 points, 5 to 7 points, or zero width.
        """
        fallback = MetricsCalculator.DEFAULT_RATIO
        count = len(mouth_points)

        if count == 4:
            left, right, top, bottom = mouth_points
            width = dist.euclidean(left, right)
            if width == 0:
                return fallback
            return dist.euclidean(top, bottom) / width

        if count >= 8:
            width = dist.euclidean(mouth_points[0], mouth_points[4])
            if width == 0:
                return fallback
            v1 = dist.euclidean(mouth_points[1], mouth_points[7])
            v2 = dist.euclidean(mouth_points[2], mouth_points[6])
            v3 = dist.euclidean(mouth_points[3], mouth_points[5])
            return (v1 + v2 + v3) / (3.0 * width)

        return fallback

    @staticmethod
    def estimate_head_pose_simple(nose_tip, chin, frame_center):
        """Estimate a rough head pose from the nose tip and chin positions.

        Roll is the angle between the nose-to-chin line and the image's
        downward vertical, negated when the chin lies to the left of the
        nose. Pitch is the nose's vertical offset from the frame centre,
        scaled so that an offset of one half-height maps to 30. Yaw is not
        estimated and is always 0.

        Args:
            nose_tip: ``(x, y)`` of the nose tip, or ``None``.
            chin: ``(x, y)`` of the chin, or ``None``.
            frame_center: ``(x, y)`` of the frame centre.

        Returns:
            ``np.ndarray`` ``[pitch, yaw, roll]``; all zeros when either point
            is missing or the two points coincide.
        """
        if nose_tip is None or chin is None:
            return np.zeros(3)

        nose_chin_vector = np.array([chin[0] - nose_tip[0], chin[1] - nose_tip[1]])
        length = np.linalg.norm(nose_chin_vector)
        if length == 0:
            return np.zeros(3)

        # Cosine of the angle to the unit vector (0, 1) is just y / length.
        cos_angle = nose_chin_vector[1] / length
        angle = np.arccos(np.clip(cos_angle, -1, 1)) * 180 / np.pi
        if nose_chin_vector[0] < 0:
            angle = -angle

        # Guard against a degenerate frame centre instead of dividing by zero.
        half_height = frame_center[1]
        if half_height == 0:
            pitch = 0.0
        else:
            pitch = (nose_tip[1] - half_height) / half_height * 30

        return np.array([pitch, 0, angle])  # [pitch, yaw, roll]
class DrowsinessAnalyzer:
    """Turn per-frame facial metrics into drowsiness indicators.

    Each indicator fires once its condition has held for a configured number
    of consecutive frames; any frame that breaks the condition resets that
    indicator's counter.
    """

    def __init__(self, *, ear_threshold=0.20, ear_consecutive_frames=15,
                 yawn_threshold=0.8, yawn_consecutive_frames=10,
                 nod_threshold=20, nod_consecutive_frames=8,
                 history_length=30):
        """Configure thresholds; the defaults reproduce the original constants.

        Args:
            ear_threshold: EAR below this counts as eyes closed.
            ear_consecutive_frames: Closed-eye frames needed for ``EYES_CLOSED``.
            yawn_threshold: MAR above this counts as a yawning frame.
            yawn_consecutive_frames: Yawning frames needed for ``YAWNING``.
            nod_threshold: Absolute pitch above this counts as a nodding frame.
            nod_consecutive_frames: Nodding frames needed for ``HEAD_NOD``.
            history_length: Number of recent samples kept per metric.
        """
        # Thresholds
        self.EAR_THRESHOLD = ear_threshold
        self.EAR_CONSECUTIVE_FRAMES = ear_consecutive_frames
        self.YAWN_THRESHOLD = yawn_threshold
        self.YAWN_CONSECUTIVE_FRAMES = yawn_consecutive_frames
        self.NOD_THRESHOLD = nod_threshold
        self.NOD_CONSECUTIVE_FRAMES = nod_consecutive_frames

        # Consecutive-frame counters
        self.ear_counter = 0
        self.yawn_counter = 0
        self.nod_counter = 0

        # Rolling history of the most recent samples
        self.ear_history = deque(maxlen=history_length)
        self.yawn_history = deque(maxlen=history_length)
        self.head_pose_history = deque(maxlen=history_length)

    def analyze_drowsiness(self, ear, mar, head_angles):
        """Update counters with one frame's metrics and report active indicators.

        Args:
            ear: Eye aspect ratio for the frame.
            mar: Mouth aspect ratio for the frame.
            head_angles: Sequence whose first element is the pitch used for
                nod detection.

        Returns:
            List containing any of ``"EYES_CLOSED"``, ``"YAWNING"`` and
            ``"HEAD_NOD"``, in that order.
        """
        pitch = head_angles[0]
        self.ear_history.append(ear)
        self.yawn_history.append(mar)
        self.head_pose_history.append(pitch)

        drowsiness_indicators = []

        # Eyes closed
        if ear < self.EAR_THRESHOLD:
            self.ear_counter += 1
            if self.ear_counter >= self.EAR_CONSECUTIVE_FRAMES:
                drowsiness_indicators.append("EYES_CLOSED")
        else:
            self.ear_counter = 0

        # Yawning
        if mar > self.YAWN_THRESHOLD:
            self.yawn_counter += 1
            if self.yawn_counter >= self.YAWN_CONSECUTIVE_FRAMES:
                drowsiness_indicators.append("YAWNING")
        else:
            self.yawn_counter = 0

        # Head nodding (pitch in either direction)
        if abs(pitch) > self.NOD_THRESHOLD:
            self.nod_counter += 1
            if self.nod_counter >= self.NOD_CONSECUTIVE_FRAMES:
                drowsiness_indicators.append("HEAD_NOD")
        else:
            self.nod_counter = 0

        return drowsiness_indicators

    def get_severity_level(self, indicators):
        """Map a list of indicators to a severity label.

        Returns:
            ``"critical"`` for two or more indicators, ``"high"`` for
            ``"EYES_CLOSED"`` alone, ``"medium"`` for any other single
            indicator, and ``"normal"`` for none.
        """
        if len(indicators) >= 2:
            return "critical"
        if "EYES_CLOSED" in indicators:
            return "high"
        if indicators:
            return "medium"
        return "normal"
class AlertManager:
    """Rate-limit alerts so they fire at most once per cooldown window."""

    def __init__(self, cooldown_seconds=8):
        """Store the cooldown length and start with no alert on record."""
        self.cooldown_seconds = cooldown_seconds
        self.last_alert_time = 0

    def should_trigger_alert(self, indicators):
        """Decide whether an alert should fire now.

        An alert fires only when ``indicators`` is non-empty and more than
        ``cooldown_seconds`` have passed since the last alert that fired; in
        that case the current time is recorded as the new last-alert time.

        Returns:
            ``True`` if the alert should fire, otherwise ``False``.
        """
        now = time.time()
        if not indicators:
            return False

        elapsed = now - self.last_alert_time
        if not (elapsed > self.cooldown_seconds):
            return False

        self.last_alert_time = now
        return True
class VisualizationRenderer:
    """Draw detection results onto frames.

    All methods are static and modify the given frame in place.
    """

    @staticmethod
    def draw_landmarks_and_contours(frame, landmarks, face_rect):
        """Draw the face box, eye and mouth outlines, and key points.

        Args:
            frame: Image to draw on (modified in place).
            landmarks: Landmark dict; the keys ``left_eye_corners``,
                ``right_eye_corners``, ``mouth_corners``, ``nose_tip`` and
                ``chin`` are drawn when present.
            face_rect: ``(x, y, w, h)`` of the face.
        """
        # Plain ints keep OpenCV's argument parsing happy with NumPy scalars.
        x, y, w, h = (int(v) for v in face_rect)
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

        # Closed outlines: eyes in green, mouth in yellow (BGR).
        outlines = (
            ('left_eye_corners', (0, 255, 0)),
            ('right_eye_corners', (0, 255, 0)),
            ('mouth_corners', (0, 255, 255)),
        )
        for key, color in outlines:
            if key in landmarks:
                points = np.array(landmarks[key], np.int32)
                cv2.polylines(frame, [points], True, color, 2)

        # Key points as filled blue dots.
        for point_name in ('nose_tip', 'chin'):
            if point_name in landmarks:
                px, py = landmarks[point_name]
                cv2.circle(frame, (int(px), int(py)), 3, (255, 0, 0), -1)

    @staticmethod
    def draw_metrics_overlay(frame, ear, mar, head_angle, indicators):
        """Draw the metric read-outs and, if any indicator is set, an alert banner.

        The overlay text is ASCII only: OpenCV's Hershey fonts cannot render
        other characters and draw them as question marks.

        Args:
            frame: Image to draw on (modified in place).
            ear: Eye aspect ratio to display.
            mar: Mouth aspect ratio to display.
            head_angle: Head angle to display.
            indicators: Active drowsiness indicators; any truthy value shows
                the alert banner.
        """
        font = cv2.FONT_HERSHEY_SIMPLEX
        white = (255, 255, 255)
        height = frame.shape[0]

        # Metrics, stacked upwards from the bottom-left corner.
        cv2.putText(frame, f"EAR: {ear:.3f}", (10, height - 80), font, 0.6, white, 2)
        cv2.putText(frame, f"MAR: {mar:.3f}", (10, height - 60), font, 0.6, white, 2)
        cv2.putText(frame, f"Head: {head_angle:.1f} deg", (10, height - 40), font, 0.6, white, 2)

        # Alert banner
        if indicators:
            cv2.putText(frame, "DROWSINESS ALERT!", (50, 50), font, 1.2, (0, 0, 255), 3)
class StatusLogger:
    """Keep a bounded, timestamped log of status messages."""

    def __init__(self, max_logs=100):
        """Create an empty log that retains at most ``max_logs`` entries."""
        # Oldest entries are discarded automatically once the deque is full.
        self.status_log = deque(maxlen=max_logs)

    def log(self, message):
        """Append ``message`` prefixed with the current local time as ``[HH:MM:SS]``."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        self.status_log.append(f"[{timestamp}] {message}")

    def get_recent_logs(self, count=10):
        """Return up to ``count`` of the most recent entries, oldest first.

        A ``count`` of zero or less returns an empty list. (A bare
        ``[-count:]`` slice would return the whole log for ``count == 0``.)
        """
        if count <= 0:
            return []
        entries = list(self.status_log)
        return entries[-count:]