"""
MediaPipe Face Mesh integration for 468-point landmark detection.

Includes Gaussian filtering for landmark stability and EAR-based blink
detection.
"""

import math
import os

import numpy as np
import scipy.ndimage as ndimage
from scipy.ndimage import gaussian_filter1d

# Lazy loading
_face_mesh = None
_landmark_history = []
_HISTORY_SIZE = 5  # Number of frames for temporal smoothing

# Eye landmark indices (MediaPipe), ordered as:
# [corner, upper 1, upper 2, opposite corner, lower 2, lower 1]
_LEFT_EYE = [362, 385, 387, 263, 373, 380]
_RIGHT_EYE = [33, 160, 158, 133, 153, 144]
_BLINK_THRESHOLD = 0.21  # Average EAR below this counts as a blink
_DEGENERATE_EAR = 0.3  # EAR reported when the eye corners coincide


def get_face_mesh():
    """Lazy-load MediaPipe Face Mesh using the Modern Tasks API.

    The landmarker is configured for single images and at most one face.
    The outcome is cached in the module-level ``_face_mesh``, so loading is
    attempted only once; a failed attempt is cached as well.

    Returns:
        The ``FaceLandmarker`` instance, or the string ``"DISABLED"`` when
        ``face_landmarker.task`` is not found next to this module or when
        MediaPipe fails to import or initialise.
    """
    global _face_mesh
    if _face_mesh is not None:
        return _face_mesh

    try:
        import mediapipe as mp

        BaseOptions = mp.tasks.BaseOptions
        FaceLandmarker = mp.tasks.vision.FaceLandmarker
        FaceLandmarkerOptions = mp.tasks.vision.FaceLandmarkerOptions
        VisionRunningMode = mp.tasks.vision.RunningMode

        task_path = os.path.join(os.path.dirname(__file__), "face_landmarker.task")
        if not os.path.exists(task_path):
            print("Warning: face_landmarker.task not found. Mesh disabled.")
            _face_mesh = "DISABLED"
            return _face_mesh

        options = FaceLandmarkerOptions(
            base_options=BaseOptions(model_asset_path=task_path),
            running_mode=VisionRunningMode.IMAGE,
            num_faces=1,
            min_face_detection_confidence=0.5,
            min_face_presence_confidence=0.5,
            min_tracking_confidence=0.5,
        )
        _face_mesh = FaceLandmarker.create_from_options(options)
        print("MediaPipe Tasks FaceLandmarker successfully initialized.")
    except Exception as e:
        # Best effort: any import/initialisation problem disables the mesh
        # instead of crashing the caller.
        print(f"Warning: MediaPipe failed to load. Mesh disabled. {e}")
        _face_mesh = "DISABLED"

    return _face_mesh


def _not_detected_result():
    """Build the payload returned when no face is available.

    A fresh dict is created on every call so callers can mutate it safely.
    """
    return {
        "detected": False,
        "landmarks": [],
        "blink": {"left_ear": 0, "right_ear": 0, "blinking": False},
    }


def process_frame(frame_rgb):
    """
    Process a frame and return face mesh landmarks.

    Input: RGB image (numpy array)
    Output: dict with landmarks, blink info, head pose estimate

    When the mesh is disabled, no face is found, or MediaPipe raises, the
    result is ``{"detected": False, "landmarks": [], "blink": {...}}``.
    Otherwise it contains ``detected``, ``landmarks`` (normalized, rounded to
    4 decimals), ``pixel_landmarks`` (x/y scaled by the frame width/height,
    z scaled by 1000, rounded to 2 decimals), ``blink``, ``head_pose`` and
    ``landmark_count``.
    """
    try:
        import mediapipe as mp

        mesh = get_face_mesh()
        if mesh in ["DISABLED", None]:
            return _not_detected_result()

        # Convert NumPy array to MediaPipe Image object
        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame_rgb)

        # Detect using modern Tasks API
        results = mesh.detect(mp_image)
        if not results.face_landmarks:
            return _not_detected_result()
    except Exception as e:
        print(f"MediaPipe processing error bypassed: {e}")
        return _not_detected_result()

    face_landmarks = results.face_landmarks[0]
    h, w = frame_rgb.shape[:2]

    # Extract every landmark the model returned as normalized [x, y, z]
    raw_landmarks = np.array([[lm.x, lm.y, lm.z] for lm in face_landmarks])

    # Apply Gaussian filtering for temporal stability
    smoothed = apply_gaussian_smoothing(raw_landmarks)

    # Plain Python floats keep round() semantics and the payload serialisable.
    points = [(float(pt[0]), float(pt[1]), float(pt[2])) for pt in smoothed]

    # Convert to pixel coordinates for frontend
    pixel_landmarks = [
        {
            "x": round(x * w, 2),
            "y": round(y * h, 2),
            "z": round(z * 1000, 2),
        }
        for x, y, z in points
    ]

    # Normalized landmarks for frontend canvas (0-1 range)
    normalized_landmarks = [
        {"x": round(x, 4), "y": round(y, 4), "z": round(z, 4)}
        for x, y, z in points
    ]

    return {
        "detected": True,
        "landmarks": normalized_landmarks,
        "pixel_landmarks": pixel_landmarks,
        "blink": detect_blink(smoothed),  # Blink detection using EAR
        "head_pose": estimate_head_pose(smoothed),  # Simplified head pose
        "landmark_count": len(normalized_landmarks),
    }


