r-vasanthkumar73-dev's picture
Deploying backend and frontend folder modules.
099d157 verified
Raw
History Blame Contribute Delete
7.05 kB
"""
MediaPipe Face Mesh integration for 468-point landmark detection.
Includes Gaussian filtering for landmark stability and EAR-based blink detection.
"""
import os
import math
import numpy as np
import scipy.ndimage as ndimage
from scipy.ndimage import gaussian_filter1d
# Module-level state shared by the functions in this file.
# _face_mesh caches the landmarker: it stays None until get_face_mesh() runs,
# after which it holds either the created FaceLandmarker or the string
# "DISABLED" when the model file is missing or MediaPipe failed to load.
_face_mesh = None
# Rolling buffer of recent landmark arrays, oldest first. It is appended to
# and trimmed by apply_gaussian_smoothing() and emptied by reset().
_landmark_history = []
_HISTORY_SIZE = 5 # Maximum number of frames kept for temporal smoothing
def get_face_mesh():
    """Return the cached face landmarker, creating it on first use.

    The landmarker is built with the MediaPipe Tasks API from the
    ``face_landmarker.task`` file located next to this module. The result is
    cached in the module-level ``_face_mesh``.

    Returns:
        The ``FaceLandmarker`` instance, or the string ``"DISABLED"`` when the
        model file is missing or anything goes wrong while loading MediaPipe.
        The failure is cached too, so loading is attempted only once.
    """
    global _face_mesh

    # Anything other than None means a previous call already decided.
    if _face_mesh is not None:
        return _face_mesh

    try:
        import mediapipe as mp

        # Resolve every API handle up front so a missing attribute is reported
        # as a load failure before the model file is looked up.
        tasks = mp.tasks
        vision = tasks.vision
        base_options_cls = tasks.BaseOptions
        landmarker_cls = vision.FaceLandmarker
        options_cls = vision.FaceLandmarkerOptions
        running_mode = vision.RunningMode

        model_path = os.path.join(os.path.dirname(__file__), "face_landmarker.task")
        if not os.path.exists(model_path):
            print("Warning: face_landmarker.task not found. Mesh disabled.")
            _face_mesh = "DISABLED"
            return _face_mesh

        _face_mesh = landmarker_cls.create_from_options(
            options_cls(
                base_options=base_options_cls(model_asset_path=model_path),
                running_mode=running_mode.IMAGE,
                num_faces=1,
                min_face_detection_confidence=0.5,
                min_face_presence_confidence=0.5,
                min_tracking_confidence=0.5,
            )
        )
        print("MediaPipe Tasks FaceLandmarker successfully initialized.")
    except Exception as e:
        # Best-effort: any failure disables the mesh instead of crashing.
        print(f"Warning: MediaPipe failed to load. Mesh disabled. {e}")
        _face_mesh = "DISABLED"

    return _face_mesh
def process_frame(frame_rgb):
    """Run face-landmark detection on one frame and package the results.

    Args:
        frame_rgb: RGB image as a numpy array; its first two dimensions are
            read as (height, width).

    Returns:
        A dict. When no face is available (mesh disabled, nothing detected, or
        detection raised) it holds ``detected=False``, an empty ``landmarks``
        list and a zeroed ``blink`` entry. Otherwise it holds
        ``detected=True``, ``landmarks`` (smoothed, normalized, rounded to 4
        decimals), ``pixel_landmarks`` (x/y scaled by width/height, z scaled
        by 1000, rounded to 2 decimals), ``blink``, ``head_pose`` and
        ``landmark_count``.
    """

    def _no_face():
        # Built fresh on every call so callers never share a mutable result.
        return {
            "detected": False,
            "landmarks": [],
            "blink": {"left_ear": 0, "right_ear": 0, "blinking": False},
        }

    try:
        import mediapipe as mp

        landmarker = get_face_mesh()
        if landmarker in ("DISABLED", None):
            return _no_face()

        # The Tasks API consumes a MediaPipe Image, not a bare numpy array.
        image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame_rgb)
        detection = landmarker.detect(image)
        if not detection.face_landmarks:
            return _no_face()
    except Exception as e:
        print(f"MediaPipe processing error bypassed: {e}")
        return _no_face()

    first_face = detection.face_landmarks[0]
    height, width = frame_rgb.shape[:2]

    # Normalized [x, y, z] rows, one per landmark of the first detected face.
    raw = np.array([[lm.x, lm.y, lm.z] for lm in first_face])

    # Temporal smoothing to reduce frame-to-frame jitter.
    smoothed = apply_gaussian_smoothing(raw)

    # Pixel-space copy for the frontend.
    pixel_landmarks = [
        {
            "x": round(float(row[0]) * width, 2),
            "y": round(float(row[1]) * height, 2),
            "z": round(float(row[2]) * 1000, 2),
        }
        for row in smoothed
    ]

    # Normalized copy for the frontend canvas.
    normalized_landmarks = [
        {
            "x": round(float(row[0]), 4),
            "y": round(float(row[1]), 4),
            "z": round(float(row[2]), 4),
        }
        for row in smoothed
    ]

    blink_info = detect_blink(smoothed)
    head_pose = estimate_head_pose(smoothed)

    return {
        "detected": True,
        "landmarks": normalized_landmarks,
        "pixel_landmarks": pixel_landmarks,
        "blink": blink_info,
        "head_pose": head_pose,
        "landmark_count": len(normalized_landmarks),
    }
