"""
MediaPipe FaceLandmarker (Tasks API) integration for 468-point face mesh landmark detection.
Includes temporal Gaussian filtering for landmark stability, EAR-based blink detection,
and a simplified head pose estimate.
"""
| import os |
| import math |
| import numpy as np |
| import scipy.ndimage as ndimage |
| from scipy.ndimage import gaussian_filter1d |
|
|
| |
# Cached landmarker: None until get_face_mesh() first runs, afterwards either
# the created FaceLandmarker or the string "DISABLED" when setup failed.
_face_mesh = None
# Recent raw landmark arrays, oldest first, consumed by apply_gaussian_smoothing().
_landmark_history = []
# Maximum number of frames kept in _landmark_history.
_HISTORY_SIZE = 5
|
|
|
|
def get_face_mesh():
    """Return the shared FaceLandmarker, building it on the first call.

    The landmarker is configured from ``face_landmarker.task`` stored next to
    this module. When that file is missing, or when importing or configuring
    MediaPipe raises, the string ``"DISABLED"`` is cached and returned instead,
    so later calls do not retry the initialization.
    """
    global _face_mesh

    # Already initialized (or already marked as disabled): reuse the cache.
    if _face_mesh is not None:
        return _face_mesh

    try:
        import mediapipe as mp

        base_options_cls = mp.tasks.BaseOptions
        vision = mp.tasks.vision
        landmarker_cls = vision.FaceLandmarker
        options_cls = vision.FaceLandmarkerOptions
        running_mode = vision.RunningMode

        module_dir = os.path.dirname(__file__)
        model_path = os.path.join(module_dir, "face_landmarker.task")

        if os.path.exists(model_path):
            landmarker_options = options_cls(
                base_options=base_options_cls(model_asset_path=model_path),
                running_mode=running_mode.IMAGE,
                num_faces=1,
                min_face_detection_confidence=0.5,
                min_face_presence_confidence=0.5,
                min_tracking_confidence=0.5,
            )
            _face_mesh = landmarker_cls.create_from_options(landmarker_options)
            print("MediaPipe Tasks FaceLandmarker successfully initialized.")
        else:
            print("Warning: face_landmarker.task not found. Mesh disabled.")
            _face_mesh = "DISABLED"
    except Exception as exc:
        # Any import/configuration failure disables the mesh rather than
        # propagating to the caller.
        print(f"Warning: MediaPipe failed to load. Mesh disabled. {exc}")
        _face_mesh = "DISABLED"

    return _face_mesh
|
|
def _no_face_result():
    """Build a fresh "nothing detected" response for process_frame.

    The ``blink`` entry carries the same keys that ``detect_blink`` returns
    (including ``avg_ear``), so consumers can read the blink fields without
    first checking ``detected``.
    """
    return {
        "detected": False,
        "landmarks": [],
        "blink": {"left_ear": 0, "right_ear": 0, "avg_ear": 0, "blinking": False},
    }


def process_frame(frame_rgb):
    """Run face landmark detection on one frame and derive blink/pose info.

    Args:
        frame_rgb: RGB image as a numpy array; its first two dimensions are
            read as (height, width).

    Returns:
        dict. When no face is found, the landmarker is disabled, or MediaPipe
        raises, the dict contains ``detected`` (False), an empty ``landmarks``
        list and a zeroed ``blink`` entry. Otherwise it contains:

        - ``detected``: True
        - ``landmarks``: smoothed normalized points as ``{"x", "y", "z"}``
          dicts rounded to 4 decimals
        - ``pixel_landmarks``: the same points with x scaled by the frame
          width, y by the frame height and z by a fixed factor of 1000,
          rounded to 2 decimals
        - ``blink``: result of ``detect_blink``
        - ``head_pose``: result of ``estimate_head_pose``
        - ``landmark_count``: number of landmarks returned
    """
    try:
        import mediapipe as mp

        mesh = get_face_mesh()
        if mesh is None or mesh == "DISABLED":
            return _no_face_result()

        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame_rgb)
        results = mesh.detect(mp_image)
        if not results.face_landmarks:
            return _no_face_result()
    except Exception as e:
        # Detection is best-effort: report "no face" instead of crashing the caller.
        print(f"MediaPipe processing error bypassed: {e}")
        return _no_face_result()

    # Only the first detected face is used.
    face_landmarks = results.face_landmarks[0]
    h, w = frame_rgb.shape[:2]

    raw_landmarks = np.array([[lm.x, lm.y, lm.z] for lm in face_landmarks])

    # Temporal smoothing to reduce frame-to-frame jitter.
    smoothed = apply_gaussian_smoothing(raw_landmarks)

    pixel_landmarks = []
    normalized_landmarks = []
    # tolist() yields plain Python floats, keeping the output JSON-friendly.
    for x, y, z in smoothed.tolist():
        pixel_landmarks.append({
            "x": round(x * w, 2),
            "y": round(y * h, 2),
            "z": round(z * 1000, 2),
        })
        normalized_landmarks.append({
            "x": round(x, 4),
            "y": round(y, 4),
            "z": round(z, 4),
        })

    blink_info = detect_blink(smoothed)
    head_pose = estimate_head_pose(smoothed)

    return {
        "detected": True,
        "landmarks": normalized_landmarks,
        "pixel_landmarks": pixel_landmarks,
        "blink": blink_info,
        "head_pose": head_pose,
        "landmark_count": len(normalized_landmarks),
    }