def apply_gaussian_smoothing(landmarks, sigma=1.0):
    """
    Apply Gaussian filtering across temporal frames for landmark stability.

    Reduces jitter in real-time face mesh visualization. Each call appends a
    copy of ``landmarks`` to the module-level history (at most
    ``_HISTORY_SIZE`` frames are kept), filters the history along the time
    axis and returns the filtered value for the newest frame.

    Args:
        landmarks: array of landmark coordinates for the current frame.
        sigma: standard deviation of the Gaussian kernel, in frames.

    Returns:
        ``landmarks`` itself while fewer than two frames are buffered,
        otherwise a new array of the same shape with the smoothed values.
    """
    # Frames of a different shape cannot be stacked with the current one;
    # drop them so a change in landmark count restarts smoothing instead of
    # raising on a ragged history.
    if _landmark_history and np.shape(_landmark_history[-1]) != np.shape(landmarks):
        _landmark_history.clear()

    _landmark_history.append(np.array(landmarks))
    excess = len(_landmark_history) - _HISTORY_SIZE
    if excess > 0:
        del _landmark_history[:excess]

    if len(_landmark_history) < 2:
        return landmarks

    # One filter call along the temporal axis smooths every coordinate at
    # once; the last slice is the estimate for the current frame.
    history_array = np.stack(_landmark_history)
    return gaussian_filter1d(history_array, sigma=sigma, axis=0)[-1]


def detect_blink(landmarks):
    """
    Detect blinks using Eye Aspect Ratio (EAR).

    Uses MediaPipe face mesh eye landmark indices. ``landmarks`` must be a
    NumPy array indexable by those indices.

    Returns:
        dict with ``left_ear``, ``right_ear`` and ``avg_ear`` (rounded to 3
        decimals) and ``blinking``, which is True when the average EAR is
        below the blink threshold.
    """
    # NOTE(review): process_frame passes normalized coordinates (x is scaled
    # by frame width, y by frame height), so these EAR values vary with the
    # frame aspect ratio - confirm the threshold was tuned with that in mind.

    def eye_aspect_ratio(eye_indices):
        pts = landmarks[eye_indices]
        # Vertical distances
        v1 = np.linalg.norm(pts[1] - pts[5])
        v2 = np.linalg.norm(pts[2] - pts[4])
        # Horizontal distance
        width = np.linalg.norm(pts[0] - pts[3])
        if width == 0:
            # Degenerate eye: avoid dividing by zero.
            return _DEGENERATE_EAR
        return (v1 + v2) / (2.0 * width)

    left_ear = eye_aspect_ratio(_LEFT_EYE)
    right_ear = eye_aspect_ratio(_RIGHT_EYE)
    avg_ear = (left_ear + right_ear) / 2.0

    return {
        "left_ear": round(float(left_ear), 3),
        "right_ear": round(float(right_ear), 3),
        "avg_ear": round(float(avg_ear), 3),
        "blinking": bool(avg_ear < _BLINK_THRESHOLD),
    }


def estimate_head_pose(landmarks):
    """Simplified head pose estimation based on nose, forehead and ear landmarks.

    The returned ``pitch``, ``yaw`` and ``roll`` are coordinate offsets
    multiplied by 100 and rounded to 2 decimals; they are not angles in
    degrees. ``looking_at_screen`` is True when ``abs(yaw) < 15`` and
    ``abs(pitch) < 20``.
    """
    nose_tip = landmarks[4]  # Nose tip
    forehead = landmarks[10]  # Forehead
    left_ear = landmarks[234]  # Left ear
    right_ear = landmarks[454]  # Right ear

    # Pitch (up/down) - based on nose-forehead vertical offset
    pitch = float(nose_tip[1] - forehead[1]) * 100

    # Yaw (left/right) - based on ear horizontal symmetry
    center_x = (left_ear[0] + right_ear[0]) / 2
    yaw = float(nose_tip[0] - center_x) * 100

    # Roll (tilt) - based on ear vertical offset
    roll = float(left_ear[1] - right_ear[1]) * 100

    return {
        "pitch": round(pitch, 2),
        "yaw": round(yaw, 2),
        "roll": round(roll, 2),
        "looking_at_screen": abs(yaw) < 15 and abs(pitch) < 20,
    }


def reset():
    """Reset landmark history (e.g., when starting a new session)."""
    # Clear in place so every reference to the history list sees the reset.
    _landmark_history.clear()