import os
import json
import tempfile
import urllib.request
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import cv2
import numpy as np
import pandas as pd
import gradio as gr
import mediapipe as mp
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
from mediapipe.tasks import python
from mediapipe.tasks.python import vision


# -------------------------
# Model download helper
# -------------------------

def download_models():
    """Download required MediaPipe models if not present."""
    models_dir = "/tmp/mediapipe_models"
    os.makedirs(models_dir, exist_ok=True)
    models = {
        "face_landmarker": {
            "url": "https://storage.googleapis.com/mediapipe-models/face_landmarker/face_landmarker/float16/1/face_landmarker.task",
            "path": os.path.join(models_dir, "face_landmarker.task"),
        },
        "pose_landmarker": {
            "url": "https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_heavy/float16/1/pose_landmarker_heavy.task",
            "path": os.path.join(models_dir, "pose_landmarker_heavy.task"),
        },
    }
    for model_name, model_info in models.items():
        if not os.path.exists(model_info["path"]):
            print(f"Downloading {model_name}...")
            urllib.request.urlretrieve(model_info["url"], model_info["path"])
            print(f"✓ Downloaded {model_name}")
    return models["face_landmarker"]["path"], models["pose_landmarker"]["path"]


# -------------------------
# Utils: geometry
# -------------------------

def _dist(a: np.ndarray, b: np.ndarray) -> float:
    return float(np.linalg.norm(a - b))


def _safe_div(a: float, b: float, eps: float = 1e-8) -> float:
    return a / (b + eps)


def eye_aspect_ratio(pts: Dict[int, np.ndarray], idx: List[int]) -> Optional[float]:
    """
    EAR = (||p2-p6|| + ||p3-p5||) / (2*||p1-p4||)
    idx: [p1, p2, p3, p4, p5, p6]
    """
    try:
        p1, p2, p3, p4, p5, p6 = [pts[i] for i in idx]
    except KeyError:
        return None
    A = _dist(p2, p6)
    B = _dist(p3, p5)
    C = _dist(p1, p4)
    return _safe_div((A + B), (2.0 * C))


def angle_3pts(a: np.ndarray, b: np.ndarray, c: np.ndarray) -> Optional[float]:
    """Angle at point b, in degrees, formed by a-b-c."""
    ba = a - b
    bc = c - b
    nba = np.linalg.norm(ba)
    nbc = np.linalg.norm(bc)
    if nba < 1e-8 or nbc < 1e-8:
        return None
    cosang = float(np.dot(ba, bc) / (nba * nbc))
    cosang = max(-1.0, min(1.0, cosang))
    return float(np.degrees(np.arccos(cosang)))
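
# ---- Worked example: EAR arithmetic on synthetic points --------------------
# A minimal sketch (not used by the pipeline) of how eye_aspect_ratio()
# behaves; the landmark keys and coordinates below are invented purely for
# illustration, following the [p1..p6] ordering from the docstring above.
def _ear_example() -> Optional[float]:
    # Eye corners p1, p4 are 10 px apart; both vertical lid pairs (p2, p6)
    # and (p3, p5) are 3 px apart, so EAR = (3 + 3) / (2 * 10) = 0.30,
    # i.e. an open eye relative to the default 0.21 blink threshold.
    pts = {
        1: np.array([0.0, 0.0], dtype=np.float32),   # p1: left corner
        2: np.array([3.0, -1.5], dtype=np.float32),  # p2: upper lid
        3: np.array([7.0, -1.5], dtype=np.float32),  # p3: upper lid
        4: np.array([10.0, 0.0], dtype=np.float32),  # p4: right corner
        5: np.array([7.0, 1.5], dtype=np.float32),   # p5: lower lid
        6: np.array([3.0, 1.5], dtype=np.float32),   # p6: lower lid
    }
    return eye_aspect_ratio(pts, [1, 2, 3, 4, 5, 6])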

# -------------------------
# MediaPipe indices
# -------------------------
# FaceMesh landmarks for EAR (the same indices work with the new Tasks API)
LEFT_EYE_EAR_IDX = [33, 160, 158, 133, 153, 144]
RIGHT_EYE_EAR_IDX = [362, 385, 387, 263, 373, 380]

# Pose landmark indices for the new API
POSE_LANDMARKS = {
    "left_wrist": 15,
    "right_wrist": 16,
    "left_ankle": 27,
    "right_ankle": 28,
    "left_shoulder": 11,
    "right_shoulder": 12,
    "left_elbow": 13,
    "right_elbow": 14,
    "left_hip": 23,
    "right_hip": 24,
    "left_knee": 25,
    "right_knee": 26,
}

# -------------------------
# Drawing helpers for new API
# -------------------------
mp_drawing = solutions.drawing_utils
mp_drawing_styles = solutions.drawing_styles

# Face mesh connections
FACEMESH_TESSELATION = solutions.face_mesh.FACEMESH_TESSELATION
FACEMESH_CONTOURS = solutions.face_mesh.FACEMESH_CONTOURS

# Pose connections
POSE_CONNECTIONS = solutions.pose.POSE_CONNECTIONS


def draw_face_landmarks(image, face_landmarks):
    """Draw face landmarks (new API format); the full mesh is always drawn."""
    if face_landmarks is None:
        return
    # Convert to landmark_pb2 format for the legacy drawing utils
    face_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
    face_landmarks_proto.landmark.extend([
        landmark_pb2.NormalizedLandmark(x=lm.x, y=lm.y, z=lm.z) for lm in face_landmarks
    ])
    # Always draw the full tesselation mesh
    mp_drawing.draw_landmarks(
        image=image,
        landmark_list=face_landmarks_proto,
        connections=FACEMESH_TESSELATION,
        landmark_drawing_spec=None,
        connection_drawing_spec=mp_drawing_styles.get_default_face_mesh_tesselation_style(),
    )
    # Also draw contours for clarity
    mp_drawing.draw_landmarks(
        image=image,
        landmark_list=face_landmarks_proto,
        connections=FACEMESH_CONTOURS,
        landmark_drawing_spec=None,
        connection_drawing_spec=mp_drawing_styles.get_default_face_mesh_contours_style(),
    )


def draw_pose_landmarks(image, pose_landmarks):
    """Draw pose landmarks (new API format)."""
    if pose_landmarks is None:
        return
    # Convert to landmark_pb2 format for the legacy drawing utils
    pose_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
    pose_landmarks_proto.landmark.extend([
        landmark_pb2.NormalizedLandmark(x=lm.x, y=lm.y, z=lm.z) for lm in pose_landmarks
    ])
    mp_drawing.draw_landmarks(
        image=image,
        landmark_list=pose_landmarks_proto,
        connections=POSE_CONNECTIONS,
        landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style(),
    )


# -------------------------
# Blink detection
# -------------------------

@dataclass
class BlinkState:
    in_blink: bool = False
    blink_count: int = 0
    consec_below: int = 0


def update_blink(state: BlinkState, ear: Optional[float], thr: float, min_consec: int) -> BlinkState:
    """
    Basic blink logic:
    - EAR below the threshold for >= min_consec frames => blink start
    - when EAR goes back above the threshold => blink end (counted once)
    """
    if ear is None:
        return state
    if ear < thr:
        state.consec_below += 1
        if (not state.in_blink) and state.consec_below >= min_consec:
            state.in_blink = True
    else:
        if state.in_blink:
            state.blink_count += 1
            state.in_blink = False
        state.consec_below = 0
    return state
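
# ---- Worked example: blink counting on a synthetic EAR trace ---------------
# A minimal sketch (not used by the pipeline) of the update_blink() state
# machine. The EAR values are invented: a single dip below the 0.21 threshold
# lasting three frames (>= min_consec of 2) registers exactly one blink,
# counted when the EAR rises back above the threshold.
def _blink_example() -> int:
    state = BlinkState()
    for ear in [0.30, 0.29, 0.15, 0.14, 0.16, 0.30, 0.31]:
        state = update_blink(state, ear, thr=0.21, min_consec=2)
    return state.blink_count  # -> 1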

# -------------------------
# Core processing with new API
# -------------------------

def process_video(
    video_path: str,
    min_face_det_conf: float = 0.5,
    min_face_track_conf: float = 0.5,
    min_pose_det_conf: float = 0.5,
    min_pose_track_conf: float = 0.5,
    ear_threshold: float = 0.21,
    blink_min_consec: int = 2,
    max_frames: int = 0,
) -> Tuple[str, str, str, str]:
    """
    Process a video using the new MediaPipe Tasks API with GPU support.
    The face mesh is always drawn (not optional).
    """
    # Download models first
    face_model_path, pose_model_path = download_models()

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise RuntimeError("Cannot open video. Please upload a valid video file.")
    fps = cap.get(cv2.CAP_PROP_FPS)
    if fps <= 1e-6:
        fps = 30.0
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Output paths
    tmpdir = tempfile.mkdtemp(prefix="mp_analysis_")
    out_video = os.path.join(tmpdir, "annotated.mp4")
    out_csv = os.path.join(tmpdir, "per_frame_metrics.csv")
    out_json = os.path.join(tmpdir, "summary.json")
    out_report = os.path.join(tmpdir, "report.md")

    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter(out_video, fourcc, fps, (width, height))

    # Create face landmarker with GPU delegate
    base_options_face = python.BaseOptions(
        model_asset_path=face_model_path,
        delegate=python.BaseOptions.Delegate.GPU,
    )
    face_options = vision.FaceLandmarkerOptions(
        base_options=base_options_face,
        running_mode=vision.RunningMode.VIDEO,
        num_faces=1,
        min_face_detection_confidence=min_face_det_conf,
        min_face_presence_confidence=min_face_track_conf,
        min_tracking_confidence=min_face_track_conf,
        output_face_blendshapes=False,
        output_facial_transformation_matrixes=False,
    )

    # Create pose landmarker with GPU delegate
    base_options_pose = python.BaseOptions(
        model_asset_path=pose_model_path,
        delegate=python.BaseOptions.Delegate.GPU,
    )
    pose_options = vision.PoseLandmarkerOptions(
        base_options=base_options_pose,
        running_mode=vision.RunningMode.VIDEO,
        num_poses=1,
        min_pose_detection_confidence=min_pose_det_conf,
        min_pose_presence_confidence=min_pose_track_conf,
        min_tracking_confidence=min_pose_track_conf,
        output_segmentation_masks=False,
    )

    with vision.FaceLandmarker.create_from_options(face_options) as face_landmarker, \
         vision.PoseLandmarker.create_from_options(pose_options) as pose_landmarker:

        rows = []
        prev_pts = {}
        left_blink = BlinkState()
        right_blink = BlinkState()
        frame_idx = 0

        while True:
            ok, frame_bgr = cap.read()
            if not ok:
                break
            frame_idx += 1
            if max_frames and frame_idx > max_frames:
                break

            # Convert to RGB and create a MediaPipe Image
            frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame_rgb)

            # Timestamp in milliseconds
            timestamp_ms = int((frame_idx - 1) * 1000 / fps)
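            # Worked example of the timestamp arithmetic above: at 30 fps,
            # frame 1 maps to 0 ms and frame 31 to int(30 * 1000 / 30) = 1000 ms.
            # VIDEO running mode requires monotonically increasing timestamps,
            # which this per-frame computation guarantees whenever fps > 0.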
            # Process with the new API
            face_result = face_landmarker.detect_for_video(mp_image, timestamp_ms)
            pose_result = pose_landmarker.detect_for_video(mp_image, timestamp_ms)

            # Extract face landmarks
            face_pts: Dict[int, np.ndarray] = {}
            face_landmarks = None
            if face_result.face_landmarks:
                face_landmarks = face_result.face_landmarks[0]
                for i, lm in enumerate(face_landmarks):
                    face_pts[i] = np.array([lm.x * width, lm.y * height], dtype=np.float32)

            # Calculate EAR
            left_ear = eye_aspect_ratio(face_pts, LEFT_EYE_EAR_IDX)
            right_ear = eye_aspect_ratio(face_pts, RIGHT_EYE_EAR_IDX)
            left_blink = update_blink(left_blink, left_ear, ear_threshold, blink_min_consec)
            right_blink = update_blink(right_blink, right_ear, ear_threshold, blink_min_consec)

            # Extract pose landmarks
            pose_norm: Dict[str, Optional[np.ndarray]] = {}
            pose_px: Dict[str, Optional[np.ndarray]] = {}
            pose_landmarks = None
            if pose_result.pose_landmarks:
                pose_landmarks = pose_result.pose_landmarks[0]
                for name, idx in POSE_LANDMARKS.items():
                    if idx < len(pose_landmarks):
                        lm = pose_landmarks[idx]
                        pose_norm[name] = np.array([lm.x, lm.y], dtype=np.float32)
                        pose_px[name] = np.array([lm.x * width, lm.y * height], dtype=np.float32)
                    else:
                        pose_norm[name] = None
                        pose_px[name] = None
            else:
                for name in POSE_LANDMARKS:
                    pose_norm[name] = None
                    pose_px[name] = None

            # Movement metrics
            def movement_metrics(key: str):
                cur = pose_norm.get(key)
                if cur is None:
                    return None, None
                prev = prev_pts.get(key)
                if prev is None:
                    d = 0.0
                else:
                    d = float(np.linalg.norm(cur - prev))
                v = d * fps
                prev_pts[key] = cur
                return d, v

            lw_d, lw_v = movement_metrics("left_wrist")
            rw_d, rw_v = movement_metrics("right_wrist")
            la_d, la_v = movement_metrics("left_ankle")
            ra_d, ra_v = movement_metrics("right_ankle")

            # Joint angles
            def get_angle(a, b, c):
                if a is None or b is None or c is None:
                    return None
                return angle_3pts(a, b, c)

            left_elbow_ang = get_angle(pose_px["left_shoulder"], pose_px["left_elbow"], pose_px["left_wrist"])
            right_elbow_ang = get_angle(pose_px["right_shoulder"], pose_px["right_elbow"], pose_px["right_wrist"])
            left_knee_ang = get_angle(pose_px["left_hip"], pose_px["left_knee"], pose_px["left_ankle"])
            right_knee_ang = get_angle(pose_px["right_hip"], pose_px["right_knee"], pose_px["right_ankle"])

            # Draw overlays (the face mesh is always drawn, not optional)
            draw_pose_landmarks(frame_bgr, pose_landmarks)
            draw_face_landmarks(frame_bgr, face_landmarks)

            # HUD text
            hud_lines = [
                f"Frame: {frame_idx}/{total_frames if total_frames > 0 else '?'} FPS:{fps:.1f}",
                f"EAR L:{left_ear:.3f}" if left_ear is not None else "EAR L:None",
                f"EAR R:{right_ear:.3f}" if right_ear is not None else "EAR R:None",
                f"Blinks L:{left_blink.blink_count} R:{right_blink.blink_count}",
            ]
            y0 = 24
            for line in hud_lines:
                cv2.putText(frame_bgr, line, (12, y0), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
                y0 += 22

            writer.write(frame_bgr)

            rows.append({
                "frame": frame_idx,
                "time_s": (frame_idx - 1) / fps,
                "left_ear": left_ear,
                "right_ear": right_ear,
                "lw_disp": lw_d,
                "rw_disp": rw_d,
                "la_disp": la_d,
                "ra_disp": ra_d,
                "lw_speed": lw_v,
                "rw_speed": rw_v,
                "la_speed": la_v,
                "ra_speed": ra_v,
                "left_elbow_angle": left_elbow_ang,
                "right_elbow_angle": right_elbow_ang,
                "left_knee_angle": left_knee_ang,
                "right_knee_angle": right_knee_ang,
            })

    cap.release()
    writer.release()

    df = pd.DataFrame(rows)

    # Summaries
    def _sum_series(s: pd.Series):
        s2 = s.dropna()
        if len(s2) == 0:
            return {"mean": None, "min": None, "max": None}
        return {"mean": float(s2.mean()), "min": float(s2.min()), "max": float(s2.max())}

    summary = {
        "video": {
            "fps": float(fps),
            "width": width,
            "height": height,
            "frames_processed": int(len(df)),
            "duration_s": float(len(df) / fps),
        },
        "blink": {
            "ear_threshold": float(ear_threshold),
            "min_consecutive_frames": int(blink_min_consec),
            "left_blinks": int(left_blink.blink_count),
            "right_blinks": int(right_blink.blink_count),
            "left_blinks_per_min": float(_safe_div(left_blink.blink_count, (len(df) / fps) / 60.0)) if len(df) else 0.0,
            "right_blinks_per_min": float(_safe_div(right_blink.blink_count, (len(df) / fps) / 60.0)) if len(df) else 0.0,
            "left_ear_stats": _sum_series(df["left_ear"]),
            "right_ear_stats": _sum_series(df["right_ear"]),
        },
        "limb_movement": {
            "total_disp": {
                "left_wrist": float(df["lw_disp"].fillna(0).sum()),
                "right_wrist": float(df["rw_disp"].fillna(0).sum()),
                "left_ankle": float(df["la_disp"].fillna(0).sum()),
                "right_ankle": float(df["ra_disp"].fillna(0).sum()),
            },
            "speed_stats": {
                "left_wrist": _sum_series(df["lw_speed"]),
                "right_wrist": _sum_series(df["rw_speed"]),
                "left_ankle": _sum_series(df["la_speed"]),
                "right_ankle": _sum_series(df["ra_speed"]),
            },
            "angle_stats_deg": {
                "left_elbow": _sum_series(df["left_elbow_angle"]),
                "right_elbow": _sum_series(df["right_elbow_angle"]),
                "left_knee": _sum_series(df["left_knee_angle"]),
                "right_knee": _sum_series(df["right_knee_angle"]),
            },
        },
    }
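    # Units, with a worked example: per-frame displacement is measured in
    # normalized image units (0-1), so a wrist that moves 0.01 of the frame
    # between consecutive frames at 30 fps has a speed of 0.01 * 30 = 0.3
    # normalized units per second; the totals above sum these per-frame
    # displacements over the whole clip.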
    # Save outputs
    df.to_csv(out_csv, index=False)
    with open(out_json, "w", encoding="utf-8") as f:
        json.dump(summary, f, ensure_ascii=False, indent=2)

    report_md = f"""# MediaPipe Face + Pose Analysis Report (GPU Accelerated)

## Video Information
- Resolution: {width} x {height}
- FPS: {fps:.2f}
- Frames Processed: {len(df)}
- Duration: {summary['video']['duration_s']:.2f} seconds

## Blink Analysis (EAR)
- Threshold: {ear_threshold}
- Minimum Consecutive Frames: {blink_min_consec}
- Left Eye Blinks: {summary['blink']['left_blinks']} ({summary['blink']['left_blinks_per_min']:.2f} blinks/min)
- Right Eye Blinks: {summary['blink']['right_blinks']} ({summary['blink']['right_blinks_per_min']:.2f} blinks/min)
- Left Eye EAR: mean={summary['blink']['left_ear_stats']['mean']} min={summary['blink']['left_ear_stats']['min']} max={summary['blink']['left_ear_stats']['max']}
- Right Eye EAR: mean={summary['blink']['right_ear_stats']['mean']} min={summary['blink']['right_ear_stats']['min']} max={summary['blink']['right_ear_stats']['max']}

## Limb Movement (Normalized Units)
> Displacement and speed are computed from normalized coordinates (0-1), suitable for relative comparison and trend analysis.

- Total Displacement (higher = more movement):
  - Left Wrist: {summary['limb_movement']['total_disp']['left_wrist']:.6f}
  - Right Wrist: {summary['limb_movement']['total_disp']['right_wrist']:.6f}
  - Left Ankle: {summary['limb_movement']['total_disp']['left_ankle']:.6f}
  - Right Ankle: {summary['limb_movement']['total_disp']['right_ankle']:.6f}

## Output Files
- annotated.mp4: Video with pose skeleton and face mesh overlays
- per_frame_metrics.csv: Frame-by-frame metrics
- summary.json: Statistical summary

**Processed with GPU acceleration | New Face Landmarker API | Full Face Mesh Always Enabled**
"""
    with open(out_report, "w", encoding="utf-8") as f:
        f.write(report_md)

    return out_video, out_csv, out_json, out_report


# -------------------------
# Gradio UI
# -------------------------

def ui_process(
    video,
    min_face_det_conf, min_face_track_conf,
    min_pose_det_conf, min_pose_track_conf,
    ear_threshold, blink_min_consec, max_frames,
):
    if isinstance(video, dict) and "path" in video:
        video_path = video["path"]
    else:
        video_path = video
    try:
        out_video, out_csv, out_json, out_report = process_video(
            video_path=str(video_path),
            min_face_det_conf=float(min_face_det_conf),
            min_face_track_conf=float(min_face_track_conf),
            min_pose_det_conf=float(min_pose_det_conf),
            min_pose_track_conf=float(min_pose_track_conf),
            ear_threshold=float(ear_threshold),
            blink_min_consec=int(blink_min_consec),
            max_frames=int(max_frames),
        )
        with open(out_report, "r", encoding="utf-8") as f:
            report_text = f.read()
        return out_video, out_csv, out_json, report_text
    except Exception:
        import traceback
        error_msg = f"# Error Processing Video\n\n```\n{traceback.format_exc()}\n```"
        return None, None, None, error_msg
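
# ---- Example: headless use without the Gradio UI ---------------------------
# A minimal sketch of driving the pipeline directly from Python; the
# "sample.mp4" path below is illustrative, not a file shipped with this app.
#
#   out_video, out_csv, out_json, out_report = process_video(
#       video_path="sample.mp4",
#       max_frames=300,  # cap frames for a quick smoke test
#   )
#   print(open(out_report, encoding="utf-8").read())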

demo = gr.Blocks(title="Video Pose + Face Analysis (GPU Accelerated)")

with demo:
    gr.Markdown("""
## Upload Video → MediaPipe GPU Acceleration → Pose + Face Mesh Tracking + Blink/Limb Analysis

**Features:**
- ✅ GPU Accelerated Processing
- ✅ New Face Landmarker API (more accurate 478-point face mesh)
- ✅ Full Face Mesh Always Enabled
- ✅ Blink Detection (EAR Algorithm)
- ✅ Limb Movement Quantification
- ✅ Joint Angle Analysis
""")
    with gr.Row():
        video_in = gr.Video(label="Upload Video")

    with gr.Accordion("Parameters (defaults work well for most cases)", open=False):
        gr.Markdown("### Face Detection Parameters")
        min_face_det_conf = gr.Slider(0.1, 0.9, value=0.5, step=0.05, label="Face Detection Confidence Threshold")
        min_face_track_conf = gr.Slider(0.1, 0.9, value=0.5, step=0.05, label="Face Tracking Confidence Threshold")

        gr.Markdown("### Pose Detection Parameters")
        min_pose_det_conf = gr.Slider(0.1, 0.9, value=0.5, step=0.05, label="Pose Detection Confidence Threshold")
        min_pose_track_conf = gr.Slider(0.1, 0.9, value=0.5, step=0.05, label="Pose Tracking Confidence Threshold")

        gr.Markdown("### Blink Detection Parameters")
        ear_threshold = gr.Slider(0.10, 0.35, value=0.21, step=0.01, label="Blink Threshold (EAR, lower = stricter)")
        blink_min_consec = gr.Slider(1, 6, value=2, step=1, label="Blink Minimum Consecutive Frames (anti-jitter)")

        gr.Markdown("### Processing Options")
        max_frames = gr.Number(value=0, precision=0, label="Maximum Frames to Process (0 = process all; set to 300 for debugging)")

    run_btn = gr.Button("🚀 Start Analysis (GPU Accelerated)", variant="primary", size="lg")

    with gr.Row():
        video_out = gr.Video(label="Output: Annotated Video")
    with gr.Row():
        csv_out = gr.File(label="Per-Frame Metrics CSV")
        json_out = gr.File(label="Summary JSON")
    report_out = gr.Markdown()

    run_btn.click(
        fn=ui_process,
        inputs=[
            video_in,
            min_face_det_conf, min_face_track_conf,
            min_pose_det_conf, min_pose_track_conf,
            ear_threshold, blink_min_consec, max_frames,
        ],
        outputs=[video_out, csv_out, json_out, report_out],
    )

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)