|
|
|
|
def apply_gaussian_smoothing(landmarks, sigma=1.0):
    """Temporally smooth landmarks with a Gaussian filter over recent frames.

    Each call appends a copy of ``landmarks`` to the module-level history
    (capped at ``_HISTORY_SIZE`` frames), filters every coordinate along the
    time axis and returns the filtered values for the newest frame.

    Args:
        landmarks: numpy array of landmark coordinates for the current frame.
        sigma: standard deviation of the Gaussian kernel, in frames.

    Returns:
        numpy array with the same shape as ``landmarks``. With fewer than two
        frames of history the input array itself is returned unchanged.
    """
    global _landmark_history

    # Frames with a different layout cannot be stacked with the current one,
    # so a shape change starts a fresh history instead of raising.
    if _landmark_history and _landmark_history[-1].shape != landmarks.shape:
        _landmark_history = []

    _landmark_history.append(landmarks.copy())
    if len(_landmark_history) > _HISTORY_SIZE:
        _landmark_history.pop(0)

    if len(_landmark_history) < 2:
        return landmarks

    # One filter call along the time axis replaces a per-coordinate Python
    # loop; the last slice is the smoothed estimate for the current frame.
    history = np.stack(_landmark_history)
    return gaussian_filter1d(history, sigma=sigma, axis=0)[-1]
|
|
|
|
def detect_blink(landmarks):
    """Compute the Eye Aspect Ratio (EAR) of both eyes and flag a blink.

    Args:
        landmarks: numpy array of face mesh landmarks, indexed by the
            MediaPipe landmark ids listed below.

    Returns:
        dict with ``left_ear``, ``right_ear`` and ``avg_ear`` (rounded to 3
        decimals) and ``blinking``, which is True when the average EAR is
        below 0.21.
    """
    # Per eye: positions 0 and 3 form the horizontal pair, (1, 5) and (2, 4)
    # the two vertical pairs.
    left_ids = [362, 385, 387, 263, 373, 380]
    right_ids = [33, 160, 158, 133, 153, 144]
    blink_cutoff = 0.21

    ratios = []
    for c0, t1, t2, c3, b4, b5 in (left_ids, right_ids):
        width = np.linalg.norm(landmarks[c0] - landmarks[c3])
        if width == 0:
            # Degenerate eye (both corners coincide): fall back to a fixed ratio.
            ratios.append(0.3)
            continue
        first_gap = np.linalg.norm(landmarks[t1] - landmarks[b5])
        second_gap = np.linalg.norm(landmarks[t2] - landmarks[b4])
        ratios.append((first_gap + second_gap) / (2.0 * width))

    ear_left, ear_right = ratios
    ear_mean = (ear_left + ear_right) / 2.0

    return {
        "left_ear": round(float(ear_left), 3),
        "right_ear": round(float(ear_right), 3),
        "avg_ear": round(float(ear_mean), 3),
        "blinking": bool(ear_mean < blink_cutoff),
    }
|
|
|
|
def estimate_head_pose(landmarks):
    """Estimate a rough head pose from a few face landmarks.

    The reported values are coordinate offsets multiplied by 100, not angles:

    - ``pitch``: vertical offset of the nose tip (index 4) from the forehead
      point (index 10)
    - ``yaw``: horizontal offset of the nose tip from the midpoint of the two
      side points (indices 234 and 454)
    - ``roll``: vertical offset between the two side points

    Args:
        landmarks: indexable sequence of points whose first two components
            are x and y.

    Returns:
        dict with ``pitch``, ``yaw`` and ``roll`` rounded to 2 decimals, plus
        ``looking_at_screen``, which is True when ``abs(yaw) < 15`` and
        ``abs(pitch) < 20``.
    """
    nose_tip = landmarks[4]
    forehead = landmarks[10]
    left_side = landmarks[234]
    right_side = landmarks[454]

    # NOTE(review): pitch is the raw nose-to-forehead offset, so it presumably
    # stays non-zero for a frontal face and grows with face size — confirm the
    # looking_at_screen thresholds against real data.
    pitch = float(nose_tip[1] - forehead[1]) * 100

    center_x = (left_side[0] + right_side[0]) / 2
    yaw = float(nose_tip[0] - center_x) * 100

    roll = float(left_side[1] - right_side[1]) * 100

    return {
        "pitch": round(pitch, 2),
        "yaw": round(yaw, 2),
        "roll": round(roll, 2),
        "looking_at_screen": abs(yaw) < 15 and abs(pitch) < 20,
    }
|
|
|
|
def reset():
    """Discard all stored landmark frames, e.g. at the start of a new session.

    The module-level history is rebound to a new empty list, so smoothing
    starts from scratch on the next processed frame.
    """
    global _landmark_history
    _landmark_history = list()
|
|