def apply_gaussian_smoothing(landmarks, sigma=1.0):
    """Smooth landmarks over recent frames with a temporal Gaussian filter.

    The frame is copied into the module-level ``_landmark_history`` buffer,
    which is capped at ``_HISTORY_SIZE`` entries. The buffered frames are then
    filtered along the time axis and the value at the newest time step is
    returned, which reduces jitter in the face mesh.

    Args:
        landmarks: numpy array of landmark coordinates for the current frame.
        sigma: standard deviation of the Gaussian kernel, in frames.

    Returns:
        ``landmarks`` itself while fewer than two frames are buffered,
        otherwise a new array of the same shape with the smoothed values.
    """
    global _landmark_history

    # Frames of a different shape cannot be stacked with the buffered ones,
    # so restart the history instead of failing on a ragged stack.
    if _landmark_history and _landmark_history[-1].shape != landmarks.shape:
        _landmark_history.clear()

    _landmark_history.append(landmarks.copy())
    if len(_landmark_history) > _HISTORY_SIZE:
        _landmark_history.pop(0)

    # A single frame has nothing to be smoothed against.
    if len(_landmark_history) < 2:
        return landmarks

    # One filter call over the time axis replaces a per-coordinate Python
    # loop; each coordinate is still filtered independently along axis 0.
    history = np.stack(_landmark_history)
    return gaussian_filter1d(history, sigma=sigma, axis=0)[-1]
def detect_blink(landmarks, threshold=0.21):
    """Detect a blink from the Eye Aspect Ratio (EAR) of both eyes.

    For six eye points p1..p6 (p1/p4 are the corners, p2/p3 and p6/p5 the
    opposing lid points) the ratio is
    ``(|p2 - p6| + |p3 - p5|) / (2 * |p1 - p4|)``. Distances use every
    coordinate present in the landmark rows.

    Args:
        landmarks: array-like of landmark rows indexed by MediaPipe face mesh
            index; it must cover at least index 387.
        threshold: a blink is reported when the average EAR of both eyes is
            strictly below this value.

    Returns:
        dict with ``left_ear``, ``right_ear`` and ``avg_ear`` (rounded to 3
        decimals) and the boolean ``blinking``.
    """
    # MediaPipe face mesh indices in p1..p6 order for each eye.
    left_eye = [362, 385, 387, 263, 373, 380]
    right_eye = [33, 160, 158, 133, 153, 144]
    # Returned when the eye corners coincide and the ratio is undefined;
    # it sits above the default threshold so no blink is reported.
    degenerate_ear = 0.3

    # Accept plain nested lists as well as arrays for the fancy indexing below.
    points = np.asarray(landmarks, dtype=float)

    def eye_aspect_ratio(eye_indices):
        p1, p2, p3, p4, p5, p6 = points[eye_indices]
        eye_width = np.linalg.norm(p1 - p4)
        if eye_width == 0:
            return degenerate_ear
        lid_gaps = np.linalg.norm(p2 - p6) + np.linalg.norm(p3 - p5)
        return lid_gaps / (2.0 * eye_width)

    left_ear = eye_aspect_ratio(left_eye)
    right_ear = eye_aspect_ratio(right_eye)
    avg_ear = (left_ear + right_ear) / 2.0

    return {
        "left_ear": round(float(left_ear), 3),
        "right_ear": round(float(right_ear), 3),
        "avg_ear": round(float(avg_ear), 3),
        "blinking": bool(avg_ear < threshold),
    }
def estimate_head_pose(landmarks):
    """Estimate a rough head pose from a few face mesh landmarks.

    The values are coordinate offsets multiplied by 100, not angles in
    degrees:

    * pitch: vertical offset of the nose tip (index 4) from the forehead
      point (index 10).
    * yaw: horizontal offset of the nose tip from the midpoint of the two
      face-side points (indices 234 and 454).
    * roll: vertical offset between the two face-side points.

    Args:
        landmarks: indexable collection of ``[x, y, ...]`` rows addressed by
            MediaPipe face mesh index; it must cover at least index 454.

    Returns:
        dict with ``pitch``, ``yaw`` and ``roll`` (rounded to 2 decimals) and
        the boolean ``looking_at_screen``.
    """
    # Limits on |yaw| and |pitch| for the face to count as looking at the screen.
    max_yaw = 15
    max_pitch = 20

    nose_tip = landmarks[4]
    forehead = landmarks[10]
    left_side = landmarks[234]
    right_side = landmarks[454]

    # NOTE(review): pitch is the raw nose-to-forehead offset with no neutral
    # baseline, so it presumably grows with face size in the frame. Confirm
    # the max_pitch limit against real camera distances.
    pitch = float(nose_tip[1] - forehead[1]) * 100

    center_x = (left_side[0] + right_side[0]) / 2
    yaw = float(nose_tip[0] - center_x) * 100

    roll = float(left_side[1] - right_side[1]) * 100

    return {
        "pitch": round(pitch, 2),
        "yaw": round(yaw, 2),
        "roll": round(roll, 2),
        "looking_at_screen": abs(yaw) < max_yaw and abs(pitch) < max_pitch,
    }
def reset():
    """Discard all buffered landmark frames (e.g., when starting a new session).

    The module-level ``_landmark_history`` is rebound to a new empty list, so
    the next smoothing call starts with no earlier frames.
    """
    global _landmark_history
    _landmark_history = list()